mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-13 18:29:26 -05:00
Merge pull request #343 from matrix-org/erikj/fix_retries
Fix broken cache for getting retry times.
This commit is contained in:
commit
6a3a840b19
@ -202,6 +202,7 @@ class TransactionQueue(object):
|
|||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def _attempt_new_transaction(self, destination):
|
def _attempt_new_transaction(self, destination):
|
||||||
|
# list of (pending_pdu, deferred, order)
|
||||||
if destination in self.pending_transactions:
|
if destination in self.pending_transactions:
|
||||||
# XXX: pending_transactions can get stuck on by a never-ending
|
# XXX: pending_transactions can get stuck on by a never-ending
|
||||||
# request at which point pending_pdus_by_dest just keeps growing.
|
# request at which point pending_pdus_by_dest just keeps growing.
|
||||||
@ -213,9 +214,6 @@ class TransactionQueue(object):
|
|||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
|
||||||
|
|
||||||
# list of (pending_pdu, deferred, order)
|
|
||||||
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
|
||||||
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
pending_edus = self.pending_edus_by_dest.pop(destination, [])
|
||||||
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
pending_failures = self.pending_failures_by_dest.pop(destination, [])
|
||||||
@ -228,20 +226,22 @@ class TransactionQueue(object):
|
|||||||
logger.debug("TX [%s] Nothing to send", destination)
|
logger.debug("TX [%s] Nothing to send", destination)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Sort based on the order field
|
|
||||||
pending_pdus.sort(key=lambda t: t[2])
|
|
||||||
|
|
||||||
pdus = [x[0] for x in pending_pdus]
|
|
||||||
edus = [x[0] for x in pending_edus]
|
|
||||||
failures = [x[0].get_dict() for x in pending_failures]
|
|
||||||
deferreds = [
|
|
||||||
x[1]
|
|
||||||
for x in pending_pdus + pending_edus + pending_failures
|
|
||||||
]
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.pending_transactions[destination] = 1
|
self.pending_transactions[destination] = 1
|
||||||
|
|
||||||
|
logger.debug("TX [%s] _attempt_new_transaction", destination)
|
||||||
|
|
||||||
|
# Sort based on the order field
|
||||||
|
pending_pdus.sort(key=lambda t: t[2])
|
||||||
|
|
||||||
|
pdus = [x[0] for x in pending_pdus]
|
||||||
|
edus = [x[0] for x in pending_edus]
|
||||||
|
failures = [x[0].get_dict() for x in pending_failures]
|
||||||
|
deferreds = [
|
||||||
|
x[1]
|
||||||
|
for x in pending_pdus + pending_edus + pending_failures
|
||||||
|
]
|
||||||
|
|
||||||
txn_id = str(self._next_txn_id)
|
txn_id = str(self._next_txn_id)
|
||||||
|
|
||||||
limiter = yield get_retry_limiter(
|
limiter = yield get_retry_limiter(
|
||||||
|
@ -253,16 +253,6 @@ class TransactionStore(SQLBaseStore):
|
|||||||
retry_interval (int) - how long until next retry in ms
|
retry_interval (int) - how long until next retry in ms
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# As this is the new value, we might as well prefill the cache
|
|
||||||
self.get_destination_retry_timings.prefill(
|
|
||||||
destination,
|
|
||||||
{
|
|
||||||
"destination": destination,
|
|
||||||
"retry_last_ts": retry_last_ts,
|
|
||||||
"retry_interval": retry_interval
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
# XXX: we could chose to not bother persisting this if our cache thinks
|
# XXX: we could chose to not bother persisting this if our cache thinks
|
||||||
# this is a NOOP
|
# this is a NOOP
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
@ -275,31 +265,25 @@ class TransactionStore(SQLBaseStore):
|
|||||||
|
|
||||||
def _set_destination_retry_timings(self, txn, destination,
|
def _set_destination_retry_timings(self, txn, destination,
|
||||||
retry_last_ts, retry_interval):
|
retry_last_ts, retry_interval):
|
||||||
query = (
|
txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
|
||||||
"UPDATE destinations"
|
|
||||||
" SET retry_last_ts = ?, retry_interval = ?"
|
|
||||||
" WHERE destination = ?"
|
|
||||||
)
|
|
||||||
|
|
||||||
txn.execute(
|
self._simple_upsert_txn(
|
||||||
query,
|
txn,
|
||||||
(
|
"destinations",
|
||||||
retry_last_ts, retry_interval, destination,
|
keyvalues={
|
||||||
)
|
"destination": destination,
|
||||||
|
},
|
||||||
|
values={
|
||||||
|
"retry_last_ts": retry_last_ts,
|
||||||
|
"retry_interval": retry_interval,
|
||||||
|
},
|
||||||
|
insertion_values={
|
||||||
|
"destination": destination,
|
||||||
|
"retry_last_ts": retry_last_ts,
|
||||||
|
"retry_interval": retry_interval,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if txn.rowcount == 0:
|
|
||||||
# destination wasn't already in table. Insert it.
|
|
||||||
self._simple_insert_txn(
|
|
||||||
txn,
|
|
||||||
table="destinations",
|
|
||||||
values={
|
|
||||||
"destination": destination,
|
|
||||||
"retry_last_ts": retry_last_ts,
|
|
||||||
"retry_interval": retry_interval,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
def get_destinations_needing_retry(self):
|
def get_destinations_needing_retry(self):
|
||||||
"""Get all destinations which are due a retry for sending a transaction.
|
"""Get all destinations which are due a retry for sending a transaction.
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user