mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-05-07 22:24:57 -04:00
Stop user directory from failing if it encounters users not in the users
table. (#11053)
The following scenarios would halt the user directory updater: - user joins room - user leaves room - user present in room which switches from private to public, or vice versa. for two classes of users: - appservice senders - users missing from the user table. If this happened, the user directory would be stuck, unable to make forward progress. Exclude both cases from the user directory, so that we ignore them. Co-authored-by: Eric Eastwood <erice@element.io> Co-authored-by: reivilibre <oliverw@matrix.org> Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> Co-authored-by: Brendan Abolivier <babolivier@matrix.org>
This commit is contained in:
parent
1db9282dfa
commit
b83e822556
13 changed files with 918 additions and 90 deletions
|
@ -182,85 +182,87 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
|
|||
)
|
||||
|
||||
@trace(opname="outgoing_replication_request")
|
||||
@outgoing_gauge.track_inprogress()
|
||||
async def send_request(*, instance_name="master", **kwargs):
|
||||
if instance_name == local_instance_name:
|
||||
raise Exception("Trying to send HTTP request to self")
|
||||
if instance_name == "master":
|
||||
host = master_host
|
||||
port = master_port
|
||||
elif instance_name in instance_map:
|
||||
host = instance_map[instance_name].host
|
||||
port = instance_map[instance_name].port
|
||||
else:
|
||||
raise Exception(
|
||||
"Instance %r not in 'instance_map' config" % (instance_name,)
|
||||
with outgoing_gauge.track_inprogress():
|
||||
if instance_name == local_instance_name:
|
||||
raise Exception("Trying to send HTTP request to self")
|
||||
if instance_name == "master":
|
||||
host = master_host
|
||||
port = master_port
|
||||
elif instance_name in instance_map:
|
||||
host = instance_map[instance_name].host
|
||||
port = instance_map[instance_name].port
|
||||
else:
|
||||
raise Exception(
|
||||
"Instance %r not in 'instance_map' config" % (instance_name,)
|
||||
)
|
||||
|
||||
data = await cls._serialize_payload(**kwargs)
|
||||
|
||||
url_args = [
|
||||
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
|
||||
]
|
||||
|
||||
if cls.CACHE:
|
||||
txn_id = random_string(10)
|
||||
url_args.append(txn_id)
|
||||
|
||||
if cls.METHOD == "POST":
|
||||
request_func = client.post_json_get_json
|
||||
elif cls.METHOD == "PUT":
|
||||
request_func = client.put_json
|
||||
elif cls.METHOD == "GET":
|
||||
request_func = client.get_json
|
||||
else:
|
||||
# We have already asserted in the constructor that a
|
||||
# compatible was picked, but lets be paranoid.
|
||||
raise Exception(
|
||||
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
|
||||
)
|
||||
|
||||
uri = "http://%s:%s/_synapse/replication/%s/%s" % (
|
||||
host,
|
||||
port,
|
||||
cls.NAME,
|
||||
"/".join(url_args),
|
||||
)
|
||||
|
||||
data = await cls._serialize_payload(**kwargs)
|
||||
try:
|
||||
# We keep retrying the same request for timeouts. This is so that we
|
||||
# have a good idea that the request has either succeeded or failed
|
||||
# on the master, and so whether we should clean up or not.
|
||||
while True:
|
||||
headers: Dict[bytes, List[bytes]] = {}
|
||||
# Add an authorization header, if configured.
|
||||
if replication_secret:
|
||||
headers[b"Authorization"] = [
|
||||
b"Bearer " + replication_secret
|
||||
]
|
||||
opentracing.inject_header_dict(headers, check_destination=False)
|
||||
try:
|
||||
result = await request_func(uri, data, headers=headers)
|
||||
break
|
||||
except RequestTimedOutError:
|
||||
if not cls.RETRY_ON_TIMEOUT:
|
||||
raise
|
||||
|
||||
url_args = [
|
||||
urllib.parse.quote(kwargs[name], safe="") for name in cls.PATH_ARGS
|
||||
]
|
||||
logger.warning("%s request timed out; retrying", cls.NAME)
|
||||
|
||||
if cls.CACHE:
|
||||
txn_id = random_string(10)
|
||||
url_args.append(txn_id)
|
||||
# If we timed out we probably don't need to worry about backing
|
||||
# off too much, but lets just wait a little anyway.
|
||||
await clock.sleep(1)
|
||||
except HttpResponseException as e:
|
||||
# We convert to SynapseError as we know that it was a SynapseError
|
||||
# on the main process that we should send to the client. (And
|
||||
# importantly, not stack traces everywhere)
|
||||
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
|
||||
raise e.to_synapse_error()
|
||||
except Exception as e:
|
||||
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
|
||||
raise SynapseError(502, "Failed to talk to main process") from e
|
||||
|
||||
if cls.METHOD == "POST":
|
||||
request_func = client.post_json_get_json
|
||||
elif cls.METHOD == "PUT":
|
||||
request_func = client.put_json
|
||||
elif cls.METHOD == "GET":
|
||||
request_func = client.get_json
|
||||
else:
|
||||
# We have already asserted in the constructor that a
|
||||
# compatible was picked, but lets be paranoid.
|
||||
raise Exception(
|
||||
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
|
||||
)
|
||||
|
||||
uri = "http://%s:%s/_synapse/replication/%s/%s" % (
|
||||
host,
|
||||
port,
|
||||
cls.NAME,
|
||||
"/".join(url_args),
|
||||
)
|
||||
|
||||
try:
|
||||
# We keep retrying the same request for timeouts. This is so that we
|
||||
# have a good idea that the request has either succeeded or failed on
|
||||
# the master, and so whether we should clean up or not.
|
||||
while True:
|
||||
headers: Dict[bytes, List[bytes]] = {}
|
||||
# Add an authorization header, if configured.
|
||||
if replication_secret:
|
||||
headers[b"Authorization"] = [b"Bearer " + replication_secret]
|
||||
opentracing.inject_header_dict(headers, check_destination=False)
|
||||
try:
|
||||
result = await request_func(uri, data, headers=headers)
|
||||
break
|
||||
except RequestTimedOutError:
|
||||
if not cls.RETRY_ON_TIMEOUT:
|
||||
raise
|
||||
|
||||
logger.warning("%s request timed out; retrying", cls.NAME)
|
||||
|
||||
# If we timed out we probably don't need to worry about backing
|
||||
# off too much, but lets just wait a little anyway.
|
||||
await clock.sleep(1)
|
||||
except HttpResponseException as e:
|
||||
# We convert to SynapseError as we know that it was a SynapseError
|
||||
# on the main process that we should send to the client. (And
|
||||
# importantly, not stack traces everywhere)
|
||||
_outgoing_request_counter.labels(cls.NAME, e.code).inc()
|
||||
raise e.to_synapse_error()
|
||||
except Exception as e:
|
||||
_outgoing_request_counter.labels(cls.NAME, "ERR").inc()
|
||||
raise SynapseError(502, "Failed to talk to main process") from e
|
||||
|
||||
_outgoing_request_counter.labels(cls.NAME, 200).inc()
|
||||
return result
|
||||
_outgoing_request_counter.labels(cls.NAME, 200).inc()
|
||||
return result
|
||||
|
||||
return send_request
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue