Merge pull request #569 from matrix-org/erikj/initial_sync

Improvements to initial /syncs
This commit is contained in:
Erik Johnston 2016-02-10 13:43:15 +00:00
commit 4a2ace1857
2 changed files with 76 additions and 75 deletions

View File

@ -18,7 +18,7 @@ from ._base import BaseHandler
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from twisted.internet import defer from twisted.internet import defer
@ -228,10 +228,14 @@ class SyncHandler(BaseHandler):
invited = [] invited = []
archived = [] archived = []
deferreds = [] deferreds = []
for event in room_list:
if event.membership == Membership.JOIN: room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
with PreserveLoggingContext(LoggingContext.current_context()): for room_list_chunk in room_list_chunks:
room_sync_deferred = self.full_state_sync_for_joined_room( for event in room_list_chunk:
if event.membership == Membership.JOIN:
room_sync_deferred = preserve_fn(
self.full_state_sync_for_joined_room
)(
room_id=event.room_id, room_id=event.room_id,
sync_config=sync_config, sync_config=sync_config,
now_token=now_token, now_token=now_token,
@ -240,20 +244,21 @@ class SyncHandler(BaseHandler):
tags_by_room=tags_by_room, tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room, account_data_by_room=account_data_by_room,
) )
room_sync_deferred.addCallback(joined.append) room_sync_deferred.addCallback(joined.append)
deferreds.append(room_sync_deferred) deferreds.append(room_sync_deferred)
elif event.membership == Membership.INVITE: elif event.membership == Membership.INVITE:
invite = yield self.store.get_event(event.event_id) invite = yield self.store.get_event(event.event_id)
invited.append(InvitedSyncResult( invited.append(InvitedSyncResult(
room_id=event.room_id, room_id=event.room_id,
invite=invite, invite=invite,
)) ))
elif event.membership in (Membership.LEAVE, Membership.BAN): elif event.membership in (Membership.LEAVE, Membership.BAN):
leave_token = now_token.copy_and_replace( leave_token = now_token.copy_and_replace(
"room_key", "s%d" % (event.stream_ordering,) "room_key", "s%d" % (event.stream_ordering,)
) )
with PreserveLoggingContext(LoggingContext.current_context()): room_sync_deferred = preserve_fn(
room_sync_deferred = self.full_state_sync_for_archived_room( self.full_state_sync_for_archived_room
)(
sync_config=sync_config, sync_config=sync_config,
room_id=event.room_id, room_id=event.room_id,
leave_event_id=event.event_id, leave_event_id=event.event_id,
@ -262,12 +267,12 @@ class SyncHandler(BaseHandler):
tags_by_room=tags_by_room, tags_by_room=tags_by_room,
account_data_by_room=account_data_by_room, account_data_by_room=account_data_by_room,
) )
room_sync_deferred.addCallback(archived.append) room_sync_deferred.addCallback(archived.append)
deferreds.append(room_sync_deferred) deferreds.append(room_sync_deferred)
yield defer.gatherResults( yield defer.gatherResults(
deferreds, consumeErrors=True deferreds, consumeErrors=True
).addErrback(unwrapFirstError) ).addErrback(unwrapFirstError)
account_data_for_user = sync_config.filter_collection.filter_account_data( account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data) self.account_data_for_user(account_data)

View File

@ -171,41 +171,43 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False) events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events) defer.returnValue(events)
def _get_state_groups_from_groups(self, groups_and_types): def _get_state_groups_from_groups(self, groups, types):
"""Returns dictionary state_group -> state event ids """Returns dictionary state_group -> state event ids
Args:
groups_and_types (list): list of 2-tuple (`group`, `types`)
""" """
def f(txn): def f(txn, groups):
if types is not None:
where_clause = "AND (%s)" % (
" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
)
else:
where_clause = ""
sql = (
"SELECT state_group, event_id FROM state_groups_state WHERE"
" state_group IN (%s) %s" % (
",".join("?" for _ in groups),
where_clause,
)
)
args = list(groups)
if types is not None:
args.extend([i for typ in types for i in typ])
txn.execute(sql, args)
rows = self.cursor_to_dict(txn)
results = {} results = {}
for group, types in groups_and_types: for row in rows:
if types is not None: results.setdefault(row["state_group"], []).append(row["event_id"])
where_clause = "AND (%s)" % (
" OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
)
else:
where_clause = ""
sql = (
"SELECT event_id FROM state_groups_state WHERE"
" state_group = ? %s"
) % (where_clause,)
args = [group]
if types is not None:
args.extend([i for typ in types for i in typ])
txn.execute(sql, args)
results[group] = [r[0] for r in txn.fetchall()]
return results return results
return self.runInteraction( chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
"_get_state_groups_from_groups", for chunk in chunks:
f, return self.runInteraction(
) "_get_state_groups_from_groups",
f, chunk
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_for_events(self, event_ids, types): def get_state_for_events(self, event_ids, types):
@ -264,26 +266,20 @@ class StateStore(SQLBaseStore):
) )
@cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids", @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
num_args=1) num_args=1, inlineCallbacks=True)
def _get_state_group_for_events(self, event_ids): def _get_state_group_for_events(self, event_ids):
"""Returns mapping event_id -> state_group """Returns mapping event_id -> state_group
""" """
def f(txn): rows = yield self._simple_select_many_batch(
results = {} table="event_to_state_groups",
for event_id in event_ids: column="event_id",
results[event_id] = self._simple_select_one_onecol_txn( iterable=event_ids,
txn, keyvalues={},
table="event_to_state_groups", retcols=("event_id", "state_group",),
keyvalues={ desc="_get_state_group_for_events",
"event_id": event_id, )
},
retcol="state_group",
allow_none=True,
)
return results defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
return self.runInteraction("_get_state_group_for_events", f)
def _get_some_state_from_cache(self, group, types): def _get_some_state_from_cache(self, group, types):
"""Checks if group is in cache. See `_get_state_for_groups` """Checks if group is in cache. See `_get_state_for_groups`
@ -355,7 +351,7 @@ class StateStore(SQLBaseStore):
all events are returned. all events are returned.
""" """
results = {} results = {}
missing_groups_and_types = [] missing_groups = []
if types is not None: if types is not None:
for group in set(groups): for group in set(groups):
state_dict, missing_types, got_all = self._get_some_state_from_cache( state_dict, missing_types, got_all = self._get_some_state_from_cache(
@ -364,7 +360,7 @@ class StateStore(SQLBaseStore):
results[group] = state_dict results[group] = state_dict
if not got_all: if not got_all:
missing_groups_and_types.append((group, missing_types)) missing_groups.append(group)
else: else:
for group in set(groups): for group in set(groups):
state_dict, got_all = self._get_all_state_from_cache( state_dict, got_all = self._get_all_state_from_cache(
@ -373,9 +369,9 @@ class StateStore(SQLBaseStore):
results[group] = state_dict results[group] = state_dict
if not got_all: if not got_all:
missing_groups_and_types.append((group, None)) missing_groups.append(group)
if not missing_groups_and_types: if not missing_groups:
defer.returnValue({ defer.returnValue({
group: { group: {
type_tuple: event type_tuple: event
@ -389,7 +385,7 @@ class StateStore(SQLBaseStore):
cache_seq_num = self._state_group_cache.sequence cache_seq_num = self._state_group_cache.sequence
group_state_dict = yield self._get_state_groups_from_groups( group_state_dict = yield self._get_state_groups_from_groups(
missing_groups_and_types missing_groups, types
) )
state_events = yield self._get_events( state_events = yield self._get_events(