From b5e646a18ce2a293e5d35dcb560ba50183a87429 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 13 May 2016 11:36:50 +0100 Subject: [PATCH 1/4] Make email notifs work on the pusher synapse Plus general bugfix to email notif code --- synapse/app/pusher.py | 47 ++++++++++++++++++++++++++++++++++++++++++ synapse/push/mailer.py | 1 + 2 files changed, 48 insertions(+) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8e9c0e196..662cd0dc6 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -24,6 +24,8 @@ from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.state import StateStore +from synapse.storage.roommember import RoomMemberStore +from synapse.storage.account_data import AccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore @@ -60,6 +62,7 @@ class SlaveConfig(DatabaseConfig): self.soft_file_limit = config.get("soft_file_limit") self.daemonize = config.get("daemonize") self.pid_file = self.abspath(config.get("pid_file")) + self.public_baseurl = config["public_baseurl"] def default_config(self, server_name, **kwargs): pid_file = self.abspath("pusher.pid") @@ -132,6 +135,30 @@ class PusherSlaveStore( DataStore.get_state_groups.__func__ ) + _get_state_for_groups = ( + DataStore._get_state_for_groups.__func__ + ) + + _get_all_state_from_cache = ( + DataStore._get_all_state_from_cache.__func__ + ) + + get_events_around = ( + DataStore.get_events_around.__func__ + ) + + _get_events_around_txn = ( + DataStore._get_events_around_txn.__func__ + ) + + get_state_for_events = ( + DataStore.get_state_for_events.__func__ + ) + + _get_some_state_from_cache = ( + DataStore._get_some_state_from_cache.__func__ + ) + _get_state_group_for_events = ( StateStore.__dict__["_get_state_group_for_events"] ) @@ -140,6 +167,26 @@ class PusherSlaveStore( StateStore.__dict__["_get_state_group_for_event"] ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) + + get_global_account_data_by_type_for_users = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] + ) + + get_global_account_data_by_type_for_user = ( + AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] + ) + + who_forgot_in_room = ( + RoomMemberStore.__dict__["who_forgot_in_room"] + ) + class PusherServer(HomeServer): diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 5d60c1efc..2be294f52 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -397,6 +397,7 @@ class Mailer(object): return "" serverAndMediaId = value[6:] + fragment = None if '#' in serverAndMediaId: (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1) fragment = "#" + fragment From 206eb9fd947ba86060340ba2154d1112570b76cd Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 16:58:14 +0100 Subject: [PATCH 2/4] Shift some of the state_group methods into the SlavedEventStore --- synapse/app/pusher.py | 45 --------------------- synapse/replication/slave/storage/events.py | 19 +++++++++ 2 files changed, 19 insertions(+), 45 deletions(-) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 662cd0dc6..9d41b62db 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -23,7 +23,6 @@ from synapse.config.logger import LoggingConfig from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.state import StateStore from synapse.storage.roommember import RoomMemberStore from synapse.storage.account_data import AccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -131,50 +130,6 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - get_state_groups = ( - DataStore.get_state_groups.__func__ - ) - - _get_state_for_groups = ( - DataStore._get_state_for_groups.__func__ - ) - - _get_all_state_from_cache = ( - DataStore._get_all_state_from_cache.__func__ - ) - - get_events_around = ( - DataStore.get_events_around.__func__ - ) - - _get_events_around_txn = ( - DataStore._get_events_around_txn.__func__ - ) - - get_state_for_events = ( - DataStore.get_state_for_events.__func__ - ) - - _get_some_state_from_cache = ( - DataStore._get_some_state_from_cache.__func__ - ) - - _get_state_group_for_events = ( - StateStore.__dict__["_get_state_group_for_events"] - ) - - _get_state_group_for_event = ( - StateStore.__dict__["_get_state_group_for_event"] - ) - - _get_state_groups_from_groups = ( - StateStore.__dict__["_get_state_groups_from_groups"] - ) - - _get_state_group_from_group = ( - StateStore.__dict__["_get_state_group_from_group"] - ) - get_global_account_data_by_type_for_users = ( AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7ba7a6f6e..0e29bd51d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -75,6 +75,18 @@ class SlavedEventStore(BaseSlavedStore): get_unread_event_push_actions_by_room_for_user = ( EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] ) + _get_state_group_for_events = ( + StateStore.__dict__["_get_state_group_for_events"] + ) + _get_state_group_for_event = ( + StateStore.__dict__["_get_state_group_for_event"] + ) + _get_state_groups_from_groups = ( + StateStore.__dict__["_get_state_groups_from_groups"] + ) + _get_state_group_from_group = ( + StateStore.__dict__["_get_state_group_from_group"] + ) get_unread_push_actions_for_user_in_range = ( DataStore.get_unread_push_actions_for_user_in_range.__func__ @@ -96,6 +108,9 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_room = ( DataStore.get_room_events_stream_for_room.__func__ ) + get_events_around = DataStore.get_events_around.__func__ + get_state_for_events = DataStore.get_state_for_events.__func__ + get_state_groups = DataStore.get_state_groups.__func__ _set_before_and_after = DataStore._set_before_and_after @@ -116,6 +131,10 @@ class SlavedEventStore(BaseSlavedStore): DataStore._get_rooms_for_user_where_membership_is_txn.__func__ ) _get_members_rows_txn = DataStore._get_members_rows_txn.__func__ + _get_state_for_groups = DataStore._get_state_for_groups.__func__ + _get_all_state_from_cache = DataStore._get_all_state_from_cache.__func__ + _get_events_around_txn = DataStore._get_events_around_txn.__func__ + _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() From f03ddc98ec38771a329f47c13c76dd4e93fbef16 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 17:01:28 +0100 Subject: [PATCH 3/4] Use the SlavedAccountDataStore --- synapse/app/pusher.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9d41b62db..8ff8329f8 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -24,10 +24,10 @@ from synapse.config.emailconfig import EmailConfig from synapse.http.site import SynapseSite from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.storage.roommember import RoomMemberStore -from synapse.storage.account_data import AccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.storage.engines import create_engine from synapse.storage import DataStore from synapse.util.async import sleep @@ -100,7 +100,8 @@ class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig): class PusherSlaveStore( - SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore + SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore, + SlavedAccountDataStore ): update_pusher_last_stream_ordering_and_success = ( DataStore.update_pusher_last_stream_ordering_and_success.__func__ @@ -130,14 +131,6 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - get_global_account_data_by_type_for_users = ( - AccountDataStore.__dict__["get_global_account_data_by_type_for_users"] - ) - - get_global_account_data_by_type_for_user = ( - AccountDataStore.__dict__["get_global_account_data_by_type_for_user"] - ) - who_forgot_in_room = ( RoomMemberStore.__dict__["who_forgot_in_room"] ) From b3f29dc1e59ccb3e53a124d9c0d85d26377350f4 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 13 May 2016 17:16:27 +0100 Subject: [PATCH 4/4] Manually expire broken caches like the who_forgot_in_room --- synapse/app/pusher.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 8ff8329f8..135dd58c1 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -131,6 +131,11 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) + # XXX: This is a bit broken because we don't persist forgotten rooms + # in a way that they can be streamed. This means that we don't have a + # way to invalidate the forgotten rooms cache correctly. + # For now we expire the cache every 10 minutes. + BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000 who_forgot_in_room = ( RoomMemberStore.__dict__["who_forgot_in_room"] ) @@ -214,6 +219,7 @@ class PusherServer(HomeServer): store = self.get_datastore() replication_url = self.config.replication_url pusher_pool = self.get_pusherpool() + clock = self.get_clock() def stop_pusher(user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) @@ -265,11 +271,21 @@ class PusherServer(HomeServer): min_stream_id, max_stream_id, affected_room_ids ) + def expire_broken_caches(): + store.who_forgot_in_room.invalidate_all() + + next_expire_broken_caches_ms = 0 while True: try: args = store.stream_positions() args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) + now_ms = clock.time_msec() + if now_ms > next_expire_broken_caches_ms: + expire_broken_caches() + next_expire_broken_caches_ms = ( + now_ms + store.BROKEN_CACHE_EXPIRY_MS + ) yield store.process_replication(result) poke_pushers(result) except: