mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-22 09:31:03 -05:00
* Revert "Add experimental support for sharding event persister. (#8170)"
This reverts commit 82c1ee1c22
.
* Changelog
This commit is contained in:
parent
be16ee59a8
commit
9f8abdcc38
@ -1 +0,0 @@
|
|||||||
Add experimental support for sharding event persister.
|
|
1
changelog.d/8242.feature
Normal file
1
changelog.d/8242.feature
Normal file
@ -0,0 +1 @@
|
|||||||
|
Back out experimental support for sharding event persister. **PLEASE REMOVE THIS LINE FROM THE FINAL CHANGELOG**
|
@ -832,26 +832,11 @@ class ShardedWorkerHandlingConfig:
|
|||||||
def should_handle(self, instance_name: str, key: str) -> bool:
|
def should_handle(self, instance_name: str, key: str) -> bool:
|
||||||
"""Whether this instance is responsible for handling the given key.
|
"""Whether this instance is responsible for handling the given key.
|
||||||
"""
|
"""
|
||||||
# If multiple instances are not defined we always return true
|
|
||||||
|
# If multiple instances are not defined we always return true.
|
||||||
if not self.instances or len(self.instances) == 1:
|
if not self.instances or len(self.instances) == 1:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return self.get_instance(key) == instance_name
|
|
||||||
|
|
||||||
def get_instance(self, key: str) -> str:
|
|
||||||
"""Get the instance responsible for handling the given key.
|
|
||||||
|
|
||||||
Note: For things like federation sending the config for which instance
|
|
||||||
is sending is known only to the sender instance if there is only one.
|
|
||||||
Therefore `should_handle` should be used where possible.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if not self.instances:
|
|
||||||
return "master"
|
|
||||||
|
|
||||||
if len(self.instances) == 1:
|
|
||||||
return self.instances[0]
|
|
||||||
|
|
||||||
# We shard by taking the hash, modulo it by the number of instances and
|
# We shard by taking the hash, modulo it by the number of instances and
|
||||||
# then checking whether this instance matches the instance at that
|
# then checking whether this instance matches the instance at that
|
||||||
# index.
|
# index.
|
||||||
@ -861,7 +846,7 @@ class ShardedWorkerHandlingConfig:
|
|||||||
dest_hash = sha256(key.encode("utf8")).digest()
|
dest_hash = sha256(key.encode("utf8")).digest()
|
||||||
dest_int = int.from_bytes(dest_hash, byteorder="little")
|
dest_int = int.from_bytes(dest_hash, byteorder="little")
|
||||||
remainder = dest_int % (len(self.instances))
|
remainder = dest_int % (len(self.instances))
|
||||||
return self.instances[remainder]
|
return self.instances[remainder] == instance_name
|
||||||
|
|
||||||
|
|
||||||
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
|
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
|
||||||
|
@ -142,4 +142,3 @@ class ShardedWorkerHandlingConfig:
|
|||||||
instances: List[str]
|
instances: List[str]
|
||||||
def __init__(self, instances: List[str]) -> None: ...
|
def __init__(self, instances: List[str]) -> None: ...
|
||||||
def should_handle(self, instance_name: str, key: str) -> bool: ...
|
def should_handle(self, instance_name: str, key: str) -> bool: ...
|
||||||
def get_instance(self, key: str) -> str: ...
|
|
||||||
|
@ -13,24 +13,12 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from typing import List, Union
|
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
|
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
|
||||||
from .server import ListenerConfig, parse_listener_def
|
from .server import ListenerConfig, parse_listener_def
|
||||||
|
|
||||||
|
|
||||||
def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
|
|
||||||
"""Helper for allowing parsing a string or list of strings to a config
|
|
||||||
option expecting a list of strings.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if isinstance(obj, str):
|
|
||||||
return [obj]
|
|
||||||
return obj
|
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class InstanceLocationConfig:
|
class InstanceLocationConfig:
|
||||||
"""The host and port to talk to an instance via HTTP replication.
|
"""The host and port to talk to an instance via HTTP replication.
|
||||||
@ -45,13 +33,11 @@ class WriterLocations:
|
|||||||
"""Specifies the instances that write various streams.
|
"""Specifies the instances that write various streams.
|
||||||
|
|
||||||
Attributes:
|
Attributes:
|
||||||
events: The instances that write to the event and backfill streams.
|
events: The instance that writes to the event and backfill streams.
|
||||||
typing: The instance that writes to the typing stream.
|
events: The instance that writes to the typing stream.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
events = attr.ib(
|
events = attr.ib(default="master", type=str)
|
||||||
default=["master"], type=List[str], converter=_instance_to_list_converter
|
|
||||||
)
|
|
||||||
typing = attr.ib(default="master", type=str)
|
typing = attr.ib(default="master", type=str)
|
||||||
|
|
||||||
|
|
||||||
@ -119,19 +105,16 @@ class WorkerConfig(Config):
|
|||||||
writers = config.get("stream_writers") or {}
|
writers = config.get("stream_writers") or {}
|
||||||
self.writers = WriterLocations(**writers)
|
self.writers = WriterLocations(**writers)
|
||||||
|
|
||||||
# Check that the configured writers for events and typing also appears in
|
# Check that the configured writer for events and typing also appears in
|
||||||
# `instance_map`.
|
# `instance_map`.
|
||||||
for stream in ("events", "typing"):
|
for stream in ("events", "typing"):
|
||||||
instances = _instance_to_list_converter(getattr(self.writers, stream))
|
instance = getattr(self.writers, stream)
|
||||||
for instance in instances:
|
|
||||||
if instance != "master" and instance not in self.instance_map:
|
if instance != "master" and instance not in self.instance_map:
|
||||||
raise ConfigError(
|
raise ConfigError(
|
||||||
"Instance %r is configured to write %s but does not appear in `instance_map` config."
|
"Instance %r is configured to write %s but does not appear in `instance_map` config."
|
||||||
% (instance, stream)
|
% (instance, stream)
|
||||||
)
|
)
|
||||||
|
|
||||||
self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
|
|
||||||
|
|
||||||
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
def generate_config_section(self, config_dir_path, server_name, **kwargs):
|
||||||
return """\
|
return """\
|
||||||
## Workers ##
|
## Workers ##
|
||||||
|
@ -923,8 +923,7 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if ev_infos:
|
await self._handle_new_events(dest, ev_infos, backfilled=True)
|
||||||
await self._handle_new_events(dest, room_id, ev_infos, backfilled=True)
|
|
||||||
|
|
||||||
# Step 2: Persist the rest of the events in the chunk one by one
|
# Step 2: Persist the rest of the events in the chunk one by one
|
||||||
events.sort(key=lambda e: e.depth)
|
events.sort(key=lambda e: e.depth)
|
||||||
@ -1217,7 +1216,7 @@ class FederationHandler(BaseHandler):
|
|||||||
event_infos.append(_NewEventInfo(event, None, auth))
|
event_infos.append(_NewEventInfo(event, None, auth))
|
||||||
|
|
||||||
await self._handle_new_events(
|
await self._handle_new_events(
|
||||||
destination, room_id, event_infos,
|
destination, event_infos,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _sanity_check_event(self, ev):
|
def _sanity_check_event(self, ev):
|
||||||
@ -1364,15 +1363,15 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
max_stream_id = await self._persist_auth_tree(
|
max_stream_id = await self._persist_auth_tree(
|
||||||
origin, room_id, auth_chain, state, event, room_version_obj
|
origin, auth_chain, state, event, room_version_obj
|
||||||
)
|
)
|
||||||
|
|
||||||
# We wait here until this instance has seen the events come down
|
# We wait here until this instance has seen the events come down
|
||||||
# replication (if we're using replication) as the below uses caches.
|
# replication (if we're using replication) as the below uses caches.
|
||||||
|
#
|
||||||
|
# TODO: Currently the events stream is written to from master
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.config.worker.events_shard_config.get_instance(room_id),
|
self.config.worker.writers.events, "events", max_stream_id
|
||||||
"events",
|
|
||||||
max_stream_id,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check whether this room is the result of an upgrade of a room we already know
|
# Check whether this room is the result of an upgrade of a room we already know
|
||||||
@ -1626,7 +1625,7 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
context = await self.state_handler.compute_event_context(event)
|
context = await self.state_handler.compute_event_context(event)
|
||||||
await self.persist_events_and_notify(event.room_id, [(event, context)])
|
await self.persist_events_and_notify([(event, context)])
|
||||||
|
|
||||||
return event
|
return event
|
||||||
|
|
||||||
@ -1653,9 +1652,7 @@ class FederationHandler(BaseHandler):
|
|||||||
await self.federation_client.send_leave(host_list, event)
|
await self.federation_client.send_leave(host_list, event)
|
||||||
|
|
||||||
context = await self.state_handler.compute_event_context(event)
|
context = await self.state_handler.compute_event_context(event)
|
||||||
stream_id = await self.persist_events_and_notify(
|
stream_id = await self.persist_events_and_notify([(event, context)])
|
||||||
event.room_id, [(event, context)]
|
|
||||||
)
|
|
||||||
|
|
||||||
return event, stream_id
|
return event, stream_id
|
||||||
|
|
||||||
@ -1903,7 +1900,7 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
await self.persist_events_and_notify(
|
||||||
event.room_id, [(event, context)], backfilled=backfilled
|
[(event, context)], backfilled=backfilled
|
||||||
)
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
run_in_background(
|
run_in_background(
|
||||||
@ -1916,7 +1913,6 @@ class FederationHandler(BaseHandler):
|
|||||||
async def _handle_new_events(
|
async def _handle_new_events(
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
room_id: str,
|
|
||||||
event_infos: Iterable[_NewEventInfo],
|
event_infos: Iterable[_NewEventInfo],
|
||||||
backfilled: bool = False,
|
backfilled: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -1948,7 +1944,6 @@ class FederationHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
await self.persist_events_and_notify(
|
||||||
room_id,
|
|
||||||
[
|
[
|
||||||
(ev_info.event, context)
|
(ev_info.event, context)
|
||||||
for ev_info, context in zip(event_infos, contexts)
|
for ev_info, context in zip(event_infos, contexts)
|
||||||
@ -1959,7 +1954,6 @@ class FederationHandler(BaseHandler):
|
|||||||
async def _persist_auth_tree(
|
async def _persist_auth_tree(
|
||||||
self,
|
self,
|
||||||
origin: str,
|
origin: str,
|
||||||
room_id: str,
|
|
||||||
auth_events: List[EventBase],
|
auth_events: List[EventBase],
|
||||||
state: List[EventBase],
|
state: List[EventBase],
|
||||||
event: EventBase,
|
event: EventBase,
|
||||||
@ -1974,7 +1968,6 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
origin: Where the events came from
|
origin: Where the events came from
|
||||||
room_id,
|
|
||||||
auth_events
|
auth_events
|
||||||
state
|
state
|
||||||
event
|
event
|
||||||
@ -2049,20 +2042,17 @@ class FederationHandler(BaseHandler):
|
|||||||
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
|
||||||
|
|
||||||
await self.persist_events_and_notify(
|
await self.persist_events_and_notify(
|
||||||
room_id,
|
|
||||||
[
|
[
|
||||||
(e, events_to_context[e.event_id])
|
(e, events_to_context[e.event_id])
|
||||||
for e in itertools.chain(auth_events, state)
|
for e in itertools.chain(auth_events, state)
|
||||||
],
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
new_event_context = await self.state_handler.compute_event_context(
|
new_event_context = await self.state_handler.compute_event_context(
|
||||||
event, old_state=state
|
event, old_state=state
|
||||||
)
|
)
|
||||||
|
|
||||||
return await self.persist_events_and_notify(
|
return await self.persist_events_and_notify([(event, new_event_context)])
|
||||||
room_id, [(event, new_event_context)]
|
|
||||||
)
|
|
||||||
|
|
||||||
async def _prep_event(
|
async def _prep_event(
|
||||||
self,
|
self,
|
||||||
@ -2913,7 +2903,6 @@ class FederationHandler(BaseHandler):
|
|||||||
|
|
||||||
async def persist_events_and_notify(
|
async def persist_events_and_notify(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
|
||||||
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
|
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
|
||||||
backfilled: bool = False,
|
backfilled: bool = False,
|
||||||
) -> int:
|
) -> int:
|
||||||
@ -2921,19 +2910,14 @@ class FederationHandler(BaseHandler):
|
|||||||
necessary.
|
necessary.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
room_id: The room ID of events being persisted.
|
event_and_contexts:
|
||||||
event_and_contexts: Sequence of events with their associated
|
|
||||||
context that should be persisted. All events must belong to
|
|
||||||
the same room.
|
|
||||||
backfilled: Whether these events are a result of
|
backfilled: Whether these events are a result of
|
||||||
backfilling or not
|
backfilling or not
|
||||||
"""
|
"""
|
||||||
instance = self.config.worker.events_shard_config.get_instance(room_id)
|
if self.config.worker.writers.events != self._instance_name:
|
||||||
if instance != self._instance_name:
|
|
||||||
result = await self._send_events(
|
result = await self._send_events(
|
||||||
instance_name=instance,
|
instance_name=self.config.worker.writers.events,
|
||||||
store=self.store,
|
store=self.store,
|
||||||
room_id=room_id,
|
|
||||||
event_and_contexts=event_and_contexts,
|
event_and_contexts=event_and_contexts,
|
||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
)
|
)
|
||||||
|
@ -376,8 +376,9 @@ class EventCreationHandler(object):
|
|||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self.config = hs.config
|
self.config = hs.config
|
||||||
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
|
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
|
||||||
self._events_shard_config = self.config.worker.events_shard_config
|
self._is_event_writer = (
|
||||||
self._instance_name = hs.get_instance_name()
|
self.config.worker.writers.events == hs.get_instance_name()
|
||||||
|
)
|
||||||
|
|
||||||
self.room_invite_state_types = self.hs.config.room_invite_state_types
|
self.room_invite_state_types = self.hs.config.room_invite_state_types
|
||||||
|
|
||||||
@ -903,10 +904,9 @@ class EventCreationHandler(object):
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
# If we're a worker we need to hit out to the master.
|
# If we're a worker we need to hit out to the master.
|
||||||
writer_instance = self._events_shard_config.get_instance(event.room_id)
|
if not self._is_event_writer:
|
||||||
if writer_instance != self._instance_name:
|
|
||||||
result = await self.send_event(
|
result = await self.send_event(
|
||||||
instance_name=writer_instance,
|
instance_name=self.config.worker.writers.events,
|
||||||
event_id=event.event_id,
|
event_id=event.event_id,
|
||||||
store=self.store,
|
store=self.store,
|
||||||
requester=requester,
|
requester=requester,
|
||||||
@ -974,9 +974,7 @@ class EventCreationHandler(object):
|
|||||||
|
|
||||||
This should only be run on the instance in charge of persisting events.
|
This should only be run on the instance in charge of persisting events.
|
||||||
"""
|
"""
|
||||||
assert self._events_shard_config.should_handle(
|
assert self._is_event_writer
|
||||||
self._instance_name, event.room_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if ratelimit:
|
if ratelimit:
|
||||||
# We check if this is a room admin redacting an event so that we
|
# We check if this is a room admin redacting an event so that we
|
||||||
|
@ -804,9 +804,7 @@ class RoomCreationHandler(BaseHandler):
|
|||||||
|
|
||||||
# Always wait for room creation to progate before returning
|
# Always wait for room creation to progate before returning
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.hs.config.worker.events_shard_config.get_instance(room_id),
|
self.hs.config.worker.writers.events, "events", last_stream_id
|
||||||
"events",
|
|
||||||
last_stream_id,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return result, last_stream_id
|
return result, last_stream_id
|
||||||
@ -1262,10 +1260,10 @@ class RoomShutdownHandler(object):
|
|||||||
# We now wait for the create room to come back in via replication so
|
# We now wait for the create room to come back in via replication so
|
||||||
# that we can assume that all the joins/invites have propogated before
|
# that we can assume that all the joins/invites have propogated before
|
||||||
# we try and auto join below.
|
# we try and auto join below.
|
||||||
|
#
|
||||||
|
# TODO: Currently the events stream is written to from master
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.hs.config.worker.events_shard_config.get_instance(new_room_id),
|
self.hs.config.worker.writers.events, "events", stream_id
|
||||||
"events",
|
|
||||||
stream_id,
|
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
new_room_id = None
|
new_room_id = None
|
||||||
@ -1295,9 +1293,7 @@ class RoomShutdownHandler(object):
|
|||||||
|
|
||||||
# Wait for leave to come in over replication before trying to forget.
|
# Wait for leave to come in over replication before trying to forget.
|
||||||
await self._replication.wait_for_stream_position(
|
await self._replication.wait_for_stream_position(
|
||||||
self.hs.config.worker.events_shard_config.get_instance(room_id),
|
self.hs.config.worker.writers.events, "events", stream_id
|
||||||
"events",
|
|
||||||
stream_id,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
await self.room_member_handler.forget(target_requester.user, room_id)
|
await self.room_member_handler.forget(target_requester.user, room_id)
|
||||||
|
@ -82,6 +82,13 @@ class RoomMemberHandler(object):
|
|||||||
self._enable_lookup = hs.config.enable_3pid_lookup
|
self._enable_lookup = hs.config.enable_3pid_lookup
|
||||||
self.allow_per_room_profiles = self.config.allow_per_room_profiles
|
self.allow_per_room_profiles = self.config.allow_per_room_profiles
|
||||||
|
|
||||||
|
self._event_stream_writer_instance = hs.config.worker.writers.events
|
||||||
|
self._is_on_event_persistence_instance = (
|
||||||
|
self._event_stream_writer_instance == hs.get_instance_name()
|
||||||
|
)
|
||||||
|
if self._is_on_event_persistence_instance:
|
||||||
|
self.persist_event_storage = hs.get_storage().persistence
|
||||||
|
|
||||||
self._join_rate_limiter_local = Ratelimiter(
|
self._join_rate_limiter_local = Ratelimiter(
|
||||||
clock=self.clock,
|
clock=self.clock,
|
||||||
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
|
rate_hz=hs.config.ratelimiting.rc_joins_local.per_second,
|
||||||
|
@ -65,11 +65,10 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
self.federation_handler = hs.get_handlers().federation_handler
|
self.federation_handler = hs.get_handlers().federation_handler
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
|
async def _serialize_payload(store, event_and_contexts, backfilled):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
store
|
store
|
||||||
room_id (str)
|
|
||||||
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
|
event_and_contexts (list[tuple[FrozenEvent, EventContext]])
|
||||||
backfilled (bool): Whether or not the events are the result of
|
backfilled (bool): Whether or not the events are the result of
|
||||||
backfilling
|
backfilling
|
||||||
@ -89,11 +88,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
payload = {
|
payload = {"events": event_payloads, "backfilled": backfilled}
|
||||||
"events": event_payloads,
|
|
||||||
"backfilled": backfilled,
|
|
||||||
"room_id": room_id,
|
|
||||||
}
|
|
||||||
|
|
||||||
return payload
|
return payload
|
||||||
|
|
||||||
@ -101,7 +96,6 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
with Measure(self.clock, "repl_fed_send_events_parse"):
|
with Measure(self.clock, "repl_fed_send_events_parse"):
|
||||||
content = parse_json_object_from_request(request)
|
content = parse_json_object_from_request(request)
|
||||||
|
|
||||||
room_id = content["room_id"]
|
|
||||||
backfilled = content["backfilled"]
|
backfilled = content["backfilled"]
|
||||||
|
|
||||||
event_payloads = content["events"]
|
event_payloads = content["events"]
|
||||||
@ -126,7 +120,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
|
|||||||
logger.info("Got %d events from federation", len(event_and_contexts))
|
logger.info("Got %d events from federation", len(event_and_contexts))
|
||||||
|
|
||||||
max_stream_id = await self.federation_handler.persist_events_and_notify(
|
max_stream_id = await self.federation_handler.persist_events_and_notify(
|
||||||
room_id, event_and_contexts, backfilled
|
event_and_contexts, backfilled
|
||||||
)
|
)
|
||||||
|
|
||||||
return 200, {"max_stream_id": max_stream_id}
|
return 200, {"max_stream_id": max_stream_id}
|
||||||
|
@ -109,7 +109,7 @@ class ReplicationCommandHandler:
|
|||||||
if isinstance(stream, (EventsStream, BackfillStream)):
|
if isinstance(stream, (EventsStream, BackfillStream)):
|
||||||
# Only add EventStream and BackfillStream as a source on the
|
# Only add EventStream and BackfillStream as a source on the
|
||||||
# instance in charge of event persistence.
|
# instance in charge of event persistence.
|
||||||
if hs.get_instance_name() in hs.config.worker.writers.events:
|
if hs.config.worker.writers.events == hs.get_instance_name():
|
||||||
self._streams_to_replicate.append(stream)
|
self._streams_to_replicate.append(stream)
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
@ -19,7 +19,7 @@ from typing import List, Tuple, Type
|
|||||||
|
|
||||||
import attr
|
import attr
|
||||||
|
|
||||||
from ._base import Stream, StreamUpdateResult, Token
|
from ._base import Stream, StreamUpdateResult, Token, current_token_without_instance
|
||||||
|
|
||||||
"""Handling of the 'events' replication stream
|
"""Handling of the 'events' replication stream
|
||||||
|
|
||||||
@ -117,7 +117,7 @@ class EventsStream(Stream):
|
|||||||
self._store = hs.get_datastore()
|
self._store = hs.get_datastore()
|
||||||
super().__init__(
|
super().__init__(
|
||||||
hs.get_instance_name(),
|
hs.get_instance_name(),
|
||||||
self._store._stream_id_gen.get_current_token_for_writer,
|
current_token_without_instance(self._store.get_current_events_token),
|
||||||
self._update_function,
|
self._update_function,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ class Databases(object):
|
|||||||
|
|
||||||
# If we're on a process that can persist events also
|
# If we're on a process that can persist events also
|
||||||
# instantiate a `PersistEventsStore`
|
# instantiate a `PersistEventsStore`
|
||||||
if hs.get_instance_name() in hs.config.worker.writers.events:
|
if hs.config.worker.writers.events == hs.get_instance_name():
|
||||||
persist_events = PersistEventsStore(hs, database, main)
|
persist_events = PersistEventsStore(hs, database, main)
|
||||||
|
|
||||||
if "state" in database_config.databases:
|
if "state" in database_config.databases:
|
||||||
|
@ -438,7 +438,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
if stream_ordering <= self.stream_ordering_month_ago:
|
if stream_ordering <= self.stream_ordering_month_ago:
|
||||||
raise StoreError(400, "stream_ordering too old %s" % (stream_ordering,))
|
raise StoreError(400, "stream_ordering too old")
|
||||||
|
|
||||||
sql = """
|
sql = """
|
||||||
SELECT event_id FROM stream_ordering_to_exterm
|
SELECT event_id FROM stream_ordering_to_exterm
|
||||||
|
@ -97,7 +97,6 @@ class PersistEventsStore:
|
|||||||
self.store = main_data_store
|
self.store = main_data_store
|
||||||
self.database_engine = db.engine
|
self.database_engine = db.engine
|
||||||
self._clock = hs.get_clock()
|
self._clock = hs.get_clock()
|
||||||
self._instance_name = hs.get_instance_name()
|
|
||||||
|
|
||||||
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
|
self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
@ -109,7 +108,7 @@ class PersistEventsStore:
|
|||||||
|
|
||||||
# This should only exist on instances that are configured to write
|
# This should only exist on instances that are configured to write
|
||||||
assert (
|
assert (
|
||||||
hs.get_instance_name() in hs.config.worker.writers.events
|
hs.config.worker.writers.events == hs.get_instance_name()
|
||||||
), "Can only instantiate EventsStore on master"
|
), "Can only instantiate EventsStore on master"
|
||||||
|
|
||||||
async def _persist_events_and_state_updates(
|
async def _persist_events_and_state_updates(
|
||||||
@ -801,7 +800,6 @@ class PersistEventsStore:
|
|||||||
table="events",
|
table="events",
|
||||||
values=[
|
values=[
|
||||||
{
|
{
|
||||||
"instance_name": self._instance_name,
|
|
||||||
"stream_ordering": event.internal_metadata.stream_ordering,
|
"stream_ordering": event.internal_metadata.stream_ordering,
|
||||||
"topological_ordering": event.depth,
|
"topological_ordering": event.depth,
|
||||||
"depth": event.depth,
|
"depth": event.depth,
|
||||||
|
@ -42,8 +42,7 @@ from synapse.replication.tcp.streams import BackfillStream
|
|||||||
from synapse.replication.tcp.streams.events import EventsStream
|
from synapse.replication.tcp.streams.events import EventsStream
|
||||||
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
|
||||||
from synapse.storage.database import DatabasePool
|
from synapse.storage.database import DatabasePool
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.util.id_generators import StreamIdGenerator
|
||||||
from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
|
|
||||||
from synapse.types import Collection, get_domain_from_id
|
from synapse.types import Collection, get_domain_from_id
|
||||||
from synapse.util.caches.descriptors import Cache, cached
|
from synapse.util.caches.descriptors import Cache, cached
|
||||||
from synapse.util.iterutils import batch_iter
|
from synapse.util.iterutils import batch_iter
|
||||||
@ -79,37 +78,9 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
def __init__(self, database: DatabasePool, db_conn, hs):
|
def __init__(self, database: DatabasePool, db_conn, hs):
|
||||||
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
|
super(EventsWorkerStore, self).__init__(database, db_conn, hs)
|
||||||
|
|
||||||
if isinstance(database.engine, PostgresEngine):
|
if hs.config.worker.writers.events == hs.get_instance_name():
|
||||||
# If we're using Postgres than we can use `MultiWriterIdGenerator`
|
# We are the process in charge of generating stream ids for events,
|
||||||
# regardless of whether this process writes to the streams or not.
|
# so instantiate ID generators based on the database
|
||||||
self._stream_id_gen = MultiWriterIdGenerator(
|
|
||||||
db_conn=db_conn,
|
|
||||||
db=database,
|
|
||||||
instance_name=hs.get_instance_name(),
|
|
||||||
table="events",
|
|
||||||
instance_column="instance_name",
|
|
||||||
id_column="stream_ordering",
|
|
||||||
sequence_name="events_stream_seq",
|
|
||||||
)
|
|
||||||
self._backfill_id_gen = MultiWriterIdGenerator(
|
|
||||||
db_conn=db_conn,
|
|
||||||
db=database,
|
|
||||||
instance_name=hs.get_instance_name(),
|
|
||||||
table="events",
|
|
||||||
instance_column="instance_name",
|
|
||||||
id_column="stream_ordering",
|
|
||||||
sequence_name="events_backfill_stream_seq",
|
|
||||||
positive=False,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# We shouldn't be running in worker mode with SQLite, but its useful
|
|
||||||
# to support it for unit tests.
|
|
||||||
#
|
|
||||||
# If this process is the writer than we need to use
|
|
||||||
# `StreamIdGenerator`, otherwise we use `SlavedIdTracker` which gets
|
|
||||||
# updated over replication. (Multiple writers are not supported for
|
|
||||||
# SQLite).
|
|
||||||
if hs.get_instance_name() in hs.config.worker.writers.events:
|
|
||||||
self._stream_id_gen = StreamIdGenerator(
|
self._stream_id_gen = StreamIdGenerator(
|
||||||
db_conn, "events", "stream_ordering",
|
db_conn, "events", "stream_ordering",
|
||||||
)
|
)
|
||||||
@ -121,9 +92,10 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._stream_id_gen = SlavedIdTracker(
|
# Another process is in charge of persisting events and generating
|
||||||
db_conn, "events", "stream_ordering"
|
# stream IDs: rely on the replication streams to let us know which
|
||||||
)
|
# IDs we can process.
|
||||||
|
self._stream_id_gen = SlavedIdTracker(db_conn, "events", "stream_ordering")
|
||||||
self._backfill_id_gen = SlavedIdTracker(
|
self._backfill_id_gen = SlavedIdTracker(
|
||||||
db_conn, "events", "stream_ordering", step=-1
|
db_conn, "events", "stream_ordering", step=-1
|
||||||
)
|
)
|
||||||
|
@ -1,16 +0,0 @@
|
|||||||
/* Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
ALTER TABLE events ADD COLUMN instance_name TEXT;
|
|
@ -1,26 +0,0 @@
|
|||||||
/* Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
CREATE SEQUENCE IF NOT EXISTS events_stream_seq;
|
|
||||||
|
|
||||||
SELECT setval('events_stream_seq', (
|
|
||||||
SELECT COALESCE(MAX(stream_ordering), 1) FROM events
|
|
||||||
));
|
|
||||||
|
|
||||||
CREATE SEQUENCE IF NOT EXISTS events_backfill_stream_seq;
|
|
||||||
|
|
||||||
SELECT setval('events_backfill_stream_seq', (
|
|
||||||
SELECT COALESCE(-MIN(stream_ordering), 1) FROM events
|
|
||||||
));
|
|
@ -231,12 +231,8 @@ class MultiWriterIdGenerator:
|
|||||||
# gaps should be relatively rare it's still worth doing the book keeping
|
# gaps should be relatively rare it's still worth doing the book keeping
|
||||||
# that allows us to skip forwards when there are gapless runs of
|
# that allows us to skip forwards when there are gapless runs of
|
||||||
# positions.
|
# positions.
|
||||||
#
|
|
||||||
# We start at 1 here as a) the first generated stream ID will be 2, and
|
|
||||||
# b) other parts of the code assume that stream IDs are strictly greater
|
|
||||||
# than 0.
|
|
||||||
self._persisted_upto_position = (
|
self._persisted_upto_position = (
|
||||||
min(self._current_positions.values()) if self._current_positions else 1
|
min(self._current_positions.values()) if self._current_positions else 0
|
||||||
)
|
)
|
||||||
self._known_persisted_positions = [] # type: List[int]
|
self._known_persisted_positions = [] # type: List[int]
|
||||||
|
|
||||||
@ -366,7 +362,9 @@ class MultiWriterIdGenerator:
|
|||||||
equal to it have been successfully persisted.
|
equal to it have been successfully persisted.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.get_persisted_upto_position()
|
# Currently we don't support this operation, as it's not obvious how to
|
||||||
|
# condense the stream positions of multiple writers into a single int.
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
def get_current_token_for_writer(self, instance_name: str) -> int:
|
def get_current_token_for_writer(self, instance_name: str) -> int:
|
||||||
"""Returns the position of the given writer.
|
"""Returns the position of the given writer.
|
||||||
|
Loading…
Reference in New Issue
Block a user