diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5b0b798e5..963d938ed 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -233,12 +233,27 @@ class TransactionQueue(object): consumeErrors=True )) - events_processed_counter.inc_by(len(events)) - yield self.store.update_federation_out_pos( "events", next_token ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + + events_processed_counter.inc_by(len(events)) + + synapse.metrics.event_processing_positions.set( + next_token, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 69eacff94..0245197c0 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -125,9 +125,23 @@ class ApplicationServicesHandler(object): for evs in events_by_room.itervalues() ], consumeErrors=True)) + yield self.store.set_appservice_last_pos(upper_bound) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_positions.set( + upper_bound, "appservice_sender", + ) + events_processed_counter.inc_by(len(events)) - yield self.store.set_appservice_last_pos(upper_bound) + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2ed82b04a..e3b831db6 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -23,7 +23,7 @@ from twisted.internet import reactor from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, + MemoryUsageMetric, GaugeMetric, ) from .process_collector import register_process_collector @@ -65,6 +65,13 @@ class Metrics(object): """ return self._register(CounterMetric, *args, **kwargs) + def register_gauge(self, *args, **kwargs): + """ + Returns: + GaugeMetric + """ + return self._register(GaugeMetric, *args, **kwargs) + def register_callback(self, *args, **kwargs): """ Returns: @@ -144,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor") tick_time = reactor_metrics.register_distribution("tick_time") pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +synapse_metrics = get_metrics_for("synapse") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = synapse_metrics.register_gauge( + "event_processing_positions", labels=["name"], +) + +# Used to track the current max events stream position +event_persisted_position = synapse_metrics.register_gauge( + "event_persisted_position", +) + +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index ff5aa8c0e..89bd47c3f 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -115,7 +115,7 @@ class CounterMetric(BaseMetric): # dict[list[str]]: value for each set of label values. the keys are the # label values, in the same order as the labels in self.labels. # - # (if the metric is a scalar, the (single) key is the empty list). + # (if the metric is a scalar, the (single) key is the empty tuple). self.counts = {} # Scalar metrics are never empty @@ -145,6 +145,36 @@ class CounterMetric(BaseMetric): ) +class GaugeMetric(BaseMetric): + """A metric that can go up or down + """ + + def __init__(self, *args, **kwargs): + super(GaugeMetric, self).__init__(*args, **kwargs) + + # dict[list[str]]: value for each set of label values. the keys are the + # label values, in the same order as the labels in self.labels. + # + # (if the metric is a scalar, the (single) key is the empty tuple). + self.guages = {} + + def set(self, v, *values): + if len(values) != self.dimension(): + raise ValueError( + "Expected as many values to inc() as labels (%d)" % (self.dimension()) + ) + + # TODO: should assert that the tag values are all strings + + self.guages[values] = v + + def render(self): + return flatten( + self._render_for_labels(k, self.guages[k]) + for k in sorted(self.guages.keys()) + ) + + class CallbackMetric(BaseMetric): """A metric that returns the numeric value returned by a callback whenever it is rendered. Typically this is used to implement gauges that yield the diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ece5e6c41..da44b52fd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore): new_forward_extremeties=new_forward_extremeties, ) persist_event_counter.inc_by(len(chunk)) + synapse.metrics.event_persisted_position.set( + chunk[-1][0].internal_metadata.stream_ordering, + ) for event, context in chunk: if context.app_service: origin_type = "local" diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78b..a937b9bce 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,26 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event. + + Raises an exception for unknown events. + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timestamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a667fb6f0..b75345594 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -31,6 +31,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.mock_scheduler = Mock() hs = Mock() hs.get_datastore = Mock(return_value=self.mock_store) + self.mock_store.get_received_ts.return_value = 0 hs.get_application_service_api = Mock(return_value=self.mock_as_api) hs.get_application_service_scheduler = Mock(return_value=self.mock_scheduler) hs.get_clock.return_value = MockClock()