Merge pull request #6274 from matrix-org/erikj/replication_async

Port replication http server endpoints to async/await
This commit is contained in:
Erik Johnston 2019-10-30 11:44:08 +01:00 committed by GitHub
commit ba4cc5541c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 27 additions and 44 deletions

1
changelog.d/6274.misc Normal file
View File

@ -0,0 +1 @@
Port replication http server endpoints to async/await.

View File

@ -110,14 +110,14 @@ class ReplicationEndpoint(object):
return {} return {}
@abc.abstractmethod @abc.abstractmethod
def _handle_request(self, request, **kwargs): async def _handle_request(self, request, **kwargs):
"""Handle incoming request. """Handle incoming request.
This is called with the request object and PATH_ARGS. This is called with the request object and PATH_ARGS.
Returns: Returns:
Deferred[dict]: A JSON serialisable dict to be used as response tuple[int, dict]: HTTP status code and a JSON serialisable dict
body of request. to be used as response body of request.
""" """
pass pass

View File

@ -82,8 +82,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
return payload return payload
@defer.inlineCallbacks async def _handle_request(self, request):
def _handle_request(self, request):
with Measure(self.clock, "repl_fed_send_events_parse"): with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
@ -101,15 +100,13 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
EventType = event_type_from_format_version(format_ver) EventType = event_type_from_format_version(format_ver)
event = EventType(event_dict, internal_metadata, rejected_reason) event = EventType(event_dict, internal_metadata, rejected_reason)
context = yield EventContext.deserialize( context = EventContext.deserialize(self.store, event_payload["context"])
self.store, event_payload["context"]
)
event_and_contexts.append((event, context)) event_and_contexts.append((event, context))
logger.info("Got %d events from federation", len(event_and_contexts)) logger.info("Got %d events from federation", len(event_and_contexts))
yield self.federation_handler.persist_events_and_notify( await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled event_and_contexts, backfilled
) )
@ -144,8 +141,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
def _serialize_payload(edu_type, origin, content): def _serialize_payload(edu_type, origin, content):
return {"origin": origin, "content": content} return {"origin": origin, "content": content}
@defer.inlineCallbacks async def _handle_request(self, request, edu_type):
def _handle_request(self, request, edu_type):
with Measure(self.clock, "repl_fed_send_edu_parse"): with Measure(self.clock, "repl_fed_send_edu_parse"):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
@ -154,7 +150,7 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
logger.info("Got %r edu from %s", edu_type, origin) logger.info("Got %r edu from %s", edu_type, origin)
result = yield self.registry.on_edu(edu_type, origin, edu_content) result = await self.registry.on_edu(edu_type, origin, edu_content)
return 200, result return 200, result
@ -193,8 +189,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
""" """
return {"args": args} return {"args": args}
@defer.inlineCallbacks async def _handle_request(self, request, query_type):
def _handle_request(self, request, query_type):
with Measure(self.clock, "repl_fed_query_parse"): with Measure(self.clock, "repl_fed_query_parse"):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
@ -202,7 +197,7 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
logger.info("Got %r query", query_type) logger.info("Got %r query", query_type)
result = yield self.registry.on_query(query_type, args) result = await self.registry.on_query(query_type, args)
return 200, result return 200, result
@ -234,9 +229,8 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
""" """
return {} return {}
@defer.inlineCallbacks async def _handle_request(self, request, room_id):
def _handle_request(self, request, room_id): await self.store.clean_room_for_join(room_id)
yield self.store.clean_room_for_join(room_id)
return 200, {} return 200, {}

View File

@ -15,8 +15,6 @@
import logging import logging
from twisted.internet import defer
from synapse.http.servlet import parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint from synapse.replication.http._base import ReplicationEndpoint
@ -52,15 +50,14 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
"is_guest": is_guest, "is_guest": is_guest,
} }
@defer.inlineCallbacks async def _handle_request(self, request, user_id):
def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
device_id = content["device_id"] device_id = content["device_id"]
initial_display_name = content["initial_display_name"] initial_display_name = content["initial_display_name"]
is_guest = content["is_guest"] is_guest = content["is_guest"]
device_id, access_token = yield self.registration_handler.register_device( device_id, access_token = await self.registration_handler.register_device(
user_id, device_id, initial_display_name, is_guest user_id, device_id, initial_display_name, is_guest
) )

