Convert *StreamRow classes to inner classes (#7116)

This just helps keep the rows closer to their streams, so that it's easier to
see what the format of each stream is.
This commit is contained in:
Richard van der Hoff 2020-03-23 13:59:11 +00:00 committed by GitHub
parent 5126cb1253
commit a564b92d37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 106 additions and 100 deletions

1
changelog.d/7116.misc Normal file
View File

@ -0,0 +1 @@
Convert `*StreamRow` classes to inner classes.

View File

@ -804,7 +804,7 @@ class FederationSenderHandler(object):
async def _on_new_receipts(self, rows): async def _on_new_receipts(self, rows):
""" """
Args: Args:
rows (iterable[synapse.replication.tcp.streams.ReceiptsStreamRow]): rows (Iterable[synapse.replication.tcp.streams.ReceiptsStream.ReceiptsStreamRow]):
new receipts to be processed new receipts to be processed
""" """
for receipt in rows: for receipt in rows:

View File

@ -477,7 +477,7 @@ def process_rows_for_federation(transaction_queue, rows):
Args: Args:
transaction_queue (FederationSender) transaction_queue (FederationSender)
rows (list(synapse.replication.tcp.streams.FederationStreamRow)) rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
""" """
# The federation stream contains a bunch of different types of # The federation stream contains a bunch of different types of

View File

@ -28,94 +28,6 @@ logger = logging.getLogger(__name__)
MAX_EVENTS_BEHIND = 500000 MAX_EVENTS_BEHIND = 500000
BackfillStreamRow = namedtuple(
"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)
@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""
cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)
@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
class Stream(object): class Stream(object):
"""Base class for the streams. """Base class for the streams.
@ -234,6 +146,18 @@ class BackfillStream(Stream):
or it went from being an outlier to not. or it went from being an outlier to not.
""" """
BackfillStreamRow = namedtuple(
"BackfillStreamRow",
(
"event_id", # str
"room_id", # str
"type", # str
"state_key", # str, optional
"redacts", # str, optional
"relates_to", # str, optional
),
)
NAME = "backfill" NAME = "backfill"
ROW_TYPE = BackfillStreamRow ROW_TYPE = BackfillStreamRow
@ -246,6 +170,19 @@ class BackfillStream(Stream):
class PresenceStream(Stream): class PresenceStream(Stream):
PresenceStreamRow = namedtuple(
"PresenceStreamRow",
(
"user_id", # str
"state", # str
"last_active_ts", # int
"last_federation_update_ts", # int
"last_user_sync_ts", # int
"status_msg", # str
"currently_active", # bool
),
)
NAME = "presence" NAME = "presence"
ROW_TYPE = PresenceStreamRow ROW_TYPE = PresenceStreamRow
@ -260,6 +197,10 @@ class PresenceStream(Stream):
class TypingStream(Stream): class TypingStream(Stream):
TypingStreamRow = namedtuple(
"TypingStreamRow", ("room_id", "user_ids") # str # list(str)
)
NAME = "typing" NAME = "typing"
ROW_TYPE = TypingStreamRow ROW_TYPE = TypingStreamRow
@ -273,6 +214,17 @@ class TypingStream(Stream):
class ReceiptsStream(Stream): class ReceiptsStream(Stream):
ReceiptsStreamRow = namedtuple(
"ReceiptsStreamRow",
(
"room_id", # str
"receipt_type", # str
"user_id", # str
"event_id", # str
"data", # dict
),
)
NAME = "receipts" NAME = "receipts"
ROW_TYPE = ReceiptsStreamRow ROW_TYPE = ReceiptsStreamRow
@ -289,6 +241,8 @@ class PushRulesStream(Stream):
"""A user has changed their push rules """A user has changed their push rules
""" """
PushRulesStreamRow = namedtuple("PushRulesStreamRow", ("user_id",)) # str
NAME = "push_rules" NAME = "push_rules"
ROW_TYPE = PushRulesStreamRow ROW_TYPE = PushRulesStreamRow
@ -309,6 +263,11 @@ class PushersStream(Stream):
"""A user has added/changed/removed a pusher """A user has added/changed/removed a pusher
""" """
PushersStreamRow = namedtuple(
"PushersStreamRow",
("user_id", "app_id", "pushkey", "deleted"), # str # str # str # bool
)
NAME = "pushers" NAME = "pushers"
ROW_TYPE = PushersStreamRow ROW_TYPE = PushersStreamRow
@ -326,6 +285,21 @@ class CachesStream(Stream):
the cache on the workers the cache on the workers
""" """
@attr.s
class CachesStreamRow:
"""Stream to inform workers they should invalidate their cache.
Attributes:
cache_func: Name of the cached function.
keys: The entry in the cache to invalidate. If None then will
invalidate all.
invalidation_ts: Timestamp of when the invalidation took place.
"""
cache_func = attr.ib(type=str)
keys = attr.ib(type=Optional[List[Any]])
invalidation_ts = attr.ib(type=int)
NAME = "caches" NAME = "caches"
ROW_TYPE = CachesStreamRow ROW_TYPE = CachesStreamRow
@ -342,6 +316,16 @@ class PublicRoomsStream(Stream):
"""The public rooms list changed """The public rooms list changed
""" """
PublicRoomsStreamRow = namedtuple(
"PublicRoomsStreamRow",
(
"room_id", # str
"visibility", # str
"appservice_id", # str, optional
"network_id", # str, optional
),
)
NAME = "public_rooms" NAME = "public_rooms"
ROW_TYPE = PublicRoomsStreamRow ROW_TYPE = PublicRoomsStreamRow
@ -359,6 +343,10 @@ class DeviceListsStream(Stream):
told about a device update. told about a device update.
""" """
@attr.s
class DeviceListsStreamRow:
entity = attr.ib(type=str)
NAME = "device_lists" NAME = "device_lists"
ROW_TYPE = DeviceListsStreamRow ROW_TYPE = DeviceListsStreamRow
@ -375,6 +363,8 @@ class ToDeviceStream(Stream):
"""New to_device messages for a client """New to_device messages for a client
""" """
ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ("entity",)) # str
NAME = "to_device" NAME = "to_device"
ROW_TYPE = ToDeviceStreamRow ROW_TYPE = ToDeviceStreamRow
@ -391,6 +381,10 @@ class TagAccountDataStream(Stream):
"""Someone added/removed a tag for a room """Someone added/removed a tag for a room
""" """
TagAccountDataStreamRow = namedtuple(
"TagAccountDataStreamRow", ("user_id", "room_id", "data") # str # str # dict
)
NAME = "tag_account_data" NAME = "tag_account_data"
ROW_TYPE = TagAccountDataStreamRow ROW_TYPE = TagAccountDataStreamRow
@ -407,6 +401,10 @@ class AccountDataStream(Stream):
"""Global or per room account data was changed """Global or per room account data was changed
""" """
AccountDataStreamRow = namedtuple(
"AccountDataStream", ("user_id", "room_id", "data_type") # str # str # str
)
NAME = "account_data" NAME = "account_data"
ROW_TYPE = AccountDataStreamRow ROW_TYPE = AccountDataStreamRow
@ -432,6 +430,11 @@ class AccountDataStream(Stream):
class GroupServerStream(Stream): class GroupServerStream(Stream):
GroupsStreamRow = namedtuple(
"GroupsStreamRow",
("group_id", "user_id", "type", "content"), # str # str # str # dict
)
NAME = "groups" NAME = "groups"
ROW_TYPE = GroupsStreamRow ROW_TYPE = GroupsStreamRow
@ -448,6 +451,8 @@ class UserSignatureStream(Stream):
"""A user has signed their own device with their user-signing key """A user has signed their own device with their user-signing key
""" """
UserSignatureStreamRow = namedtuple("UserSignatureStreamRow", ("user_id")) # str
NAME = "user_signature" NAME = "user_signature"
ROW_TYPE = UserSignatureStreamRow ROW_TYPE = UserSignatureStreamRow

