Merge pull request #2549 from matrix-org/rav/event_persist_logcontexts

Fix logcontext handling for persist_events
This commit is contained in:
Richard van der Hoff 2017-10-17 13:20:35 +01:00 committed by GitHub
commit 4cc8bb0767
2 changed files with 22 additions and 7 deletions

View File

@ -21,7 +21,7 @@ from synapse.events.utils import prune_event
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import (
preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
preserve_fn, PreserveLoggingContext, make_deferred_yieldable
)
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@ -88,13 +88,23 @@ class _EventPeristenceQueue(object):
def add_to_queue(self, room_id, events_and_contexts, backfilled):
"""Add events to the queue, with the given persist_event options.
NB: due to the normal usage pattern of this method, it does *not*
follow the synapse logcontext rules, and leaves the logcontext in
place whether or not the returned deferred is ready.
Args:
room_id (str):
events_and_contexts (list[(EventBase, EventContext)]):
backfilled (bool):
Returns:
defer.Deferred: a deferred which will resolve once the events are
persisted. Runs its callbacks *without* a logcontext.
"""
queue = self._event_persist_queues.setdefault(room_id, deque())
if queue:
# if the last item in the queue has the same `backfilled` setting,
# we can just add these new events to that item.
end_item = queue[-1]
if end_item.backfilled == backfilled:
end_item.events_and_contexts.extend(events_and_contexts)
@ -113,11 +123,11 @@ class _EventPeristenceQueue(object):
def handle_queue(self, room_id, per_item_callback):
"""Attempts to handle the queue for a room if not already being handled.
The given callback will be invoked with for each item in the queue,1
The given callback will be invoked with for each item in the queue,
of type _EventPersistQueueItem. The per_item_callback will continuously
be called with new items, unless the queue becomnes empty. The return
value of the function will be given to the deferreds waiting on the item,
exceptions will be passed to the deferres as well.
exceptions will be passed to the deferreds as well.
This function should therefore be called whenever anything is added
to the queue.
@ -233,7 +243,7 @@ class EventsStore(SQLBaseStore):
deferreds = []
for room_id, evs_ctxs in partitioned.iteritems():
d = preserve_fn(self._event_persist_queue.add_to_queue)(
d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs,
backfilled=backfilled,
)
@ -242,7 +252,7 @@ class EventsStore(SQLBaseStore):
for room_id in partitioned:
self._maybe_start_persisting(room_id)
return preserve_context_over_deferred(
return make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True)
)
@ -267,7 +277,7 @@ class EventsStore(SQLBaseStore):
self._maybe_start_persisting(event.room_id)
yield preserve_context_over_deferred(deferred)
yield make_deferred_yieldable(deferred)
max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@ -1526,7 +1536,7 @@ class EventsStore(SQLBaseStore):
if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]
res = yield preserve_context_over_deferred(defer.gatherResults(
res = yield make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],

View File

@ -53,6 +53,11 @@ class ObservableDeferred(object):
Cancelling or otherwise resolving an observer will not affect the original
ObservableDeferred.
NB that it does not attempt to do anything with logcontexts; in general
you should probably make_deferred_yieldable the deferreds
returned by `observe`, and ensure that the original deferred runs its
callbacks in the sentinel logcontext.
"""
__slots__ = ["_deferred", "_observers", "_result"]