Add /state_ids federation API

The new API only returns the event_ids for the state, as most
requesters will already have the vast majority of the events already.
This commit is contained in:
Erik Johnston 2016-08-03 14:47:37 +01:00
parent 97f072db74
commit e3a720217a
4 changed files with 125 additions and 3 deletions

View File

@ -314,10 +314,33 @@ class FederationClient(FederationBase):
Deferred: Results in a list of PDUs. Deferred: Results in a list of PDUs.
""" """
result = yield self.transport_layer.get_room_state( try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.
# However, this may 404 if the other side has an old synapse.
result = yield self.transport_layer.get_room_state_ids(
destination, room_id, event_id=event_id, destination, room_id, event_id=event_id,
) )
state_event_ids = result["pdus"]
auth_event_ids = result.get("auth_chain", [])
event_map, _failed_to_fetch = yield self.get_events(
[destination], room_id, set(state_event_ids + auth_event_ids)
)
pdus = [event_map[e_id] for e_id in state_event_ids]
auth_chain = [event_map[e_id] for e_id in auth_event_ids]
auth_chain.sort(key=lambda e: e.depth)
defer.returnValue((pdus, auth_chain))
except HttpResponseException as e:
if e.code == 404:
logger.info("Failed to use get_room_state_ids API, falling back")
else:
raise e
pdus = [ pdus = [
self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] self.event_from_pdu_json(p, outlier=True) for p in result["pdus"]
] ]
@ -339,6 +362,50 @@ class FederationClient(FederationBase):
defer.returnValue((signed_pdus, signed_auth)) defer.returnValue((signed_pdus, signed_auth))
@defer.inlineCallbacks
def get_events(self, destinations, room_id, event_ids, return_local=True):
if return_local:
seen_events = yield self.store.get_events(event_ids)
signed_events = seen_events.values()
else:
seen_events = yield self.store.have_events(event_ids)
signed_events = []
failed_to_fetch = []
missing_events = set(event_ids)
for k in seen_events:
missing_events.discard(k)
if not missing_events:
defer.returnValue((signed_events, failed_to_fetch))
def random_server_list():
srvs = list(destinations)
random.shuffle(srvs)
return srvs
batch_size = 20
for i in xrange(0, len(missing_events), batch_size):
batch = missing_events[i:i + batch_size]
deferreds = [
self.get_pdu(
destinations=random_server_list(),
event_id=e_id,
).addBoth(lambda r, e: (r, e), e_id)
for e_id in batch
]
res = yield defer.DeferredList(deferreds, consumeErrors=True)
for (result, val), (e_id, _) in res:
if result and val:
signed_events.append(val)
else:
failed_to_fetch.add(e_id)
defer.returnValue((signed_events, failed_to_fetch))
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def get_event_auth(self, destination, room_id, event_id): def get_event_auth(self, destination, room_id, event_id):

View File

@ -214,6 +214,27 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp)) defer.returnValue((200, resp))
@defer.inlineCallbacks
def on_state_ids_request(self, origin, room_id, event_id):
if not event_id:
raise NotImplementedError("Specify an event")
in_room = yield self.auth.check_host_in_room(room_id, origin)
if not in_room:
raise AuthError(403, "Host not in room.")
pdus = yield self.handler.get_state_for_pdu(
room_id, event_id,
)
auth_chain = yield self.store.get_auth_chain(
[pdu.event_id for pdu in pdus]
)
defer.returnValue((200, {
"pdus": [pdu.event_id for pdu in pdus],
"auth_chain": [pdu.event_id for pdu in auth_chain],
}))
@defer.inlineCallbacks @defer.inlineCallbacks
def _on_context_state_request_compute(self, room_id, event_id): def _on_context_state_request_compute(self, room_id, event_id):
pdus = yield self.handler.get_state_for_pdu( pdus = yield self.handler.get_state_for_pdu(

View File

@ -54,6 +54,28 @@ class TransportLayerClient(object):
destination, path=path, args={"event_id": event_id}, destination, path=path, args={"event_id": event_id},
) )
@log_function
def get_room_state_ids(self, destination, room_id, event_id):
""" Requests all state for a given room from the given server at the
given event. Returns the state's event_id's
Args:
destination (str): The host name of the remote home server we want
to get the state from.
context (str): The name of the context we want the state of
event_id (str): The event we want the context at.
Returns:
Deferred: Results in a dict received from the remote homeserver.
"""
logger.debug("get_room_state_ids dest=%s, room=%s",
destination, room_id)
path = PREFIX + "/state_ids/%s/" % room_id
return self.client.get_json(
destination, path=path, args={"event_id": event_id},
)
@log_function @log_function
def get_event(self, destination, event_id, timeout=None): def get_event(self, destination, event_id, timeout=None):
""" Requests the pdu with give id and origin from the given server. """ Requests the pdu with give id and origin from the given server.

View File

@ -271,6 +271,17 @@ class FederationStateServlet(BaseFederationServlet):
) )
class FederationStateIdsServlet(BaseFederationServlet):
PATH = "/state_ids/(?P<room_id>[^/]*)/"
def on_GET(self, origin, content, query, room_id):
return self.handler.on_state_ids_request(
origin,
room_id,
query.get("event_id", [None])[0],
)
class FederationBackfillServlet(BaseFederationServlet): class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/(?P<context>[^/]*)/" PATH = "/backfill/(?P<context>[^/]*)/"
@ -538,6 +549,7 @@ SERVLET_CLASSES = (
FederationPullServlet, FederationPullServlet,
FederationEventServlet, FederationEventServlet,
FederationStateServlet, FederationStateServlet,
FederationStateIdsServlet,
FederationBackfillServlet, FederationBackfillServlet,
FederationQueryServlet, FederationQueryServlet,
FederationMakeJoinServlet, FederationMakeJoinServlet,