Fix-up assertions about last stream token in push (#9020)

The last stream token is always known and we do not need to handle none.
This commit is contained in:
Patrick Cloke 2021-01-05 10:53:15 -05:00 committed by GitHub
parent 31b1905e13
commit 37eaf9c272
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 8 additions and 14 deletions

1
changelog.d/9020.misc Normal file
View File

@ -0,0 +1 @@
Add type hints to push module.

View File

@ -40,7 +40,7 @@ class PusherConfig:
ts = attr.ib(type=int) ts = attr.ib(type=int)
lang = attr.ib(type=Optional[str]) lang = attr.ib(type=Optional[str])
data = attr.ib(type=Optional[JsonDict]) data = attr.ib(type=Optional[JsonDict])
last_stream_ordering = attr.ib(type=Optional[int]) last_stream_ordering = attr.ib(type=int)
last_success = attr.ib(type=Optional[int]) last_success = attr.ib(type=Optional[int])
failing_since = attr.ib(type=Optional[int]) failing_since = attr.ib(type=Optional[int])

View File

@ -157,7 +157,6 @@ class EmailPusher(Pusher):
being run. being run.
""" """
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
assert start is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email( unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
self.user_id, start, self.max_stream_ordering self.user_id, start, self.max_stream_ordering
) )
@ -220,12 +219,8 @@ class EmailPusher(Pusher):
) )
async def save_last_stream_ordering_and_success( async def save_last_stream_ordering_and_success(
self, last_stream_ordering: Optional[int] self, last_stream_ordering: int
) -> None: ) -> None:
if last_stream_ordering is None:
# This happens if we haven't yet processed anything
return
self.last_stream_ordering = last_stream_ordering self.last_stream_ordering = last_stream_ordering
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id, self.app_id,

View File

@ -176,7 +176,6 @@ class HttpPusher(Pusher):
Never call this directly: use _process which will only allow this to Never call this directly: use _process which will only allow this to
run once per pusher. run once per pusher.
""" """
assert self.last_stream_ordering is not None
unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http( unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
self.user_id, self.last_stream_ordering, self.max_stream_ordering self.user_id, self.last_stream_ordering, self.max_stream_ordering
) )
@ -205,7 +204,6 @@ class HttpPusher(Pusher):
http_push_processed_counter.inc() http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"] self.last_stream_ordering = push_action["stream_ordering"]
assert self.last_stream_ordering is not None
pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
self.app_id, self.app_id,
self.pushkey, self.pushkey,

View File

@ -106,6 +106,10 @@ class PusherPool:
time_now_msec = self.clock.time_msec() time_now_msec = self.clock.time_msec()
# create the pusher setting last_stream_ordering to the current maximum
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()
# we try to create the pusher just to validate the config: it # we try to create the pusher just to validate the config: it
# will then get pulled out of the database, # will then get pulled out of the database,
# recreated, added and started: this means we have only one # recreated, added and started: this means we have only one
@ -124,16 +128,12 @@ class PusherPool:
ts=time_now_msec, ts=time_now_msec,
lang=lang, lang=lang,
data=data, data=data,
last_stream_ordering=None, last_stream_ordering=last_stream_ordering,
last_success=None, last_success=None,
failing_since=None, failing_since=None,
) )
) )
# create the pusher setting last_stream_ordering to the current maximum
# stream ordering, so it will process pushes from this point onwards.
last_stream_ordering = self.store.get_room_max_stream_ordering()
await self.store.add_pusher( await self.store.add_pusher(
user_id=user_id, user_id=user_id,
access_token=access_token, access_token=access_token,