Merge pull request #95 from matrix-org/serialize_transaction_processing

Process transactions serially.
This commit is contained in:
Erik Johnston 2015-03-02 13:33:05 +00:00
commit 210d3c5d72

View File

@ -112,17 +112,19 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id) logger.debug("[%s] Transaction is new", transaction.transaction_id)
with PreserveLoggingContext(): with PreserveLoggingContext():
dl = [] results = []
for pdu in pdu_list: for pdu in pdu_list:
d = self._handle_new_pdu(transaction.origin, pdu) d = self._handle_new_pdu(transaction.origin, pdu)
def handle_failure(failure): try:
failure.trap(FederationError) yield d
self.send_failure(failure.value, transaction.origin) results.append({})
except FederationError as e:
d.addErrback(handle_failure) self.send_failure(e, transaction.origin)
results.append({"error": str(e)})
dl.append(d) except Exception as e:
results.append({"error": str(e)})
if hasattr(transaction, "edus"): if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]: for edu in [Edu(**x) for x in transaction.edus]:
@ -135,21 +137,11 @@ class FederationServer(FederationBase):
for failure in getattr(transaction, "pdu_failures", []): for failure in getattr(transaction, "pdu_failures", []):
logger.info("Got failure %r", failure) logger.info("Got failure %r", failure)
results = yield defer.DeferredList(dl, consumeErrors=True) logger.debug("Returning: %s", str(results))
ret = []
for r in results:
if r[0]:
ret.append({})
else:
logger.exception(r[1])
ret.append({"error": str(r[1].value)})
logger.debug("Returning: %s", str(ret))
response = { response = {
"pdus": dict(zip( "pdus": dict(zip(
(p.event_id for p in pdu_list), ret (p.event_id for p in pdu_list), results
)), )),
} }