Merge PDUs and Events into one object

This commit is contained in:
Mark Haines 2014-11-14 21:25:02 +00:00
parent 8c2b5ea7c4
commit cb4b6c844a
10 changed files with 91 additions and 212 deletions

View file

@ -22,7 +22,6 @@ from synapse.api.errors import AuthError, FederationError, SynapseError
from synapse.api.events.room import RoomMemberEvent
from synapse.api.constants import Membership
from synapse.util.logutils import log_function
from synapse.federation.pdu_codec import PduCodec
from synapse.util.async import run_on_reactor
from synapse.crypto.event_signing import (
compute_event_signature, check_event_content_hash
@ -69,8 +68,6 @@ class FederationHandler(BaseHandler):
self.replication_layer.set_handler(self)
self.pdu_codec = PduCodec(hs)
# When joining a room we need to queue any events for that room up
self.room_queues = {}
@ -92,7 +89,7 @@ class FederationHandler(BaseHandler):
yield run_on_reactor()
pdu = self.pdu_codec.pdu_from_event(event)
pdu = event
if not hasattr(pdu, "destinations") or not pdu.destinations:
pdu.destinations = []
@ -105,7 +102,7 @@ class FederationHandler(BaseHandler):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
"""
event = self.pdu_codec.event_from_pdu(pdu)
event = pdu
logger.debug("Got event: %s", event.event_id)
@ -118,18 +115,15 @@ class FederationHandler(BaseHandler):
logger.debug("Processing event: %s", event.event_id)
redacted_event = prune_event(event)
redacted_event.origin = pdu.origin
redacted_event.origin_server_ts = pdu.origin_server_ts
redacted_pdu = self.pdu_codec.pdu_from_event(redacted_event)
redacted_pdu_json = redacted_pdu.get_dict()
redacted_pdu_json = redacted_event.get_pdu_json()
try:
yield self.keyring.verify_json_for_server(
event.origin, redacted_pdu_json
)
except SynapseError as e:
logger.warn("Signature check failed for %s redacted to %s",
encode_canonical_json(pdu.get_dict()),
encode_canonical_json(pdu.get_pdu_json()),
encode_canonical_json(redacted_pdu_json),
)
raise FederationError(
@ -147,7 +141,7 @@ class FederationHandler(BaseHandler):
event = redacted_event
if state:
state = [self.pdu_codec.event_from_pdu(p) for p in state]
state = [p for p in state]
is_new_state = yield self.state_handler.annotate_event_with_state(
event,
@ -239,7 +233,7 @@ class FederationHandler(BaseHandler):
events = []
for pdu in pdus:
event = self.pdu_codec.event_from_pdu(pdu)
event = pdu
# FIXME (erikj): Not sure this actually works :/
yield self.state_handler.annotate_event_with_state(event)
@ -260,15 +254,15 @@ class FederationHandler(BaseHandler):
destination=target_host,
context=event.room_id,
event_id=event.event_id,
pdu=self.pdu_codec.pdu_from_event(event)
pdu=event
)
defer.returnValue(self.pdu_codec.event_from_pdu(pdu))
defer.returnValue(pdu)
@defer.inlineCallbacks
def on_event_auth(self, event_id):
auth = yield self.store.get_auth_chain(event_id)
defer.returnValue([self.pdu_codec.pdu_from_event(e) for e in auth])
defer.returnValue([e for e in auth])
@log_function
@defer.inlineCallbacks
@ -292,7 +286,7 @@ class FederationHandler(BaseHandler):
logger.debug("Got response to make_join: %s", pdu)
event = self.pdu_codec.event_from_pdu(pdu)
event = pdu
# We should assert some things.
assert(event.type == RoomMemberEvent.TYPE)
@ -310,10 +304,10 @@ class FederationHandler(BaseHandler):
state = yield self.replication_layer.send_join(
target_host,
self.pdu_codec.pdu_from_event(event)
event
)
state = [self.pdu_codec.event_from_pdu(p) for p in state]
state = [p for p in state]
logger.debug("do_invite_join state: %s", state)
@ -387,7 +381,7 @@ class FederationHandler(BaseHandler):
yield self.auth.add_auth_events(event)
self.auth.check(event, raises=True)
pdu = self.pdu_codec.pdu_from_event(event)
pdu = event
defer.returnValue(pdu)
@ -397,7 +391,7 @@ class FederationHandler(BaseHandler):
""" We have received a join event for a room. Fully process it and
respond with the current state and auth chains.
"""
event = self.pdu_codec.event_from_pdu(pdu)
event = pdu
event.outlier = False
@ -429,7 +423,7 @@ class FederationHandler(BaseHandler):
"user_joined_room", user=user, room_id=event.room_id
)
new_pdu = self.pdu_codec.pdu_from_event(event)
new_pdu = event
destinations = set()
@ -450,17 +444,10 @@ class FederationHandler(BaseHandler):
yield self.replication_layer.send_pdu(new_pdu)
auth_chain = yield self.store.get_auth_chain(event.event_id)
pdu_auth_chain = [
self.pdu_codec.pdu_from_event(e)
for e in auth_chain
]
defer.returnValue({
"state": [
self.pdu_codec.pdu_from_event(e)
for e in event.state_events.values()
],
"auth_chain": pdu_auth_chain,
"state": event.state_events.values(),
"auth_chain": auth_chain,
})
@defer.inlineCallbacks
@ -469,7 +456,7 @@ class FederationHandler(BaseHandler):
Respond with the now signed event.
"""
event = self.pdu_codec.event_from_pdu(pdu)
event = pdu
event.outlier = True
@ -493,7 +480,7 @@ class FederationHandler(BaseHandler):
event, extra_users=[target_user],
)
defer.returnValue(self.pdu_codec.pdu_from_event(event))
defer.returnValue(event)
@defer.inlineCallbacks
def get_state_for_pdu(self, origin, room_id, event_id):
@ -524,12 +511,7 @@ class FederationHandler(BaseHandler):
else:
del results[(event.type, event.state_key)]
defer.returnValue(
[
self.pdu_codec.pdu_from_event(s)
for s in results.values()
]
)
defer.returnValue(results.values())
else:
defer.returnValue([])
@ -546,10 +528,7 @@ class FederationHandler(BaseHandler):
limit
)
defer.returnValue([
self.pdu_codec.pdu_from_event(e)
for e in events
])
defer.returnValue(events)
@defer.inlineCallbacks
@log_function
@ -572,7 +551,7 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
defer.returnValue(self.pdu_codec.pdu_from_event(event))
defer.returnValue(event)
else:
defer.returnValue(None)