Add experimental support for sharding event persister. Again. (#8294)

This is *not* ready for production yet. Caveats:

1. We should write some tests...
2. The stream token that we use for events can get stalled at the minimum position of all writers. This means that new events may not be processed and e.g. sent down sync streams if a writer isn't writing or is slow.
This commit is contained in:
Erik Johnston 2020-09-14 10:16:41 +01:00 committed by GitHub
parent a9dbe98ef9
commit 04cc249b43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 211 additions and 80 deletions

1
changelog.d/8294.feature Normal file
View File

@ -0,0 +1 @@
Add experimental support for sharding event persister.

View File

@ -832,11 +832,26 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool: def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key. """Whether this instance is responsible for handling the given key.
""" """
# If multiple instances are not defined we always return true
# If multiple instances are not defined we always return true.
if not self.instances or len(self.instances) == 1: if not self.instances or len(self.instances) == 1:
return True return True
return self.get_instance(key) == instance_name
def get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.
Note: For things like federation sending the config for which instance
is sending is known only to the sender instance if there is only one.
Therefore `should_handle` should be used where possible.
"""
if not self.instances:
return "master"
if len(self.instances) == 1:
return self.instances[0]
# We shard by taking the hash, modulo it by the number of instances and # We shard by taking the hash, modulo it by the number of instances and
# then checking whether this instance matches the instance at that # then checking whether this instance matches the instance at that
# index. # index.
@ -846,7 +861,7 @@ class ShardedWorkerHandlingConfig:
dest_hash = sha256(key.encode("utf8")).digest() dest_hash = sha256(key.encode("utf8")).digest()
dest_int = int.from_bytes(dest_hash, byteorder="little") dest_int = int.from_bytes(dest_hash, byteorder="little")
remainder = dest_int % (len(self.instances)) remainder = dest_int % (len(self.instances))
return self.instances[remainder] == instance_name return self.instances[remainder]
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"] __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]

View File

@ -142,3 +142,4 @@ class ShardedWorkerHandlingConfig:
instances: List[str] instances: List[str]
def __init__(self, instances: List[str]) -> None: ... def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ... def should_handle(self, instance_name: str, key: str) -> bool: ...
def get_instance(self, key: str) -> str: ...

View File

@ -13,12 +13,24 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from typing import List, Union
import attr import attr
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from .server import ListenerConfig, parse_listener_def from .server import ListenerConfig, parse_listener_def
def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
option expecting a list of strings.
"""
if isinstance(obj, str):
return [obj]
return obj
@attr.s @attr.s
class InstanceLocationConfig: class InstanceLocationConfig:
"""The host and port to talk to an instance via HTTP replication. """The host and port to talk to an instance via HTTP replication.
@ -33,11 +45,13 @@ class WriterLocations:
"""Specifies the instances that write various streams. """Specifies the instances that write various streams.
Attributes: Attributes:
events: The instance that writes to the event and backfill streams. events: The instances that write to the event and backfill streams.
events: The instance that writes to the typing stream. typing: The instance that writes to the typing stream.
""" """
events = attr.ib(default="master", type=str) events = attr.ib(
default=["master"], type=List[str], converter=_instance_to_list_converter
)
typing = attr.ib(default="master", type=str) typing = attr.ib(default="master", type=str)
@ -105,15 +119,18 @@ class WorkerConfig(Config):
writers = config.get("stream_writers") or {} writers = config.get("stream_writers") or {}
self.writers = WriterLocations(**writers) self.writers = WriterLocations(**writers)
# Check that the configured writer for events and typing also appears in # Check that the configured writers for events and typing also appears in
# `instance_map`. # `instance_map`.
for stream in ("events", "typing"): for stream in ("events", "typing"):
instance = getattr(self.writers, stream) instances = _instance_to_list_converter(getattr(self.writers, stream))
if instance != "master" and instance not in self.instance_map: for instance in instances:
raise ConfigError( if instance != "master" and instance not in self.instance_map:
"Instance %r is configured to write %s but does not appear in `instance_map` config." raise ConfigError(
% (instance, stream) "Instance %r is configured to write %s but does not appear in `instance_map` config."
) % (instance, stream)
)
self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
def generate_config_section(self, config_dir_path, server_name, **kwargs): def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\ return """\

View File

@ -896,7 +896,8 @@ class FederationHandler(BaseHandler):
) )
) )
await self._handle_new_events(dest, ev_infos, backfilled=True) if ev_infos:
await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
# Step 2: Persist the rest of the events in the chunk one by one # Step 2: Persist the rest of the events in the chunk one by one
events.sort(key=lambda e: e.depth) events.sort(key=lambda e: e.depth)
@ -1189,7 +1190,7 @@ class FederationHandler(BaseHandler):
event_infos.append(_NewEventInfo(event, None, auth)) event_infos.append(_NewEventInfo(event, None, auth))
await self._handle_new_events( await self._handle_new_events(
destination, event_infos, destination, room_id, event_infos,
) )
def _sanity_check_event(self, ev): def _sanity_check_event(self, ev):
@ -1336,15 +1337,15 @@ class FederationHandler(BaseHandler):
) )
max_stream_id = await self._persist_auth_tree( max_stream_id = await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj origin, room_id, auth_chain, state, event, room_version_obj
) )
# We wait here until this instance has seen the events come down # We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches. # replication (if we're using replication) as the below uses caches.
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position( await self._replication.wait_for_stream_position(
self.config.worker.writers.events, "events", max_stream_id self.config.worker.events_shard_config.get_instance(room_id),
"events",
max_stream_id,
) )
# Check whether this room is the result of an upgrade of a room we already know # Check whether this room is the result of an upgrade of a room we already know
@ -1593,7 +1594,7 @@ class FederationHandler(BaseHandler):
) )
context = await self.state_handler.compute_event_context(event) context = await self.state_handler.compute_event_context(event)
await self.persist_events_and_notify([(event, context)]) await self.persist_events_and_notify(event.room_id, [(event, context)])
return event return event
@ -1620,7 +1621,9 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(host_list, event) await self.federation_client.send_leave(host_list, event)
context = await self.state_handler.compute_event_context(event) context = await self.state_handler.compute_event_context(event)
stream_id = await self.persist_events_and_notify([(event, context)]) stream_id = await self.persist_events_and_notify(
event.room_id, [(event, context)]
)
return event, stream_id return event, stream_id
@ -1868,7 +1871,7 @@ class FederationHandler(BaseHandler):
) )
await self.persist_events_and_notify( await self.persist_events_and_notify(
[(event, context)], backfilled=backfilled event.room_id, [(event, context)], backfilled=backfilled
) )
except Exception: except Exception:
run_in_background( run_in_background(
@ -1881,6 +1884,7 @@ class FederationHandler(BaseHandler):
async def _handle_new_events( async def _handle_new_events(
self, self,
origin: str, origin: str,
room_id: str,
event_infos: Iterable[_NewEventInfo], event_infos: Iterable[_NewEventInfo],
backfilled: bool = False, backfilled: bool = False,
) -> None: ) -> None:
@ -1912,6 +1916,7 @@ class FederationHandler(BaseHandler):
) )
await self.persist_events_and_notify( await self.persist_events_and_notify(
room_id,
[ [
(ev_info.event, context) (ev_info.event, context)
for ev_info, context in zip(event_infos, contexts) for ev_info, context in zip(event_infos, contexts)
@ -1922,6 +1927,7 @@ class FederationHandler(BaseHandler):
async def _persist_auth_tree( async def _persist_auth_tree(
self, self,
origin: str, origin: str,
room_id: str,
auth_events: List[EventBase], auth_events: List[EventBase],
state: List[EventBase], state: List[EventBase],
event: EventBase, event: EventBase,
@ -1936,6 +1942,7 @@ class FederationHandler(BaseHandler):
Args: Args:
origin: Where the events came from origin: Where the events came from
room_id,
auth_events auth_events
state state
event event
@ -2010,17 +2017,20 @@ class FederationHandler(BaseHandler):
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
await self.persist_events_and_notify( await self.persist_events_and_notify(
room_id,
[ [
(e, events_to_context[e.event_id]) (e, events_to_context[e.event_id])
for e in itertools.chain(auth_events, state) for e in itertools.chain(auth_events, state)
] ],
) )
new_event_context = await self.state_handler.compute_event_context( new_event_context = await self.state_handler.compute_event_context(
event, old_state=state event, old_state=state
) )
return await self.persist_events_and_notify([(event, new_event_context)]) return await self.persist_events_and_notify(
room_id, [(event, new_event_context)]
)
async def _prep_event( async def _prep_event(
self, self,
@ -2871,6 +2881,7 @@ class FederationHandler(BaseHandler):
async def persist_events_and_notify( async def persist_events_and_notify(
self, self,
room_id: str,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]], event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False, backfilled: bool = False,
) -> int: ) -> int:
@ -2878,14 +2889,19 @@ class FederationHandler(BaseHandler):
necessary. necessary.
Args: Args:
event_and_contexts: room_id: The room ID of events being persisted.
event_and_contexts: Sequence of events with their associated
context that should be persisted. All events must belong to
the same room.
backfilled: Whether these events are a result of backfilled: Whether these events are a result of
backfilling or not backfilling or not
""" """
if self.config.worker.writers.events != self._instance_name: instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name:
result = await self._send_events( result = await self._send_events(
instance_name=self.config.worker.writers.events, instance_name=instance,
store=self.store, store=self.store,
room_id=room_id,
event_and_contexts=event_and_contexts, event_and_contexts=event_and_contexts,
backfilled=backfilled, backfilled=backfilled,
) )

View File

@ -376,9 +376,8 @@ class EventCreationHandler:
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.config = hs.config self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases self.require_membership_for_aliases = hs.config.require_membership_for_aliases
self._is_event_writer = ( self._events_shard_config = self.config.worker.events_shard_config
self.config.worker.writers.events == hs.get_instance_name() self._instance_name = hs.get_instance_name()
)
self.room_invite_state_types = self.hs.config.room_invite_state_types self.room_invite_state_types = self.hs.config.room_invite_state_types
@ -902,9 +901,10 @@ class EventCreationHandler:
try: try:
# If we're a worker we need to hit out to the master. # If we're a worker we need to hit out to the master.
if not self._is_event_writer: writer_instance = self._events_shard_config.get_instance(event.room_id)
if writer_instance != self._instance_name:
result = await self.send_event( result = await self.send_event(
instance_name=self.config.worker.writers.events, instance_name=writer_instance,
event_id=event.event_id, event_id=event.event_id,
store=self.store, store=self.store,
requester=requester, requester=requester,
@ -972,8 +972,10 @@ class EventCreationHandler:
This should only be run on the instance in charge of persisting events. This should only be run on the instance in charge of persisting events.
""" """
assert self._is_event_writer
assert self.storage.persistence is not None assert self.storage.persistence is not None
assert self._events_shard_config.should_handle(
self._instance_name, event.room_id
)
if ratelimit: if ratelimit:
# We check if this is a room admin redacting an event so that we # We check if this is a room admin redacting an event so that we

View File

@ -804,7 +804,9 @@ class RoomCreationHandler(BaseHandler):
# Always wait for room creation to progate before returning # Always wait for room creation to progate before returning
await self._replication.wait_for_stream_position( await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", last_stream_id self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
last_stream_id,
) )
return result, last_stream_id return result, last_stream_id
@ -1259,10 +1261,10 @@ class RoomShutdownHandler:
# We now wait for the create room to come back in via replication so # We now wait for the create room to come back in via replication so
# that we can assume that all the joins/invites have propogated before # that we can assume that all the joins/invites have propogated before
# we try and auto join below. # we try and auto join below.
#
# TODO: Currently the events stream is written to from master
await self._replication.wait_for_stream_position( await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id self.hs.config.worker.events_shard_config.get_instance(new_room_id),
"events",
stream_id,
) )
else: else:
new_room_id = None new_room_id = None
@ -1292,7 +1294,9 @@ class RoomShutdownHandler:
# Wait for leave to come in over replication before trying to forget. # Wait for leave to come in over replication before trying to forget.
await self._replication.wait_for_stream_position( await self._replication.wait_for_stream_position(
self.hs.config.worker.writers.events, "events", stream_id self.hs.config.worker.events_shard_config.get_instance(room_id),
"events",
stream_id,
) )
await self.room_member_handler.forget(target_requester.user, room_id) await self.room_member_handler.forget(target_requester.user, room_id)

View File

@ -82,13 +82,6 @@ class RoomMemberHandler:
self._enable_lookup = hs.config.enable_3pid_lookup self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles self.allow_per_room_profiles = self.config.allow_per_room_profiles
self._event_stream_writer_instance = hs.config.worker.writers.events
self._is_on_event_persistence_instance = (
self._event_stream_writer_instance == hs.get_instance_name()
)
if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence
self._join_rate_limiter_local = Ratelimiter( self._join_rate_limiter_local = Ratelimiter(
clock=self.clock, clock=self.clock,
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second, rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,

View File

@ -65,10 +65,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
self.federation_handler = hs.get_handlers().federation_handler self.federation_handler = hs.get_handlers().federation_handler
@staticmethod @staticmethod
async def _serialize_payload(store, event_and_contexts, backfilled): async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
""" """
Args: Args:
store store
room_id (str)
event_and_contexts (list[tuple[FrozenEvent, EventContext]]) event_and_contexts (list[tuple[FrozenEvent, EventContext]])
backfilled (bool): Whether or not the events are the result of backfilled (bool): Whether or not the events are the result of
backfilling backfilling
@ -88,7 +89,11 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
} }
) )
payload = {"events": event_payloads, "backfilled": backfilled} payload = {
"events": event_payloads,
"backfilled": backfilled,
"room_id": room_id,
}
return payload return payload
@ -96,6 +101,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
with Measure(self.clock, "repl_fed_send_events_parse"): with Measure(self.clock, "repl_fed_send_events_parse"):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
room_id = content["room_id"]
backfilled = content["backfilled"] backfilled = content["backfilled"]
event_payloads = content["events"] event_payloads = content["events"]
@ -120,7 +126,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
logger.info("Got %d events from federation", len(event_and_contexts)) logger.info("Got %d events from federation", len(event_and_contexts))
max_stream_id = await self.federation_handler.persist_events_and_notify( max_stream_id = await self.federation_handler.persist_events_and_notify(
event_and_contexts, backfilled room_id, event_and_contexts, backfilled
) )
return 200, {"max_stream_id": max_stream_id} return 200, {"max_stream_id": max_stream_id}

View File

@ -109,7 +109,7 @@ class ReplicationCommandHandler:
if isinstance(stream, (EventsStream, BackfillStream)): if isinstance(stream, (EventsStream, BackfillStream)):
# Only add EventStream and BackfillStream as a source on the # Only add EventStream and BackfillStream as a source on the
# instance in charge of event persistence. # instance in charge of event persistence.
if hs.config.worker.writers.events == hs.get_instance_name(): if hs.get_instance_name() in hs.config.worker.writers.events:
self._streams_to_replicate.append(stream) self._streams_to_replicate.append(stream)
continue continue

View File

@ -19,7 +19,7 @@ from typing import List, Tuple, Type
import attr import attr
from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance from ._base import Stream, StreamUpdateResult, Token
"""Handling of the 'events' replication stream """Handling of the 'events' replication stream
@ -117,7 +117,7 @@ class EventsStream(Stream):
self._store = hs.get_datastore() self._store = hs.get_datastore()
super().__init__( super().__init__(
hs.get_instance_name(), hs.get_instance_name(),
current_token_without_instance(self._store.get_current_events_token), self._store._stream_id_gen.get_current_token_for_writer,
self._update_function, self._update_function,
) )

View File

@ -75,7 +75,7 @@ class Databases:
# If we're on a process that can persist events also # If we're on a process that can persist events also
# instantiate a `PersistEventsStore` # instantiate a `PersistEventsStore`
if hs.config.worker.writers.events == hs.get_instance_name(): if hs.get_instance_name() in hs.config.worker.writers.events:
persist_events = PersistEventsStore(hs, database, main) persist_events = PersistEventsStore(hs, database, main)
if "state" in database_config.databases: if "state" in database_config.databases:

View File

@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
""" """
if stream_ordering <= self.stream_ordering_month_ago: if stream_ordering <= self.stream_ordering_month_ago:
raise StoreError(400, "stream_ordering too old") raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
sql = """ sql = """
SELECT event_id FROM stream_ordering_to_exterm SELECT event_id FROM stream_ordering_to_exterm

View File

@ -32,7 +32,7 @@ from synapse.logging.utils import log_function
from synapse.storage._base import db_to_json, make_in_list_sql_clause from synapse.storage._base import db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.search import SearchEntry from synapse.storage.databases.main.search import SearchEntry
from synapse.storage.util.id_generators import StreamIdGenerator from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import StateMap, get_domain_from_id from synapse.types import StateMap, get_domain_from_id
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.iterutils import batch_iter from synapse.util.iterutils import batch_iter
@ -97,18 +97,21 @@ class PersistEventsStore:
self.store = main_data_store self.store = main_data_store
self.database_engine = db.engine self.database_engine = db.engine
self._clock = hs.get_clock() self._clock = hs.get_clock()
self._instance_name = hs.get_instance_name()
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
self.is_mine_id = hs.is_mine_id self.is_mine_id = hs.is_mine_id
# Ideally we'd move these ID gens here, unfortunately some other ID # Ideally we'd move these ID gens here, unfortunately some other ID
# generators are chained off them so doing so is a bit of a PITA. # generators are chained off them so doing so is a bit of a PITA.
self._backfill_id_gen = self.store._backfill_id_gen # type: StreamIdGenerator self._backfill_id_gen = (
self._stream_id_gen = self.store._stream_id_gen # type: StreamIdGenerator self.store._backfill_id_gen
) # type: MultiWriterIdGenerator
self._stream_id_gen = self.store._stream_id_gen # type: MultiWriterIdGenerator
# This should only exist on instances that are configured to write # This should only exist on instances that are configured to write
assert ( assert (
hs.config.worker.writers.events == hs.get_instance_name() hs.get_instance_name() in hs.config.worker.writers.events
), "Can only instantiate EventsStore on master" ), "Can only instantiate EventsStore on master"
async def _persist_events_and_state_updates( async def _persist_events_and_state_updates(
@ -809,6 +812,7 @@ class PersistEventsStore:
table="events", table="events",
values=[ values=[
{ {
"instance_name": self._instance_name,
"stream_ordering": event.internal_metadata.stream_ordering, "stream_ordering": event.internal_metadata.stream_ordering,
"topological_ordering": event.depth, "topological_ordering": event.depth,
"depth": event.depth, "depth": event.depth,

View File

@ -42,7 +42,8 @@ from synapse.replication.tcp.streams import BackfillStream
from synapse.replication.tcp.streams.events import EventsStream from synapse.replication.tcp.streams.events import EventsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool from synapse.storage.database import DatabasePool
from synapse.storage.util.id_generators import StreamIdGenerator from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
from synapse.types import Collection, get_domain_from_id from synapse.types import Collection, get_domain_from_id
from synapse.util.caches.descriptors import Cache, cached from synapse.util.caches.descriptors import Cache, cached
from synapse.util.iterutils import batch_iter from synapse.util.iterutils import batch_iter
@ -78,27 +79,54 @@ class EventsWorkerStore(SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs): def __init__(self, database: DatabasePool, db_conn, hs):
super(EventsWorkerStore, self).__init__(database, db_conn, hs) super(EventsWorkerStore, self).__init__(database, db_conn, hs)
if hs.config.worker.writers.events == hs.get_instance_name(): if isinstance(database.engine, PostgresEngine):
# We are the process in charge of generating stream ids for events, # If we're using Postgres than we can use `MultiWriterIdGenerator`
# so instantiate ID generators based on the database # regardless of whether this process writes to the streams or not.
self._stream_id_gen = StreamIdGenerator( self._stream_id_gen = MultiWriterIdGenerator(
db_conn, "events", "stream_ordering", db_conn=db_conn,
db=database,
instance_name=hs.get_instance_name(),
table="events",
instance_column="instance_name",
id_column="stream_ordering",
sequence_name="events_stream_seq",
) )
self._backfill_id_gen = StreamIdGenerator( self._backfill_id_gen = MultiWriterIdGenerator(
db_conn, db_conn=db_conn,
"events", db=database,
"stream_ordering", instance_name=hs.get_instance_name(),
step=-1, table="events",
extra_tables=[("ex_outlier_stream", "event_stream_ordering")], instance_column="instance_name",
id_column="stream_ordering",
sequence_name="events_backfill_stream_seq",
positive=False,
) )
else: else:
# Another process is in charge of persisting events and generating # We shouldn't be running in worker mode with SQLite, but its useful
# stream IDs: rely on the replication streams to let us know which # to support it for unit tests.
# IDs we can process. #
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering") # If this process is the writer than we need to use
self._backfill_id_gen = SlavedIdTracker( # `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
db_conn, "events", "stream_ordering", step=-1 # updated over replication. (Multiple writers are not supported for
) # SQLite).
if hs.get_instance_name() in hs.config.worker.writers.events:
self._stream_id_gen = StreamIdGenerator(
db_conn, "events", "stream_ordering",
)
self._backfill_id_gen = StreamIdGenerator(
db_conn,
"events",
"stream_ordering",
step=-1,
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
)
else:
self._stream_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering"
)
self._backfill_id_gen = SlavedIdTracker(
db_conn, "events", "stream_ordering", step=-1
)
self._get_event_cache = Cache( self._get_event_cache = Cache(
"*getEvent*", "*getEvent*",

View File

@ -0,0 +1,16 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE events ADD COLUMN instance_name TEXT;

View File

@ -0,0 +1,26 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
SELECT setval('events_stream_seq', (
SELECT COALESCE(MAX(stream_ordering), 1) FROM events
));
CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
SELECT setval('events_backfill_stream_seq', (
SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
));

View File

@ -240,8 +240,12 @@ class MultiWriterIdGenerator:
# gaps should be relatively rare it's still worth doing the book keeping # gaps should be relatively rare it's still worth doing the book keeping
# that allows us to skip forwards when there are gapless runs of # that allows us to skip forwards when there are gapless runs of
# positions. # positions.
#
# We start at 1 here as a) the first generated stream ID will be 2, and
# b) other parts of the code assume that stream IDs are strictly greater
# than 0.
self._persisted_upto_position = ( self._persisted_upto_position = (
min(self._current_positions.values()) if self._current_positions else 0 min(self._current_positions.values()) if self._current_positions else 1
) )
self._known_persisted_positions = [] # type: List[int] self._known_persisted_positions = [] # type: List[int]
@ -398,9 +402,7 @@ class MultiWriterIdGenerator:
equal to it have been successfully persisted. equal to it have been successfully persisted.
""" """
# Currently we don't support this operation, as it's not obvious how to return self.get_persisted_upto_position()
# condense the stream positions of multiple writers into a single int.
raise NotImplementedError()
def get_current_token_for_writer(self, instance_name: str) -> int: def get_current_token_for_writer(self, instance_name: str) -> int:
"""Returns the position of the given writer. """Returns the position of the given writer.