Remove obsolete order field in send_new_transaction (#8245)

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
reivilibre 2020-09-03 19:23:07 +01:00 committed by GitHub
parent f97f9485ee
commit 4535e849d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 23 additions and 28 deletions

1
changelog.d/8245.misc Normal file
View File

@ -0,0 +1 @@
Remove obsolete `order` field from federation send queues.

View File

@ -108,8 +108,6 @@ class FederationSender(object):
), ),
) )
self._order = 1
self._is_processing = False self._is_processing = False
self._last_poked_id = -1 self._last_poked_id = -1
@ -272,9 +270,6 @@ class FederationSender(object):
# a transaction in progress. If we do, stick it in the pending_pdus # a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later. # table and we'll get back to it later.
order = self._order
self._order += 1
destinations = set(destinations) destinations = set(destinations)
destinations.discard(self.server_name) destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations)) logger.debug("Sending to: %s", str(destinations))
@ -286,7 +281,7 @@ class FederationSender(object):
sent_pdus_destination_dist_count.inc() sent_pdus_destination_dist_count.inc()
for destination in destinations: for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order) self._get_per_destination_queue(destination).send_pdu(pdu)
async def send_read_receipt(self, receipt: ReadReceipt) -> None: async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room """Send a RR to any other servers in the room

View File

@ -92,8 +92,8 @@ class PerDestinationQueue(object):
self._destination = destination self._destination = destination
self.transmission_loop_running = False self.transmission_loop_running = False
# a list of tuples of (pending pdu, order) # a list of pending PDUs
self._pending_pdus = [] # type: List[Tuple[EventBase, int]] self._pending_pdus = [] # type: List[EventBase]
# XXX this is never actually used: see # XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549 # https://github.com/matrix-org/synapse/issues/7549
@ -132,14 +132,13 @@ class PerDestinationQueue(object):
+ len(self._pending_edus_keyed) + len(self._pending_edus_keyed)
) )
def send_pdu(self, pdu: EventBase, order: int) -> None: def send_pdu(self, pdu: EventBase) -> None:
"""Add a PDU to the queue, and start the transmission loop if necessary """Add a PDU to the queue, and start the transmission loop if necessary
Args: Args:
pdu: pdu to send pdu: pdu to send
order
""" """
self._pending_pdus.append((pdu, order)) self._pending_pdus.append(pdu)
self.attempt_new_transaction() self.attempt_new_transaction()
def send_presence(self, states: Iterable[UserPresenceState]) -> None: def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@ -185,7 +184,7 @@ class PerDestinationQueue(object):
returns immediately. Otherwise kicks off the process of sending a returns immediately. Otherwise kicks off the process of sending a
transaction in the background. transaction in the background.
""" """
# list of (pending_pdu, deferred, order)
if self.transmission_loop_running: if self.transmission_loop_running:
# XXX: this can get stuck on by a never-ending # XXX: this can get stuck on by a never-ending
# request at which point pending_pdus just keeps growing. # request at which point pending_pdus just keeps growing.
@ -210,7 +209,7 @@ class PerDestinationQueue(object):
) )
async def _transaction_transmission_loop(self) -> None: async def _transaction_transmission_loop(self) -> None:
pending_pdus = [] # type: List[Tuple[EventBase, int]] pending_pdus = [] # type: List[EventBase]
try: try:
self.transmission_loop_running = True self.transmission_loop_running = True
@ -373,13 +372,13 @@ class PerDestinationQueue(object):
"TX [%s] Failed to send transaction: %s", self._destination, e "TX [%s] Failed to send transaction: %s", self._destination, e
) )
for p, _ in pending_pdus: for p in pending_pdus:
logger.info( logger.info(
"Failed to send event %s to %s", p.event_id, self._destination "Failed to send event %s to %s", p.event_id, self._destination
) )
except Exception: except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination) logger.exception("TX [%s] Failed to send transaction", self._destination)
for p, _ in pending_pdus: for p in pending_pdus:
logger.info( logger.info(
"Failed to send event %s to %s", p.event_id, self._destination "Failed to send event %s to %s", p.event_id, self._destination
) )

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from typing import TYPE_CHECKING, List, Tuple from typing import TYPE_CHECKING, List
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.events import EventBase from synapse.events import EventBase
@ -53,11 +53,17 @@ class TransactionManager(object):
@measure_func("_send_new_transaction") @measure_func("_send_new_transaction")
async def send_new_transaction( async def send_new_transaction(
self, self, destination: str, pdus: List[EventBase], edus: List[Edu],
destination: str, ) -> bool:
pending_pdus: List[Tuple[EventBase, int]], """
pending_edus: List[Edu], Args:
): destination: The destination to send to (e.g. 'example.org')
pdus: In-order list of PDUs to send
edus: List of EDUs to send
Returns:
True iff the transaction was successful
"""
# Make a transaction-sending opentracing span. This span follows on from # Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is # all the edus in that transaction. This needs to be done since there is
@ -67,7 +73,7 @@ class TransactionManager(object):
span_contexts = [] span_contexts = []
keep_destination = whitelisted_homeserver(destination) keep_destination = whitelisted_homeserver(destination)
for edu in pending_edus: for edu in edus:
context = edu.get_context() context = edu.get_context()
if context: if context:
span_contexts.append(extract_text_map(json_decoder.decode(context))) span_contexts.append(extract_text_map(json_decoder.decode(context)))
@ -75,12 +81,6 @@ class TransactionManager(object):
edu.strip_context() edu.strip_context()
with start_active_span_follows_from("send_transaction", span_contexts): with start_active_span_follows_from("send_transaction", span_contexts):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
success = True success = True
logger.debug("TX [%s] _attempt_new_transaction", destination) logger.debug("TX [%s] _attempt_new_transaction", destination)