View File

@ -15,8 +15,6 @@
import logging import logging
from twisted.internet import defer
from synapse.http.servlet import parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
@ -65,8 +63,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
"content": content, "content": content,
} }
@defer.inlineCallbacks async def _handle_request(self, request, room_id, user_id):
def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] remote_room_hosts = content["remote_room_hosts"]
@ -79,7 +76,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
logger.info("remote_join: %s into room: %s", user_id, room_id) logger.info("remote_join: %s into room: %s", user_id, room_id)
yield self.federation_handler.do_invite_join( await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user_id, event_content remote_room_hosts, room_id, user_id, event_content
) )
@ -123,8 +120,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
"remote_room_hosts": remote_room_hosts, "remote_room_hosts": remote_room_hosts,
} }
@defer.inlineCallbacks async def _handle_request(self, request, room_id, user_id):
def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] remote_room_hosts = content["remote_room_hosts"]
@ -137,7 +133,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id) logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id)
try: try:
event = yield self.federation_handler.do_remotely_reject_invite( event = await self.federation_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, user_id remote_room_hosts, room_id, user_id
) )
ret = event.get_pdu_json() ret = event.get_pdu_json()
@ -150,7 +146,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
# #
logger.warn("Failed to reject invite: %s", e) logger.warn("Failed to reject invite: %s", e)
yield self.store.locally_reject_invite(user_id, room_id) await self.store.locally_reject_invite(user_id, room_id)
ret = {} ret = {}
return 200, ret return 200, ret

View File

@ -15,8 +15,6 @@
import logging import logging
from twisted.internet import defer
from synapse.http.servlet import parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint from synapse.replication.http._base import ReplicationEndpoint
@ -74,11 +72,10 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
"address": address, "address": address,
} }
@defer.inlineCallbacks async def _handle_request(self, request, user_id):
def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
yield self.registration_handler.register_with_store( await self.registration_handler.register_with_store(
user_id=user_id, user_id=user_id,
password_hash=content["password_hash"], password_hash=content["password_hash"],
was_guest=content["was_guest"], was_guest=content["was_guest"],
@ -117,14 +114,13 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
""" """
return {"auth_result": auth_result, "access_token": access_token} return {"auth_result": auth_result, "access_token": access_token}
@defer.inlineCallbacks async def _handle_request(self, request, user_id):
def _handle_request(self, request, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
auth_result = content["auth_result"] auth_result = content["auth_result"]
access_token = content["access_token"] access_token = content["access_token"]
yield self.registration_handler.post_registration_actions( await self.registration_handler.post_registration_actions(
user_id=user_id, auth_result=auth_result, access_token=access_token user_id=user_id, auth_result=auth_result, access_token=access_token
) )

View File

@ -87,8 +87,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
return payload return payload
@defer.inlineCallbacks async def _handle_request(self, request, event_id):
def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"): with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
@ -101,7 +100,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
event = EventType(event_dict, internal_metadata, rejected_reason) event = EventType(event_dict, internal_metadata, rejected_reason)
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"]) context = EventContext.deserialize(self.store, content["context"])
ratelimit = content["ratelimit"] ratelimit = content["ratelimit"]
extra_users = [UserID.from_string(u) for u in content["extra_users"]] extra_users = [UserID.from_string(u) for u in content["extra_users"]]
@ -113,7 +112,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
"Got event to send with ID: %s into room: %s", event.event_id, event.room_id "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
) )
yield self.event_creation_handler.persist_and_notify_client_event( await self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users requester, event, context, ratelimit=ratelimit, extra_users=extra_users
) )