mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-18 16:37:08 -05:00
Move PaginationHandler to its own file
This commit is contained in:
parent
0ecf68aedc
commit
5c88bb722f
@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
|
|||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from twisted.internet.defer import succeed
|
from twisted.internet.defer import succeed
|
||||||
from twisted.python.failure import Failure
|
|
||||||
|
|
||||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||||
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
|
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
|
||||||
@ -32,49 +31,17 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
|
|||||||
from synapse.events.utils import serialize_event
|
from synapse.events.utils import serialize_event
|
||||||
from synapse.events.validator import EventValidator
|
from synapse.events.validator import EventValidator
|
||||||
from synapse.replication.http.send_event import send_event_to_master
|
from synapse.replication.http.send_event import send_event_to_master
|
||||||
from synapse.types import RoomAlias, RoomStreamToken, UserID
|
from synapse.types import RoomAlias, UserID
|
||||||
from synapse.util.async import Limiter, ReadWriteLock
|
from synapse.util.async import Limiter
|
||||||
from synapse.util.frozenutils import frozendict_json_encoder
|
from synapse.util.frozenutils import frozendict_json_encoder
|
||||||
from synapse.util.logcontext import run_in_background
|
from synapse.util.logcontext import run_in_background
|
||||||
from synapse.util.metrics import measure_func
|
from synapse.util.metrics import measure_func
|
||||||
from synapse.util.stringutils import random_string
|
|
||||||
from synapse.visibility import filter_events_for_client
|
|
||||||
|
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class PurgeStatus(object):
|
|
||||||
"""Object tracking the status of a purge request
|
|
||||||
|
|
||||||
This class contains information on the progress of a purge request, for
|
|
||||||
return by get_purge_status.
|
|
||||||
|
|
||||||
Attributes:
|
|
||||||
status (int): Tracks whether this request has completed. One of
|
|
||||||
STATUS_{ACTIVE,COMPLETE,FAILED}
|
|
||||||
"""
|
|
||||||
|
|
||||||
STATUS_ACTIVE = 0
|
|
||||||
STATUS_COMPLETE = 1
|
|
||||||
STATUS_FAILED = 2
|
|
||||||
|
|
||||||
STATUS_TEXT = {
|
|
||||||
STATUS_ACTIVE: "active",
|
|
||||||
STATUS_COMPLETE: "complete",
|
|
||||||
STATUS_FAILED: "failed",
|
|
||||||
}
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.status = PurgeStatus.STATUS_ACTIVE
|
|
||||||
|
|
||||||
def asdict(self):
|
|
||||||
return {
|
|
||||||
"status": PurgeStatus.STATUS_TEXT[self.status]
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class MessageHandler(object):
|
class MessageHandler(object):
|
||||||
"""Contains some read only APIs to get state about a room
|
"""Contains some read only APIs to get state about a room
|
||||||
"""
|
"""
|
||||||
@ -189,211 +156,6 @@ class MessageHandler(object):
|
|||||||
})
|
})
|
||||||
|
|
||||||
|
|
||||||
class PaginationHandler(object):
|
|
||||||
"""Handles pagination and purge history requests.
|
|
||||||
|
|
||||||
These are in the same handler due to the fact we need to block clients
|
|
||||||
paginating during a purge.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, hs):
|
|
||||||
self.hs = hs
|
|
||||||
self.auth = hs.get_auth()
|
|
||||||
self.store = hs.get_datastore()
|
|
||||||
self.clock = hs.get_clock()
|
|
||||||
|
|
||||||
self.pagination_lock = ReadWriteLock()
|
|
||||||
self._purges_in_progress_by_room = set()
|
|
||||||
# map from purge id to PurgeStatus
|
|
||||||
self._purges_by_id = {}
|
|
||||||
|
|
||||||
def start_purge_history(self, room_id, token,
|
|
||||||
delete_local_events=False):
|
|
||||||
"""Start off a history purge on a room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
room_id (str): The room to purge from
|
|
||||||
|
|
||||||
token (str): topological token to delete events before
|
|
||||||
delete_local_events (bool): True to delete local events as well as
|
|
||||||
remote ones
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: unique ID for this purge transaction.
|
|
||||||
"""
|
|
||||||
if room_id in self._purges_in_progress_by_room:
|
|
||||||
raise SynapseError(
|
|
||||||
400,
|
|
||||||
"History purge already in progress for %s" % (room_id, ),
|
|
||||||
)
|
|
||||||
|
|
||||||
purge_id = random_string(16)
|
|
||||||
|
|
||||||
# we log the purge_id here so that it can be tied back to the
|
|
||||||
# request id in the log lines.
|
|
||||||
logger.info("[purge] starting purge_id %s", purge_id)
|
|
||||||
|
|
||||||
self._purges_by_id[purge_id] = PurgeStatus()
|
|
||||||
run_in_background(
|
|
||||||
self._purge_history,
|
|
||||||
purge_id, room_id, token, delete_local_events,
|
|
||||||
)
|
|
||||||
return purge_id
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def _purge_history(self, purge_id, room_id, token,
|
|
||||||
delete_local_events):
|
|
||||||
"""Carry out a history purge on a room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
purge_id (str): The id for this purge
|
|
||||||
room_id (str): The room to purge from
|
|
||||||
token (str): topological token to delete events before
|
|
||||||
delete_local_events (bool): True to delete local events as well as
|
|
||||||
remote ones
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Deferred
|
|
||||||
"""
|
|
||||||
self._purges_in_progress_by_room.add(room_id)
|
|
||||||
try:
|
|
||||||
with (yield self.pagination_lock.write(room_id)):
|
|
||||||
yield self.store.purge_history(
|
|
||||||
room_id, token, delete_local_events,
|
|
||||||
)
|
|
||||||
logger.info("[purge] complete")
|
|
||||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
|
||||||
except Exception:
|
|
||||||
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
|
|
||||||
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
|
||||||
finally:
|
|
||||||
self._purges_in_progress_by_room.discard(room_id)
|
|
||||||
|
|
||||||
# remove the purge from the list 24 hours after it completes
|
|
||||||
def clear_purge():
|
|
||||||
del self._purges_by_id[purge_id]
|
|
||||||
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
|
|
||||||
|
|
||||||
def get_purge_status(self, purge_id):
|
|
||||||
"""Get the current status of an active purge
|
|
||||||
|
|
||||||
Args:
|
|
||||||
purge_id (str): purge_id returned by start_purge_history
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
PurgeStatus|None
|
|
||||||
"""
|
|
||||||
return self._purges_by_id.get(purge_id)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_messages(self, requester, room_id=None, pagin_config=None,
|
|
||||||
as_client_event=True, event_filter=None):
|
|
||||||
"""Get messages in a room.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
requester (Requester): The user requesting messages.
|
|
||||||
room_id (str): The room they want messages from.
|
|
||||||
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
|
||||||
config rules to apply, if any.
|
|
||||||
as_client_event (bool): True to get events in client-server format.
|
|
||||||
event_filter (Filter): Filter to apply to results or None
|
|
||||||
Returns:
|
|
||||||
dict: Pagination API results
|
|
||||||
"""
|
|
||||||
user_id = requester.user.to_string()
|
|
||||||
|
|
||||||
if pagin_config.from_token:
|
|
||||||
room_token = pagin_config.from_token.room_key
|
|
||||||
else:
|
|
||||||
pagin_config.from_token = (
|
|
||||||
yield self.hs.get_event_sources().get_current_token_for_room(
|
|
||||||
room_id=room_id
|
|
||||||
)
|
|
||||||
)
|
|
||||||
room_token = pagin_config.from_token.room_key
|
|
||||||
|
|
||||||
room_token = RoomStreamToken.parse(room_token)
|
|
||||||
|
|
||||||
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
|
|
||||||
"room_key", str(room_token)
|
|
||||||
)
|
|
||||||
|
|
||||||
source_config = pagin_config.get_source_config("room")
|
|
||||||
|
|
||||||
with (yield self.pagination_lock.read(room_id)):
|
|
||||||
membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
|
|
||||||
room_id, user_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if source_config.direction == 'b':
|
|
||||||
# if we're going backwards, we might need to backfill. This
|
|
||||||
# requires that we have a topo token.
|
|
||||||
if room_token.topological:
|
|
||||||
max_topo = room_token.topological
|
|
||||||
else:
|
|
||||||
max_topo = yield self.store.get_max_topological_token(
|
|
||||||
room_id, room_token.stream
|
|
||||||
)
|
|
||||||
|
|
||||||
if membership == Membership.LEAVE:
|
|
||||||
# If they have left the room then clamp the token to be before
|
|
||||||
# they left the room, to save the effort of loading from the
|
|
||||||
# database.
|
|
||||||
leave_token = yield self.store.get_topological_token_for_event(
|
|
||||||
member_event_id
|
|
||||||
)
|
|
||||||
leave_token = RoomStreamToken.parse(leave_token)
|
|
||||||
if leave_token.topological < max_topo:
|
|
||||||
source_config.from_key = str(leave_token)
|
|
||||||
|
|
||||||
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
|
||||||
room_id, max_topo
|
|
||||||
)
|
|
||||||
|
|
||||||
events, next_key = yield self.store.paginate_room_events(
|
|
||||||
room_id=room_id,
|
|
||||||
from_key=source_config.from_key,
|
|
||||||
to_key=source_config.to_key,
|
|
||||||
direction=source_config.direction,
|
|
||||||
limit=source_config.limit,
|
|
||||||
event_filter=event_filter,
|
|
||||||
)
|
|
||||||
|
|
||||||
next_token = pagin_config.from_token.copy_and_replace(
|
|
||||||
"room_key", next_key
|
|
||||||
)
|
|
||||||
|
|
||||||
if not events:
|
|
||||||
defer.returnValue({
|
|
||||||
"chunk": [],
|
|
||||||
"start": pagin_config.from_token.to_string(),
|
|
||||||
"end": next_token.to_string(),
|
|
||||||
})
|
|
||||||
|
|
||||||
if event_filter:
|
|
||||||
events = event_filter.filter(events)
|
|
||||||
|
|
||||||
events = yield filter_events_for_client(
|
|
||||||
self.store,
|
|
||||||
user_id,
|
|
||||||
events,
|
|
||||||
is_peeking=(member_event_id is None),
|
|
||||||
)
|
|
||||||
|
|
||||||
time_now = self.clock.time_msec()
|
|
||||||
|
|
||||||
chunk = {
|
|
||||||
"chunk": [
|
|
||||||
serialize_event(e, time_now, as_client_event)
|
|
||||||
for e in events
|
|
||||||
],
|
|
||||||
"start": pagin_config.from_token.to_string(),
|
|
||||||
"end": next_token.to_string(),
|
|
||||||
}
|
|
||||||
|
|
||||||
defer.returnValue(chunk)
|
|
||||||
|
|
||||||
|
|
||||||
class EventCreationHandler(object):
|
class EventCreationHandler(object):
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.hs = hs
|
self.hs = hs
|
||||||
|
265
synapse/handlers/pagination.py
Normal file
265
synapse/handlers/pagination.py
Normal file
@ -0,0 +1,265 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2014 - 2016 OpenMarket Ltd
|
||||||
|
# Copyright 2017 - 2018 New Vector Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
|
from synapse.api.constants import Membership
|
||||||
|
from synapse.api.errors import SynapseError
|
||||||
|
from synapse.events.utils import serialize_event
|
||||||
|
from synapse.types import RoomStreamToken
|
||||||
|
from synapse.util.async import ReadWriteLock
|
||||||
|
from synapse.util.logcontext import run_in_background
|
||||||
|
from synapse.util.stringutils import random_string
|
||||||
|
from synapse.visibility import filter_events_for_client
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PurgeStatus(object):
|
||||||
|
"""Object tracking the status of a purge request
|
||||||
|
|
||||||
|
This class contains information on the progress of a purge request, for
|
||||||
|
return by get_purge_status.
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
status (int): Tracks whether this request has completed. One of
|
||||||
|
STATUS_{ACTIVE,COMPLETE,FAILED}
|
||||||
|
"""
|
||||||
|
|
||||||
|
STATUS_ACTIVE = 0
|
||||||
|
STATUS_COMPLETE = 1
|
||||||
|
STATUS_FAILED = 2
|
||||||
|
|
||||||
|
STATUS_TEXT = {
|
||||||
|
STATUS_ACTIVE: "active",
|
||||||
|
STATUS_COMPLETE: "complete",
|
||||||
|
STATUS_FAILED: "failed",
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.status = PurgeStatus.STATUS_ACTIVE
|
||||||
|
|
||||||
|
def asdict(self):
|
||||||
|
return {
|
||||||
|
"status": PurgeStatus.STATUS_TEXT[self.status]
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class PaginationHandler(object):
|
||||||
|
"""Handles pagination and purge history requests.
|
||||||
|
|
||||||
|
These are in the same handler due to the fact we need to block clients
|
||||||
|
paginating during a purge.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, hs):
|
||||||
|
self.hs = hs
|
||||||
|
self.auth = hs.get_auth()
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
|
self.pagination_lock = ReadWriteLock()
|
||||||
|
self._purges_in_progress_by_room = set()
|
||||||
|
# map from purge id to PurgeStatus
|
||||||
|
self._purges_by_id = {}
|
||||||
|
|
||||||
|
def start_purge_history(self, room_id, token,
|
||||||
|
delete_local_events=False):
|
||||||
|
"""Start off a history purge on a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str): The room to purge from
|
||||||
|
|
||||||
|
token (str): topological token to delete events before
|
||||||
|
delete_local_events (bool): True to delete local events as well as
|
||||||
|
remote ones
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: unique ID for this purge transaction.
|
||||||
|
"""
|
||||||
|
if room_id in self._purges_in_progress_by_room:
|
||||||
|
raise SynapseError(
|
||||||
|
400,
|
||||||
|
"History purge already in progress for %s" % (room_id, ),
|
||||||
|
)
|
||||||
|
|
||||||
|
purge_id = random_string(16)
|
||||||
|
|
||||||
|
# we log the purge_id here so that it can be tied back to the
|
||||||
|
# request id in the log lines.
|
||||||
|
logger.info("[purge] starting purge_id %s", purge_id)
|
||||||
|
|
||||||
|
self._purges_by_id[purge_id] = PurgeStatus()
|
||||||
|
run_in_background(
|
||||||
|
self._purge_history,
|
||||||
|
purge_id, room_id, token, delete_local_events,
|
||||||
|
)
|
||||||
|
return purge_id
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _purge_history(self, purge_id, room_id, token,
|
||||||
|
delete_local_events):
|
||||||
|
"""Carry out a history purge on a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
purge_id (str): The id for this purge
|
||||||
|
room_id (str): The room to purge from
|
||||||
|
token (str): topological token to delete events before
|
||||||
|
delete_local_events (bool): True to delete local events as well as
|
||||||
|
remote ones
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred
|
||||||
|
"""
|
||||||
|
self._purges_in_progress_by_room.add(room_id)
|
||||||
|
try:
|
||||||
|
with (yield self.pagination_lock.write(room_id)):
|
||||||
|
yield self.store.purge_history(
|
||||||
|
room_id, token, delete_local_events,
|
||||||
|
)
|
||||||
|
logger.info("[purge] complete")
|
||||||
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
|
||||||
|
except Exception:
|
||||||
|
logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
|
||||||
|
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
|
||||||
|
finally:
|
||||||
|
self._purges_in_progress_by_room.discard(room_id)
|
||||||
|
|
||||||
|
# remove the purge from the list 24 hours after it completes
|
||||||
|
def clear_purge():
|
||||||
|
del self._purges_by_id[purge_id]
|
||||||
|
self.hs.get_reactor().callLater(24 * 3600, clear_purge)
|
||||||
|
|
||||||
|
def get_purge_status(self, purge_id):
|
||||||
|
"""Get the current status of an active purge
|
||||||
|
|
||||||
|
Args:
|
||||||
|
purge_id (str): purge_id returned by start_purge_history
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PurgeStatus|None
|
||||||
|
"""
|
||||||
|
return self._purges_by_id.get(purge_id)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_messages(self, requester, room_id=None, pagin_config=None,
|
||||||
|
as_client_event=True, event_filter=None):
|
||||||
|
"""Get messages in a room.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
requester (Requester): The user requesting messages.
|
||||||
|
room_id (str): The room they want messages from.
|
||||||
|
pagin_config (synapse.api.streams.PaginationConfig): The pagination
|
||||||
|
config rules to apply, if any.
|
||||||
|
as_client_event (bool): True to get events in client-server format.
|
||||||
|
event_filter (Filter): Filter to apply to results or None
|
||||||
|
Returns:
|
||||||
|
dict: Pagination API results
|
||||||
|
"""
|
||||||
|
user_id = requester.user.to_string()
|
||||||
|
|
||||||
|
if pagin_config.from_token:
|
||||||
|
room_token = pagin_config.from_token.room_key
|
||||||
|
else:
|
||||||
|
pagin_config.from_token = (
|
||||||
|
yield self.hs.get_event_sources().get_current_token_for_room(
|
||||||
|
room_id=room_id
|
||||||
|
)
|
||||||
|
)
|
||||||
|
room_token = pagin_config.from_token.room_key
|
||||||
|
|
||||||
|
room_token = RoomStreamToken.parse(room_token)
|
||||||
|
|
||||||
|
pagin_config.from_token = pagin_config.from_token.copy_and_replace(
|
||||||
|
"room_key", str(room_token)
|
||||||
|
)
|
||||||
|
|
||||||
|
source_config = pagin_config.get_source_config("room")
|
||||||
|
|
||||||
|
with (yield self.pagination_lock.read(room_id)):
|
||||||
|
membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
|
||||||
|
room_id, user_id
|
||||||
|
)
|
||||||
|
|
||||||
|
if source_config.direction == 'b':
|
||||||
|
# if we're going backwards, we might need to backfill. This
|
||||||
|
# requires that we have a topo token.
|
||||||
|
if room_token.topological:
|
||||||
|
max_topo = room_token.topological
|
||||||
|
else:
|
||||||
|
max_topo = yield self.store.get_max_topological_token(
|
||||||
|
room_id, room_token.stream
|
||||||
|
)
|
||||||
|
|
||||||
|
if membership == Membership.LEAVE:
|
||||||
|
# If they have left the room then clamp the token to be before
|
||||||
|
# they left the room, to save the effort of loading from the
|
||||||
|
# database.
|
||||||
|
leave_token = yield self.store.get_topological_token_for_event(
|
||||||
|
member_event_id
|
||||||
|
)
|
||||||
|
leave_token = RoomStreamToken.parse(leave_token)
|
||||||
|
if leave_token.topological < max_topo:
|
||||||
|
source_config.from_key = str(leave_token)
|
||||||
|
|
||||||
|
yield self.hs.get_handlers().federation_handler.maybe_backfill(
|
||||||
|
room_id, max_topo
|
||||||
|
)
|
||||||
|
|
||||||
|
events, next_key = yield self.store.paginate_room_events(
|
||||||
|
room_id=room_id,
|
||||||
|
from_key=source_config.from_key,
|
||||||
|
to_key=source_config.to_key,
|
||||||
|
direction=source_config.direction,
|
||||||
|
limit=source_config.limit,
|
||||||
|
event_filter=event_filter,
|
||||||
|
)
|
||||||
|
|
||||||
|
next_token = pagin_config.from_token.copy_and_replace(
|
||||||
|
"room_key", next_key
|
||||||
|
)
|
||||||
|
|
||||||
|
if not events:
|
||||||
|
defer.returnValue({
|
||||||
|
"chunk": [],
|
||||||
|
"start": pagin_config.from_token.to_string(),
|
||||||
|
"end": next_token.to_string(),
|
||||||
|
})
|
||||||
|
|
||||||
|
if event_filter:
|
||||||
|
events = event_filter.filter(events)
|
||||||
|
|
||||||
|
events = yield filter_events_for_client(
|
||||||
|
self.store,
|
||||||
|
user_id,
|
||||||
|
events,
|
||||||
|
is_peeking=(member_event_id is None),
|
||||||
|
)
|
||||||
|
|
||||||
|
time_now = self.clock.time_msec()
|
||||||
|
|
||||||
|
chunk = {
|
||||||
|
"chunk": [
|
||||||
|
serialize_event(e, time_now, as_client_event)
|
||||||
|
for e in events
|
||||||
|
],
|
||||||
|
"start": pagin_config.from_token.to_string(),
|
||||||
|
"end": next_token.to_string(),
|
||||||
|
}
|
||||||
|
|
||||||
|
defer.returnValue(chunk)
|
@ -52,11 +52,8 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
|
|||||||
from synapse.handlers.events import EventHandler, EventStreamHandler
|
from synapse.handlers.events import EventHandler, EventStreamHandler
|
||||||
from synapse.handlers.groups_local import GroupsLocalHandler
|
from synapse.handlers.groups_local import GroupsLocalHandler
|
||||||
from synapse.handlers.initial_sync import InitialSyncHandler
|
from synapse.handlers.initial_sync import InitialSyncHandler
|
||||||
from synapse.handlers.message import (
|
from synapse.handlers.message import EventCreationHandler, MessageHandler
|
||||||
EventCreationHandler,
|
from synapse.handlers.pagination import PaginationHandler
|
||||||
MessageHandler,
|
|
||||||
PaginationHandler,
|
|
||||||
)
|
|
||||||
from synapse.handlers.presence import PresenceHandler
|
from synapse.handlers.presence import PresenceHandler
|
||||||
from synapse.handlers.profile import ProfileHandler
|
from synapse.handlers.profile import ProfileHandler
|
||||||
from synapse.handlers.read_marker import ReadMarkerHandler
|
from synapse.handlers.read_marker import ReadMarkerHandler
|
||||||
|
Loading…
Reference in New Issue
Block a user