Detect unknown remote devices and mark cache as stale ()

We just mark the fact that the cache may be stale in the database for
now.
This commit is contained in:
Erik Johnston 2020-01-28 14:43:21 +00:00 committed by GitHub
parent a8ce7aeb43
commit e17a110661
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 126 additions and 8 deletions
changelog.d
synapse
handlers
replication/slave/storage
storage/data_stores/main

1
changelog.d/6776.misc Normal file
View file

@ -0,0 +1 @@
Detect unknown remote devices and mark cache as stale.

View file

@ -14,6 +14,7 @@
# limitations under the License.
import logging
from typing import Any, Dict
from canonicaljson import json
@ -65,6 +66,9 @@ class DeviceMessageHandler(object):
logger.warning("Request for keys for non-local user %s", user_id)
raise SynapseError(400, "Not a user here")
if not by_device:
continue
messages_by_device = {
device_id: {
"content": message_content,
@ -73,9 +77,12 @@ class DeviceMessageHandler(object):
}
for device_id, message_content in by_device.items()
}
if messages_by_device:
local_messages[user_id] = messages_by_device
yield self._check_for_unknown_devices(
message_type, sender_user_id, by_device
)
stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
origin, message_id, local_messages
)
@ -84,6 +91,52 @@ class DeviceMessageHandler(object):
"to_device_key", stream_id, users=local_messages.keys()
)
@defer.inlineCallbacks
def _check_for_unknown_devices(
self,
message_type: str,
sender_user_id: str,
by_device: Dict[str, Dict[str, Any]],
):
"""Checks inbound device messages for unkown remote devices, and if
found marks the remote cache for the user as stale.
"""
if message_type != "m.room_key_request":
return
# Get the sending device IDs
requesting_device_ids = set()
for message_content in by_device.values():
device_id = message_content.get("requesting_device_id")
requesting_device_ids.add(device_id)
# Check if we are tracking the devices of the remote user.
room_ids = yield self.store.get_rooms_for_user(sender_user_id)
if not room_ids:
logger.info(
"Received device message from remote device we don't"
" share a room with: %s %s",
sender_user_id,
requesting_device_ids,
)
return
# If we are tracking check that we know about the sending
# devices.
cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
unknown_devices = requesting_device_ids - set(cached_devices)
if unknown_devices:
logger.info(
"Received device message from remote device not in our cache: %s %s",
sender_user_id,
unknown_devices,
)
yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
# TODO: Poke something to start trying to refetch user's
# keys.
@defer.inlineCallbacks
def send_device_message(self, sender_user_id, message_type, messages):
set_tag("number_of_messages", len(messages))

View file

@ -742,6 +742,26 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
await self.user_joined_room(user, room_id)
# For encrypted messages we check that we know about the sending device,
# if we don't then we mark the device cache for that user as stale.
if event.type == EventTypes.Encryption:
device_id = event.content.get("device_id")
if device_id is not None:
cached_devices = await self.store.get_cached_devices_for_user(
event.sender
)
if device_id not in cached_devices:
logger.info(
"Received event from remote device not in our cache: %s %s",
event.sender,
device_id,
)
await self.store.mark_remote_user_device_cache_as_stale(
event.sender
)
# TODO: Poke something to start trying to refetch user's
# keys.
@log_function
async def backfill(self, dest, room_id, limit, extremities):
""" Trigger a backfill request to `dest` for the given `room_id`

View file

@ -72,6 +72,6 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
destination, token
)
self._get_cached_devices_for_user.invalidate((user_id,))
self.get_cached_devices_for_user.invalidate((user_id,))
self._get_cached_user_device.invalidate_many((user_id,))
self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))

View file

@ -457,7 +457,7 @@ class DeviceWorkerStore(SQLBaseStore):
device = yield self._get_cached_user_device(user_id, device_id)
results.setdefault(user_id, {})[device_id] = device
else:
results[user_id] = yield self._get_cached_devices_for_user(user_id)
results[user_id] = yield self.get_cached_devices_for_user(user_id)
set_tag("in_cache", results)
set_tag("not_in_cache", user_ids_not_in_cache)
@ -475,12 +475,12 @@ class DeviceWorkerStore(SQLBaseStore):
return db_to_json(content)
@cachedInlineCallbacks()
def _get_cached_devices_for_user(self, user_id):
def get_cached_devices_for_user(self, user_id):
devices = yield self.db.simple_select_list(
table="device_lists_remote_cache",
keyvalues={"user_id": user_id},
retcols=("device_id", "content"),
desc="_get_cached_devices_for_user",
desc="get_cached_devices_for_user",
)
return {
device["device_id"]: db_to_json(device["content"]) for device in devices
@ -641,6 +641,18 @@ class DeviceWorkerStore(SQLBaseStore):
return results
def mark_remote_user_device_cache_as_stale(self, user_id: str):
"""Records that the server has reason to believe the cache of the devices
for the remote users is out of date.
"""
return self.db.simple_upsert(
table="device_lists_remote_resync",
keyvalues={"user_id": user_id},
values={},
insertion_values={"added_ts": self._clock.time_msec()},
desc="make_remote_user_device_cache_as_stale",
)
class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__(self, database: Database, db_conn, hs):
@ -887,7 +899,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
)
@ -902,6 +914,13 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
lock=False,
)
# If we're replacing the remote user's device list cache presumably
# we've done a full resync, so we remove the entry that says we need
# to resync
self.db.simple_delete_txn(
txn, table="device_lists_remote_resync", keyvalues={"user_id": user_id},
)
def update_remote_device_list_cache(self, user_id, devices, stream_id):
"""Replace the entire cache of the remote user's devices.
@ -942,7 +961,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
],
)
txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
txn.call_after(
self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)

View file

@ -0,0 +1,25 @@
/* Copyright 2020 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Records whether the server thinks that the remote users cached device lists
-- may be out of date (e.g. if we have received a to device message from a
-- device we don't know about).
CREATE TABLE IF NOT EXISTS device_lists_remote_resync (
user_id TEXT NOT NULL,
added_ts BIGINT NOT NULL
);
CREATE UNIQUE INDEX device_lists_remote_resync_idx ON device_lists_remote_resync (user_id);
CREATE INDEX device_lists_remote_resync_ts_idx ON device_lists_remote_resync (added_ts);