Merge pull request #5183 from matrix-org/erikj/async_serialize_event

Allow client event serialization to be async
This commit is contained in:
Erik Johnston 2019-05-15 10:36:30 +01:00 committed by GitHub
commit 0aba6c8251
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 194 additions and 101 deletions

1
changelog.d/5183.misc Normal file
View File

@ -0,0 +1 @@
Allow client event serialization to be async.

View File

@ -19,7 +19,10 @@ from six import string_types
from frozendict import frozendict from frozendict import frozendict
from twisted.internet import defer
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util.async_helpers import yieldable_gather_results
from . import EventBase from . import EventBase
@ -311,3 +314,44 @@ def serialize_event(e, time_now_ms, as_client_event=True,
d = only_fields(d, only_event_fields) d = only_fields(d, only_event_fields)
return d return d
class EventClientSerializer(object):
"""Serializes events that are to be sent to clients.
This is used for bundling extra information with any events to be sent to
clients.
"""
def __init__(self, hs):
pass
def serialize_event(self, event, time_now, **kwargs):
"""Serializes a single event.
Args:
event (EventBase)
time_now (int): The current time in milliseconds
**kwargs: Arguments to pass to `serialize_event`
Returns:
Deferred[dict]: The serialized event
"""
event = serialize_event(event, time_now, **kwargs)
return defer.succeed(event)
def serialize_events(self, events, time_now, **kwargs):
"""Serializes multiple events.
Args:
event (iter[EventBase])
time_now (int): The current time in milliseconds
**kwargs: Arguments to pass to `serialize_event`
Returns:
Deferred[list[dict]]: The list of serialized events
"""
return yieldable_gather_results(
self.serialize_event, events,
time_now=time_now, **kwargs
)

View File

@ -21,7 +21,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, SynapseError from synapse.api.errors import AuthError, SynapseError
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.types import UserID from synapse.types import UserID
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
@ -50,6 +49,7 @@ class EventStreamHandler(BaseHandler):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self._server_notices_sender = hs.get_server_notices_sender() self._server_notices_sender = hs.get_server_notices_sender()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
@ -120,9 +120,9 @@ class EventStreamHandler(BaseHandler):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
chunks = [ chunks = yield self._event_serializer.serialize_events(
serialize_event(e, time_now, as_client_event) for e in events events, time_now, as_client_event=as_client_event,
] )
chunk = { chunk = {
"chunk": chunks, "chunk": chunks,

View File

@ -19,7 +19,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.handlers.presence import format_user_presence_state from synapse.handlers.presence import format_user_presence_state
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
@ -43,6 +42,7 @@ class InitialSyncHandler(BaseHandler):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.validator = EventValidator() self.validator = EventValidator()
self.snapshot_cache = SnapshotCache() self.snapshot_cache = SnapshotCache()
self._event_serializer = hs.get_event_client_serializer()
def snapshot_all_rooms(self, user_id=None, pagin_config=None, def snapshot_all_rooms(self, user_id=None, pagin_config=None,
as_client_event=True, include_archived=False): as_client_event=True, include_archived=False):
@ -138,7 +138,9 @@ class InitialSyncHandler(BaseHandler):
d["inviter"] = event.sender d["inviter"] = event.sender
invite_event = yield self.store.get_event(event.event_id) invite_event = yield self.store.get_event(event.event_id)
d["invite"] = serialize_event(invite_event, time_now, as_client_event) d["invite"] = yield self._event_serializer.serialize_event(
invite_event, time_now, as_client_event,
)
rooms_ret.append(d) rooms_ret.append(d)
@ -185,18 +187,21 @@ class InitialSyncHandler(BaseHandler):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
d["messages"] = { d["messages"] = {
"chunk": [ "chunk": (
serialize_event(m, time_now, as_client_event) yield self._event_serializer.serialize_events(
for m in messages messages, time_now=time_now,
], as_client_event=as_client_event,
)
),
"start": start_token.to_string(), "start": start_token.to_string(),
"end": end_token.to_string(), "end": end_token.to_string(),
} }
d["state"] = [ d["state"] = yield self._event_serializer.serialize_events(
serialize_event(c, time_now, as_client_event) current_state.values(),
for c in current_state.values() time_now=time_now,
] as_client_event=as_client_event
)
account_data_events = [] account_data_events = []
tags = tags_by_room.get(event.room_id) tags = tags_by_room.get(event.room_id)
@ -337,11 +342,15 @@ class InitialSyncHandler(BaseHandler):
"membership": membership, "membership": membership,
"room_id": room_id, "room_id": room_id,
"messages": { "messages": {
"chunk": [serialize_event(m, time_now) for m in messages], "chunk": (yield self._event_serializer.serialize_events(
messages, time_now,
)),
"start": start_token.to_string(), "start": start_token.to_string(),
"end": end_token.to_string(), "end": end_token.to_string(),
}, },
"state": [serialize_event(s, time_now) for s in room_state.values()], "state": (yield self._event_serializer.serialize_events(
room_state.values(), time_now,
)),
"presence": [], "presence": [],
"receipts": [], "receipts": [],
}) })
@ -355,10 +364,9 @@ class InitialSyncHandler(BaseHandler):
# TODO: These concurrently # TODO: These concurrently
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
state = [ state = yield self._event_serializer.serialize_events(
serialize_event(x, time_now) current_state.values(), time_now,
for x in current_state.values() )
]
now_token = yield self.hs.get_event_sources().get_current_token() now_token = yield self.hs.get_event_sources().get_current_token()
@ -425,7 +433,9 @@ class InitialSyncHandler(BaseHandler):
ret = { ret = {
"room_id": room_id, "room_id": room_id,
"messages": { "messages": {
"chunk": [serialize_event(m, time_now) for m in messages], "chunk": (yield self._event_serializer.serialize_events(
messages, time_now,
)),
"start": start_token.to_string(), "start": start_token.to_string(),
"end": end_token.to_string(), "end": end_token.to_string(),
}, },

View File

@ -32,7 +32,6 @@ from synapse.api.errors import (
) )
from synapse.api.room_versions import RoomVersions from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder from synapse.api.urls import ConsentURIBuilder
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
@ -57,6 +56,7 @@ class MessageHandler(object):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.state = hs.get_state_handler() self.state = hs.get_state_handler()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None, def get_room_data(self, user_id=None, room_id=None,
@ -164,9 +164,10 @@ class MessageHandler(object):
room_state = room_state[membership_event_id] room_state = room_state[membership_event_id]
now = self.clock.time_msec() now = self.clock.time_msec()
defer.returnValue( events = yield self._event_serializer.serialize_events(
[serialize_event(c, now) for c in room_state.values()] room_state.values(), now,
) )
defer.returnValue(events)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_joined_members(self, requester, room_id): def get_joined_members(self, requester, room_id):

View File

@ -20,7 +20,6 @@ from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.async_helpers import ReadWriteLock from synapse.util.async_helpers import ReadWriteLock
@ -78,6 +77,7 @@ class PaginationHandler(object):
self._purges_in_progress_by_room = set() self._purges_in_progress_by_room = set()
# map from purge id to PurgeStatus # map from purge id to PurgeStatus
self._purges_by_id = {} self._purges_by_id = {}
self._event_serializer = hs.get_event_client_serializer()
def start_purge_history(self, room_id, token, def start_purge_history(self, room_id, token,
delete_local_events=False): delete_local_events=False):
@ -278,18 +278,22 @@ class PaginationHandler(object):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
chunk = { chunk = {
"chunk": [ "chunk": (
serialize_event(e, time_now, as_client_event) yield self._event_serializer.serialize_events(
for e in events events, time_now,
], as_client_event=as_client_event,
)
),
"start": pagin_config.from_token.to_string(), "start": pagin_config.from_token.to_string(),
"end": next_token.to_string(), "end": next_token.to_string(),
} }
if state: if state:
chunk["state"] = [ chunk["state"] = (
serialize_event(e, time_now, as_client_event) yield self._event_serializer.serialize_events(
for e in state state, time_now,
] as_client_event=as_client_event,
)
)
defer.returnValue(chunk) defer.returnValue(chunk)

View File

@ -23,7 +23,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.api.filtering import Filter from synapse.api.filtering import Filter
from synapse.events.utils import serialize_event
from synapse.storage.state import StateFilter from synapse.storage.state import StateFilter
from synapse.visibility import filter_events_for_client from synapse.visibility import filter_events_for_client
@ -36,6 +35,7 @@ class SearchHandler(BaseHandler):
def __init__(self, hs): def __init__(self, hs):
super(SearchHandler, self).__init__(hs) super(SearchHandler, self).__init__(hs)
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
def get_old_rooms_from_upgraded_room(self, room_id): def get_old_rooms_from_upgraded_room(self, room_id):
@ -401,14 +401,16 @@ class SearchHandler(BaseHandler):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
for context in contexts.values(): for context in contexts.values():
context["events_before"] = [ context["events_before"] = (
serialize_event(e, time_now) yield self._event_serializer.serialize_events(
for e in context["events_before"] context["events_before"], time_now,
] )
context["events_after"] = [ )
serialize_event(e, time_now) context["events_after"] = (
for e in context["events_after"] yield self._event_serializer.serialize_events(
] context["events_after"], time_now,
)
)
state_results = {} state_results = {}
if include_state: if include_state:
@ -422,14 +424,13 @@ class SearchHandler(BaseHandler):
# We're now about to serialize the events. We should not make any # We're now about to serialize the events. We should not make any
# blocking calls after this. Otherwise the 'age' will be wrong # blocking calls after this. Otherwise the 'age' will be wrong
results = [ results = []
{ for e in allowed_events:
results.append({
"rank": rank_map[e.event_id], "rank": rank_map[e.event_id],
"result": serialize_event(e, time_now), "result": (yield self._event_serializer.serialize_event(e, time_now)),
"context": contexts.get(e.event_id, {}), "context": contexts.get(e.event_id, {}),
} })
for e in allowed_events
]
rooms_cat_res = { rooms_cat_res = {
"results": results, "results": results,
@ -438,10 +439,13 @@ class SearchHandler(BaseHandler):
} }
if state_results: if state_results:
rooms_cat_res["state"] = { s = {}
room_id: [serialize_event(e, time_now) for e in state] for room_id, state in state_results.items():
for room_id, state in state_results.items() s[room_id] = yield self._event_serializer.serialize_events(
} state, time_now,
)
rooms_cat_res["state"] = s
if room_groups and "room_id" in group_keys: if room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups rooms_cat_res.setdefault("groups", {})["room_id"] = room_groups

View File

@ -19,7 +19,6 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.events.utils import serialize_event
from synapse.streams.config import PaginationConfig from synapse.streams.config import PaginationConfig
from .base import ClientV1RestServlet, client_path_patterns from .base import ClientV1RestServlet, client_path_patterns
@ -84,6 +83,7 @@ class EventRestServlet(ClientV1RestServlet):
super(EventRestServlet, self).__init__(hs) super(EventRestServlet, self).__init__(hs)
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler() self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, event_id): def on_GET(self, request, event_id):
@ -92,7 +92,8 @@ class EventRestServlet(ClientV1RestServlet):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
if event: if event:
defer.returnValue((200, serialize_event(event, time_now))) event = yield self._event_serializer.serialize_event(event, time_now)
defer.returnValue((200, event))
else: else:
defer.returnValue((404, "Event not found.")) defer.returnValue((404, "Event not found."))

View File

@ -26,7 +26,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.api.filtering import Filter from synapse.api.filtering import Filter
from synapse.events.utils import format_event_for_client_v2, serialize_event from synapse.events.utils import format_event_for_client_v2
from synapse.http.servlet import ( from synapse.http.servlet import (
assert_params_in_dict, assert_params_in_dict,
parse_integer, parse_integer,
@ -537,6 +537,7 @@ class RoomEventServlet(ClientV1RestServlet):
super(RoomEventServlet, self).__init__(hs) super(RoomEventServlet, self).__init__(hs)
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.event_handler = hs.get_event_handler() self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id, event_id): def on_GET(self, request, room_id, event_id):
@ -545,7 +546,8 @@ class RoomEventServlet(ClientV1RestServlet):
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
if event: if event:
defer.returnValue((200, serialize_event(event, time_now))) event = yield self._event_serializer.serialize_event(event, time_now)
defer.returnValue((200, event))
else: else:
defer.returnValue((404, "Event not found.")) defer.returnValue((404, "Event not found."))
@ -559,6 +561,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
super(RoomEventContextServlet, self).__init__(hs) super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.room_context_handler = hs.get_room_context_handler() self.room_context_handler = hs.get_room_context_handler()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request, room_id, event_id): def on_GET(self, request, room_id, event_id):
@ -588,16 +591,18 @@ class RoomEventContextServlet(ClientV1RestServlet):
) )
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
results["events_before"] = [ results["events_before"] = yield self._event_serializer.serialize_events(
serialize_event(event, time_now) for event in results["events_before"] results["events_before"], time_now,
] )
results["event"] = serialize_event(results["event"], time_now) results["event"] = yield self._event_serializer.serialize_event(
results["events_after"] = [ results["event"], time_now,
serialize_event(event, time_now) for event in results["events_after"] )
] results["events_after"] = yield self._event_serializer.serialize_events(
results["state"] = [ results["events_after"], time_now,
serialize_event(event, time_now) for event in results["state"] )
] results["state"] = yield self._event_serializer.serialize_events(
results["state"], time_now,
)
defer.returnValue((200, results)) defer.returnValue((200, results))

