From 1058d141272f17435b3548a4b12abb51a6372383 Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Mon, 20 Aug 2018 17:16:58 +0100 Subject: [PATCH 1/5] Make the in flight background process metrics thread safe --- synapse/metrics/background_process_metrics.py | 25 +++++++++++++++---- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index ce678d5f7..264fb9389 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -14,6 +14,7 @@ # limitations under the License. import six +import threading from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily @@ -78,6 +79,9 @@ _background_process_counts = dict() # type: dict[str, int] # of process descriptions that no longer have any active processes. _background_processes = dict() # type: dict[str, set[_BackgroundProcess]] +# A lock that covers the above dicts +_bg_metrics_lock = threading.Lock() + class _Collector(object): """A custom metrics collector for the background process metrics. @@ -92,7 +96,11 @@ class _Collector(object): labels=["name"], ) - for desc, processes in six.iteritems(_background_processes): + # We copy the dict so that it doesn't change from underneath us + with _bg_metrics_lock: + _background_processes_copy = dict(_background_processes) + + for desc, processes in six.iteritems(_background_processes_copy): background_process_in_flight_count.add_metric( (desc,), len(processes), ) @@ -167,19 +175,26 @@ def run_as_background_process(desc, func, *args, **kwargs): """ @defer.inlineCallbacks def run(): - count = _background_process_counts.get(desc, 0) - _background_process_counts[desc] = count + 1 + with _bg_metrics_lock: + count = _background_process_counts.get(desc, 0) + _background_process_counts[desc] = count + 1 + _background_process_start_count.labels(desc).inc() with LoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) proc = _BackgroundProcess(desc, context) - _background_processes.setdefault(desc, set()).add(proc) + + with _bg_metrics_lock: + _background_processes.setdefault(desc, set()).add(proc) + try: yield func(*args, **kwargs) finally: proc.update_metrics() - _background_processes[desc].remove(proc) + + with _bg_metrics_lock: + _background_processes[desc].remove(proc) with PreserveLoggingContext(): return run() From b01a75549893bf4f8f986a74a499e7cb6131868c Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Mon, 20 Aug 2018 17:27:52 +0100 Subject: [PATCH 2/5] Make the in flight requests metrics thread safe --- synapse/http/request_metrics.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 588e28057..284d5e05d 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +import threading from prometheus_client.core import Counter, Histogram @@ -111,6 +112,9 @@ in_flight_requests_db_sched_duration = Counter( # The set of all in flight requests, set[RequestMetrics] _in_flight_requests = set() +# Protects the _in_flight_requests set from concurrent accesss +_in_flight_reqeuests_lock = threading.Lock() + def _get_in_flight_counts(): """Returns a count of all in flight requests by (method, server_name) @@ -120,7 +124,8 @@ def _get_in_flight_counts(): """ # Cast to a list to prevent it changing while the Prometheus # thread is collecting metrics - reqs = list(_in_flight_requests) + with _in_flight_reqeuests_lock: + reqs = list(_in_flight_requests) for rm in reqs: rm.update_metrics() @@ -154,10 +159,12 @@ class RequestMetrics(object): # to the "in flight" metrics. self._request_stats = self.start_context.get_resource_usage() - _in_flight_requests.add(self) + with _in_flight_reqeuests_lock: + _in_flight_requests.add(self) def stop(self, time_sec, request): - _in_flight_requests.discard(self) + with _in_flight_reqeuests_lock: + _in_flight_requests.discard(self) context = LoggingContext.current_context() From e2c0aa2c2619d6db7f6d6b1413430133e365f4cd Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Mon, 20 Aug 2018 17:40:59 +0100 Subject: [PATCH 3/5] Newsfile --- changelog.d/3722.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3722.bugfix diff --git a/changelog.d/3722.bugfix b/changelog.d/3722.bugfix new file mode 100644 index 000000000..16cbaf76c --- /dev/null +++ b/changelog.d/3722.bugfix @@ -0,0 +1 @@ +Fix error collecting prometheus metrics when run on dedicated thread due to threading concurrency issues From 3f6762f0bbeb611714ba6c9c4329ea7d5e46cbda Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Tue, 21 Aug 2018 09:38:38 +0100 Subject: [PATCH 4/5] isort --- synapse/metrics/background_process_metrics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 264fb9389..167167be0 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import six import threading +import six + from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily from twisted.internet import defer From cd6937fb266b72080f73a7a41da761f7f8775d96 Mon Sep 17 00:00:00 2001 From: Erik Johnston <erik@matrix.org> Date: Tue, 21 Aug 2018 16:28:10 +0100 Subject: [PATCH 5/5] Fix typo --- synapse/http/request_metrics.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 284d5e05d..72c265467 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -113,7 +113,7 @@ in_flight_requests_db_sched_duration = Counter( _in_flight_requests = set() # Protects the _in_flight_requests set from concurrent accesss -_in_flight_reqeuests_lock = threading.Lock() +_in_flight_requests_lock = threading.Lock() def _get_in_flight_counts(): @@ -124,7 +124,7 @@ def _get_in_flight_counts(): """ # Cast to a list to prevent it changing while the Prometheus # thread is collecting metrics - with _in_flight_reqeuests_lock: + with _in_flight_requests_lock: reqs = list(_in_flight_requests) for rm in reqs: @@ -159,11 +159,11 @@ class RequestMetrics(object): # to the "in flight" metrics. self._request_stats = self.start_context.get_resource_usage() - with _in_flight_reqeuests_lock: + with _in_flight_requests_lock: _in_flight_requests.add(self) def stop(self, time_sec, request): - with _in_flight_reqeuests_lock: + with _in_flight_requests_lock: _in_flight_requests.discard(self) context = LoggingContext.current_context()