Invalidate retry cache in both directions

This commit is contained in:
Erik Johnston 2016-11-22 17:45:44 +00:00
parent 51e89709aa
commit 90565d015e
6 changed files with 132 additions and 27 deletions

View file

@ -14,6 +14,7 @@
# limitations under the License.
from ._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
@ -200,25 +201,48 @@ class TransactionStore(SQLBaseStore):
def _set_destination_retry_timings(self, txn, destination,
retry_last_ts, retry_interval):
txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
self.database_engine.lock_table(txn, "destinations")
self._simple_upsert_txn(
self._invalidate_cache_and_stream(
txn, self.get_destination_retry_timings, (destination,)
)
# We need to be careful here as the data may have changed from under us
# due to a worker setting the timings.
prev_row = self._simple_select_one_txn(
txn,
"destinations",
table="destinations",
keyvalues={
"destination": destination,
},
values={
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
insertion_values={
"destination": destination,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
}
retcols=("retry_last_ts", "retry_interval"),
allow_none=True,
)
if not prev_row:
self._simple_insert_txn(
txn,
table="destinations",
values={
"destination": destination,
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
}
)
elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
self._simple_update_one_txn(
txn,
"destinations",
keyvalues={
"destination": destination,
},
updatevalues={
"retry_last_ts": retry_last_ts,
"retry_interval": retry_interval,
},
)
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.