From 7318690e2c812539f4eba8e96964545ad364a992 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 7 May 2019 18:37:32 +0200 Subject: [PATCH] daemon: Buffer the messages endpoint if there are undecryptable messages. --- pantalaimon/client.py | 4 ++-- pantalaimon/daemon.py | 26 +++++++++++++++++--------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/pantalaimon/client.py b/pantalaimon/client.py index ee298d5..60f50a7 100644 --- a/pantalaimon/client.py +++ b/pantalaimon/client.py @@ -244,7 +244,7 @@ class PanClient(AsyncClient): return False - def decrypt_messages_body(self, body): + def decrypt_messages_body(self, body, ignore_failures=True): # type: (Dict[Any, Any]) -> Dict[Any, Any] """Go through a messages response and decrypt megolm encrypted events. @@ -267,7 +267,7 @@ class PanClient(AsyncClient): "\n{}".format(pformat(event))) continue - self.pan_decrypt_event(event) + self.pan_decrypt_event(event, ignore_failures=ignore_failures) return body diff --git a/pantalaimon/daemon.py b/pantalaimon/daemon.py index a876d3c..2046029 100755 --- a/pantalaimon/daemon.py +++ b/pantalaimon/daemon.py @@ -471,14 +471,18 @@ class ProxyDaemon: }) ) - async def decrypt_sync(self, client, sync_body): - """Try to decrypt the sync body.""" - async def decrypt_loop(client, sync_body): + async def decrypt_body(self, client, body, sync=True): + """Try to decrypt the a sync or messages body.""" + decryption_method = ( + client.decrypt_sync_body if sync else client.decrypt_messages_body + ) + + async def decrypt_loop(client, body): while True: try: logger.info("Trying to decrypt sync") - return client.decrypt_sync_body( - sync_body, + return decryption_method( + body, ignore_failures=False ) except EncryptionError: @@ -489,12 +493,12 @@ class ProxyDaemon: try: return await asyncio.wait_for( - decrypt_loop(client, sync_body), + decrypt_loop(client, body), timeout=self.decryption_timeout) except asyncio.TimeoutError: logger.info("Decryption attempt timed out, decrypting with " "failures") - return client.decrypt_sync_body(sync_body, ignore_failures=True) + return decryption_method(body, ignore_failures=True) async def sync(self, request): access_token = self.get_access_token(request) @@ -534,7 +538,7 @@ class ProxyDaemon: if response.status == 200: try: json_response = await response.json() - json_response = await self.decrypt_sync(client, json_response) + json_response = await self.decrypt_body(client, json_response) return web.Response( status=response.status, @@ -569,7 +573,11 @@ class ProxyDaemon: if response.status == 200: try: json_response = await response.json() - json_response = client.decrypt_messages_body(json_response) + json_response = await self.decrypt_body( + client, + json_response, + sync=False + ) return web.Response( status=response.status,