Allow censoring of events to happen on workers. (#7492)

This is safe as we can now write to cache invalidation stream on workers, and is required for when we move event persistence off master.
This commit is contained in:
Erik Johnston 2020-05-13 17:15:40 +01:00 committed by GitHub
parent 46cb2550bb
commit 1124111a12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 4 additions and 8 deletions

1
changelog.d/7492.misc Normal file
View File

@ -0,0 +1 @@
Allow censoring of events to happen on workers.

View File

@ -122,6 +122,7 @@ from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.key.v2 import KeyApiV2Resource from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.data_stores.main.censor_events import CensorEventsStore
from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore from synapse.storage.data_stores.main.media_repository import MediaRepositoryStore
from synapse.storage.data_stores.main.monthly_active_users import ( from synapse.storage.data_stores.main.monthly_active_users import (
MonthlyActiveUsersWorkerStore, MonthlyActiveUsersWorkerStore,
@ -442,6 +443,7 @@ class GenericWorkerSlavedStore(
SlavedGroupServerStore, SlavedGroupServerStore,
SlavedAccountDataStore, SlavedAccountDataStore,
SlavedPusherStore, SlavedPusherStore,
CensorEventsStore,
SlavedEventStore, SlavedEventStore,
SlavedKeyStore, SlavedKeyStore,
RoomStore, RoomStore,

View File

@ -72,7 +72,6 @@ class MessageHandler(object):
self.state_store = self.storage.state self.state_store = self.storage.state
self._event_serializer = hs.get_event_client_serializer() self._event_serializer = hs.get_event_client_serializer()
self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
self._is_worker_app = bool(hs.config.worker_app)
# The scheduled call to self._expire_event. None if no call is currently # The scheduled call to self._expire_event. None if no call is currently
# scheduled. # scheduled.
@ -260,7 +259,6 @@ class MessageHandler(object):
Args: Args:
event (EventBase): The event to schedule the expiry of. event (EventBase): The event to schedule the expiry of.
""" """
assert not self._is_worker_app
expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER) expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
if not isinstance(expiry_ts, int) or event.is_state(): if not isinstance(expiry_ts, int) or event.is_state():

View File

@ -33,15 +33,10 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class CensorEventsStore(CacheInvalidationWorkerStore, EventsWorkerStore, SQLBaseStore): class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBaseStore):
def __init__(self, database: Database, db_conn, hs: "HomeServer"): def __init__(self, database: Database, db_conn, hs: "HomeServer"):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
# This should only exist on master for now
assert (
hs.config.worker.worker_app is None
), "Can only instantiate CensorEventsStore on master"
def _censor_redactions(): def _censor_redactions():
return run_as_background_process( return run_as_background_process(
"_censor_redactions", self._censor_redactions "_censor_redactions", self._censor_redactions