mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-28 05:36:58 -05:00
Merge pull request #3722 from matrix-org/erikj/bg_process_iteration
LaterGauge needs to call thread safe functions
This commit is contained in:
commit
2b1a4b2596
1
changelog.d/3722.bugfix
Normal file
1
changelog.d/3722.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix error collecting prometheus metrics when run on dedicated thread due to threading concurrency issues
|
@ -15,6 +15,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
|
|
||||||
from prometheus_client.core import Counter, Histogram
|
from prometheus_client.core import Counter, Histogram
|
||||||
|
|
||||||
@ -111,6 +112,9 @@ in_flight_requests_db_sched_duration = Counter(
|
|||||||
# The set of all in flight requests, set[RequestMetrics]
|
# The set of all in flight requests, set[RequestMetrics]
|
||||||
_in_flight_requests = set()
|
_in_flight_requests = set()
|
||||||
|
|
||||||
|
# Protects the _in_flight_requests set from concurrent accesss
|
||||||
|
_in_flight_requests_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
def _get_in_flight_counts():
|
def _get_in_flight_counts():
|
||||||
"""Returns a count of all in flight requests by (method, server_name)
|
"""Returns a count of all in flight requests by (method, server_name)
|
||||||
@ -120,6 +124,7 @@ def _get_in_flight_counts():
|
|||||||
"""
|
"""
|
||||||
# Cast to a list to prevent it changing while the Prometheus
|
# Cast to a list to prevent it changing while the Prometheus
|
||||||
# thread is collecting metrics
|
# thread is collecting metrics
|
||||||
|
with _in_flight_requests_lock:
|
||||||
reqs = list(_in_flight_requests)
|
reqs = list(_in_flight_requests)
|
||||||
|
|
||||||
for rm in reqs:
|
for rm in reqs:
|
||||||
@ -154,9 +159,11 @@ class RequestMetrics(object):
|
|||||||
# to the "in flight" metrics.
|
# to the "in flight" metrics.
|
||||||
self._request_stats = self.start_context.get_resource_usage()
|
self._request_stats = self.start_context.get_resource_usage()
|
||||||
|
|
||||||
|
with _in_flight_requests_lock:
|
||||||
_in_flight_requests.add(self)
|
_in_flight_requests.add(self)
|
||||||
|
|
||||||
def stop(self, time_sec, request):
|
def stop(self, time_sec, request):
|
||||||
|
with _in_flight_requests_lock:
|
||||||
_in_flight_requests.discard(self)
|
_in_flight_requests.discard(self)
|
||||||
|
|
||||||
context = LoggingContext.current_context()
|
context = LoggingContext.current_context()
|
||||||
|
@ -13,6 +13,8 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import threading
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
|
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
|
||||||
@ -78,6 +80,9 @@ _background_process_counts = dict() # type: dict[str, int]
|
|||||||
# of process descriptions that no longer have any active processes.
|
# of process descriptions that no longer have any active processes.
|
||||||
_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
|
_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
|
||||||
|
|
||||||
|
# A lock that covers the above dicts
|
||||||
|
_bg_metrics_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
class _Collector(object):
|
class _Collector(object):
|
||||||
"""A custom metrics collector for the background process metrics.
|
"""A custom metrics collector for the background process metrics.
|
||||||
@ -92,7 +97,11 @@ class _Collector(object):
|
|||||||
labels=["name"],
|
labels=["name"],
|
||||||
)
|
)
|
||||||
|
|
||||||
for desc, processes in six.iteritems(_background_processes):
|
# We copy the dict so that it doesn't change from underneath us
|
||||||
|
with _bg_metrics_lock:
|
||||||
|
_background_processes_copy = dict(_background_processes)
|
||||||
|
|
||||||
|
for desc, processes in six.iteritems(_background_processes_copy):
|
||||||
background_process_in_flight_count.add_metric(
|
background_process_in_flight_count.add_metric(
|
||||||
(desc,), len(processes),
|
(desc,), len(processes),
|
||||||
)
|
)
|
||||||
@ -167,18 +176,25 @@ def run_as_background_process(desc, func, *args, **kwargs):
|
|||||||
"""
|
"""
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def run():
|
def run():
|
||||||
|
with _bg_metrics_lock:
|
||||||
count = _background_process_counts.get(desc, 0)
|
count = _background_process_counts.get(desc, 0)
|
||||||
_background_process_counts[desc] = count + 1
|
_background_process_counts[desc] = count + 1
|
||||||
|
|
||||||
_background_process_start_count.labels(desc).inc()
|
_background_process_start_count.labels(desc).inc()
|
||||||
|
|
||||||
with LoggingContext(desc) as context:
|
with LoggingContext(desc) as context:
|
||||||
context.request = "%s-%i" % (desc, count)
|
context.request = "%s-%i" % (desc, count)
|
||||||
proc = _BackgroundProcess(desc, context)
|
proc = _BackgroundProcess(desc, context)
|
||||||
|
|
||||||
|
with _bg_metrics_lock:
|
||||||
_background_processes.setdefault(desc, set()).add(proc)
|
_background_processes.setdefault(desc, set()).add(proc)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
yield func(*args, **kwargs)
|
yield func(*args, **kwargs)
|
||||||
finally:
|
finally:
|
||||||
proc.update_metrics()
|
proc.update_metrics()
|
||||||
|
|
||||||
|
with _bg_metrics_lock:
|
||||||
_background_processes[desc].remove(proc)
|
_background_processes[desc].remove(proc)
|
||||||
|
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user