Process transactions serially.

Since the events received in a transaction are ordered, later events
might depend on earlier events and so we shouldn't blindly process them
in parellel.
This commit is contained in:
Erik Johnston 2015-03-02 11:39:40 +00:00
parent a025055643
commit 23d9bd1d74

View File

@ -112,17 +112,23 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id) logger.debug("[%s] Transaction is new", transaction.transaction_id)
with PreserveLoggingContext(): with PreserveLoggingContext():
dl = [] results = []
for pdu in pdu_list: for pdu in pdu_list:
d = self._handle_new_pdu(transaction.origin, pdu) d = self._handle_new_pdu(transaction.origin, pdu)
def handle_failure(failure): def handle_failure(failure):
failure.trap(FederationError) failure.trap(FederationError)
self.send_failure(failure.value, transaction.origin) self.send_failure(failure.value, transaction.origin)
return failure
d.addErrback(handle_failure) d.addErrback(handle_failure)
dl.append(d) try:
yield d
results.append({})
except Exception as e:
results.append({"error": str(e)})
if hasattr(transaction, "edus"): if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]: for edu in [Edu(**x) for x in transaction.edus]:
@ -135,21 +141,11 @@ class FederationServer(FederationBase):
for failure in getattr(transaction, "pdu_failures", []): for failure in getattr(transaction, "pdu_failures", []):
logger.info("Got failure %r", failure) logger.info("Got failure %r", failure)
results = yield defer.DeferredList(dl, consumeErrors=True) logger.debug("Returning: %s", str(results))
ret = []
for r in results:
if r[0]:
ret.append({})
else:
logger.exception(r[1])
ret.append({"error": str(r[1].value)})
logger.debug("Returning: %s", str(ret))
response = { response = {
"pdus": dict(zip( "pdus": dict(zip(
(p.event_id for p in pdu_list), ret (p.event_id for p in pdu_list), results
)), )),
} }