mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Merge pull request #945 from matrix-org/rav/background_reindex
Create index on user_ips in the background
This commit is contained in:
commit
bf3de7b90b
@ -14,6 +14,7 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from ._base import SQLBaseStore
|
||||||
|
from . import engines
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
@ -87,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def start_doing_background_updates(self):
|
def start_doing_background_updates(self):
|
||||||
while True:
|
assert(self._background_update_timer is not None,
|
||||||
if self._background_update_timer is not None:
|
"background updates already running")
|
||||||
return
|
|
||||||
|
|
||||||
|
logger.info("Starting background schema updates")
|
||||||
|
|
||||||
|
while True:
|
||||||
sleep = defer.Deferred()
|
sleep = defer.Deferred()
|
||||||
self._background_update_timer = self._clock.call_later(
|
self._background_update_timer = self._clock.call_later(
|
||||||
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
|
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
|
||||||
@ -101,22 +104,23 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||||||
self._background_update_timer = None
|
self._background_update_timer = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = yield self.do_background_update(
|
result = yield self.do_next_background_update(
|
||||||
self.BACKGROUND_UPDATE_DURATION_MS
|
self.BACKGROUND_UPDATE_DURATION_MS
|
||||||
)
|
)
|
||||||
except:
|
except:
|
||||||
logger.exception("Error doing update")
|
logger.exception("Error doing update")
|
||||||
|
else:
|
||||||
if result is None:
|
if result is None:
|
||||||
logger.info(
|
logger.info(
|
||||||
"No more background updates to do."
|
"No more background updates to do."
|
||||||
" Unscheduling background update task."
|
" Unscheduling background update task."
|
||||||
)
|
)
|
||||||
return
|
defer.returnValue()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def do_background_update(self, desired_duration_ms):
|
def do_next_background_update(self, desired_duration_ms):
|
||||||
"""Does some amount of work on a background update
|
"""Does some amount of work on the next queued background update
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
desired_duration_ms(float): How long we want to spend
|
desired_duration_ms(float): How long we want to spend
|
||||||
updating.
|
updating.
|
||||||
@ -135,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||||||
self._background_update_queue.append(update['update_name'])
|
self._background_update_queue.append(update['update_name'])
|
||||||
|
|
||||||
if not self._background_update_queue:
|
if not self._background_update_queue:
|
||||||
|
# no work left to do
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
|
# pop from the front, and add back to the back
|
||||||
update_name = self._background_update_queue.pop(0)
|
update_name = self._background_update_queue.pop(0)
|
||||||
self._background_update_queue.append(update_name)
|
self._background_update_queue.append(update_name)
|
||||||
|
|
||||||
|
res = yield self._do_background_update(update_name, desired_duration_ms)
|
||||||
|
defer.returnValue(res)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def _do_background_update(self, update_name, desired_duration_ms):
|
||||||
|
logger.info("Starting update batch on background update '%s'",
|
||||||
|
update_name)
|
||||||
|
|
||||||
update_handler = self._background_update_handlers[update_name]
|
update_handler = self._background_update_handlers[update_name]
|
||||||
|
|
||||||
performance = self._background_update_performance.get(update_name)
|
performance = self._background_update_performance.get(update_name)
|
||||||
@ -202,6 +216,64 @@ class BackgroundUpdateStore(SQLBaseStore):
|
|||||||
"""
|
"""
|
||||||
self._background_update_handlers[update_name] = update_handler
|
self._background_update_handlers[update_name] = update_handler
|
||||||
|
|
||||||
|
def register_background_index_update(self, update_name, index_name,
|
||||||
|
table, columns):
|
||||||
|
"""Helper for store classes to do a background index addition
|
||||||
|
|
||||||
|
To use:
|
||||||
|
|
||||||
|
1. use a schema delta file to add a background update. Example:
|
||||||
|
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||||
|
('my_new_index', '{}');
|
||||||
|
|
||||||
|
2. In the Store constructor, call this method
|
||||||
|
|
||||||
|
Args:
|
||||||
|
update_name (str): update_name to register for
|
||||||
|
index_name (str): name of index to add
|
||||||
|
table (str): table to add index to
|
||||||
|
columns (list[str]): columns/expressions to include in index
|
||||||
|
"""
|
||||||
|
|
||||||
|
# if this is postgres, we add the indexes concurrently. Otherwise
|
||||||
|
# we fall back to doing it inline
|
||||||
|
if isinstance(self.database_engine, engines.PostgresEngine):
|
||||||
|
conc = True
|
||||||
|
else:
|
||||||
|
conc = False
|
||||||
|
|
||||||
|
sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \
|
||||||
|
% {
|
||||||
|
"conc": "CONCURRENTLY" if conc else "",
|
||||||
|
"name": index_name,
|
||||||
|
"table": table,
|
||||||
|
"columns": ", ".join(columns),
|
||||||
|
}
|
||||||
|
|
||||||
|
def create_index_concurrently(conn):
|
||||||
|
conn.rollback()
|
||||||
|
# postgres insists on autocommit for the index
|
||||||
|
conn.set_session(autocommit=True)
|
||||||
|
c = conn.cursor()
|
||||||
|
c.execute(sql)
|
||||||
|
conn.set_session(autocommit=False)
|
||||||
|
|
||||||
|
def create_index(conn):
|
||||||
|
c = conn.cursor()
|
||||||
|
c.execute(sql)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def updater(progress, batch_size):
|
||||||
|
logger.info("Adding index %s to %s", index_name, table)
|
||||||
|
if conc:
|
||||||
|
yield self.runWithConnection(create_index_concurrently)
|
||||||
|
else:
|
||||||
|
yield self.runWithConnection(create_index)
|
||||||
|
yield self._end_background_update(update_name)
|
||||||
|
defer.returnValue(1)
|
||||||
|
|
||||||
|
self.register_background_update_handler(update_name, updater)
|
||||||
|
|
||||||
def start_background_update(self, update_name, progress):
|
def start_background_update(self, update_name, progress):
|
||||||
"""Starts a background update running.
|
"""Starts a background update running.
|
||||||
|
|
||||||
|
@ -15,10 +15,11 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from ._base import SQLBaseStore, Cache
|
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from ._base import Cache
|
||||||
|
from . import background_updates
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
|
||||||
@ -27,8 +28,7 @@ logger = logging.getLogger(__name__)
|
|||||||
LAST_SEEN_GRANULARITY = 120 * 1000
|
LAST_SEEN_GRANULARITY = 120 * 1000
|
||||||
|
|
||||||
|
|
||||||
class ClientIpStore(SQLBaseStore):
|
class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||||
|
|
||||||
def __init__(self, hs):
|
def __init__(self, hs):
|
||||||
self.client_ip_last_seen = Cache(
|
self.client_ip_last_seen = Cache(
|
||||||
name="client_ip_last_seen",
|
name="client_ip_last_seen",
|
||||||
@ -37,6 +37,13 @@ class ClientIpStore(SQLBaseStore):
|
|||||||
|
|
||||||
super(ClientIpStore, self).__init__(hs)
|
super(ClientIpStore, self).__init__(hs)
|
||||||
|
|
||||||
|
self.register_background_index_update(
|
||||||
|
"user_ips_device_index",
|
||||||
|
index_name="user_ips_device_id",
|
||||||
|
table="user_ips",
|
||||||
|
columns=["user_id", "device_id", "last_seen"],
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
|
def insert_client_ip(self, user, access_token, ip, user_agent, device_id):
|
||||||
now = int(self._clock.time_msec())
|
now = int(self._clock.time_msec())
|
||||||
|
@ -13,4 +13,5 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen);
|
INSERT INTO background_updates (update_name, progress_json) VALUES
|
||||||
|
('user_ips_device_index', '{}');
|
||||||
|
@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
hs = yield setup_test_homeserver()
|
hs = yield setup_test_homeserver() # type: synapse.server.HomeServer
|
||||||
self.store = hs.get_datastore()
|
self.store = hs.get_datastore()
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase):
|
|||||||
"test_update", self.update_handler
|
"test_update", self.update_handler
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# run the real background updates, to get them out the way
|
||||||
|
# (perhaps we should run them as part of the test HS setup, since we
|
||||||
|
# run all of the other schema setup stuff there?)
|
||||||
|
while True:
|
||||||
|
res = yield self.store.do_next_background_update(1000)
|
||||||
|
if res is None:
|
||||||
|
break
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_do_background_update(self):
|
def test_do_background_update(self):
|
||||||
desired_count = 1000
|
desired_count = 1000
|
||||||
duration_ms = 42
|
duration_ms = 42
|
||||||
|
|
||||||
|
# first step: make a bit of progress
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def update(progress, count):
|
def update(progress, count):
|
||||||
self.clock.advance_time_msec(count * duration_ms)
|
self.clock.advance_time_msec(count * duration_ms)
|
||||||
@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
|
|||||||
yield self.store.start_background_update("test_update", {"my_key": 1})
|
yield self.store.start_background_update("test_update", {"my_key": 1})
|
||||||
|
|
||||||
self.update_handler.reset_mock()
|
self.update_handler.reset_mock()
|
||||||
result = yield self.store.do_background_update(
|
result = yield self.store.do_next_background_update(
|
||||||
duration_ms * desired_count
|
duration_ms * desired_count
|
||||||
)
|
)
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
@ -50,15 +59,15 @@ class BackgroundUpdateTestCase(unittest.TestCase):
|
|||||||
{"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE
|
{"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# second step: complete the update
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def update(progress, count):
|
def update(progress, count):
|
||||||
yield self.store._end_background_update("test_update")
|
yield self.store._end_background_update("test_update")
|
||||||
defer.returnValue(count)
|
defer.returnValue(count)
|
||||||
|
|
||||||
self.update_handler.side_effect = update
|
self.update_handler.side_effect = update
|
||||||
|
|
||||||
self.update_handler.reset_mock()
|
self.update_handler.reset_mock()
|
||||||
result = yield self.store.do_background_update(
|
result = yield self.store.do_next_background_update(
|
||||||
duration_ms * desired_count
|
duration_ms * desired_count
|
||||||
)
|
)
|
||||||
self.assertIsNotNone(result)
|
self.assertIsNotNone(result)
|
||||||
@ -66,8 +75,9 @@ class BackgroundUpdateTestCase(unittest.TestCase):
|
|||||||
{"my_key": 2}, desired_count
|
{"my_key": 2}, desired_count
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# third step: we don't expect to be called any more
|
||||||
self.update_handler.reset_mock()
|
self.update_handler.reset_mock()
|
||||||
result = yield self.store.do_background_update(
|
result = yield self.store.do_next_background_update(
|
||||||
duration_ms * desired_count
|
duration_ms * desired_count
|
||||||
)
|
)
|
||||||
self.assertIsNone(result)
|
self.assertIsNone(result)
|
||||||
|
Loading…
Reference in New Issue
Block a user