Merge branch 'develop' of github.com:matrix-org/synapse into erikj/censor_redactions

Erik Johnston 2019-09-05 17:27:46 +01:00
commit 591d82f06b
144 changed files with 2394 additions and 1780 deletions


@@ -90,7 +90,7 @@ class AccountDataWorkerStore(SQLBaseStore):
room_data = by_room.setdefault(row["room_id"], {})
room_data[row["account_data_type"]] = json.loads(row["content"])
- return (global_account_data, by_room)
+ return global_account_data, by_room
return self.runInteraction(
"get_account_data_for_user", get_account_data_for_user_txn
@@ -205,7 +205,7 @@ class AccountDataWorkerStore(SQLBaseStore):
)
txn.execute(sql, (last_room_id, current_id, limit))
room_results = txn.fetchall()
- return (global_results, room_results)
+ return global_results, room_results
return self.runInteraction(
"get_all_updated_account_data_txn", get_updated_account_data_txn
@@ -244,13 +244,13 @@ class AccountDataWorkerStore(SQLBaseStore):
room_account_data = account_data_by_room.setdefault(row[0], {})
room_account_data[row[1]] = json.loads(row[2])
- return (global_account_data, account_data_by_room)
+ return global_account_data, account_data_by_room
changed = self._account_data_stream_cache.has_entity_changed(
user_id, int(stream_id)
)
if not changed:
- return ({}, {})
+ return {}, {}
return self.runInteraction(
"get_updated_account_data_for_user", get_updated_account_data_for_user_txn


@@ -165,7 +165,6 @@ class ApplicationServiceTransactionWorkerStore(
)
if result:
return result.get("state")
- return
return None
def set_appservice_state(self, service, state):
@@ -358,7 +357,7 @@ class ApplicationServiceTransactionWorkerStore(
events = yield self.get_events_as_list(event_ids)
- return (upper_bound, events)
+ return upper_bound, events
class ApplicationServiceTransactionStore(ApplicationServiceTransactionWorkerStore):
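
The lone "return" removed here (and again in the directory and profile stores below) is unreachable code, plausibly left behind when defer.returnValue(x) followed by a bare return was mechanically rewritten to return x. A small before/after sketch with illustrative names:

# Before: the bare return is dead code; the line above it already returns.
def get_state_before(result):
    if result:
        return result.get("state")
        return  # unreachable
    return None

# After: the unreachable statement is dropped; behaviour is unchanged.
def get_state_after(result):
    if result:
        return result.get("state")
    return None

assert get_state_before({"state": "up"}) == get_state_after({"state": "up"}) == "up"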


@@ -19,6 +19,7 @@ from canonicaljson import json
from twisted.internet import defer
+ from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
@@ -66,12 +67,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
messages.append(json.loads(row[1]))
if len(messages) < limit:
stream_pos = current_stream_id
- return (messages, stream_pos)
+ return messages, stream_pos
return self.runInteraction(
"get_new_messages_for_device", get_new_messages_for_device_txn
)
+ @trace
@defer.inlineCallbacks
def delete_messages_for_device(self, user_id, device_id, up_to_stream_id):
"""
@@ -87,11 +89,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), None
)
+ set_tag("last_deleted_stream_id", last_deleted_stream_id)
if last_deleted_stream_id:
has_changed = self._device_inbox_stream_cache.has_entity_changed(
user_id, last_deleted_stream_id
)
if not has_changed:
+ log_kv({"message": "No changes in cache since last check"})
return 0
def delete_messages_for_device_txn(txn):
@@ -107,6 +113,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
"delete_messages_for_device", delete_messages_for_device_txn
)
+ log_kv(
+ {"message": "deleted {} messages for device".format(count), "count": count}
+ )
# Update the cache, ensuring that we only ever increase the value
last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
@@ -117,6 +127,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
return count
+ @trace
def get_new_device_msgs_for_remote(
self, destination, last_stream_id, current_stream_id, limit
):
@@ -132,16 +143,23 @@ class DeviceInboxWorkerStore(SQLBaseStore):
in the stream the messages got to.
"""
+ set_tag("destination", destination)
+ set_tag("last_stream_id", last_stream_id)
+ set_tag("current_stream_id", current_stream_id)
+ set_tag("limit", limit)
has_changed = self._device_federation_outbox_stream_cache.has_entity_changed(
destination, last_stream_id
)
if not has_changed or last_stream_id == current_stream_id:
+ log_kv({"message": "No new messages in stream"})
return defer.succeed(([], current_stream_id))
if limit <= 0:
# This can happen if we run out of room for EDUs in the transaction.
return defer.succeed(([], last_stream_id))
+ @trace
def get_new_messages_for_remote_destination_txn(txn):
sql = (
"SELECT stream_id, messages_json FROM device_federation_outbox"
@@ -156,14 +174,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = row[0]
messages.append(json.loads(row[1]))
if len(messages) < limit:
+ log_kv({"message": "Set stream position to current position"})
stream_pos = current_stream_id
- return (messages, stream_pos)
+ return messages, stream_pos
return self.runInteraction(
"get_new_device_msgs_for_remote",
get_new_messages_for_remote_destination_txn,
)
+ @trace
def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
"""Used to delete messages when the remote destination acknowledges
their receipt.
@@ -214,6 +234,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, BackgroundUpdateStore):
expiry_ms=30 * 60 * 1000,
)
+ @trace
@defer.inlineCallbacks
def add_messages_to_device_inbox(
self, local_messages_by_user_then_device, remote_messages_by_destination
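
These hunks thread OpenTracing instrumentation (trace, set_tag, log_kv, imported above) through the device-inbox storage methods. A self-contained sketch of the pattern, with stub helpers standing in for synapse.logging.opentracing so it runs without a tracer (the stubs are assumptions, not the real API):

import functools

def trace(func):
    # stand-in for the real decorator: open a span named after the function
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print("start span:", func.__name__)
        try:
            return func(*args, **kwargs)
        finally:
            print("finish span:", func.__name__)
    return wrapper

def set_tag(key, value):
    # stand-in: attach a key/value tag to the active span
    print("tag:", key, "=", value)

def log_kv(fields):
    # stand-in: attach a structured log entry to the active span
    print("log:", fields)

@trace
def delete_messages_for_device(user_id, device_id, up_to_stream_id):
    set_tag("up_to_stream_id", up_to_stream_id)
    count = 3  # pretend three rows were deleted
    log_kv({"message": "deleted {} messages for device".format(count), "count": count})
    return count

delete_messages_for_device("@alice:example.org", "DEVICEID", 42)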


@@ -23,6 +23,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.logging.opentracing import (
get_active_span_text_map,
+ set_tag,
trace,
whitelisted_homeserver,
)
@@ -94,7 +95,7 @@ class DeviceWorkerStore(SQLBaseStore):
destination, int(from_stream_id)
)
if not has_changed:
- return (now_stream_id, [])
+ return now_stream_id, []
# We retrieve n+1 devices from the list of outbound pokes where n is
# our outbound device update limit. We then check if the very last
@@ -117,7 +118,7 @@ class DeviceWorkerStore(SQLBaseStore):
# Return an empty list if there are no updates
if not updates:
- return (now_stream_id, [])
+ return now_stream_id, []
# if we have exceeded the limit, we need to exclude any results with the
# same stream_id as the last row.
@@ -167,13 +168,13 @@ class DeviceWorkerStore(SQLBaseStore):
# skip that stream_id and return an empty list, and continue with the next
# stream_id next time.
if not query_map:
- return (stream_id_cutoff, [])
+ return stream_id_cutoff, []
results = yield self._get_device_update_edus_by_remote(
destination, from_stream_id, query_map
)
- return (now_stream_id, results)
+ return now_stream_id, results
def _get_devices_by_remote_txn(
self, txn, destination, from_stream_id, now_stream_id, limit
@@ -321,6 +322,7 @@ class DeviceWorkerStore(SQLBaseStore):
def get_device_stream_token(self):
return self._device_list_id_gen.get_current_token()
+ @trace
@defer.inlineCallbacks
def get_user_devices_from_cache(self, query_list):
"""Get the devices (and keys if any) for remote users from the cache.
@@ -352,7 +354,7 @@ class DeviceWorkerStore(SQLBaseStore):
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
- return (user_ids_not_in_cache, results)
+ set_tag("in_cache", results)
+ set_tag("not_in_cache", user_ids_not_in_cache)
+ return user_ids_not_in_cache, results
@cachedInlineCallbacks(num_args=2, tree=True)
def _get_cached_user_device(self, user_id, device_id):
@@ -851,7 +856,7 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
"ts": now,
"opentracing_context": json.dumps(context)
if whitelisted_homeserver(destination)
- else None,
+ else "{}",
}
for destination in hosts
for device_id in device_ids
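
The last hunk changes the stored default for opentracing_context from SQL NULL to the JSON text "{}". A plausible reading (inferred, not stated in the diff): consumers pass the column straight to json.loads, which rejects None, so an always-valid JSON object keeps the read path from raising.

import json

def read_context(stored):
    # consumer side: deserialise whatever the column holds
    return json.loads(stored)

assert read_context("{}") == {}  # new default: always valid JSON
try:
    read_context(None)  # old default: SQL NULL surfaces as None
except TypeError:
    pass  # json.loads(None) raises TypeError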


@@ -47,7 +47,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not room_id:
return None
- return
servers = yield self._simple_select_onecol(
"room_alias_servers",
@@ -58,7 +57,6 @@ class DirectoryWorkerStore(SQLBaseStore):
if not servers:
return None
- return
return RoomAliasMapping(room_id, room_alias.to_string(), servers)


@@ -818,7 +818,7 @@ class EventsStore(
# If the old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- return (None, None)
+ return None, None
if len(new_state_groups) == 1 and len(old_state_groups) == 1:
# If we're going from one state group to another, lets check if
@@ -835,7 +835,7 @@ class EventsStore(
# the current state in memory then lets also return that,
# but it doesn't matter if we don't.
new_state = state_groups_map.get(new_state_group)
- return (new_state, delta_ids)
+ return new_state, delta_ids
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -847,7 +847,7 @@ class EventsStore(
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- return (state_groups_map[new_state_groups.pop()], None)
+ return state_groups_map[new_state_groups.pop()], None
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -876,7 +876,7 @@ class EventsStore(
state_res_store=StateResolutionStore(self),
)
- return (res.state, None)
+ return res.state, None
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -899,7 +899,7 @@ class EventsStore(
if ev_id != existing_state.get(key)
}
- return (to_delete, to_insert)
+ return to_delete, to_insert
@log_function
def _persist_events_txn(
@@ -2358,8 +2358,9 @@ class EventsStore(
"room_aliases",
"room_depth",
"room_memberships",
- "room_state",
- "room_stats",
+ "room_stats_state",
+ "room_stats_current",
+ "room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",


@@ -90,7 +90,7 @@ class PresenceStore(SQLBaseStore):
presence_states,
)
- return (stream_orderings[-1], self._presence_id_gen.get_current_token())
+ return stream_orderings[-1], self._presence_id_gen.get_current_token()
def _update_presence_txn(self, txn, stream_orderings, presence_states):
for stream_id, state in zip(stream_orderings, presence_states):


@@ -35,7 +35,6 @@ class ProfileWorkerStore(SQLBaseStore):
if e.code == 404:
# no match
return ProfileInfo(None, None)
- return
else:
raise


@@ -133,7 +133,7 @@ class PusherWorkerStore(SQLBaseStore):
txn.execute(sql, (last_id, current_id, limit))
deleted = txn.fetchall()
- return (updated, deleted)
+ return updated, deleted
return self.runInteraction(
"get_all_updated_pushers", get_all_updated_pushers_txn


@@ -478,7 +478,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
max_persisted_id = self._receipts_id_gen.get_current_token()
- return (stream_id, max_persisted_id)
+ return stream_id, max_persisted_id
def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, data):
return self.runInteraction(


@@ -869,6 +869,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
+ if self.hs.config.stats_enabled:
+ # we create a new completed user statistics row
+ # we don't strictly need current_token since this user really can't
+ # have any state deltas before now (as it is a new user), but still,
+ # we include it for completeness.
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+ self._update_stats_delta_txn(
+ txn, now, "user", user_id, {}, complete_with_stream_id=current_token
+ )
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
@@ -1140,6 +1151,7 @@ class RegistrationStore(
deferred str|None: A str representing a link to redirect the user
to if there is one.
"""
+ # Insert everything into a transaction in order to run atomically
def validate_threepid_session_txn(txn):
row = self._simple_select_one_txn(
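
The new block writes a user-stats row that is marked complete up to the current stream position, so the stats machinery knows there is nothing older to backfill for this user. A toy model of the idea (illustrative names, not Synapse's API):

# A stats row records the stream position it is complete up to; the
# incremental updater can then skip deltas at or before that position.
def register_user(stats, user_id, stream_pos):
    stats[user_id] = {"joined_rooms": 0, "completed_delta_stream_id": stream_pos}

def apply_delta(stats, user_id, stream_id, joined_delta):
    row = stats[user_id]
    if stream_id <= row["completed_delta_stream_id"]:
        return  # already accounted for when the row was created
    row["joined_rooms"] += joined_delta

stats = {}
register_user(stats, "@alice:example.org", stream_pos=10)
apply_delta(stats, "@alice:example.org", stream_id=9, joined_delta=1)   # skipped
apply_delta(stats, "@alice:example.org", stream_id=11, joined_delta=1)  # applied
assert stats["@alice:example.org"]["joined_rooms"] == 1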


@@ -112,29 +112,31 @@ class RoomMemberWorkerStore(EventsWorkerStore):
@cached(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
- def f(txn):
- # If we can assume current_state_events.membership is up to date
- # then we can avoid a join, which is a Very Good Thing given how
- # frequently this function gets called.
- if self._current_state_events_membership_up_to_date:
- sql = """
- SELECT state_key FROM current_state_events
- WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
- """
- else:
- sql = """
- SELECT state_key FROM room_memberships as m
- INNER JOIN current_state_events as c
- ON m.event_id = c.event_id
- AND m.room_id = c.room_id
- AND m.user_id = c.state_key
- WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
- """
+ return self.runInteraction(
+ "get_users_in_room", self.get_users_in_room_txn, room_id
+ )
- txn.execute(sql, (room_id, Membership.JOIN))
- return [to_ascii(r[0]) for r in txn]
+ def get_users_in_room_txn(self, txn, room_id):
+ # If we can assume current_state_events.membership is up to date
+ # then we can avoid a join, which is a Very Good Thing given how
+ # frequently this function gets called.
+ if self._current_state_events_membership_up_to_date:
+ sql = """
+ SELECT state_key FROM current_state_events
+ WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+ """
+ else:
+ sql = """
+ SELECT state_key FROM room_memberships as m
+ INNER JOIN current_state_events as c
+ ON m.event_id = c.event_id
+ AND m.room_id = c.room_id
+ AND m.user_id = c.state_key
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+ """
- return self.runInteraction("get_users_in_room", f)
+ txn.execute(sql, (room_id, Membership.JOIN))
+ return [to_ascii(r[0]) for r in txn]
@cached(max_entries=100000)
def get_room_summary(self, room_id):
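
This hunk hoists the query out of a closure into a get_users_in_room_txn method, so callers that already hold a transaction can reuse it without a nested runInteraction. A runnable sketch of the pattern with illustrative names and sqlite3 standing in for Synapse's database layer:

import sqlite3

class ExampleStore:
    def __init__(self):
        self.db = sqlite3.connect(":memory:")
        self.db.execute("CREATE TABLE widgets (owner_id TEXT, widget_id TEXT)")

    def runInteraction(self, desc, func, *args):
        # stand-in for Synapse's runInteraction: run func inside a transaction
        with self.db:
            return func(self.db.cursor(), *args)

    def get_widgets(self, owner_id):
        # public entry point: wraps the _txn variant in a transaction
        return self.runInteraction("get_widgets", self.get_widgets_txn, owner_id)

    def get_widgets_txn(self, txn, owner_id):
        # reusable by any caller that already has a txn cursor
        txn.execute("SELECT widget_id FROM widgets WHERE owner_id = ?", (owner_id,))
        return [row[0] for row in txn]

store = ExampleStore()
assert store.get_widgets("@alice:example.org") == []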


@@ -0,0 +1,152 @@
/* Copyright 2018 New Vector Ltd
* Copyright 2019 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
----- First clean up from previous versions of room stats.
-- First remove old stats stuff
DROP TABLE IF EXISTS room_stats;
DROP TABLE IF EXISTS room_state;
DROP TABLE IF EXISTS room_stats_state;
DROP TABLE IF EXISTS user_stats;
DROP TABLE IF EXISTS room_stats_earliest_tokens;
DROP TABLE IF EXISTS _temp_populate_stats_position;
DROP TABLE IF EXISTS _temp_populate_stats_rooms;
DROP TABLE IF EXISTS stats_stream_pos;
-- Unschedule old background updates if they're still scheduled
DELETE FROM background_updates WHERE update_name IN (
'populate_stats_createtables',
'populate_stats_process_rooms',
'populate_stats_process_users',
'populate_stats_cleanup'
);
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_rooms', '{}', '');
INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
----- Create tables for our version of room stats.
-- single-row table to track position of incremental updates
DROP TABLE IF EXISTS stats_incremental_position;
CREATE TABLE stats_incremental_position (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_id BIGINT NOT NULL,
CHECK (Lock='X')
);
-- insert a null row and make sure it is the only one.
INSERT INTO stats_incremental_position (
stream_id
) SELECT COALESCE(MAX(stream_ordering), 0) from events;
-- represents PRESENT room statistics for a room
-- only holds absolute fields
DROP TABLE IF EXISTS room_stats_current;
CREATE TABLE room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
-- These are absolute counts
current_state_events INT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
banned_members INT NOT NULL,
local_users_in_room INT NOT NULL,
-- The maximum delta stream position that this row takes into account.
completed_delta_stream_id BIGINT NOT NULL
);
-- represents HISTORICAL room statistics for a room
DROP TABLE IF EXISTS room_stats_historical;
CREATE TABLE room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
-- Note that end_ts is quantised.
end_ts BIGINT NOT NULL,
bucket_size BIGINT NOT NULL,
-- These stats are absolute counts
current_state_events BIGINT NOT NULL,
joined_members BIGINT NOT NULL,
invited_members BIGINT NOT NULL,
left_members BIGINT NOT NULL,
banned_members BIGINT NOT NULL,
local_users_in_room BIGINT NOT NULL,
-- These stats are per time slice
total_events BIGINT NOT NULL,
total_event_bytes BIGINT NOT NULL,
PRIMARY KEY (room_id, end_ts)
);
-- We use this index to speed up deletion of ancient room stats.
CREATE INDEX room_stats_historical_end_ts ON room_stats_historical (end_ts);
-- represents PRESENT statistics for a user
-- only holds absolute fields
DROP TABLE IF EXISTS user_stats_current;
CREATE TABLE user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
joined_rooms BIGINT NOT NULL,
-- The maximum delta stream position that this row takes into account.
completed_delta_stream_id BIGINT NOT NULL
);
-- represents HISTORICAL statistics for a user
DROP TABLE IF EXISTS user_stats_historical;
CREATE TABLE user_stats_historical (
user_id TEXT NOT NULL,
end_ts BIGINT NOT NULL,
bucket_size BIGINT NOT NULL,
joined_rooms BIGINT NOT NULL,
invites_sent BIGINT NOT NULL,
rooms_created BIGINT NOT NULL,
total_events BIGINT NOT NULL,
total_event_bytes BIGINT NOT NULL,
PRIMARY KEY (user_id, end_ts)
);
-- We use this index to speed up deletion of ancient user stats.
CREATE INDEX user_stats_historical_end_ts ON user_stats_historical (end_ts);
CREATE TABLE room_stats_state (
room_id TEXT NOT NULL,
name TEXT,
canonical_alias TEXT,
join_rules TEXT,
history_visibility TEXT,
encryption TEXT,
avatar TEXT,
guest_access TEXT,
is_federatable BOOLEAN,
topic TEXT
);
CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id);
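
stats_incremental_position uses the single-row lock-column trick: the UNIQUE constraint plus CHECK (Lock='X') makes a second row impossible, so the stream position is always exactly one row that is advanced with UPDATE. A quick demonstration via sqlite3 (illustrative; the same constraints also hold on PostgreSQL):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE stats_incremental_position (
        Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- only one row can exist
        stream_id BIGINT NOT NULL,
        CHECK (Lock='X')
    )
    """
)
conn.execute("INSERT INTO stats_incremental_position (stream_id) VALUES (0)")
try:
    # a second row is rejected: Lock defaults to 'X', and 'X' is taken
    conn.execute("INSERT INTO stats_incremental_position (stream_id) VALUES (1)")
except sqlite3.IntegrityError:
    pass
conn.execute("UPDATE stats_incremental_position SET stream_id = 42")
assert conn.execute("SELECT stream_id FROM stats_incremental_position").fetchone() == (42,)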

File diff suppressed because it is too large.


@@ -364,7 +364,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
the chunk of events returned.
"""
if from_key == to_key:
- return ([], from_key)
+ return [], from_key
from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
@@ -374,7 +374,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
if not has_changed:
- return ([], from_key)
+ return [], from_key
def f(txn):
sql = (
@@ -407,7 +407,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# get.
key = from_key
- return (ret, key)
+ return ret, key
@defer.inlineCallbacks
def get_membership_changes_for_user(self, user_id, from_key, to_key):
@@ -496,7 +496,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"""
# Allow a zero limit here, and no-op.
if limit == 0:
- return ([], end_token)
+ return [], end_token
end_token = RoomStreamToken.parse(end_token)
@@ -511,7 +511,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# We want to return the results in ascending order.
rows.reverse()
- return (rows, token)
+ return rows, token
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
@@ -783,7 +783,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
events = yield self.get_events_as_list(event_ids)
- return (upper_bound, events)
+ return upper_bound, events
def get_federation_out_pos(self, typ):
return self._simple_select_one_onecol(


@@ -195,6 +195,6 @@ class ChainedIdGenerator(object):
with self._lock:
if self._unfinished_ids:
stream_id, chained_id = self._unfinished_ids[0]
- return (stream_id - 1, chained_id)
+ return stream_id - 1, chained_id
- return (self._current_max, self.chained_generator.get_current_token())
+ return self._current_max, self.chained_generator.get_current_token()
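
The final hunk keeps ChainedIdGenerator's invariant: while ids are still being persisted, the advertised token trails the earliest in-flight id, so readers never observe a gap in the stream. A toy illustration (not the real class):

# While ids 5 and 6 are in flight, the current token stays at 4; once they
# complete, it advances to the true maximum.
unfinished_ids = [5, 6]
current_max = 6

def get_current_token():
    if unfinished_ids:
        return unfinished_ids[0] - 1
    return current_max

assert get_current_token() == 4
unfinished_ids.clear()
assert get_current_token() == 6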