Merge pull request #674 from matrix-org/markjh/replicate_state

Use a stream id generator to assign state group ids
This commit is contained in:
Mark Haines 2016-03-30 15:58:49 +01:00
commit fc66df1e60
4 changed files with 59 additions and 49 deletions

View File

@ -31,7 +31,7 @@ class _EventInternalMetadata(object):
return dict(self.__dict__) return dict(self.__dict__)
def is_outlier(self): def is_outlier(self):
return hasattr(self, "outlier") and self.outlier return getattr(self, "outlier", False)
def _event_dict_property(key): def _event_dict_property(key):

View File

@ -116,7 +116,7 @@ class DataStore(RoomMemberStore, RoomStore,
) )
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") self._state_groups_id_gen = StreamIdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id") self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")

View File

@ -26,6 +26,7 @@ from synapse.api.constants import EventTypes
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
from contextlib import contextmanager from contextlib import contextmanager
import logging import logging
import math import math
import ujson as json import ujson as json
@ -79,41 +80,57 @@ class EventsStore(SQLBaseStore):
len(events_and_contexts) len(events_and_contexts)
) )
state_group_id_manager = self._state_groups_id_gen.get_next_mult(
len(events_and_contexts)
)
with stream_ordering_manager as stream_orderings: with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings): with state_group_id_manager as state_group_ids:
event.internal_metadata.stream_ordering = stream for (event, context), stream, state_group_id in zip(
events_and_contexts, stream_orderings, state_group_ids
):
event.internal_metadata.stream_ordering = stream
# Assign a state group_id in case a new id is needed for
# this context. In theory we only need to assign this
# for contexts that have current_state and aren't outliers
but that makes the code more complicated. Assigning an ID
per event only causes the state_group_ids to grow as fast
as the stream_ordering so in practice shouldn't be a problem.
context.new_state_group_id = state_group_id
chunks = [ chunks = [
events_and_contexts[x:x + 100] events_and_contexts[x:x + 100]
for x in xrange(0, len(events_and_contexts), 100) for x in xrange(0, len(events_and_contexts), 100)
] ]
for chunk in chunks: for chunk in chunks:
# We can't easily parallelize these since different chunks # We can't easily parallelize these since different chunks
# might contain the same event. :( # might contain the same event. :(
yield self.runInteraction( yield self.runInteraction(
"persist_events", "persist_events",
self._persist_events_txn, self._persist_events_txn,
events_and_contexts=chunk, events_and_contexts=chunk,
backfilled=backfilled, backfilled=backfilled,
is_new_state=is_new_state, is_new_state=is_new_state,
) )
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event, context, def persist_event(self, event, context,
is_new_state=True, current_state=None): is_new_state=True, current_state=None):
try: try:
with self._stream_id_gen.get_next() as stream_ordering: with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering with self._state_groups_id_gen.get_next() as state_group_id:
yield self.runInteraction( event.internal_metadata.stream_ordering = stream_ordering
"persist_event", context.new_state_group_id = state_group_id
self._persist_event_txn, yield self.runInteraction(
event=event, "persist_event",
context=context, self._persist_event_txn,
is_new_state=is_new_state, event=event,
current_state=current_state, context=context,
) is_new_state=is_new_state,
current_state=current_state,
)
except _RollbackButIsFineException: except _RollbackButIsFineException:
pass pass
@ -178,7 +195,7 @@ class EventsStore(SQLBaseStore):
@log_function @log_function
def _persist_event_txn(self, txn, event, context, def _persist_event_txn(self, txn, event, context,
is_new_state=True, current_state=None): is_new_state, current_state):
# We purposefully do this first since if we include a `current_state` # We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table # key, we *want* to update the `current_state_events` table
if current_state: if current_state:
@ -215,7 +232,7 @@ class EventsStore(SQLBaseStore):
@log_function @log_function
def _persist_events_txn(self, txn, events_and_contexts, backfilled, def _persist_events_txn(self, txn, events_and_contexts, backfilled,
is_new_state=True): is_new_state):
depth_updates = {} depth_updates = {}
for event, context in events_and_contexts: for event, context in events_and_contexts:
# Remove any existing cache entries for the event_ids # Remove any existing cache entries for the event_ids
@ -282,9 +299,7 @@ class EventsStore(SQLBaseStore):
outlier_persisted = have_persisted[event.event_id] outlier_persisted = have_persisted[event.event_id]
if not event.internal_metadata.is_outlier() and outlier_persisted: if not event.internal_metadata.is_outlier() and outlier_persisted:
self._store_state_groups_txn( self._store_mult_state_groups_txn(txn, ((event, context),))
txn, event, context,
)
metadata_json = encode_json( metadata_json = encode_json(
event.internal_metadata.get_dict() event.internal_metadata.get_dict()
@ -310,19 +325,14 @@ class EventsStore(SQLBaseStore):
self._update_extremeties(txn, [event]) self._update_extremeties(txn, [event])
events_and_contexts = filter( events_and_contexts = [
lambda ec: ec[0] not in to_remove, ec for ec in events_and_contexts if ec[0] not in to_remove
events_and_contexts ]
)
if not events_and_contexts: if not events_and_contexts:
return return
self._store_mult_state_groups_txn(txn, [ self._store_mult_state_groups_txn(txn, events_and_contexts)
(event, context)
for event, context in events_and_contexts
if not event.internal_metadata.is_outlier()
])
self._handle_mult_prev_events( self._handle_mult_prev_events(
txn, txn,

View File

@ -64,12 +64,12 @@ class StateStore(SQLBaseStore):
for group, state_map in group_to_state.items() for group, state_map in group_to_state.items()
}) })
def _store_state_groups_txn(self, txn, event, context):
return self._store_mult_state_groups_txn(txn, [(event, context)])
def _store_mult_state_groups_txn(self, txn, events_and_contexts): def _store_mult_state_groups_txn(self, txn, events_and_contexts):
state_groups = {} state_groups = {}
for event, context in events_and_contexts: for event, context in events_and_contexts:
if event.internal_metadata.is_outlier():
continue
if context.current_state is None: if context.current_state is None:
continue continue
@ -82,7 +82,8 @@ class StateStore(SQLBaseStore):
if event.is_state(): if event.is_state():
state_events[(event.type, event.state_key)] = event state_events[(event.type, event.state_key)] = event
state_group = self._state_groups_id_gen.get_next() state_group = context.new_state_group_id
self._simple_insert_txn( self._simple_insert_txn(
txn, txn,
table="state_groups", table="state_groups",
@ -114,11 +115,10 @@ class StateStore(SQLBaseStore):
table="event_to_state_groups", table="event_to_state_groups",
values=[ values=[
{ {
"state_group": state_groups[event.event_id], "state_group": state_group_id,
"event_id": event.event_id, "event_id": event_id,
} }
for event, context in events_and_contexts for event_id, state_group_id in state_groups.items()
if context.current_state is not None
], ],
) )