Merge remote-tracking branch 'upstream/release-v1.36'

This commit is contained in:
Tulir Asokan 2021-06-08 16:24:37 +03:00
commit 8d96b324dc
116 changed files with 4194 additions and 2503 deletions

View file

@ -22,6 +22,7 @@ from collections.abc import Container
from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
@ -178,6 +179,8 @@ class FederationHandler(BaseHandler):
self.room_queues = {} # type: Dict[str, List[Tuple[EventBase, str]]]
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
self._room_backfill = Linearizer("room_backfill")
self.third_party_event_rules = hs.get_third_party_event_rules()
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
@ -577,7 +580,9 @@ class FederationHandler(BaseHandler):
# Fetch the state events from the DB, and check we have the auth events.
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
auth_events_in_store = await self.store.have_seen_events(
room_id, auth_event_ids
)
# Check for missing events. We handle state and auth event seperately,
# as we want to pull the state from the DB, but we don't for the auth
@ -610,7 +615,7 @@ class FederationHandler(BaseHandler):
if missing_auth_events:
auth_events_in_store = await self.store.have_seen_events(
missing_auth_events
room_id, missing_auth_events
)
missing_auth_events.difference_update(auth_events_in_store)
@ -710,7 +715,7 @@ class FederationHandler(BaseHandler):
missing_auth_events = set(auth_event_ids) - fetched_events.keys()
missing_auth_events.difference_update(
await self.store.have_seen_events(missing_auth_events)
await self.store.have_seen_events(room_id, missing_auth_events)
)
logger.debug("We are also missing %i auth events", len(missing_auth_events))
@ -1039,6 +1044,12 @@ class FederationHandler(BaseHandler):
return. This is used as part of the heuristic to decide if we
should back paginate.
"""
with (await self._room_backfill.queue(room_id)):
return await self._maybe_backfill_inner(room_id, current_depth, limit)
async def _maybe_backfill_inner(
self, room_id: str, current_depth: int, limit: int
) -> bool:
extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
if not extremities:
@ -1354,11 +1365,12 @@ class FederationHandler(BaseHandler):
event_infos.append(_NewEventInfo(event, None, auth))
await self._auth_and_persist_events(
destination,
room_id,
event_infos,
)
if event_infos:
await self._auth_and_persist_events(
destination,
room_id,
event_infos,
)
def _sanity_check_event(self, ev: EventBase) -> None:
"""
@ -2067,7 +2079,7 @@ class FederationHandler(BaseHandler):
self,
origin: str,
room_id: str,
event_infos: Iterable[_NewEventInfo],
event_infos: Collection[_NewEventInfo],
backfilled: bool = False,
) -> None:
"""Creates the appropriate contexts and persists events. The events
@ -2078,6 +2090,9 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
if not event_infos:
return
async def prep(ev_info: _NewEventInfo):
event = ev_info.event
with nested_logging_context(suffix=event.event_id):
@ -2206,13 +2221,14 @@ class FederationHandler(BaseHandler):
raise
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
)
if auth_events or state:
await self.persist_events_and_notify(
room_id,
[
(e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state)
],
)
new_event_context = await self.state_handler.compute_event_context(
event, old_state=state
@ -2475,7 +2491,7 @@ class FederationHandler(BaseHandler):
#
# we start by checking if they are in the store, and then try calling /event_auth/.
if missing_auth:
have_events = await self.store.have_seen_events(missing_auth)
have_events = await self.store.have_seen_events(event.room_id, missing_auth)
logger.debug("Events %s are in the store", have_events)
missing_auth.difference_update(have_events)
@ -2494,7 +2510,7 @@ class FederationHandler(BaseHandler):
return context
seen_remotes = await self.store.have_seen_events(
[e.event_id for e in remote_auth_chain]
event.room_id, [e.event_id for e in remote_auth_chain]
)
for e in remote_auth_chain:
@ -3051,11 +3067,18 @@ class FederationHandler(BaseHandler):
the same room.
backfilled: Whether these events are a result of
backfilling or not
Returns:
The stream ID after which all events have been persisted.
"""
if not event_and_contexts:
return self.store.get_current_events_token()
instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
# Limit the number of events sent over federation.
for batch in batch_iter(event_and_contexts, 1000):
# Limit the number of events sent over replication. We choose 200
# here as that is what we default to in `max_request_body_size(..)`
for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
instance_name=instance,
store=self.store,