Track last processed event received_ts

This commit is contained in:
Erik Johnston 2018-04-11 11:52:19 +01:00
parent 92e34615c5
commit 4dae4a97ed
4 changed files with 52 additions and 0 deletions

View File

@ -243,6 +243,17 @@ class TransactionQueue(object):
next_token, "federation_sender", next_token, "federation_sender",
) )
if events:
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)
finally: finally:
self._is_processing = False self._is_processing = False

View File

@ -115,6 +115,16 @@ class ApplicationServicesHandler(object):
synapse.metrics.event_processing_positions.set( synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender", upper_bound, "appservice_sender",
) )
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
finally: finally:
self.is_processing = False self.is_processing = False

View File

@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position", "event_persisted_position",
) )
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = synapse_metrics.register_gauge(
"event_processing_last_ts", labels=["name"],
)
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = synapse_metrics.register_gauge(
"event_processing_lag", labels=["name"],
)
def runUntilCurrentTimer(func): def runUntilCurrentTimer(func):

View File

@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsWorkerStore(SQLBaseStore): class EventsWorkerStore(SQLBaseStore):
def get_received_ts(self, event_id):
"""Get received_ts (when it was persisted) for the event
Args:
event_id (str)
Returns:
Deferred[int|None]: Timstamp in milliseconds, or None for events
that were persisted before received_ts was implemented.
"""
return self._simple_select_one_onecol(
table="events",
keyvalues={
"event_id": event_id,
},
retcol="received_ts",
desc="get_received_ts",
)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True, def get_event(self, event_id, check_redacted=True,