track replication destination health, and perform exponential back-off when sending transactions. does *not* yet retry transactions, but drops them on the floor if waiting for a server to recover.

This commit is contained in:
Matthew Hodgson 2014-12-07 02:26:07 +00:00
parent ce212eb83a
commit aed62a3583
9 changed files with 156 additions and 18 deletions

View file

@ -114,7 +114,7 @@ class TransactionStore(SQLBaseStore):
def _prep_send_transaction(self, txn, transaction_id, destination,
origin_server_ts):
# First we find out what the prev_txs should be.
# First we find out what the prev_txns should be.
# Since we know that we are only sending one transaction at a time,
# we can simply take the last one.
query = "%s ORDER BY id DESC LIMIT 1" % (
@ -205,6 +205,71 @@ class TransactionStore(SQLBaseStore):
return ReceivedTransactionsTable.decode_results(txn.fetchall())
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
Args:
destination (str)
Returns:
None if not retrying
tuple: (retry_last_ts, retry_interval)
retry_ts: time of last retry attempt in unix epoch ms
retry_interval: how long until next retry in ms
"""
return self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)
def _get_destination_retry_timings(cls, txn, destination):
query = DestinationsTable.select_statement("destination = ?")
txn.execute(query, (destination,))
result = DestinationsTable.decode_single_result(txn.fetchone())
if result and result[0] > 0:
return result
else:
return None
def set_destination_retry_timings(self, destination):
"""Sets the current retry timings for a given destination.
Both timings should be zero if retrying is no longer occuring.
Args:
destination (str)
retry_last_ts (int) - time of last retry attempt in unix epoch ms
retry_interval (int) - how long until next retry in ms
"""
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings, destination, retry_last_ts, retry_interval)
def _set_destination_retry_timings(cls, txn, destination, retry_last_ts, retry_interval):
query = (
"INSERT OR REPLACE INTO %s "
"(retry_last_ts, retry_interval) "
"VALUES (?, ?) "
"WHERE destination = ?"
) % DestinationsTable.table_name
txn.execute(query, (retry_last_ts, retry_interval, destination))
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.
Returns:
list: A list of `DestinationsTable.EntryType`
"""
return self.runInteraction(
"get_destinations_needing_retry",
self._get_destinations_needing_retry
)
def _get_destinations_needing_retry(cls, txn):
where = "retry_last_ts > 0 and retry_next_ts < now()"
query = DestinationsTable.select_statement(where)
txn.execute(query)
return DestinationsTable.decode_results(txn.fetchall())
class ReceivedTransactionsTable(Table):
table_name = "received_transactions"
@ -247,3 +312,14 @@ class TransactionsToPduTable(Table):
]
EntryType = namedtuple("TransactionsToPduEntry", fields)
class DestinationsTable(Table):
table_name = "destinations"
fields = [
"destination",
"retry_last_ts",
"retry_interval",
]
EntryType = namedtuple("DestinationsEntry", fields)