diff --git a/changelog.d/9020.misc b/changelog.d/9020.misc new file mode 100644 index 000000000..4ff0b94b9 --- /dev/null +++ b/changelog.d/9020.misc @@ -0,0 +1 @@ +Add type hints to push module. diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 9e7ac149a..f4f7ec96f 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -40,7 +40,7 @@ class PusherConfig: ts = attr.ib(type=int) lang = attr.ib(type=Optional[str]) data = attr.ib(type=Optional[JsonDict]) - last_stream_ordering = attr.ib(type=Optional[int]) + last_stream_ordering = attr.ib(type=int) last_success = attr.ib(type=Optional[int]) failing_since = attr.ib(type=Optional[int]) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index d2eff75a5..4ac1b3174 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -157,7 +157,6 @@ class EmailPusher(Pusher): being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - assert start is not None unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email( self.user_id, start, self.max_stream_ordering ) @@ -220,12 +219,8 @@ class EmailPusher(Pusher): ) async def save_last_stream_ordering_and_success( - self, last_stream_ordering: Optional[int] + self, last_stream_ordering: int ) -> None: - if last_stream_ordering is None: - # This happens if we haven't yet processed anything - return - self.last_stream_ordering = last_stream_ordering pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( self.app_id, diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 417fe0f1f..e048b0d59 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -176,7 +176,6 @@ class HttpPusher(Pusher): Never call this directly: use _process which will only allow this to run once per pusher. """ - assert self.last_stream_ordering is not None unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) @@ -205,7 +204,6 @@ class HttpPusher(Pusher): http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - assert self.last_stream_ordering is not None pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( self.app_id, self.pushkey, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 8158356d4..eed16dbfb 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -106,6 +106,10 @@ class PusherPool: time_now_msec = self.clock.time_msec() + # create the pusher setting last_stream_ordering to the current maximum + # stream ordering, so it will process pushes from this point onwards. + last_stream_ordering = self.store.get_room_max_stream_ordering() + # we try to create the pusher just to validate the config: it # will then get pulled out of the database, # recreated, added and started: this means we have only one @@ -124,16 +128,12 @@ class PusherPool: ts=time_now_msec, lang=lang, data=data, - last_stream_ordering=None, + last_stream_ordering=last_stream_ordering, last_success=None, failing_since=None, ) ) - # create the pusher setting last_stream_ordering to the current maximum - # stream ordering, so it will process pushes from this point onwards. - last_stream_ordering = self.store.get_room_max_stream_ordering() - await self.store.add_pusher( user_id=user_id, access_token=access_token,