mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-09-18 13:14:34 -04:00
Fix races in room stats (and other) updates. (#6187)
Hopefully this will fix the occasional failures we were seeing in the room directory. The problem was that events are not necessarily persisted (and `current_state_delta_stream` updated) in the same order as their stream_id. So for instance current_state_delta 9 might be persisted *before* current_state_delta 8. Then, when the room stats saw stream_id 9, it assumed it had done everything up to 9, and never came back to do stream_id 8. We can solve this easily by only processing up to the stream_id where we know all events have been persisted.
This commit is contained in:
parent
562b4e51dd
commit
a139420a3c
7 changed files with 63 additions and 25 deletions
|
@ -803,17 +803,25 @@ class PresenceHandler(object):
|
|||
# Loop round handling deltas until we're up to date
|
||||
while True:
|
||||
with Measure(self.clock, "presence_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(self._event_pos)
|
||||
if not deltas:
|
||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||
if self._event_pos == room_max_stream_ordering:
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
"Processing presence stats %s->%s",
|
||||
self._event_pos,
|
||||
room_max_stream_ordering,
|
||||
)
|
||||
max_pos, deltas = yield self.store.get_current_state_deltas(
|
||||
self._event_pos, room_max_stream_ordering
|
||||
)
|
||||
yield self._handle_state_delta(deltas)
|
||||
|
||||
self._event_pos = deltas[-1]["stream_id"]
|
||||
self._event_pos = max_pos
|
||||
|
||||
# Expose current event processing position to prometheus
|
||||
synapse.metrics.event_processing_positions.labels("presence").set(
|
||||
self._event_pos
|
||||
max_pos
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
|
|
@ -87,21 +87,23 @@ class StatsHandler(StateDeltasHandler):
|
|||
# Be sure to read the max stream_ordering *before* checking if there are any outstanding
|
||||
# deltas, since there is otherwise a chance that we could miss updates which arrive
|
||||
# after we check the deltas.
|
||||
room_max_stream_ordering = yield self.store.get_room_max_stream_ordering()
|
||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||
if self.pos == room_max_stream_ordering:
|
||||
break
|
||||
|
||||
deltas = yield self.store.get_current_state_deltas(self.pos)
|
||||
logger.debug(
|
||||
"Processing room stats %s->%s", self.pos, room_max_stream_ordering
|
||||
)
|
||||
max_pos, deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos, room_max_stream_ordering
|
||||
)
|
||||
|
||||
if deltas:
|
||||
logger.debug("Handling %d state deltas", len(deltas))
|
||||
room_deltas, user_deltas = yield self._handle_deltas(deltas)
|
||||
|
||||
max_pos = deltas[-1]["stream_id"]
|
||||
else:
|
||||
room_deltas = {}
|
||||
user_deltas = {}
|
||||
max_pos = room_max_stream_ordering
|
||||
|
||||
# Then count deltas for total_events and total_event_bytes.
|
||||
room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
|
||||
|
|
|
@ -138,21 +138,28 @@ class UserDirectoryHandler(StateDeltasHandler):
|
|||
# Loop round handling deltas until we're up to date
|
||||
while True:
|
||||
with Measure(self.clock, "user_dir_delta"):
|
||||
deltas = yield self.store.get_current_state_deltas(self.pos)
|
||||
if not deltas:
|
||||
room_max_stream_ordering = self.store.get_room_max_stream_ordering()
|
||||
if self.pos == room_max_stream_ordering:
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
"Processing user stats %s->%s", self.pos, room_max_stream_ordering
|
||||
)
|
||||
max_pos, deltas = yield self.store.get_current_state_deltas(
|
||||
self.pos, room_max_stream_ordering
|
||||
)
|
||||
|
||||
logger.info("Handling %d state deltas", len(deltas))
|
||||
yield self._handle_deltas(deltas)
|
||||
|
||||
self.pos = deltas[-1]["stream_id"]
|
||||
self.pos = max_pos
|
||||
|
||||
# Expose current event processing position to prometheus
|
||||
synapse.metrics.event_processing_positions.labels("user_dir").set(
|
||||
self.pos
|
||||
max_pos
|
||||
)
|
||||
|
||||
yield self.store.update_user_directory_stream_pos(self.pos)
|
||||
yield self.store.update_user_directory_stream_pos(max_pos)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _handle_deltas(self, deltas):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue