mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-04 01:04:48 -04:00
Merge branch 'release-v1.50' into develop
This commit is contained in:
commit
e7da1ced24
5 changed files with 277 additions and 20 deletions
|
@ -191,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
@trace
|
||||
async def get_device_updates_by_remote(
|
||||
self, destination: str, from_stream_id: int, limit: int
|
||||
) -> Tuple[int, List[Tuple[str, dict]]]:
|
||||
) -> Tuple[int, List[Tuple[str, JsonDict]]]:
|
||||
"""Get a stream of device updates to send to the given remote server.
|
||||
|
||||
Args:
|
||||
|
@ -200,9 +200,10 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
limit: Maximum number of device updates to return
|
||||
|
||||
Returns:
|
||||
A mapping from the current stream id (ie, the stream id of the last
|
||||
update included in the response), and the list of updates, where
|
||||
each update is a pair of EDU type and EDU contents.
|
||||
- The current stream id (i.e. the stream id of the last update included
|
||||
in the response); and
|
||||
- The list of updates, where each update is a pair of EDU type and
|
||||
EDU contents.
|
||||
"""
|
||||
now_stream_id = self.get_device_stream_token()
|
||||
|
||||
|
@ -221,6 +222,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
limit,
|
||||
)
|
||||
|
||||
# We need to ensure `updates` doesn't grow too big.
|
||||
# Currently: `len(updates) <= limit`.
|
||||
|
||||
# Return an empty list if there are no updates
|
||||
if not updates:
|
||||
return now_stream_id, []
|
||||
|
@ -270,19 +274,50 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
# The most recent request's opentracing_context is used as the
|
||||
# context which created the Edu.
|
||||
|
||||
# This is the stream ID that we will return for the consumer to resume
|
||||
# following this stream later.
|
||||
last_processed_stream_id = from_stream_id
|
||||
|
||||
query_map = {}
|
||||
cross_signing_keys_by_user = {}
|
||||
for user_id, device_id, update_stream_id, update_context in updates:
|
||||
if (
|
||||
# Calculate the remaining length budget.
|
||||
# Note that, for now, each entry in `cross_signing_keys_by_user`
|
||||
# gives rise to two device updates in the result, so those cost twice
|
||||
# as much (and are the whole reason we need to separately calculate
|
||||
# the budget; we know len(updates) <= limit otherwise!)
|
||||
# N.B. len() on dicts is cheap since they store their size.
|
||||
remaining_length_budget = limit - (
|
||||
len(query_map) + 2 * len(cross_signing_keys_by_user)
|
||||
)
|
||||
assert remaining_length_budget >= 0
|
||||
|
||||
is_master_key_update = (
|
||||
user_id in master_key_by_user
|
||||
and device_id == master_key_by_user[user_id]["device_id"]
|
||||
):
|
||||
result = cross_signing_keys_by_user.setdefault(user_id, {})
|
||||
result["master_key"] = master_key_by_user[user_id]["key_info"]
|
||||
elif (
|
||||
)
|
||||
is_self_signing_key_update = (
|
||||
user_id in self_signing_key_by_user
|
||||
and device_id == self_signing_key_by_user[user_id]["device_id"]
|
||||
)
|
||||
|
||||
is_cross_signing_key_update = (
|
||||
is_master_key_update or is_self_signing_key_update
|
||||
)
|
||||
|
||||
if (
|
||||
is_cross_signing_key_update
|
||||
and user_id not in cross_signing_keys_by_user
|
||||
):
|
||||
# This will give rise to 2 device updates.
|
||||
# If we don't have the budget, stop here!
|
||||
if remaining_length_budget < 2:
|
||||
break
|
||||
|
||||
if is_master_key_update:
|
||||
result = cross_signing_keys_by_user.setdefault(user_id, {})
|
||||
result["master_key"] = master_key_by_user[user_id]["key_info"]
|
||||
elif is_self_signing_key_update:
|
||||
result = cross_signing_keys_by_user.setdefault(user_id, {})
|
||||
result["self_signing_key"] = self_signing_key_by_user[user_id][
|
||||
"key_info"
|
||||
|
@ -290,24 +325,47 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
else:
|
||||
key = (user_id, device_id)
|
||||
|
||||
if key not in query_map and remaining_length_budget < 1:
|
||||
# We don't have space for a new entry
|
||||
break
|
||||
|
||||
previous_update_stream_id, _ = query_map.get(key, (0, None))
|
||||
|
||||
if update_stream_id > previous_update_stream_id:
|
||||
# FIXME If this overwrites an older update, this discards the
|
||||
# previous OpenTracing context.
|
||||
# It might make it harder to track down issues using OpenTracing.
|
||||
# If there's a good reason why it doesn't matter, a comment here
|
||||
# about that would not hurt.
|
||||
query_map[key] = (update_stream_id, update_context)
|
||||
|
||||
# As this update has been added to the response, advance the stream
|
||||
# position.
|
||||
last_processed_stream_id = update_stream_id
|
||||
|
||||
# In the worst case scenario, each update is for a distinct user and is
|
||||
# added either to the query_map or to cross_signing_keys_by_user,
|
||||
# but not both:
|
||||
# len(query_map) + len(cross_signing_keys_by_user) <= len(updates) here,
|
||||
# so len(query_map) + len(cross_signing_keys_by_user) <= limit.
|
||||
|
||||
results = await self._get_device_update_edus_by_remote(
|
||||
destination, from_stream_id, query_map
|
||||
)
|
||||
|
||||
# add the updated cross-signing keys to the results list
|
||||
# len(results) <= len(query_map) here,
|
||||
# so len(results) + len(cross_signing_keys_by_user) <= limit.
|
||||
|
||||
# Add the updated cross-signing keys to the results list
|
||||
for user_id, result in cross_signing_keys_by_user.items():
|
||||
result["user_id"] = user_id
|
||||
results.append(("m.signing_key_update", result))
|
||||
# also send the unstable version
|
||||
# FIXME: remove this when enough servers have upgraded
|
||||
# and remove the length budgeting above.
|
||||
results.append(("org.matrix.signing_key_update", result))
|
||||
|
||||
return now_stream_id, results
|
||||
return last_processed_stream_id, results
|
||||
|
||||
def _get_device_updates_by_remote_txn(
|
||||
self,
|
||||
|
@ -316,7 +374,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
from_stream_id: int,
|
||||
now_stream_id: int,
|
||||
limit: int,
|
||||
):
|
||||
) -> List[Tuple[str, str, int, Optional[str]]]:
|
||||
"""Return device update information for a given remote destination
|
||||
|
||||
Args:
|
||||
|
@ -327,7 +385,11 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
limit: Maximum number of device updates to return
|
||||
|
||||
Returns:
|
||||
List: List of device updates
|
||||
List: List of device update tuples:
|
||||
- user_id
|
||||
- device_id
|
||||
- stream_id
|
||||
- opentracing_context
|
||||
"""
|
||||
# get the list of device updates that need to be sent
|
||||
sql = """
|
||||
|
@ -351,15 +413,21 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
Args:
|
||||
destination: The host the device updates are intended for
|
||||
from_stream_id: The minimum stream_id to filter updates by, exclusive
|
||||
query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
|
||||
user_id/device_id to update stream_id and the relevant json-encoded
|
||||
opentracing context
|
||||
query_map: Dictionary mapping (user_id, device_id) to
|
||||
(update stream_id, the relevant json-encoded opentracing context)
|
||||
|
||||
Returns:
|
||||
List of objects representing an device update EDU
|
||||
List of objects representing a device update EDU.
|
||||
|
||||
Postconditions:
|
||||
The returned list has a length not exceeding that of the query_map:
|
||||
len(result) <= len(query_map)
|
||||
"""
|
||||
devices = (
|
||||
await self.get_e2e_device_keys_and_signatures(
|
||||
# Because these are (user_id, device_id) tuples with all
|
||||
# device_ids not being None, the returned list's length will not
|
||||
# exceed that of query_map.
|
||||
query_map.keys(),
|
||||
include_all_devices=True,
|
||||
include_deleted_devices=True,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue