From 36be39b8b39f4fc9b3a272faa306660b95145dad Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Wed, 1 Mar 2017 14:12:11 +0000
Subject: [PATCH 1/3] Fix device list update to not constantly resync

---
 synapse/handlers/device.py | 177 ++++++++++++++++++++++++++-----------
 1 file changed, 123 insertions(+), 54 deletions(-)

diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index ca7137f31..23dab53df 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -34,10 +34,11 @@ class DeviceHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.federation_sender = hs.get_federation_sender()
         self.federation = hs.get_replication_layer()
-        self._remote_edue_linearizer = Linearizer(name="remote_device_list")
+
+        self._edu_updater = DeviceListEduUpdater(hs, self)
 
         self.federation.register_edu_handler(
-            "m.device_list_update", self._incoming_device_list_update,
+            "m.device_list_update", self._edu_updater.incoming_device_list_update,
         )
         self.federation.register_query_handler(
             "user_devices", self.on_federation_query_user_devices,
@@ -299,58 +300,6 @@ class DeviceHandler(BaseHandler):
         # and those that actually still share a room with the user
         defer.returnValue(users_who_share_room & possibly_changed)
 
-    @measure_func("_incoming_device_list_update")
-    @defer.inlineCallbacks
-    def _incoming_device_list_update(self, origin, edu_content):
-        user_id = edu_content["user_id"]
-        device_id = edu_content["device_id"]
-        stream_id = edu_content["stream_id"]
-        prev_ids = edu_content.get("prev_id", [])
-
-        if get_domain_from_id(user_id) != origin:
-            # TODO: Raise?
-            logger.warning("Got device list update edu for %r from %r", user_id, origin)
-            return
-
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        if not rooms:
-            # We don't share any rooms with this user. Ignore update, as we
-            # probably won't get any further updates.
-            return
-
-        with (yield self._remote_edue_linearizer.queue(user_id)):
-            # If the prev id matches whats in our cache table, then we don't need
-            # to resync the users device list, otherwise we do.
-            resync = True
-            if len(prev_ids) == 1:
-                extremity = yield self.store.get_device_list_last_stream_id_for_remote(
-                    user_id
-                )
-                logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
-                if str(extremity) == str(prev_ids[0]):
-                    resync = False
-
-            if resync:
-                # Fetch all devices for the user.
-                result = yield self.federation.query_user_devices(origin, user_id)
-                stream_id = result["stream_id"]
-                devices = result["devices"]
-                yield self.store.update_remote_device_list_cache(
-                    user_id, devices, stream_id,
-                )
-                device_ids = [device["device_id"] for device in devices]
-                yield self.notify_device_update(user_id, device_ids)
-            else:
-                # Simply update the single device, since we know that is the only
-                # change (becuase of the single prev_id matching the current cache)
-                content = dict(edu_content)
-                for key in ("user_id", "device_id", "stream_id", "prev_ids"):
-                    content.pop(key, None)
-                yield self.store.update_remote_device_list_cache_entry(
-                    user_id, device_id, content, stream_id,
-                )
-                yield self.notify_device_update(user_id, [device_id])
-
     @defer.inlineCallbacks
     def on_federation_query_user_devices(self, user_id):
         stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
@@ -376,3 +325,123 @@ def _update_device_from_client_ips(device, client_ips):
         "last_seen_ts": ip.get("last_seen"),
         "last_seen_ip": ip.get("ip"),
     })
+
+
+class DeviceListEduUpdater(object):
+    "Handles incoming device list updates from federation and updates the DB"
+
+    def __init__(self, hs, device_handler):
+        self.store = hs.get_datastore()
+        self.federation = hs.get_replication_layer()
+        self.clock = hs.get_clock()
+        self.device_handler = device_handler
+
+        self._remote_edue_linearizer = Linearizer(name="remote_device_list")
+
+        # user_id -> list of updates waiting to be handled.
+        self._pending_updates = {}
+
+        # Recently seen stream ids. We don't bother keeping these in the DB,
+        # but they're useful to have them about to reduce the number of spurious
+        # resyncs.
+        self._seen_updates = {}
+
+    @defer.inlineCallbacks
+    def incoming_device_list_update(self, origin, edu_content):
+        """Called on incoming device list update from federation. Responsible
+        for parsing the EDU and adding to pending updates list.
+        """
+
+        user_id = edu_content.pop("user_id")
+        device_id = edu_content.pop("device_id")
+        stream_id = str(edu_content.pop("stream_id"))  # They may come as ints
+        prev_ids = edu_content.pop("prev_id", [])
+        prev_ids = [str(p) for p in prev_ids]   # They may come as ints
+
+        if get_domain_from_id(user_id) != origin:
+            # TODO: Raise?
+            logger.warning("Got device list update edu for %r from %r", user_id, origin)
+            return
+
+        rooms = yield self.store.get_rooms_for_user(user_id)
+        if not rooms:
+            # We don't share any rooms with this user. Ignore update, as we
+            # probably won't get any further updates.
+            return
+
+        self._pending_updates.setdefault(user_id, []).append(
+            (device_id, stream_id, prev_ids, edu_content)
+        )
+
+        yield self._handle_device_updates(user_id)
+
+    @measure_func("_incoming_device_list_update")
+    @defer.inlineCallbacks
+    def _handle_device_updates(self, user_id):
+        "Actually handle pending updates."
+
+        with (yield self._remote_edue_linearizer.queue(user_id)):
+            pending_updates = self._pending_updates.pop(user_id, [])
+            if not pending_updates:
+                # This can happen since we batch updates
+                return
+
+            resync = yield self._need_to_do_resync(user_id, pending_updates)
+
+            if resync:
+                # Fetch all devices for the user.
+                origin = get_domain_from_id(user_id)
+                result = yield self.federation.query_user_devices(origin, user_id)
+                stream_id = result["stream_id"]
+                devices = result["devices"]
+                yield self.store.update_remote_device_list_cache(
+                    user_id, devices, stream_id,
+                )
+                device_ids = [device["device_id"] for device in devices]
+                yield self.device_handler.notify_device_update(user_id, device_ids)
+            else:
+                # Simply update the single device, since we know that is the only
+                # change (becuase of the single prev_id matching the current cache)
+                for device_id, stream_id, prev_ids, content in pending_updates:
+                    yield self.store.update_remote_device_list_cache_entry(
+                        user_id, device_id, content, stream_id,
+                    )
+
+                yield self.device_handler.notify_device_update(
+                    user_id, [device_id for device_id, _, _, _ in pending_updates]
+                )
+
+            self._seen_updates.setdefault(user_id, set()).update(
+                [stream_id for _, stream_id, _, _ in pending_updates]
+            )
+
+    @defer.inlineCallbacks
+    def _need_to_do_resync(self, user_id, updates):
+        """Given a list of updates for a user figure out if we need to do a full
+        resync, or whether we have enough data that we can just apply the delta.
+        """
+        seen_updates = self._seen_updates.get(user_id, set())
+
+        extremity = yield self.store.get_device_list_last_stream_id_for_remote(
+            user_id
+        )
+
+        stream_id_in_updates = set()  # stream_ids in updates list
+        for _, stream_id, prev_ids, _ in updates:
+            if not prev_ids:
+                # We always do a resync if there are no previous IDs
+                defer.returnValue(True)
+
+            for prev_id in prev_ids:
+                if prev_id == extremity:
+                    continue
+                elif prev_id in seen_updates:
+                    continue
+                elif prev_id in stream_id_in_updates:
+                    continue
+                else:
+                    defer.returnValue(True)
+
+            stream_id_in_updates.add(stream_id)
+
+        defer.returnValue(False)

From 9834367eeaecd7f356cf7cda1e1b3eb79f8f2ea2 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 3 Mar 2017 15:31:57 +0000
Subject: [PATCH 2/3] Spelling

---
 synapse/handlers/device.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 23dab53df..540b43879 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -336,7 +336,7 @@ class DeviceListEduUpdater(object):
         self.clock = hs.get_clock()
         self.device_handler = device_handler
 
-        self._remote_edue_linearizer = Linearizer(name="remote_device_list")
+        self._remote_edu_linearizer = Linearizer(name="remote_device_list")
 
         # user_id -> list of updates waiting to be handled.
         self._pending_updates = {}
@@ -380,7 +380,7 @@ class DeviceListEduUpdater(object):
     def _handle_device_updates(self, user_id):
         "Actually handle pending updates."
 
-        with (yield self._remote_edue_linearizer.queue(user_id)):
+        with (yield self._remote_edu_linearizer.queue(user_id)):
             pending_updates = self._pending_updates.pop(user_id, [])
             if not pending_updates:
                 # This can happen since we batch updates

From f2581ee8b81e72c49b0c16ca41071c87292c0227 Mon Sep 17 00:00:00 2001
From: Erik Johnston <erik@matrix.org>
Date: Fri, 3 Mar 2017 16:02:53 +0000
Subject: [PATCH 3/3] Don't keep around old stream IDs forever

---
 synapse/handlers/device.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 540b43879..e859b3165 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -16,6 +16,7 @@ from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.util import stringutils
 from synapse.util.async import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id, RoomStreamToken
 from twisted.internet import defer
@@ -344,7 +345,13 @@ class DeviceListEduUpdater(object):
         # Recently seen stream ids. We don't bother keeping these in the DB,
         # but they're useful to have them about to reduce the number of spurious
         # resyncs.
-        self._seen_updates = {}
+        self._seen_updates = ExpiringCache(
+            cache_name="device_update_edu",
+            clock=self.clock,
+            max_len=10000,
+            expiry_ms=30 * 60 * 1000,
+            iterable=True,
+        )
 
     @defer.inlineCallbacks
     def incoming_device_list_update(self, origin, edu_content):
@@ -412,7 +419,7 @@ class DeviceListEduUpdater(object):
                 )
 
             self._seen_updates.setdefault(user_id, set()).update(
-                [stream_id for _, stream_id, _, _ in pending_updates]
+                stream_id for _, stream_id, _, _ in pending_updates
             )
 
     @defer.inlineCallbacks