Make handle_new_client_event throw PartialStateConflictError (#14665)

Then adapt the calling code to retry when needed, so that it doesn't
return a 500 to clients.

Signed-off-by: Mathieu Velten <mathieuv@matrix.org>
Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
This commit is contained in:
Mathieu Velten 2022-12-15 17:04:23 +01:00 committed by GitHub
parent 046320b9b6
commit 54c012c5a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 366 additions and 245 deletions

1
changelog.d/14665.misc Normal file
View File

@ -0,0 +1 @@
Change `handle_new_client_event` signature so that a 429 does not reach clients on `PartialStateConflictError`, and internally retry when needed instead.

View File

@ -1343,32 +1343,53 @@ class FederationHandler:
) )
EventValidator().validate_builder(builder) EventValidator().validate_builder(builder)
event, context = await self.event_creation_handler.create_new_client_event(
builder=builder
)
event, context = await self.add_display_name_to_third_party_invite( # Try several times, it could fail with PartialStateConflictError
room_version_obj, event_dict, event, context # in send_membership_event, cf comment in except block.
) max_retries = 5
for i in range(max_retries):
try:
(
event,
context,
) = await self.event_creation_handler.create_new_client_event(
builder=builder
)
EventValidator().validate_new(event, self.config) event, context = await self.add_display_name_to_third_party_invite(
room_version_obj, event_dict, event, context
)
# We need to tell the transaction queue to send this out, even EventValidator().validate_new(event, self.config)
# though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = self.hs.hostname
try: # We need to tell the transaction queue to send this out, even
validate_event_for_room_version(event) # though the sender isn't a local user.
await self._event_auth_handler.check_auth_rules_from_context(event) event.internal_metadata.send_on_behalf_of = self.hs.hostname
except AuthError as e:
logger.warning("Denying new third party invite %r because %s", event, e)
raise e
await self._check_signature(event, context) try:
validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(
event
)
except AuthError as e:
logger.warning(
"Denying new third party invite %r because %s", event, e
)
raise e
# We retrieve the room member handler here as to not cause a cyclic dependency await self._check_signature(event, context)
member_handler = self.hs.get_room_member_handler()
await member_handler.send_membership_event(None, event, context) # We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler()
await member_handler.send_membership_event(None, event, context)
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
else: else:
destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}
@ -1400,28 +1421,46 @@ class FederationHandler:
room_version_obj, event_dict room_version_obj, event_dict
) )
event, context = await self.event_creation_handler.create_new_client_event( # Try several times, it could fail with PartialStateConflictError
builder=builder # in send_membership_event, cf comment in except block.
) max_retries = 5
event, context = await self.add_display_name_to_third_party_invite( for i in range(max_retries):
room_version_obj, event_dict, event, context try:
) (
event,
context,
) = await self.event_creation_handler.create_new_client_event(
builder=builder
)
event, context = await self.add_display_name_to_third_party_invite(
room_version_obj, event_dict, event, context
)
try: try:
validate_event_for_room_version(event) validate_event_for_room_version(event)
await self._event_auth_handler.check_auth_rules_from_context(event) await self._event_auth_handler.check_auth_rules_from_context(event)
except AuthError as e: except AuthError as e:
logger.warning("Denying third party invite %r because %s", event, e) logger.warning("Denying third party invite %r because %s", event, e)
raise e raise e
await self._check_signature(event, context) await self._check_signature(event, context)
# We need to tell the transaction queue to send this out, even # We need to tell the transaction queue to send this out, even
# though the sender isn't a local user. # though the sender isn't a local user.
event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender) event.internal_metadata.send_on_behalf_of = get_domain_from_id(
event.sender
)
# We retrieve the room member handler here as to not cause a cyclic dependency # We retrieve the room member handler here as to not cause a cyclic dependency
member_handler = self.hs.get_room_member_handler() member_handler = self.hs.get_room_member_handler()
await member_handler.send_membership_event(None, event, context) await member_handler.send_membership_event(None, event, context)
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
async def add_display_name_to_third_party_invite( async def add_display_name_to_third_party_invite(
self, self,

View File

@ -37,7 +37,6 @@ from synapse.api.errors import (
AuthError, AuthError,
Codes, Codes,
ConsentNotGivenError, ConsentNotGivenError,
LimitExceededError,
NotFoundError, NotFoundError,
ShadowBanError, ShadowBanError,
SynapseError, SynapseError,
@ -999,60 +998,73 @@ class EventCreationHandler:
event.internal_metadata.stream_ordering, event.internal_metadata.stream_ordering,
) )
event, context = await self.create_event( # Try several times, it could fail with PartialStateConflictError
requester, # in handle_new_client_event, cf comment in except block.
event_dict, max_retries = 5
txn_id=txn_id, for i in range(max_retries):
allow_no_prev_events=allow_no_prev_events, try:
prev_event_ids=prev_event_ids, event, context = await self.create_event(
state_event_ids=state_event_ids, requester,
outlier=outlier, event_dict,
historical=historical, txn_id=txn_id,
depth=depth, allow_no_prev_events=allow_no_prev_events,
) prev_event_ids=prev_event_ids,
state_event_ids=state_event_ids,
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( outlier=outlier,
event.sender, historical=historical,
) depth=depth,
spam_check_result = await self.spam_checker.check_event_for_spam(event)
if spam_check_result != self.spam_checker.NOT_SPAM:
if isinstance(spam_check_result, tuple):
try:
[code, dict] = spam_check_result
raise SynapseError(
403,
"This message had been rejected as probable spam",
code,
dict,
)
except ValueError:
logger.error(
"Spam-check module returned invalid error value. Expecting [code, dict], got %s",
spam_check_result,
)
raise SynapseError(
403,
"This message has been rejected as probable spam",
Codes.FORBIDDEN,
)
# Backwards compatibility: if the return value is not an error code, it
# means the module returned an error message to be included in the
# SynapseError (which is now deprecated).
raise SynapseError(
403,
spam_check_result,
Codes.FORBIDDEN,
) )
ev = await self.handle_new_client_event( assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
requester=requester, event.sender,
events_and_context=[(event, context)], )
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban, spam_check_result = await self.spam_checker.check_event_for_spam(event)
) if spam_check_result != self.spam_checker.NOT_SPAM:
if isinstance(spam_check_result, tuple):
try:
[code, dict] = spam_check_result
raise SynapseError(
403,
"This message had been rejected as probable spam",
code,
dict,
)
except ValueError:
logger.error(
"Spam-check module returned invalid error value. Expecting [code, dict], got %s",
spam_check_result,
)
raise SynapseError(
403,
"This message has been rejected as probable spam",
Codes.FORBIDDEN,
)
# Backwards compatibility: if the return value is not an error code, it
# means the module returned an error message to be included in the
# SynapseError (which is now deprecated).
raise SynapseError(
403,
spam_check_result,
Codes.FORBIDDEN,
)
ev = await self.handle_new_client_event(
requester=requester,
events_and_context=[(event, context)],
ratelimit=ratelimit,
ignore_shadow_ban=ignore_shadow_ban,
)
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
# we know it was persisted, so must have a stream ordering # we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering assert ev.internal_metadata.stream_ordering
@ -1356,7 +1368,7 @@ class EventCreationHandler:
Raises: Raises:
ShadowBanError if the requester has been shadow-banned. ShadowBanError if the requester has been shadow-banned.
SynapseError(503) if attempting to persist a partial state event in PartialStateConflictError if attempting to persist a partial state event in
a room that has been un-partial stated. a room that has been un-partial stated.
""" """
extra_users = extra_users or [] extra_users = extra_users or []
@ -1418,34 +1430,23 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we # We now persist the event (and update the cache in parallel, since we
# don't want to block on it). # don't want to block on it).
event, context = events_and_context[0] event, context = events_and_context[0]
try: result, _ = await make_deferred_yieldable(
result, _ = await make_deferred_yieldable( gather_results(
gather_results( (
( run_in_background(
run_in_background( self._persist_events,
self._persist_events, requester=requester,
requester=requester, events_and_context=events_and_context,
events_and_context=events_and_context, ratelimit=ratelimit,
ratelimit=ratelimit, extra_users=extra_users,
extra_users=extra_users,
),
run_in_background(
self.cache_joined_hosts_for_events, events_and_context
).addErrback(
log_failure, "cache_joined_hosts_for_event failed"
),
), ),
consumeErrors=True, run_in_background(
) self.cache_joined_hosts_for_events, events_and_context
).addErrback(unwrapFirstError) ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
except PartialStateConflictError as e: ),
# The event context needs to be recomputed. consumeErrors=True,
# Turn the error into a 429, as a hint to the client to try again.
logger.info(
"Room %s was un-partial stated while persisting client event.",
event.room_id,
) )
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) ).addErrback(unwrapFirstError)
return result return result
@ -2012,26 +2013,39 @@ class EventCreationHandler:
for user_id in members: for user_id in members:
requester = create_requester(user_id, authenticated_entity=self.server_name) requester = create_requester(user_id, authenticated_entity=self.server_name)
try: try:
event, context = await self.create_event( # Try several times, it could fail with PartialStateConflictError
requester, # in handle_new_client_event, cf comment in except block.
{ max_retries = 5
"type": EventTypes.Dummy, for i in range(max_retries):
"content": {}, try:
"room_id": room_id, event, context = await self.create_event(
"sender": user_id, requester,
}, {
) "type": EventTypes.Dummy,
"content": {},
"room_id": room_id,
"sender": user_id,
},
)
event.internal_metadata.proactively_send = False event.internal_metadata.proactively_send = False
# Since this is a dummy-event it is OK if it is sent by a # Since this is a dummy-event it is OK if it is sent by a
# shadow-banned user. # shadow-banned user.
await self.handle_new_client_event( await self.handle_new_client_event(
requester, requester,
events_and_context=[(event, context)], events_and_context=[(event, context)],
ratelimit=False, ratelimit=False,
ignore_shadow_ban=True, ignore_shadow_ban=True,
) )
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
return True return True
except AuthError: except AuthError:
logger.info( logger.info(

View File

@ -62,6 +62,7 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin from synapse.rest.admin._base import assert_user_is_admin
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.streams import EventSource from synapse.streams import EventSource
from synapse.types import ( from synapse.types import (
JsonDict, JsonDict,
@ -207,46 +208,64 @@ class RoomCreationHandler:
new_room_id = self._generate_room_id() new_room_id = self._generate_room_id()
# Check whether the user has the power level to carry out the upgrade. # Try several times, it could fail with PartialStateConflictError
# `check_auth_rules_from_context` will check that they are in the room and have # in _upgrade_room, cf comment in except block.
# the required power level to send the tombstone event. max_retries = 5
( for i in range(max_retries):
tombstone_event, try:
tombstone_context, # Check whether the user has the power level to carry out the upgrade.
) = await self.event_creation_handler.create_event( # `check_auth_rules_from_context` will check that they are in the room and have
requester, # the required power level to send the tombstone event.
{ (
"type": EventTypes.Tombstone, tombstone_event,
"state_key": "", tombstone_context,
"room_id": old_room_id, ) = await self.event_creation_handler.create_event(
"sender": user_id, requester,
"content": { {
"body": "This room has been replaced", "type": EventTypes.Tombstone,
"replacement_room": new_room_id, "state_key": "",
}, "room_id": old_room_id,
}, "sender": user_id,
) "content": {
validate_event_for_room_version(tombstone_event) "body": "This room has been replaced",
await self._event_auth_handler.check_auth_rules_from_context(tombstone_event) "replacement_room": new_room_id,
},
},
)
validate_event_for_room_version(tombstone_event)
await self._event_auth_handler.check_auth_rules_from_context(
tombstone_event
)
# Upgrade the room # Upgrade the room
# #
# If this user has sent multiple upgrade requests for the same room # If this user has sent multiple upgrade requests for the same room
# and one of them is not complete yet, cache the response and # and one of them is not complete yet, cache the response and
# return it to all subsequent requests # return it to all subsequent requests
ret = await self._upgrade_response_cache.wrap( ret = await self._upgrade_response_cache.wrap(
(old_room_id, user_id), (old_room_id, user_id),
self._upgrade_room, self._upgrade_room,
requester, requester,
old_room_id, old_room_id,
old_room, # args for _upgrade_room old_room, # args for _upgrade_room
new_room_id, new_room_id,
new_version, new_version,
tombstone_event, tombstone_event,
tombstone_context, tombstone_context,
) )
return ret return ret
except PartialStateConflictError as e:
# Clean up the cache so we can retry properly
self._upgrade_response_cache.unset((old_room_id, user_id))
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
# This is to satisfy mypy and should never happen
raise PartialStateConflictError()
async def _upgrade_room( async def _upgrade_room(
self, self,

View File

@ -375,6 +375,8 @@ class RoomBatchHandler:
# Events are sorted by (topological_ordering, stream_ordering) # Events are sorted by (topological_ordering, stream_ordering)
# where topological_ordering is just depth. # where topological_ordering is just depth.
for (event, context) in reversed(events_to_persist): for (event, context) in reversed(events_to_persist):
# This call can't raise `PartialStateConflictError` since we forbid
# use of the historical batch API during partial state
await self.event_creation_handler.handle_new_client_event( await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service( await self.create_requester_for_user_id_from_app_service(
event.sender, app_service_requester.app_service event.sender, app_service_requester.app_service

View File

@ -34,6 +34,7 @@ from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM from synapse.module_api import NOT_SPAM
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.types import ( from synapse.types import (
JsonDict, JsonDict,
Requester, Requester,
@ -392,60 +393,81 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
event_pos = await self.store.get_position_for_event(existing_event_id) event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream return existing_event_id, event_pos.stream
event, context = await self.event_creation_handler.create_event( # Try several times, it could fail with PartialStateConflictError,
requester, # in handle_new_client_event, cf comment in except block.
{ max_retries = 5
"type": EventTypes.Member, for i in range(max_retries):
"content": content, try:
"room_id": room_id, event, context = await self.event_creation_handler.create_event(
"sender": requester.user.to_string(), requester,
"state_key": user_id, {
# For backwards compatibility: "type": EventTypes.Member,
"membership": membership, "content": content,
"origin_server_ts": origin_server_ts, "room_id": room_id,
}, "sender": requester.user.to_string(),
txn_id=txn_id, "state_key": user_id,
allow_no_prev_events=allow_no_prev_events, # For backwards compatibility:
prev_event_ids=prev_event_ids, "membership": membership,
state_event_ids=state_event_ids, "origin_server_ts": origin_server_ts,
depth=depth, },
require_consent=require_consent, txn_id=txn_id,
outlier=outlier, allow_no_prev_events=allow_no_prev_events,
historical=historical, prev_event_ids=prev_event_ids,
) state_event_ids=state_event_ids,
depth=depth,
prev_state_ids = await context.get_prev_state_ids( require_consent=require_consent,
StateFilter.from_types([(EventTypes.Member, None)]) outlier=outlier,
) historical=historical,
prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN
# Only rate-limit if the user actually joined the room, otherwise we'll end
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
) )
with opentracing.start_active_span("handle_new_client_event"):
result_event = await self.event_creation_handler.handle_new_client_event(
requester,
events_and_context=[(event, context)],
extra_users=[target],
ratelimit=ratelimit,
)
if event.membership == Membership.LEAVE: prev_state_ids = await context.get_prev_state_ids(
if prev_member_event_id: StateFilter.from_types([(EventTypes.Member, None)])
prev_member_event = await self.store.get_event(prev_member_event_id) )
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id) prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, user_id), None
)
if event.membership == Membership.JOIN:
newly_joined = True
if prev_member_event_id:
prev_member_event = await self.store.get_event(
prev_member_event_id
)
newly_joined = prev_member_event.membership != Membership.JOIN
# Only rate-limit if the user actually joined the room, otherwise we'll end
# up blocking profile updates.
if newly_joined and ratelimit:
await self._join_rate_limiter_local.ratelimit(requester)
await self._join_rate_per_room_limiter.ratelimit(
requester, key=room_id, update=False
)
with opentracing.start_active_span("handle_new_client_event"):
result_event = (
await self.event_creation_handler.handle_new_client_event(
requester,
events_and_context=[(event, context)],
extra_users=[target],
ratelimit=ratelimit,
)
)
if event.membership == Membership.LEAVE:
if prev_member_event_id:
prev_member_event = await self.store.get_event(
prev_member_event_id
)
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
# we know it was persisted, so should have a stream ordering # we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering assert result_event.internal_metadata.stream_ordering
@ -1234,6 +1256,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: Whether to rate limit this request. ratelimit: Whether to rate limit this request.
Raises: Raises:
SynapseError if there was a problem changing the membership. SynapseError if there was a problem changing the membership.
PartialStateConflictError: if attempting to persist a partial state event in
a room that has been un-partial stated.
""" """
target_user = UserID.from_string(event.state_key) target_user = UserID.from_string(event.state_key)
room_id = event.room_id room_id = event.room_id
@ -1863,21 +1887,37 @@ class RoomMemberMasterHandler(RoomMemberHandler):
list(previous_membership_event.auth_event_ids()) + prev_event_ids list(previous_membership_event.auth_event_ids()) + prev_event_ids
) )
event, context = await self.event_creation_handler.create_event( # Try several times, it could fail with PartialStateConflictError
requester, # in handle_new_client_event, cf comment in except block.
event_dict, max_retries = 5
txn_id=txn_id, for i in range(max_retries):
prev_event_ids=prev_event_ids, try:
auth_event_ids=auth_event_ids, event, context = await self.event_creation_handler.create_event(
outlier=True, requester,
) event_dict,
event.internal_metadata.out_of_band_membership = True txn_id=txn_id,
prev_event_ids=prev_event_ids,
auth_event_ids=auth_event_ids,
outlier=True,
)
event.internal_metadata.out_of_band_membership = True
result_event = (
await self.event_creation_handler.handle_new_client_event(
requester,
events_and_context=[(event, context)],
extra_users=[UserID.from_string(target_user)],
)
)
break
except PartialStateConflictError as e:
# Persisting couldn't happen because the room got un-partial stated
# in the meantime and context needs to be recomputed, so let's do so.
if i == max_retries - 1:
raise e
pass
result_event = await self.event_creation_handler.handle_new_client_event(
requester,
events_and_context=[(event, context)],
extra_users=[UserID.from_string(target_user)],
)
# we know it was persisted, so must have a stream ordering # we know it was persisted, so must have a stream ordering
assert result_event.internal_metadata.stream_ordering assert result_event.internal_metadata.stream_ordering

View File

@ -167,12 +167,10 @@ class ResponseCache(Generic[KV]):
# the should_cache bit, we leave it in the cache for now and schedule # the should_cache bit, we leave it in the cache for now and schedule
# its removal later. # its removal later.
if self.timeout_sec and context.should_cache: if self.timeout_sec and context.should_cache:
self.clock.call_later( self.clock.call_later(self.timeout_sec, self.unset, key)
self.timeout_sec, self._result_cache.pop, key, None
)
else: else:
# otherwise, remove the result immediately. # otherwise, remove the result immediately.
self._result_cache.pop(key, None) self.unset(key)
return r return r
# make sure we do this *after* adding the entry to result_cache, # make sure we do this *after* adding the entry to result_cache,
@ -181,6 +179,14 @@ class ResponseCache(Generic[KV]):
result.addBoth(on_complete) result.addBoth(on_complete)
return entry return entry
def unset(self, key: KV) -> None:
    """Remove the cached value for this key from the cache, if any.

    The entry is popped with a default value, so calling this for a key
    that has no cached entry is simply ignored.

    Args:
        key: key used to remove the cached value
    """
    self._result_cache.pop(key, None)
async def wrap( async def wrap(
self, self,
key: KV, key: KV,