View File

@ -17,10 +17,7 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.events.utils import ( from synapse.events.utils import format_event_for_client_v2_without_room_id
format_event_for_client_v2_without_room_id,
serialize_event,
)
from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.servlet import RestServlet, parse_integer, parse_string
from ._base import client_v2_patterns from ._base import client_v2_patterns
@ -36,6 +33,7 @@ class NotificationsServlet(RestServlet):
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request): def on_GET(self, request):
@ -69,11 +67,11 @@ class NotificationsServlet(RestServlet):
"profile_tag": pa["profile_tag"], "profile_tag": pa["profile_tag"],
"actions": pa["actions"], "actions": pa["actions"],
"ts": pa["received_ts"], "ts": pa["received_ts"],
"event": serialize_event( "event": (yield self._event_serializer.serialize_event(
notif_events[pa["event_id"]], notif_events[pa["event_id"]],
self.clock.time_msec(), self.clock.time_msec(),
event_format=format_event_for_client_v2_without_room_id, event_format=format_event_for_client_v2_without_room_id,
), )),
} }
if pa["room_id"] not in receipts_by_room: if pa["room_id"] not in receipts_by_room:

View File

@ -26,7 +26,6 @@ from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
from synapse.events.utils import ( from synapse.events.utils import (
format_event_for_client_v2_without_room_id, format_event_for_client_v2_without_room_id,
format_event_raw, format_event_raw,
serialize_event,
) )
from synapse.handlers.presence import format_user_presence_state from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.sync import SyncConfig from synapse.handlers.sync import SyncConfig
@ -86,6 +85,7 @@ class SyncRestServlet(RestServlet):
self.filtering = hs.get_filtering() self.filtering = hs.get_filtering()
self.presence_handler = hs.get_presence_handler() self.presence_handler = hs.get_presence_handler()
self._server_notices_sender = hs.get_server_notices_sender() self._server_notices_sender = hs.get_server_notices_sender()
self._event_serializer = hs.get_event_client_serializer()
@defer.inlineCallbacks @defer.inlineCallbacks
def on_GET(self, request): def on_GET(self, request):
@ -168,14 +168,14 @@ class SyncRestServlet(RestServlet):
) )
time_now = self.clock.time_msec() time_now = self.clock.time_msec()
response_content = self.encode_response( response_content = yield self.encode_response(
time_now, sync_result, requester.access_token_id, filter time_now, sync_result, requester.access_token_id, filter
) )
defer.returnValue((200, response_content)) defer.returnValue((200, response_content))
@staticmethod @defer.inlineCallbacks
def encode_response(time_now, sync_result, access_token_id, filter): def encode_response(self, time_now, sync_result, access_token_id, filter):
if filter.event_format == 'client': if filter.event_format == 'client':
event_formatter = format_event_for_client_v2_without_room_id event_formatter = format_event_for_client_v2_without_room_id
elif filter.event_format == 'federation': elif filter.event_format == 'federation':
@ -183,24 +183,24 @@ class SyncRestServlet(RestServlet):
else: else:
raise Exception("Unknown event format %s" % (filter.event_format, )) raise Exception("Unknown event format %s" % (filter.event_format, ))
joined = SyncRestServlet.encode_joined( joined = yield self.encode_joined(
sync_result.joined, time_now, access_token_id, sync_result.joined, time_now, access_token_id,
filter.event_fields, filter.event_fields,
event_formatter, event_formatter,
) )
invited = SyncRestServlet.encode_invited( invited = yield self.encode_invited(
sync_result.invited, time_now, access_token_id, sync_result.invited, time_now, access_token_id,
event_formatter, event_formatter,
) )
archived = SyncRestServlet.encode_archived( archived = yield self.encode_archived(
sync_result.archived, time_now, access_token_id, sync_result.archived, time_now, access_token_id,
filter.event_fields, filter.event_fields,
event_formatter, event_formatter,
) )
return { defer.returnValue({
"account_data": {"events": sync_result.account_data}, "account_data": {"events": sync_result.account_data},
"to_device": {"events": sync_result.to_device}, "to_device": {"events": sync_result.to_device},
"device_lists": { "device_lists": {
@ -222,7 +222,7 @@ class SyncRestServlet(RestServlet):
}, },
"device_one_time_keys_count": sync_result.device_one_time_keys_count, "device_one_time_keys_count": sync_result.device_one_time_keys_count,
"next_batch": sync_result.next_batch.to_string(), "next_batch": sync_result.next_batch.to_string(),
} })
@staticmethod @staticmethod
def encode_presence(events, time_now): def encode_presence(events, time_now):
@ -239,8 +239,8 @@ class SyncRestServlet(RestServlet):
] ]
} }
@staticmethod @defer.inlineCallbacks
def encode_joined(rooms, time_now, token_id, event_fields, event_formatter): def encode_joined(self, rooms, time_now, token_id, event_fields, event_formatter):
""" """
Encode the joined rooms in a sync result Encode the joined rooms in a sync result
@ -261,15 +261,15 @@ class SyncRestServlet(RestServlet):
""" """
joined = {} joined = {}
for room in rooms: for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room( joined[room.room_id] = yield self.encode_room(
room, time_now, token_id, joined=True, only_fields=event_fields, room, time_now, token_id, joined=True, only_fields=event_fields,
event_formatter=event_formatter, event_formatter=event_formatter,
) )
return joined defer.returnValue(joined)
@staticmethod @defer.inlineCallbacks
def encode_invited(rooms, time_now, token_id, event_formatter): def encode_invited(self, rooms, time_now, token_id, event_formatter):
""" """
Encode the invited rooms in a sync result Encode the invited rooms in a sync result
@ -289,7 +289,7 @@ class SyncRestServlet(RestServlet):
""" """
invited = {} invited = {}
for room in rooms: for room in rooms:
invite = serialize_event( invite = yield self._event_serializer.serialize_event(
room.invite, time_now, token_id=token_id, room.invite, time_now, token_id=token_id,
event_format=event_formatter, event_format=event_formatter,
is_invite=True, is_invite=True,
@ -302,10 +302,10 @@ class SyncRestServlet(RestServlet):
"invite_state": {"events": invited_state} "invite_state": {"events": invited_state}
} }
return invited defer.returnValue(invited)
@staticmethod @defer.inlineCallbacks
def encode_archived(rooms, time_now, token_id, event_fields, event_formatter): def encode_archived(self, rooms, time_now, token_id, event_fields, event_formatter):
""" """
Encode the archived rooms in a sync result Encode the archived rooms in a sync result
@ -326,17 +326,17 @@ class SyncRestServlet(RestServlet):
""" """
joined = {} joined = {}
for room in rooms: for room in rooms:
joined[room.room_id] = SyncRestServlet.encode_room( joined[room.room_id] = yield self.encode_room(
room, time_now, token_id, joined=False, room, time_now, token_id, joined=False,
only_fields=event_fields, only_fields=event_fields,
event_formatter=event_formatter, event_formatter=event_formatter,
) )
return joined defer.returnValue(joined)
@staticmethod @defer.inlineCallbacks
def encode_room( def encode_room(
room, time_now, token_id, joined, self, room, time_now, token_id, joined,
only_fields, event_formatter, only_fields, event_formatter,
): ):
""" """
@ -355,9 +355,10 @@ class SyncRestServlet(RestServlet):
Returns: Returns:
dict[str, object]: the room, encoded in our response format dict[str, object]: the room, encoded in our response format
""" """
def serialize(event): def serialize(events):
return serialize_event( return self._event_serializer.serialize_events(
event, time_now, token_id=token_id, events, time_now=time_now,
token_id=token_id,
event_format=event_formatter, event_format=event_formatter,
only_event_fields=only_fields, only_event_fields=only_fields,
) )
@ -376,8 +377,8 @@ class SyncRestServlet(RestServlet):
event.event_id, room.room_id, event.room_id, event.event_id, room.room_id, event.room_id,
) )
serialized_state = [serialize(e) for e in state_events] serialized_state = yield serialize(state_events)
serialized_timeline = [serialize(e) for e in timeline_events] serialized_timeline = yield serialize(timeline_events)
account_data = room.account_data account_data = room.account_data
@ -397,7 +398,7 @@ class SyncRestServlet(RestServlet):
result["unread_notifications"] = room.unread_notifications result["unread_notifications"] = room.unread_notifications
result["summary"] = room.summary result["summary"] = room.summary
return result defer.returnValue(result)
def register_servlets(hs, http_server): def register_servlets(hs, http_server):

