mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-06 09:04:11 -04:00
Merge pull request #759 from matrix-org/dbkr/email_notifs
Send email notifications for missed messages
This commit is contained in:
commit
fe97b81c09
31 changed files with 1470 additions and 34 deletions
|
@ -118,16 +118,19 @@ class EventPushActionsStore(SQLBaseStore):
|
|||
max_stream_ordering=None):
|
||||
def get_after_receipt(txn):
|
||||
sql = (
|
||||
"SELECT ep.event_id, ep.stream_ordering, ep.actions "
|
||||
"FROM event_push_actions AS ep, ("
|
||||
" SELECT room_id, user_id,"
|
||||
" max(topological_ordering) as topological_ordering,"
|
||||
" max(stream_ordering) as stream_ordering"
|
||||
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
|
||||
"e.received_ts "
|
||||
"FROM ("
|
||||
" SELECT room_id, user_id, "
|
||||
" max(topological_ordering) as topological_ordering, "
|
||||
" max(stream_ordering) as stream_ordering "
|
||||
" FROM events"
|
||||
" NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
|
||||
" GROUP BY room_id, user_id"
|
||||
") AS rl "
|
||||
"WHERE"
|
||||
") AS rl,"
|
||||
" event_push_actions AS ep"
|
||||
" INNER JOIN events AS e USING (room_id, event_id)"
|
||||
" WHERE"
|
||||
" ep.room_id = rl.room_id"
|
||||
" AND ("
|
||||
" ep.topological_ordering > rl.topological_ordering"
|
||||
|
@ -153,11 +156,13 @@ class EventPushActionsStore(SQLBaseStore):
|
|||
|
||||
def get_no_receipt(txn):
|
||||
sql = (
|
||||
"SELECT ep.event_id, ep.stream_ordering, ep.actions "
|
||||
"FROM event_push_actions AS ep "
|
||||
"WHERE ep.room_id not in ("
|
||||
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
|
||||
" e.received_ts"
|
||||
" FROM event_push_actions AS ep"
|
||||
" JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
|
||||
" WHERE ep.room_id not in ("
|
||||
" SELECT room_id FROM events NATURAL JOIN receipts_linearized"
|
||||
" WHERE receipt_type = 'm.read' AND user_id = ? "
|
||||
" WHERE receipt_type = 'm.read' AND user_id = ?"
|
||||
" GROUP BY room_id"
|
||||
") AND ep.user_id = ? AND ep.stream_ordering > ?"
|
||||
)
|
||||
|
@ -175,11 +180,29 @@ class EventPushActionsStore(SQLBaseStore):
|
|||
defer.returnValue([
|
||||
{
|
||||
"event_id": row[0],
|
||||
"stream_ordering": row[1],
|
||||
"actions": json.loads(row[2]),
|
||||
"room_id": row[1],
|
||||
"stream_ordering": row[2],
|
||||
"actions": json.loads(row[3]),
|
||||
"received_ts": row[4],
|
||||
} for row in after_read_receipt + no_read_receipt
|
||||
])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_time_of_last_push_action_before(self, stream_ordering):
|
||||
def f(txn):
|
||||
sql = (
|
||||
"SELECT e.received_ts"
|
||||
" FROM event_push_actions AS ep"
|
||||
" JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
|
||||
" WHERE ep.stream_ordering > ?"
|
||||
" ORDER BY ep.stream_ordering ASC"
|
||||
" LIMIT 1"
|
||||
)
|
||||
txn.execute(sql, (stream_ordering,))
|
||||
return txn.fetchone()
|
||||
result = yield self.runInteraction("get_time_of_last_push_action_before", f)
|
||||
defer.returnValue(result[0] if result else None)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_latest_push_action_stream_ordering(self):
|
||||
def f(txn):
|
||||
|
|
|
@ -144,6 +144,7 @@ class EventsStore(SQLBaseStore):
|
|||
|
||||
def __init__(self, hs):
|
||||
super(EventsStore, self).__init__(hs)
|
||||
self._clock = hs.get_clock()
|
||||
self.register_background_update_handler(
|
||||
self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
|
||||
)
|
||||
|
@ -565,6 +566,7 @@ class EventsStore(SQLBaseStore):
|
|||
"outlier": event.internal_metadata.is_outlier(),
|
||||
"content": encode_json(event.content).decode("UTF-8"),
|
||||
"origin_server_ts": int(event.origin_server_ts),
|
||||
"received_ts": self._clock.time_msec(),
|
||||
}
|
||||
for event, _ in events_and_contexts
|
||||
],
|
||||
|
|
|
@ -233,3 +233,30 @@ class PusherStore(SQLBaseStore):
|
|||
{'failing_since': failing_since},
|
||||
desc="update_pusher_failing_since",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_throttle_params_by_room(self, pusher_id):
|
||||
res = yield self._simple_select_list(
|
||||
"pusher_throttle",
|
||||
{"pusher": pusher_id},
|
||||
["room_id", "last_sent_ts", "throttle_ms"],
|
||||
desc="get_throttle_params_by_room"
|
||||
)
|
||||
|
||||
params_by_room = {}
|
||||
for row in res:
|
||||
params_by_room[row["room_id"]] = {
|
||||
"last_sent_ts": row["last_sent_ts"],
|
||||
"throttle_ms": row["throttle_ms"]
|
||||
}
|
||||
|
||||
defer.returnValue(params_by_room)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_throttle_params(self, pusher_id, room_id, params):
|
||||
yield self._simple_upsert(
|
||||
"pusher_throttle",
|
||||
{"pusher": pusher_id, "room_id": room_id},
|
||||
params,
|
||||
desc="set_throttle_params"
|
||||
)
|
||||
|
|
16
synapse/storage/schema/delta/31/events.sql
Normal file
16
synapse/storage/schema/delta/31/events.sql
Normal file
|
@ -0,0 +1,16 @@
|
|||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
ALTER TABLE events ADD COLUMN received_ts BIGINT;
|
23
synapse/storage/schema/delta/31/pusher_throttle.sql
Normal file
23
synapse/storage/schema/delta/31/pusher_throttle.sql
Normal file
|
@ -0,0 +1,23 @@
|
|||
/* Copyright 2016 OpenMarket Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
CREATE TABLE pusher_throttle(
|
||||
pusher BIGINT NOT NULL,
|
||||
room_id TEXT NOT NULL,
|
||||
last_sent_ts BIGINT,
|
||||
throttle_ms BIGINT,
|
||||
PRIMARY KEY (pusher, room_id)
|
||||
);
|
Loading…
Add table
Add a link
Reference in a new issue