mirror of
https://github.com/matrix-org/pantalaimon.git
synced 2025-03-19 21:06:01 -04:00
daemon: Buffer the messages endpoint if there are undecryptable messages.
This commit is contained in:
parent
400c299187
commit
7318690e2c
@ -244,7 +244,7 @@ class PanClient(AsyncClient):
|
||||
|
||||
return False
|
||||
|
||||
def decrypt_messages_body(self, body):
|
||||
def decrypt_messages_body(self, body, ignore_failures=True):
|
||||
# type: (Dict[Any, Any]) -> Dict[Any, Any]
|
||||
"""Go through a messages response and decrypt megolm encrypted events.
|
||||
|
||||
@ -267,7 +267,7 @@ class PanClient(AsyncClient):
|
||||
"\n{}".format(pformat(event)))
|
||||
continue
|
||||
|
||||
self.pan_decrypt_event(event)
|
||||
self.pan_decrypt_event(event, ignore_failures=ignore_failures)
|
||||
|
||||
return body
|
||||
|
||||
|
@ -471,14 +471,18 @@ class ProxyDaemon:
|
||||
})
|
||||
)
|
||||
|
||||
async def decrypt_sync(self, client, sync_body):
|
||||
"""Try to decrypt the sync body."""
|
||||
async def decrypt_loop(client, sync_body):
|
||||
async def decrypt_body(self, client, body, sync=True):
|
||||
"""Try to decrypt the a sync or messages body."""
|
||||
decryption_method = (
|
||||
client.decrypt_sync_body if sync else client.decrypt_messages_body
|
||||
)
|
||||
|
||||
async def decrypt_loop(client, body):
|
||||
while True:
|
||||
try:
|
||||
logger.info("Trying to decrypt sync")
|
||||
return client.decrypt_sync_body(
|
||||
sync_body,
|
||||
return decryption_method(
|
||||
body,
|
||||
ignore_failures=False
|
||||
)
|
||||
except EncryptionError:
|
||||
@ -489,12 +493,12 @@ class ProxyDaemon:
|
||||
|
||||
try:
|
||||
return await asyncio.wait_for(
|
||||
decrypt_loop(client, sync_body),
|
||||
decrypt_loop(client, body),
|
||||
timeout=self.decryption_timeout)
|
||||
except asyncio.TimeoutError:
|
||||
logger.info("Decryption attempt timed out, decrypting with "
|
||||
"failures")
|
||||
return client.decrypt_sync_body(sync_body, ignore_failures=True)
|
||||
return decryption_method(body, ignore_failures=True)
|
||||
|
||||
async def sync(self, request):
|
||||
access_token = self.get_access_token(request)
|
||||
@ -534,7 +538,7 @@ class ProxyDaemon:
|
||||
if response.status == 200:
|
||||
try:
|
||||
json_response = await response.json()
|
||||
json_response = await self.decrypt_sync(client, json_response)
|
||||
json_response = await self.decrypt_body(client, json_response)
|
||||
|
||||
return web.Response(
|
||||
status=response.status,
|
||||
@ -569,7 +573,11 @@ class ProxyDaemon:
|
||||
if response.status == 200:
|
||||
try:
|
||||
json_response = await response.json()
|
||||
json_response = client.decrypt_messages_body(json_response)
|
||||
json_response = await self.decrypt_body(
|
||||
client,
|
||||
json_response,
|
||||
sync=False
|
||||
)
|
||||
|
||||
return web.Response(
|
||||
status=response.status,
|
||||
|
Loading…
x
Reference in New Issue
Block a user