initial cut at a room summary API (#3574)

Matthew Hodgson 2018-08-16 10:46:50 +02:00 committed by Richard van der Hoff
parent 859989f958
commit 3f543dc021
6 changed files with 159 additions and 18 deletions

changelog.d/3574.feature Normal file
View File

@@ -0,0 +1 @@
+implement `summary` block in /sync response as per MSC688
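For illustration only, a minimal sketch of the shape this adds to each room entry in a /sync response; the field names come from the handler change below, while the room contents and user ids are invented:

    # Hypothetical room entry in a /sync response, showing only the new block.
    # "m.heroes" is only populated when the room has no m.room.name or
    # m.room.canonical_alias state.
    room_entry = {
        "summary": {
            "m.joined_member_count": 2,
            "m.invited_member_count": 1,
            "m.heroes": ["@alice:example.com", "@bob:example.com"],
        },
        # ... plus the existing timeline / state / ephemeral / account_data /
        # unread_notifications blocks ...
    }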

View File

@@ -75,6 +75,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
    "ephemeral",
    "account_data",
    "unread_notifications",
+    "summary",
])):
    __slots__ = []
@@ -503,10 +504,142 @@ class SyncHandler(object):
            state = {}
        defer.returnValue(state)

+    @defer.inlineCallbacks
+    def compute_summary(self, room_id, sync_config, batch, state, now_token):
+        """ Works out a room summary block for this room, summarising the number
+        of joined members in the room, and providing the 'hero' members if the
+        room has no name so clients can consistently name rooms.  Also adds
+        state events to 'state' if needed to describe the heroes.
+
+        Args:
+            room_id(str):
+            sync_config(synapse.handlers.sync.SyncConfig):
+            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+                the room that will be sent to the user.
+            state(dict): dict of (type, state_key) -> Event as returned by
+                compute_state_delta
+            now_token(str): Token of the end of the current batch.
+
+        Returns:
+            A deferred dict describing the room summary
+        """
+
+        # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
+        last_events, _ = yield self.store.get_recent_event_ids_for_room(
+            room_id, end_token=now_token.room_key, limit=1,
+        )
+
+        if not last_events:
+            defer.returnValue(None)
+            return
+
+        last_event = last_events[-1]
+        state_ids = yield self.store.get_state_ids_for_event(
+            last_event.event_id, [
+                (EventTypes.Member, None),
+                (EventTypes.Name, ''),
+                (EventTypes.CanonicalAlias, ''),
+            ]
+        )
+
+        member_ids = {
+            state_key: event_id
+            for (t, state_key), event_id in state_ids.iteritems()
+            if t == EventTypes.Member
+        }
+        name_id = state_ids.get((EventTypes.Name, ''))
+        canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
+
+        summary = {}
+
+        # FIXME: it feels very heavy to load up every single membership event
+        # just to calculate the counts.
+        member_events = yield self.store.get_events(member_ids.values())
+
+        joined_user_ids = []
+        invited_user_ids = []
+
+        for ev in member_events.values():
+            if ev.content.get("membership") == Membership.JOIN:
+                joined_user_ids.append(ev.state_key)
+            elif ev.content.get("membership") == Membership.INVITE:
+                invited_user_ids.append(ev.state_key)
+
+        # TODO: only send these when they change.
+        summary["m.joined_member_count"] = len(joined_user_ids)
+        summary["m.invited_member_count"] = len(invited_user_ids)
+
+        if name_id or canonical_alias_id:
+            defer.returnValue(summary)
+
+        # FIXME: order by stream ordering, not alphabetic
+
+        me = sync_config.user.to_string()
+        if (joined_user_ids or invited_user_ids):
+            summary['m.heroes'] = sorted(
+                [
+                    user_id
+                    for user_id in (joined_user_ids + invited_user_ids)
+                    if user_id != me
+                ]
+            )[0:5]
+        else:
+            summary['m.heroes'] = sorted(
+                [user_id for user_id in member_ids.keys() if user_id != me]
+            )[0:5]
+
+        if not sync_config.filter_collection.lazy_load_members():
+            defer.returnValue(summary)
+
+        # ensure we send membership events for heroes if needed
+        cache_key = (sync_config.user.to_string(), sync_config.device_id)
+        cache = self.get_lazy_loaded_members_cache(cache_key)
+
+        # track which members the client should already know about via LL:
+        # Ones which are already in state...
+        existing_members = set(
+            user_id for (typ, user_id) in state.keys()
+            if typ == EventTypes.Member
+        )
+
+        # ...or ones which are in the timeline...
+        for ev in batch.events:
+            if ev.type == EventTypes.Member:
+                existing_members.add(ev.state_key)
+
+        # ...and then ensure any missing ones get included in state.
+        missing_hero_event_ids = [
+            member_ids[hero_id]
+            for hero_id in summary['m.heroes']
+            if (
+                cache.get(hero_id) != member_ids[hero_id] and
+                hero_id not in existing_members
+            )
+        ]
+
+        missing_hero_state = yield self.store.get_events(missing_hero_event_ids)
+        missing_hero_state = missing_hero_state.values()
+
+        for s in missing_hero_state:
+            cache.set(s.state_key, s.event_id)
+            state[(EventTypes.Member, s.state_key)] = s
+
+        defer.returnValue(summary)
+
+    def get_lazy_loaded_members_cache(self, cache_key):
+        cache = self.lazy_loaded_members_cache.get(cache_key)
+        if cache is None:
+            logger.debug("creating LruCache for %r", cache_key)
+            cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+            self.lazy_loaded_members_cache[cache_key] = cache
+        else:
+            logger.debug("found LruCache for %r", cache_key)
+        return cache
+
    @defer.inlineCallbacks
    def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
                            full_state):
-        """ Works out the differnce in state between the start of the timeline
+        """ Works out the difference in state between the start of the timeline
        and the previous sync.

        Args:
@@ -520,7 +653,7 @@ class SyncHandler(object):
            full_state(bool): Whether to force returning the full state.

        Returns:
-            A deferred new event dictionary
+            A deferred dict of (type, state_key) -> Event
        """
        # TODO(mjark) Check if the state events were received by the server
        # after the previous sync, since we need to include those state
@@ -618,13 +751,7 @@ class SyncHandler(object):
        if lazy_load_members and not include_redundant_members:
            cache_key = (sync_config.user.to_string(), sync_config.device_id)
-            cache = self.lazy_loaded_members_cache.get(cache_key)
-            if cache is None:
-                logger.debug("creating LruCache for %r", cache_key)
-                cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
-                self.lazy_loaded_members_cache[cache_key] = cache
-            else:
-                logger.debug("found LruCache for %r", cache_key)
+            cache = self.get_lazy_loaded_members_cache(cache_key)

            # if it's a new sync sequence, then assume the client has had
            # amnesia and doesn't want any recent lazy-loaded members
@@ -1425,7 +1552,6 @@ class SyncHandler(object):
            if events == [] and tags is None:
                return

-        since_token = sync_result_builder.since_token
        now_token = sync_result_builder.now_token
        sync_config = sync_result_builder.sync_config
@@ -1468,6 +1594,18 @@ class SyncHandler(object):
                full_state=full_state
            )

+            summary = {}
+            if (
+                sync_config.filter_collection.lazy_load_members() and
+                (
+                    any(ev.type == EventTypes.Member for ev in batch.events) or
+                    since_token is None
+                )
+            ):
+                summary = yield self.compute_summary(
+                    room_id, sync_config, batch, state, now_token
+                )
+
            if room_builder.rtype == "joined":
                unread_notifications = {}
                room_sync = JoinedSyncResult(
@@ -1477,6 +1615,7 @@ class SyncHandler(object):
                    ephemeral=ephemeral,
                    account_data=account_data_events,
                    unread_notifications=unread_notifications,
+                    summary=summary,
                )

                if room_sync or always_include:
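For readers skimming the diff, a rough standalone sketch of the hero-selection rule that compute_summary above implements (illustrative names only; the real code also pushes the heroes' membership events into the lazy-loaded state, and only attaches heroes when the room has no name or canonical alias):

    def pick_heroes(joined_user_ids, invited_user_ids, all_member_ids, me):
        # Prefer joined/invited members, fall back to any known member,
        # always exclude the syncing user, and (per the FIXME above) take
        # the first five alphabetically rather than by stream ordering.
        candidates = (joined_user_ids + invited_user_ids) or list(all_member_ids)
        return sorted(u for u in candidates if u != me)[0:5]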

View File

@@ -370,6 +370,7 @@ class SyncRestServlet(RestServlet):
        ephemeral_events = room.ephemeral
        result["ephemeral"] = {"events": ephemeral_events}
        result["unread_notifications"] = room.unread_notifications
+        result["summary"] = room.summary
        return result

View File

@@ -1150,17 +1150,16 @@ class SQLBaseStore(object):
        defer.returnValue(retval)

    def get_user_count_txn(self, txn):
-        """Get a total number of registerd users in the users list.
+        """Get a total number of registered users in the users list.

        Args:
            txn : Transaction object
        Returns:
-            defer.Deferred: resolves to int
+            int : number of users
        """
        sql_count = "SELECT COUNT(*) FROM users WHERE is_guest = 0;"
        txn.execute(sql_count)
-        count = txn.fetchone()[0]
-        defer.returnValue(count)
+        return txn.fetchone()[0]

    def _simple_search_list(self, table, term, col, retcols,
                            desc="_simple_search_list"):

View File

@@ -480,7 +480,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
    @defer.inlineCallbacks
    def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
        """
-        Get the state dicts corresponding to a list of events
+        Get the state dicts corresponding to a list of events, containing the event_ids
+        of the state events (as opposed to the events themselves)

        Args:
            event_ids(list(str)): events whose state should be returned
@@ -493,7 +494,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                If None, `types` filtering is applied to all events.

        Returns:
-            A deferred dict from event_id -> (type, state_key) -> state_event
+            A deferred dict from event_id -> (type, state_key) -> event_id
        """
        event_to_groups = yield self._get_state_group_for_events(
            event_ids,
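To make the corrected return type concrete, a hypothetical result from get_state_ids_for_events (event ids and members invented for illustration):

    state_ids_by_event = {
        "$event1:example.com": {
            ("m.room.member", "@alice:example.com"): "$alice_join:example.com",
            ("m.room.name", ""): "$name_event:example.com",
        },
    }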