Merge pull request #456 from matrix-org/store_event_actions

Send unread notification counts
This commit is contained in:
David Baker 2016-01-08 14:47:15 +00:00
commit c232780081
19 changed files with 541 additions and 36 deletions

View file

@ -19,6 +19,7 @@ from synapse.api.errors import LimitExceededError, SynapseError, AuthError
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID, RoomAlias
from synapse.push.action_generator import ActionGenerator
from synapse.util.logcontext import PreserveLoggingContext
@ -252,6 +253,11 @@ class BaseHandler(object):
event, context=context
)
action_generator = ActionGenerator(self.store)
yield action_generator.handle_push_actions_for_event(
event, self
)
destinations = set(extra_destinations)
for k, s in context.current_state.items():
try:

View file

@ -36,6 +36,8 @@ from synapse.events.utils import prune_event
from synapse.util.retryutils import NotRetryingDestination
from synapse.push.action_generator import ActionGenerator
from twisted.internet import defer
import itertools
@ -242,6 +244,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
if not backfilled and not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.store)
yield action_generator.handle_push_actions_for_event(
event, self
)
@defer.inlineCallbacks
def _filter_events_for_server(self, server_name, room_id, events):
event_to_state = yield self.store.get_state_for_events(

View file

@ -84,7 +84,8 @@ class RegistrationHandler(BaseHandler):
localpart=None,
password=None,
generate_token=True,
guest_access_token=None
guest_access_token=None,
make_guest=False
):
"""Registers a new client on the server.
@ -118,6 +119,7 @@ class RegistrationHandler(BaseHandler):
token=token,
password_hash=password_hash,
was_guest=guest_access_token is not None,
make_guest=make_guest,
)
yield registered_user(self.distributor, user)

View file

@ -54,6 +54,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"state", # dict[(str, str), FrozenEvent]
"ephemeral",
"account_data",
"unread_notification_count",
])):
__slots__ = []
@ -66,6 +67,8 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
or self.state
or self.ephemeral
or self.account_data
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
@ -163,6 +166,18 @@ class SyncHandler(BaseHandler):
else:
return self.incremental_sync_with_gap(sync_config, since_token)
def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
if room_id not in ephemeral_by_room:
return None
for e in ephemeral_by_room[room_id]:
if e['type'] != 'm.receipt':
continue
for receipt_event_id, val in e['content'].items():
if 'm.read' in val:
if user_id in val['m.read']:
return receipt_event_id
return None
@defer.inlineCallbacks
def full_state_sync(self, sync_config, timeline_since_token):
"""Get a sync for a client which is starting without any state.
@ -274,6 +289,13 @@ class SyncHandler(BaseHandler):
room_id, sync_config, now_token, since_token=timeline_since_token
)
notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)
current_state = yield self.get_state_at(room_id, now_token)
defer.returnValue(JoinedSyncResult(
@ -284,6 +306,7 @@ class SyncHandler(BaseHandler):
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count,
))
def account_data_for_user(self, account_data):
@ -423,6 +446,13 @@ class SyncHandler(BaseHandler):
)
now_token = now_token.copy_and_replace("presence_key", presence_key)
# We now fetch all ephemeral events for this room in order to get
# this users current read receipt. This could almost certainly be
# optimised.
_, all_ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token
)
now_token, ephemeral_by_room = yield self.ephemeral_by_room(
sync_config, now_token, since_token
)
@ -496,6 +526,13 @@ class SyncHandler(BaseHandler):
else:
prev_batch = now_token
notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, all_ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)
just_joined = yield self.check_joined_room(sync_config, state)
if just_joined:
logger.debug("User has just joined %s: needs full state",
@ -516,6 +553,7 @@ class SyncHandler(BaseHandler):
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count
)
logger.debug("Result for room %s: %r", room_id, room_sync)
@ -650,6 +688,13 @@ class SyncHandler(BaseHandler):
if just_joined:
state = yield self.get_state_at(room_id, now_token)
notifs = yield self.unread_notifs_for_room_id(
room_id, sync_config, ephemeral_by_room
)
notif_count = None
if notifs is not None:
notif_count = len(notifs)
room_sync = JoinedSyncResult(
room_id=room_id,
timeline=batch,
@ -658,6 +703,7 @@ class SyncHandler(BaseHandler):
account_data=self.account_data_for_room(
room_id, tags_by_room, account_data_by_room
),
unread_notification_count=notif_count,
)
logger.debug("Room sync: %r", room_sync)
@ -788,3 +834,20 @@ class SyncHandler(BaseHandler):
if join_event.content["membership"] == Membership.JOIN:
return True
return False
@defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
last_unread_event_id = self.last_read_event_id_for_room_and_user(
room_id, sync_config.user.to_string(), ephemeral_by_room
)
notifs = []
if last_unread_event_id:
notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
room_id, sync_config.user.to_string(), last_unread_event_id
)
else:
# There is no new information in this period, so your notification
# count is whatever it was last time.
defer.returnValue(None)
defer.returnValue(notifs)