diff --git a/pantalaimon/client.py b/pantalaimon/client.py index 807c7dc..466f874 100644 --- a/pantalaimon/client.py +++ b/pantalaimon/client.py @@ -54,7 +54,7 @@ class PanClient(AsyncClient): self.key_request_tasks = [] self.add_response_callback( - self.verify_devices, + self.keys_query_cb, KeysQueryResponse ) @@ -90,8 +90,7 @@ class PanClient(AsyncClient): self.key_verificatins_tasks = [] self.key_request_tasks = [] - async def verify_devices(self, response): - # Verify new devices automatically for now. + async def keys_query_cb(self, response): changed_devices = response.changed for user_id, device_dict in changed_devices.items(): diff --git a/pantalaimon/daemon.py b/pantalaimon/daemon.py index 92236d8..d0cf1ac 100755 --- a/pantalaimon/daemon.py +++ b/pantalaimon/daemon.py @@ -13,7 +13,8 @@ from aiohttp import ClientSession, web from aiohttp.client_exceptions import (ContentTypeError, ClientConnectionError) from multidict import CIMultiDict -from nio import EncryptionError, GroupEncryptionError, LoginResponse +from nio import (EncryptionError, GroupEncryptionError, LoginResponse, + MembersSyncError) from pantalaimon.client import PanClient from pantalaimon.log import logger @@ -26,10 +27,14 @@ from pantalaimon.thread_messages import ( DeviceConfirmSasMessage, SasMessage, AcceptSasMessage, - DaemonResponse + DaemonResponse, ) +class SendError(Exception): + pass + + @attr.s class ProxyDaemon: name = attr.ib() @@ -615,6 +620,24 @@ class ProxyDaemon: body=await response.read() ) + async def _send_message(self, client, room_id, msgtype, content, txnid): + """Send a room message, fetch room members and share group sessions.""" + retries = 5 + + for i in range(retries): + try: + return await client.room_send(room_id, msgtype, content, txnid) + except GroupEncryptionError: + await client.share_group_session(room_id) + except MembersSyncError: + await client.joined_members(room_id) + + if client.should_query_keys: + keys_query_response = await client.keys_query() + await client.keys_query_cb(keys_query_response) + + raise SendError("Service Unavailable") + async def send_message(self, request): access_token = self.get_access_token(request) @@ -649,12 +672,12 @@ class ProxyDaemon: return self._not_json try: - response = await client.room_send(room_id, msgtype, content, txnid) - except GroupEncryptionError: - await client.share_group_session(room_id) - response = await client.room_send(room_id, msgtype, content, txnid) + response = await self._send_message(client, room_id, msgtype, + content, txnid) except ClientConnectionError as e: return web.Response(status=500, text=str(e)) + except SendError as e: + return web.Response(status=503, text=str(e)) return web.Response( status=response.transport_response.status,