Move storage functions for push calculations

This will allow push actions for an event to be calculated on workers.
Erik Johnston 2018-02-27 12:01:36 +00:00
parent 3594dbc6dc
commit 493e25d554
6 changed files with 101 additions and 98 deletions
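In practice a worker evaluates the push rules for an event and stages the resulting actions in the database, and the staged rows are cleared again if the event ultimately fails to persist. A minimal usage sketch of the two helpers this commit moves into EventPushActionsWorkerStore (the call sites are not part of this diff; the store variable, event ID and user ID below are illustrative only):

    # Worker side, inside an @defer.inlineCallbacks function, after the push
    # rules for the event have been evaluated:
    user_id_actions = {
        "@alice:example.com": ["notify", {"set_tweak": "highlight", "value": True}],
    }
    yield store.add_push_actions_to_staging("$someevent:example.com", user_id_actions)

    # If persisting the event later fails, drop the staged rows so they don't
    # build up:
    yield store.remove_push_actions_from_staging("$someevent:example.com")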

synapse/app/pusher.py

@@ -32,7 +32,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.storage.engines import create_engine
-from synapse.storage.roommember import RoomMemberStore
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, preserve_fn
 from synapse.util.manhole import manhole
@@ -75,10 +74,6 @@ class PusherSlaveStore(
         DataStore.get_profile_displayname.__func__
     )
-    who_forgot_in_room = (
-        RoomMemberStore.__dict__["who_forgot_in_room"]
-    )
 class PusherServer(HomeServer):
     def setup(self):

synapse/app/synchrotron.py

@@ -62,8 +62,6 @@ logger = logging.getLogger("synapse.app.synchrotron")
 class SynchrotronSlavedStore(
-    SlavedPushRuleStore,
-    SlavedEventStore,
     SlavedReceiptsStore,
     SlavedAccountDataStore,
     SlavedApplicationServiceStore,
@@ -73,14 +71,12 @@ class SynchrotronSlavedStore(
     SlavedGroupServerStore,
     SlavedDeviceInboxStore,
     SlavedDeviceStore,
+    SlavedPushRuleStore,
+    SlavedEventStore,
     SlavedClientIpStore,
     RoomStore,
     BaseSlavedStore,
 ):
-    who_forgot_in_room = (
-        RoomMemberStore.__dict__["who_forgot_in_room"]
-    )
     did_forget = (
         RoomMemberStore.__dict__["did_forget"]
     )
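The reordering of SlavedPushRuleStore and SlavedEventStore is presumably forced by the new base classes PushRulesWorkerStore gains later in this commit: a subclass has to be listed before anything it now (indirectly) inherits from, or Python refuses to build the class. A generic illustration of that constraint (plain Python, nothing Synapse-specific):

    class WorkerStore(object):
        pass

    class SlavedStore(WorkerStore):
        pass

    try:
        # Base class listed before the subclass that extends it: rejected.
        Bad = type("Bad", (WorkerStore, SlavedStore), {})
    except TypeError as exc:
        print("inconsistent MRO: %s" % (exc,))

    # Subclass first, base second: fine.
    Good = type("Good", (SlavedStore, WorkerStore), {})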

synapse/storage/event_push_actions.py

@@ -380,6 +380,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # Now return the first `limit`
         defer.returnValue(notifs[:limit])
+    def add_push_actions_to_staging(self, event_id, user_id_actions):
+        """Add the push actions for the event to the push action staging area.
+        Args:
+            event_id (str)
+            user_id_actions (dict[str, list[dict|str]]): A dictionary mapping
+                user_id to list of push actions, where an action can either be
+                a string or dict.
+        Returns:
+            Deferred
+        """
+        if not user_id_actions:
+            return
+        # This is a helper function for generating the necessary tuple that
+        # can be used to insert into the `event_push_actions_staging` table.
+        def _gen_entry(user_id, actions):
+            is_highlight = 1 if _action_has_highlight(actions) else 0
+            return (
+                event_id,  # event_id column
+                user_id,  # user_id column
+                _serialize_action(actions, is_highlight),  # actions column
+                1,  # notif column
+                is_highlight,  # highlight column
+            )
+        def _add_push_actions_to_staging_txn(txn):
+            # We don't use _simple_insert_many here to avoid the overhead
+            # of generating lists of dicts.
+            sql = """
+                INSERT INTO event_push_actions_staging
+                    (event_id, user_id, actions, notif, highlight)
+                VALUES (?, ?, ?, ?, ?)
+            """
+            txn.executemany(sql, (
+                _gen_entry(user_id, actions)
+                for user_id, actions in user_id_actions.iteritems()
+            ))
+        return self.runInteraction(
+            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
+        )
+    def remove_push_actions_from_staging(self, event_id):
+        """Called if we failed to persist the event to ensure that stale push
+        actions don't build up in the DB
+        Args:
+            event_id (str)
+        """
+        return self._simple_delete(
+            table="event_push_actions_staging",
+            keyvalues={
+                "event_id": event_id,
+            },
+            desc="remove_push_actions_from_staging",
+        )
 class EventPushActionsStore(EventPushActionsWorkerStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
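For concreteness, a sketch of what the staged rows look like (the event ID and user IDs are made up, and the serialized actions value is elided): given

    user_id_actions = {
        "@alice:example.com": ["notify", {"set_tweak": "highlight"}],
        "@bob:example.com": ["notify"],
    }

    # _gen_entry yields one row tuple per user, roughly:
    #   ("$someevent:example.com", "@alice:example.com", <serialized actions>, 1, 1)
    #   ("$someevent:example.com", "@bob:example.com",   <serialized actions>, 1, 0)
    # notif is always 1 here; highlight is 1 only where _action_has_highlight
    # spots a {"set_tweak": "highlight"} action. txn.executemany streams these
    # generator results straight into event_push_actions_staging without
    # building intermediate dicts.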
@@ -775,69 +838,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             (rotate_to_stream_ordering,)
         )
-    def add_push_actions_to_staging(self, event_id, user_id_actions):
-        """Add the push actions for the event to the push action staging area.
-        Args:
-            event_id (str)
-            user_id_actions (dict[str, list[dict|str]]): A dictionary mapping
-                user_id to list of push actions, where an action can either be
-                a string or dict.
-        Returns:
-            Deferred
-        """
-        if not user_id_actions:
-            return
-        # This is a helper function for generating the necessary tuple that
-        # can be used to insert into the `event_push_actions_staging` table.
-        def _gen_entry(user_id, actions):
-            is_highlight = 1 if _action_has_highlight(actions) else 0
-            return (
-                event_id,  # event_id column
-                user_id,  # user_id column
-                _serialize_action(actions, is_highlight),  # actions column
-                1,  # notif column
-                is_highlight,  # highlight column
-            )
-        def _add_push_actions_to_staging_txn(txn):
-            # We don't use _simple_insert_many here to avoid the overhead
-            # of generating lists of dicts.
-            sql = """
-                INSERT INTO event_push_actions_staging
-                    (event_id, user_id, actions, notif, highlight)
-                VALUES (?, ?, ?, ?, ?)
-            """
-            txn.executemany(sql, (
-                _gen_entry(user_id, actions)
-                for user_id, actions in user_id_actions.iteritems()
-            ))
-        return self.runInteraction(
-            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
-        )
-    def remove_push_actions_from_staging(self, event_id):
-        """Called if we failed to persist the event to ensure that stale push
-        actions don't build up in the DB
-        Args:
-            event_id (str)
-        """
-        return self._simple_delete(
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event_id,
-            },
-            desc="remove_push_actions_from_staging",
-        )
 def _action_has_highlight(actions):
     for action in actions:

synapse/storage/push_rule.py

@@ -15,6 +15,10 @@
 # limitations under the License.
 from ._base import SQLBaseStore
+from synapse.storage.appservice import ApplicationServiceWorkerStore
+from synapse.storage.pusher import PusherWorkerStore
+from synapse.storage.receipts import ReceiptsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.push.baserules import list_with_base_rules
@@ -51,7 +55,11 @@ def _load_rules(rawrules, enabled_map):
     return rules
-class PushRulesWorkerStore(SQLBaseStore):
+class PushRulesWorkerStore(ApplicationServiceWorkerStore,
+                           ReceiptsWorkerStore,
+                           PusherWorkerStore,
+                           RoomMemberWorkerStore,
+                           SQLBaseStore):
     """This is an abstract base class where subclasses must implement
     `get_max_push_rules_stream_id` which can be called in the initializer.
     """
@@ -140,8 +148,6 @@ class PushRulesWorkerStore(SQLBaseStore):
             "have_push_rules_changed", have_push_rules_changed_txn
         )
-class PushRuleStore(PushRulesWorkerStore):
     @cachedList(cached_method_name="get_push_rules_for_user",
                 list_name="user_ids", num_args=1, inlineCallbacks=True)
     def bulk_get_push_rules(self, user_ids):
@@ -281,6 +287,8 @@ class PushRuleStore(PushRulesWorkerStore):
             results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
         defer.returnValue(results)
+class PushRuleStore(PushRulesWorkerStore):
     @defer.inlineCallbacks
     def add_push_rule(
         self, user_id, rule_id, priority_class, conditions, actions,

synapse/storage/pusher.py

@@ -175,11 +175,6 @@ class PusherWorkerStore(SQLBaseStore):
             "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
         )
-class PusherStore(PusherWorkerStore):
-    def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_current_token()
     @cachedInlineCallbacks(num_args=1, max_entries=15000)
     def get_if_user_has_pusher(self, user_id):
         # This only exists for the cachedList decorator
@@ -201,6 +196,11 @@ class PusherStore(PusherWorkerStore):
         defer.returnValue(result)
+class PusherStore(PusherWorkerStore):
+    def get_pushers_stream_token(self):
+        return self._pushers_id_gen.get_current_token()
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
@@ -233,14 +233,18 @@ class PusherStore(PusherWorkerStore):
             )
             if newly_inserted:
-                # get_if_user_has_pusher only cares if the user has
-                # at least *one* pusher.
-                self.get_if_user_has_pusher.invalidate(user_id,)
+                self.runInteraction(
+                    "add_pusher",
+                    self._invalidate_cache_and_stream,
+                    self.get_if_user_has_pusher, (user_id,)
+                )
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
-            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+            self._invalidate_cache_and_stream(
+                txn, self.get_if_user_has_pusher, (user_id,)
+            )
             self._simple_delete_one_txn(
                 txn,
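The switch from plain cache invalidation to _invalidate_cache_and_stream is what keeps get_if_user_has_pusher correct across processes: invalidating only the local cache is no longer enough once workers can serve pusher reads through PusherWorkerStore. Roughly (a sketch of the helper, which lives in the shared store base class and is not part of this diff), it does something like:

    def _invalidate_cache_and_stream(self, txn, cache_func, keys):
        # Invalidate the local in-memory cache once the transaction commits...
        txn.call_after(cache_func.invalidate, keys)
        # ...and publish the invalidation over the replication stream so other
        # processes drop their cached copy too.
        self._send_invalidation_to_replication(txn, cache_func.__name__, keys)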

synapse/storage/roommember.py

@@ -432,6 +432,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)
+    @cached()
+    def who_forgot_in_room(self, room_id):
+        return self._simple_select_list(
+            table="room_memberships",
+            retcols=("user_id", "event_id"),
+            keyvalues={
+                "room_id": room_id,
+                "forgotten": 1,
+            },
+            desc="who_forgot"
+        )
 class RoomMemberStore(RoomMemberWorkerStore):
     def __init__(self, db_conn, hs):
@@ -595,18 +607,6 @@
         forgot = yield self.runInteraction("did_forget_membership_at", f)
         defer.returnValue(forgot == 1)
-    @cached()
-    def who_forgot_in_room(self, room_id):
-        return self._simple_select_list(
-            table="room_memberships",
-            retcols=("user_id", "event_id"),
-            keyvalues={
-                "room_id": room_id,
-                "forgotten": 1,
-            },
-            desc="who_forgot"
-        )
     @defer.inlineCallbacks
     def _background_add_membership_profile(self, progress, batch_size):
         target_min_stream_id = progress.get(