mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 11:06:07 -04:00
Use state groups to get current state. Make join dance actually work.
This commit is contained in:
parent
f71627567b
commit
5ffe5ab43f
10 changed files with 231 additions and 72 deletions
|
@ -62,6 +62,9 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
self.pdu_codec = PduCodec(hs)
|
||||
|
||||
# When joining a room we need to queue any events for that room up
|
||||
self.room_queues = {}
|
||||
|
||||
@log_function
|
||||
@defer.inlineCallbacks
|
||||
def handle_new_event(self, event, snapshot):
|
||||
|
@ -95,22 +98,25 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
logger.debug("Got event: %s", event.event_id)
|
||||
|
||||
if event.room_id in self.room_queues:
|
||||
self.room_queues[event.room_id].append(pdu)
|
||||
return
|
||||
|
||||
if state:
|
||||
state = [self.pdu_codec.event_from_pdu(p) for p in state]
|
||||
state = {(e.type, e.state_key): e for e in state}
|
||||
yield self.state_handler.annotate_state_groups(event, state=state)
|
||||
|
||||
is_new_state = yield self.state_handler.annotate_state_groups(
|
||||
event,
|
||||
state=state
|
||||
)
|
||||
|
||||
logger.debug("Event: %s", event)
|
||||
|
||||
if not backfilled:
|
||||
yield self.auth.check(event, None, raises=True)
|
||||
|
||||
if event.is_state and not backfilled:
|
||||
is_new_state = yield self.state_handler.handle_new_state(
|
||||
pdu
|
||||
)
|
||||
else:
|
||||
is_new_state = False
|
||||
is_new_state = is_new_state and not backfilled
|
||||
|
||||
# TODO: Implement something in federation that allows us to
|
||||
# respond to PDU.
|
||||
|
@ -211,6 +217,8 @@ class FederationHandler(BaseHandler):
|
|||
assert(event.state_key == joinee)
|
||||
assert(event.room_id == room_id)
|
||||
|
||||
self.room_queues[room_id] = []
|
||||
|
||||
event.event_id = self.event_factory.create_event_id()
|
||||
event.content = content
|
||||
|
||||
|
@ -219,15 +227,14 @@ class FederationHandler(BaseHandler):
|
|||
self.pdu_codec.pdu_from_event(event)
|
||||
)
|
||||
|
||||
# TODO (erikj): Time out here.
|
||||
d = defer.Deferred()
|
||||
self.waiting_for_join_list.setdefault((joinee, room_id), []).append(d)
|
||||
reactor.callLater(10, d.cancel)
|
||||
state = [self.pdu_codec.event_from_pdu(p) for p in state]
|
||||
|
||||
try:
|
||||
yield d
|
||||
except defer.CancelledError:
|
||||
raise SynapseError(500, "Unable to join remote room")
|
||||
logger.debug("do_invite_join state: %s", state)
|
||||
|
||||
is_new_state = yield self.state_handler.annotate_state_groups(
|
||||
event,
|
||||
state=state
|
||||
)
|
||||
|
||||
try:
|
||||
yield self.store.store_room(
|
||||
|
@ -239,6 +246,32 @@ class FederationHandler(BaseHandler):
|
|||
# FIXME
|
||||
pass
|
||||
|
||||
for e in state:
|
||||
# FIXME: Auth these.
|
||||
is_new_state = yield self.state_handler.annotate_state_groups(
|
||||
e,
|
||||
state=state
|
||||
)
|
||||
|
||||
yield self.store.persist_event(
|
||||
e,
|
||||
backfilled=False,
|
||||
is_new_state=False
|
||||
)
|
||||
|
||||
yield self.store.persist_event(
|
||||
event,
|
||||
backfilled=False,
|
||||
is_new_state=is_new_state
|
||||
)
|
||||
|
||||
room_queue = self.room_queues[room_id]
|
||||
del self.room_queues[room_id]
|
||||
|
||||
for p in room_queue:
|
||||
p.outlier = True
|
||||
yield self.on_receive_pdu(p, backfilled=False)
|
||||
|
||||
defer.returnValue(True)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -264,13 +297,9 @@ class FederationHandler(BaseHandler):
|
|||
def on_send_join_request(self, origin, pdu):
|
||||
event = self.pdu_codec.event_from_pdu(pdu)
|
||||
|
||||
yield self.state_handler.annotate_state_groups(event)
|
||||
is_new_state= yield self.state_handler.annotate_state_groups(event)
|
||||
yield self.auth.check(event, None, raises=True)
|
||||
|
||||
is_new_state = yield self.state_handler.handle_new_state(
|
||||
pdu
|
||||
)
|
||||
|
||||
# FIXME (erikj): All this is duplicated above :(
|
||||
|
||||
yield self.store.persist_event(
|
||||
|
@ -303,7 +332,10 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
yield self.replication_layer.send_pdu(new_pdu)
|
||||
|
||||
defer.returnValue(event.state_events.values())
|
||||
defer.returnValue([
|
||||
self.pdu_codec.pdu_from_event(e)
|
||||
for e in event.state_events.values()
|
||||
])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_pdu(self, pdu_id, pdu_origin):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue