diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 66a995157..2771f7c3c 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from . import engines from twisted.internet import defer @@ -87,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - while True: - if self._background_update_timer is not None: - return + assert(self._background_update_timer is not None, + "background updates already running") + logger.info("Starting background schema updates") + + while True: sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None @@ -101,22 +104,23 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_timer = None try: - result = yield self.do_background_update( + result = yield self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except: logger.exception("Error doing update") - - if result is None: - logger.info( - "No more background updates to do." - " Unscheduling background update task." - ) - return + else: + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + defer.returnValue() @defer.inlineCallbacks - def do_background_update(self, desired_duration_ms): - """Does some amount of work on a background update + def do_next_background_update(self, desired_duration_ms): + """Does some amount of work on the next queued background update + Args: desired_duration_ms(float): How long we want to spend updating. @@ -135,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue.append(update['update_name']) if not self._background_update_queue: + # no work left to do defer.returnValue(None) + # pop from the front, and add back to the back update_name = self._background_update_queue.pop(0) self._background_update_queue.append(update_name) + res = yield self._do_background_update(update_name, desired_duration_ms) + defer.returnValue(res) + + @defer.inlineCallbacks + def _do_background_update(self, update_name, desired_duration_ms): + logger.info("Starting update batch on background update '%s'", + update_name) + update_handler = self._background_update_handlers[update_name] performance = self._background_update_performance.get(update_name) @@ -202,6 +216,64 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_background_index_update(self, update_name, index_name, + table, columns): + """Helper for store classes to do a background index addition + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('my_new_index', '{}'); + + 2. In the Store constructor, call this method + + Args: + update_name (str): update_name to register for + index_name (str): name of index to add + table (str): table to add index to + columns (list[str]): columns/expressions to include in index + """ + + # if this is postgres, we add the indexes concurrently. Otherwise + # we fall back to doing it inline + if isinstance(self.database_engine, engines.PostgresEngine): + conc = True + else: + conc = False + + sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ + % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + + def create_index_concurrently(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + c = conn.cursor() + c.execute(sql) + conn.set_session(autocommit=False) + + def create_index(conn): + c = conn.cursor() + c.execute(sql) + + @defer.inlineCallbacks + def updater(progress, batch_size): + logger.info("Adding index %s to %s", index_name, table) + if conc: + yield self.runWithConnection(create_index_concurrently) + else: + yield self.runWithConnection(create_index) + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, updater) + def start_background_update(self, update_name, progress): """Starts a background update running. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index e31fa53c3..71e5ea112 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,10 +15,11 @@ import logging -from ._base import SQLBaseStore, Cache - from twisted.internet import defer +from ._base import Cache +from . import background_updates + logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller @@ -27,8 +28,7 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(SQLBaseStore): - +class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", @@ -37,6 +37,13 @@ class ClientIpStore(SQLBaseStore): super(ClientIpStore, self).__init__(hs) + self.register_background_index_update( + "user_ips_device_index", + index_name="user_ips_device_id", + table="user_ips", + columns=["user_id", "device_id", "last_seen"], + ) + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql index 8a05677d4..473f75a78 100644 --- a/synapse/storage/schema/delta/33/user_ips_index.sql +++ b/synapse/storage/schema/delta/33/user_ips_index.sql @@ -13,4 +13,5 @@ * limitations under the License. */ -CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen); +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_device_index', '{}'); diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 6e4d9b137..1286b4ce2 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - hs = yield setup_test_homeserver() + hs = yield setup_test_homeserver() # type: synapse.server.HomeServer self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase): "test_update", self.update_handler ) + # run the real background updates, to get them out the way + # (perhaps we should run them as part of the test HS setup, since we + # run all of the other schema setup stuff there?) + while True: + res = yield self.store.do_next_background_update(1000) + if res is None: + break + @defer.inlineCallbacks def test_do_background_update(self): desired_count = 1000 duration_ms = 42 + # first step: make a bit of progress @defer.inlineCallbacks def update(progress, count): self.clock.advance_time_msec(count * duration_ms) @@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): yield self.store.start_background_update("test_update", {"my_key": 1}) self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNotNone(result) @@ -50,15 +59,15 @@ class BackgroundUpdateTestCase(unittest.TestCase): {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE ) + # second step: complete the update @defer.inlineCallbacks def update(progress, count): yield self.store._end_background_update("test_update") defer.returnValue(count) self.update_handler.side_effect = update - self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNotNone(result) @@ -66,8 +75,9 @@ class BackgroundUpdateTestCase(unittest.TestCase): {"my_key": 2}, desired_count ) + # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNone(result)