mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-02 10:46:06 -04:00
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/disable_sql_bytes
This commit is contained in:
commit
91f43dca39
135 changed files with 2482 additions and 1700 deletions
|
@ -33,16 +33,9 @@ logger = logging.getLogger(__name__)
|
|||
LAST_SEEN_GRANULARITY = 120 * 1000
|
||||
|
||||
|
||||
class ClientIpStore(background_updates.BackgroundUpdateStore):
|
||||
class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
|
||||
self.client_ip_last_seen = Cache(
|
||||
name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
|
||||
)
|
||||
|
||||
super(ClientIpStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.user_ips_max_age = hs.config.user_ips_max_age
|
||||
super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.register_background_index_update(
|
||||
"user_ips_device_index",
|
||||
|
@ -92,19 +85,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
"devices_last_seen", self._devices_last_seen_update
|
||||
)
|
||||
|
||||
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
|
||||
self._batch_row_update = {}
|
||||
|
||||
self._client_ip_looper = self._clock.looping_call(
|
||||
self._update_client_ips_batch, 5 * 1000
|
||||
)
|
||||
self.hs.get_reactor().addSystemEventTrigger(
|
||||
"before", "shutdown", self._update_client_ips_batch
|
||||
)
|
||||
|
||||
if self.user_ips_max_age:
|
||||
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _remove_user_ip_nonunique(self, progress, batch_size):
|
||||
def f(conn):
|
||||
|
@ -303,6 +283,110 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
|
||||
return batch_size
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _devices_last_seen_update(self, progress, batch_size):
|
||||
"""Background update to insert last seen info into devices table
|
||||
"""
|
||||
|
||||
last_user_id = progress.get("last_user_id", "")
|
||||
last_device_id = progress.get("last_device_id", "")
|
||||
|
||||
def _devices_last_seen_update_txn(txn):
|
||||
# This consists of two queries:
|
||||
#
|
||||
# 1. The sub-query searches for the next N devices and joins
|
||||
# against user_ips to find the max last_seen associated with
|
||||
# that device.
|
||||
# 2. The outer query then joins again against user_ips on
|
||||
# user/device/last_seen. This *should* hopefully only
|
||||
# return one row, but if it does return more than one then
|
||||
# we'll just end up updating the same device row multiple
|
||||
# times, which is fine.
|
||||
|
||||
if self.database_engine.supports_tuple_comparison:
|
||||
where_clause = "(user_id, device_id) > (?, ?)"
|
||||
where_args = [last_user_id, last_device_id]
|
||||
else:
|
||||
# We explicitly do a `user_id >= ? AND (...)` here to ensure
|
||||
# that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
|
||||
# makes it hard for query optimiser to tell that it can use the
|
||||
# index on user_id
|
||||
where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
|
||||
where_args = [last_user_id, last_user_id, last_device_id]
|
||||
|
||||
sql = """
|
||||
SELECT
|
||||
last_seen, ip, user_agent, user_id, device_id
|
||||
FROM (
|
||||
SELECT
|
||||
user_id, device_id, MAX(u.last_seen) AS last_seen
|
||||
FROM devices
|
||||
INNER JOIN user_ips AS u USING (user_id, device_id)
|
||||
WHERE %(where_clause)s
|
||||
GROUP BY user_id, device_id
|
||||
ORDER BY user_id ASC, device_id ASC
|
||||
LIMIT ?
|
||||
) c
|
||||
INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
|
||||
""" % {
|
||||
"where_clause": where_clause
|
||||
}
|
||||
txn.execute(sql, where_args + [batch_size])
|
||||
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
sql = """
|
||||
UPDATE devices
|
||||
SET last_seen = ?, ip = ?, user_agent = ?
|
||||
WHERE user_id = ? AND device_id = ?
|
||||
"""
|
||||
txn.execute_batch(sql, rows)
|
||||
|
||||
_, _, _, user_id, device_id = rows[-1]
|
||||
self._background_update_progress_txn(
|
||||
txn,
|
||||
"devices_last_seen",
|
||||
{"last_user_id": user_id, "last_device_id": device_id},
|
||||
)
|
||||
|
||||
return len(rows)
|
||||
|
||||
updated = yield self.runInteraction(
|
||||
"_devices_last_seen_update", _devices_last_seen_update_txn
|
||||
)
|
||||
|
||||
if not updated:
|
||||
yield self._end_background_update("devices_last_seen")
|
||||
|
||||
return updated
|
||||
|
||||
|
||||
class ClientIpStore(ClientIpBackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
|
||||
self.client_ip_last_seen = Cache(
|
||||
name="client_ip_last_seen", keylen=4, max_entries=50000 * CACHE_SIZE_FACTOR
|
||||
)
|
||||
|
||||
super(ClientIpStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.user_ips_max_age = hs.config.user_ips_max_age
|
||||
|
||||
# (user_id, access_token, ip,) -> (user_agent, device_id, last_seen)
|
||||
self._batch_row_update = {}
|
||||
|
||||
self._client_ip_looper = self._clock.looping_call(
|
||||
self._update_client_ips_batch, 5 * 1000
|
||||
)
|
||||
self.hs.get_reactor().addSystemEventTrigger(
|
||||
"before", "shutdown", self._update_client_ips_batch
|
||||
)
|
||||
|
||||
if self.user_ips_max_age:
|
||||
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def insert_client_ip(
|
||||
self, user_id, access_token, ip, user_agent, device_id, now=None
|
||||
|
@ -454,85 +538,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
|
|||
for (access_token, ip), (user_agent, last_seen) in iteritems(results)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _devices_last_seen_update(self, progress, batch_size):
|
||||
"""Background update to insert last seen info into devices table
|
||||
"""
|
||||
|
||||
last_user_id = progress.get("last_user_id", "")
|
||||
last_device_id = progress.get("last_device_id", "")
|
||||
|
||||
def _devices_last_seen_update_txn(txn):
|
||||
# This consists of two queries:
|
||||
#
|
||||
# 1. The sub-query searches for the next N devices and joins
|
||||
# against user_ips to find the max last_seen associated with
|
||||
# that device.
|
||||
# 2. The outer query then joins again against user_ips on
|
||||
# user/device/last_seen. This *should* hopefully only
|
||||
# return one row, but if it does return more than one then
|
||||
# we'll just end up updating the same device row multiple
|
||||
# times, which is fine.
|
||||
|
||||
if self.database_engine.supports_tuple_comparison:
|
||||
where_clause = "(user_id, device_id) > (?, ?)"
|
||||
where_args = [last_user_id, last_device_id]
|
||||
else:
|
||||
# We explicitly do a `user_id >= ? AND (...)` here to ensure
|
||||
# that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)`
|
||||
# makes it hard for query optimiser to tell that it can use the
|
||||
# index on user_id
|
||||
where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)"
|
||||
where_args = [last_user_id, last_user_id, last_device_id]
|
||||
|
||||
sql = """
|
||||
SELECT
|
||||
last_seen, ip, user_agent, user_id, device_id
|
||||
FROM (
|
||||
SELECT
|
||||
user_id, device_id, MAX(u.last_seen) AS last_seen
|
||||
FROM devices
|
||||
INNER JOIN user_ips AS u USING (user_id, device_id)
|
||||
WHERE %(where_clause)s
|
||||
GROUP BY user_id, device_id
|
||||
ORDER BY user_id ASC, device_id ASC
|
||||
LIMIT ?
|
||||
) c
|
||||
INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
|
||||
""" % {
|
||||
"where_clause": where_clause
|
||||
}
|
||||
txn.execute(sql, where_args + [batch_size])
|
||||
|
||||
rows = txn.fetchall()
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
sql = """
|
||||
UPDATE devices
|
||||
SET last_seen = ?, ip = ?, user_agent = ?
|
||||
WHERE user_id = ? AND device_id = ?
|
||||
"""
|
||||
txn.execute_batch(sql, rows)
|
||||
|
||||
_, _, _, user_id, device_id = rows[-1]
|
||||
self._background_update_progress_txn(
|
||||
txn,
|
||||
"devices_last_seen",
|
||||
{"last_user_id": user_id, "last_device_id": device_id},
|
||||
)
|
||||
|
||||
return len(rows)
|
||||
|
||||
updated = yield self.runInteraction(
|
||||
"_devices_last_seen_update", _devices_last_seen_update_txn
|
||||
)
|
||||
|
||||
if not updated:
|
||||
yield self._end_background_update("devices_last_seen")
|
||||
|
||||
return updated
|
||||
|
||||
@wrap_as_background_process("prune_old_user_ips")
|
||||
async def _prune_old_user_ips(self):
|
||||
"""Removes entries in user IPs older than the configured period.
|
||||
|
|
|
@ -208,11 +208,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
|
||||
class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
||||
class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
|
||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(DeviceInboxStore, self).__init__(db_conn, hs)
|
||||
super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.register_background_index_update(
|
||||
"device_inbox_stream_index",
|
||||
|
@ -225,6 +225,26 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
|||
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_drop_index_device_inbox(self, progress, batch_size):
|
||||
def reindex_txn(conn):
|
||||
txn = conn.cursor()
|
||||
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
|
||||
txn.close()
|
||||
|
||||
yield self.runWithConnection(reindex_txn)
|
||||
|
||||
yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
|
||||
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(DeviceInboxStore, self).__init__(db_conn, hs)
|
||||
|
||||
# Map of (user_id, device_id) to the last stream_id that has been
|
||||
# deleted up to. This is so that we can no op deletions.
|
||||
self._last_device_delete_cache = ExpiringCache(
|
||||
|
@ -435,16 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
|
|||
return self.runInteraction(
|
||||
"get_all_new_device_messages", get_all_new_device_messages_txn
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_drop_index_device_inbox(self, progress, batch_size):
|
||||
def reindex_txn(conn):
|
||||
txn = conn.cursor()
|
||||
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
|
||||
txn.close()
|
||||
|
||||
yield self.runWithConnection(reindex_txn)
|
||||
|
||||
yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
|
||||
|
||||
return 1
|
||||
|
|
|
@ -512,17 +512,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
return results
|
||||
|
||||
|
||||
class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
||||
class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(DeviceStore, self).__init__(db_conn, hs)
|
||||
|
||||
# Map of (user_id, device_id) -> bool. If there is an entry that implies
|
||||
# the device exists.
|
||||
self.device_id_exists_cache = Cache(
|
||||
name="device_id_exists", keylen=2, max_entries=10000
|
||||
)
|
||||
|
||||
self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
|
||||
super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.register_background_index_update(
|
||||
"device_lists_stream_idx",
|
||||
|
@ -555,6 +547,31 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
self._drop_device_list_streams_non_unique_indexes,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
|
||||
def f(conn):
|
||||
txn = conn.cursor()
|
||||
txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
|
||||
txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
|
||||
txn.close()
|
||||
|
||||
yield self.runWithConnection(f)
|
||||
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
|
||||
return 1
|
||||
|
||||
|
||||
class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(DeviceStore, self).__init__(db_conn, hs)
|
||||
|
||||
# Map of (user_id, device_id) -> bool. If there is an entry that implies
|
||||
# the device exists.
|
||||
self.device_id_exists_cache = Cache(
|
||||
name="device_id_exists", keylen=2, max_entries=10000
|
||||
)
|
||||
|
||||
self._clock.looping_call(self._prune_old_outbound_device_pokes, 60 * 60 * 1000)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def store_device(self, user_id, device_id, initial_device_display_name):
|
||||
"""Ensure the given device is known; add it to the store if not
|
||||
|
@ -910,15 +927,3 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
"_prune_old_outbound_device_pokes",
|
||||
_prune_txn,
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size):
|
||||
def f(conn):
|
||||
txn = conn.cursor()
|
||||
txn.execute("DROP INDEX IF EXISTS device_lists_remote_cache_id")
|
||||
txn.execute("DROP INDEX IF EXISTS device_lists_remote_extremeties_id")
|
||||
txn.close()
|
||||
|
||||
yield self.runWithConnection(f)
|
||||
yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
|
||||
return 1
|
||||
|
|
|
@ -15,11 +15,9 @@
|
|||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
|
||||
|
||||
class MediaRepositoryStore(BackgroundUpdateStore):
|
||||
"""Persistence for attachments and avatars"""
|
||||
|
||||
class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(MediaRepositoryStore, self).__init__(db_conn, hs)
|
||||
super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.register_background_index_update(
|
||||
update_name="local_media_repository_url_idx",
|
||||
|
@ -29,6 +27,13 @@ class MediaRepositoryStore(BackgroundUpdateStore):
|
|||
where_clause="url_cache IS NOT NULL",
|
||||
)
|
||||
|
||||
|
||||
class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
|
||||
"""Persistence for attachments and avatars"""
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(MediaRepositoryStore, self).__init__(db_conn, hs)
|
||||
|
||||
def get_local_media(self, media_id):
|
||||
"""Get the metadata for a local piece of media
|
||||
Returns:
|
||||
|
|
|
@ -183,8 +183,8 @@ class PushRulesWorkerStore(
|
|||
return results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def move_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
|
||||
"""Move a single push rule from one room to another for a specific user.
|
||||
def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule):
|
||||
"""Copy a single push rule from one room to another for a specific user.
|
||||
|
||||
Args:
|
||||
new_room_id (str): ID of the new room.
|
||||
|
@ -209,14 +209,11 @@ class PushRulesWorkerStore(
|
|||
actions=rule["actions"],
|
||||
)
|
||||
|
||||
# Delete push rule for the old room
|
||||
yield self.delete_push_rule(user_id, rule["rule_id"])
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def move_push_rules_from_room_to_room_for_user(
|
||||
def copy_push_rules_from_room_to_room_for_user(
|
||||
self, old_room_id, new_room_id, user_id
|
||||
):
|
||||
"""Move all of the push rules from one room to another for a specific
|
||||
"""Copy all of the push rules from one room to another for a specific
|
||||
user.
|
||||
|
||||
Args:
|
||||
|
@ -227,15 +224,14 @@ class PushRulesWorkerStore(
|
|||
# Retrieve push rules for this user
|
||||
user_push_rules = yield self.get_push_rules_for_user(user_id)
|
||||
|
||||
# Get rules relating to the old room, move them to the new room, then
|
||||
# delete them from the old room
|
||||
# Get rules relating to the old room and copy them to the new room
|
||||
for rule in user_push_rules:
|
||||
conditions = rule.get("conditions", [])
|
||||
if any(
|
||||
(c.get("key") == "room_id" and c.get("pattern") == old_room_id)
|
||||
for c in conditions
|
||||
):
|
||||
self.move_push_rule_from_room_to_room(new_room_id, user_id, rule)
|
||||
yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def bulk_get_push_rules_for_room(self, event, context):
|
||||
|
|
|
@ -493,7 +493,9 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
def _find_next_generated_user_id(txn):
|
||||
txn.execute("SELECT name FROM users")
|
||||
# We bound between '@1' and '@a' to avoid pulling the entire table
|
||||
# out.
|
||||
txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'")
|
||||
|
||||
regex = re.compile(r"^@(\d+):")
|
||||
|
||||
|
@ -785,13 +787,14 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
)
|
||||
|
||||
|
||||
class RegistrationStore(
|
||||
class RegistrationBackgroundUpdateStore(
|
||||
RegistrationWorkerStore, background_updates.BackgroundUpdateStore
|
||||
):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(RegistrationStore, self).__init__(db_conn, hs)
|
||||
super(RegistrationBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.clock = hs.get_clock()
|
||||
self.config = hs.config
|
||||
|
||||
self.register_background_index_update(
|
||||
"access_tokens_device_index",
|
||||
|
@ -807,8 +810,6 @@ class RegistrationStore(
|
|||
columns=["creation_ts"],
|
||||
)
|
||||
|
||||
self._account_validity = hs.config.account_validity
|
||||
|
||||
# we no longer use refresh tokens, but it's possible that some people
|
||||
# might have a background update queued to build this index. Just
|
||||
# clear the background update.
|
||||
|
@ -822,17 +823,6 @@ class RegistrationStore(
|
|||
"users_set_deactivated_flag", self._background_update_set_deactivated_flag
|
||||
)
|
||||
|
||||
# Create a background job for culling expired 3PID validity tokens
|
||||
def start_cull():
|
||||
# run as a background process to make sure that the database transactions
|
||||
# have a logcontext to report to
|
||||
return run_as_background_process(
|
||||
"cull_expired_threepid_validation_tokens",
|
||||
self.cull_expired_threepid_validation_tokens,
|
||||
)
|
||||
|
||||
hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_update_set_deactivated_flag(self, progress, batch_size):
|
||||
"""Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
|
||||
|
@ -894,6 +884,54 @@ class RegistrationStore(
|
|||
|
||||
return nb_processed
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _bg_user_threepids_grandfather(self, progress, batch_size):
|
||||
"""We now track which identity servers a user binds their 3PID to, so
|
||||
we need to handle the case of existing bindings where we didn't track
|
||||
this.
|
||||
|
||||
We do this by grandfathering in existing user threepids assuming that
|
||||
they used one of the server configured trusted identity servers.
|
||||
"""
|
||||
id_servers = set(self.config.trusted_third_party_id_servers)
|
||||
|
||||
def _bg_user_threepids_grandfather_txn(txn):
|
||||
sql = """
|
||||
INSERT INTO user_threepid_id_server
|
||||
(user_id, medium, address, id_server)
|
||||
SELECT user_id, medium, address, ?
|
||||
FROM user_threepids
|
||||
"""
|
||||
|
||||
txn.executemany(sql, [(id_server,) for id_server in id_servers])
|
||||
|
||||
if id_servers:
|
||||
yield self.runInteraction(
|
||||
"_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
|
||||
)
|
||||
|
||||
yield self._end_background_update("user_threepids_grandfather")
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
class RegistrationStore(RegistrationBackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(RegistrationStore, self).__init__(db_conn, hs)
|
||||
|
||||
self._account_validity = hs.config.account_validity
|
||||
|
||||
# Create a background job for culling expired 3PID validity tokens
|
||||
def start_cull():
|
||||
# run as a background process to make sure that the database transactions
|
||||
# have a logcontext to report to
|
||||
return run_as_background_process(
|
||||
"cull_expired_threepid_validation_tokens",
|
||||
self.cull_expired_threepid_validation_tokens,
|
||||
)
|
||||
|
||||
hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms):
|
||||
"""Adds an access token for the given user.
|
||||
|
@ -1242,36 +1280,6 @@ class RegistrationStore(
|
|||
desc="get_users_pending_deactivation",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _bg_user_threepids_grandfather(self, progress, batch_size):
|
||||
"""We now track which identity servers a user binds their 3PID to, so
|
||||
we need to handle the case of existing bindings where we didn't track
|
||||
this.
|
||||
|
||||
We do this by grandfathering in existing user threepids assuming that
|
||||
they used one of the server configured trusted identity servers.
|
||||
"""
|
||||
id_servers = set(self.config.trusted_third_party_id_servers)
|
||||
|
||||
def _bg_user_threepids_grandfather_txn(txn):
|
||||
sql = """
|
||||
INSERT INTO user_threepid_id_server
|
||||
(user_id, medium, address, id_server)
|
||||
SELECT user_id, medium, address, ?
|
||||
FROM user_threepids
|
||||
"""
|
||||
|
||||
txn.executemany(sql, [(id_server,) for id_server in id_servers])
|
||||
|
||||
if id_servers:
|
||||
yield self.runInteraction(
|
||||
"_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
|
||||
)
|
||||
|
||||
yield self._end_background_update("user_threepids_grandfather")
|
||||
|
||||
return 1
|
||||
|
||||
def validate_threepid_session(self, session_id, client_secret, token, current_ts):
|
||||
"""Attempt to validate a threepid session using a token
|
||||
|
||||
|
@ -1463,17 +1471,6 @@ class RegistrationStore(
|
|||
self.clock.time_msec(),
|
||||
)
|
||||
|
||||
def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
|
||||
self._simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"deactivated": 1 if deactivated else 0},
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_deactivated_status, (user_id,)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def set_user_deactivated_status(self, user_id, deactivated):
|
||||
"""Set the `deactivated` property for the provided user to the provided value.
|
||||
|
@ -1489,3 +1486,14 @@ class RegistrationStore(
|
|||
user_id,
|
||||
deactivated,
|
||||
)
|
||||
|
||||
def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
|
||||
self._simple_update_one_txn(
|
||||
txn=txn,
|
||||
table="users",
|
||||
keyvalues={"name": user_id},
|
||||
updatevalues={"deactivated": 1 if deactivated else 0},
|
||||
)
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_user_deactivated_status, (user_id,)
|
||||
)
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2014-2016 OpenMarket Ltd
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
|
@ -16,6 +17,7 @@
|
|||
import collections
|
||||
import logging
|
||||
import re
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
|
@ -24,6 +26,7 @@ from twisted.internet import defer
|
|||
from synapse.api.errors import StoreError
|
||||
from synapse.storage._base import SQLBaseStore
|
||||
from synapse.storage.search import SearchStore
|
||||
from synapse.types import ThirdPartyInstanceID
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -63,103 +66,196 @@ class RoomWorkerStore(SQLBaseStore):
|
|||
desc="get_public_room_ids",
|
||||
)
|
||||
|
||||
@cached(num_args=2, max_entries=100)
|
||||
def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
|
||||
"""Get pulbic rooms for a particular list, or across all lists.
|
||||
def count_public_rooms(self, network_tuple, ignore_non_federatable):
|
||||
"""Counts the number of public rooms as tracked in the room_stats_current
|
||||
and room_stats_state table.
|
||||
|
||||
Args:
|
||||
stream_id (int)
|
||||
network_tuple (ThirdPartyInstanceID): The list to use (None, None)
|
||||
means the main list, None means all lsits.
|
||||
network_tuple (ThirdPartyInstanceID|None)
|
||||
ignore_non_federatable (bool): If true filters out non-federatable rooms
|
||||
"""
|
||||
return self.runInteraction(
|
||||
"get_public_room_ids_at_stream_id",
|
||||
self.get_public_room_ids_at_stream_id_txn,
|
||||
stream_id,
|
||||
network_tuple=network_tuple,
|
||||
)
|
||||
|
||||
def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, network_tuple):
|
||||
return {
|
||||
rm
|
||||
for rm, vis in self.get_published_at_stream_id_txn(
|
||||
txn, stream_id, network_tuple=network_tuple
|
||||
).items()
|
||||
if vis
|
||||
}
|
||||
def _count_public_rooms_txn(txn):
|
||||
query_args = []
|
||||
|
||||
def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
|
||||
if network_tuple:
|
||||
# We want to get from a particular list. No aggregation required.
|
||||
|
||||
sql = """
|
||||
SELECT room_id, visibility FROM public_room_list_stream
|
||||
INNER JOIN (
|
||||
SELECT room_id, max(stream_id) AS stream_id
|
||||
FROM public_room_list_stream
|
||||
WHERE stream_id <= ? %s
|
||||
GROUP BY room_id
|
||||
) grouped USING (room_id, stream_id)
|
||||
if network_tuple:
|
||||
if network_tuple.appservice_id:
|
||||
published_sql = """
|
||||
SELECT room_id from appservice_room_list
|
||||
WHERE appservice_id = ? AND network_id = ?
|
||||
"""
|
||||
query_args.append(network_tuple.appservice_id)
|
||||
query_args.append(network_tuple.network_id)
|
||||
else:
|
||||
published_sql = """
|
||||
SELECT room_id FROM rooms WHERE is_public
|
||||
"""
|
||||
else:
|
||||
published_sql = """
|
||||
SELECT room_id FROM rooms WHERE is_public
|
||||
UNION SELECT room_id from appservice_room_list
|
||||
"""
|
||||
|
||||
if network_tuple.appservice_id is not None:
|
||||
txn.execute(
|
||||
sql % ("AND appservice_id = ? AND network_id = ?",),
|
||||
(stream_id, network_tuple.appservice_id, network_tuple.network_id),
|
||||
sql = """
|
||||
SELECT
|
||||
COALESCE(COUNT(*), 0)
|
||||
FROM (
|
||||
%(published_sql)s
|
||||
) published
|
||||
INNER JOIN room_stats_state USING (room_id)
|
||||
INNER JOIN room_stats_current USING (room_id)
|
||||
WHERE
|
||||
(
|
||||
join_rules = 'public' OR history_visibility = 'world_readable'
|
||||
)
|
||||
AND joined_members > 0
|
||||
""" % {
|
||||
"published_sql": published_sql
|
||||
}
|
||||
|
||||
txn.execute(sql, query_args)
|
||||
return txn.fetchone()[0]
|
||||
|
||||
return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_largest_public_rooms(
|
||||
self,
|
||||
network_tuple: Optional[ThirdPartyInstanceID],
|
||||
search_filter: Optional[dict],
|
||||
limit: Optional[int],
|
||||
bounds: Optional[Tuple[int, str]],
|
||||
forwards: bool,
|
||||
ignore_non_federatable: bool = False,
|
||||
):
|
||||
"""Gets the largest public rooms (where largest is in terms of joined
|
||||
members, as tracked in the statistics table).
|
||||
|
||||
Args:
|
||||
network_tuple
|
||||
search_filter
|
||||
limit: Maxmimum number of rows to return, unlimited otherwise.
|
||||
bounds: An uppoer or lower bound to apply to result set if given,
|
||||
consists of a joined member count and room_id (these are
|
||||
excluded from result set).
|
||||
forwards: true iff going forwards, going backwards otherwise
|
||||
ignore_non_federatable: If true filters out non-federatable rooms.
|
||||
|
||||
Returns:
|
||||
Rooms in order: biggest number of joined users first.
|
||||
We then arbitrarily use the room_id as a tie breaker.
|
||||
|
||||
"""
|
||||
|
||||
where_clauses = []
|
||||
query_args = []
|
||||
|
||||
if network_tuple:
|
||||
if network_tuple.appservice_id:
|
||||
published_sql = """
|
||||
SELECT room_id from appservice_room_list
|
||||
WHERE appservice_id = ? AND network_id = ?
|
||||
"""
|
||||
query_args.append(network_tuple.appservice_id)
|
||||
query_args.append(network_tuple.network_id)
|
||||
else:
|
||||
published_sql = """
|
||||
SELECT room_id FROM rooms WHERE is_public
|
||||
"""
|
||||
else:
|
||||
published_sql = """
|
||||
SELECT room_id FROM rooms WHERE is_public
|
||||
UNION SELECT room_id from appservice_room_list
|
||||
"""
|
||||
|
||||
# Work out the bounds if we're given them, these bounds look slightly
|
||||
# odd, but are designed to help query planner use indices by pulling
|
||||
# out a common bound.
|
||||
if bounds:
|
||||
last_joined_members, last_room_id = bounds
|
||||
if forwards:
|
||||
where_clauses.append(
|
||||
"""
|
||||
joined_members <= ? AND (
|
||||
joined_members < ? OR room_id < ?
|
||||
)
|
||||
"""
|
||||
)
|
||||
else:
|
||||
txn.execute(sql % ("AND appservice_id IS NULL",), (stream_id,))
|
||||
return dict(txn)
|
||||
else:
|
||||
# We want to get from all lists, so we need to aggregate the results
|
||||
where_clauses.append(
|
||||
"""
|
||||
joined_members >= ? AND (
|
||||
joined_members > ? OR room_id > ?
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
logger.info("Executing full list")
|
||||
query_args += [last_joined_members, last_joined_members, last_room_id]
|
||||
|
||||
sql = """
|
||||
SELECT room_id, visibility
|
||||
FROM public_room_list_stream
|
||||
INNER JOIN (
|
||||
SELECT
|
||||
room_id, max(stream_id) AS stream_id, appservice_id,
|
||||
network_id
|
||||
FROM public_room_list_stream
|
||||
WHERE stream_id <= ?
|
||||
GROUP BY room_id, appservice_id, network_id
|
||||
) grouped USING (room_id, stream_id)
|
||||
if ignore_non_federatable:
|
||||
where_clauses.append("is_federatable")
|
||||
|
||||
if search_filter and search_filter.get("generic_search_term", None):
|
||||
search_term = "%" + search_filter["generic_search_term"] + "%"
|
||||
|
||||
where_clauses.append(
|
||||
"""
|
||||
(
|
||||
name LIKE ?
|
||||
OR topic LIKE ?
|
||||
OR canonical_alias LIKE ?
|
||||
)
|
||||
"""
|
||||
)
|
||||
query_args += [search_term, search_term, search_term]
|
||||
|
||||
where_clause = ""
|
||||
if where_clauses:
|
||||
where_clause = " AND " + " AND ".join(where_clauses)
|
||||
|
||||
sql = """
|
||||
SELECT
|
||||
room_id, name, topic, canonical_alias, joined_members,
|
||||
avatar, history_visibility, joined_members, guest_access
|
||||
FROM (
|
||||
%(published_sql)s
|
||||
) published
|
||||
INNER JOIN room_stats_state USING (room_id)
|
||||
INNER JOIN room_stats_current USING (room_id)
|
||||
WHERE
|
||||
(
|
||||
join_rules = 'public' OR history_visibility = 'world_readable'
|
||||
)
|
||||
AND joined_members > 0
|
||||
%(where_clause)s
|
||||
ORDER BY joined_members %(dir)s, room_id %(dir)s
|
||||
""" % {
|
||||
"published_sql": published_sql,
|
||||
"where_clause": where_clause,
|
||||
"dir": "DESC" if forwards else "ASC",
|
||||
}
|
||||
|
||||
if limit is not None:
|
||||
query_args.append(limit)
|
||||
|
||||
sql += """
|
||||
LIMIT ?
|
||||
"""
|
||||
|
||||
txn.execute(sql, (stream_id,))
|
||||
def _get_largest_public_rooms_txn(txn):
|
||||
txn.execute(sql, query_args)
|
||||
|
||||
results = {}
|
||||
# A room is visible if its visible on any list.
|
||||
for room_id, visibility in txn:
|
||||
results[room_id] = bool(visibility) or results.get(room_id, False)
|
||||
results = self.cursor_to_dict(txn)
|
||||
|
||||
if not forwards:
|
||||
results.reverse()
|
||||
|
||||
return results
|
||||
|
||||
def get_public_room_changes(self, prev_stream_id, new_stream_id, network_tuple):
|
||||
def get_public_room_changes_txn(txn):
|
||||
then_rooms = self.get_public_room_ids_at_stream_id_txn(
|
||||
txn, prev_stream_id, network_tuple
|
||||
)
|
||||
|
||||
now_rooms_dict = self.get_published_at_stream_id_txn(
|
||||
txn, new_stream_id, network_tuple
|
||||
)
|
||||
|
||||
now_rooms_visible = set(rm for rm, vis in now_rooms_dict.items() if vis)
|
||||
now_rooms_not_visible = set(
|
||||
rm for rm, vis in now_rooms_dict.items() if not vis
|
||||
)
|
||||
|
||||
newly_visible = now_rooms_visible - then_rooms
|
||||
newly_unpublished = now_rooms_not_visible & then_rooms
|
||||
|
||||
return newly_visible, newly_unpublished
|
||||
|
||||
return self.runInteraction(
|
||||
"get_public_room_changes", get_public_room_changes_txn
|
||||
ret_val = yield self.runInteraction(
|
||||
"get_largest_public_rooms", _get_largest_public_rooms_txn
|
||||
)
|
||||
defer.returnValue(ret_val)
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def is_room_blocked(self, room_id):
|
||||
|
|
|
@ -27,12 +27,14 @@ from synapse.api.constants import EventTypes, Membership
|
|||
from synapse.metrics import LaterGauge
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage._base import LoggingTransaction
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
from synapse.storage.engines import Sqlite3Engine
|
||||
from synapse.storage.events_worker import EventsWorkerStore
|
||||
from synapse.types import get_domain_from_id
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
from synapse.util.caches import intern_string
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
||||
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
|
||||
from synapse.util.metrics import Measure
|
||||
from synapse.util.stringutils import to_ascii
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -483,6 +485,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_joined_users_from_state(self, room_id, state_entry):
|
||||
state_group = state_entry.state_group
|
||||
if not state_group:
|
||||
|
@ -492,9 +495,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
# To do this we set the state_group to a new object as object() != object()
|
||||
state_group = object()
|
||||
|
||||
return self._get_joined_users_from_context(
|
||||
room_id, state_group, state_entry.state, context=state_entry
|
||||
)
|
||||
with Measure(self._clock, "get_joined_users_from_state"):
|
||||
return (
|
||||
yield self._get_joined_users_from_context(
|
||||
room_id, state_group, state_entry.state, context=state_entry
|
||||
)
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(
|
||||
num_args=2, cache_context=True, iterable=True, max_entries=100000
|
||||
|
@ -567,25 +573,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
missing_member_event_ids.append(event_id)
|
||||
|
||||
if missing_member_event_ids:
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="room_memberships",
|
||||
column="event_id",
|
||||
iterable=missing_member_event_ids,
|
||||
retcols=("user_id", "display_name", "avatar_url"),
|
||||
keyvalues={"membership": Membership.JOIN},
|
||||
batch_size=500,
|
||||
desc="_get_joined_users_from_context",
|
||||
)
|
||||
|
||||
users_in_room.update(
|
||||
{
|
||||
to_ascii(row["user_id"]): ProfileInfo(
|
||||
avatar_url=to_ascii(row["avatar_url"]),
|
||||
display_name=to_ascii(row["display_name"]),
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
event_to_memberships = yield self._get_joined_profiles_from_event_ids(
|
||||
missing_member_event_ids
|
||||
)
|
||||
users_in_room.update((row for row in event_to_memberships.values() if row))
|
||||
|
||||
if event is not None and event.type == EventTypes.Member:
|
||||
if event.membership == Membership.JOIN:
|
||||
|
@ -597,6 +588,47 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
|
||||
return users_in_room
|
||||
|
||||
@cached(max_entries=10000)
|
||||
def _get_joined_profile_from_event_id(self, event_id):
|
||||
raise NotImplementedError()
|
||||
|
||||
@cachedList(
|
||||
cached_method_name="_get_joined_profile_from_event_id",
|
||||
list_name="event_ids",
|
||||
inlineCallbacks=True,
|
||||
)
|
||||
def _get_joined_profiles_from_event_ids(self, event_ids):
|
||||
"""For given set of member event_ids check if they point to a join
|
||||
event and if so return the associated user and profile info.
|
||||
|
||||
Args:
|
||||
event_ids (Iterable[str]): The member event IDs to lookup
|
||||
|
||||
Returns:
|
||||
Deferred[dict[str, Tuple[str, ProfileInfo]|None]]: Map from event ID
|
||||
to `user_id` and ProfileInfo (or None if not join event).
|
||||
"""
|
||||
|
||||
rows = yield self._simple_select_many_batch(
|
||||
table="room_memberships",
|
||||
column="event_id",
|
||||
iterable=event_ids,
|
||||
retcols=("user_id", "display_name", "avatar_url", "event_id"),
|
||||
keyvalues={"membership": Membership.JOIN},
|
||||
batch_size=500,
|
||||
desc="_get_membership_from_event_ids",
|
||||
)
|
||||
|
||||
return {
|
||||
row["event_id"]: (
|
||||
row["user_id"],
|
||||
ProfileInfo(
|
||||
avatar_url=row["avatar_url"], display_name=row["display_name"]
|
||||
),
|
||||
)
|
||||
for row in rows
|
||||
}
|
||||
|
||||
@cachedInlineCallbacks(max_entries=10000)
|
||||
def is_host_joined(self, room_id, host):
|
||||
if "%" in host or "_" in host:
|
||||
|
@ -669,6 +701,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
|
||||
return True
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_joined_hosts(self, room_id, state_entry):
|
||||
state_group = state_entry.state_group
|
||||
if not state_group:
|
||||
|
@ -678,9 +711,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
# To do this we set the state_group to a new object as object() != object()
|
||||
state_group = object()
|
||||
|
||||
return self._get_joined_hosts(
|
||||
room_id, state_group, state_entry.state, state_entry=state_entry
|
||||
)
|
||||
with Measure(self._clock, "get_joined_hosts"):
|
||||
return (
|
||||
yield self._get_joined_hosts(
|
||||
room_id, state_group, state_entry.state, state_entry=state_entry
|
||||
)
|
||||
)
|
||||
|
||||
@cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True)
|
||||
# @defer.inlineCallbacks
|
||||
|
@ -785,9 +821,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
|
|||
return set(room_ids)
|
||||
|
||||
|
||||
class RoomMemberStore(RoomMemberWorkerStore):
|
||||
class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(RoomMemberStore, self).__init__(db_conn, hs)
|
||||
super(RoomMemberBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
self.register_background_update_handler(
|
||||
_MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
|
||||
)
|
||||
|
@ -803,112 +839,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
|||
where_clause="forgotten = 1",
|
||||
)
|
||||
|
||||
def _store_room_members_txn(self, txn, events, backfilled):
|
||||
"""Store a room member in the database.
|
||||
"""
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="room_memberships",
|
||||
values=[
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"user_id": event.state_key,
|
||||
"sender": event.user_id,
|
||||
"room_id": event.room_id,
|
||||
"membership": event.membership,
|
||||
"display_name": event.content.get("displayname", None),
|
||||
"avatar_url": event.content.get("avatar_url", None),
|
||||
}
|
||||
for event in events
|
||||
],
|
||||
)
|
||||
|
||||
for event in events:
|
||||
txn.call_after(
|
||||
self._membership_stream_cache.entity_has_changed,
|
||||
event.state_key,
|
||||
event.internal_metadata.stream_ordering,
|
||||
)
|
||||
txn.call_after(
|
||||
self.get_invited_rooms_for_user.invalidate, (event.state_key,)
|
||||
)
|
||||
|
||||
# We update the local_invites table only if the event is "current",
|
||||
# i.e., its something that has just happened. If the event is an
|
||||
# outlier it is only current if its an "out of band membership",
|
||||
# like a remote invite or a rejection of a remote invite.
|
||||
is_new_state = not backfilled and (
|
||||
not event.internal_metadata.is_outlier()
|
||||
or event.internal_metadata.is_out_of_band_membership()
|
||||
)
|
||||
is_mine = self.hs.is_mine_id(event.state_key)
|
||||
if is_new_state and is_mine:
|
||||
if event.membership == Membership.INVITE:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="local_invites",
|
||||
values={
|
||||
"event_id": event.event_id,
|
||||
"invitee": event.state_key,
|
||||
"inviter": event.sender,
|
||||
"room_id": event.room_id,
|
||||
"stream_id": event.internal_metadata.stream_ordering,
|
||||
},
|
||||
)
|
||||
else:
|
||||
sql = (
|
||||
"UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
|
||||
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
||||
" AND replaced_by is NULL"
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
event.internal_metadata.stream_ordering,
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
event.state_key,
|
||||
),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def locally_reject_invite(self, user_id, room_id):
|
||||
sql = (
|
||||
"UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
|
||||
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
||||
" AND replaced_by is NULL"
|
||||
)
|
||||
|
||||
def f(txn, stream_ordering):
|
||||
txn.execute(sql, (stream_ordering, True, room_id, user_id))
|
||||
|
||||
with self._stream_id_gen.get_next() as stream_ordering:
|
||||
yield self.runInteraction("locally_reject_invite", f, stream_ordering)
|
||||
|
||||
def forget(self, user_id, room_id):
|
||||
"""Indicate that user_id wishes to discard history for room_id."""
|
||||
|
||||
def f(txn):
|
||||
sql = (
|
||||
"UPDATE"
|
||||
" room_memberships"
|
||||
" SET"
|
||||
" forgotten = 1"
|
||||
" WHERE"
|
||||
" user_id = ?"
|
||||
" AND"
|
||||
" room_id = ?"
|
||||
)
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_forgotten_rooms_for_user, (user_id,)
|
||||
)
|
||||
|
||||
return self.runInteraction("forget_membership", f)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_add_membership_profile(self, progress, batch_size):
|
||||
target_min_stream_id = progress.get(
|
||||
|
@ -1043,6 +973,117 @@ class RoomMemberStore(RoomMemberWorkerStore):
|
|||
return row_count
|
||||
|
||||
|
||||
class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(RoomMemberStore, self).__init__(db_conn, hs)
|
||||
|
||||
def _store_room_members_txn(self, txn, events, backfilled):
|
||||
"""Store a room member in the database.
|
||||
"""
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="room_memberships",
|
||||
values=[
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"user_id": event.state_key,
|
||||
"sender": event.user_id,
|
||||
"room_id": event.room_id,
|
||||
"membership": event.membership,
|
||||
"display_name": event.content.get("displayname", None),
|
||||
"avatar_url": event.content.get("avatar_url", None),
|
||||
}
|
||||
for event in events
|
||||
],
|
||||
)
|
||||
|
||||
for event in events:
|
||||
txn.call_after(
|
||||
self._membership_stream_cache.entity_has_changed,
|
||||
event.state_key,
|
||||
event.internal_metadata.stream_ordering,
|
||||
)
|
||||
txn.call_after(
|
||||
self.get_invited_rooms_for_user.invalidate, (event.state_key,)
|
||||
)
|
||||
|
||||
# We update the local_invites table only if the event is "current",
|
||||
# i.e., its something that has just happened. If the event is an
|
||||
# outlier it is only current if its an "out of band membership",
|
||||
# like a remote invite or a rejection of a remote invite.
|
||||
is_new_state = not backfilled and (
|
||||
not event.internal_metadata.is_outlier()
|
||||
or event.internal_metadata.is_out_of_band_membership()
|
||||
)
|
||||
is_mine = self.hs.is_mine_id(event.state_key)
|
||||
if is_new_state and is_mine:
|
||||
if event.membership == Membership.INVITE:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
table="local_invites",
|
||||
values={
|
||||
"event_id": event.event_id,
|
||||
"invitee": event.state_key,
|
||||
"inviter": event.sender,
|
||||
"room_id": event.room_id,
|
||||
"stream_id": event.internal_metadata.stream_ordering,
|
||||
},
|
||||
)
|
||||
else:
|
||||
sql = (
|
||||
"UPDATE local_invites SET stream_id = ?, replaced_by = ? WHERE"
|
||||
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
||||
" AND replaced_by is NULL"
|
||||
)
|
||||
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
event.internal_metadata.stream_ordering,
|
||||
event.event_id,
|
||||
event.room_id,
|
||||
event.state_key,
|
||||
),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def locally_reject_invite(self, user_id, room_id):
|
||||
sql = (
|
||||
"UPDATE local_invites SET stream_id = ?, locally_rejected = ? WHERE"
|
||||
" room_id = ? AND invitee = ? AND locally_rejected is NULL"
|
||||
" AND replaced_by is NULL"
|
||||
)
|
||||
|
||||
def f(txn, stream_ordering):
|
||||
txn.execute(sql, (stream_ordering, True, room_id, user_id))
|
||||
|
||||
with self._stream_id_gen.get_next() as stream_ordering:
|
||||
yield self.runInteraction("locally_reject_invite", f, stream_ordering)
|
||||
|
||||
def forget(self, user_id, room_id):
|
||||
"""Indicate that user_id wishes to discard history for room_id."""
|
||||
|
||||
def f(txn):
|
||||
sql = (
|
||||
"UPDATE"
|
||||
" room_memberships"
|
||||
" SET"
|
||||
" forgotten = 1"
|
||||
" WHERE"
|
||||
" user_id = ?"
|
||||
" AND"
|
||||
" room_id = ?"
|
||||
)
|
||||
txn.execute(sql, (user_id, room_id))
|
||||
|
||||
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.get_forgotten_rooms_for_user, (user_id,)
|
||||
)
|
||||
|
||||
return self.runInteraction("forget_membership", f)
|
||||
|
||||
|
||||
class _JoinedHostsCache(object):
|
||||
"""Cache for joined hosts in a room that is optimised to handle updates
|
||||
via state deltas.
|
||||
|
|
20
synapse/storage/schema/delta/56/drop_unused_event_tables.sql
Normal file
20
synapse/storage/schema/delta/56/drop_unused_event_tables.sql
Normal file
|
@ -0,0 +1,20 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- these tables are never used.
|
||||
DROP TABLE IF EXISTS room_names;
|
||||
DROP TABLE IF EXISTS topics;
|
||||
DROP TABLE IF EXISTS history_visibility;
|
||||
DROP TABLE IF EXISTS guest_access;
|
16
synapse/storage/schema/delta/56/public_room_list_idx.sql
Normal file
16
synapse/storage/schema/delta/56/public_room_list_idx.sql
Normal file
|
@ -0,0 +1,16 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
CREATE INDEX public_room_list_stream_network ON public_room_list_stream (appservice_id, network_id, room_id);
|
52
synapse/storage/schema/delta/56/unique_user_filter_index.py
Normal file
52
synapse/storage/schema/delta/56/unique_user_filter_index.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
import logging
|
||||
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
"""
|
||||
This migration updates the user_filters table as follows:
|
||||
|
||||
- drops any (user_id, filter_id) duplicates
|
||||
- makes the columns NON-NULLable
|
||||
- turns the index into a UNIQUE index
|
||||
"""
|
||||
|
||||
|
||||
def run_upgrade(cur, database_engine, *args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
def run_create(cur, database_engine, *args, **kwargs):
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
select_clause = """
|
||||
SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
|
||||
FROM user_filters
|
||||
"""
|
||||
else:
|
||||
select_clause = """
|
||||
SELECT * FROM user_filters GROUP BY user_id, filter_id
|
||||
"""
|
||||
sql = """
|
||||
DROP TABLE IF EXISTS user_filters_migration;
|
||||
DROP INDEX IF EXISTS user_filters_unique;
|
||||
CREATE TABLE user_filters_migration (
|
||||
user_id TEXT NOT NULL,
|
||||
filter_id BIGINT NOT NULL,
|
||||
filter_json BYTEA NOT NULL
|
||||
);
|
||||
INSERT INTO user_filters_migration (user_id, filter_id, filter_json)
|
||||
%s;
|
||||
CREATE UNIQUE INDEX user_filters_unique ON user_filters_migration
|
||||
(user_id, filter_id);
|
||||
DROP TABLE user_filters;
|
||||
ALTER TABLE user_filters_migration RENAME TO user_filters;
|
||||
""" % (
|
||||
select_clause,
|
||||
)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
cur.execute(sql)
|
||||
else:
|
||||
cur.executescript(sql)
|
|
@ -36,7 +36,7 @@ SearchEntry = namedtuple(
|
|||
)
|
||||
|
||||
|
||||
class SearchStore(BackgroundUpdateStore):
|
||||
class SearchBackgroundUpdateStore(BackgroundUpdateStore):
|
||||
|
||||
EVENT_SEARCH_UPDATE_NAME = "event_search"
|
||||
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
|
||||
|
@ -44,7 +44,7 @@ class SearchStore(BackgroundUpdateStore):
|
|||
EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(SearchStore, self).__init__(db_conn, hs)
|
||||
super(SearchBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
|
||||
if not hs.config.enable_search:
|
||||
return
|
||||
|
@ -289,29 +289,6 @@ class SearchStore(BackgroundUpdateStore):
|
|||
|
||||
return num_rows
|
||||
|
||||
def store_event_search_txn(self, txn, event, key, value):
|
||||
"""Add event to the search table
|
||||
|
||||
Args:
|
||||
txn (cursor):
|
||||
event (EventBase):
|
||||
key (str):
|
||||
value (str):
|
||||
"""
|
||||
self.store_search_entries_txn(
|
||||
txn,
|
||||
(
|
||||
SearchEntry(
|
||||
key=key,
|
||||
value=value,
|
||||
event_id=event.event_id,
|
||||
room_id=event.room_id,
|
||||
stream_ordering=event.internal_metadata.stream_ordering,
|
||||
origin_server_ts=event.origin_server_ts,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
def store_search_entries_txn(self, txn, entries):
|
||||
"""Add entries to the search table
|
||||
|
||||
|
@ -358,6 +335,34 @@ class SearchStore(BackgroundUpdateStore):
|
|||
# This should be unreachable.
|
||||
raise Exception("Unrecognized database engine")
|
||||
|
||||
|
||||
class SearchStore(SearchBackgroundUpdateStore):
|
||||
def __init__(self, db_conn, hs):
|
||||
super(SearchStore, self).__init__(db_conn, hs)
|
||||
|
||||
def store_event_search_txn(self, txn, event, key, value):
|
||||
"""Add event to the search table
|
||||
|
||||
Args:
|
||||
txn (cursor):
|
||||
event (EventBase):
|
||||
key (str):
|
||||
value (str):
|
||||
"""
|
||||
self.store_search_entries_txn(
|
||||
txn,
|
||||
(
|
||||
SearchEntry(
|
||||
key=key,
|
||||
value=value,
|
||||
event_id=event.event_id,
|
||||
room_id=event.room_id,
|
||||
stream_ordering=event.internal_metadata.stream_ordering,
|
||||
origin_server_ts=event.origin_server_ts,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def search_msgs(self, room_ids, search_term, keys):
|
||||
"""Performs a full text search over events with given keys.
|
||||
|
|
|
@ -353,8 +353,158 @@ class StateFilter(object):
|
|||
return member_filter, non_member_filter
|
||||
|
||||
|
||||
class StateGroupBackgroundUpdateStore(SQLBaseStore):
|
||||
"""Defines functions related to state groups needed to run the state backgroud
|
||||
updates.
|
||||
"""
|
||||
|
||||
def _count_state_group_hops_txn(self, txn, state_group):
|
||||
"""Given a state group, count how many hops there are in the tree.
|
||||
|
||||
This is used to ensure the delta chains don't get too long.
|
||||
"""
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
sql = """
|
||||
WITH RECURSIVE state(state_group) AS (
|
||||
VALUES(?::bigint)
|
||||
UNION ALL
|
||||
SELECT prev_state_group FROM state_group_edges e, state s
|
||||
WHERE s.state_group = e.state_group
|
||||
)
|
||||
SELECT count(*) FROM state;
|
||||
"""
|
||||
|
||||
txn.execute(sql, (state_group,))
|
||||
row = txn.fetchone()
|
||||
if row and row[0]:
|
||||
return row[0]
|
||||
else:
|
||||
return 0
|
||||
else:
|
||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||
next_group = state_group
|
||||
count = 0
|
||||
|
||||
while next_group:
|
||||
next_group = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
keyvalues={"state_group": next_group},
|
||||
retcol="prev_state_group",
|
||||
allow_none=True,
|
||||
)
|
||||
if next_group:
|
||||
count += 1
|
||||
|
||||
return count
|
||||
|
||||
def _get_state_groups_from_groups_txn(
|
||||
self, txn, groups, state_filter=StateFilter.all()
|
||||
):
|
||||
results = {group: {} for group in groups}
|
||||
|
||||
where_clause, where_args = state_filter.make_sql_filter_clause()
|
||||
|
||||
# Unless the filter clause is empty, we're going to append it after an
|
||||
# existing where clause
|
||||
if where_clause:
|
||||
where_clause = " AND (%s)" % (where_clause,)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
# Temporarily disable sequential scans in this transaction. This is
|
||||
# a temporary hack until we can add the right indices in
|
||||
txn.execute("SET LOCAL enable_seqscan=off")
|
||||
|
||||
# The below query walks the state_group tree so that the "state"
|
||||
# table includes all state_groups in the tree. It then joins
|
||||
# against `state_groups_state` to fetch the latest state.
|
||||
# It assumes that previous state groups are always numerically
|
||||
# lesser.
|
||||
# The PARTITION is used to get the event_id in the greatest state
|
||||
# group for the given type, state_key.
|
||||
# This may return multiple rows per (type, state_key), but last_value
|
||||
# should be the same.
|
||||
sql = """
|
||||
WITH RECURSIVE state(state_group) AS (
|
||||
VALUES(?::bigint)
|
||||
UNION ALL
|
||||
SELECT prev_state_group FROM state_group_edges e, state s
|
||||
WHERE s.state_group = e.state_group
|
||||
)
|
||||
SELECT DISTINCT type, state_key, last_value(event_id) OVER (
|
||||
PARTITION BY type, state_key ORDER BY state_group ASC
|
||||
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
|
||||
) AS event_id FROM state_groups_state
|
||||
WHERE state_group IN (
|
||||
SELECT state_group FROM state
|
||||
)
|
||||
"""
|
||||
|
||||
for group in groups:
|
||||
args = [group]
|
||||
args.extend(where_args)
|
||||
|
||||
txn.execute(sql + where_clause, args)
|
||||
for row in txn:
|
||||
typ, state_key, event_id = row
|
||||
key = (typ, state_key)
|
||||
results[group][key] = event_id
|
||||
else:
|
||||
max_entries_returned = state_filter.max_entries_returned()
|
||||
|
||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||
for group in groups:
|
||||
next_group = group
|
||||
|
||||
while next_group:
|
||||
# We did this before by getting the list of group ids, and
|
||||
# then passing that list to sqlite to get latest event for
|
||||
# each (type, state_key). However, that was terribly slow
|
||||
# without the right indices (which we can't add until
|
||||
# after we finish deduping state, which requires this func)
|
||||
args = [next_group]
|
||||
args.extend(where_args)
|
||||
|
||||
txn.execute(
|
||||
"SELECT type, state_key, event_id FROM state_groups_state"
|
||||
" WHERE state_group = ? " + where_clause,
|
||||
args,
|
||||
)
|
||||
results[group].update(
|
||||
((typ, state_key), event_id)
|
||||
for typ, state_key, event_id in txn
|
||||
if (typ, state_key) not in results[group]
|
||||
)
|
||||
|
||||
# If the number of entries in the (type,state_key)->event_id dict
|
||||
# matches the number of (type,state_keys) types we were searching
|
||||
# for, then we must have found them all, so no need to go walk
|
||||
# further down the tree... UNLESS our types filter contained
|
||||
# wildcards (i.e. Nones) in which case we have to do an exhaustive
|
||||
# search
|
||||
if (
|
||||
max_entries_returned is not None
|
||||
and len(results[group]) == max_entries_returned
|
||||
):
|
||||
break
|
||||
|
||||
next_group = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
keyvalues={"state_group": next_group},
|
||||
retcol="prev_state_group",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
# this inherits from EventsWorkerStore because it calls self.get_events
|
||||
class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
class StateGroupWorkerStore(
|
||||
EventsWorkerStore, StateGroupBackgroundUpdateStore, SQLBaseStore
|
||||
):
|
||||
"""The parts of StateGroupStore that can be called from workers.
|
||||
"""
|
||||
|
||||
|
@ -694,107 +844,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
return results
|
||||
|
||||
def _get_state_groups_from_groups_txn(
|
||||
self, txn, groups, state_filter=StateFilter.all()
|
||||
):
|
||||
results = {group: {} for group in groups}
|
||||
|
||||
where_clause, where_args = state_filter.make_sql_filter_clause()
|
||||
|
||||
# Unless the filter clause is empty, we're going to append it after an
|
||||
# existing where clause
|
||||
if where_clause:
|
||||
where_clause = " AND (%s)" % (where_clause,)
|
||||
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
# Temporarily disable sequential scans in this transaction. This is
|
||||
# a temporary hack until we can add the right indices in
|
||||
txn.execute("SET LOCAL enable_seqscan=off")
|
||||
|
||||
# The below query walks the state_group tree so that the "state"
|
||||
# table includes all state_groups in the tree. It then joins
|
||||
# against `state_groups_state` to fetch the latest state.
|
||||
# It assumes that previous state groups are always numerically
|
||||
# lesser.
|
||||
# The PARTITION is used to get the event_id in the greatest state
|
||||
# group for the given type, state_key.
|
||||
# This may return multiple rows per (type, state_key), but last_value
|
||||
# should be the same.
|
||||
sql = """
|
||||
WITH RECURSIVE state(state_group) AS (
|
||||
VALUES(?::bigint)
|
||||
UNION ALL
|
||||
SELECT prev_state_group FROM state_group_edges e, state s
|
||||
WHERE s.state_group = e.state_group
|
||||
)
|
||||
SELECT DISTINCT type, state_key, last_value(event_id) OVER (
|
||||
PARTITION BY type, state_key ORDER BY state_group ASC
|
||||
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
|
||||
) AS event_id FROM state_groups_state
|
||||
WHERE state_group IN (
|
||||
SELECT state_group FROM state
|
||||
)
|
||||
"""
|
||||
|
||||
for group in groups:
|
||||
args = [group]
|
||||
args.extend(where_args)
|
||||
|
||||
txn.execute(sql + where_clause, args)
|
||||
for row in txn:
|
||||
typ, state_key, event_id = row
|
||||
key = (typ, state_key)
|
||||
results[group][key] = event_id
|
||||
else:
|
||||
max_entries_returned = state_filter.max_entries_returned()
|
||||
|
||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||
for group in groups:
|
||||
next_group = group
|
||||
|
||||
while next_group:
|
||||
# We did this before by getting the list of group ids, and
|
||||
# then passing that list to sqlite to get latest event for
|
||||
# each (type, state_key). However, that was terribly slow
|
||||
# without the right indices (which we can't add until
|
||||
# after we finish deduping state, which requires this func)
|
||||
args = [next_group]
|
||||
args.extend(where_args)
|
||||
|
||||
txn.execute(
|
||||
"SELECT type, state_key, event_id FROM state_groups_state"
|
||||
" WHERE state_group = ? " + where_clause,
|
||||
args,
|
||||
)
|
||||
results[group].update(
|
||||
((typ, state_key), event_id)
|
||||
for typ, state_key, event_id in txn
|
||||
if (typ, state_key) not in results[group]
|
||||
)
|
||||
|
||||
# If the number of entries in the (type,state_key)->event_id dict
|
||||
# matches the number of (type,state_keys) types we were searching
|
||||
# for, then we must have found them all, so no need to go walk
|
||||
# further down the tree... UNLESS our types filter contained
|
||||
# wildcards (i.e. Nones) in which case we have to do an exhaustive
|
||||
# search
|
||||
if (
|
||||
max_entries_returned is not None
|
||||
and len(results[group]) == max_entries_returned
|
||||
):
|
||||
break
|
||||
|
||||
next_group = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
keyvalues={"state_group": next_group},
|
||||
retcol="prev_state_group",
|
||||
allow_none=True,
|
||||
)
|
||||
|
||||
return results
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_state_for_events(self, event_ids, state_filter=StateFilter.all()):
|
||||
"""Given a list of event_ids and type tuples, return a list of state
|
||||
|
@ -1238,66 +1287,10 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||
|
||||
return self.runInteraction("store_state_group", _store_state_group_txn)
|
||||
|
||||
def _count_state_group_hops_txn(self, txn, state_group):
|
||||
"""Given a state group, count how many hops there are in the tree.
|
||||
|
||||
This is used to ensure the delta chains don't get too long.
|
||||
"""
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
sql = """
|
||||
WITH RECURSIVE state(state_group) AS (
|
||||
VALUES(?::bigint)
|
||||
UNION ALL
|
||||
SELECT prev_state_group FROM state_group_edges e, state s
|
||||
WHERE s.state_group = e.state_group
|
||||
)
|
||||
SELECT count(*) FROM state;
|
||||
"""
|
||||
|
||||
txn.execute(sql, (state_group,))
|
||||
row = txn.fetchone()
|
||||
if row and row[0]:
|
||||
return row[0]
|
||||
else:
|
||||
return 0
|
||||
else:
|
||||
# We don't use WITH RECURSIVE on sqlite3 as there are distributions
|
||||
# that ship with an sqlite3 version that doesn't support it (e.g. wheezy)
|
||||
next_group = state_group
|
||||
count = 0
|
||||
|
||||
while next_group:
|
||||
next_group = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
keyvalues={"state_group": next_group},
|
||||
retcol="prev_state_group",
|
||||
allow_none=True,
|
||||
)
|
||||
if next_group:
|
||||
count += 1
|
||||
|
||||
return count
|
||||
|
||||
|
||||
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||
""" Keeps track of the state at a given event.
|
||||
|
||||
This is done by the concept of `state groups`. Every event is a assigned
|
||||
a state group (identified by an arbitrary string), which references a
|
||||
collection of state events. The current state of an event is then the
|
||||
collection of state events referenced by the event's state group.
|
||||
|
||||
Hence, every change in the current state causes a new state group to be
|
||||
generated. However, if no change happens (e.g., if we get a message event
|
||||
with only one parent it inherits the state group from its parent.)
|
||||
|
||||
There are three tables:
|
||||
* `state_groups`: Stores group name, first event with in the group and
|
||||
room id.
|
||||
* `event_to_state_groups`: Maps events to state groups.
|
||||
* `state_groups_state`: Maps state group to state events.
|
||||
"""
|
||||
class StateBackgroundUpdateStore(
|
||||
StateGroupBackgroundUpdateStore, BackgroundUpdateStore
|
||||
):
|
||||
|
||||
STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
|
||||
STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
|
||||
|
@ -1305,7 +1298,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
|||
EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(StateStore, self).__init__(db_conn, hs)
|
||||
super(StateBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
self.register_background_update_handler(
|
||||
self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
|
||||
self._background_deduplicate_state,
|
||||
|
@ -1327,34 +1320,6 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
|||
columns=["state_group"],
|
||||
)
|
||||
|
||||
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
|
||||
state_groups = {}
|
||||
for event, context in events_and_contexts:
|
||||
if event.internal_metadata.is_outlier():
|
||||
continue
|
||||
|
||||
# if the event was rejected, just give it the same state as its
|
||||
# predecessor.
|
||||
if context.rejected:
|
||||
state_groups[event.event_id] = context.prev_group
|
||||
continue
|
||||
|
||||
state_groups[event.event_id] = context.state_group
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="event_to_state_groups",
|
||||
values=[
|
||||
{"state_group": state_group_id, "event_id": event_id}
|
||||
for event_id, state_group_id in iteritems(state_groups)
|
||||
],
|
||||
)
|
||||
|
||||
for event_id, state_group_id in iteritems(state_groups):
|
||||
txn.call_after(
|
||||
self._get_state_group_for_event.prefill, (event_id,), state_group_id
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _background_deduplicate_state(self, progress, batch_size):
|
||||
"""This background update will slowly deduplicate state by reencoding
|
||||
|
@ -1527,3 +1492,54 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
|||
yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
|
||||
|
||||
return 1
|
||||
|
||||
|
||||
class StateStore(StateGroupWorkerStore, StateBackgroundUpdateStore):
|
||||
""" Keeps track of the state at a given event.
|
||||
|
||||
This is done by the concept of `state groups`. Every event is a assigned
|
||||
a state group (identified by an arbitrary string), which references a
|
||||
collection of state events. The current state of an event is then the
|
||||
collection of state events referenced by the event's state group.
|
||||
|
||||
Hence, every change in the current state causes a new state group to be
|
||||
generated. However, if no change happens (e.g., if we get a message event
|
||||
with only one parent it inherits the state group from its parent.)
|
||||
|
||||
There are three tables:
|
||||
* `state_groups`: Stores group name, first event with in the group and
|
||||
room id.
|
||||
* `event_to_state_groups`: Maps events to state groups.
|
||||
* `state_groups_state`: Maps state group to state events.
|
||||
"""
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(StateStore, self).__init__(db_conn, hs)
|
||||
|
||||
def _store_event_state_mappings_txn(self, txn, events_and_contexts):
|
||||
state_groups = {}
|
||||
for event, context in events_and_contexts:
|
||||
if event.internal_metadata.is_outlier():
|
||||
continue
|
||||
|
||||
# if the event was rejected, just give it the same state as its
|
||||
# predecessor.
|
||||
if context.rejected:
|
||||
state_groups[event.event_id] = context.prev_group
|
||||
continue
|
||||
|
||||
state_groups[event.event_id] = context.state_group
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="event_to_state_groups",
|
||||
values=[
|
||||
{"state_group": state_group_id, "event_id": event_id}
|
||||
for event_id, state_group_id in iteritems(state_groups)
|
||||
],
|
||||
)
|
||||
|
||||
for event_id, state_group_id in iteritems(state_groups):
|
||||
txn.call_after(
|
||||
self._get_state_group_for_event.prefill, (event_id,), state_group_id
|
||||
)
|
||||
|
|
|
@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class StateDeltasStore(SQLBaseStore):
|
||||
def get_current_state_deltas(self, prev_stream_id):
|
||||
def get_current_state_deltas(self, prev_stream_id: int, max_stream_id: int):
|
||||
"""Fetch a list of room state changes since the given stream id
|
||||
|
||||
Each entry in the result contains the following fields:
|
||||
|
@ -36,15 +36,27 @@ class StateDeltasStore(SQLBaseStore):
|
|||
|
||||
Args:
|
||||
prev_stream_id (int): point to get changes since (exclusive)
|
||||
max_stream_id (int): the point that we know has been correctly persisted
|
||||
- ie, an upper limit to return changes from.
|
||||
|
||||
Returns:
|
||||
Deferred[list[dict]]: results
|
||||
Deferred[tuple[int, list[dict]]: A tuple consisting of:
|
||||
- the stream id which these results go up to
|
||||
- list of current_state_delta_stream rows. If it is empty, we are
|
||||
up to date.
|
||||
"""
|
||||
prev_stream_id = int(prev_stream_id)
|
||||
|
||||
# check we're not going backwards
|
||||
assert prev_stream_id <= max_stream_id
|
||||
|
||||
if not self._curr_state_delta_stream_cache.has_any_entity_changed(
|
||||
prev_stream_id
|
||||
):
|
||||
return []
|
||||
# if the CSDs haven't changed between prev_stream_id and now, we
|
||||
# know for certain that they haven't changed between prev_stream_id and
|
||||
# max_stream_id.
|
||||
return max_stream_id, []
|
||||
|
||||
def get_current_state_deltas_txn(txn):
|
||||
# First we calculate the max stream id that will give us less than
|
||||
|
@ -54,21 +66,29 @@ class StateDeltasStore(SQLBaseStore):
|
|||
sql = """
|
||||
SELECT stream_id, count(*)
|
||||
FROM current_state_delta_stream
|
||||
WHERE stream_id > ?
|
||||
WHERE stream_id > ? AND stream_id <= ?
|
||||
GROUP BY stream_id
|
||||
ORDER BY stream_id ASC
|
||||
LIMIT 100
|
||||
"""
|
||||
txn.execute(sql, (prev_stream_id,))
|
||||
txn.execute(sql, (prev_stream_id, max_stream_id))
|
||||
|
||||
total = 0
|
||||
max_stream_id = prev_stream_id
|
||||
for max_stream_id, count in txn:
|
||||
|
||||
for stream_id, count in txn:
|
||||
total += count
|
||||
if total > 100:
|
||||
# We arbitarily limit to 100 entries to ensure we don't
|
||||
# select toooo many.
|
||||
logger.debug(
|
||||
"Clipping current_state_delta_stream rows to stream_id %i",
|
||||
stream_id,
|
||||
)
|
||||
clipped_stream_id = stream_id
|
||||
break
|
||||
else:
|
||||
# if there's no problem, we may as well go right up to the max_stream_id
|
||||
clipped_stream_id = max_stream_id
|
||||
|
||||
# Now actually get the deltas
|
||||
sql = """
|
||||
|
@ -77,8 +97,8 @@ class StateDeltasStore(SQLBaseStore):
|
|||
WHERE ? < stream_id AND stream_id <= ?
|
||||
ORDER BY stream_id ASC
|
||||
"""
|
||||
txn.execute(sql, (prev_stream_id, max_stream_id))
|
||||
return self.cursor_to_dict(txn)
|
||||
txn.execute(sql, (prev_stream_id, clipped_stream_id))
|
||||
return clipped_stream_id, self.cursor_to_dict(txn)
|
||||
|
||||
return self.runInteraction(
|
||||
"get_current_state_deltas", get_current_state_deltas_txn
|
||||
|
|
|
@ -332,6 +332,9 @@ class StatsStore(StateDeltasStore):
|
|||
def _bulk_update_stats_delta_txn(txn):
|
||||
for stats_type, stats_updates in updates.items():
|
||||
for stats_id, fields in stats_updates.items():
|
||||
logger.info(
|
||||
"Updating %s stats for %s: %s", stats_type, stats_id, fields
|
||||
)
|
||||
self._update_stats_delta_txn(
|
||||
txn,
|
||||
ts=ts,
|
||||
|
|
|
@ -32,14 +32,14 @@ logger = logging.getLogger(__name__)
|
|||
TEMP_TABLE = "_temp_populate_user_directory"
|
||||
|
||||
|
||||
class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
||||
class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore):
|
||||
|
||||
# How many records do we calculate before sending it to
|
||||
# add_users_who_share_private_rooms?
|
||||
SHARE_PRIVATE_WORKING_SET = 500
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(UserDirectoryStore, self).__init__(db_conn, hs)
|
||||
super(UserDirectoryBackgroundUpdateStore, self).__init__(db_conn, hs)
|
||||
|
||||
self.server_name = hs.hostname
|
||||
|
||||
|
@ -452,55 +452,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
"update_profile_in_user_dir", _update_profile_in_user_dir_txn
|
||||
)
|
||||
|
||||
def remove_from_user_dir(self, user_id):
|
||||
def _remove_from_user_dir_txn(txn):
|
||||
self._simple_delete_txn(
|
||||
txn, table="user_directory", keyvalues={"user_id": user_id}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn, table="user_directory_search", keyvalues={"user_id": user_id}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="users_who_share_private_rooms",
|
||||
keyvalues={"user_id": user_id},
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="users_who_share_private_rooms",
|
||||
keyvalues={"other_user_id": user_id},
|
||||
)
|
||||
txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
|
||||
|
||||
return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_in_dir_due_to_room(self, room_id):
|
||||
"""Get all user_ids that are in the room directory because they're
|
||||
in the given room_id
|
||||
"""
|
||||
user_ids_share_pub = yield self._simple_select_onecol(
|
||||
table="users_in_public_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="user_id",
|
||||
desc="get_users_in_dir_due_to_room",
|
||||
)
|
||||
|
||||
user_ids_share_priv = yield self._simple_select_onecol(
|
||||
table="users_who_share_private_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="other_user_id",
|
||||
desc="get_users_in_dir_due_to_room",
|
||||
)
|
||||
|
||||
user_ids = set(user_ids_share_pub)
|
||||
user_ids.update(user_ids_share_priv)
|
||||
|
||||
return user_ids
|
||||
|
||||
def add_users_who_share_private_room(self, room_id, user_id_tuples):
|
||||
"""Insert entries into the users_who_share_private_rooms table. The first
|
||||
user should be a local user.
|
||||
|
@ -551,6 +502,98 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
"add_users_in_public_rooms", _add_users_in_public_rooms_txn
|
||||
)
|
||||
|
||||
def delete_all_from_user_dir(self):
|
||||
"""Delete the entire user directory
|
||||
"""
|
||||
|
||||
def _delete_all_from_user_dir_txn(txn):
|
||||
txn.execute("DELETE FROM user_directory")
|
||||
txn.execute("DELETE FROM user_directory_search")
|
||||
txn.execute("DELETE FROM users_in_public_rooms")
|
||||
txn.execute("DELETE FROM users_who_share_private_rooms")
|
||||
txn.call_after(self.get_user_in_directory.invalidate_all)
|
||||
|
||||
return self.runInteraction(
|
||||
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
|
||||
)
|
||||
|
||||
@cached()
|
||||
def get_user_in_directory(self, user_id):
|
||||
return self._simple_select_one(
|
||||
table="user_directory",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcols=("display_name", "avatar_url"),
|
||||
allow_none=True,
|
||||
desc="get_user_in_directory",
|
||||
)
|
||||
|
||||
def update_user_directory_stream_pos(self, stream_id):
|
||||
return self._simple_update_one(
|
||||
table="user_directory_stream_pos",
|
||||
keyvalues={},
|
||||
updatevalues={"stream_id": stream_id},
|
||||
desc="update_user_directory_stream_pos",
|
||||
)
|
||||
|
||||
|
||||
class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
|
||||
|
||||
# How many records do we calculate before sending it to
|
||||
# add_users_who_share_private_rooms?
|
||||
SHARE_PRIVATE_WORKING_SET = 500
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
super(UserDirectoryStore, self).__init__(db_conn, hs)
|
||||
|
||||
def remove_from_user_dir(self, user_id):
|
||||
def _remove_from_user_dir_txn(txn):
|
||||
self._simple_delete_txn(
|
||||
txn, table="user_directory", keyvalues={"user_id": user_id}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn, table="user_directory_search", keyvalues={"user_id": user_id}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn, table="users_in_public_rooms", keyvalues={"user_id": user_id}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="users_who_share_private_rooms",
|
||||
keyvalues={"user_id": user_id},
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="users_who_share_private_rooms",
|
||||
keyvalues={"other_user_id": user_id},
|
||||
)
|
||||
txn.call_after(self.get_user_in_directory.invalidate, (user_id,))
|
||||
|
||||
return self.runInteraction("remove_from_user_dir", _remove_from_user_dir_txn)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_users_in_dir_due_to_room(self, room_id):
|
||||
"""Get all user_ids that are in the room directory because they're
|
||||
in the given room_id
|
||||
"""
|
||||
user_ids_share_pub = yield self._simple_select_onecol(
|
||||
table="users_in_public_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="user_id",
|
||||
desc="get_users_in_dir_due_to_room",
|
||||
)
|
||||
|
||||
user_ids_share_priv = yield self._simple_select_onecol(
|
||||
table="users_who_share_private_rooms",
|
||||
keyvalues={"room_id": room_id},
|
||||
retcol="other_user_id",
|
||||
desc="get_users_in_dir_due_to_room",
|
||||
)
|
||||
|
||||
user_ids = set(user_ids_share_pub)
|
||||
user_ids.update(user_ids_share_priv)
|
||||
|
||||
return user_ids
|
||||
|
||||
def remove_user_who_share_room(self, user_id, room_id):
|
||||
"""
|
||||
Deletes entries in the users_who_share_*_rooms table. The first
|
||||
|
@ -637,31 +680,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
|
||||
return [room_id for room_id, in rows]
|
||||
|
||||
def delete_all_from_user_dir(self):
|
||||
"""Delete the entire user directory
|
||||
"""
|
||||
|
||||
def _delete_all_from_user_dir_txn(txn):
|
||||
txn.execute("DELETE FROM user_directory")
|
||||
txn.execute("DELETE FROM user_directory_search")
|
||||
txn.execute("DELETE FROM users_in_public_rooms")
|
||||
txn.execute("DELETE FROM users_who_share_private_rooms")
|
||||
txn.call_after(self.get_user_in_directory.invalidate_all)
|
||||
|
||||
return self.runInteraction(
|
||||
"delete_all_from_user_dir", _delete_all_from_user_dir_txn
|
||||
)
|
||||
|
||||
@cached()
|
||||
def get_user_in_directory(self, user_id):
|
||||
return self._simple_select_one(
|
||||
table="user_directory",
|
||||
keyvalues={"user_id": user_id},
|
||||
retcols=("display_name", "avatar_url"),
|
||||
allow_none=True,
|
||||
desc="get_user_in_directory",
|
||||
)
|
||||
|
||||
def get_user_directory_stream_pos(self):
|
||||
return self._simple_select_one_onecol(
|
||||
table="user_directory_stream_pos",
|
||||
|
@ -670,14 +688,6 @@ class UserDirectoryStore(StateDeltasStore, BackgroundUpdateStore):
|
|||
desc="get_user_directory_stream_pos",
|
||||
)
|
||||
|
||||
def update_user_directory_stream_pos(self, stream_id):
|
||||
return self._simple_update_one(
|
||||
table="user_directory_stream_pos",
|
||||
keyvalues={},
|
||||
updatevalues={"stream_id": stream_id},
|
||||
desc="update_user_directory_stream_pos",
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def search_user_dir(self, user_id, search_term, limit):
|
||||
"""Searches for users in directory
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue