Change device list streams to have one row per ID (#7010)

* Add 'device_lists_outbound_pokes' as extra table.

This makes sure we check all the relevant tables to get the current max
stream ID.

Currently not doing so isn't problematic as the max stream ID in
`device_lists_outbound_pokes` is the same as in `device_lists_stream`,
however that will change.

* Change device lists stream to have one row per id.

This will make it possible to process the streams more incrementally,
avoiding having to process large chunks at once.

* Change device list replication to match new semantics.

Instead of sending down batches of user ID/host tuples, send down a row
per entity (user ID or host).

* Newsfile

* Remove handling of multiple rows per ID

* Fix worker handling

* Comments from review
This commit is contained in:
Erik Johnston 2020-03-19 11:36:53 +00:00 committed by GitHub
commit a319cb1dd1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 112 additions and 130 deletions

View file

@ -29,7 +29,13 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
self.hs = hs
self._device_list_id_gen = SlavedIdTracker(
db_conn, "device_lists_stream", "stream_id"
db_conn,
"device_lists_stream",
"stream_id",
extra_tables=[
("user_signature_stream", "stream_id"),
("device_lists_outbound_pokes", "stream_id"),
],
)
device_list_max = self._device_list_id_gen.get_current_token()
self._device_list_stream_cache = StreamChangeCache(
@ -55,23 +61,27 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
def process_replication_rows(self, stream_name, token, rows):
if stream_name == DeviceListsStream.NAME:
self._device_list_id_gen.advance(token)
for row in rows:
self._invalidate_caches_for_devices(token, row.user_id, row.destination)
self._invalidate_caches_for_devices(token, rows)
elif stream_name == UserSignatureStream.NAME:
self._device_list_id_gen.advance(token)
for row in rows:
self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
return super(SlavedDeviceStore, self).process_replication_rows(
stream_name, token, rows
)
def _invalidate_caches_for_devices(self, token, user_id, destination):
self._device_list_stream_cache.entity_has_changed(user_id, token)
def _invalidate_caches_for_devices(self, token, rows):
for row in rows:
# The entities are either user IDs (starting with '@') whose devices
# have changed, or remote servers that we need to tell about
# changes.
if row.entity.startswith("@"):
self._device_list_stream_cache.entity_has_changed(row.entity, token)
self.get_cached_devices_for_user.invalidate((row.entity,))
self._get_cached_user_device.invalidate_many((row.entity,))
self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
if destination:
self._device_list_federation_stream_cache.entity_has_changed(
destination, token
)
self.get_cached_devices_for_user.invalidate((user_id,))
self._get_cached_user_device.invalidate_many((user_id,))
self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
else:
self._device_list_federation_stream_cache.entity_has_changed(
row.entity, token
)

View file

@ -94,9 +94,13 @@ PublicRoomsStreamRow = namedtuple(
"network_id", # str, optional
),
)
DeviceListsStreamRow = namedtuple(
"DeviceListsStreamRow", ("user_id", "destination") # str # str
)
@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
@ -363,7 +367,8 @@ class PublicRoomsStream(Stream):
class DeviceListsStream(Stream):
"""Someone added/changed/removed a device
"""Either a user has updated their devices or a remote server needs to be
told about a device update.
"""
NAME = "device_lists"