mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-04 18:20:50 -05:00
Merge remote-tracking branch 'origin/develop' into rav/remove_who_forgot_in_room
This commit is contained in:
commit
30bfed5aa5
1
changelog.d/3555.feature
Normal file
1
changelog.d/3555.feature
Normal file
@ -0,0 +1 @@
|
|||||||
|
Add support for client_reader to handle more APIs
|
1
changelog.d/3584.misc
Normal file
1
changelog.d/3584.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Lazily load state on master process when using workers to reduce DB consumption
|
1
changelog.d/3590.misc
Normal file
1
changelog.d/3590.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Add some measure blocks to persist_events
|
1
changelog.d/3591.misc
Normal file
1
changelog.d/3591.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix some random logcontext leaks.
|
@ -206,6 +206,10 @@ Handles client API endpoints. It can handle REST endpoints matching the
|
|||||||
following regular expressions::
|
following regular expressions::
|
||||||
|
|
||||||
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
|
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
|
||||||
|
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
|
||||||
|
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
|
||||||
|
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
|
||||||
|
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
|
||||||
|
|
||||||
``synapse.app.user_dir``
|
``synapse.app.user_dir``
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
@ -739,3 +739,37 @@ class Auth(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
return query_params[0]
|
return query_params[0]
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def check_in_room_or_world_readable(self, room_id, user_id):
|
||||||
|
"""Checks that the user is or was in the room or the room is world
|
||||||
|
readable. If it isn't then an exception is raised.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[tuple[str, str|None]]: Resolves to the current membership of
|
||||||
|
the user in the room and the membership event ID of the user. If
|
||||||
|
the user is not in the room and never has been, then
|
||||||
|
`(Membership.JOIN, None)` is returned.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
# check_user_was_in_room will return the most recent membership
|
||||||
|
# event for the user if:
|
||||||
|
# * The user is a non-guest user, and was ever in the room
|
||||||
|
# * The user is a guest user, and has joined the room
|
||||||
|
# else it will throw.
|
||||||
|
member_event = yield self.check_user_was_in_room(room_id, user_id)
|
||||||
|
defer.returnValue((member_event.membership, member_event.event_id))
|
||||||
|
except AuthError:
|
||||||
|
visibility = yield self.state.get_current_state(
|
||||||
|
room_id, EventTypes.RoomHistoryVisibility, ""
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
visibility and
|
||||||
|
visibility.content["history_visibility"] == "world_readable"
|
||||||
|
):
|
||||||
|
defer.returnValue((Membership.JOIN, None))
|
||||||
|
return
|
||||||
|
raise AuthError(
|
||||||
|
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
||||||
|
)
|
||||||
|
@ -40,7 +40,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
|
|||||||
from synapse.replication.slave.storage.room import RoomStore
|
from synapse.replication.slave.storage.room import RoomStore
|
||||||
from synapse.replication.slave.storage.transactions import TransactionStore
|
from synapse.replication.slave.storage.transactions import TransactionStore
|
||||||
from synapse.replication.tcp.client import ReplicationClientHandler
|
from synapse.replication.tcp.client import ReplicationClientHandler
|
||||||
from synapse.rest.client.v1.room import PublicRoomListRestServlet
|
from synapse.rest.client.v1.room import (
|
||||||
|
JoinedRoomMemberListRestServlet,
|
||||||
|
PublicRoomListRestServlet,
|
||||||
|
RoomEventContextServlet,
|
||||||
|
RoomMemberListRestServlet,
|
||||||
|
RoomStateRestServlet,
|
||||||
|
)
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.storage.engines import create_engine
|
from synapse.storage.engines import create_engine
|
||||||
from synapse.util.httpresourcetree import create_resource_tree
|
from synapse.util.httpresourcetree import create_resource_tree
|
||||||
@ -82,7 +88,13 @@ class ClientReaderServer(HomeServer):
|
|||||||
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
|
||||||
elif name == "client":
|
elif name == "client":
|
||||||
resource = JsonResource(self, canonical_json=False)
|
resource = JsonResource(self, canonical_json=False)
|
||||||
|
|
||||||
PublicRoomListRestServlet(self).register(resource)
|
PublicRoomListRestServlet(self).register(resource)
|
||||||
|
RoomMemberListRestServlet(self).register(resource)
|
||||||
|
JoinedRoomMemberListRestServlet(self).register(resource)
|
||||||
|
RoomStateRestServlet(self).register(resource)
|
||||||
|
RoomEventContextServlet(self).register(resource)
|
||||||
|
|
||||||
resources.update({
|
resources.update({
|
||||||
"/_matrix/client/r0": resource,
|
"/_matrix/client/r0": resource,
|
||||||
"/_matrix/client/unstable": resource,
|
"/_matrix/client/unstable": resource,
|
||||||
|
@ -163,6 +163,9 @@ class EventContext(object):
|
|||||||
context._prev_state_id = input["prev_state_id"]
|
context._prev_state_id = input["prev_state_id"]
|
||||||
context._event_type = input["event_type"]
|
context._event_type = input["event_type"]
|
||||||
context._event_state_key = input["event_state_key"]
|
context._event_state_key = input["event_state_key"]
|
||||||
|
|
||||||
|
context._current_state_ids = None
|
||||||
|
context._prev_state_ids = None
|
||||||
context._fetching_state_deferred = None
|
context._fetching_state_deferred = None
|
||||||
|
|
||||||
context.state_group = input["state_group"]
|
context.state_group = input["state_group"]
|
||||||
@ -214,6 +217,17 @@ class EventContext(object):
|
|||||||
|
|
||||||
defer.returnValue(self._prev_state_ids)
|
defer.returnValue(self._prev_state_ids)
|
||||||
|
|
||||||
|
def get_cached_current_state_ids(self):
|
||||||
|
"""Gets the current state IDs if we have them already cached.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
dict[(str, str), str]|None: Returns None if we haven't cached the
|
||||||
|
state or if state_group is None, which happens when the associated
|
||||||
|
event is an outlier.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self._current_state_ids
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _fill_out_state(self, store):
|
def _fill_out_state(self, store):
|
||||||
"""Called to populate the _current_state_ids and _prev_state_ids
|
"""Called to populate the _current_state_ids and _prev_state_ids
|
||||||
|
@ -17,9 +17,7 @@ from .admin import AdminHandler
|
|||||||
from .directory import DirectoryHandler
|
from .directory import DirectoryHandler
|
||||||
from .federation import FederationHandler
|
from .federation import FederationHandler
|
||||||
from .identity import IdentityHandler
|
from .identity import IdentityHandler
|
||||||
from .message import MessageHandler
|
|
||||||
from .register import RegistrationHandler
|
from .register import RegistrationHandler
|
||||||
from .room import RoomContextHandler
|
|
||||||
from .search import SearchHandler
|
from .search import SearchHandler
|
||||||
|
|
||||||
|
|
||||||
@ -44,10 +42,8 @@ class Handlers(object):
|
|||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.registration_handler = RegistrationHandler(hs)
|
self.registration_handler = RegistrationHandler(hs)
|
||||||
self.message_handler = MessageHandler(hs)
|
|
||||||
self.federation_handler = FederationHandler(hs)
|
self.federation_handler = FederationHandler(hs)
|
||||||
self.directory_handler = DirectoryHandler(hs)
|
self.directory_handler = DirectoryHandler(hs)
|
||||||
self.admin_handler = AdminHandler(hs)
|
self.admin_handler = AdminHandler(hs)
|
||||||
self.identity_handler = IdentityHandler(hs)
|
self.identity_handler = IdentityHandler(hs)
|
||||||
self.search_handler = SearchHandler(hs)
|
self.search_handler = SearchHandler(hs)
|
||||||
self.room_context_handler = RoomContextHandler(hs)
|
|
||||||
|
@ -23,6 +23,7 @@ from twisted.internet import defer
|
|||||||
|
|
||||||
import synapse
|
import synapse
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
|
|||||||
yield self._check_user_exists(event.state_key)
|
yield self._check_user_exists(event.state_key)
|
||||||
|
|
||||||
if not self.started_scheduler:
|
if not self.started_scheduler:
|
||||||
self.scheduler.start().addErrback(log_failure)
|
def start_scheduler():
|
||||||
|
return self.scheduler.start().addErrback(log_failure)
|
||||||
|
run_as_background_process("as_scheduler", start_scheduler)
|
||||||
self.started_scheduler = True
|
self.started_scheduler = True
|
||||||
|
|
||||||
# Fork off pushes to these services
|
# Fork off pushes to these services
|
||||||
|
@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
|
|||||||
try:
|
try:
|
||||||
if event.membership == Membership.JOIN:
|
if event.membership == Membership.JOIN:
|
||||||
room_end_token = now_token.room_key
|
room_end_token = now_token.room_key
|
||||||
deferred_room_state = self.state_handler.get_current_state(
|
deferred_room_state = run_in_background(
|
||||||
event.room_id
|
self.state_handler.get_current_state,
|
||||||
|
event.room_id,
|
||||||
)
|
)
|
||||||
elif event.membership == Membership.LEAVE:
|
elif event.membership == Membership.LEAVE:
|
||||||
room_end_token = "s%d" % (event.stream_ordering,)
|
room_end_token = "s%d" % (event.stream_ordering,)
|
||||||
deferred_room_state = self.store.get_state_for_events(
|
deferred_room_state = run_in_background(
|
||||||
[event.event_id], None
|
self.store.get_state_for_events,
|
||||||
|
[event.event_id], None,
|
||||||
)
|
)
|
||||||
deferred_room_state.addCallback(
|
deferred_room_state.addCallback(
|
||||||
lambda states: states[event.event_id]
|
lambda states: states[event.event_id]
|
||||||
@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
|
|||||||
receipts = []
|
receipts = []
|
||||||
defer.returnValue(receipts)
|
defer.returnValue(receipts)
|
||||||
|
|
||||||
presence, receipts, (messages, token) = yield defer.gatherResults(
|
presence, receipts, (messages, token) = yield make_deferred_yieldable(
|
||||||
[
|
defer.gatherResults(
|
||||||
run_in_background(get_presence),
|
[
|
||||||
run_in_background(get_receipts),
|
run_in_background(get_presence),
|
||||||
run_in_background(
|
run_in_background(get_receipts),
|
||||||
self.store.get_recent_events_for_room,
|
run_in_background(
|
||||||
room_id,
|
self.store.get_recent_events_for_room,
|
||||||
limit=limit,
|
room_id,
|
||||||
end_token=now_token.room_key,
|
limit=limit,
|
||||||
)
|
end_token=now_token.room_key,
|
||||||
],
|
)
|
||||||
consumeErrors=True,
|
],
|
||||||
).addErrback(unwrapFirstError)
|
consumeErrors=True,
|
||||||
|
).addErrback(unwrapFirstError),
|
||||||
|
)
|
||||||
|
|
||||||
messages = yield filter_events_for_client(
|
messages = yield filter_events_for_client(
|
||||||
self.store, user_id, messages, is_peeking=is_peeking,
|
self.store, user_id, messages, is_peeking=is_peeking,
|
||||||
|
@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
|
|||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.defer import succeed
|
from twisted.internet.defer import succeed
|
||||||
from twisted.python.failure import Failure
|
|
||||||
|
|
||||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||||
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
|
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
|
||||||
@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
|
|||||||
from synapse.events.utils import serialize_event
|
from synapse.events.utils import serialize_event
|
||||||
from synapse.events.validator import EventValidator
|
from synapse.events.validator import EventValidator
|
||||||
from synapse.replication.http.send_event import send_event_to_master
|
from synapse.replication.http.send_event import send_event_to_master
|
||||||
from synapse.types import RoomAlias, RoomStreamToken, UserID
|
from synapse.types import RoomAlias, UserID
|
||||||
from synapse.util.async import Linearizer, ReadWriteLock
|
from synapse.util.async import Linearizer
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.logcontext import run_in_background
|
from synapse.util.logcontext import run_in_background
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.util.stringutils import random_string
|
|
||||||
from synapse.visibility import filter_events_for_client
|
|
||||||
|
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class PurgeStatus(object):
|
class MessageHandler(object):
|
||||||
"""Object tracking the status of a purge request
|
"""Contains some read only APIs to get state about a room
|
||||||
|
|
||||||
This class contains information on the progress of a purge request, for
|
|
||||||
return by get_purge_status.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
status (int): Tracks whether this request has completed. One of
|
|
||||||
STATUS_{ACTIVE,COMPLETE,FAILED}
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
STATUS_ACTIVE = 0
|
|
||||||
STATUS_COMPLETE = 1
|
|
||||||
STATUS_FAILED = 2
|
|
||||||
|
|
||||||
STATUS_TEXT = {
|
|
||||||
STATUS_ACTIVE: "active",
|
|
||||||
STATUS_COMPLETE: "complete",
|
|
||||||
STATUS_FAILED: "failed",
|
|
||||||
}
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.status = PurgeStatus.STATUS_ACTIVE
|
|
||||||
|
|
||||||
def asdict(self):
|
|
||||||
return {
|
|
||||||
"status": PurgeStatus.STATUS_TEXT[self.status]
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class MessageHandler(BaseHandler):
|
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(MessageHandler, self).__init__(hs)
|
self.auth = hs.get_auth()
|
||||||
self.hs = hs
|
|
||||||
self.state = hs.get_state_handler()
|
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
self.state = hs.get_state_handler()
|
||||||
self.pagination_lock = ReadWriteLock()
|
self.store = hs.get_datastore()
|
||||||
self._purges_in_progress_by_room = set()
|
|
||||||
# map from purge id to PurgeStatus
|
|
||||||
self._purges_by_id = {}
|
|
||||||
|
|
||||||
def start_purge_history(self, room_id, token,
|
|
||||||
delete_local_events=False):
|
|
||||||
"""Start off a history purge on a room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
room_id (str): The room to purge from
|
|
||||||
|
|
||||||
token (str): topological token to delete events before
|
|
||||||
delete_local_events (bool): True to delete local events as well as
|
|
||||||
remote ones
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: unique ID for this purge transaction.
|
|
||||||
"""
|
|
||||||
if room_id in self._purges_in_progress_by_room:
|
|
||||||
raise SynapseError(
|
|
||||||
400,
|
|
||||||
"History purge already in progress for %s" % (room_id, ),
|
|
||||||
)
|
|
||||||
|
|
||||||
purge_id = random_string(16)
|
|
||||||
|
|
||||||
# we log the purge_id here so that it can be tied back to the
|
|
||||||
# request id in the log lines.
|
|
||||||
logger.info("[purge] starting purge_id %s", purge_id)
|
|
||||||
|
|
||||||
self._purges_by_id[purge_id] = PurgeStatus()
|
|
||||||
run_in_background(
|
|
||||||
self._purge_history,
|
|
||||||
purge_id, room_id, token, delete_local_events,
|
|
||||||
)
|
|
||||||
return purge_id
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _purge_history(self, purge_id, room_id, token,
|
|
||||||
delete_local_events):
|
|
||||||
"""Carry out a history purge on a room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
purge_id (str): The id for this purge
|
|
||||||
room_id (str): The room to purge from
|
|
||||||
token (str): topological token to delete events before
|
|
||||||
delete_local_events (bool): True to delete local events as well as
|
|
||||||
remote ones
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Deferred
|
|
||||||
"""
|
|
||||||
self._purges_in_progress_by_room.add(room_id)
|
|
||||||
try:
|
|
||||||
with (yield self.pagination_lock.write(room_id)):
|
|
||||||
yield self.store.purge_history(
|
|
||||||
room_id, token, delete_local_events,
|
|
||||||
)
|
|
||||||
logger.info("[purge] complete")
|
|
||||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
|
||||||
except Exception:
|
|
||||||
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
|
|
||||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
|
||||||
finally:
|
|
||||||
self._purges_in_progress_by_room.discard(room_id)
|
|
||||||
|
|
||||||
# remove the purge from the list 24 hours after it completes
|
|
||||||
def clear_purge():
|
|
||||||
del self._purges_by_id[purge_id]
|
|
||||||
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
|
|
||||||
|
|
||||||
def get_purge_status(self, purge_id):
|
|
||||||
"""Get the current status of an active purge
|
|
||||||
|
|
||||||
Args:
|
|
||||||
purge_id (str): purge_id returned by start_purge_history
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
PurgeStatus|None
|
|
||||||
"""
|
|
||||||
return self._purges_by_id.get(purge_id)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
|
||||||
as_client_event=True, event_filter=None):
|
|
||||||
"""Get messages in a room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
requester (Requester): The user requesting messages.
|
|
||||||
room_id (str): The room they want messages from.
|
|
||||||
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
|
||||||
config rules to apply, if any.
|
|
||||||
as_client_event (bool): True to get events in client-server format.
|
|
||||||
event_filter (Filter): Filter to apply to results or None
|
|
||||||
Returns:
|
|
||||||
dict: Pagination API results
|
|
||||||
"""
|
|
||||||
user_id = requester.user.to_string()
|
|
||||||
|
|
||||||
if pagin_config.from_token:
|
|
||||||
room_token = pagin_config.from_token.room_key
|
|
||||||
else:
|
|
||||||
pagin_config.from_token = (
|
|
||||||
yield self.hs.get_event_sources().get_current_token_for_room(
|
|
||||||
room_id=room_id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
room_token = pagin_config.from_token.room_key
|
|
||||||
|
|
||||||
room_token = RoomStreamToken.parse(room_token)
|
|
||||||
|
|
||||||
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
|
|
||||||
"room_key", str(room_token)
|
|
||||||
)
|
|
||||||
|
|
||||||
source_config = pagin_config.get_source_config("room")
|
|
||||||
|
|
||||||
with (yield self.pagination_lock.read(room_id)):
|
|
||||||
membership, member_event_id = yield self._check_in_room_or_world_readable(
|
|
||||||
room_id, user_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if source_config.direction == 'b':
|
|
||||||
# if we're going backwards, we might need to backfill. This
|
|
||||||
# requires that we have a topo token.
|
|
||||||
if room_token.topological:
|
|
||||||
max_topo = room_token.topological
|
|
||||||
else:
|
|
||||||
max_topo = yield self.store.get_max_topological_token(
|
|
||||||
room_id, room_token.stream
|
|
||||||
)
|
|
||||||
|
|
||||||
if membership == Membership.LEAVE:
|
|
||||||
# If they have left the room then clamp the token to be before
|
|
||||||
# they left the room, to save the effort of loading from the
|
|
||||||
# database.
|
|
||||||
leave_token = yield self.store.get_topological_token_for_event(
|
|
||||||
member_event_id
|
|
||||||
)
|
|
||||||
leave_token = RoomStreamToken.parse(leave_token)
|
|
||||||
if leave_token.topological < max_topo:
|
|
||||||
source_config.from_key = str(leave_token)
|
|
||||||
|
|
||||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
|
||||||
room_id, max_topo
|
|
||||||
)
|
|
||||||
|
|
||||||
events, next_key = yield self.store.paginate_room_events(
|
|
||||||
room_id=room_id,
|
|
||||||
from_key=source_config.from_key,
|
|
||||||
to_key=source_config.to_key,
|
|
||||||
direction=source_config.direction,
|
|
||||||
limit=source_config.limit,
|
|
||||||
event_filter=event_filter,
|
|
||||||
)
|
|
||||||
|
|
||||||
next_token = pagin_config.from_token.copy_and_replace(
|
|
||||||
"room_key", next_key
|
|
||||||
)
|
|
||||||
|
|
||||||
if not events:
|
|
||||||
defer.returnValue({
|
|
||||||
"chunk": [],
|
|
||||||
"start": pagin_config.from_token.to_string(),
|
|
||||||
"end": next_token.to_string(),
|
|
||||||
})
|
|
||||||
|
|
||||||
if event_filter:
|
|
||||||
events = event_filter.filter(events)
|
|
||||||
|
|
||||||
events = yield filter_events_for_client(
|
|
||||||
self.store,
|
|
||||||
user_id,
|
|
||||||
events,
|
|
||||||
is_peeking=(member_event_id is None),
|
|
||||||
)
|
|
||||||
|
|
||||||
time_now = self.clock.time_msec()
|
|
||||||
|
|
||||||
chunk = {
|
|
||||||
"chunk": [
|
|
||||||
serialize_event(e, time_now, as_client_event)
|
|
||||||
for e in events
|
|
||||||
],
|
|
||||||
"start": pagin_config.from_token.to_string(),
|
|
||||||
"end": next_token.to_string(),
|
|
||||||
}
|
|
||||||
|
|
||||||
defer.returnValue(chunk)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_room_data(self, user_id=None, room_id=None,
|
def get_room_data(self, user_id=None, room_id=None,
|
||||||
@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
|
|||||||
Raises:
|
Raises:
|
||||||
SynapseError if something went wrong.
|
SynapseError if something went wrong.
|
||||||
"""
|
"""
|
||||||
membership, membership_event_id = yield self._check_in_room_or_world_readable(
|
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
|
||||||
room_id, user_id
|
room_id, user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
if membership == Membership.JOIN:
|
if membership == Membership.JOIN:
|
||||||
data = yield self.state_handler.get_current_state(
|
data = yield self.state.get_current_state(
|
||||||
room_id, event_type, state_key
|
room_id, event_type, state_key
|
||||||
)
|
)
|
||||||
elif membership == Membership.LEAVE:
|
elif membership == Membership.LEAVE:
|
||||||
@ -303,31 +81,6 @@ class MessageHandler(BaseHandler):
|
|||||||
|
|
||||||
defer.returnValue(data)
|
defer.returnValue(data)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _check_in_room_or_world_readable(self, room_id, user_id):
|
|
||||||
try:
|
|
||||||
# check_user_was_in_room will return the most recent membership
|
|
||||||
# event for the user if:
|
|
||||||
# * The user is a non-guest user, and was ever in the room
|
|
||||||
# * The user is a guest user, and has joined the room
|
|
||||||
# else it will throw.
|
|
||||||
member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
|
|
||||||
defer.returnValue((member_event.membership, member_event.event_id))
|
|
||||||
return
|
|
||||||
except AuthError:
|
|
||||||
visibility = yield self.state_handler.get_current_state(
|
|
||||||
room_id, EventTypes.RoomHistoryVisibility, ""
|
|
||||||
)
|
|
||||||
if (
|
|
||||||
visibility and
|
|
||||||
visibility.content["history_visibility"] == "world_readable"
|
|
||||||
):
|
|
||||||
defer.returnValue((Membership.JOIN, None))
|
|
||||||
return
|
|
||||||
raise AuthError(
|
|
||||||
403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_state_events(self, user_id, room_id, is_guest=False):
|
def get_state_events(self, user_id, room_id, is_guest=False):
|
||||||
"""Retrieve all state events for a given room. If the user is
|
"""Retrieve all state events for a given room. If the user is
|
||||||
@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
|
|||||||
Returns:
|
Returns:
|
||||||
A list of dicts representing state events. [{}, {}, {}]
|
A list of dicts representing state events. [{}, {}, {}]
|
||||||
"""
|
"""
|
||||||
membership, membership_event_id = yield self._check_in_room_or_world_readable(
|
membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
|
||||||
room_id, user_id
|
room_id, user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
if membership == Membership.JOIN:
|
if membership == Membership.JOIN:
|
||||||
room_state = yield self.state_handler.get_current_state(room_id)
|
room_state = yield self.state.get_current_state(room_id)
|
||||||
elif membership == Membership.LEAVE:
|
elif membership == Membership.LEAVE:
|
||||||
room_state = yield self.store.get_state_for_events(
|
room_state = yield self.store.get_state_for_events(
|
||||||
[membership_event_id], None
|
[membership_event_id], None
|
||||||
@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
|
|||||||
if not requester.app_service:
|
if not requester.app_service:
|
||||||
# We check AS auth after fetching the room membership, as it
|
# We check AS auth after fetching the room membership, as it
|
||||||
# requires us to pull out all joined members anyway.
|
# requires us to pull out all joined members anyway.
|
||||||
membership, _ = yield self._check_in_room_or_world_readable(
|
membership, _ = yield self.auth.check_in_room_or_world_readable(
|
||||||
room_id, user_id
|
room_id, user_id
|
||||||
)
|
)
|
||||||
if membership != Membership.JOIN:
|
if membership != Membership.JOIN:
|
||||||
|
265
synapse/handlers/pagination.py
Normal file
265
synapse/handlers/pagination.py
Normal file
@ -0,0 +1,265 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2014 - 2016 OpenMarket Ltd
|
||||||
|
# Copyright 2017 - 2018 New Vector Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
|
from synapse.api.constants import Membership
|
||||||
|
from synapse.api.errors import SynapseError
|
||||||
|
from synapse.events.utils import serialize_event
|
||||||
|
from synapse.types import RoomStreamToken
|
||||||
|
from synapse.util.async import ReadWriteLock
|
||||||
|
from synapse.util.logcontext import run_in_background
|
||||||
|
from synapse.util.stringutils import random_string
|
||||||
|
from synapse.visibility import filter_events_for_client
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PurgeStatus(object):
|
||||||
|
"""Object tracking the status of a purge request
|
||||||
|
|
||||||
|
This class contains information on the progress of a purge request, for
|
||||||
|
return by get_purge_status.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
status (int): Tracks whether this request has completed. One of
|
||||||
|
STATUS_{ACTIVE,COMPLETE,FAILED}
|
||||||
|
"""
|
||||||
|
|
||||||
|
STATUS_ACTIVE = 0
|
||||||
|
STATUS_COMPLETE = 1
|
||||||
|
STATUS_FAILED = 2
|
||||||
|
|
||||||
|
STATUS_TEXT = {
|
||||||
|
STATUS_ACTIVE: "active",
|
||||||
|
STATUS_COMPLETE: "complete",
|
||||||
|
STATUS_FAILED: "failed",
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.status = PurgeStatus.STATUS_ACTIVE
|
||||||
|
|
||||||
|
def asdict(self):
|
||||||
|
return {
|
||||||
|
"status": PurgeStatus.STATUS_TEXT[self.status]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class PaginationHandler(object):
|
||||||
|
"""Handles pagination and purge history requests.
|
||||||
|
|
||||||
|
These are in the same handler due to the fact we need to block clients
|
||||||
|
paginating during a purge.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, hs):
|
||||||
|
self.hs = hs
|
||||||
|
self.auth = hs.get_auth()
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
|
self.pagination_lock = ReadWriteLock()
|
||||||
|
self._purges_in_progress_by_room = set()
|
||||||
|
# map from purge id to PurgeStatus
|
||||||
|
self._purges_by_id = {}
|
||||||
|
|
||||||
|
def start_purge_history(self, room_id, token,
|
||||||
|
delete_local_events=False):
|
||||||
|
"""Start off a history purge on a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str): The room to purge from
|
||||||
|
|
||||||
|
token (str): topological token to delete events before
|
||||||
|
delete_local_events (bool): True to delete local events as well as
|
||||||
|
remote ones
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: unique ID for this purge transaction.
|
||||||
|
"""
|
||||||
|
if room_id in self._purges_in_progress_by_room:
|
||||||
|
raise SynapseError(
|
||||||
|
400,
|
||||||
|
"History purge already in progress for %s" % (room_id, ),
|
||||||
|
)
|
||||||
|
|
||||||
|
purge_id = random_string(16)
|
||||||
|
|
||||||
|
# we log the purge_id here so that it can be tied back to the
|
||||||
|
# request id in the log lines.
|
||||||
|
logger.info("[purge] starting purge_id %s", purge_id)
|
||||||
|
|
||||||
|
self._purges_by_id[purge_id] = PurgeStatus()
|
||||||
|
run_in_background(
|
||||||
|
self._purge_history,
|
||||||
|
purge_id, room_id, token, delete_local_events,
|
||||||
|
)
|
||||||
|
return purge_id
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _purge_history(self, purge_id, room_id, token,
|
||||||
|
delete_local_events):
|
||||||
|
"""Carry out a history purge on a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
purge_id (str): The id for this purge
|
||||||
|
room_id (str): The room to purge from
|
||||||
|
token (str): topological token to delete events before
|
||||||
|
delete_local_events (bool): True to delete local events as well as
|
||||||
|
remote ones
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred
|
||||||
|
"""
|
||||||
|
self._purges_in_progress_by_room.add(room_id)
|
||||||
|
try:
|
||||||
|
with (yield self.pagination_lock.write(room_id)):
|
||||||
|
yield self.store.purge_history(
|
||||||
|
room_id, token, delete_local_events,
|
||||||
|
)
|
||||||
|
logger.info("[purge] complete")
|
||||||
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
||||||
|
except Exception:
|
||||||
|
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
|
||||||
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
||||||
|
finally:
|
||||||
|
self._purges_in_progress_by_room.discard(room_id)
|
||||||
|
|
||||||
|
# remove the purge from the list 24 hours after it completes
|
||||||
|
def clear_purge():
|
||||||
|
del self._purges_by_id[purge_id]
|
||||||
|
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
|
||||||
|
|
||||||
|
def get_purge_status(self, purge_id):
|
||||||
|
"""Get the current status of an active purge
|
||||||
|
|
||||||
|
Args:
|
||||||
|
purge_id (str): purge_id returned by start_purge_history
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PurgeStatus|None
|
||||||
|
"""
|
||||||
|
return self._purges_by_id.get(purge_id)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||||
|
as_client_event=True, event_filter=None):
|
||||||
|
"""Get messages in a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
requester (Requester): The user requesting messages.
|
||||||
|
room_id (str): The room they want messages from.
|
||||||
|
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
||||||
|
config rules to apply, if any.
|
||||||
|
as_client_event (bool): True to get events in client-server format.
|
||||||
|
event_filter (Filter): Filter to apply to results or None
|
||||||
|
Returns:
|
||||||
|
dict: Pagination API results
|
||||||
|
"""
|
||||||
|
user_id = requester.user.to_string()
|
||||||
|
|
||||||
|
if pagin_config.from_token:
|
||||||
|
room_token = pagin_config.from_token.room_key
|
||||||
|
else:
|
||||||
|
pagin_config.from_token = (
|
||||||
|
yield self.hs.get_event_sources().get_current_token_for_room(
|
||||||
|
room_id=room_id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
room_token = pagin_config.from_token.room_key
|
||||||
|
|
||||||
|
room_token = RoomStreamToken.parse(room_token)
|
||||||
|
|
||||||
|
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
|
||||||
|
"room_key", str(room_token)
|
||||||
|
)
|
||||||
|
|
||||||
|
source_config = pagin_config.get_source_config("room")
|
||||||
|
|
||||||
|
with (yield self.pagination_lock.read(room_id)):
|
||||||
|
membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
|
||||||
|
room_id, user_id
|
||||||
|
)
|
||||||
|
|
||||||
|
if source_config.direction == 'b':
|
||||||
|
# if we're going backwards, we might need to backfill. This
|
||||||
|
# requires that we have a topo token.
|
||||||
|
if room_token.topological:
|
||||||
|
max_topo = room_token.topological
|
||||||
|
else:
|
||||||
|
max_topo = yield self.store.get_max_topological_token(
|
||||||
|
room_id, room_token.stream
|
||||||
|
)
|
||||||
|
|
||||||
|
if membership == Membership.LEAVE:
|
||||||
|
# If they have left the room then clamp the token to be before
|
||||||
|
# they left the room, to save the effort of loading from the
|
||||||
|
# database.
|
||||||
|
leave_token = yield self.store.get_topological_token_for_event(
|
||||||
|
member_event_id
|
||||||
|
)
|
||||||
|
leave_token = RoomStreamToken.parse(leave_token)
|
||||||
|
if leave_token.topological < max_topo:
|
||||||
|
source_config.from_key = str(leave_token)
|
||||||
|
|
||||||
|
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||||
|
room_id, max_topo
|
||||||
|
)
|
||||||
|
|
||||||
|
events, next_key = yield self.store.paginate_room_events(
|
||||||
|
room_id=room_id,
|
||||||
|
from_key=source_config.from_key,
|
||||||
|
to_key=source_config.to_key,
|
||||||
|
direction=source_config.direction,
|
||||||
|
limit=source_config.limit,
|
||||||
|
event_filter=event_filter,
|
||||||
|
)
|
||||||
|
|
||||||
|
next_token = pagin_config.from_token.copy_and_replace(
|
||||||
|
"room_key", next_key
|
||||||
|
)
|
||||||
|
|
||||||
|
if not events:
|
||||||
|
defer.returnValue({
|
||||||
|
"chunk": [],
|
||||||
|
"start": pagin_config.from_token.to_string(),
|
||||||
|
"end": next_token.to_string(),
|
||||||
|
})
|
||||||
|
|
||||||
|
if event_filter:
|
||||||
|
events = event_filter.filter(events)
|
||||||
|
|
||||||
|
events = yield filter_events_for_client(
|
||||||
|
self.store,
|
||||||
|
user_id,
|
||||||
|
events,
|
||||||
|
is_peeking=(member_event_id is None),
|
||||||
|
)
|
||||||
|
|
||||||
|
time_now = self.clock.time_msec()
|
||||||
|
|
||||||
|
chunk = {
|
||||||
|
"chunk": [
|
||||||
|
serialize_event(e, time_now, as_client_event)
|
||||||
|
for e in events
|
||||||
|
],
|
||||||
|
"start": pagin_config.from_token.to_string(),
|
||||||
|
"end": next_token.to_string(),
|
||||||
|
}
|
||||||
|
|
||||||
|
defer.returnValue(chunk)
|
@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class RoomContextHandler(BaseHandler):
|
class RoomContextHandler(object):
|
||||||
|
def __init__(self, hs):
|
||||||
|
self.hs = hs
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def get_event_context(self, user, room_id, event_id, limit):
|
def get_event_context(self, user, room_id, event_id, limit):
|
||||||
"""Retrieves events, pagination tokens and state around a given event
|
"""Retrieves events, pagination tokens and state around a given event
|
||||||
|
@ -244,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
|
|||||||
hs (synapse.server.HomeServer)
|
hs (synapse.server.HomeServer)
|
||||||
"""
|
"""
|
||||||
super(PurgeHistoryRestServlet, self).__init__(hs)
|
super(PurgeHistoryRestServlet, self).__init__(hs)
|
||||||
self.handlers = hs.get_handlers()
|
self.pagination_handler = hs.get_pagination_handler()
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@ -319,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
|
|||||||
errcode=Codes.BAD_JSON,
|
errcode=Codes.BAD_JSON,
|
||||||
)
|
)
|
||||||
|
|
||||||
purge_id = yield self.handlers.message_handler.start_purge_history(
|
purge_id = yield self.pagination_handler.start_purge_history(
|
||||||
room_id, token,
|
room_id, token,
|
||||||
delete_local_events=delete_local_events,
|
delete_local_events=delete_local_events,
|
||||||
)
|
)
|
||||||
@ -341,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
|
|||||||
hs (synapse.server.HomeServer)
|
hs (synapse.server.HomeServer)
|
||||||
"""
|
"""
|
||||||
super(PurgeHistoryStatusRestServlet, self).__init__(hs)
|
super(PurgeHistoryStatusRestServlet, self).__init__(hs)
|
||||||
self.handlers = hs.get_handlers()
|
self.pagination_handler = hs.get_pagination_handler()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, request, purge_id):
|
def on_GET(self, request, purge_id):
|
||||||
@ -351,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
|
|||||||
if not is_admin:
|
if not is_admin:
|
||||||
raise AuthError(403, "You are not a server admin")
|
raise AuthError(403, "You are not a server admin")
|
||||||
|
|
||||||
purge_status = self.handlers.message_handler.get_purge_status(purge_id)
|
purge_status = self.pagination_handler.get_purge_status(purge_id)
|
||||||
if purge_status is None:
|
if purge_status is None:
|
||||||
raise NotFoundError("purge id '%s' not found" % purge_id)
|
raise NotFoundError("purge id '%s' not found" % purge_id)
|
||||||
|
|
||||||
|
@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
|
|||||||
self.handlers = hs.get_handlers()
|
self.handlers = hs.get_handlers()
|
||||||
self.event_creation_hander = hs.get_event_creation_handler()
|
self.event_creation_hander = hs.get_event_creation_handler()
|
||||||
self.room_member_handler = hs.get_room_member_handler()
|
self.room_member_handler = hs.get_room_member_handler()
|
||||||
|
self.message_handler = hs.get_message_handler()
|
||||||
|
|
||||||
def register(self, http_server):
|
def register(self, http_server):
|
||||||
# /room/$roomid/state/$eventtype
|
# /room/$roomid/state/$eventtype
|
||||||
@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
|
|||||||
format = parse_string(request, "format", default="content",
|
format = parse_string(request, "format", default="content",
|
||||||
allowed_values=["content", "event"])
|
allowed_values=["content", "event"])
|
||||||
|
|
||||||
msg_handler = self.handlers.message_handler
|
msg_handler = self.message_handler
|
||||||
data = yield msg_handler.get_room_data(
|
data = yield msg_handler.get_room_data(
|
||||||
user_id=requester.user.to_string(),
|
user_id=requester.user.to_string(),
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
|
|||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(RoomMemberListRestServlet, self).__init__(hs)
|
super(RoomMemberListRestServlet, self).__init__(hs)
|
||||||
self.handlers = hs.get_handlers()
|
self.message_handler = hs.get_message_handler()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, request, room_id):
|
def on_GET(self, request, room_id):
|
||||||
# TODO support Pagination stream API (limit/tokens)
|
# TODO support Pagination stream API (limit/tokens)
|
||||||
requester = yield self.auth.get_user_by_req(request)
|
requester = yield self.auth.get_user_by_req(request)
|
||||||
handler = self.handlers.message_handler
|
events = yield self.message_handler.get_state_events(
|
||||||
events = yield handler.get_state_events(
|
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
user_id=requester.user.to_string(),
|
user_id=requester.user.to_string(),
|
||||||
)
|
)
|
||||||
@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
|
|||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(JoinedRoomMemberListRestServlet, self).__init__(hs)
|
super(JoinedRoomMemberListRestServlet, self).__init__(hs)
|
||||||
self.message_handler = hs.get_handlers().message_handler
|
self.message_handler = hs.get_message_handler()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, request, room_id):
|
def on_GET(self, request, room_id):
|
||||||
@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
|
|||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(RoomMessageListRestServlet, self).__init__(hs)
|
super(RoomMessageListRestServlet, self).__init__(hs)
|
||||||
self.handlers = hs.get_handlers()
|
self.pagination_handler = hs.get_pagination_handler()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, request, room_id):
|
def on_GET(self, request, room_id):
|
||||||
@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
|
|||||||
event_filter = Filter(json.loads(filter_json))
|
event_filter = Filter(json.loads(filter_json))
|
||||||
else:
|
else:
|
||||||
event_filter = None
|
event_filter = None
|
||||||
handler = self.handlers.message_handler
|
msgs = yield self.pagination_handler.get_messages(
|
||||||
msgs = yield handler.get_messages(
|
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
requester=requester,
|
requester=requester,
|
||||||
pagin_config=pagination_config,
|
pagin_config=pagination_config,
|
||||||
@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
|
|||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(RoomStateRestServlet, self).__init__(hs)
|
super(RoomStateRestServlet, self).__init__(hs)
|
||||||
self.handlers = hs.get_handlers()
|
self.message_handler = hs.get_message_handler()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, request, room_id):
|
def on_GET(self, request, room_id):
|
||||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||||
handler = self.handlers.message_handler
|
|
||||||
# Get all the current state for this room
|
# Get all the current state for this room
|
||||||
events = yield handler.get_state_events(
|
events = yield self.message_handler.get_state_events(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
user_id=requester.user.to_string(),
|
user_id=requester.user.to_string(),
|
||||||
is_guest=requester.is_guest,
|
is_guest=requester.is_guest,
|
||||||
@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
|
|||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
super(RoomEventContextServlet, self).__init__(hs)
|
super(RoomEventContextServlet, self).__init__(hs)
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.handlers = hs.get_handlers()
|
self.room_context_handler = hs.get_room_context_handler()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def on_GET(self, request, room_id, event_id):
|
def on_GET(self, request, room_id, event_id):
|
||||||
@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
|
|||||||
|
|
||||||
limit = parse_integer(request, "limit", default=10)
|
limit = parse_integer(request, "limit", default=10)
|
||||||
|
|
||||||
results = yield self.handlers.room_context_handler.get_event_context(
|
results = yield self.room_context_handler.get_event_context(
|
||||||
requester.user,
|
requester.user,
|
||||||
room_id,
|
room_id,
|
||||||
event_id,
|
event_id,
|
||||||
|
@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
|
|||||||
from synapse.handlers.events import EventHandler, EventStreamHandler
|
from synapse.handlers.events import EventHandler, EventStreamHandler
|
||||||
from synapse.handlers.groups_local import GroupsLocalHandler
|
from synapse.handlers.groups_local import GroupsLocalHandler
|
||||||
from synapse.handlers.initial_sync import InitialSyncHandler
|
from synapse.handlers.initial_sync import InitialSyncHandler
|
||||||
from synapse.handlers.message import EventCreationHandler
|
from synapse.handlers.message import EventCreationHandler, MessageHandler
|
||||||
|
from synapse.handlers.pagination import PaginationHandler
|
||||||
from synapse.handlers.presence import PresenceHandler
|
from synapse.handlers.presence import PresenceHandler
|
||||||
from synapse.handlers.profile import ProfileHandler
|
from synapse.handlers.profile import ProfileHandler
|
||||||
from synapse.handlers.read_marker import ReadMarkerHandler
|
from synapse.handlers.read_marker import ReadMarkerHandler
|
||||||
from synapse.handlers.receipts import ReceiptsHandler
|
from synapse.handlers.receipts import ReceiptsHandler
|
||||||
from synapse.handlers.room import RoomCreationHandler
|
from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
|
||||||
from synapse.handlers.room_list import RoomListHandler
|
from synapse.handlers.room_list import RoomListHandler
|
||||||
from synapse.handlers.room_member import RoomMemberMasterHandler
|
from synapse.handlers.room_member import RoomMemberMasterHandler
|
||||||
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
|
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
|
||||||
@ -165,6 +166,9 @@ class HomeServer(object):
|
|||||||
'federation_registry',
|
'federation_registry',
|
||||||
'server_notices_manager',
|
'server_notices_manager',
|
||||||
'server_notices_sender',
|
'server_notices_sender',
|
||||||
|
'message_handler',
|
||||||
|
'pagination_handler',
|
||||||
|
'room_context_handler',
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, hostname, reactor=None, **kwargs):
|
def __init__(self, hostname, reactor=None, **kwargs):
|
||||||
@ -431,6 +435,15 @@ class HomeServer(object):
|
|||||||
return WorkerServerNoticesSender(self)
|
return WorkerServerNoticesSender(self)
|
||||||
return ServerNoticesSender(self)
|
return ServerNoticesSender(self)
|
||||||
|
|
||||||
|
def build_message_handler(self):
|
||||||
|
return MessageHandler(self)
|
||||||
|
|
||||||
|
def build_pagination_handler(self):
|
||||||
|
return PaginationHandler(self)
|
||||||
|
|
||||||
|
def build_room_context_handler(self):
|
||||||
|
return RoomContextHandler(self)
|
||||||
|
|
||||||
def remove_pusher(self, app_id, push_key, user_id):
|
def remove_pusher(self, app_id, push_key, user_id):
|
||||||
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ from synapse.types import RoomStreamToken, get_domain_from_id
|
|||||||
from synapse.util.async import ObservableDeferred
|
from synapse.util.async import ObservableDeferred
|
||||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.logcontext import make_deferred_yieldable
|
from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
@ -147,7 +147,8 @@ class _EventPeristenceQueue(object):
|
|||||||
# callbacks on the deferred.
|
# callbacks on the deferred.
|
||||||
try:
|
try:
|
||||||
ret = yield per_item_callback(item)
|
ret = yield per_item_callback(item)
|
||||||
item.deferred.callback(ret)
|
with PreserveLoggingContext():
|
||||||
|
item.deferred.callback(ret)
|
||||||
except Exception:
|
except Exception:
|
||||||
item.deferred.errback()
|
item.deferred.errback()
|
||||||
finally:
|
finally:
|
||||||
@ -417,19 +418,29 @@ class EventsStore(EventsWorkerStore):
|
|||||||
logger.info(
|
logger.info(
|
||||||
"Calculating state delta for room %s", room_id,
|
"Calculating state delta for room %s", room_id,
|
||||||
)
|
)
|
||||||
current_state = yield self._get_new_state_after_events(
|
|
||||||
room_id,
|
with Measure(
|
||||||
ev_ctx_rm,
|
self._clock,
|
||||||
latest_event_ids,
|
"persist_events.get_new_state_after_events",
|
||||||
new_latest_event_ids,
|
):
|
||||||
)
|
current_state = yield self._get_new_state_after_events(
|
||||||
|
room_id,
|
||||||
|
ev_ctx_rm,
|
||||||
|
latest_event_ids,
|
||||||
|
new_latest_event_ids,
|
||||||
|
)
|
||||||
|
|
||||||
if current_state is not None:
|
if current_state is not None:
|
||||||
current_state_for_room[room_id] = current_state
|
current_state_for_room[room_id] = current_state
|
||||||
delta = yield self._calculate_state_delta(
|
with Measure(
|
||||||
room_id, current_state,
|
self._clock,
|
||||||
)
|
"persist_events.calculate_state_delta",
|
||||||
if delta is not None:
|
):
|
||||||
state_delta_for_room[room_id] = delta
|
delta = yield self._calculate_state_delta(
|
||||||
|
room_id, current_state,
|
||||||
|
)
|
||||||
|
if delta is not None:
|
||||||
|
state_delta_for_room[room_id] = delta
|
||||||
|
|
||||||
yield self.runInteraction(
|
yield self.runInteraction(
|
||||||
"persist_events",
|
"persist_events",
|
||||||
@ -549,7 +560,12 @@ class EventsStore(EventsWorkerStore):
|
|||||||
if ctx.state_group in state_groups_map:
|
if ctx.state_group in state_groups_map:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
state_groups_map[ctx.state_group] = yield ctx.get_current_state_ids(self)
|
# We're only interested in pulling out state that has already
|
||||||
|
# been cached in the context. We'll pull stuff out of the DB later
|
||||||
|
# if necessary.
|
||||||
|
current_state_ids = ctx.get_cached_current_state_ids()
|
||||||
|
if current_state_ids is not None:
|
||||||
|
state_groups_map[ctx.state_group] = current_state_ids
|
||||||
|
|
||||||
# We need to map the event_ids to their state groups. First, let's
|
# We need to map the event_ids to their state groups. First, let's
|
||||||
# check if the event is one we're persisting, in which case we can
|
# check if the event is one we're persisting, in which case we can
|
||||||
|
@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
|
|||||||
)
|
)
|
||||||
|
|
||||||
if newly_inserted:
|
if newly_inserted:
|
||||||
self.runInteraction(
|
yield self.runInteraction(
|
||||||
"add_pusher",
|
"add_pusher",
|
||||||
self._invalidate_cache_and_stream,
|
self._invalidate_cache_and_stream,
|
||||||
self.get_if_user_has_pusher, (user_id,)
|
self.get_if_user_has_pusher, (user_id,)
|
||||||
|
Loading…
Reference in New Issue
Block a user