mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
async/await for SyncReplicationHandler.process_and_notify
This commit is contained in:
parent
bc42da4ab8
commit
c74de81bfc
@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
|||||||
def get_currently_syncing_users(self):
|
def get_currently_syncing_users(self):
|
||||||
return self.presence_handler.get_currently_syncing_users()
|
return self.presence_handler.get_currently_syncing_users()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
async def process_and_notify(self, stream_name, token, rows):
|
||||||
def process_and_notify(self, stream_name, token, rows):
|
|
||||||
try:
|
try:
|
||||||
if stream_name == "events":
|
if stream_name == "events":
|
||||||
# We shouldn't get multiple rows per token for events stream, so
|
# We shouldn't get multiple rows per token for events stream, so
|
||||||
@ -380,7 +379,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
|||||||
for row in rows:
|
for row in rows:
|
||||||
if row.type != EventsStreamEventRow.TypeId:
|
if row.type != EventsStreamEventRow.TypeId:
|
||||||
continue
|
continue
|
||||||
event = yield self.store.get_event(row.data.event_id)
|
event = await self.store.get_event(row.data.event_id)
|
||||||
extra_users = ()
|
extra_users = ()
|
||||||
if event.type == EventTypes.Member:
|
if event.type == EventTypes.Member:
|
||||||
extra_users = (event.state_key,)
|
extra_users = (event.state_key,)
|
||||||
@ -412,11 +411,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
|
|||||||
elif stream_name == "device_lists":
|
elif stream_name == "device_lists":
|
||||||
all_room_ids = set()
|
all_room_ids = set()
|
||||||
for row in rows:
|
for row in rows:
|
||||||
room_ids = yield self.store.get_rooms_for_user(row.user_id)
|
room_ids = await self.store.get_rooms_for_user(row.user_id)
|
||||||
all_room_ids.update(room_ids)
|
all_room_ids.update(room_ids)
|
||||||
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
|
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
|
||||||
elif stream_name == "presence":
|
elif stream_name == "presence":
|
||||||
yield self.presence_handler.process_replication_rows(token, rows)
|
await self.presence_handler.process_replication_rows(token, rows)
|
||||||
elif stream_name == "receipts":
|
elif stream_name == "receipts":
|
||||||
self.notifier.on_new_event(
|
self.notifier.on_new_event(
|
||||||
"groups_key", token, users=[row.user_id for row in rows]
|
"groups_key", token, users=[row.user_id for row in rows]
|
||||||
|
Loading…
Reference in New Issue
Block a user