Merge remote-tracking branch 'origin/release-v1.75' into develop

This commit is contained in:
Richard van der Hoff 2023-01-12 16:45:23 +00:00
commit 0f061f39f0
5 changed files with 61 additions and 13 deletions

1
changelog.d/14810.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in Synapse 1.75.0rc1 where device lists could be miscalculated with some sync filters.

1
changelog.d/14817.bugfix Normal file
View File

@ -0,0 +1 @@
Fix race where calling `/members` or `/state` with an `at` parameter could fail for newly created rooms, when using multiple workers.

View File

@ -283,9 +283,6 @@ class FilterCollection:
await self._room_filter.filter(events)
)
def blocks_all_rooms(self) -> bool:
return self._room_filter.filters_all_rooms()
def blocks_all_presence(self) -> bool:
return (
self._presence_filter.filters_all_types()

View File

@ -1793,10 +1793,6 @@ class SyncHandler:
- newly_left_users
"""
# If the request doesn't care about rooms then nothing to do!
if sync_result_builder.sync_config.filter_collection.blocks_all_rooms():
return set(), set(), set(), set()
since_token = sync_result_builder.since_token
# 1. Start by fetching all ephemeral events in rooms we've joined (if required).

View File

@ -801,14 +801,67 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
before this stream ordering.
"""
last_row = await self.get_room_event_before_stream_ordering( def get_last_event_in_room_before_stream_ordering_txn(
room_id=room_id, txn: LoggingTransaction,
stream_ordering=end_token.stream, ) -> Optional[str]:
# We need to handle the fact that the stream tokens can be vector
# clocks. We do this by getting all rows between the minimum and
# maximum stream ordering in the token, plus one row less than the
# minimum stream ordering. We then filter the results against the
# token and return the first row that matches.
sql = """
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
FROM events
LEFT JOIN rejections USING (event_id)
WHERE room_id = ?
AND ? < stream_ordering AND stream_ordering <= ?
AND NOT outlier
AND rejections.event_id IS NULL
ORDER BY stream_ordering DESC
) AS a
UNION
SELECT * FROM (
SELECT instance_name, stream_ordering, topological_ordering, event_id
FROM events
LEFT JOIN rejections USING (event_id)
WHERE room_id = ?
AND stream_ordering <= ?
AND NOT outlier
AND rejections.event_id IS NULL
ORDER BY stream_ordering DESC
LIMIT 1
) AS b
"""
txn.execute(
sql,
(
room_id,
end_token.stream,
end_token.get_max_stream_pos(),
room_id,
end_token.stream,
),
) )
if last_row:
return last_row[2] for instance_name, stream_ordering, topological_ordering, event_id in txn:
if _filter_results(
lower_token=None,
upper_token=end_token,
instance_name=instance_name,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
):
return event_id
return None
return await self.db_pool.runInteraction(
"get_last_event_in_room_before_stream_ordering",
get_last_event_in_room_before_stream_ordering_txn,
)
async def get_current_room_stream_token_for_room_id(
self, room_id: str
) -> RoomStreamToken: