Wire together receipts and the notifer/federation

This commit is contained in:
Erik Johnston 2015-07-07 15:25:30 +01:00
parent 716e426933
commit ca041d5526
4 changed files with 126 additions and 33 deletions

View File

@ -37,7 +37,8 @@ class ReceiptsHandler(BaseHandler):
"m.receipt", self._received_remote_receipt "m.receipt", self._received_remote_receipt
) )
self._latest_serial = 0 # self._earliest_cached_serial = 0
# self._rooms_to_latest_serial = {}
@defer.inlineCallbacks @defer.inlineCallbacks
def received_client_receipt(self, room_id, receipt_type, user_id, def received_client_receipt(self, room_id, receipt_type, user_id,
@ -53,7 +54,9 @@ class ReceiptsHandler(BaseHandler):
"event_ids": [event_id], "event_ids": [event_id],
} }
yield self._handle_new_receipts([receipt]) is_new = yield self._handle_new_receipts([receipt])
if is_new:
self._push_remotes([receipt]) self._push_remotes([receipt])
@defer.inlineCallbacks @defer.inlineCallbacks
@ -81,33 +84,24 @@ class ReceiptsHandler(BaseHandler):
user_id = receipt["user_id"] user_id = receipt["user_id"]
event_ids = receipt["event_ids"] event_ids = receipt["event_ids"]
stream_id, max_persisted_id = yield self.store.insert_receipt( res = yield self.store.insert_receipt(
room_id, receipt_type, user_id, event_ids, room_id, receipt_type, user_id, event_ids,
) )
# TODO: Use max_persisted_id if not res:
# res will be None if this read receipt is 'old'
defer.returnValue(False)
self._latest_serial = max(self._latest_serial, stream_id) stream_id, max_persisted_id = res
with PreserveLoggingContext(): with PreserveLoggingContext():
self.notifier.on_new_event( self.notifier.on_new_event(
"receipt_key", self._latest_serial, rooms=[room_id] "receipt_key", max_persisted_id, rooms=[room_id]
) )
localusers = set() defer.returnValue(True)
remotedomains = set()
rm_handler = self.hs.get_handlers().room_member_handler
yield rm_handler.fetch_room_distributions_into(
room_id, localusers=localusers, remotedomains=remotedomains
)
receipt["remotedomains"] = remotedomains
self.notifier.on_new_event(
"receipt_key", self._latest_room_serial, rooms=[room_id]
)
@defer.inlineCallbacks
def _push_remotes(self, receipts): def _push_remotes(self, receipts):
# TODO: Some of this stuff should be coallesced. # TODO: Some of this stuff should be coallesced.
for receipt in receipts: for receipt in receipts:
@ -115,7 +109,15 @@ class ReceiptsHandler(BaseHandler):
receipt_type = receipt["receipt_type"] receipt_type = receipt["receipt_type"]
user_id = receipt["user_id"] user_id = receipt["user_id"]
event_ids = receipt["event_ids"] event_ids = receipt["event_ids"]
remotedomains = receipt["remotedomains"]
remotedomains = set()
rm_handler = self.hs.get_handlers().room_member_handler
yield rm_handler.fetch_room_distributions_into(
room_id, localusers=None, remotedomains=remotedomains
)
logger.debug("Sending receipt to: %r", remotedomains)
for domain in remotedomains: for domain in remotedomains:
self.federation.send_edu( self.federation.send_edu(
@ -130,3 +132,40 @@ class ReceiptsHandler(BaseHandler):
}, },
}, },
) )
class ReceiptEventSource(object):
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_new_events_for_user(self, user, from_key, limit):
from_key = int(from_key)
to_key = yield self.get_current_key()
rooms = yield self.store.get_rooms_for_user(user.to_string())
rooms = [room.room_id for room in rooms]
content = {}
for room_id in rooms:
result = yield self.store.get_linearized_receipts_for_room(
room_id, from_key, to_key
)
if result:
content[room_id] = result
if not content:
defer.returnValue(([], to_key))
event = {
"type": "m.receipt",
"content": content,
}
defer.returnValue(([event], to_key))
def get_current_key(self, direction='f'):
return self.store.get_max_receipt_stream_id()
@defer.inlineCallbacks
def get_pagination_rows(self, user, config, key):
defer.returnValue(([{}], 0))

View File

@ -28,7 +28,7 @@ class ReceiptRestServlet(RestServlet):
PATTERN = client_v2_pattern( PATTERN = client_v2_pattern(
"/rooms/(?P<room_id>[^/]*)" "/rooms/(?P<room_id>[^/]*)"
"/receipt/(?P<receipt_type>[^/]*)" "/receipt/(?P<receipt_type>[^/]*)"
"/(?P<event_id>[^/])*" "/(?P<event_id>[^/]*)$"
) )
def __init__(self, hs): def __init__(self, hs):
@ -41,7 +41,6 @@ class ReceiptRestServlet(RestServlet):
def on_POST(self, request, room_id, receipt_type, event_id): def on_POST(self, request, room_id, receipt_type, event_id):
user, client = yield self.auth.get_user_by_req(request) user, client = yield self.auth.get_user_by_req(request)
# TODO: STUFF
yield self.receipts_handler.received_client_receipt( yield self.receipts_handler.received_client_receipt(
room_id, room_id,
receipt_type, receipt_type,

View File

@ -17,17 +17,33 @@ from ._base import SQLBaseStore, cached
from twisted.internet import defer from twisted.internet import defer
import logging
logger = logging.getLogger(__name__)
class ReceiptsStore(SQLBaseStore): class ReceiptsStore(SQLBaseStore):
@cached
@defer.inlineCallbacks @defer.inlineCallbacks
def get_linearized_receipts_for_room(self, room_id): def get_linearized_receipts_for_room(self, room_id, from_key, to_key):
rows = yield self._simple_select_list( def f(txn):
table="receipts_linearized", sql = (
keyvalues={"room_id": room_id}, "SELECT * FROM receipts_linearized WHERE"
retcols=["receipt_type", "user_id", "event_id"], " room_id = ? AND stream_id > ? AND stream_id <= ?"
desc="get_linearized_receipts_for_room", )
txn.execute(
sql,
(room_id, from_key, to_key)
)
rows = self.cursor_to_dict(txn)
return rows
rows = yield self.runInteraction(
"get_linearized_receipts_for_room", f
) )
result = {} result = {}
@ -40,6 +56,9 @@ class ReceiptsStore(SQLBaseStore):
defer.returnValue(result) defer.returnValue(result)
def get_max_receipt_stream_id(self):
return self._receipts_id_gen.get_max_token(self)
@cached @cached
@defer.inlineCallbacks @defer.inlineCallbacks
def get_graph_receipts_for_room(self, room_id): def get_graph_receipts_for_room(self, room_id):
@ -62,11 +81,38 @@ class ReceiptsStore(SQLBaseStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, stream_id): user_id, event_id, stream_id):
# We don't want to clobber receipts for more recent events, so we
# have to compare orderings of existing receipts
sql = (
"SELECT topological_ordering, stream_ordering, event_id FROM events"
" INNER JOIN receipts_linearized as r USING (event_id, room_id)"
" WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
)
txn.execute(sql, (room_id, receipt_type, user_id))
results = txn.fetchall()
if results:
res = self._simple_select_one_txn(
txn,
table="events",
retcols=["topological_ordering", "stream_ordering"],
keyvalues={"event_id": event_id},
)
topological_ordering = int(res["topological_ordering"])
stream_ordering = int(res["stream_ordering"])
for to, so, _ in results:
if int(to) > topological_ordering:
return False
elif int(to) == topological_ordering and int(so) >= stream_ordering:
return False
self._simple_delete_txn( self._simple_delete_txn(
txn, txn,
table="receipts_linearized", table="receipts_linearized",
keyvalues={ keyvalues={
"stream_id": stream_id,
"room_id": room_id, "room_id": room_id,
"receipt_type": receipt_type, "receipt_type": receipt_type,
"user_id": user_id, "user_id": user_id,
@ -85,6 +131,8 @@ class ReceiptsStore(SQLBaseStore):
} }
) )
return True
@defer.inlineCallbacks @defer.inlineCallbacks
def insert_receipt(self, room_id, receipt_type, user_id, event_ids): def insert_receipt(self, room_id, receipt_type, user_id, event_ids):
if not event_ids: if not event_ids:
@ -115,13 +163,16 @@ class ReceiptsStore(SQLBaseStore):
stream_id_manager = yield self._receipts_id_gen.get_next(self) stream_id_manager = yield self._receipts_id_gen.get_next(self)
with stream_id_manager as stream_id: with stream_id_manager as stream_id:
yield self.runInteraction( have_persisted = yield self.runInteraction(
"insert_linearized_receipt", "insert_linearized_receipt",
self.insert_linearized_receipt_txn, self.insert_linearized_receipt_txn,
room_id, receipt_type, user_id, linearized_event_id, room_id, receipt_type, user_id, linearized_event_id,
stream_id=stream_id, stream_id=stream_id,
) )
if not have_persisted:
defer.returnValue(None)
yield self.insert_graph_receipt( yield self.insert_graph_receipt(
room_id, receipt_type, user_id, event_ids room_id, receipt_type, user_id, event_ids
) )

View File

@ -20,6 +20,7 @@ from synapse.types import StreamToken
from synapse.handlers.presence import PresenceEventSource from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.room import RoomEventSource from synapse.handlers.room import RoomEventSource
from synapse.handlers.typing import TypingNotificationEventSource from synapse.handlers.typing import TypingNotificationEventSource
from synapse.handlers.receipts import ReceiptEventSource
class NullSource(object): class NullSource(object):
@ -43,6 +44,7 @@ class EventSources(object):
"room": RoomEventSource, "room": RoomEventSource,
"presence": PresenceEventSource, "presence": PresenceEventSource,
"typing": TypingNotificationEventSource, "typing": TypingNotificationEventSource,
"receipt": ReceiptEventSource,
} }
def __init__(self, hs): def __init__(self, hs):
@ -63,7 +65,9 @@ class EventSources(object):
typing_key=( typing_key=(
yield self.sources["typing"].get_current_key() yield self.sources["typing"].get_current_key()
), ),
receipt_key="0", receipt_key=(
yield self.sources["receipt"].get_current_key()
),
) )
defer.returnValue(token) defer.returnValue(token)