Merge remote-tracking branch 'upstream/release-v1.50'

Tulir Asokan 2022-01-07 14:21:32 +02:00
commit e9caf56ca0
205 changed files with 4905 additions and 2749 deletions


@ -462,9 +462,9 @@ class ApplicationServicesHandler:
Args:
room_alias: The room alias to query.
Returns:
-    namedtuple: with keys "room_id" and "servers" or None if no
-    association can be found.
+    RoomAliasMapping or None if no association can be found.
"""
room_alias_str = room_alias.to_string()
services = self.store.get_app_services()


@ -997,9 +997,7 @@ class AuthHandler:
# really don't want is active access_tokens without a record of the
# device, so we double-check it here.
if device_id is not None:
-try:
-    await self.store.get_device(user_id, device_id)
-except StoreError:
+if await self.store.get_device(user_id, device_id) is None:
await self.store.delete_access_token(access_token)
raise StoreError(400, "Login raced against device deletion")
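
Several changes in this merge replace exception-driven storage lookups with Optional-returning ones: `get_device` now returns None on a miss instead of raising StoreError, and the auth and device handlers check for None explicitly. A minimal sketch of the new calling pattern (the store object here is a hypothetical stand-in, not Synapse's real storage class):

    from typing import Any, Dict, Optional

    class StoreError(Exception):
        """Simplified stand-in for synapse.api.errors.StoreError."""
        def __init__(self, code: int, msg: str):
            super().__init__(msg)
            self.code = code

    async def verify_device_for_token(
        store: Any, user_id: str, device_id: Optional[str], access_token: str
    ) -> None:
        # A missing device is now an ordinary None check rather than
        # try/except control flow.
        if device_id is not None:
            device: Optional[Dict[str, Any]] = await store.get_device(user_id, device_id)
            if device is None:
                # The token refers to a deleted device: remove the orphaned
                # token and report the race to the caller.
                await store.delete_access_token(access_token)
                raise StoreError(400, "Login raced against device deletion")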


@ -106,10 +106,10 @@ class DeviceWorkerHandler:
Raises:
errors.NotFoundError: if the device was not found
"""
-try:
-    device = await self.store.get_device(user_id, device_id)
-except errors.StoreError:
-    raise errors.NotFoundError
+device = await self.store.get_device(user_id, device_id)
+if device is None:
+    raise errors.NotFoundError()
ips = await self.store.get_last_client_ip_by_device(user_id, device_id)
_update_device_from_client_ips(device, ips)
@ -602,6 +602,8 @@ class DeviceHandler(DeviceWorkerHandler):
access_token, device_id
)
old_device = await self.store.get_device(user_id, old_device_id)
+if old_device is None:
+    raise errors.NotFoundError()
await self.store.update_device(user_id, device_id, old_device["display_name"])
# can't call self.delete_device because that will clobber the
# access token so call the storage layer directly


@ -280,13 +280,15 @@ class DirectoryHandler:
users = await self.store.get_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
-servers = set(extra_servers) | set(servers)
+servers_set = set(extra_servers) | set(servers)
# If this server is in the list of servers, return it first.
-if self.server_name in servers:
-    servers = [self.server_name] + [s for s in servers if s != self.server_name]
+if self.server_name in servers_set:
+    servers = [self.server_name] + [
+        s for s in servers_set if s != self.server_name
+    ]
else:
-    servers = list(servers)
+    servers = list(servers_set)
return {"room_id": room_id, "servers": servers}


@ -65,8 +65,12 @@ class E2eKeysHandler:
else:
# Only register this edu handler on master as it requires writing
# device updates to the db
-#
-# FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+federation_registry.register_edu_handler(
+    "m.signing_key_update",
+    self._edu_updater.incoming_signing_key_update,
+)
+# also handle the unstable version
+# FIXME: remove this when enough servers have upgraded
federation_registry.register_edu_handler(
"org.matrix.signing_key_update",
self._edu_updater.incoming_signing_key_update,
@ -576,7 +580,9 @@ class E2eKeysHandler:
log_kv(
{"message": "Did not update one_time_keys", "reason": "no keys given"}
)
-fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None)
+fallback_keys = keys.get("fallback_keys") or keys.get(
+    "org.matrix.msc2732.fallback_keys"
+)
if fallback_keys and isinstance(fallback_keys, dict):
log_kv(
{
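
Both changes to this handler follow the same stable-name-first pattern: accept the identifier from the spec, and keep the org.matrix.* unstable identifier around for older servers and clients. A minimal sketch of the upload-side lookup, assuming `keys` is the request body:

    from typing import Any, Dict, Optional

    def extract_fallback_keys(keys: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        # Prefer the stable field name; fall back to the MSC2732 unstable
        # prefix so older clients keep working.
        return keys.get("fallback_keys") or keys.get("org.matrix.msc2732.fallback_keys")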


@ -14,7 +14,9 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, Dict, Optional
+from typing_extensions import Literal
from synapse.api.errors import (
Codes,
@ -24,6 +26,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.logging.opentracing import log_kv, trace
+from synapse.storage.databases.main.e2e_room_keys import RoomKey
from synapse.types import JsonDict
from synapse.util.async_helpers import Linearizer
@ -58,7 +61,9 @@ class E2eRoomKeysHandler:
version: str,
room_id: Optional[str] = None,
session_id: Optional[str] = None,
-) -> List[JsonDict]:
+) -> Dict[
+    Literal["rooms"], Dict[str, Dict[Literal["sessions"], Dict[str, RoomKey]]]
+]:
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
room, or a given session.
See EndToEndRoomKeyStore.get_e2e_room_keys for full details.
@ -72,8 +77,8 @@ class E2eRoomKeysHandler:
Raises:
NotFoundError: if the backup version does not exist
Returns:
-    A list of dicts giving the session_data and message metadata for
-    these room keys.
+    A dict giving the session_data and message metadata for these room keys.
+    `{"rooms": {room_id: {"sessions": {session_id: room_key}}}}`
"""
# we deliberately take the lock to get keys so that changing the version
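
The new annotation pins down the nested shape that the docstring previously described only in prose. For reference, a sketch of the overall structure; the `RoomKey` fields shown here are an assumption about the storage-layer TypedDict, not something visible in this diff:

    from typing import Dict
    from typing_extensions import Literal, TypedDict

    JsonDict = Dict[str, object]

    class RoomKey(TypedDict):
        """One backed-up megolm session (assumed fields)."""
        first_message_index: int
        forwarded_count: int
        is_verified: bool
        session_data: JsonDict

    # The handler now returns, e.g.:
    # {"rooms": {"!room:example.org": {"sessions": {"<session_id>": room_key}}}}
    GetRoomKeysResult = Dict[
        Literal["rooms"], Dict[str, Dict[Literal["sessions"], Dict[str, RoomKey]]]
    ]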
@ -273,7 +278,7 @@ class E2eRoomKeysHandler:
@staticmethod
def _should_replace_room_key(
-current_room_key: Optional[JsonDict], room_key: JsonDict
+current_room_key: Optional[RoomKey], room_key: RoomKey
) -> bool:
"""
Determine whether to replace a given current_room_key (if any)


@ -79,13 +79,14 @@ class EventStreamHandler:
# thundering herds on restart.
timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
-events, tokens = await self.notifier.get_events_for(
+stream_result = await self.notifier.get_events_for(
auth_user,
pagin_config,
timeout,
is_guest=is_guest,
explicit_room_id=room_id,
)
+events = stream_result.events
time_now = self.clock.time_msec()
@ -122,14 +123,12 @@ class EventStreamHandler:
events,
time_now,
as_client_event=as_client_event,
-# Don't bundle aggregations as this is a deprecated API.
-bundle_aggregations=False,
)
chunk = {
"chunk": chunks,
-    "start": await tokens[0].to_string(self.store),
-    "end": await tokens[1].to_string(self.store),
+    "start": await stream_result.start_token.to_string(self.store),
+    "end": await stream_result.end_token.to_string(self.store),
}
return chunk
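
`get_events_for` previously returned an `(events, (start, end))` pair; it now returns a single result object with named fields, which is what `stream_result.events` and the two token accesses above rely on. A sketch of what such a container can look like (the class name and exact field types are assumptions inferred from those accesses):

    from typing import Any, List

    import attr

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class EventStreamResult:
        # The events to return, plus the stream tokens bounding the window
        # they came from (real types elided to Any to keep the sketch small).
        events: List[Any]
        start_token: Any
        end_token: Any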


@ -360,31 +360,34 @@ class FederationHandler:
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
-states = await make_deferred_yieldable(
+states_list = await make_deferred_yieldable(
defer.gatherResults(
[resolve(room_id, [e]) for e in event_ids], consumeErrors=True
)
)
-# dict[str, dict[tuple, str]], a map from event_id to state map of
-# event_ids.
-states = dict(zip(event_ids, [s.state for s in states]))
+# A map from event_id to state map of event_ids.
+state_ids: Dict[str, StateMap[str]] = dict(
+    zip(event_ids, [s.state for s in states_list])
+)
state_map = await self.store.get_events(
-[e_id for ids in states.values() for e_id in ids.values()],
+[e_id for ids in state_ids.values() for e_id in ids.values()],
get_prev_content=False,
)
-states = {
+# A map from event_id to state map of events.
+state_events: Dict[str, StateMap[EventBase]] = {
key: {
k: state_map[e_id]
for k, e_id in state_dict.items()
if e_id in state_map
}
-for key, state_dict in states.items()
+for key, state_dict in state_ids.items()
}
for e_id in event_ids:
-likely_extremeties_domains = get_domains_from_state(states[e_id])
+likely_extremeties_domains = get_domains_from_state(state_events[e_id])
success = await try_backfill(
[


@ -421,9 +421,6 @@ class FederationEventHandler:
Raises:
SynapseError if the response is in some way invalid.
"""
-for e in itertools.chain(auth_events, state):
-    e.internal_metadata.outlier = True
event_map = {e.event_id: e for e in itertools.chain(auth_events, state)}
create_event = None
@ -666,7 +663,9 @@ class FederationEventHandler:
logger.info("Processing pulled event %s", event)
# these should not be outliers.
-assert not event.internal_metadata.is_outlier()
+assert (
+    not event.internal_metadata.is_outlier()
+), "pulled event unexpectedly flagged as outlier"
event_id = event.event_id
@ -1192,7 +1191,6 @@ class FederationEventHandler:
[destination],
event_id,
room_version,
-outlier=True,
)
if event is None:
logger.warning(
@ -1221,9 +1219,10 @@ class FederationEventHandler:
"""Persist a batch of outlier events fetched from remote servers.
We first sort the events to make sure that we process each event's auth_events
-before the event itself, and then auth and persist them.
+before the event itself.
-Notifies about the events where appropriate.
+We then mark the events as outliers, persist them to the database, and, where
+appropriate (eg, an invite), awake the notifier.
Params:
room_id: the room that the events are meant to be in (though this has
@ -1274,7 +1273,8 @@ class FederationEventHandler:
Persists a batch of events where we have (theoretically) already persisted all
of their auth events.
-Notifies about the events where appropriate.
+Marks the events as outliers, auths them, persists them to the database, and,
+where appropriate (eg, an invite), awakes the notifier.
Params:
origin: where the events came from
@ -1312,6 +1312,9 @@ class FederationEventHandler:
return None
auth.append(ae)
+# we're not bothering about room state, so flag the event as an outlier.
+event.internal_metadata.outlier = True
context = EventContext.for_outlier()
try:
validate_event_for_room_version(room_version_obj, event)
@ -1838,7 +1841,7 @@ class FederationEventHandler:
The stream ID after which all events have been persisted.
"""
if not event_and_contexts:
-return self._store.get_current_events_token()
+return self._store.get_room_max_stream_ordering()
instance = self._config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:


@ -13,21 +13,27 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple
-from twisted.internet import defer
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
from synapse.api.constants import EduTypes, EventTypes, Membership
from synapse.api.errors import SynapseError
+from synapse.events import EventBase
from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.receipts import ReceiptEventSource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.storage.roommember import RoomsForUser
from synapse.streams.config import PaginationConfig
-from synapse.types import JsonDict, Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    Requester,
+    RoomStreamToken,
+    StateMap,
+    StreamToken,
+    UserID,
+)
from synapse.util import unwrapFirstError
-from synapse.util.async_helpers import concurrently_execute
+from synapse.util.async_helpers import concurrently_execute, gather_results
from synapse.util.caches.response_cache import ResponseCache
from synapse.visibility import filter_events_for_client
@ -167,8 +173,6 @@ class InitialSyncHandler:
d["invite"] = await self._event_serializer.serialize_event(
invite_event,
time_now,
-# Don't bundle aggregations as this is a deprecated API.
-bundle_aggregations=False,
as_client_event=as_client_event,
)
@ -190,14 +194,13 @@ class InitialSyncHandler:
)
deferred_room_state = run_in_background(
self.state_store.get_state_for_events, [event.event_id]
-)
-deferred_room_state.addCallback(
-    lambda states: states[event.event_id]
+).addCallback(
+    lambda states: cast(StateMap[EventBase], states[event.event_id])
)
(messages, token), current_state = await make_deferred_yieldable(
-defer.gatherResults(
-    [
+gather_results(
+    (
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
@ -205,7 +208,7 @@ class InitialSyncHandler:
end_token=room_end_token,
),
deferred_room_state,
-]
+)
)
).addErrback(unwrapFirstError)
@ -222,8 +225,6 @@ class InitialSyncHandler:
await self._event_serializer.serialize_events(
messages,
time_now=time_now,
-# Don't bundle aggregations as this is a deprecated API.
-bundle_aggregations=False,
as_client_event=as_client_event,
)
),
@ -234,8 +235,6 @@ class InitialSyncHandler:
d["state"] = await self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
-# Don't bundle aggregations as this is a deprecated API.
-bundle_aggregations=False,
as_client_event=as_client_event,
)
@ -377,9 +376,7 @@ class InitialSyncHandler:
"messages": {
"chunk": (
-# Don't bundle aggregations as this is a deprecated API.
-await self._event_serializer.serialize_events(
-    messages, time_now, bundle_aggregations=False
-)
+await self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
@ -387,7 +384,7 @@ class InitialSyncHandler:
"state": (
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(
-room_state.values(), time_now, bundle_aggregations=False
+room_state.values(), time_now
)
),
"presence": [],
@ -408,7 +405,7 @@ class InitialSyncHandler:
time_now = self.clock.time_msec()
# Don't bundle aggregations as this is a deprecated API.
state = await self._event_serializer.serialize_events(
-current_state.values(), time_now, bundle_aggregations=False
+current_state.values(), time_now
)
now_token = self.hs.get_event_sources().get_current_token()
@ -454,8 +451,8 @@ class InitialSyncHandler:
return receipts
presence, receipts, (messages, token) = await make_deferred_yieldable(
-defer.gatherResults(
-    [
+gather_results(
+    (
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
@ -464,7 +461,7 @@ class InitialSyncHandler:
limit=limit,
end_token=now_token.room_key,
),
-],
+),
consumeErrors=True,
).addErrback(unwrapFirstError)
)
@ -483,9 +480,7 @@ class InitialSyncHandler:
"messages": {
"chunk": (
-# Don't bundle aggregations as this is a deprecated API.
-await self._event_serializer.serialize_events(
-    messages, time_now, bundle_aggregations=False
-)
+await self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),


@ -21,7 +21,6 @@ from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from canonicaljson import encode_canonical_json
-from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
@ -57,7 +56,7 @@ from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder, log_failure
-from synapse.util.async_helpers import Linearizer, unwrapFirstError
+from synapse.util.async_helpers import Linearizer, gather_results, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@ -498,6 +497,7 @@ class EventCreationHandler:
require_consent: bool = True,
outlier: bool = False,
historical: bool = False,
+allow_no_prev_events: bool = False,
depth: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
@ -609,6 +609,7 @@ class EventCreationHandler:
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
depth=depth,
+allow_no_prev_events=allow_no_prev_events,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@ -884,6 +885,7 @@ class EventCreationHandler:
prev_event_ids: Optional[List[str]] = None,
auth_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
+allow_no_prev_events: bool = False,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@ -914,6 +916,7 @@ class EventCreationHandler:
full_state_ids_at_event = None
if auth_event_ids is not None:
# If auth events are provided, prev events must be also.
+# prev_event_ids could be an empty array though.
assert prev_event_ids is not None
# Copy the full auth state before it stripped down
@ -945,14 +948,22 @@ class EventCreationHandler:
else:
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
-# we now ought to have some prev_events (unless it's a create event).
-#
-# do a quick sanity check here, rather than waiting until we've created the
+# Do a quick sanity check here, rather than waiting until we've created the
# event and then try to auth it (which fails with a somewhat confusing "No
# create event in auth events")
-assert (
-    builder.type == EventTypes.Create or len(prev_event_ids) > 0
-), "Attempting to create an event with no prev_events"
+if allow_no_prev_events:
+    # We allow events with no `prev_events`, but they had better have some `auth_events`.
+    assert (
+        builder.type == EventTypes.Create
+        # Allow an event to have an empty list of prev_event_ids
+        # only if it has auth_event_ids.
+        or auth_event_ids
+    ), "Attempting to create a non-m.room.create event with no prev_events or auth_event_ids"
+else:
+    # we now ought to have some prev_events (unless it's a create event).
+    assert (
+        builder.type == EventTypes.Create or prev_event_ids
+    ), "Attempting to create a non-m.room.create event with no prev_events"
event = await builder.build(
prev_event_ids=prev_event_ids,
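
The new `allow_no_prev_events` flag is for callers that deliberately create events outside the normal DAG flow (presumably the historical/batch-insertion path, given the neighbouring `historical` flag): such events carry auth events but an empty prev_events list. A hypothetical call illustrating the contract:

    async def create_floating_event(handler, requester, event_dict, auth_event_ids):
        # An empty prev_events list is only legal when explicit auth_event_ids
        # are supplied; otherwise the assertion above fires.
        return await handler.create_event(
            requester,
            event_dict,
            prev_event_ids=[],
            auth_event_ids=auth_event_ids,
            allow_no_prev_events=True,
        )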
@ -1158,9 +1169,9 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
-result = await make_deferred_yieldable(
-    defer.gatherResults(
-        [
+result, _ = await make_deferred_yieldable(
+    gather_results(
+        (
run_in_background(
self._persist_event,
requester=requester,
@ -1172,12 +1183,12 @@ class EventCreationHandler:
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
-],
+),
consumeErrors=True,
)
).addErrback(unwrapFirstError)
-return result[0]
+return result
async def _persist_event(
self,
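
`gather_results` is the new helper in synapse.util.async_helpers that this merge swaps in for raw `defer.gatherResults`; taking a tuple and firing with a tuple is what lets the `result, _ = ...` unpacking above keep useful static types. A sketch of what such a wrapper can look like (the signature is reconstructed, so treat it as an approximation):

    from typing import Tuple

    from twisted.internet import defer

    def gather_results(
        deferred_list: Tuple["defer.Deferred", ...],
        consume_errors: bool = False,
    ) -> "defer.Deferred":
        # Thin wrapper around defer.gatherResults that fires with a tuple
        # rather than a list, so callers can unpack it positionally.
        d = defer.gatherResults(list(deferred_list), consumeErrors=consume_errors)
        return d.addCallback(tuple)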


@ -542,7 +542,10 @@ class PaginationHandler:
chunk = {
"chunk": (
await self._event_serializer.serialize_events(
-events, time_now, as_client_event=as_client_event
+events,
+time_now,
+bundle_aggregations=True,
+as_client_event=as_client_event,
)
),
"start": await from_token.to_string(self.store),


@ -729,7 +729,7 @@ class PresenceHandler(BasePresenceHandler):
# Presence is best effort and quickly heals itself, so lets just always
# stream from the current state when we restart.
-self._event_pos = self.store.get_current_events_token()
+self._event_pos = self.store.get_room_max_stream_ordering()
self._event_processing = False
async def _on_shutdown(self) -> None:


@ -14,7 +14,7 @@
import logging
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
-from synapse.api.constants import ReadReceiptEventFields
+from synapse.api.constants import ReadReceiptEventFields, ReceiptTypes
from synapse.appservice import ApplicationService
from synapse.streams import EventSource
from synapse.types import JsonDict, ReadReceipt, UserID, get_domain_from_id
@ -179,7 +179,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
for event_id in content.keys():
event_content = content.get(event_id, {})
-m_read = event_content.get("m.read", {})
+m_read = event_content.get(ReceiptTypes.READ, {})
# If m_read is missing copy over the original event_content as there is nothing to process here
if not m_read:
@ -207,7 +207,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
# Set new users unless empty
if len(new_users.keys()) > 0:
-new_event["content"][event_id] = {"m.read": new_users}
+new_event["content"][event_id] = {ReceiptTypes.READ: new_users}
# Append new_event to visible_events unless empty
if len(new_event["content"].keys()) > 0:
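
`ReceiptTypes` centralises the receipt-type string that was previously hard-coded in several handlers. A minimal sketch of the constant as implied by this diff (only READ is visible here; any further members would be assumptions):

    class ReceiptTypes:
        """Known read-receipt types, so handlers compare against a constant
        rather than the literal "m.read"."""

        READ = "m.read"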


@ -172,7 +172,7 @@ class RoomCreationHandler:
user_id = requester.user.to_string()
# Check if this room is already being upgraded by another person
-for key in self._upgrade_response_cache.pending_result_cache:
+for key in self._upgrade_response_cache.keys():
if key[0] == old_room_id and key[1] != user_id:
# Two different people are trying to upgrade the same room.
# Send the second an error.


@ -13,9 +13,9 @@
# limitations under the License.
import logging
-from collections import namedtuple
from typing import TYPE_CHECKING, Any, Optional, Tuple
+import attr
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
@ -474,16 +474,12 @@ class RoomListHandler:
)
-class RoomListNextBatch(
-    namedtuple(
-        "RoomListNextBatch",
-        (
-            "last_joined_members",  # The count to get rooms after/before
-            "last_room_id",  # The room_id to get rooms after/before
-            "direction_is_forward",  # Bool if this is a next_batch, false if prev_batch
-        ),
-    )
-):
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class RoomListNextBatch:
+    last_joined_members: int  # The count to get rooms after/before
+    last_room_id: str  # The room_id to get rooms after/before
+    direction_is_forward: bool  # True if this is a next_batch, false if prev_batch
KEY_DICT = {
"last_joined_members": "m",
"last_room_id": "r",
@ -502,12 +498,12 @@ class RoomListNextBatch(
def to_token(self) -> str:
return encode_base64(
msgpack.dumps(
-{self.KEY_DICT[key]: val for key, val in self._asdict().items()}
+{self.KEY_DICT[key]: val for key, val in attr.asdict(self).items()}
)
)
def copy_and_replace(self, **kwds: Any) -> "RoomListNextBatch":
-return self._replace(**kwds)
+return attr.evolve(self, **kwds)
def _matches_room_entry(room_entry: JsonDict, search_filter: dict) -> bool:
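
The namedtuple-to-attrs conversions in this merge (here, and for RoomMember in the typing handler below) swap `_asdict`/`_replace` for their attrs equivalents. A small usage sketch of the converted batch token:

    import attr

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class RoomListNextBatch:
        last_joined_members: int
        last_room_id: str
        direction_is_forward: bool

    batch = RoomListNextBatch(
        last_joined_members=10, last_room_id="!r:example.org", direction_is_forward=True
    )
    # attr.asdict replaces namedtuple._asdict ...
    assert attr.asdict(batch)["last_room_id"] == "!r:example.org"
    # ... and attr.evolve replaces namedtuple._replace, returning a new
    # frozen copy rather than mutating in place.
    prev_batch = attr.evolve(batch, direction_is_forward=False)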


@ -658,7 +658,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if block_invite:
raise SynapseError(403, "Invites have been disabled on this server")
-if prev_event_ids:
+# An empty prev_events list is allowed as long as the auth_event_ids are present
+if prev_event_ids is not None:
return await self._local_membership_update(
requester=requester,
target=target,
@ -1019,7 +1020,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# Add new room to the room directory if the old room was there
# Remove old room from the room directory
old_room = await self.store.get_room(old_room_id)
-if old_room and old_room["is_public"]:
+if old_room is not None and old_room["is_public"]:
await self.store.set_room_is_public(old_room_id, False)
await self.store.set_room_is_public(room_id, True)
@ -1030,7 +1031,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
for group_id in local_group_ids:
# Add the new room to those groups
-await self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
+await self.store.add_room_to_group(
+    group_id, room_id, old_room is not None and old_room["is_public"]
+)
# Remove the old room from those groups
await self.store.remove_room_from_group(group_id, old_room_id)


@ -80,6 +80,17 @@ class StatsHandler:
# If self.pos is None then means we haven't fetched it from DB
if self.pos is None:
self.pos = await self.store.get_stats_positions()
+room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+if self.pos > room_max_stream_ordering:
+    # apparently, we've processed more events than exist in the database!
+    # this can happen if events are removed with history purge or similar.
+    logger.warning(
+        "Event stream ordering appears to have gone backwards (%i -> %i): "
+        "rewinding stats processor",
+        self.pos,
+        room_max_stream_ordering,
+    )
+    self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date
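
The same guard is added to the user directory processor at the end of this diff. As a rough generalisation of the pattern (this helper is hypothetical, not part of the change):

    import logging

    logger = logging.getLogger(__name__)

    def clamp_stream_position(pos: int, room_max_stream_ordering: int, processor: str) -> int:
        # After a history purge, the saved position can point past the newest
        # event; rewind it so the processor does not wait forever.
        if pos > room_max_stream_ordering:
            logger.warning(
                "Event stream ordering appears to have gone backwards (%i -> %i): rewinding %s",
                pos,
                room_max_stream_ordering,
                processor,
            )
            return room_max_stream_ordering
        return pos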


@ -28,7 +28,7 @@ from typing import (
import attr
from prometheus_client import Counter
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes
from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
@ -36,6 +36,7 @@ from synapse.events import EventBase
from synapse.logging.context import current_context
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
+from synapse.storage.databases.main.event_push_actions import NotifCounts
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
@ -421,7 +422,7 @@ class SyncHandler:
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.
"""
-with start_active_span("current_sync_for_user"):
+with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
sync_result = await self.generate_sync_result(
sync_config, since_token, full_state
@ -1040,18 +1041,17 @@ class SyncHandler:
async def unread_notifs_for_room_id(
self, room_id: str, sync_config: SyncConfig
-) -> Dict[str, int]:
+) -> NotifCounts:
with Measure(self.clock, "unread_notifs_for_room_id"):
last_unread_event_id = await self.store.get_last_receipt_event_id_for_user(
user_id=sync_config.user.to_string(),
room_id=room_id,
-receipt_type="m.read",
+receipt_type=ReceiptTypes.READ,
)
-notifs = await self.store.get_unread_event_push_actions_by_room_for_user(
+return await self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
-return notifs
async def generate_sync_result(
self,
@ -1584,7 +1584,8 @@ class SyncHandler:
)
logger.debug("Generated room entry for %s", room_entry.room_id)
-await concurrently_execute(handle_room_entries, room_entries, 10)
+with start_active_span("sync.generate_room_entries"):
+    await concurrently_execute(handle_room_entries, room_entries, 10)
sync_result_builder.invited.extend(invited)
sync_result_builder.knocked.extend(knocked)
@ -1661,20 +1662,20 @@ class SyncHandler:
) -> _RoomChanges:
"""Determine the changes in rooms to report to the user.
-Ideally, we want to report all events whose stream ordering `s` lies in the
-range `since_token < s <= now_token`, where the two tokens are read from the
-sync_result_builder.
+This function is a first pass at generating the rooms part of the sync response.
+It determines which rooms have changed during the sync period, and categorises
+them into four buckets: "knock", "invite", "join" and "leave".
-If there are too many events in that range to report, things get complicated.
-In this situation we return a truncated list of the most recent events, and
-indicate in the response that there is a "gap" of omitted events. Additionally:
+1. Finds all membership changes for the user in the sync period (from
+   `since_token` up to `now_token`).
+2. Uses those to place the room in one of the four categories above.
+3. Builds a `_RoomChanges` struct to record this, and returns that struct.
-- we include a "state_delta", to describe the changes in state over the gap,
-- we include all membership events applying to the user making the request,
-  even those in the gap.
-See the spec for the rationale:
-https://spec.matrix.org/v1.1/client-server-api/#syncing
+For rooms classified as "knock", "invite" or "leave", we just need to report
+a single membership event in the eventual /sync response. For "join" we need
+to fetch additional non-membership events, e.g. messages in the room. That is
+more complicated, so instead we report an intermediary `RoomSyncResultBuilder`
+struct, and leave the additional work to `_generate_room_entry`.
The sync_result_builder is not modified by this function.
"""
@ -1685,16 +1686,6 @@ class SyncHandler:
assert since_token
-# The spec
-# https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
-# notes that membership events need special consideration:
-#
-# > When a sync is limited, the server MUST return membership events for events
-# > in the gap (between since and the start of the returned timeline), regardless
-# > as to whether or not they are redundant.
-#
-# We fetch such events here, but we only seem to use them for categorising rooms
-# as newly joined, newly left, invited or knocked.
# TODO: we've already called this function and ran this query in
# _have_rooms_changed. We could keep the results in memory to avoid a
# second query, at the cost of more complicated source code.
@ -2008,6 +1999,23 @@ class SyncHandler:
"""Populates the `joined` and `archived` section of `sync_result_builder`
based on the `room_builder`.
+Ideally, we want to report all events whose stream ordering `s` lies in the
+range `since_token < s <= now_token`, where the two tokens are read from the
+sync_result_builder.
+If there are too many events in that range to report, things get complicated.
+In this situation we return a truncated list of the most recent events, and
+indicate in the response that there is a "gap" of omitted events. Lots of this
+is handled in `_load_filtered_recents`, but some of it is handled in this method.
+Additionally:
+- we include a "state_delta", to describe the changes in state over the gap,
+- we include all membership events applying to the user making the request,
+  even those in the gap.
+See the spec for the rationale:
+https://spec.matrix.org/v1.1/client-server-api/#syncing
Args:
sync_result_builder
ignored_users: Set of users ignored by user.
@ -2037,7 +2045,7 @@ class SyncHandler:
since_token = room_builder.since_token
upto_token = room_builder.upto_token
-with start_active_span("generate_room_entry"):
+with start_active_span("sync.generate_room_entry"):
set_tag("room_id", room_id)
log_kv({"events": len(events or ())})
@ -2165,10 +2173,10 @@ class SyncHandler:
if room_sync or always_include:
notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
-unread_notifications["notification_count"] = notifs["notify_count"]
-unread_notifications["highlight_count"] = notifs["highlight_count"]
+unread_notifications["notification_count"] = notifs.notify_count
+unread_notifications["highlight_count"] = notifs.highlight_count
-room_sync.unread_count = notifs["unread_count"]
+room_sync.unread_count = notifs.unread_count
sync_result_builder.joined.append(room_sync)
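
`unread_notifs_for_room_id` now returns a NotifCounts object instead of a plain dict, which is why the three lookups above change from subscripting to attribute access. A sketch of the shape implied by those accesses (the real class lives in synapse.storage.databases.main.event_push_actions; anything beyond these three fields would be an assumption):

    import attr

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class NotifCounts:
        # Counts surfaced in /sync's unread_notifications block.
        notify_count: int
        unread_count: int
        highlight_count: int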


@ -13,9 +13,10 @@
# limitations under the License.
import logging
import random
-from collections import namedtuple
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
+import attr
from synapse.api.errors import AuthError, ShadowBanError, SynapseError
from synapse.appservice import ApplicationService
from synapse.metrics.background_process_metrics import (
@ -37,7 +38,10 @@ logger = logging.getLogger(__name__)
# A tiny object useful for storing a user's membership in a room, as a mapping
# key
-RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class RoomMember:
+    room_id: str
+    user_id: str
# How often we expect remote servers to resend us presence.
@ -119,7 +123,7 @@ class FollowerTypingHandler:
self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
def is_typing(self, member: RoomMember) -> bool:
-return member.user_id in self._room_typing.get(member.room_id, [])
+return member.user_id in self._room_typing.get(member.room_id, set())
async def _push_remote(self, member: RoomMember, typing: bool) -> None:
if not self.federation:
@ -166,9 +170,9 @@ class FollowerTypingHandler:
for row in rows:
self._room_serials[row.room_id] = token
-prev_typing = set(self._room_typing.get(row.room_id, []))
+prev_typing = self._room_typing.get(row.room_id, set())
now_typing = set(row.user_ids)
-self._room_typing[row.room_id] = row.user_ids
+self._room_typing[row.room_id] = now_typing
if self.federation:
run_as_background_process(


@ -148,9 +148,21 @@ class UserDirectoryHandler(StateDeltasHandler):
if self.pos is None:
self.pos = await self.store.get_user_directory_stream_pos()
-    # If still None then the initial background update hasn't happened yet.
-    if self.pos is None:
-        return None
+# If still None then the initial background update hasn't happened yet.
+if self.pos is None:
+    return None
+
+room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+if self.pos > room_max_stream_ordering:
+    # apparently, we've processed more events than exist in the database!
+    # this can happen if events are removed with history purge or similar.
+    logger.warning(
+        "Event stream ordering appears to have gone backwards (%i -> %i): "
+        "rewinding user directory processor",
+        self.pos,
+        room_max_stream_ordering,
+    )
+    self.pos = room_max_stream_ordering
# Loop round handling deltas until we're up to date
while True: