diff --git a/changelog.d/8170.feature b/changelog.d/8170.feature deleted file mode 100644 index b363e929e..000000000 --- a/changelog.d/8170.feature +++ /dev/null @@ -1 +0,0 @@ -Add experimental support for sharding event persister. diff --git a/changelog.d/8242.feature b/changelog.d/8242.feature new file mode 100644 index 000000000..f6891e360 --- /dev/null +++ b/changelog.d/8242.feature @@ -0,0 +1 @@ +Back out experimental support for sharding event persister. **PLEASE REMOVE THIS LINE FROM THE FINAL CHANGELOG** diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 73f0717b0..141748742 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -832,26 +832,11 @@ class ShardedWorkerHandlingConfig: def should_handle(self, instance_name: str, key: str) -> bool: """Whether this instance is responsible for handling the given key. """ - # If multiple instances are not defined we always return true + + # If multiple instances are not defined we always return true. if not self.instances or len(self.instances) == 1: return True - return self.get_instance(key) == instance_name - - def get_instance(self, key: str) -> str: - """Get the instance responsible for handling the given key. - - Note: For things like federation sending the config for which instance - is sending is known only to the sender instance if there is only one. - Therefore `should_handle` should be used where possible. - """ - - if not self.instances: - return "master" - - if len(self.instances) == 1: - return self.instances[0] - # We shard by taking the hash, modulo it by the number of instances and # then checking whether this instance matches the instance at that # index. @@ -861,7 +846,7 @@ class ShardedWorkerHandlingConfig: dest_hash = sha256(key.encode("utf8")).digest() dest_int = int.from_bytes(dest_hash, byteorder="little") remainder = dest_int % (len(self.instances)) - return self.instances[remainder] + return self.instances[remainder] == instance_name __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"] diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index b8faafa9b..eb911e8f9 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -142,4 +142,3 @@ class ShardedWorkerHandlingConfig: instances: List[str] def __init__(self, instances: List[str]) -> None: ... def should_handle(self, instance_name: str, key: str) -> bool: ... - def get_instance(self, key: str) -> str: ... diff --git a/synapse/config/workers.py b/synapse/config/workers.py index f23e42cdf..c784a7150 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -13,24 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Union - import attr from ._base import Config, ConfigError, ShardedWorkerHandlingConfig from .server import ListenerConfig, parse_listener_def -def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]: - """Helper for allowing parsing a string or list of strings to a config - option expecting a list of strings. - """ - - if isinstance(obj, str): - return [obj] - return obj - - @attr.s class InstanceLocationConfig: """The host and port to talk to an instance via HTTP replication. @@ -45,13 +33,11 @@ class WriterLocations: """Specifies the instances that write various streams. Attributes: - events: The instances that write to the event and backfill streams. - typing: The instance that writes to the typing stream. + events: The instance that writes to the event and backfill streams. + events: The instance that writes to the typing stream. """ - events = attr.ib( - default=["master"], type=List[str], converter=_instance_to_list_converter - ) + events = attr.ib(default="master", type=str) typing = attr.ib(default="master", type=str) @@ -119,18 +105,15 @@ class WorkerConfig(Config): writers = config.get("stream_writers") or {} self.writers = WriterLocations(**writers) - # Check that the configured writers for events and typing also appears in + # Check that the configured writer for events and typing also appears in # `instance_map`. for stream in ("events", "typing"): - instances = _instance_to_list_converter(getattr(self.writers, stream)) - for instance in instances: - if instance != "master" and instance not in self.instance_map: - raise ConfigError( - "Instance %r is configured to write %s but does not appear in `instance_map` config." - % (instance, stream) - ) - - self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) + instance = getattr(self.writers, stream) + if instance != "master" and instance not in self.instance_map: + raise ConfigError( + "Instance %r is configured to write %s but does not appear in `instance_map` config." + % (instance, stream) + ) def generate_config_section(self, config_dir_path, server_name, **kwargs): return """\ diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 310c7f713..43f2986f8 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -923,8 +923,7 @@ class FederationHandler(BaseHandler): ) ) - if ev_infos: - await self._handle_new_events(dest, room_id, ev_infos, backfilled=True) + await self._handle_new_events(dest, ev_infos, backfilled=True) # Step 2: Persist the rest of the events in the chunk one by one events.sort(key=lambda e: e.depth) @@ -1217,7 +1216,7 @@ class FederationHandler(BaseHandler): event_infos.append(_NewEventInfo(event, None, auth)) await self._handle_new_events( - destination, room_id, event_infos, + destination, event_infos, ) def _sanity_check_event(self, ev): @@ -1364,15 +1363,15 @@ class FederationHandler(BaseHandler): ) max_stream_id = await self._persist_auth_tree( - origin, room_id, auth_chain, state, event, room_version_obj + origin, auth_chain, state, event, room_version_obj ) # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. + # + # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - self.config.worker.events_shard_config.get_instance(room_id), - "events", - max_stream_id, + self.config.worker.writers.events, "events", max_stream_id ) # Check whether this room is the result of an upgrade of a room we already know @@ -1626,7 +1625,7 @@ class FederationHandler(BaseHandler): ) context = await self.state_handler.compute_event_context(event) - await self.persist_events_and_notify(event.room_id, [(event, context)]) + await self.persist_events_and_notify([(event, context)]) return event @@ -1653,9 +1652,7 @@ class FederationHandler(BaseHandler): await self.federation_client.send_leave(host_list, event) context = await self.state_handler.compute_event_context(event) - stream_id = await self.persist_events_and_notify( - event.room_id, [(event, context)] - ) + stream_id = await self.persist_events_and_notify([(event, context)]) return event, stream_id @@ -1903,7 +1900,7 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( - event.room_id, [(event, context)], backfilled=backfilled + [(event, context)], backfilled=backfilled ) except Exception: run_in_background( @@ -1916,7 +1913,6 @@ class FederationHandler(BaseHandler): async def _handle_new_events( self, origin: str, - room_id: str, event_infos: Iterable[_NewEventInfo], backfilled: bool = False, ) -> None: @@ -1948,7 +1944,6 @@ class FederationHandler(BaseHandler): ) await self.persist_events_and_notify( - room_id, [ (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) @@ -1959,7 +1954,6 @@ class FederationHandler(BaseHandler): async def _persist_auth_tree( self, origin: str, - room_id: str, auth_events: List[EventBase], state: List[EventBase], event: EventBase, @@ -1974,7 +1968,6 @@ class FederationHandler(BaseHandler): Args: origin: Where the events came from - room_id, auth_events state event @@ -2049,20 +2042,17 @@ class FederationHandler(BaseHandler): events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR await self.persist_events_and_notify( - room_id, [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) - ], + ] ) new_event_context = await self.state_handler.compute_event_context( event, old_state=state ) - return await self.persist_events_and_notify( - room_id, [(event, new_event_context)] - ) + return await self.persist_events_and_notify([(event, new_event_context)]) async def _prep_event( self, @@ -2913,7 +2903,6 @@ class FederationHandler(BaseHandler): async def persist_events_and_notify( self, - room_id: str, event_and_contexts: Sequence[Tuple[EventBase, EventContext]], backfilled: bool = False, ) -> int: @@ -2921,19 +2910,14 @@ class FederationHandler(BaseHandler): necessary. Args: - room_id: The room ID of events being persisted. - event_and_contexts: Sequence of events with their associated - context that should be persisted. All events must belong to - the same room. + event_and_contexts: backfilled: Whether these events are a result of backfilling or not """ - instance = self.config.worker.events_shard_config.get_instance(room_id) - if instance != self._instance_name: + if self.config.worker.writers.events != self._instance_name: result = await self._send_events( - instance_name=instance, + instance_name=self.config.worker.writers.events, store=self.store, - room_id=room_id, event_and_contexts=event_and_contexts, backfilled=backfilled, ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 6398774c0..72bb63816 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -376,8 +376,9 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config self.require_membership_for_aliases = hs.config.require_membership_for_aliases - self._events_shard_config = self.config.worker.events_shard_config - self._instance_name = hs.get_instance_name() + self._is_event_writer = ( + self.config.worker.writers.events == hs.get_instance_name() + ) self.room_invite_state_types = self.hs.config.room_invite_state_types @@ -903,10 +904,9 @@ class EventCreationHandler(object): try: # If we're a worker we need to hit out to the master. - writer_instance = self._events_shard_config.get_instance(event.room_id) - if writer_instance != self._instance_name: + if not self._is_event_writer: result = await self.send_event( - instance_name=writer_instance, + instance_name=self.config.worker.writers.events, event_id=event.event_id, store=self.store, requester=requester, @@ -974,9 +974,7 @@ class EventCreationHandler(object): This should only be run on the instance in charge of persisting events. """ - assert self._events_shard_config.should_handle( - self._instance_name, event.room_id - ) + assert self._is_event_writer if ratelimit: # We check if this is a room admin redacting an event so that we diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 55794c305..9d5b1828d 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -804,9 +804,7 @@ class RoomCreationHandler(BaseHandler): # Always wait for room creation to progate before returning await self._replication.wait_for_stream_position( - self.hs.config.worker.events_shard_config.get_instance(room_id), - "events", - last_stream_id, + self.hs.config.worker.writers.events, "events", last_stream_id ) return result, last_stream_id @@ -1262,10 +1260,10 @@ class RoomShutdownHandler(object): # We now wait for the create room to come back in via replication so # that we can assume that all the joins/invites have propogated before # we try and auto join below. + # + # TODO: Currently the events stream is written to from master await self._replication.wait_for_stream_position( - self.hs.config.worker.events_shard_config.get_instance(new_room_id), - "events", - stream_id, + self.hs.config.worker.writers.events, "events", stream_id ) else: new_room_id = None @@ -1295,9 +1293,7 @@ class RoomShutdownHandler(object): # Wait for leave to come in over replication before trying to forget. await self._replication.wait_for_stream_position( - self.hs.config.worker.events_shard_config.get_instance(room_id), - "events", - stream_id, + self.hs.config.worker.writers.events, "events", stream_id ) await self.room_member_handler.forget(target_requester.user, room_id) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index e5cb3c2d5..a7962b0ad 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -82,6 +82,13 @@ class RoomMemberHandler(object): self._enable_lookup = hs.config.enable_3pid_lookup self.allow_per_room_profiles = self.config.allow_per_room_profiles + self._event_stream_writer_instance = hs.config.worker.writers.events + self._is_on_event_persistence_instance = ( + self._event_stream_writer_instance == hs.get_instance_name() + ) + if self._is_on_event_persistence_instance: + self.persist_event_storage = hs.get_storage().persistence + self._join_rate_limiter_local = Ratelimiter( clock=self.clock, rate_hz=hs.config.ratelimiting.rc_joins_local.per_second, diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 5c8be747e..6b5631514 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -65,11 +65,10 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.federation_handler = hs.get_handlers().federation_handler @staticmethod - async def _serialize_payload(store, room_id, event_and_contexts, backfilled): + async def _serialize_payload(store, event_and_contexts, backfilled): """ Args: store - room_id (str) event_and_contexts (list[tuple[FrozenEvent, EventContext]]) backfilled (bool): Whether or not the events are the result of backfilling @@ -89,11 +88,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): } ) - payload = { - "events": event_payloads, - "backfilled": backfilled, - "room_id": room_id, - } + payload = {"events": event_payloads, "backfilled": backfilled} return payload @@ -101,7 +96,6 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): with Measure(self.clock, "repl_fed_send_events_parse"): content = parse_json_object_from_request(request) - room_id = content["room_id"] backfilled = content["backfilled"] event_payloads = content["events"] @@ -126,7 +120,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): logger.info("Got %d events from federation", len(event_and_contexts)) max_stream_id = await self.federation_handler.persist_events_and_notify( - room_id, event_and_contexts, backfilled + event_and_contexts, backfilled ) return 200, {"max_stream_id": max_stream_id} diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index b323841f7..1c303f3a4 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -109,7 +109,7 @@ class ReplicationCommandHandler: if isinstance(stream, (EventsStream, BackfillStream)): # Only add EventStream and BackfillStream as a source on the # instance in charge of event persistence. - if hs.get_instance_name() in hs.config.worker.writers.events: + if hs.config.worker.writers.events == hs.get_instance_name(): self._streams_to_replicate.append(stream) continue diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 3705618b4..16c63ff4e 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -19,7 +19,7 @@ from typing import List, Tuple, Type import attr -from ._base import Stream, StreamUpdateResult, Token +from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance """Handling of the 'events' replication stream @@ -117,7 +117,7 @@ class EventsStream(Stream): self._store = hs.get_datastore() super().__init__( hs.get_instance_name(), - self._store._stream_id_gen.get_current_token_for_writer, + current_token_without_instance(self._store.get_current_events_token), self._update_function, ) diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index c73d54fb6..0ac854aee 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -68,7 +68,7 @@ class Databases(object): # If we're on a process that can persist events also # instantiate a `PersistEventsStore` - if hs.get_instance_name() in hs.config.worker.writers.events: + if hs.config.worker.writers.events == hs.get_instance_name(): persist_events = PersistEventsStore(hs, database, main) if "state" in database_config.databases: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 4c3c162ac..0b69aa6a9 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas """ if stream_ordering <= self.stream_ordering_month_ago: - raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,)) + raise StoreError(400, "stream_ordering too old") sql = """ SELECT event_id FROM stream_ordering_to_exterm diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b94fe7ac1..b3d27a2ee 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -97,7 +97,6 @@ class PersistEventsStore: self.store = main_data_store self.database_engine = db.engine self._clock = hs.get_clock() - self._instance_name = hs.get_instance_name() self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages self.is_mine_id = hs.is_mine_id @@ -109,7 +108,7 @@ class PersistEventsStore: # This should only exist on instances that are configured to write assert ( - hs.get_instance_name() in hs.config.worker.writers.events + hs.config.worker.writers.events == hs.get_instance_name() ), "Can only instantiate EventsStore on master" async def _persist_events_and_state_updates( @@ -801,7 +800,6 @@ class PersistEventsStore: table="events", values=[ { - "instance_name": self._instance_name, "stream_ordering": event.internal_metadata.stream_ordering, "topological_ordering": event.depth, "depth": event.depth, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 17f5997b8..a7a73cc3d 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -42,8 +42,7 @@ from synapse.replication.tcp.streams import BackfillStream from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import DatabasePool -from synapse.storage.engines import PostgresEngine -from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator +from synapse.storage.util.id_generators import StreamIdGenerator from synapse.types import Collection, get_domain_from_id from synapse.util.caches.descriptors import Cache, cached from synapse.util.iterutils import batch_iter @@ -79,54 +78,27 @@ class EventsWorkerStore(SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super(EventsWorkerStore, self).__init__(database, db_conn, hs) - if isinstance(database.engine, PostgresEngine): - # If we're using Postgres than we can use `MultiWriterIdGenerator` - # regardless of whether this process writes to the streams or not. - self._stream_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - instance_name=hs.get_instance_name(), - table="events", - instance_column="instance_name", - id_column="stream_ordering", - sequence_name="events_stream_seq", + if hs.config.worker.writers.events == hs.get_instance_name(): + # We are the process in charge of generating stream ids for events, + # so instantiate ID generators based on the database + self._stream_id_gen = StreamIdGenerator( + db_conn, "events", "stream_ordering", ) - self._backfill_id_gen = MultiWriterIdGenerator( - db_conn=db_conn, - db=database, - instance_name=hs.get_instance_name(), - table="events", - instance_column="instance_name", - id_column="stream_ordering", - sequence_name="events_backfill_stream_seq", - positive=False, + self._backfill_id_gen = StreamIdGenerator( + db_conn, + "events", + "stream_ordering", + step=-1, + extra_tables=[("ex_outlier_stream", "event_stream_ordering")], ) else: - # We shouldn't be running in worker mode with SQLite, but its useful - # to support it for unit tests. - # - # If this process is the writer than we need to use - # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets - # updated over replication. (Multiple writers are not supported for - # SQLite). - if hs.get_instance_name() in hs.config.worker.writers.events: - self._stream_id_gen = StreamIdGenerator( - db_conn, "events", "stream_ordering", - ) - self._backfill_id_gen = StreamIdGenerator( - db_conn, - "events", - "stream_ordering", - step=-1, - extra_tables=[("ex_outlier_stream", "event_stream_ordering")], - ) - else: - self._stream_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering" - ) - self._backfill_id_gen = SlavedIdTracker( - db_conn, "events", "stream_ordering", step=-1 - ) + # Another process is in charge of persisting events and generating + # stream IDs: rely on the replication streams to let us know which + # IDs we can process. + self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") + self._backfill_id_gen = SlavedIdTracker( + db_conn, "events", "stream_ordering", step=-1 + ) self._get_event_cache = Cache( "*getEvent*", diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql deleted file mode 100644 index 98ff76d70..000000000 --- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql +++ /dev/null @@ -1,16 +0,0 @@ -/* Copyright 2020 The Matrix.org Foundation C.I.C. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -ALTER TABLE events ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres b/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres deleted file mode 100644 index 97c1e6a0c..000000000 --- a/synapse/storage/databases/main/schema/delta/58/14events_instance_name.sql.postgres +++ /dev/null @@ -1,26 +0,0 @@ -/* Copyright 2020 The Matrix.org Foundation C.I.C. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -CREATE SEQUENCE IF NOT EXISTS events_stream_seq; - -SELECT setval('events_stream_seq', ( - SELECT COALESCE(MAX(stream_ordering), 1) FROM events -)); - -CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq; - -SELECT setval('events_backfill_stream_seq', ( - SELECT COALESCE(-MIN(stream_ordering), 1) FROM events -)); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 8fd21c2bf..9f3d23f0a 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -231,12 +231,8 @@ class MultiWriterIdGenerator: # gaps should be relatively rare it's still worth doing the book keeping # that allows us to skip forwards when there are gapless runs of # positions. - # - # We start at 1 here as a) the first generated stream ID will be 2, and - # b) other parts of the code assume that stream IDs are strictly greater - # than 0. self._persisted_upto_position = ( - min(self._current_positions.values()) if self._current_positions else 1 + min(self._current_positions.values()) if self._current_positions else 0 ) self._known_persisted_positions = [] # type: List[int] @@ -366,7 +362,9 @@ class MultiWriterIdGenerator: equal to it have been successfully persisted. """ - return self.get_persisted_upto_position() + # Currently we don't support this operation, as it's not obvious how to + # condense the stream positions of multiple writers into a single int. + raise NotImplementedError() def get_current_token_for_writer(self, instance_name: str) -> int: """Returns the position of the given writer.