diff --git a/changelog.d/10107.bugfix b/changelog.d/10107.bugfix new file mode 100644 index 000000000..80030efab --- /dev/null +++ b/changelog.d/10107.bugfix @@ -0,0 +1 @@ +Fixed a bug that could cause Synapse to stop notifying application services. Contributed by Willem Mulder. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 177310f0b..862638cc4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -87,7 +87,8 @@ class ApplicationServicesHandler: self.is_processing = True try: limit = 100 - while True: + upper_bound = -1 + while upper_bound < self.current_max: ( upper_bound, events, @@ -95,9 +96,6 @@ class ApplicationServicesHandler: self.current_max, limit ) - if not events: - break - events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: events_by_room.setdefault(event.room_id, []).append(event) @@ -153,9 +151,6 @@ class ApplicationServicesHandler: await self.store.set_appservice_last_pos(upper_bound) - now = self.clock.time_msec() - ts = await self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_positions.labels( "appservice_sender" ).set(upper_bound) @@ -168,12 +163,16 @@ class ApplicationServicesHandler: event_processing_loop_counter.labels("appservice_sender").inc() - synapse.metrics.event_processing_lag.labels( - "appservice_sender" - ).set(now - ts) - synapse.metrics.event_processing_last_ts.labels( - "appservice_sender" - ).set(ts) + if events: + now = self.clock.time_msec() + ts = await self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.labels( + "appservice_sender" + ).set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "appservice_sender" + ).set(ts) finally: self.is_processing = False diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index b037b12a0..5d6cc2885 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -57,10 +57,10 @@ class AppServiceHandlerTestCase(unittest.TestCase): sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar" ) self.mock_store.get_new_events_for_appservice.side_effect = [ - make_awaitable((0, [event])), make_awaitable((0, [])), + make_awaitable((1, [event])), ] - self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.handler.notify_interested_services(RoomStreamToken(None, 1)) self.mock_scheduler.submit_event_for_as.assert_called_once_with( interested_service, event @@ -77,7 +77,6 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_as_api.query_user.return_value = make_awaitable(True) self.mock_store.get_new_events_for_appservice.side_effect = [ make_awaitable((0, [event])), - make_awaitable((0, [])), ] self.handler.notify_interested_services(RoomStreamToken(None, 0)) @@ -95,7 +94,6 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_as_api.query_user.return_value = make_awaitable(True) self.mock_store.get_new_events_for_appservice.side_effect = [ make_awaitable((0, [event])), - make_awaitable((0, [])), ] self.handler.notify_interested_services(RoomStreamToken(None, 0))