Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2017-02-16 15:12:52 +00:00
commit c97d491649
9 changed files with 462 additions and 83 deletions

View file

@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.api import errors from synapse.api import errors
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.util import stringutils from synapse.util import stringutils
@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler):
# Then work out if any users have since joined # Then work out if any users have since joined
rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key) rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
stream_ordering = RoomStreamToken.parse_stream_token(
from_token.room_key).stream
possibly_changed = set(changed) possibly_changed = set(changed)
for room_id in rooms_changed: for room_id in rooms_changed:
# Fetch the current state at the time. # Fetch the current state at the time.
stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
try: try:
event_ids = yield self.store.get_forward_extremeties_for_room( event_ids = yield self.store.get_forward_extremeties_for_room(
room_id, stream_ordering=stream_ordering room_id, stream_ordering=stream_ordering
) )
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) except errors.StoreError:
except: # we have purged the stream_ordering index since the stream
prev_state_ids = {} # ordering: treat it the same as a new room
event_ids = []
current_state_ids = yield self.state.get_current_state_ids(room_id) current_state_ids = yield self.state.get_current_state_ids(room_id)
# special-case for an empty prev state: include all members
# in the changed list
if not event_ids:
for key, event_id in current_state_ids.iteritems():
etype, state_key = key
if etype != EventTypes.Member:
continue
possibly_changed.add(state_key)
continue
# mapping from event_id -> state_dict
prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
# If there has been any change in membership, include them in the # If there has been any change in membership, include them in the
# possibly changed list. We'll check if they are joined below, # possibly changed list. We'll check if they are joined below,
# and we're not toooo worried about spuriously adding users. # and we're not toooo worried about spuriously adding users.
for key, event_id in current_state_ids.iteritems(): for key, event_id in current_state_ids.iteritems():
etype, state_key = key etype, state_key = key
if etype == EventTypes.Member: if etype != EventTypes.Member:
prev_event_id = prev_state_ids.get(key, None) continue
# check if this member has changed since any of the extremities
# at the stream_ordering, and add them to the list if so.
for state_dict in prev_state_ids.values():
prev_event_id = state_dict.get(key, None)
if not prev_event_id or prev_event_id != event_id: if not prev_event_id or prev_event_id != event_id:
possibly_changed.add(state_key) possibly_changed.add(state_key)
break
users_who_share_room = yield self.store.get_users_who_share_room_with_user( users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id user_id

View file

@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = ( get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
) )
_get_unread_counts_by_receipt_txn = (
DataStore._get_unread_counts_by_receipt_txn.__func__
)
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
_get_state_group_for_events = ( _get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"] StateStore.__dict__["_get_state_group_for_events"]
) )

View file

@ -609,6 +609,10 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
raise SynapseError(400, "Missing user_id key.") raise SynapseError(400, "Missing user_id key.")
target = UserID.from_string(content["user_id"]) target = UserID.from_string(content["user_id"])
event_content = None
if 'reason' in content and membership_action in ['kick', 'ban']:
event_content = {'reason': content['reason']}
yield self.handlers.room_member_handler.update_membership( yield self.handlers.room_member_handler.update_membership(
requester=requester, requester=requester,
target=target, target=target,
@ -616,6 +620,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
action=membership_action, action=membership_action,
txn_id=txn_id, txn_id=txn_id,
third_party_signed=content.get("third_party_signed", None), third_party_signed=content.get("third_party_signed", None),
content=event_content,
) )
defer.returnValue((200, {})) defer.returnValue((200, {}))

View file

