Track where event stream processing have gotten up to

This commit is contained in:
Erik Johnston 2018-04-11 11:07:51 +01:00
parent ab825aa328
commit 92e34615c5
4 changed files with 24 additions and 0 deletions

View File

@ -239,6 +239,10 @@ class TransactionQueue(object):
"events", next_token "events", next_token
) )
synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)
finally: finally:
self._is_processing = False self._is_processing = False

View File

@ -111,6 +111,10 @@ class ApplicationServicesHandler(object):
events_processed_counter.inc_by(len(events)) events_processed_counter.inc_by(len(events))
yield self.store.set_appservice_last_pos(upper_bound) yield self.store.set_appservice_last_pos(upper_bound)
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
finally: finally:
self.is_processing = False self.is_processing = False

View File

@ -151,6 +151,19 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time") tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls") pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
synapse_metrics = get_metrics_for("synapse")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = synapse_metrics.register_gauge(
"event_processing_positions", labels=["name"],
)
# Used to track the current max events stream position
event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
def runUntilCurrentTimer(func): def runUntilCurrentTimer(func):

View File

@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties=new_forward_extremeties, new_forward_extremeties=new_forward_extremeties,
) )
persist_event_counter.inc_by(len(chunk)) persist_event_counter.inc_by(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
for event, context in chunk: for event, context in chunk:
if context.app_service: if context.app_service:
origin_type = "local" origin_type = "local"