diff --git a/pantalaimon/client.py b/pantalaimon/client.py index dc0cf15..ad5f366 100644 --- a/pantalaimon/client.py +++ b/pantalaimon/client.py @@ -48,6 +48,16 @@ class PanClient(AsyncClient): self.key_verificatins_tasks = [] self.key_request_tasks = [] + self.add_response_callback( + self.verify_devices, + KeysQueryResponse + ) + + self.add_response_callback( + self.sync_tasks, + SyncResponse + ) + @property def unable_to_decrypt(self): """Room event signaling that the message couldn't be decrypted.""" @@ -65,8 +75,21 @@ class PanClient(AsyncClient): message = InfoMessage(string) await self.queue.put(message) - def verify_devices(self, changed_devices): + async def sync_tasks(self, response): + try: + await asyncio.gather(*self.key_verificatins_tasks) + except LocalProtocolError as e: + logger.info(e) + + await asyncio.gather(*self.key_request_tasks) + + self.key_verificatins_tasks = [] + self.key_request_tasks = [] + + async def verify_devices(self, response): # Verify new devices automatically for now. + changed_devices = response.changed + for user_id, device_dict in changed_devices.items(): for device in device_dict.values(): if device.deleted: @@ -76,6 +99,9 @@ class PanClient(AsyncClient): "user {}".format(device.id, user_id)) self.verify_device(device) + message = DevicesMessage(self.user_id, response.changed) + await self.queue.put(message) + def undecrypted_event_cb(self, room, event): loop = asyncio.get_event_loop() @@ -138,15 +164,23 @@ class PanClient(AsyncClient): The loop can be stopped with the stop_loop() method. """ + assert not self.task + + logger.info(f"Starting sync loop for {self.user_id}") + loop = asyncio.get_event_loop() - task = loop.create_task(self.loop()) + timeout = 30000 + + sync_filter = { + "room": { + "state": {"lazy_load_members": True} + } + } + + task = loop.create_task(self.sync_forever(timeout, sync_filter)) self.task = task return task - async def _to_device(self, message): - response = await self.to_device(message) - return message, response - async def confirm_sas(self, message): user_id = message.user_id device_id = message.device_id @@ -166,101 +200,15 @@ class PanClient(AsyncClient): else: await self.send_info(f"Waiting for {device.user_id} to confirm...") - async def send_to_device_messages(self): - if not self.outgoing_to_device_messages: - return - - tasks = [] - - for message in self.outgoing_to_device_messages: - task = asyncio.create_task(self._to_device(message)) - tasks.append(task) - - await asyncio.gather(*tasks) - - async def loop(self): - self.loop_running = True - self.loop_stopped.clear() - self.synced.clear() - - logger.info(f"Starting sync loop for {self.user_id}") - - while True: - try: - if not self.logged_in: - # TODO login - pass - - response = await self.sync( - 30000, - sync_filter={ - "room": { - "state": {"lazy_load_members": True} - } - } - ) - - if response.transport_response.status != 200: - await asyncio.sleep(5) - continue - - await self.send_to_device_messages() - - try: - await asyncio.gather(*self.key_verificatins_tasks) - except LocalProtocolError as e: - logger.info(e) - - self.key_verificatins_tasks = [] - - await asyncio.gather(*self.key_request_tasks) - - if self.should_upload_keys: - await self.keys_upload() - - if self.should_query_keys: - key_query_response = await self.keys_query() - if isinstance(key_query_response, KeysQueryResponse): - self.verify_devices(key_query_response.changed) - message = DevicesMessage( - self.user_id, - key_query_response.changed - ) - await self.queue.put(message) - - if not isinstance(response, SyncResponse): - # TODO error handling - pass - - self.synced.set() - self.synced.clear() - - except asyncio.CancelledError: - logger.info("Stopping the sync loop") - self._loop_stop() - break - - except ( - ClientConnectionError, - ConnectionRefusedError - ): - try: - await asyncio.sleep(5) - except asyncio.CancelledError: - self._loop_stop() - break - - def _loop_stop(self): - self.loop_running = False - self.loop_stopped.set() - async def loop_stop(self): """Stop the client loop.""" + logger.info("Stopping the sync loop") + if not self.task or self.task.done(): return self.task.cancel() - await self.loop_stopped.wait() + await self.task async def encrypt(self, room_id, msgtype, content): try: