From d6752ce5da38d35857fe324800d76a86ee1e64f1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 18 Dec 2019 14:26:58 +0000 Subject: [PATCH] Clean up startup for the pusher (#6558) * Remove redundant python2 support code `str.decode()` doesn't exist on python3, so presumably this code was doing nothing * Filter out pushers with corrupt data When we get a row with unparsable json, drop the row, rather than returning a row with null `data`, which will then cause an explosion later on. * Improve logging when we can't start a pusher Log the ID to help us understand the problem * Make email pusher setup more robust We know we'll have a `data` member, since that comes from the database. What we *don't* know is if that is a dict, and if that has a `brand` member, and if that member is a string. --- changelog.d/6558.misc | 1 + synapse/push/pusher.py | 12 +++++---- synapse/push/pusherpool.py | 10 ++++--- synapse/rest/client/v1/pusher.py | 31 +++++++++++----------- synapse/storage/data_stores/main/pusher.py | 25 ++++++----------- tests/push/test_email.py | 3 +++ tests/push/test_http.py | 4 +++ 7 files changed, 44 insertions(+), 42 deletions(-) create mode 100644 changelog.d/6558.misc diff --git a/changelog.d/6558.misc b/changelog.d/6558.misc new file mode 100644 index 000000000..a7572f1a8 --- /dev/null +++ b/changelog.d/6558.misc @@ -0,0 +1 @@ +Clean up logs from the push notifier at startup. \ No newline at end of file diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index f277aeb13..8ad0bf593 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -80,9 +80,11 @@ class PusherFactory(object): return EmailPusher(self.hs, pusherdict, mailer) def _app_name_from_pusherdict(self, pusherdict): - if "data" in pusherdict and "brand" in pusherdict["data"]: - app_name = pusherdict["data"]["brand"] - else: - app_name = self.config.email_app_name + data = pusherdict["data"] - return app_name + if isinstance(data, dict): + brand = data.get("brand") + if isinstance(brand, str): + return brand + + return self.config.email_app_name diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 0f6992202..b9dca5bc6 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -232,7 +232,6 @@ class PusherPool: Deferred """ pushers = yield self.store.get_all_pushers() - logger.info("Starting %d pushers", len(pushers)) # Stagger starting up the pushers so we don't completely drown the # process on start up. @@ -245,7 +244,7 @@ class PusherPool: """Start the given pusher Args: - pusherdict (dict): + pusherdict (dict): dict with the values pulled from the db table Returns: Deferred[EmailPusher|HttpPusher] @@ -254,7 +253,8 @@ class PusherPool: p = self.pusher_factory.create_pusher(pusherdict) except PusherConfigException as e: logger.warning( - "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s", + "Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s", + pusherdict["id"], pusherdict.get("user_name"), pusherdict.get("app_id"), pusherdict.get("pushkey"), @@ -262,7 +262,9 @@ class PusherPool: ) return except Exception: - logger.exception("Couldn't start a pusher: caught Exception") + logger.exception( + "Couldn't start pusher id %i: caught Exception", pusherdict["id"], + ) return if not p: diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 0791866f5..6f6b7aed6 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -28,6 +28,17 @@ from synapse.rest.client.v2_alpha._base import client_patterns logger = logging.getLogger(__name__) +ALLOWED_KEYS = { + "app_display_name", + "app_id", + "data", + "device_display_name", + "kind", + "lang", + "profile_tag", + "pushkey", +} + class PushersRestServlet(RestServlet): PATTERNS = client_patterns("/pushers$", v1=True) @@ -43,23 +54,11 @@ class PushersRestServlet(RestServlet): pushers = await self.hs.get_datastore().get_pushers_by_user_id(user.to_string()) - allowed_keys = [ - "app_display_name", - "app_id", - "data", - "device_display_name", - "kind", - "lang", - "profile_tag", - "pushkey", - ] + filtered_pushers = list( + {k: v for k, v in p.items() if k in ALLOWED_KEYS} for p in pushers + ) - for p in pushers: - for k, v in list(p.items()): - if k not in allowed_keys: - del p[k] - - return 200, {"pushers": pushers} + return 200, {"pushers": filtered_pushers} def on_OPTIONS(self, _): return 200, {} diff --git a/synapse/storage/data_stores/main/pusher.py b/synapse/storage/data_stores/main/pusher.py index f07309ef0..6b0323326 100644 --- a/synapse/storage/data_stores/main/pusher.py +++ b/synapse/storage/data_stores/main/pusher.py @@ -15,8 +15,7 @@ # limitations under the License. import logging - -import six +from typing import Iterable, Iterator from canonicaljson import encode_canonical_json, json @@ -27,21 +26,16 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList logger = logging.getLogger(__name__) -if six.PY2: - db_binary_type = six.moves.builtins.buffer -else: - db_binary_type = memoryview - class PusherWorkerStore(SQLBaseStore): - def _decode_pushers_rows(self, rows): + def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[dict]: + """JSON-decode the data in the rows returned from the `pushers` table + + Drops any rows whose data cannot be decoded + """ for r in rows: dataJson = r["data"] - r["data"] = None try: - if isinstance(dataJson, db_binary_type): - dataJson = str(dataJson).decode("UTF8") - r["data"] = json.loads(dataJson) except Exception as e: logger.warning( @@ -50,12 +44,9 @@ class PusherWorkerStore(SQLBaseStore): dataJson, e.args[0], ) - pass + continue - if isinstance(r["pushkey"], db_binary_type): - r["pushkey"] = str(r["pushkey"]).decode("UTF8") - - return rows + yield r @defer.inlineCallbacks def user_has_pusher(self, user_id): diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 358b593cd..80187406b 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -165,6 +165,7 @@ class EmailPusherTests(HomeserverTestCase): pushers = self.get_success( self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) ) + pushers = list(pushers) self.assertEqual(len(pushers), 1) last_stream_ordering = pushers[0]["last_stream_ordering"] @@ -175,6 +176,7 @@ class EmailPusherTests(HomeserverTestCase): pushers = self.get_success( self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) ) + pushers = list(pushers) self.assertEqual(len(pushers), 1) self.assertEqual(last_stream_ordering, pushers[0]["last_stream_ordering"]) @@ -192,5 +194,6 @@ class EmailPusherTests(HomeserverTestCase): pushers = self.get_success( self.hs.get_datastore().get_pushers_by(dict(user_name=self.user_id)) ) + pushers = list(pushers) self.assertEqual(len(pushers), 1) self.assertTrue(pushers[0]["last_stream_ordering"] > last_stream_ordering) diff --git a/tests/push/test_http.py b/tests/push/test_http.py index af2327fb6..fe3441f08 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -104,6 +104,7 @@ class HTTPPusherTests(HomeserverTestCase): pushers = self.get_success( self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) ) + pushers = list(pushers) self.assertEqual(len(pushers), 1) last_stream_ordering = pushers[0]["last_stream_ordering"] @@ -114,6 +115,7 @@ class HTTPPusherTests(HomeserverTestCase): pushers = self.get_success( self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) ) + pushers = list(pushers) self.assertEqual(len(pushers), 1) self.assertEqual(last_stream_ordering, pushers[0]["last_stream_ordering"]) @@ -132,6 +134,7 @@ class HTTPPusherTests(HomeserverTestCase): pushers = self.get_success( self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) ) + pushers = list(pushers) self.assertEqual(len(pushers), 1) self.assertTrue(pushers[0]["last_stream_ordering"] > last_stream_ordering) last_stream_ordering = pushers[0]["last_stream_ordering"] @@ -151,5 +154,6 @@ class HTTPPusherTests(HomeserverTestCase): pushers = self.get_success( self.hs.get_datastore().get_pushers_by(dict(user_name=user_id)) ) + pushers = list(pushers) self.assertEqual(len(pushers), 1) self.assertTrue(pushers[0]["last_stream_ordering"] > last_stream_ordering)