First bits of emailpusher

Mostly logic of when to send an email
This commit is contained in:
David Baker 2016-04-19 14:24:36 +01:00
parent 48af68ba8e
commit 07d765209d
7 changed files with 335 additions and 8 deletions

View file

@ -118,15 +118,17 @@ class EventPushActionsStore(SQLBaseStore):
max_stream_ordering=None):
def get_after_receipt(txn):
sql = (
"SELECT ep.event_id, ep.stream_ordering, ep.actions "
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
"e.received_ts "
"FROM event_push_actions AS ep, ("
" SELECT room_id, user_id,"
" max(topological_ordering) as topological_ordering,"
" max(stream_ordering) as stream_ordering"
" SELECT room_id, user_id, "
" max(topological_ordering) as topological_ordering, "
" max(stream_ordering) as stream_ordering "
" FROM events"
" NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
" GROUP BY room_id, user_id"
") AS rl "
"NATURAL JOIN events e "
"WHERE"
" ep.room_id = rl.room_id"
" AND ("
@ -153,8 +155,10 @@ class EventPushActionsStore(SQLBaseStore):
def get_no_receipt(txn):
sql = (
"SELECT ep.event_id, ep.stream_ordering, ep.actions "
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
"e.received_ts "
"FROM event_push_actions AS ep "
"JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
"WHERE ep.room_id not in ("
" SELECT room_id FROM events NATURAL JOIN receipts_linearized"
" WHERE receipt_type = 'm.read' AND user_id = ? "
@ -175,11 +179,30 @@ class EventPushActionsStore(SQLBaseStore):
defer.returnValue([
{
"event_id": row[0],
"stream_ordering": row[1],
"actions": json.loads(row[2]),
"room_id": row[1],
"stream_ordering": row[2],
"actions": json.loads(row[3]),
"received_ts": row[4],
} for row in after_read_receipt + no_read_receipt
])
@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
def f(txn):
sql = (
"SELECT e.received_ts "
"FROM event_push_actions AS ep "
"JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
"WHERE ep.stream_ordering > ? "
"ORDER BY ep.stream_ordering ASC "
"LIMIT 1"
)
txn.execute(sql, (stream_ordering,))
return txn.fetchone()
result = yield self.runInteraction("get_time_of_last_push_action_before", f)
defer.returnValue(result[0] if result is not None else None)
@defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self):
def f(txn):
@ -190,6 +213,26 @@ class EventPushActionsStore(SQLBaseStore):
)
defer.returnValue(result[0] or 0)
@defer.inlineCallbacks
def get_time_of_latest_push_action_by_room_for_user(self, user_id):
"""
Returns only the received_ts of the last notification in each of the
user's rooms, in a dict by room_id
"""
def f(txn):
txn.execute(
"SELECT ep.room_id, MAX(e.received_ts) "
"FROM event_push_actions AS ep "
"JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id "
"GROUP BY ep.room_id"
)
return txn.fetchall()
result = yield self.runInteraction(
"get_time_of_latest_push_action_by_room_for_user", f
)
defer.returnValue({row[0]: row[1] for row in result})
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
# Sad that we have to blow away the cache for the whole room here
txn.call_after(

View file

@ -55,6 +55,7 @@ class EventsStore(SQLBaseStore):
def __init__(self, hs):
super(EventsStore, self).__init__(hs)
self._clock = hs.get_clock()
self.register_background_update_handler(
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
)
@ -427,6 +428,7 @@ class EventsStore(SQLBaseStore):
"outlier": event.internal_metadata.is_outlier(),
"content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
}
for event, _ in events_and_contexts
],

View file

@ -230,3 +230,30 @@ class PusherStore(SQLBaseStore):
{'failing_since': failing_since},
desc="update_pusher_failing_since",
)
@defer.inlineCallbacks
def get_throttle_params_by_room(self, pusher_id):
res = yield self._simple_select_list(
"pusher_throttle",
{"pusher": pusher_id},
["room_id", "last_sent_ts", "throttle_ms"],
desc="get_throttle_params_by_room"
)
params_by_room = {}
for row in res:
params_by_room[row["room_id"]] = {
"last_sent_ts": row["last_sent_ts"],
"throttle_ms": row["throttle_ms"]
}
defer.returnValue(params_by_room)
@defer.inlineCallbacks
def set_throttle_params(self, pusher_id, room_id, params):
yield self._simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
params,
desc="set_throttle_params"
)

View file

@ -0,0 +1,16 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
ALTER TABLE events ADD COLUMN received_ts BIGINT;

View file

@ -0,0 +1,23 @@
/* Copyright 2016 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
CREATE TABLE pusher_throttle(
pusher BIGINT NOT NULL,
room_id TEXT NOT NULL,
last_sent_ts BIGINT,
throttle_ms BIGINT,
PRIMARY KEY (pusher, room_id)
);