View File

@ -35,6 +35,7 @@ from synapse.crypto import context_factory
from synapse.crypto.keyring import Keyring from synapse.crypto.keyring import Keyring
from synapse.events.builder import EventBuilderFactory from synapse.events.builder import EventBuilderFactory
from synapse.events.spamcheck import SpamChecker from synapse.events.spamcheck import SpamChecker
from synapse.events.utils import EventClientSerializer
from synapse.federation.federation_client import FederationClient from synapse.federation.federation_client import FederationClient
from synapse.federation.federation_server import ( from synapse.federation.federation_server import (
FederationHandlerRegistry, FederationHandlerRegistry,
@ -185,6 +186,7 @@ class HomeServer(object):
'sendmail', 'sendmail',
'registration_handler', 'registration_handler',
'account_validity_handler', 'account_validity_handler',
'event_client_serializer',
] ]
REQUIRED_ON_MASTER_STARTUP = [ REQUIRED_ON_MASTER_STARTUP = [
@ -511,6 +513,9 @@ class HomeServer(object):
def build_account_validity_handler(self): def build_account_validity_handler(self):
return AccountValidityHandler(self) return AccountValidityHandler(self)
def build_event_client_serializer(self):
return EventClientSerializer(self)
def remove_pusher(self, app_id, push_key, user_id): def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)

View File

@ -156,6 +156,25 @@ def concurrently_execute(func, args, limit):
], consumeErrors=True)).addErrback(unwrapFirstError) ], consumeErrors=True)).addErrback(unwrapFirstError)
def yieldable_gather_results(func, iter, *args, **kwargs):
"""Executes the function with each argument concurrently.
Args:
func (func): Function to execute that returns a Deferred
iter (iter): An iterable that yields items that get passed as the first
argument to the function
*args: Arguments to be passed to each call to func
Returns
Deferred[list]: Resolved when all functions have been invoked, or errors if
one of the function calls fails.
"""
return logcontext.make_deferred_yieldable(defer.gatherResults([
run_in_background(func, item, *args, **kwargs)
for item in iter
], consumeErrors=True)).addErrback(unwrapFirstError)
class Linearizer(object): class Linearizer(object):
"""Limits concurrent access to resources based on a key. Useful to ensure """Limits concurrent access to resources based on a key. Useful to ensure
only a few things happen at a time on a given resource. only a few things happen at a time on a given resource.