@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore):
) )
def get_forward_extremeties_for_room(self, room_id, stream_ordering): def get_forward_extremeties_for_room(self, room_id, stream_ordering):
"""For a given room_id and stream_ordering, return the forward
extremeties of the room at that point in "time".
Throws a StoreError if we have since purged the index for
stream_orderings from that point.
Args:
room_id (str):
stream_ordering (int):
Returns:
deferred, which resolves to a list of event_ids
"""
# We want to make the cache more effective, so we clamp to the last # We want to make the cache more effective, so we clamp to the last
# change before the given ordering. # change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
# We don't always have a full stream_to_exterm_id table, e.g. after # We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a # the upgrade that introduced it, so we make sure we never ask for a
# try and pin to a stream_ordering from before a restart # stream_ordering from before a restart
last_change = max(self._stream_order_on_start, last_change) last_change = max(self._stream_order_on_start, last_change)
# provided the last_change is recent enough, we now clamp the requested
# stream_ordering to it.
if last_change > self.stream_ordering_month_ago: if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering) stream_ordering = min(last_change, stream_ordering)

View file

@ -15,6 +15,7 @@
from ._base import SQLBaseStore from ._base import SQLBaseStore
from twisted.internet import defer from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from .stream import lower_bound from .stream import lower_bound
@ -25,11 +26,46 @@ import ujson as json
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
DEFAULT_HIGHLIGHT_ACTION = [
"notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
]
def _serialize_action(actions, is_highlight):
"""Custom serializer for actions. This allows us to "compress" common actions.
We use the fact that most users have the same actions for notifs (and for
highlights).
We store these default actions as the empty string rather than the full JSON.
Since the empty string isn't valid JSON there is no risk of this clashing with
any real JSON actions
"""
if is_highlight:
if actions == DEFAULT_HIGHLIGHT_ACTION:
return "" # We use empty string as the column is non-NULL
else:
if actions == DEFAULT_NOTIF_ACTION:
return ""
return json.dumps(actions)
def _deserialize_action(actions, is_highlight):
"""Custom deserializer for actions. This allows us to "compress" common actions
"""
if actions:
return json.loads(actions)
if is_highlight:
return DEFAULT_HIGHLIGHT_ACTION
else:
return DEFAULT_NOTIF_ACTION
class EventPushActionsStore(SQLBaseStore): class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index" EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(self, hs): def __init__(self, hs):
self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs) super(EventPushActionsStore, self).__init__(hs)
self.register_background_index_update( self.register_background_index_update(
@ -47,6 +83,9 @@ class EventPushActionsStore(SQLBaseStore):
where_clause="highlight=1" where_clause="highlight=1"
) )
self._doing_notif_rotation = False
self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000)
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
""" """
Args: Args:
@ -55,15 +94,17 @@ class EventPushActionsStore(SQLBaseStore):
""" """
values = [] values = []
for uid, actions in tuples: for uid, actions in tuples:
is_highlight = 1 if _action_has_highlight(actions) else 0
values.append({ values.append({
'room_id': event.room_id, 'room_id': event.room_id,
'event_id': event.event_id, 'event_id': event.event_id,
'user_id': uid, 'user_id': uid,
'actions': json.dumps(actions), 'actions': _serialize_action(actions, is_highlight),
'stream_ordering': event.internal_metadata.stream_ordering, 'stream_ordering': event.internal_metadata.stream_ordering,
'topological_ordering': event.depth, 'topological_ordering': event.depth,
'notif': 1, 'notif': 1,
'highlight': 1 if _action_has_highlight(actions) else 0, 'highlight': is_highlight,
}) })
for uid, __ in tuples: for uid, __ in tuples:
@ -77,67 +118,90 @@ class EventPushActionsStore(SQLBaseStore):
def get_unread_event_push_actions_by_room_for_user( def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id self, room_id, user_id, last_read_event_id
): ):
def _get_unread_event_push_actions_by_room(txn):
sql = (
"SELECT stream_ordering, topological_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
txn.execute(
sql, (room_id, last_read_event_id)
)
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0]
topological_ordering = results[0][1]
token = RoomStreamToken(
topological_ordering, stream_ordering
)
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*)"
" FROM (SELECT * FROM event_push_actions"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND %s LIMIT 100) as ea"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
notify_count = row[0] if row else 0
# Now get the number of highlights
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
highlight_count = row[0] if row else 0
return {
"notify_count": notify_count,
"highlight_count": highlight_count,
}
ret = yield self.runInteraction( ret = yield self.runInteraction(
"get_unread_event_push_actions_by_room", "get_unread_event_push_actions_by_room",
_get_unread_event_push_actions_by_room self._get_unread_counts_by_receipt_txn,
room_id, user_id, last_read_event_id
) )
defer.returnValue(ret) defer.returnValue(ret)
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
last_read_event_id):
sql = (
"SELECT stream_ordering, topological_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
txn.execute(
sql, (room_id, last_read_event_id)
)
results = txn.fetchall()
if len(results) == 0:
return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0]
topological_ordering = results[0][1]
return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, topological_ordering, stream_ordering
)
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
stream_ordering):
token = RoomStreamToken(
topological_ordering, stream_ordering
)
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
# notif=1
sql = (
"SELECT count(*)"
" FROM (SELECT * FROM event_push_actions"
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND %s LIMIT 100) as ea"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
notify_count = row[0] if row else 0
summary_notif_count = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary",
keyvalues={
"user_id": user_id,
"room_id": room_id,
},
retcol="notif_count",
allow_none=True,
)
if summary_notif_count:
notify_count += summary_notif_count
# Now get the number of highlights
sql = (
"SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
highlight_count = row[0] if row else 0
return {
"notify_count": notify_count,
"highlight_count": highlight_count,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
def f(txn): def f(txn):
@ -176,7 +240,8 @@ class EventPushActionsStore(SQLBaseStore):
# find rooms that have a read receipt in them and return the next # find rooms that have a read receipt in them and return the next
# push actions # push actions
sql = ( sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" ep.highlight "
" FROM (" " FROM ("
" SELECT room_id," " SELECT room_id,"
" MAX(topological_ordering) as topological_ordering," " MAX(topological_ordering) as topological_ordering,"
@ -217,7 +282,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn): def get_no_receipt(txn):
sql = ( sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts" " ep.highlight "
" FROM event_push_actions AS ep" " FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)" " INNER JOIN events AS e USING (room_id, event_id)"
" WHERE" " WHERE"
@ -246,7 +311,7 @@ class EventPushActionsStore(SQLBaseStore):
"event_id": row[0], "event_id": row[0],
"room_id": row[1], "room_id": row[1],
"stream_ordering": row[2], "stream_ordering": row[2],
"actions": json.loads(row[3]), "actions": _deserialize_action(row[3], row[4]),
} for row in after_read_receipt + no_read_receipt } for row in after_read_receipt + no_read_receipt
] ]
@ -285,7 +350,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_after_receipt(txn): def get_after_receipt(txn):
sql = ( sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts" " ep.highlight, e.received_ts"
" FROM (" " FROM ("
" SELECT room_id," " SELECT room_id,"
" MAX(topological_ordering) as topological_ordering," " MAX(topological_ordering) as topological_ordering,"
@ -327,7 +392,7 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn): def get_no_receipt(txn):
sql = ( sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" e.received_ts" " ep.highlight, e.received_ts"
" FROM event_push_actions AS ep" " FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)" " INNER JOIN events AS e USING (room_id, event_id)"
" WHERE" " WHERE"
@ -357,8 +422,8 @@ class EventPushActionsStore(SQLBaseStore):
"event_id": row[0], "event_id": row[0],
"room_id": row[1], "room_id": row[1],
"stream_ordering": row[2], "stream_ordering": row[2],
"actions": json.loads(row[3]), "actions": _deserialize_action(row[3], row[4]),
"received_ts": row[4], "received_ts": row[5],
} for row in after_read_receipt + no_read_receipt } for row in after_read_receipt + no_read_receipt
] ]
@ -392,7 +457,7 @@ class EventPushActionsStore(SQLBaseStore):
sql = ( sql = (
"SELECT epa.event_id, epa.room_id," "SELECT epa.event_id, epa.room_id,"
" epa.stream_ordering, epa.topological_ordering," " epa.stream_ordering, epa.topological_ordering,"
" epa.actions, epa.profile_tag, e.received_ts" " epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
" FROM event_push_actions epa, events e" " FROM event_push_actions epa, events e"
" WHERE epa.event_id = e.event_id" " WHERE epa.event_id = e.event_id"
" AND epa.user_id = ? %s" " AND epa.user_id = ? %s"
@ -407,7 +472,7 @@ class EventPushActionsStore(SQLBaseStore):
"get_push_actions_for_user", f "get_push_actions_for_user", f
) )
for pa in push_actions: for pa in push_actions:
pa["actions"] = json.loads(pa["actions"]) pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
defer.returnValue(push_actions) defer.returnValue(push_actions)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -448,10 +513,14 @@ class EventPushActionsStore(SQLBaseStore):
) )
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering): topological_ordering, stream_ordering):
""" """
Purges old, stale push actions for a user and room before a given Purges old push actions for a user and room before a given
topological_ordering topological_ordering.
We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.
Args: Args:
txn: The transcation txn: The transcation
room_id: Room ID to delete from room_id: Room ID to delete from
@ -475,10 +544,16 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute( txn.execute(
"DELETE FROM event_push_actions " "DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND " " WHERE user_id = ? AND room_id = ? AND "
" topological_ordering < ? AND stream_ordering < ?", " topological_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago) (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
) )
txn.execute("""
DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering))
@defer.inlineCallbacks @defer.inlineCallbacks
def _find_stream_orderings_for_times(self): def _find_stream_orderings_for_times(self):
yield self.runInteraction( yield self.runInteraction(
@ -495,6 +570,14 @@ class EventPushActionsStore(SQLBaseStore):
"Found stream ordering 1 month ago: it's %d", "Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago self.stream_ordering_month_ago
) )
logger.info("Searching for stream ordering 1 day ago")
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 day ago: it's %d",
self.stream_ordering_day_ago
)
def _find_first_stream_ordering_after_ts_txn(self, txn, ts): def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
""" """
@ -534,6 +617,120 @@ class EventPushActionsStore(SQLBaseStore):
return range_end return range_end
@defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
self._doing_notif_rotation = True
try:
while True:
logger.info("Rotating notifications")
caught_up = yield self.runInteraction(
"_rotate_notifs",
self._rotate_notifs_txn
)
if caught_up:
break
yield sleep(5)
finally:
self._doing_notif_rotation = False
def _rotate_notifs_txn(self, txn):
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
"""
# We want to make sure that we only ever do this one at a time
self.database_engine.lock_table(txn, "event_push_summary")
# We don't to try and rotate millions of rows at once, so we cap the
# maximum stream ordering we'll rotate before.
txn.execute("""
SELECT stream_ordering FROM event_push_actions
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""")
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row
rotate_to_stream_ordering = min(
self.stream_ordering_day_ago, offset_stream_ordering
)
caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
else:
rotate_to_stream_ordering = self.stream_ordering_day_ago
caught_up = True
self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
# We have caught up iff we were limited by `stream_ordering_day_ago`
return caught_up
def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.notif_count, 0) + upd.notif_count,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as notif_count,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""
txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
rows = txn.fetchall()
# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
# existing table.
self._simple_insert_many_txn(
txn,
table="event_push_summary",
values=[
{
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
}
for row in rows if row[4] is None
]
)
txn.executemany(
"""
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
)
txn.execute(
"DELETE FROM event_push_actions"
" WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
(old_rotate_stream_ordering, rotate_to_stream_ordering,)
)
txn.execute(
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
(rotate_to_stream_ordering,)
)
def _action_has_highlight(actions): def _action_has_highlight(actions):
for action in actions: for action in actions:

View file

@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore):
room_id=room_id, room_id=room_id,
user_id=user_id, user_id=user_id,
topological_ordering=topological_ordering, topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
) )
return True return True

View file

@ -0,0 +1,37 @@
/* Copyright 2017 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Aggregate of old notification counts that have been deleted out of the
-- main event_push_actions table. This count does not include those that were
-- highlights, as they remain in the event_push_actions table.
CREATE TABLE event_push_summary (
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notif_count BIGINT NOT NULL,
stream_ordering BIGINT NOT NULL
);
CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);
-- The stream ordering up to which we have aggregated the event_push_actions
-- table into event_push_summary
CREATE TABLE event_push_summary_stream_ordering (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_ordering BIGINT NOT NULL,
CHECK (Lock='X')
);
INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);

View file

@ -413,7 +413,19 @@ class StateStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids}) defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_ids_for_events(self, event_ids, types): def get_state_ids_for_events(self, event_ids, types=None):
"""
Get the state dicts corresponding to a list of events
Args:
event_ids(list(str)): events whose state should be returned
types(list[(str, str)]|None): List of (type, state_key) tuples
which are used to filter the state fetched. May be None, which
matches any key
Returns:
A deferred dict from event_id -> (type, state_key) -> state_event
"""
event_to_groups = yield self._get_state_group_for_events( event_to_groups = yield self._get_state_group_for_events(
event_ids, event_ids,
) )

View file

@ -17,9 +17,15 @@ from twisted.internet import defer
import tests.unittest import tests.unittest
import tests.utils import tests.utils
from mock import Mock
USER_ID = "@user:example.com" USER_ID = "@user:example.com"
PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}]
HIGHLIGHT = [
"notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
]
class EventPushActionsStoreTestCase(tests.unittest.TestCase): class EventPushActionsStoreTestCase(tests.unittest.TestCase):
@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
yield self.store.get_unread_push_actions_for_user_in_range_for_email( yield self.store.get_unread_push_actions_for_user_in_range_for_email(
USER_ID, 0, 1000, 20 USER_ID, 0, 1000, 20
) )
@defer.inlineCallbacks
def test_count_aggregation(self):
room_id = "!foo:example.com"
user_id = "@user1235:example.com"
@defer.inlineCallbacks
def _assert_counts(noitf_count, highlight_count):
counts = yield self.store.runInteraction(
"", self.store._get_unread_counts_by_pos_txn,
room_id, user_id, 0, 0
)
self.assertEquals(
counts,
{"notify_count": noitf_count, "highlight_count": highlight_count}
)
def _inject_actions(stream, action):
event = Mock()
event.room_id = room_id
event.event_id = "$test:example.com"
event.internal_metadata.stream_ordering = stream
event.depth = stream
tuples = [(user_id, action)]
return self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn,
event, tuples
)
def _rotate(stream):
return self.store.runInteraction(
"", self.store._rotate_notifs_before_txn, stream
)
def _mark_read(stream, depth):
return self.store.runInteraction(
"", self.store._remove_old_push_actions_before_txn,
room_id, user_id, depth, stream
)
yield _assert_counts(0, 0)
yield _inject_actions(1, PlAIN_NOTIF)
yield _assert_counts(1, 0)
yield _rotate(2)
yield _assert_counts(1, 0)
yield _inject_actions(3, PlAIN_NOTIF)
yield _assert_counts(2, 0)
yield _rotate(4)
yield _assert_counts(2, 0)
yield _inject_actions(5, PlAIN_NOTIF)
yield _mark_read(3, 3)
yield _assert_counts(1, 0)
yield _mark_read(5, 5)
yield _assert_counts(0, 0)
yield _inject_actions(6, PlAIN_NOTIF)
yield _rotate(7)
yield self.store._simple_delete(
table="event_push_actions",
keyvalues={"1": 1},
desc="",
)
yield _assert_counts(1, 0)
yield _mark_read(7, 7)
yield _assert_counts(0, 0)
yield _inject_actions(8, HIGHLIGHT)
yield _assert_counts(1, 1)
yield _rotate(9)
yield _assert_counts(1, 1)
yield _rotate(10)
yield _assert_counts(1, 1)