mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-07 12:37:55 -05:00
Deduplicate redundant lazy-loaded members (#3331)
* attempt at deduplicating lazy-loaded members as per the proposal; we can deduplicate redundant lazy-loaded members which are sent in the same sync sequence. we do this heuristically rather than requiring the client to somehow tell us which members it has chosen to cache, by instead caching the last N members sent to a client, and not sending them again. For now we hardcode N to 100. Each cache for a given (user,device) tuple is in turn cached for up to X minutes (to avoid the caches building up). For now we hardcode X to 30. * add include_redundant_members filter option & make it work * remove stale todo * add tests for _get_some_state_from_cache * incorporate review
This commit is contained in:
parent
9e68b1bd2d
commit
a75231b507
1
changelog.d/3331.feature
Normal file
1
changelog.d/3331.feature
Normal file
@ -0,0 +1 @@
|
|||||||
|
add support for the include_redundant_members filter param as per MSC1227
|
@ -117,6 +117,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
|
|||||||
"lazy_load_members": {
|
"lazy_load_members": {
|
||||||
"type": "boolean"
|
"type": "boolean"
|
||||||
},
|
},
|
||||||
|
"include_redundant_members": {
|
||||||
|
"type": "boolean"
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,6 +270,9 @@ class FilterCollection(object):
|
|||||||
def lazy_load_members(self):
|
def lazy_load_members(self):
|
||||||
return self._room_state_filter.lazy_load_members()
|
return self._room_state_filter.lazy_load_members()
|
||||||
|
|
||||||
|
def include_redundant_members(self):
|
||||||
|
return self._room_state_filter.include_redundant_members()
|
||||||
|
|
||||||
def filter_presence(self, events):
|
def filter_presence(self, events):
|
||||||
return self._presence_filter.filter(events)
|
return self._presence_filter.filter(events)
|
||||||
|
|
||||||
@ -426,6 +432,9 @@ class Filter(object):
|
|||||||
def lazy_load_members(self):
|
def lazy_load_members(self):
|
||||||
return self.filter_json.get("lazy_load_members", False)
|
return self.filter_json.get("lazy_load_members", False)
|
||||||
|
|
||||||
|
def include_redundant_members(self):
|
||||||
|
return self.filter_json.get("include_redundant_members", False)
|
||||||
|
|
||||||
|
|
||||||
def _matches_wildcard(actual_value, filter_value):
|
def _matches_wildcard(actual_value, filter_value):
|
||||||
if filter_value.endswith("*"):
|
if filter_value.endswith("*"):
|
||||||
|
@ -26,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
|
|||||||
from synapse.push.clientformat import format_push_rules_for_user
|
from synapse.push.clientformat import format_push_rules_for_user
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken
|
||||||
from synapse.util.async import concurrently_execute
|
from synapse.util.async import concurrently_execute
|
||||||
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
|
from synapse.util.caches.lrucache import LruCache
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
from synapse.util.metrics import Measure, measure_func
|
from synapse.util.metrics import Measure, measure_func
|
||||||
@ -33,6 +35,14 @@ from synapse.visibility import filter_events_for_client
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Store the cache that tracks which lazy-loaded members have been sent to a given
|
||||||
|
# client for no more than 30 minutes.
|
||||||
|
LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
|
||||||
|
|
||||||
|
# Remember the last 100 members we sent to a client for the purposes of
|
||||||
|
# avoiding redundantly sending the same lazy-loaded members to the client
|
||||||
|
LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
|
||||||
|
|
||||||
|
|
||||||
SyncConfig = collections.namedtuple("SyncConfig", [
|
SyncConfig = collections.namedtuple("SyncConfig", [
|
||||||
"user",
|
"user",
|
||||||
@ -182,6 +192,12 @@ class SyncHandler(object):
|
|||||||
self.response_cache = ResponseCache(hs, "sync")
|
self.response_cache = ResponseCache(hs, "sync")
|
||||||
self.state = hs.get_state_handler()
|
self.state = hs.get_state_handler()
|
||||||
|
|
||||||
|
# ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
|
||||||
|
self.lazy_loaded_members_cache = ExpiringCache(
|
||||||
|
"lazy_loaded_members_cache", self.clock,
|
||||||
|
max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
|
||||||
|
)
|
||||||
|
|
||||||
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
|
||||||
full_state=False):
|
full_state=False):
|
||||||
"""Get the sync for a client if we have new data for it now. Otherwise
|
"""Get the sync for a client if we have new data for it now. Otherwise
|
||||||
@ -505,9 +521,13 @@ class SyncHandler(object):
|
|||||||
with Measure(self.clock, "compute_state_delta"):
|
with Measure(self.clock, "compute_state_delta"):
|
||||||
|
|
||||||
types = None
|
types = None
|
||||||
lazy_load_members = sync_config.filter_collection.lazy_load_members()
|
|
||||||
filtered_types = None
|
filtered_types = None
|
||||||
|
|
||||||
|
lazy_load_members = sync_config.filter_collection.lazy_load_members()
|
||||||
|
include_redundant_members = (
|
||||||
|
sync_config.filter_collection.include_redundant_members()
|
||||||
|
)
|
||||||
|
|
||||||
if lazy_load_members:
|
if lazy_load_members:
|
||||||
# We only request state for the members needed to display the
|
# We only request state for the members needed to display the
|
||||||
# timeline:
|
# timeline:
|
||||||
@ -523,6 +543,11 @@ class SyncHandler(object):
|
|||||||
# only apply the filtering to room members
|
# only apply the filtering to room members
|
||||||
filtered_types = [EventTypes.Member]
|
filtered_types = [EventTypes.Member]
|
||||||
|
|
||||||
|
timeline_state = {
|
||||||
|
(event.type, event.state_key): event.event_id
|
||||||
|
for event in batch.events if event.is_state()
|
||||||
|
}
|
||||||
|
|
||||||
if full_state:
|
if full_state:
|
||||||
if batch:
|
if batch:
|
||||||
current_state_ids = yield self.store.get_state_ids_for_event(
|
current_state_ids = yield self.store.get_state_ids_for_event(
|
||||||
@ -543,11 +568,6 @@ class SyncHandler(object):
|
|||||||
|
|
||||||
state_ids = current_state_ids
|
state_ids = current_state_ids
|
||||||
|
|
||||||
timeline_state = {
|
|
||||||
(event.type, event.state_key): event.event_id
|
|
||||||
for event in batch.events if event.is_state()
|
|
||||||
}
|
|
||||||
|
|
||||||
state_ids = _calculate_state(
|
state_ids = _calculate_state(
|
||||||
timeline_contains=timeline_state,
|
timeline_contains=timeline_state,
|
||||||
timeline_start=state_ids,
|
timeline_start=state_ids,
|
||||||
@ -571,21 +591,6 @@ class SyncHandler(object):
|
|||||||
filtered_types=filtered_types,
|
filtered_types=filtered_types,
|
||||||
)
|
)
|
||||||
|
|
||||||
timeline_state = {
|
|
||||||
(event.type, event.state_key): event.event_id
|
|
||||||
for event in batch.events if event.is_state()
|
|
||||||
}
|
|
||||||
|
|
||||||
# TODO: optionally filter out redundant membership events at this
|
|
||||||
# point, to stop repeatedly sending members in every /sync as if
|
|
||||||
# the client isn't tracking them.
|
|
||||||
# When implemented, this should filter using event_ids (not mxids).
|
|
||||||
# In practice, limited syncs are
|
|
||||||
# relatively rare so it's not a total disaster to send redundant
|
|
||||||
# members down at this point. Redundant members are ones which
|
|
||||||
# repeatedly get sent down /sync because we don't know if the client
|
|
||||||
# is caching them or not.
|
|
||||||
|
|
||||||
state_ids = _calculate_state(
|
state_ids = _calculate_state(
|
||||||
timeline_contains=timeline_state,
|
timeline_contains=timeline_state,
|
||||||
timeline_start=state_at_timeline_start,
|
timeline_start=state_at_timeline_start,
|
||||||
@ -596,16 +601,48 @@ class SyncHandler(object):
|
|||||||
else:
|
else:
|
||||||
state_ids = {}
|
state_ids = {}
|
||||||
if lazy_load_members:
|
if lazy_load_members:
|
||||||
# TODO: filter out redundant members based on their mxids (not their
|
|
||||||
# event_ids) at this point. We know we can do it based on mxid as this
|
|
||||||
# is an non-gappy incremental sync.
|
|
||||||
|
|
||||||
if types:
|
if types:
|
||||||
state_ids = yield self.store.get_state_ids_for_event(
|
state_ids = yield self.store.get_state_ids_for_event(
|
||||||
batch.events[0].event_id, types=types,
|
batch.events[0].event_id, types=types,
|
||||||
filtered_types=filtered_types,
|
filtered_types=filtered_types,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if lazy_load_members and not include_redundant_members:
|
||||||
|
cache_key = (sync_config.user.to_string(), sync_config.device_id)
|
||||||
|
cache = self.lazy_loaded_members_cache.get(cache_key)
|
||||||
|
if cache is None:
|
||||||
|
logger.debug("creating LruCache for %r", cache_key)
|
||||||
|
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
|
||||||
|
self.lazy_loaded_members_cache[cache_key] = cache
|
||||||
|
else:
|
||||||
|
logger.debug("found LruCache for %r", cache_key)
|
||||||
|
|
||||||
|
# if it's a new sync sequence, then assume the client has had
|
||||||
|
# amnesia and doesn't want any recent lazy-loaded members
|
||||||
|
# de-duplicated.
|
||||||
|
if since_token is None:
|
||||||
|
logger.debug("clearing LruCache for %r", cache_key)
|
||||||
|
cache.clear()
|
||||||
|
else:
|
||||||
|
# only send members which aren't in our LruCache (either
|
||||||
|
# because they're new to this client or have been pushed out
|
||||||
|
# of the cache)
|
||||||
|
logger.debug("filtering state from %r...", state_ids)
|
||||||
|
state_ids = {
|
||||||
|
t: event_id
|
||||||
|
for t, event_id in state_ids.iteritems()
|
||||||
|
if cache.get(t[1]) != event_id
|
||||||
|
}
|
||||||
|
logger.debug("...to %r", state_ids)
|
||||||
|
|
||||||
|
# add any member IDs we are about to send into our LruCache
|
||||||
|
for t, event_id in itertools.chain(
|
||||||
|
state_ids.items(),
|
||||||
|
timeline_state.items(),
|
||||||
|
):
|
||||||
|
if t[0] == EventTypes.Member:
|
||||||
|
cache.set(t[1], event_id)
|
||||||
|
|
||||||
state = {}
|
state = {}
|
||||||
if state_ids:
|
if state_ids:
|
||||||
state = yield self.store.get_events(list(state_ids.values()))
|
state = yield self.store.get_events(list(state_ids.values()))
|
||||||
|
Loading…
Reference in New Issue
Block a user