Speed up processing of federation stream RDATA rows.

Instead of storing and sending an ACK for every single row we send
synchronously, we instead do it asynchronously while batching up
updates.
This commit is contained in:
Erik Johnston 2020-05-27 19:31:44 +01:00
parent 4e3a617635
commit 35c308731d
3 changed files with 31 additions and 2 deletions

View File

@ -863,9 +863,24 @@ class FederationSenderHandler(object):
a FEDERATION_ACK back to the master, and stores the token that we have processed a FEDERATION_ACK back to the master, and stores the token that we have processed
in `federation_stream_position` so that we can restart where we left off. in `federation_stream_position` so that we can restart where we left off.
""" """
try:
self.federation_position = token self.federation_position = token
# We save and send the ACK to master asynchronously, so we don't block
# processing on persistence. We don't need to do this operation for
# every single RDATA we receive, we just need to do it periodically.
if self._fed_position_linearizer.is_queued(None):
# There is already a task queued up to save and send the token, so
# no need to queue up another task.
return
run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
async def _save_and_send_ack(self):
"""Save the current federation position in the database and send an ACK
to master with where we're up to.
"""
try:
# We linearize here to ensure we don't have races updating the token # We linearize here to ensure we don't have races updating the token
# #
# XXX this appears to be redundant, since the ReplicationCommandHandler # XXX this appears to be redundant, since the ReplicationCommandHandler

View File

@ -116,6 +116,8 @@ class ReplicationCommandHandler:
# batching works. # batching works.
self._pending_batches = {} # type: Dict[str, List[Any]] self._pending_batches = {} # type: Dict[str, List[Any]]
self._queued_events = {} # type: Dict[str, List[Any]]
# The factory used to create connections. # The factory used to create connections.
self._factory = None # type: Optional[ReconnectingClientFactory] self._factory = None # type: Optional[ReconnectingClientFactory]

View File

@ -225,6 +225,18 @@ class Linearizer(object):
{} {}
) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]] ) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
def is_queued(self, key) -> bool:
"""Checks whether there is a process queued up waiting
"""
entry = self.key_to_defer.get(key)
if not entry:
# No entry so nothing is waiting.
return False
# There are waiting deferreds only in the OrderedDict of deferreds is
# non-empty.
return bool(entry[1])
def queue(self, key): def queue(self, key):
# we avoid doing defer.inlineCallbacks here, so that cancellation works correctly. # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
# (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not