Add some debug about processing read receipts.

I'm hoping to establish which rooms are having lots of RRs sent for them, and
how old the events are when they are sent.
This commit is contained in:
Richard van der Hoff 2019-03-04 18:18:11 +00:00
parent b29693a30b
commit 2db49ea476
2 changed files with 21 additions and 6 deletions

1
changelog.d/4798.misc Normal file
View File

@ -0,0 +1 @@
Add some debug about processing read receipts.

View File

@ -346,15 +346,23 @@ class ReceiptsStore(ReceiptsWorkerStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id): user_id, event_id, data, stream_id):
"""Inserts a read-receipt into the database if it's newer than the current RR
Returns: int|None
None if the RR is older than the current RR
otherwise, the rx timestamp of the event that the RR corresponds to
(or 0 if the event is unknown)
"""
res = self._simple_select_one_txn( res = self._simple_select_one_txn(
txn, txn,
table="events", table="events",
retcols=["topological_ordering", "stream_ordering"], retcols=["stream_ordering", "received_ts"],
keyvalues={"event_id": event_id}, keyvalues={"event_id": event_id},
allow_none=True allow_none=True
) )
stream_ordering = int(res["stream_ordering"]) if res else None stream_ordering = int(res["stream_ordering"]) if res else None
rx_ts = res["received_ts"] if res else 0
# We don't want to clobber receipts for more recent events, so we # We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts # have to compare orderings of existing receipts
@ -373,7 +381,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
"one for later event %s", "one for later event %s",
event_id, eid, event_id, eid,
) )
return False return None
txn.call_after( txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type) self.get_receipts_for_room.invalidate, (room_id, receipt_type)
@ -429,7 +437,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
stream_ordering=stream_ordering, stream_ordering=stream_ordering,
) )
return True return rx_ts
@defer.inlineCallbacks @defer.inlineCallbacks
def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data):
@ -466,7 +474,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
stream_id_manager = self._receipts_id_gen.get_next() stream_id_manager = self._receipts_id_gen.get_next()
with stream_id_manager as stream_id: with stream_id_manager as stream_id:
have_persisted = yield self.runInteraction( event_ts = yield self.runInteraction(
"insert_linearized_receipt", "insert_linearized_receipt",
self.insert_linearized_receipt_txn, self.insert_linearized_receipt_txn,
room_id, receipt_type, user_id, linearized_event_id, room_id, receipt_type, user_id, linearized_event_id,
@ -474,9 +482,15 @@ class ReceiptsStore(ReceiptsWorkerStore):
stream_id=stream_id, stream_id=stream_id,
) )
if not have_persisted: if event_ts is None:
defer.returnValue(None) defer.returnValue(None)
now = self._clock.time_msec()
logger.debug(
"RR for event %s in %s (%i ms old)",
linearized_event_id, room_id, now - event_ts,
)
yield self.insert_graph_receipt( yield self.insert_graph_receipt(
room_id, receipt_type, user_id, event_ids, data room_id, receipt_type, user_id, event_ids, data
) )