Merge pull request #983 from matrix-org/erikj/retry_on_integrity_error

Retry event persistence on IntegrityError
This commit is contained in:
Erik Johnston 2016-08-04 15:40:03 +01:00 committed by GitHub
commit 835ceeee76

View File

@ -27,6 +27,7 @@ from synapse.api.errors import SynapseError
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
from collections import deque, namedtuple, OrderedDict from collections import deque, namedtuple, OrderedDict
from functools import wraps
import synapse import synapse
import synapse.metrics import synapse.metrics
@ -150,6 +151,26 @@ class _EventPeristenceQueue(object):
_EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
def _retry_on_integrity_error(func):
"""Wraps a database function so that it gets retried on IntegrityError,
with `delete_existing=True` passed in.
Args:
func: function that returns a Deferred and accepts a `delete_existing` arg
"""
@wraps(func)
@defer.inlineCallbacks
def f(self, *args, **kwargs):
try:
res = yield func(self, *args, **kwargs)
except self.database_engine.module.IntegrityError:
logger.exception("IntegrityError, retrying.")
res = yield func(self, *args, delete_existing=True, **kwargs)
defer.returnValue(res)
return f
class EventsStore(SQLBaseStore): class EventsStore(SQLBaseStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@ -229,8 +250,10 @@ class EventsStore(SQLBaseStore):
self._event_persist_queue.handle_queue(room_id, persisting_queue) self._event_persist_queue.handle_queue(room_id, persisting_queue)
@_retry_on_integrity_error
@defer.inlineCallbacks @defer.inlineCallbacks
def _persist_events(self, events_and_contexts, backfilled=False): def _persist_events(self, events_and_contexts, backfilled=False,
delete_existing=False):
if not events_and_contexts: if not events_and_contexts:
return return
@ -273,12 +296,15 @@ class EventsStore(SQLBaseStore):
self._persist_events_txn, self._persist_events_txn,
events_and_contexts=chunk, events_and_contexts=chunk,
backfilled=backfilled, backfilled=backfilled,
delete_existing=delete_existing,
) )
persist_event_counter.inc_by(len(chunk)) persist_event_counter.inc_by(len(chunk))
@_retry_on_integrity_error
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def _persist_event(self, event, context, current_state=None, backfilled=False): def _persist_event(self, event, context, current_state=None, backfilled=False,
delete_existing=False):
try: try:
with self._stream_id_gen.get_next() as stream_ordering: with self._stream_id_gen.get_next() as stream_ordering:
with self._state_groups_id_gen.get_next() as state_group_id: with self._state_groups_id_gen.get_next() as state_group_id:
@ -291,6 +317,7 @@ class EventsStore(SQLBaseStore):
context=context, context=context,
current_state=current_state, current_state=current_state,
backfilled=backfilled, backfilled=backfilled,
delete_existing=delete_existing,
) )
persist_event_counter.inc() persist_event_counter.inc()
except _RollbackButIsFineException: except _RollbackButIsFineException:
@ -353,7 +380,8 @@ class EventsStore(SQLBaseStore):
defer.returnValue({e.event_id: e for e in events}) defer.returnValue({e.event_id: e for e in events})
@log_function @log_function
def _persist_event_txn(self, txn, event, context, current_state, backfilled=False): def _persist_event_txn(self, txn, event, context, current_state, backfilled=False,
delete_existing=False):
# We purposefully do this first since if we include a `current_state` # We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table # key, we *want* to update the `current_state_events` table
if current_state: if current_state:
@ -393,15 +421,20 @@ class EventsStore(SQLBaseStore):
txn, txn,
[(event, context)], [(event, context)],
backfilled=backfilled, backfilled=backfilled,
delete_existing=delete_existing,
) )
@log_function @log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled): def _persist_events_txn(self, txn, events_and_contexts, backfilled,
delete_existing=False):
"""Insert some number of room events into the necessary database tables. """Insert some number of room events into the necessary database tables.
Rejected events are only inserted into the events table, the events_json table, Rejected events are only inserted into the events table, the events_json table,
and the rejections table. Things reading from those table will need to check and the rejections table. Things reading from those table will need to check
whether the event was rejected. whether the event was rejected.
If delete_existing is True then existing events will be purged from the
database before insertion. This is useful when retrying due to IntegrityError.
""" """
# Ensure that we don't have the same event twice. # Ensure that we don't have the same event twice.
# Pick the earliest non-outlier if there is one, else the earliest one. # Pick the earliest non-outlier if there is one, else the earliest one.
@ -537,6 +570,35 @@ class EventsStore(SQLBaseStore):
] ]
} }
if delete_existing:
# For paranoia reasons, we go and delete all the existing entries
# for these events so we can reinsert them.
# This gets around any problems with some tables already having
# entries.
logger.info("Deleting existing")
for table in (
"events",
"event_json",
"event_content_hashes",
"event_destinations",
"event_edge_hashes",
"event_edges",
"event_forward_extremities",
"event_push_actions",
"event_reference_hashes",
"event_search",
"event_signatures",
"event_to_state_groups",
"rejections",
"redactions",
):
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
[(ev.event_id,) for ev, _ in events_and_contexts]
)
self._simple_insert_many_txn( self._simple_insert_many_txn(
txn, txn,
table="event_json", table="event_json",