Fix processing of groups stream, and use symbolic names for streams (#7117)

`groups` != `receipts`

Introduced in #6964
This commit is contained in:
Richard van der Hoff 2020-03-23 11:39:36 +00:00 committed by GitHub
parent 96071eea8f
commit b3cee0ce67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 30 deletions

1
changelog.d/7117.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug which meant that groups updates were not correctly replicated between workers.

View File

@ -65,12 +65,23 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.replication.tcp.streams._base import ( from synapse.replication.tcp.streams import (
AccountDataStream,
DeviceListsStream, DeviceListsStream,
GroupServerStream,
PresenceStream,
PushersStream,
PushRulesStream,
ReceiptsStream, ReceiptsStream,
TagAccountDataStream,
ToDeviceStream, ToDeviceStream,
TypingStream,
)
from synapse.replication.tcp.streams.events import (
EventsStream,
EventsStreamEventRow,
EventsStreamRow,
) )
from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
from synapse.rest.admin import register_servlets_for_media_repo from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.client.v1 import events from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@ -626,7 +637,7 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
if self.send_handler: if self.send_handler:
self.send_handler.process_replication_rows(stream_name, token, rows) self.send_handler.process_replication_rows(stream_name, token, rows)
if stream_name == "events": if stream_name == EventsStream.NAME:
# We shouldn't get multiple rows per token for events stream, so # We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows. # we don't need to optimise this for multiple rows.
for row in rows: for row in rows:
@ -649,44 +660,44 @@ class GenericWorkerReplicationHandler(ReplicationClientHandler):
) )
await self.pusher_pool.on_new_notifications(token, token) await self.pusher_pool.on_new_notifications(token, token)
elif stream_name == "push_rules": elif stream_name == PushRulesStream.NAME:
self.notifier.on_new_event( self.notifier.on_new_event(
"push_rules_key", token, users=[row.user_id for row in rows] "push_rules_key", token, users=[row.user_id for row in rows]
) )
elif stream_name in ("account_data", "tag_account_data"): elif stream_name in (AccountDataStream.NAME, TagAccountDataStream.NAME):
self.notifier.on_new_event( self.notifier.on_new_event(
"account_data_key", token, users=[row.user_id for row in rows] "account_data_key", token, users=[row.user_id for row in rows]
) )
elif stream_name == "receipts": elif stream_name == ReceiptsStream.NAME:
self.notifier.on_new_event( self.notifier.on_new_event(
"receipt_key", token, rooms=[row.room_id for row in rows] "receipt_key", token, rooms=[row.room_id for row in rows]
) )
await self.pusher_pool.on_new_receipts( await self.pusher_pool.on_new_receipts(
token, token, {row.room_id for row in rows} token, token, {row.room_id for row in rows}
) )
elif stream_name == "typing": elif stream_name == TypingStream.NAME:
self.typing_handler.process_replication_rows(token, rows) self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event( self.notifier.on_new_event(
"typing_key", token, rooms=[row.room_id for row in rows] "typing_key", token, rooms=[row.room_id for row in rows]
) )
elif stream_name == "to_device": elif stream_name == ToDeviceStream.NAME:
entities = [row.entity for row in rows if row.entity.startswith("@")] entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities: if entities:
self.notifier.on_new_event("to_device_key", token, users=entities) self.notifier.on_new_event("to_device_key", token, users=entities)
elif stream_name == "device_lists": elif stream_name == DeviceListsStream.NAME:
all_room_ids = set() all_room_ids = set()
for row in rows: for row in rows:
if row.entity.startswith("@"): if row.entity.startswith("@"):
room_ids = await self.store.get_rooms_for_user(row.entity) room_ids = await self.store.get_rooms_for_user(row.entity)
all_room_ids.update(room_ids) all_room_ids.update(room_ids)
self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence": elif stream_name == PresenceStream.NAME:
await self.presence_handler.process_replication_rows(token, rows) await self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts": elif stream_name == GroupServerStream.NAME:
self.notifier.on_new_event( self.notifier.on_new_event(
"groups_key", token, users=[row.user_id for row in rows] "groups_key", token, users=[row.user_id for row in rows]
) )
elif stream_name == "pushers": elif stream_name == PushersStream.NAME:
for row in rows: for row in rows:
if row.deleted: if row.deleted:
self.stop_pusher(row.user_id, row.app_id, row.pushkey) self.stop_pusher(row.user_id, row.app_id, row.pushkey)

View File

@ -24,27 +24,61 @@ Each stream is defined by the following information:
current_token: The function that returns the current token for the stream current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens update_function: The function that returns a list of updates between two tokens
""" """
from synapse.replication.tcp.streams._base import (
from . import _base, events, federation AccountDataStream,
BackfillStream,
CachesStream,
DeviceListsStream,
GroupServerStream,
PresenceStream,
PublicRoomsStream,
PushersStream,
PushRulesStream,
ReceiptsStream,
TagAccountDataStream,
ToDeviceStream,
TypingStream,
UserSignatureStream,
)
from synapse.replication.tcp.streams.events import EventsStream
from synapse.replication.tcp.streams.federation import FederationStream
STREAMS_MAP = { STREAMS_MAP = {
stream.NAME: stream stream.NAME: stream
for stream in ( for stream in (
events.EventsStream, EventsStream,
_base.BackfillStream, BackfillStream,
_base.PresenceStream, PresenceStream,
_base.TypingStream, TypingStream,
_base.ReceiptsStream, ReceiptsStream,
_base.PushRulesStream, PushRulesStream,
_base.PushersStream, PushersStream,
_base.CachesStream, CachesStream,
_base.PublicRoomsStream, PublicRoomsStream,
_base.DeviceListsStream, DeviceListsStream,
_base.ToDeviceStream, ToDeviceStream,
federation.FederationStream, FederationStream,
_base.TagAccountDataStream, TagAccountDataStream,
_base.AccountDataStream, AccountDataStream,
_base.GroupServerStream, GroupServerStream,
_base.UserSignatureStream, UserSignatureStream,
) )
} }
__all__ = [
"STREAMS_MAP",
"BackfillStream",
"PresenceStream",
"TypingStream",
"ReceiptsStream",
"PushRulesStream",
"PushersStream",
"CachesStream",
"PublicRoomsStream",
"DeviceListsStream",
"ToDeviceStream",
"TagAccountDataStream",
"AccountDataStream",
"GroupServerStream",
"UserSignatureStream",
]