mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-04-25 13:49:07 -04:00
Merge remote-tracking branch 'upstream/release-v1.36'
This commit is contained in:
commit
93523869f7
11
CHANGES.md
11
CHANGES.md
@ -1,3 +1,14 @@
|
|||||||
|
Synapse 1.36.0rc2 (2021-06-11)
|
||||||
|
==============================
|
||||||
|
|
||||||
|
Bugfixes
|
||||||
|
--------
|
||||||
|
|
||||||
|
- Fix a bug which caused presence updates to stop working some time after a restart, when using a presence writer worker. Broke in v1.33.0. ([\#10149](https://github.com/matrix-org/synapse/issues/10149))
|
||||||
|
- Fix a bug when using federation sender worker where it would send out more presence updates than necessary, leading to high resource usage. Broke in v1.33.0. ([\#10163](https://github.com/matrix-org/synapse/issues/10163))
|
||||||
|
- Fix a bug where Synapse could send the same presence update to a remote twice. ([\#10165](https://github.com/matrix-org/synapse/issues/10165))
|
||||||
|
|
||||||
|
|
||||||
Synapse 1.36.0rc1 (2021-06-08)
|
Synapse 1.36.0rc1 (2021-06-08)
|
||||||
==============================
|
==============================
|
||||||
|
|
||||||
|
@ -47,7 +47,7 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
__version__ = "1.36.0rc1"
|
__version__ = "1.36.0rc2"
|
||||||
|
|
||||||
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
|
||||||
# We import here so that we don't have to install a bunch of deps when
|
# We import here so that we don't have to install a bunch of deps when
|
||||||
|
@ -299,14 +299,14 @@ class BasePresenceHandler(abc.ABC):
|
|||||||
if not states:
|
if not states:
|
||||||
return
|
return
|
||||||
|
|
||||||
hosts_and_states = await get_interested_remotes(
|
hosts_to_states = await get_interested_remotes(
|
||||||
self.store,
|
self.store,
|
||||||
self.presence_router,
|
self.presence_router,
|
||||||
states,
|
states,
|
||||||
)
|
)
|
||||||
|
|
||||||
for destinations, states in hosts_and_states:
|
for destination, host_states in hosts_to_states.items():
|
||||||
self._federation.send_presence_to_destinations(states, destinations)
|
self._federation.send_presence_to_destinations(host_states, [destination])
|
||||||
|
|
||||||
async def send_full_presence_to_users(self, user_ids: Collection[str]):
|
async def send_full_presence_to_users(self, user_ids: Collection[str]):
|
||||||
"""
|
"""
|
||||||
@ -495,9 +495,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||||||
users=users_to_states.keys(),
|
users=users_to_states.keys(),
|
||||||
)
|
)
|
||||||
|
|
||||||
# If this is a federation sender, notify about presence updates.
|
|
||||||
await self.maybe_send_presence_to_interested_destinations(states)
|
|
||||||
|
|
||||||
async def process_replication_rows(
|
async def process_replication_rows(
|
||||||
self, stream_name: str, instance_name: str, token: int, rows: list
|
self, stream_name: str, instance_name: str, token: int, rows: list
|
||||||
):
|
):
|
||||||
@ -519,11 +516,27 @@ class WorkerPresenceHandler(BasePresenceHandler):
|
|||||||
for row in rows
|
for row in rows
|
||||||
]
|
]
|
||||||
|
|
||||||
for state in states:
|
# The list of states to notify sync streams and remote servers about.
|
||||||
self.user_to_current_state[state.user_id] = state
|
# This is calculated by comparing the old and new states for each user
|
||||||
|
# using `should_notify(..)`.
|
||||||
|
#
|
||||||
|
# Note that this is necessary as the presence writer will periodically
|
||||||
|
# flush presence state changes that should not be notified about to the
|
||||||
|
# DB, and so will be sent over the replication stream.
|
||||||
|
state_to_notify = []
|
||||||
|
|
||||||
|
for new_state in states:
|
||||||
|
old_state = self.user_to_current_state.get(new_state.user_id)
|
||||||
|
self.user_to_current_state[new_state.user_id] = new_state
|
||||||
|
|
||||||
|
if not old_state or should_notify(old_state, new_state):
|
||||||
|
state_to_notify.append(new_state)
|
||||||
|
|
||||||
stream_id = token
|
stream_id = token
|
||||||
await self.notify_from_replication(states, stream_id)
|
await self.notify_from_replication(state_to_notify, stream_id)
|
||||||
|
|
||||||
|
# If this is a federation sender, notify about presence updates.
|
||||||
|
await self.maybe_send_presence_to_interested_destinations(state_to_notify)
|
||||||
|
|
||||||
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
|
||||||
return [
|
return [
|
||||||
@ -829,15 +842,15 @@ class PresenceHandler(BasePresenceHandler):
|
|||||||
if to_federation_ping:
|
if to_federation_ping:
|
||||||
federation_presence_out_counter.inc(len(to_federation_ping))
|
federation_presence_out_counter.inc(len(to_federation_ping))
|
||||||
|
|
||||||
hosts_and_states = await get_interested_remotes(
|
hosts_to_states = await get_interested_remotes(
|
||||||
self.store,
|
self.store,
|
||||||
self.presence_router,
|
self.presence_router,
|
||||||
list(to_federation_ping.values()),
|
list(to_federation_ping.values()),
|
||||||
)
|
)
|
||||||
|
|
||||||
for destinations, states in hosts_and_states:
|
for destination, states in hosts_to_states.items():
|
||||||
self._federation_queue.send_presence_to_destinations(
|
self._federation_queue.send_presence_to_destinations(
|
||||||
states, destinations
|
states, [destination]
|
||||||
)
|
)
|
||||||
|
|
||||||
async def _handle_timeouts(self) -> None:
|
async def _handle_timeouts(self) -> None:
|
||||||
@ -1962,7 +1975,7 @@ async def get_interested_remotes(
|
|||||||
store: DataStore,
|
store: DataStore,
|
||||||
presence_router: PresenceRouter,
|
presence_router: PresenceRouter,
|
||||||
states: List[UserPresenceState],
|
states: List[UserPresenceState],
|
||||||
) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
|
) -> Dict[str, Set[UserPresenceState]]:
|
||||||
"""Given a list of presence states figure out which remote servers
|
"""Given a list of presence states figure out which remote servers
|
||||||
should be sent which.
|
should be sent which.
|
||||||
|
|
||||||
@ -1974,11 +1987,9 @@ async def get_interested_remotes(
|
|||||||
states: A list of incoming user presence updates.
|
states: A list of incoming user presence updates.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A list of 2-tuples of destinations and states, where for
|
A map from destinations to presence states to send to that destination.
|
||||||
each tuple the list of UserPresenceState should be sent to each
|
|
||||||
destination
|
|
||||||
"""
|
"""
|
||||||
hosts_and_states = [] # type: List[Tuple[Collection[str], List[UserPresenceState]]]
|
hosts_and_states: Dict[str, Set[UserPresenceState]] = {}
|
||||||
|
|
||||||
# First we look up the rooms each user is in (as well as any explicit
|
# First we look up the rooms each user is in (as well as any explicit
|
||||||
# subscriptions), then for each distinct room we look up the remote
|
# subscriptions), then for each distinct room we look up the remote
|
||||||
@ -1990,11 +2001,12 @@ async def get_interested_remotes(
|
|||||||
for room_id, states in room_ids_to_states.items():
|
for room_id, states in room_ids_to_states.items():
|
||||||
user_ids = await store.get_users_in_room(room_id)
|
user_ids = await store.get_users_in_room(room_id)
|
||||||
hosts = {get_domain_from_id(user_id) for user_id in user_ids}
|
hosts = {get_domain_from_id(user_id) for user_id in user_ids}
|
||||||
hosts_and_states.append((hosts, states))
|
for host in hosts:
|
||||||
|
hosts_and_states.setdefault(host, set()).update(states)
|
||||||
|
|
||||||
for user_id, states in users_to_states.items():
|
for user_id, states in users_to_states.items():
|
||||||
host = get_domain_from_id(user_id)
|
host = get_domain_from_id(user_id)
|
||||||
hosts_and_states.append(([host], states))
|
hosts_and_states.setdefault(host, set()).update(states)
|
||||||
|
|
||||||
return hosts_and_states
|
return hosts_and_states
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ class PresenceStore(SQLBaseStore):
|
|||||||
instance_name=self._instance_name,
|
instance_name=self._instance_name,
|
||||||
tables=[("presence_stream", "instance_name", "stream_id")],
|
tables=[("presence_stream", "instance_name", "stream_id")],
|
||||||
sequence_name="presence_stream_sequence",
|
sequence_name="presence_stream_sequence",
|
||||||
writers=hs.config.worker.writers.to_device,
|
writers=hs.config.worker.writers.presence,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._presence_id_gen = StreamIdGenerator(
|
self._presence_id_gen = StreamIdGenerator(
|
||||||
|
@ -397,6 +397,11 @@ class MultiWriterIdGenerator:
|
|||||||
# ... persist event ...
|
# ... persist event ...
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# If we have a list of instances that are allowed to write to this
|
||||||
|
# stream, make sure we're in it.
|
||||||
|
if self._writers and self._instance_name not in self._writers:
|
||||||
|
raise Exception("Tried to allocate stream ID on non-writer")
|
||||||
|
|
||||||
return _MultiWriterCtxManager(self)
|
return _MultiWriterCtxManager(self)
|
||||||
|
|
||||||
def get_next_mult(self, n: int):
|
def get_next_mult(self, n: int):
|
||||||
@ -406,6 +411,11 @@ class MultiWriterIdGenerator:
|
|||||||
# ... persist events ...
|
# ... persist events ...
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# If we have a list of instances that are allowed to write to this
|
||||||
|
# stream, make sure we're in it.
|
||||||
|
if self._writers and self._instance_name not in self._writers:
|
||||||
|
raise Exception("Tried to allocate stream ID on non-writer")
|
||||||
|
|
||||||
return _MultiWriterCtxManager(self, n)
|
return _MultiWriterCtxManager(self, n)
|
||||||
|
|
||||||
def get_next_txn(self, txn: LoggingTransaction):
|
def get_next_txn(self, txn: LoggingTransaction):
|
||||||
@ -416,6 +426,11 @@ class MultiWriterIdGenerator:
|
|||||||
# ... persist event ...
|
# ... persist event ...
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# If we have a list of instances that are allowed to write to this
|
||||||
|
# stream, make sure we're in it.
|
||||||
|
if self._writers and self._instance_name not in self._writers:
|
||||||
|
raise Exception("Tried to allocate stream ID on non-writer")
|
||||||
|
|
||||||
next_id = self._load_next_id_txn(txn)
|
next_id = self._load_next_id_txn(txn)
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user