Merge pull request #7055 from matrix-org/babolivier/get_time_of_last_push_action_before

Move get_time_of_last_push_action_before to the EventPushActionsWorkerStore
This commit is contained in:
Brendan Abolivier 2020-03-09 14:53:50 +00:00 committed by GitHub
commit 14b2ebe767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 18 deletions

1
changelog.d/7055.misc Normal file
View File

@ -0,0 +1 @@
Merge worker apps together.

View File

@ -555,10 +555,12 @@ class Mailer(object):
else: else:
# If the reason room doesn't have a name, say who the messages # If the reason room doesn't have a name, say who the messages
# are from explicitly to avoid, "messages in the Bob room" # are from explicitly to avoid, "messages in the Bob room"
room_id = reason["room_id"]
sender_ids = list( sender_ids = list(
{ {
notif_events[n["event_id"]].sender notif_events[n["event_id"]].sender
for n in notifs_by_room[reason["room_id"]] for n in notifs_by_room[room_id]
} }
) )

View File

@ -608,6 +608,23 @@ class EventPushActionsWorkerStore(SQLBaseStore):
return range_end return range_end
@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
def f(txn):
sql = (
"SELECT e.received_ts"
" FROM event_push_actions AS ep"
" JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
" WHERE ep.stream_ordering > ?"
" ORDER BY ep.stream_ordering ASC"
" LIMIT 1"
)
txn.execute(sql, (stream_ordering,))
return txn.fetchone()
result = yield self.db.runInteraction("get_time_of_last_push_action_before", f)
return result[0] if result else None
class EventPushActionsStore(EventPushActionsWorkerStore): class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index" EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@ -735,23 +752,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
return push_actions return push_actions
@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
def f(txn):
sql = (
"SELECT e.received_ts"
" FROM event_push_actions AS ep"
" JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
" WHERE ep.stream_ordering > ?"
" ORDER BY ep.stream_ordering ASC"
" LIMIT 1"
)
txn.execute(sql, (stream_ordering,))
return txn.fetchone()
result = yield self.db.runInteraction("get_time_of_last_push_action_before", f)
return result[0] if result else None
@defer.inlineCallbacks @defer.inlineCallbacks
def get_latest_push_action_stream_ordering(self): def get_latest_push_action_stream_ordering(self):
def f(txn): def f(txn):