diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 783ccf12f..9c69fe511 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -106,15 +106,12 @@ class FederationClient(FederationBase): Deferred: Completes when we have successfully processed the PDU and replicated it to any interested remote home servers. """ - order = self._order - self._order += 1 - sent_pdus_destination_dist.inc_by(len(destinations)) logger.debug("[%s] transaction_layer.send_pdu... ", pdu.event_id) # TODO, add errback, etc. - self._transaction_queue.send_pdu(pdu, destinations, order) + self._transaction_queue.send_pdu(pdu, destinations) logger.debug( "[%s] transaction_layer.send_pdu... done", @@ -127,16 +124,7 @@ class FederationClient(FederationBase): @log_function def send_edu(self, destination, edu_type, content, key=None): - edu = Edu( - origin=self.server_name, - destination=destination, - edu_type=edu_type, - content=content, - ) - - sent_edus_counter.inc() - - self._transaction_queue.send_edu(edu, key=key) + self._transaction_queue.send_edu(destination, edu_type, content, key=key) @log_function def send_device_messages(self, destination): diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index ea66a5dcb..043baef13 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -68,8 +68,6 @@ class ReplicationLayer(FederationClient, FederationServer): self.transaction_actions = TransactionActions(self.store) self._transaction_queue = TransactionQueue(hs, transport_layer) - self._order = 0 - self.hs = hs super(ReplicationLayer, self).__init__(hs) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index e0abe4b40..69e01d652 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -95,6 +95,8 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self.clock.time_msec()) + self._order = 1 + def can_send_to(self, destination): """Can we send messages to the given server? @@ -115,11 +117,14 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") - def send_pdu(self, pdu, destinations, order): + def send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. + order = self._order + self._order += 1 + destinations = set(destinations) destinations = set( dest for dest in destinations if self.can_send_to(dest) @@ -140,6 +145,9 @@ class TransactionQueue(object): ) def send_presence(self, destination, states): + if not self.can_send_to(destination): + return + self.pending_presence_by_dest.setdefault(destination, {}).update({ state.user_id: state for state in states }) @@ -148,8 +156,13 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_edu(self, edu, key=None): - destination = edu.destination + def send_edu(self, destination, edu_type, content, key=None): + edu = Edu( + origin=self.server_name, + destination=destination, + edu_type=edu_type, + content=content, + ) if not self.can_send_to(destination): return