View File

@ -17,20 +17,20 @@ from collections import namedtuple
from ._base import Stream from ._base import Stream
FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)
class FederationStream(Stream): class FederationStream(Stream):
"""Data to be sent over federation. Only available when master has federation """Data to be sent over federation. Only available when master has federation
sending disabled. sending disabled.
""" """
FederationStreamRow = namedtuple(
"FederationStreamRow",
(
"type", # str, the type of data as defined in the BaseFederationRows
"data", # dict, serialization of a federation.send_queue.BaseFederationRow
),
)
NAME = "federation" NAME = "federation"
ROW_TYPE = FederationStreamRow ROW_TYPE = FederationStreamRow

View File

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.replication.tcp.streams._base import ReceiptsStreamRow from synapse.replication.tcp.streams._base import ReceiptsStream
from tests.replication.tcp.streams._base import BaseStreamTestCase from tests.replication.tcp.streams._base import BaseStreamTestCase
@ -38,7 +38,7 @@ class ReceiptsStreamTestCase(BaseStreamTestCase):
rdata_rows = self.test_handler.received_rdata_rows rdata_rows = self.test_handler.received_rdata_rows
self.assertEqual(1, len(rdata_rows)) self.assertEqual(1, len(rdata_rows))
self.assertEqual(rdata_rows[0][0], "receipts") self.assertEqual(rdata_rows[0][0], "receipts")
row = rdata_rows[0][2] # type: ReceiptsStreamRow row = rdata_rows[0][2] # type: ReceiptsStream.ReceiptsStreamRow
self.assertEqual(ROOM_ID, row.room_id) self.assertEqual(ROOM_ID, row.room_id)
self.assertEqual("m.read", row.receipt_type) self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id) self.assertEqual(USER_ID, row.user_id)