Linearize some federation endpoints based on (origin, room_id)

This commit is contained in:
Erik Johnston 2016-06-17 16:43:45 +01:00
parent 9ba2bf1570
commit 8f4a9bbc16
2 changed files with 77 additions and 66 deletions

View File

@ -49,6 +49,7 @@ class FederationServer(FederationBase):
super(FederationServer, self).__init__(hs) super(FederationServer, self).__init__(hs)
self._room_pdu_linearizer = Linearizer() self._room_pdu_linearizer = Linearizer()
self._server_linearizer = Linearizer()
def set_handler(self, handler): def set_handler(self, handler):
"""Sets the handler that the replication layer will use to communicate """Sets the handler that the replication layer will use to communicate
@ -89,11 +90,14 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_backfill_request(self, origin, room_id, versions, limit): def on_backfill_request(self, origin, room_id, versions, limit):
pdus = yield self.handler.on_backfill_request( with (yield self._server_linearizer.queue((origin, room_id))):
origin, room_id, versions, limit pdus = yield self.handler.on_backfill_request(
) origin, room_id, versions, limit
)
defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) res = self._transaction_from_pdus(pdus).get_dict()
defer.returnValue((200, res))
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
@ -184,27 +188,28 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_context_state_request(self, origin, room_id, event_id): def on_context_state_request(self, origin, room_id, event_id):
if event_id: with (yield self._server_linearizer.queue((origin, room_id))):
pdus = yield self.handler.get_state_for_pdu( if event_id:
origin, room_id, event_id, pdus = yield self.handler.get_state_for_pdu(
) origin, room_id, event_id,
auth_chain = yield self.store.get_auth_chain( )
[pdu.event_id for pdu in pdus] auth_chain = yield self.store.get_auth_chain(
) [pdu.event_id for pdu in pdus]
)
for event in auth_chain: for event in auth_chain:
# We sign these again because there was a bug where we # We sign these again because there was a bug where we
# incorrectly signed things the first time round # incorrectly signed things the first time round
if self.hs.is_mine_id(event.event_id): if self.hs.is_mine_id(event.event_id):
event.signatures.update( event.signatures.update(
compute_event_signature( compute_event_signature(
event, event,
self.hs.hostname, self.hs.hostname,
self.hs.config.signing_key[0] self.hs.config.signing_key[0]
)
) )
) else:
else: raise NotImplementedError("Specify an event")
raise NotImplementedError("Specify an event")
defer.returnValue((200, { defer.returnValue((200, {
"pdus": [pdu.get_pdu_json() for pdu in pdus], "pdus": [pdu.get_pdu_json() for pdu in pdus],
@ -283,14 +288,16 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_event_auth(self, origin, room_id, event_id): def on_event_auth(self, origin, room_id, event_id):
time_now = self._clock.time_msec() with (yield self._server_linearizer.queue((origin, room_id))):
auth_pdus = yield self.handler.on_event_auth(event_id) time_now = self._clock.time_msec()
defer.returnValue((200, { auth_pdus = yield self.handler.on_event_auth(event_id)
"auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], res = {
})) "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
}
defer.returnValue((200, res))
@defer.inlineCallbacks @defer.inlineCallbacks
def on_query_auth_request(self, origin, content, event_id): def on_query_auth_request(self, origin, content, room_id, event_id):
""" """
Content is a dict with keys:: Content is a dict with keys::
auth_chain (list): A list of events that give the auth chain. auth_chain (list): A list of events that give the auth chain.
@ -309,32 +316,33 @@ class FederationServer(FederationBase):
Returns: Returns:
Deferred: Results in `dict` with the same format as `content` Deferred: Results in `dict` with the same format as `content`
""" """
auth_chain = [ with (yield self._server_linearizer.queue((origin, room_id))):
self.event_from_pdu_json(e) auth_chain = [
for e in content["auth_chain"] self.event_from_pdu_json(e)
] for e in content["auth_chain"]
]
signed_auth = yield self._check_sigs_and_hash_and_fetch( signed_auth = yield self._check_sigs_and_hash_and_fetch(
origin, auth_chain, outlier=True origin, auth_chain, outlier=True
) )
ret = yield self.handler.on_query_auth( ret = yield self.handler.on_query_auth(
origin, origin,
event_id, event_id,
signed_auth, signed_auth,
content.get("rejects", []), content.get("rejects", []),
content.get("missing", []), content.get("missing", []),
) )
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
send_content = { send_content = {
"auth_chain": [ "auth_chain": [
e.get_pdu_json(time_now) e.get_pdu_json(time_now)
for e in ret["auth_chain"] for e in ret["auth_chain"]
], ],
"rejects": ret.get("rejects", []), "rejects": ret.get("rejects", []),
"missing": ret.get("missing", []), "missing": ret.get("missing", []),
} }
defer.returnValue( defer.returnValue(
(200, send_content) (200, send_content)
@ -386,21 +394,24 @@ class FederationServer(FederationBase):
@log_function @log_function
def on_get_missing_events(self, origin, room_id, earliest_events, def on_get_missing_events(self, origin, room_id, earliest_events,
latest_events, limit, min_depth): latest_events, limit, min_depth):
logger.info( with (yield self._server_linearizer.queue((origin, room_id))):
"on_get_missing_events: earliest_events: %r, latest_events: %r," logger.info(
" limit: %d, min_depth: %d", "on_get_missing_events: earliest_events: %r, latest_events: %r,"
earliest_events, latest_events, limit, min_depth " limit: %d, min_depth: %d",
) earliest_events, latest_events, limit, min_depth
missing_events = yield self.handler.on_get_missing_events( )
origin, room_id, earliest_events, latest_events, limit, min_depth missing_events = yield self.handler.on_get_missing_events(
) origin, room_id, earliest_events, latest_events, limit, min_depth
)
if len(missing_events) < 5: if len(missing_events) < 5:
logger.info("Returning %d events: %r", len(missing_events), missing_events) logger.info(
else: "Returning %d events: %r", len(missing_events), missing_events
logger.info("Returning %d events", len(missing_events)) )
else:
logger.info("Returning %d events", len(missing_events))
time_now = self._clock.time_msec() time_now = self._clock.time_msec()
defer.returnValue({ defer.returnValue({
"events": [ev.get_pdu_json(time_now) for ev in missing_events], "events": [ev.get_pdu_json(time_now) for ev in missing_events],

View File

@ -388,7 +388,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, origin, content, query, context, event_id): def on_POST(self, origin, content, query, context, event_id):
new_content = yield self.handler.on_query_auth_request( new_content = yield self.handler.on_query_auth_request(
origin, content, event_id origin, content, context, event_id
) )
defer.returnValue((200, new_content)) defer.returnValue((200, new_content))