Fill out prev_events before calling persist_event

This commit is contained in:
Mark Haines 2014-08-27 13:34:28 +01:00
parent 474dcecb11
commit a03c7f27a8
5 changed files with 57 additions and 83 deletions

View File

@ -32,6 +32,8 @@ class BaseRoomHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _on_new_room_event(self, event, snapshot, extra_destinations=[]): def _on_new_room_event(self, event, snapshot, extra_destinations=[]):
snapshot.fill_out_prev_events(event)
store_id = yield self.store.persist_event(event) store_id = yield self.store.persist_event(event)
destinations = set(extra_destinations) destinations = set(extra_destinations)

View File

@ -78,7 +78,6 @@ class FederationHandler(BaseHandler):
Deferred: Resolved when it has successfully been queued for Deferred: Resolved when it has successfully been queued for
processing. processing.
""" """
yield self.fill_out_prev_events(event, snapshot)
pdu = self.pdu_codec.pdu_from_event(event) pdu = self.pdu_codec.pdu_from_event(event)
@ -87,7 +86,6 @@ class FederationHandler(BaseHandler):
yield self.replication_layer.send_pdu(pdu) yield self.replication_layer.send_pdu(pdu)
@log_function @log_function
def get_state_for_room(self, destination, room_id): def get_state_for_room(self, destination, room_id):
return self.replication_layer.get_state_for_context( return self.replication_layer.get_state_for_context(
@ -102,64 +100,18 @@ class FederationHandler(BaseHandler):
""" """
event = self.pdu_codec.event_from_pdu(pdu) event = self.pdu_codec.event_from_pdu(pdu)
try: with (yield self.lock_manager.lock(pdu.context)):
with (yield self.lock_manager.lock(pdu.context)): if event.is_state and not backfilled:
if event.is_state and not backfilled: is_new_state = yield self.state_handler.handle_new_state(
is_new_state = yield self.state_handler.handle_new_state( pdu
pdu )
) if not is_new_state:
if not is_new_state: return
return else:
else: is_new_state = False
is_new_state = False # TODO: Implement something in federation that allows us to
# respond to PDU.
yield self.on_receive(event, is_new_state, backfilled)
except AuthError:
# TODO: Implement something in federation that allows us to
# respond to PDU.
raise
return
@defer.inlineCallbacks
def _on_new_state(self, pdu, new_state_event):
# TODO: Do any store stuff here. Notifiy C2S about this new
# state.
yield self.store.update_current_state(
pdu_id=pdu.pdu_id,
origin=pdu.origin,
context=pdu.context,
pdu_type=pdu.pdu_type,
state_key=pdu.state_key
)
yield self.on_receive(new_state_event)
@defer.inlineCallbacks
def fill_out_prev_events(self, event, snapshot):
if hasattr(event, "prev_events"):
return
results = snapshot.prev_pdus
es = [
"%s@%s" % (p_id, origin) for p_id, origin, _ in results
]
event.prev_events = [e for e in es if e != event.event_id]
if results:
event.depth = max([int(v) for _, _, v in results]) + 1
else:
event.depth = 0
@log_function
@defer.inlineCallbacks
def on_receive(self, event, is_new_state, backfilled):
if hasattr(event, "state_key") and not is_new_state: if hasattr(event, "state_key") and not is_new_state:
logger.debug("Ignoring old state.") logger.debug("Ignoring old state.")
return return

View File

@ -58,12 +58,6 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def persist_event(self, event=None, backfilled=False, pdu=None): def persist_event(self, event=None, backfilled=False, pdu=None):
# FIXME (erikj): This should be removed when we start amalgamating
# event and pdu storage
if event is not None:
federation_handler = self.hs.get_handlers().federation_handler
yield federation_handler.fill_out_prev_events(event)
stream_ordering = None stream_ordering = None
if backfilled: if backfilled:
if not self.min_token_deferred.called: if not self.min_token_deferred.called:
@ -290,6 +284,21 @@ class Snapshot(object):
self.state_key = state_key self.state_key = state_key
self.prev_state_pdu = prev_state_pdu self.prev_state_pdu = prev_state_pdu
def fill_out_prev_events(self, event):
if hasattr(event, "prev_events"):
return
es = [
"%s@%s" % (p_id, origin) for p_id, origin, _ in self.prev_pdus
]
event.prev_events = [e for e in es if e != event.event_id]
if self.prev_pdus:
event.depth = max([int(v) for _, _, v in results]) + 1
else:
event.depth = 0
def schema_path(schema): def schema_path(schema):
""" Get a filesystem path for the named database schema """ Get a filesystem path for the named database schema

View File

@ -22,8 +22,9 @@ from synapse.api.events.room import (
from synapse.api.constants import Membership from synapse.api.constants import Membership
from synapse.handlers.federation import FederationHandler from synapse.handlers.federation import FederationHandler
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.federation.units import Pdu
from mock import NonCallableMock from mock import NonCallableMock, ANY
import logging import logging
@ -60,37 +61,42 @@ class FederationTestCase(unittest.TestCase):
@defer.inlineCallbacks @defer.inlineCallbacks
def test_msg(self): def test_msg(self):
event = self.hs.get_event_factory().create_event( pdu = Pdu(
etype=MessageEvent.TYPE, pdu_type=MessageEvent.TYPE,
msg_id="bob", context="foo",
room_id="foo",
content={"msgtype": u"fooo"}, content={"msgtype": u"fooo"},
ts=0,
pdu_id="a",
origin="b",
) )
store_id = "ASD" store_id = "ASD"
self.datastore.persist_event.return_value = defer.succeed(store_id) self.datastore.persist_event.return_value = defer.succeed(store_id)
self.datastore.get_room.return_value = defer.succeed(True) self.datastore.get_room.return_value = defer.succeed(True)
yield self.handlers.federation_handler.on_receive(event, False, False) yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
self.datastore.persist_event.assert_called_once_with(event, False) self.datastore.persist_event.assert_called_once_with(ANY, False)
self.notifier.on_new_room_event.assert_called_once_with( self.notifier.on_new_room_event.assert_called_once_with(
event, store_id) ANY, store_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_invite_join_target_this(self): def test_invite_join_target_this(self):
room_id = "foo" room_id = "foo"
user_id = "@bob:red" user_id = "@bob:red"
event = self.hs.get_event_factory().create_event( pdu = Pdu(
etype=InviteJoinEvent.TYPE, pdu_type=InviteJoinEvent.TYPE,
user_id=user_id, user_id=user_id,
target_host=self.hostname, target_host=self.hostname,
room_id=room_id, context=room_id,
content={}, content={},
ts=0,
pdu_id="a",
origin="b",
) )
yield self.handlers.federation_handler.on_receive(event, False, False) yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
mem_handler = self.handlers.room_member_handler mem_handler = self.handlers.room_member_handler
self.assertEquals(1, mem_handler.change_membership.call_count) self.assertEquals(1, mem_handler.change_membership.call_count)
@ -107,15 +113,18 @@ class FederationTestCase(unittest.TestCase):
room_id = "foo" room_id = "foo"
user_id = "@bob:red" user_id = "@bob:red"
event = self.hs.get_event_factory().create_event( pdu = Pdu(
etype=InviteJoinEvent.TYPE, pdu_type=InviteJoinEvent.TYPE,
user_id=user_id, user_id=user_id,
target_user_id="@red:not%s" % self.hostname, state_key="@red:not%s" % self.hostname,
room_id=room_id, context=room_id,
content={}, content={},
ts=0,
pdu_id="a",
origin="b",
) )
yield self.handlers.federation_handler.on_receive(event, False, False) yield self.handlers.federation_handler.on_receive_pdu(pdu, False)
mem_handler = self.handlers.room_member_handler mem_handler = self.handlers.room_member_handler
self.assertEquals(0, mem_handler.change_membership.call_count) self.assertEquals(0, mem_handler.change_membership.call_count)

View File

@ -127,7 +127,9 @@ class MemoryDataStore(object):
self.current_state = {} self.current_state = {}
self.events = [] self.events = []
Snapshot = namedtuple("Snapshot", "room_id user_id membership_state") class Snapshot(namedtuple("Snapshot", "room_id user_id membership_state")):
def fill_out_prev_events(self, event):
pass
def snapshot_room(self, room_id, user_id, state_type=None, state_key=None): def snapshot_room(self, room_id, user_id, state_type=None, state_key=None):
return self.Snapshot( return self.Snapshot(