diff --git a/changelog.d/7422.feature b/changelog.d/7422.feature new file mode 100644 index 000000000..d6d5bb216 --- /dev/null +++ b/changelog.d/7422.feature @@ -0,0 +1 @@ +Add a configuration setting to tweak the threshold for dummy events. diff --git a/changelog.d/7426.misc b/changelog.d/7426.misc new file mode 100644 index 000000000..731f4dcb5 --- /dev/null +++ b/changelog.d/7426.misc @@ -0,0 +1 @@ +Clean up some LoggingContext code. diff --git a/changelog.d/7439.feature b/changelog.d/7439.feature new file mode 100644 index 000000000..ce6140fdd --- /dev/null +++ b/changelog.d/7439.feature @@ -0,0 +1 @@ +Add support for running replication over Redis when using workers. diff --git a/changelog.d/7442.misc b/changelog.d/7442.misc new file mode 100644 index 000000000..a8fd5ad80 --- /dev/null +++ b/changelog.d/7442.misc @@ -0,0 +1 @@ +Run group attestation renewal in series rather than parallel for performance. \ No newline at end of file diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index fc970986c..98ead7dc0 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -253,6 +253,18 @@ listeners: # bind_addresses: ['::1', '127.0.0.1'] # type: manhole +# Forward extremities can build up in a room due to networking delays between +# homeservers. Once this happens in a large room, calculation of the state of +# that room can become quite expensive. To mitigate this, once the number of +# forward extremities reaches a given threshold, Synapse will send an +# org.matrix.dummy_event event, which will reduce the forward extremities +# in the room. +# +# This setting defines the threshold (i.e. number of forward extremities in the +# room) at which dummy events are sent. The default value is 10. +# +#dummy_events_threshold: 5 + ## Homeserver blocking ## diff --git a/synapse/config/server.py b/synapse/config/server.py index c6d58effd..6d8823184 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -505,6 +505,9 @@ class ServerConfig(Config): "cleanup_extremities_with_dummy_events", True ) + # The number of forward extremities in a room needed to send a dummy event. + self.dummy_events_threshold = config.get("dummy_events_threshold", 10) + self.enable_ephemeral_messages = config.get("enable_ephemeral_messages", False) # Inhibits the /requestToken endpoints from returning an error that might leak @@ -823,6 +826,18 @@ class ServerConfig(Config): # bind_addresses: ['::1', '127.0.0.1'] # type: manhole + # Forward extremities can build up in a room due to networking delays between + # homeservers. Once this happens in a large room, calculation of the state of + # that room can become quite expensive. To mitigate this, once the number of + # forward extremities reaches a given threshold, Synapse will send an + # org.matrix.dummy_event event, which will reduce the forward extremities + # in the room. + # + # This setting defines the threshold (i.e. number of forward extremities in the + # room) at which dummy events are sent. The default value is 10. + # + #dummy_events_threshold: 5 + ## Homeserver blocking ## diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 1eec3874b..27b0c0265 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -46,7 +46,6 @@ from twisted.internet import defer from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id -from synapse.util.async_helpers import yieldable_gather_results logger = logging.getLogger(__name__) @@ -208,6 +207,5 @@ class GroupAttestionRenewer(object): "Error renewing attestation of %r in %r", user_id, group_id ) - await yieldable_gather_results( - _renew_attestation, ((row["group_id"], row["user_id"]) for row in rows) - ) + for row in rows: + await _renew_attestation((row["group_id"], row["user_id"])) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index a324f0934..a622a600b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -419,6 +419,8 @@ class EventCreationHandler(object): self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages + self._dummy_events_threshold = hs.config.dummy_events_threshold + @defer.inlineCallbacks def create_event( self, @@ -1085,7 +1087,7 @@ class EventCreationHandler(object): """ self._expire_rooms_to_exclude_from_dummy_event_insertion() room_ids = await self.store.get_rooms_with_many_extremities( - min_count=10, + min_count=self._dummy_events_threshold, limit=5, room_id_filter=self._rooms_to_exclude_from_dummy_event_insertion.keys(), ) diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 856534e91..8b9c4e38b 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -431,15 +431,7 @@ class LoggingContext(object): return utime_delta, stime_delta = self._get_cputime(rusage) - self._resource_usage.ru_utime += utime_delta - self._resource_usage.ru_stime += stime_delta - - # if we have a parent, pass our CPU usage stats on - if self.parent_context: - self.parent_context._resource_usage += self._resource_usage - - # reset them in case we get entered again - self._resource_usage.reset() + self.add_cputime(utime_delta, stime_delta) finally: self.usage_start = None @@ -497,30 +489,52 @@ class LoggingContext(object): return utime_delta, stime_delta + def add_cputime(self, utime_delta: float, stime_delta: float) -> None: + """Update the CPU time usage of this context (and any parents, recursively). + + Args: + utime_delta: additional user time, in seconds, spent in this context. + stime_delta: additional system time, in seconds, spent in this context. + """ + self._resource_usage.ru_utime += utime_delta + self._resource_usage.ru_stime += stime_delta + if self.parent_context: + self.parent_context.add_cputime(utime_delta, stime_delta) + def add_database_transaction(self, duration_sec: float) -> None: + """Record the use of a database transaction and the length of time it took. + + Args: + duration_sec: The number of seconds the database transaction took. + """ if duration_sec < 0: raise ValueError("DB txn time can only be non-negative") self._resource_usage.db_txn_count += 1 self._resource_usage.db_txn_duration_sec += duration_sec + if self.parent_context: + self.parent_context.add_database_transaction(duration_sec) def add_database_scheduled(self, sched_sec: float) -> None: """Record a use of the database pool Args: - sched_sec (float): number of seconds it took us to get a - connection + sched_sec: number of seconds it took us to get a connection """ if sched_sec < 0: raise ValueError("DB scheduling time can only be non-negative") self._resource_usage.db_sched_duration_sec += sched_sec + if self.parent_context: + self.parent_context.add_database_scheduled(sched_sec) def record_event_fetch(self, event_count: int) -> None: """Record a number of events being fetched from the db Args: - event_count (int): number of events being fetched + event_count: number of events being fetched """ self._resource_usage.evt_db_fetch_count += event_count + if self.parent_context: + self.parent_context.record_event_fetch(event_count) class LoggingContextFilter(logging.Filter): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 733c51b75..39c99a280 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -98,7 +98,9 @@ CONDITIONAL_REQUIREMENTS = { "sentry": ["sentry-sdk>=0.7.2"], "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"], "jwt": ["pyjwt>=1.6.4"], - "redis": ["txredisapi>=1.4.7"], + # hiredis is not a *strict* dependency, but it makes things much faster. + # (if it is not installed, we fall back to slow code.) + "redis": ["txredisapi>=1.4.7", "hiredis"], } ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]