Calculate push actions on worker

This commit is contained in:
Erik Johnston 2018-02-15 16:30:10 +00:00
parent 493e25d554
commit 28e973ac11
3 changed files with 69 additions and 25 deletions

View File

@ -27,10 +27,14 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler from synapse.replication.tcp.client import ReplicationClientHandler
@ -48,6 +52,10 @@ logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore( class EventCreatorSlavedStore(
SlavedAccountDataStore,
SlavedPusherStore,
SlavedReceiptsStore,
SlavedPushRuleStore,
SlavedDeviceStore, SlavedDeviceStore,
SlavedClientIpStore, SlavedClientIpStore,
SlavedApplicationServiceStore, SlavedApplicationServiceStore,

View File

@ -553,21 +553,67 @@ class EventCreationHandler(object):
event, event,
context, context,
ratelimit=True, ratelimit=True,
extra_users=[] extra_users=[],
): ):
# We now need to go and hit out to wherever we need to hit out to. """Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
# If we're a worker we need to hit out to the master. If called from a worker will hit out to the master process for final
if self.config.worker_app: processing.
yield send_event_to_master(
self.http_client, Args:
host=self.config.worker_replication_host, requester (Requester)
port=self.config.worker_replication_http_port, event (FrozenEvent)
requester=requester, context (EventContext)
event=event, ratelimit (bool)
context=context, extra_users (list(str)): Any extra users to notify about event
"""
yield self.action_generator.handle_push_actions_for_event(
event, context
)
try:
# We now need to go and hit out to wherever we need to hit out to.
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
)
return
yield self.persist_and_notify_client_event(
requester,
event,
context,
ratelimit=ratelimit,
extra_users=extra_users,
) )
return except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area, if we calculated them.
preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
raise
@defer.inlineCallbacks
def persist_and_notify_client_event(
self,
requester,
event,
context,
ratelimit=True,
extra_users=[],
):
"""Called when we have fully built and authed the event. This should
only be run on master.
"""
assert not self.config.worker_app
if ratelimit: if ratelimit:
yield self.base_handler.ratelimit(requester) yield self.base_handler.ratelimit(requester)
@ -679,20 +725,10 @@ class EventCreationHandler(object):
"Changing the room create event is forbidden", "Changing the room create event is forbidden",
) )
yield self.action_generator.handle_push_actions_for_event( (event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context event, context=context
) )
try:
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
)
except: # noqa: E722, as we reraise the exception this is fine.
# Ensure that we actually remove the entries in the push actions
# staging area
preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
raise
# this intentionally does not yield: we don't care about the result # this intentionally does not yield: we don't care about the result
# and don't need to wait for it. # and don't need to wait for it.
preserve_fn(self.pusher_pool.on_new_notifications)( preserve_fn(self.pusher_pool.on_new_notifications)(

View File

@ -106,7 +106,7 @@ class ReplicationSendEventRestServlet(RestServlet):
event.event_id, event.room_id, event.event_id, event.room_id,
) )
yield self.event_creation_handler.handle_new_client_event( yield self.event_creation_handler.persist_and_notify_client_event(
requester, event, context, requester, event, context,
) )