diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 4de4ebe84..967b459e0 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -36,34 +36,82 @@ logger = logging.getLogger(__name__) MAX_EVENTS_BEHIND = 10000 -EventStreamRow = namedtuple("EventStreamRow", - ("event_id", "room_id", "type", "state_key", "redacts")) -BackfillStreamRow = namedtuple("BackfillStreamRow", - ("event_id", "room_id", "type", "state_key", "redacts")) -PresenceStreamRow = namedtuple("PresenceStreamRow", - ("user_id", "state", "last_active_ts", - "last_federation_update_ts", "last_user_sync_ts", - "status_msg", "currently_active")) -TypingStreamRow = namedtuple("TypingStreamRow", - ("room_id", "user_ids")) -ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", - ("room_id", "receipt_type", "user_id", "event_id", - "data")) -PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) -PushersStreamRow = namedtuple("PushersStreamRow", - ("user_id", "app_id", "pushkey", "deleted",)) -CachesStreamRow = namedtuple("CachesStreamRow", - ("cache_func", "keys", "invalidation_ts",)) -PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", - ("room_id", "visibility", "appservice_id", - "network_id",)) -DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ("user_id", "destination",)) -ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) -FederationStreamRow = namedtuple("FederationStreamRow", ("type", "data",)) -TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", - ("user_id", "room_id", "data")) -AccountDataStreamRow = namedtuple("AccountDataStream", - ("user_id", "room_id", "data_type", "data")) +EventStreamRow = namedtuple("EventStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +BackfillStreamRow = namedtuple("BackfillStreamRow", ( + "event_id", # str + "room_id", # str + "type", # str + "state_key", # str, optional + "redacts", # str, optional +)) +PresenceStreamRow = namedtuple("PresenceStreamRow", ( + "user_id", # str + "state", # str + "last_active_ts", # int + "last_federation_update_ts", # int + "last_user_sync_ts", # int + "status_msg", # str + "currently_active", # bool +)) +TypingStreamRow = namedtuple("TypingStreamRow", ( + "room_id", # str + "user_ids", # list(str) +)) +ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", ( + "room_id", # str + "receipt_type", # str + "user_id", # str + "event_id", # str + "data", # dict +)) +PushRulesStreamRow = namedtuple("PushRulesStreamRow", ( + "user_id", # str +)) +PushersStreamRow = namedtuple("PushersStreamRow", ( + "user_id", # str + "app_id", # str + "pushkey", # str + "deleted", # bool +)) +CachesStreamRow = namedtuple("CachesStreamRow", ( + "cache_func", # str + "keys", # list(str) + "invalidation_ts", # int +)) +PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", ( + "room_id", # str + "visibility", # str + "appservice_id", # str, optional + "network_id", # str, optional +)) +DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", ( + "user_id", # str + "destination", # str +)) +ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( + "entity", # str +)) +FederationStreamRow = namedtuple("FederationStreamRow", ( + "type", # str + "data", # dict +)) +TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( + "user_id", # str + "room_id", # str + "data", # dict +)) +AccountDataStreamRow = namedtuple("AccountDataStream", ( + "user_id", # str + "room_id", # str + "data_type", # str + "data", # dict +)) class Stream(object):