This commit is contained in:
Erik Johnston 2017-04-10 10:02:17 +01:00
parent a828a64b75
commit ab904caf33
2 changed files with 8 additions and 6 deletions

View File

@ -238,6 +238,8 @@ class FederationRemoteSendQueue(object):
if from_token > self.pos: if from_token > self.pos:
from_token = -1 from_token = -1
# list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream.
rows = [] rows = []
# There should be only one reader, so lets delete everything its # There should be only one reader, so lets delete everything its
@ -476,14 +478,15 @@ BufferedToSend = namedtuple("BufferedToSend", (
def process_rows_for_federation(federation_sender, rows): def process_rows_for_federation(federation_sender, rows):
"""Parse a list of rows from the federation stream and them send them out. """Parse a list of rows from the federation stream and put them in the
transaction queue ready for sending to the relevant homeservers.
Args: Args:
federation_sender (TransactionQueue) federation_sender (TransactionQueue)
rows (list(FederationStreamRow)) rows (list(synapse.replication.tcp.streams.FederationStreamRow))
""" """
# The federation stream containis a bunch of different types of # The federation stream contains a bunch of different types of
# rows that need to be handled differently. We parse the rows, put # rows that need to be handled differently. We parse the rows, put
# them into the appropriate collection and then send them off. # them into the appropriate collection and then send them off.
@ -505,7 +508,6 @@ def process_rows_for_federation(federation_sender, rows):
parsed_row = RowType.from_data(row.data) parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff) parsed_row.add_to_buffer(buff)
# We've finished collecting, send everything off
for destination, states in buff.presence.iteritems(): for destination, states in buff.presence.iteritems():
federation_sender.send_presence(destination, states) federation_sender.send_presence(destination, states)

View File

@ -98,8 +98,8 @@ ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
"entity", # str "entity", # str
)) ))
FederationStreamRow = namedtuple("FederationStreamRow", ( FederationStreamRow = namedtuple("FederationStreamRow", (
"type", # str "type", # str, the type of data as defined in the BaseFederationRows
"data", # dict "data", # dict, serialization of a federation.send_queue.BaseFederationRow
)) ))
TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
"user_id", # str "user_id", # str