Remove explicit calls to send_pdu

This commit is contained in:
Erik Johnston 2016-11-21 14:48:51 +00:00
parent 524d61bf7e
commit 9687e039e7
3 changed files with 9 additions and 61 deletions

View File

@ -158,10 +158,6 @@ class FederationRemoteSendQueue(object):
self.failures[pos] = (destination, str(failure))
def send_pdu(self, pdu, destinations):
# This gets sent down a separate path
pass
def send_device_messages(self, destination):
pos = self._next_pos()
self.device_messages[pos] = destination

View File

@ -19,6 +19,7 @@ from twisted.internet import defer
from .persistence import TransactionActions
from .units import Transaction, Edu
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
@ -153,13 +154,17 @@ class TransactionQueue(object):
event.room_id, latest_event_ids=[event.event_id],
)
destinations = [
destinations = set(
get_domain_from_id(user_id) for user_id in users_in_room
]
)
if event.type == EventTypes.Member:
if event.content["membership"] == Membership.JOIN:
destinations.add(get_domain_from_id(event.state_key))
logger.debug("Sending %s to %r", event, destinations)
self.send_pdu(event, destinations)
self._send_pdu(event, destinations)
yield self.store.update_federation_out_pos(
"events", next_token
@ -168,7 +173,7 @@ class TransactionQueue(object):
finally:
self._is_processing = False
def send_pdu(self, pdu, destinations):
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
# a transaction in progress. If we do, stick it in the pending_pdus
# table and we'll get back to it later.

View File

@ -81,22 +81,6 @@ class FederationHandler(BaseHandler):
# When joining a room we need to queue any events for that room up
self.room_queues = {}
def handle_new_event(self, event, destinations):
""" Takes in an event from the client to server side, that has already
been authed and handled by the state module, and sends it to any
remote home servers that may be interested.
Args:
event: The event to send
destinations: A list of destinations to send it to
Returns:
Deferred: Resolved when it has successfully been queued for
processing.
"""
return self.federation_sender.send_pdu(event, destinations)
@log_function
@defer.inlineCallbacks
def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
@ -831,25 +815,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
new_pdu = event
users_in_room = yield self.store.get_joined_users_from_context(event, context)
destinations = set(
get_domain_from_id(user_id) for user_id in users_in_room
if not self.hs.is_mine_id(user_id)
)
destinations.discard(origin)
logger.debug(
"on_send_join_request: Sending event: %s, signatures: %s",
event.event_id,
event.signatures,
)
self.federation_sender.send_pdu(new_pdu, destinations)
state_ids = context.prev_state_ids.values()
auth_chain = yield self.store.get_auth_chain(set(
[event.event_id] + state_ids
@ -1056,24 +1021,6 @@ class FederationHandler(BaseHandler):
event, event_stream_id, max_stream_id, extra_users=extra_users
)
new_pdu = event
users_in_room = yield self.store.get_joined_users_from_context(event, context)
destinations = set(
get_domain_from_id(user_id) for user_id in users_in_room
if not self.hs.is_mine_id(user_id)
)
destinations.discard(origin)
logger.debug(
"on_send_leave_request: Sending event: %s, signatures: %s",
event.event_id,
event.signatures,
)
self.federation_sender.send_pdu(new_pdu, destinations)
defer.returnValue(None)
@defer.inlineCallbacks