mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-15 03:05:27 -04:00
Merge remote-tracking branch 'upstream/release-v1.67'
This commit is contained in:
commit
ca9515d2c7
133 changed files with 3557 additions and 2185 deletions
|
@ -26,12 +26,13 @@ from twisted.web.server import Request
|
|||
|
||||
from synapse.api.errors import HttpResponseException, SynapseError
|
||||
from synapse.http import RequestTimedOutError
|
||||
from synapse.http.server import HttpServer, is_method_cancellable
|
||||
from synapse.http.server import HttpServer
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging import opentracing
|
||||
from synapse.logging.opentracing import trace_with_opname
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util.caches.response_cache import ResponseCache
|
||||
from synapse.util.cancellation import is_function_cancellable
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -311,7 +312,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
|||
url_args = list(self.PATH_ARGS)
|
||||
method = self.METHOD
|
||||
|
||||
if self.CACHE and is_method_cancellable(self._handle_request):
|
||||
if self.CACHE and is_function_cancellable(self._handle_request):
|
||||
raise Exception(
|
||||
f"{self.__class__.__name__} has been marked as cancellable, but CACHE "
|
||||
"is set. The cancellable flag would have no effect."
|
||||
|
@ -359,6 +360,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
|||
# The `@cancellable` decorator may be applied to `_handle_request`. But we
|
||||
# told `HttpServer.register_paths` that our handler is `_check_auth_and_handle`,
|
||||
# so we have to set up the cancellable flag ourselves.
|
||||
request.is_render_cancellable = is_method_cancellable(self._handle_request)
|
||||
request.is_render_cancellable = is_function_cancellable(self._handle_request)
|
||||
|
||||
return await self._handle_request(request, **kwargs)
|
||||
|
|
|
@ -31,6 +31,5 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
|
|||
self._push_rules_stream_id_gen.advance(instance_name, token)
|
||||
for row in rows:
|
||||
self.get_push_rules_for_user.invalidate((row.user_id,))
|
||||
self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
|
||||
self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
|
||||
return super().process_replication_rows(stream_name, instance_name, token, rows)
|
||||
|
|
|
@ -416,10 +416,7 @@ class FederationSenderHandler:
|
|||
if not self._is_mine_id(receipt.user_id):
|
||||
continue
|
||||
# Private read receipts never get sent over federation.
|
||||
if receipt.receipt_type in (
|
||||
ReceiptTypes.READ_PRIVATE,
|
||||
ReceiptTypes.UNSTABLE_READ_PRIVATE,
|
||||
):
|
||||
if receipt.receipt_type == ReceiptTypes.READ_PRIVATE:
|
||||
continue
|
||||
receipt_info = ReadReceipt(
|
||||
receipt.room_id,
|
||||
|
|
|
@ -35,7 +35,6 @@ from twisted.internet.protocol import ReconnectingClientFactory
|
|||
|
||||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
|
||||
from synapse.replication.tcp.commands import (
|
||||
ClearUserSyncsCommand,
|
||||
Command,
|
||||
|
@ -332,46 +331,31 @@ class ReplicationCommandHandler:
|
|||
|
||||
def start_replication(self, hs: "HomeServer") -> None:
|
||||
"""Helper method to start replication."""
|
||||
if hs.config.redis.redis_enabled:
|
||||
from synapse.replication.tcp.redis import (
|
||||
RedisDirectTcpReplicationClientFactory,
|
||||
)
|
||||
from synapse.replication.tcp.redis import RedisDirectTcpReplicationClientFactory
|
||||
|
||||
# First let's ensure that we have a ReplicationStreamer started.
|
||||
hs.get_replication_streamer()
|
||||
# First let's ensure that we have a ReplicationStreamer started.
|
||||
hs.get_replication_streamer()
|
||||
|
||||
# We need two connections to redis, one for the subscription stream and
|
||||
# one to send commands to (as you can't send further redis commands to a
|
||||
# connection after SUBSCRIBE is called).
|
||||
# We need two connections to redis, one for the subscription stream and
|
||||
# one to send commands to (as you can't send further redis commands to a
|
||||
# connection after SUBSCRIBE is called).
|
||||
|
||||
# First create the connection for sending commands.
|
||||
outbound_redis_connection = hs.get_outbound_redis_connection()
|
||||
# First create the connection for sending commands.
|
||||
outbound_redis_connection = hs.get_outbound_redis_connection()
|
||||
|
||||
# Now create the factory/connection for the subscription stream.
|
||||
self._factory = RedisDirectTcpReplicationClientFactory(
|
||||
hs,
|
||||
outbound_redis_connection,
|
||||
channel_names=self._channels_to_subscribe_to,
|
||||
)
|
||||
hs.get_reactor().connectTCP(
|
||||
hs.config.redis.redis_host,
|
||||
hs.config.redis.redis_port,
|
||||
self._factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
else:
|
||||
client_name = hs.get_instance_name()
|
||||
self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
|
||||
host = hs.config.worker.worker_replication_host
|
||||
port = hs.config.worker.worker_replication_port
|
||||
hs.get_reactor().connectTCP(
|
||||
host,
|
||||
port,
|
||||
self._factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
# Now create the factory/connection for the subscription stream.
|
||||
self._factory = RedisDirectTcpReplicationClientFactory(
|
||||
hs,
|
||||
outbound_redis_connection,
|
||||
channel_names=self._channels_to_subscribe_to,
|
||||
)
|
||||
hs.get_reactor().connectTCP(
|
||||
hs.config.redis.redis_host,
|
||||
hs.config.redis.redis_port,
|
||||
self._factory,
|
||||
timeout=30,
|
||||
bindAddress=None,
|
||||
)
|
||||
|
||||
def get_streams(self) -> Dict[str, Stream]:
|
||||
"""Get a map from stream name to all streams."""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue