synapse-product/synapse/handlers/pagination.py

456 lines
17 KiB
Python
Raw Normal View History

2018-07-20 10:32:23 -04:00
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
# Copyright 2017 - 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional, Set
2018-07-20 10:32:23 -04:00
from twisted.python.failure import Failure
2018-08-16 09:22:47 -04:00
from synapse.api.constants import EventTypes, Membership
2018-07-20 10:32:23 -04:00
from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
from synapse.types import Requester, RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock
2018-07-20 10:32:23 -04:00
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
from synapse.server import HomeServer
2018-07-20 10:32:23 -04:00
logger = logging.getLogger(__name__)
2020-09-04 06:54:56 -04:00
class PurgeStatus:
    """Object tracking the status of a purge request

    This class contains information on the progress of a purge request, for
    return by get_purge_status.

    Attributes:
        status (int): Tracks whether this request has completed. One of
            STATUS_{ACTIVE,COMPLETE,FAILED}
    """

    # Possible values of `status`.
    STATUS_ACTIVE = 0
    STATUS_COMPLETE = 1
    STATUS_FAILED = 2

    # Human-readable name for each status value, as emitted by `asdict`.
    STATUS_TEXT = {
        STATUS_ACTIVE: "active",
        STATUS_COMPLETE: "complete",
        STATUS_FAILED: "failed",
    }

    def __init__(self) -> None:
        # A freshly created purge is considered to be in progress.
        self.status = PurgeStatus.STATUS_ACTIVE

    def asdict(self) -> Dict[str, str]:
        """Return the status as a dict with a single "status" key holding the
        textual name of the current status.
        """
        return {"status": PurgeStatus.STATUS_TEXT[self.status]}
