Add new storage functions for new replication

The new replication protocol will keep all the streams separate, rather
than muxing multiple streams into one.
This commit is contained in:
Erik Johnston 2017-03-27 14:03:38 +01:00
parent 30348c924c
commit 24d35ab47b
5 changed files with 137 additions and 4 deletions

View File

@ -293,6 +293,9 @@ class TypingHandler(object):
rows.sort() rows.sort()
return rows return rows
def get_current_token(self):
return self._latest_room_serial
class TypingNotificationEventSource(object): class TypingNotificationEventSource(object):
def __init__(self, hs): def __init__(self, hs):

View File

@ -504,7 +504,7 @@ class ReplicationResource(Resource):
if device_lists is not None and device_lists != current_position: if device_lists is not None and device_lists != current_position:
changes = yield self.store.get_all_device_list_changes_for_remotes( changes = yield self.store.get_all_device_list_changes_for_remotes(
device_lists, device_lists, current_position,
) )
writer.write_header_and_rows("device_lists", changes, ( writer.write_header_and_rows("device_lists", changes, (
"position", "user_id", "destination", "position", "user_id", "destination",

View File

@ -533,7 +533,7 @@ class DeviceStore(SQLBaseStore):
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key) rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
defer.returnValue(set(row[0] for row in rows)) defer.returnValue(set(row[0] for row in rows))
def get_all_device_list_changes_for_remotes(self, from_key): def get_all_device_list_changes_for_remotes(self, from_key, to_key):
"""Return a list of `(stream_id, user_id, destination)` which is the """Return a list of `(stream_id, user_id, destination)` which is the
combined list of changes to devices, and which destinations need to be combined list of changes to devices, and which destinations need to be
poked. `destination` may be None if no destinations need to be poked. poked. `destination` may be None if no destinations need to be poked.
@ -541,11 +541,11 @@ class DeviceStore(SQLBaseStore):
sql = """ sql = """
SELECT stream_id, user_id, destination FROM device_lists_stream SELECT stream_id, user_id, destination FROM device_lists_stream
LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id)
WHERE stream_id > ? WHERE ? < stream_id AND stream_id <= ?
""" """
return self._execute( return self._execute(
"get_all_device_list_changes_for_remotes", None, "get_all_device_list_changes_for_remotes", None,
sql, from_key, sql, from_key, to_key
) )
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -1771,6 +1771,94 @@ class EventsStore(SQLBaseStore):
"""The current minimum token that backfilled events have reached""" """The current minimum token that backfilled events have reached"""
return -self._backfill_id_gen.get_current_token() return -self._backfill_id_gen.get_current_token()
def get_current_events_token(self):
"""The current maximum token that events have reached"""
return self._stream_id_gen.get_current_token()
def get_all_new_forward_event_rows(self, last_id, current_id, limit):
if last_id == current_id:
return defer.succeed([])
def get_all_new_forward_event_rows(txn):
sql = (
"SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" WHERE ? < stream_ordering AND stream_ordering <= ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
new_event_updates = txn.fetchall()
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
else:
upper_bound = current_id
sql = (
"SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts"
" FROM events AS e"
" INNER JOIN ex_outlier_stream USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" WHERE ? < event_stream_ordering"
" AND event_stream_ordering <= ?"
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (last_id, upper_bound))
new_event_updates.extend(txn)
return new_event_updates
return self.runInteraction(
"get_all_new_forward_event_rows", get_all_new_forward_event_rows
)
def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
if last_id == current_id:
return defer.succeed([])
def get_all_new_backfill_event_rows(txn):
sql = (
"SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts"
" FROM events AS e"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" WHERE ? > stream_ordering AND stream_ordering >= ?"
" ORDER BY stream_ordering ASC"
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, limit))
new_event_updates = txn.fetchall()
if len(new_event_updates) == limit:
upper_bound = new_event_updates[-1][0]
else:
upper_bound = current_id
sql = (
"SELECT -event_stream_ordering, e.event_id, e.room_id, e.type,"
" state_key, redacts"
" FROM events AS e"
" INNER JOIN ex_outlier_stream USING (event_id)"
" LEFT JOIN redactions USING (event_id)"
" LEFT JOIN state_events USING (event_id)"
" WHERE ? > event_stream_ordering"
" AND event_stream_ordering >= ?"
" ORDER BY event_stream_ordering DESC"
)
txn.execute(sql, (-last_id, -upper_bound))
new_event_updates.extend(txn.fetchall())
return new_event_updates
return self.runInteraction(
"get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
)
@cached(num_args=5, max_entries=10) @cached(num_args=5, max_entries=10)
def get_all_new_events(self, last_backfill_id, last_forward_id, def get_all_new_events(self, last_backfill_id, last_forward_id,
current_backfill_id, current_forward_id, limit): current_backfill_id, current_forward_id, limit):

View File

@ -135,6 +135,48 @@ class PusherStore(SQLBaseStore):
"get_all_updated_pushers", get_all_updated_pushers_txn "get_all_updated_pushers", get_all_updated_pushers_txn
) )
def get_all_updated_pushers_rows(self, last_id, current_id, limit):
"""Get all the pushers that have changed between the given tokens.
Returns:
list(tuple): each tuple consists of:
stream_id (str)
user_id (str)
app_id (str)
pushkey (str)
was_deleted (bool): whether the pusher was added/updated (False)
or deleted (True)
"""
if last_id == current_id:
return defer.succeed([])
def get_all_updated_pushers_rows_txn(txn):
sql = (
"SELECT id, user_name, app_id, pushkey"
" FROM pushers"
" WHERE ? < id AND id <= ?"
" ORDER BY id ASC LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
results = [list(row) + [False] for row in txn]
sql = (
"SELECT stream_id, user_id, app_id, pushkey"
" FROM deleted_pushers"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC LIMIT ?"
)
txn.execute(sql, (last_id, current_id, limit))
results.extend(list(row) + [True] for row in txn)
results.sort() # Sort so that they're ordered by stream id
return results
return self.runInteraction(
"get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
)
@cachedInlineCallbacks(num_args=1, max_entries=15000) @cachedInlineCallbacks(num_args=1, max_entries=15000)
def get_if_user_has_pusher(self, user_id): def get_if_user_has_pusher(self, user_id):
# This only exists for the cachedList decorator # This only exists for the cachedList decorator