2018-07-20 10:32:23 -04:00
2020-09-04 06:54:56 -04:00
class PaginationHandler:
    """Handles pagination and purge history requests.

    These are in the same handler due to the fact we need to block clients
    paginating during a purge.
    """

    def __init__(self, hs: "HomeServer") -> None:
        """Wire up the handler's dependencies and, if retention is enabled in
        the configuration, schedule the configured purge jobs.

        Args:
            hs: The homeserver object to pull dependencies and config from.
        """
        self.hs = hs
        self.auth = hs.get_auth()
        self.store = hs.get_datastore()
        self.storage = hs.get_storage()
        self.state_store = self.storage.state
        self.clock = hs.get_clock()
        self._server_name = hs.hostname

        self.pagination_lock = ReadWriteLock()
        self._purges_in_progress_by_room = set()  # type: Set[str]
        # map from purge id to PurgeStatus
        self._purges_by_id = {}  # type: Dict[str, PurgeStatus]
        self._event_serializer = hs.get_event_client_serializer()

        self._retention_default_max_lifetime = hs.config.retention_default_max_lifetime

        self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
        self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max

        if hs.config.retention_enabled:
            # Run the purge jobs described in the configuration file.
            for job in hs.config.retention_purge_jobs:
                logger.info("Setting up purge job with config: %s", job)

                self.clock.looping_call(
                    run_as_background_process,
                    job["interval"],
                    "purge_history_for_rooms_in_range",
                    self.purge_history_for_rooms_in_range,
                    job["shortest_max_lifetime"],
                    job["longest_max_lifetime"],
                )

    async def purge_history_for_rooms_in_range(
        self, min_ms: Optional[int], max_ms: Optional[int]
    ) -> None:
        """Purge outdated events from rooms within the given retention range.

        If a default retention policy is defined in the server's configuration and its
        'max_lifetime' is within this range, also targets rooms which don't have a
        retention policy.

        Args:
            min_ms: Duration in milliseconds that define the lower limit of
                the range to handle (exclusive). If None, it means that the range has no
                lower limit.
            max_ms: Duration in milliseconds that define the upper limit of
                the range to handle (inclusive). If None, it means that the range has no
                upper limit.
        """
        # We want the storage layer to include rooms with no retention policy in its
        # return value only if a default retention policy is defined in the server's
        # configuration and that policy's 'max_lifetime' is either lower (or equal) than
        # max_ms or higher than min_ms (or both).
        if self._retention_default_max_lifetime is not None:
            include_null = True

            if min_ms is not None and min_ms >= self._retention_default_max_lifetime:
                # The default max_lifetime is lower than (or equal to) min_ms.
                include_null = False

            if max_ms is not None and max_ms < self._retention_default_max_lifetime:
                # The default max_lifetime is higher than max_ms.
                include_null = False
        else:
            include_null = False

        logger.info(
            "[purge] Running purge job for %s < max_lifetime <= %s (include NULLs = %s)",
            min_ms,
            max_ms,
            include_null,
        )

        rooms = await self.store.get_rooms_for_retention_period_in_range(
            min_ms, max_ms, include_null
        )

        logger.debug("[purge] Rooms to purge: %s", rooms)

        for room_id, retention_policy in rooms.items():
            logger.info("[purge] Attempting to purge messages in room %s", room_id)

            if room_id in self._purges_in_progress_by_room:
                logger.warning(
                    "[purge] not purging room %s as there's an ongoing purge running"
                    " for this room",
                    room_id,
                )
                continue

            # If max_lifetime is None, it means that the room has no retention policy.
            # Given we only retrieve such rooms when there's a default retention policy
            # defined in the server's configuration, we can safely assume that's the
            # case and use it for this room.
            max_lifetime = (
                retention_policy["max_lifetime"] or self._retention_default_max_lifetime
            )

            # Cap the effective max_lifetime to be within the range allowed in the
            # config.
            # We do this in two steps:
            #   1. Make sure it's higher or equal to the minimum allowed value, and if
            #      it's not replace it with that value. This is because the server
            #      operator can be required to not delete information before a given
            #      time, e.g. to comply with freedom of information laws.
            #   2. Make sure the resulting value is lower or equal to the maximum allowed
            #      value, and if it's not replace it with that value. This is because the
            #      server operator can be required to delete any data after a specific
            #      amount of time.
            if self._retention_allowed_lifetime_min is not None:
                max_lifetime = max(self._retention_allowed_lifetime_min, max_lifetime)

            if self._retention_allowed_lifetime_max is not None:
                max_lifetime = min(max_lifetime, self._retention_allowed_lifetime_max)

            logger.debug("[purge] max_lifetime for room %s: %s", room_id, max_lifetime)

            # Figure out what token we should start purging at.
            ts = self.clock.time_msec() - max_lifetime

            stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)

            r = await self.store.get_room_event_before_stream_ordering(
                room_id, stream_ordering,
            )
            if not r:
                logger.warning(
                    "[purge] purging events not possible: No event found "
                    "(ts %i => stream_ordering %i)",
                    ts,
                    stream_ordering,
                )
                continue

            (stream, topo, _event_id) = r
            token = "t%d-%d" % (topo, stream)

            purge_id = random_string(16)

            self._purges_by_id[purge_id] = PurgeStatus()

            # Pass the arguments to the logger rather than pre-formatting, in
            # line with the other log calls in this handler.
            logger.info(
                "Starting purging events in room %s (purge_id %s)", room_id, purge_id
            )

            # We want to purge everything, including local events, and to run the purge in
            # the background so that it's not blocking any other operation apart from
            # other purges in the same room.
            run_as_background_process(
                "_purge_history", self._purge_history, purge_id, room_id, token, True,
            )

    def start_purge_history(
        self, room_id: str, token: str, delete_local_events: bool = False
    ) -> str:
        """Start off a history purge on a room.

        Args:
            room_id: The room to purge from
            token: topological token to delete events before
            delete_local_events: True to delete local events as well as
                remote ones

        Returns:
            unique ID for this purge transaction.

        Raises:
            SynapseError: (400) if a purge is already recorded as in progress
                for this room.
        """
        if room_id in self._purges_in_progress_by_room:
            raise SynapseError(
                400, "History purge already in progress for %s" % (room_id,)
            )

        purge_id = random_string(16)

        # we log the purge_id here so that it can be tied back to the
        # request id in the log lines.
        logger.info("[purge] starting purge_id %s", purge_id)

        self._purges_by_id[purge_id] = PurgeStatus()
        run_in_background(
            self._purge_history, purge_id, room_id, token, delete_local_events
        )
        return purge_id

    async def _purge_history(
        self, purge_id: str, room_id: str, token: str, delete_local_events: bool
    ) -> None:
        """Carry out a history purge on a room.

        Failures are logged and recorded on the purge's PurgeStatus rather than
        propagated to the caller.

        Args:
            purge_id: The id for this purge
            room_id: The room to purge from
            token: topological token to delete events before
            delete_local_events: True to delete local events as well as remote ones
        """
        self._purges_in_progress_by_room.add(room_id)
        try:
            # Hold the write side of the lock so that pagination of this room
            # (which takes the read side) waits for the purge to finish.
            with await self.pagination_lock.write(room_id):
                await self.storage.purge_events.purge_history(
                    room_id, token, delete_local_events
                )
            logger.info("[purge] complete")
            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
        except Exception:
            f = Failure()
            logger.error(
                "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
            )
            self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
        finally:
            self._purges_in_progress_by_room.discard(room_id)

            # remove the purge from the list 24 hours after it completes
            def clear_purge():
                # pop() rather than del: tolerate the entry already being gone
                # by the time the delayed call fires.
                self._purges_by_id.pop(purge_id, None)

            self.hs.get_reactor().callLater(24 * 3600, clear_purge)

    def get_purge_status(self, purge_id: str) -> Optional[PurgeStatus]:
        """Get the current status of an active purge

        Args:
            purge_id: purge_id returned by start_purge_history

        Returns:
            The PurgeStatus registered under this id, or None if there is no
            entry for it.
        """
        return self._purges_by_id.get(purge_id)

    async def purge_room(self, room_id: str) -> None:
        """Purge the given room from the database

        Args:
            room_id: The room to purge.

        Raises:
            SynapseError: (400) if this server is still joined to the room.
        """
        with await self.pagination_lock.write(room_id):
            # check we know about the room
            await self.store.get_room_version_id(room_id)

            # first check that we have no users in this room
            joined = await self.store.is_host_joined(room_id, self._server_name)

            if joined:
                raise SynapseError(400, "Users are still joined to this room")

            await self.storage.purge_events.purge_room(room_id)

    async def get_messages(
        self,
        requester: Requester,
        room_id: str,
        pagin_config: PaginationConfig,
        as_client_event: bool = True,
        event_filter: Optional[Filter] = None,
    ) -> Dict[str, Any]:
        """Get messages in a room.

        Args:
            requester: The user requesting messages.
            room_id: The room they want messages from.
            pagin_config: The pagination config rules to apply, if any.
            as_client_event: True to get events in client-server format.
            event_filter: Filter to apply to results or None

        Returns:
            Pagination API results: a dict with "chunk", "start" and "end"
            keys, plus a "state" key when lazy-loaded member state is included.
        """
        user_id = requester.user.to_string()

        if pagin_config.from_token:
            from_token = pagin_config.from_token
        else:
            from_token = self.hs.get_event_sources().get_current_token_for_pagination()

        if pagin_config.limit is None:
            # This shouldn't happen as we've set a default limit before this
            # gets called.
            raise Exception("limit not set")

        room_token = from_token.room_key

        with await self.pagination_lock.read(room_id):
            (
                membership,
                member_event_id,
            ) = await self.auth.check_user_in_room_or_world_readable(
                room_id, user_id, allow_departed_users=True
            )

            if pagin_config.direction == "b":
                # if we're going backwards, we might need to backfill. This
                # requires that we have a topo token.
                if room_token.topological:
                    curr_topo = room_token.topological
                else:
                    curr_topo = await self.store.get_current_topological_token(
                        room_id, room_token.stream
                    )

                if membership == Membership.LEAVE:
                    # If they have left the room then clamp the token to be before
                    # they left the room, to save the effort of loading from the
                    # database.

                    # This is only None if the room is world_readable, in which
                    # case "JOIN" would have been returned.
                    assert member_event_id

                    leave_token_str = await self.store.get_topological_token_for_event(
                        member_event_id
                    )
                    leave_token = RoomStreamToken.parse(leave_token_str)
                    assert leave_token.topological is not None

                    if leave_token.topological < curr_topo:
                        from_token = from_token.copy_and_replace(
                            "room_key", leave_token
                        )

                await self.hs.get_handlers().federation_handler.maybe_backfill(
                    room_id, curr_topo, limit=pagin_config.limit,
                )

            to_room_key = None
            if pagin_config.to_token:
                to_room_key = pagin_config.to_token.room_key

            events, next_key = await self.store.paginate_room_events(
                room_id=room_id,
                from_key=from_token.room_key,
                to_key=to_room_key,
                direction=pagin_config.direction,
                limit=pagin_config.limit,
                event_filter=event_filter,
            )

            next_token = from_token.copy_and_replace("room_key", next_key)

        if events:
            if event_filter:
                events = event_filter.filter(events)

            events = await filter_events_for_client(
                self.storage, user_id, events, is_peeking=(member_event_id is None)
            )

        if not events:
            return {
                "chunk": [],
                "start": from_token.to_string(),
                "end": next_token.to_string(),
            }

        state = None
        # `events` is known to be non-empty here: the empty case returned above.
        if event_filter and event_filter.lazy_load_members():
            # TODO: remove redundant members

            # FIXME: we also care about invite targets etc.
            state_filter = StateFilter.from_types(
                (EventTypes.Member, event.sender) for event in events
            )

            state_ids = await self.state_store.get_state_ids_for_event(
                events[0].event_id, state_filter=state_filter
            )

            if state_ids:
                state_dict = await self.store.get_events(list(state_ids.values()))
                state = state_dict.values()

        time_now = self.clock.time_msec()

        chunk = {
            "chunk": (
                await self._event_serializer.serialize_events(
                    events, time_now, as_client_event=as_client_event
                )
            ),
            "start": from_token.to_string(),
            "end": next_token.to_string(),
        }

        if state:
            chunk["state"] = await self._event_serializer.serialize_events(
                state, time_now, as_client_event=as_client_event
            )

        return chunk