client: Use the new AsyncClient functionality from nio for syncing.

This commit is contained in:
Damir Jelić 2019-05-07 14:44:29 +02:00
parent c96bdea1d3
commit cf07cfdc16

View File

@ -48,6 +48,16 @@ class PanClient(AsyncClient):
self.key_verificatins_tasks = []
self.key_request_tasks = []
self.add_response_callback(
self.verify_devices,
KeysQueryResponse
)
self.add_response_callback(
self.sync_tasks,
SyncResponse
)
@property
def unable_to_decrypt(self):
"""Room event signaling that the message couldn't be decrypted."""
@ -65,8 +75,21 @@ class PanClient(AsyncClient):
message = InfoMessage(string)
await self.queue.put(message)
def verify_devices(self, changed_devices):
async def sync_tasks(self, response):
try:
await asyncio.gather(*self.key_verificatins_tasks)
except LocalProtocolError as e:
logger.info(e)
await asyncio.gather(*self.key_request_tasks)
self.key_verificatins_tasks = []
self.key_request_tasks = []
async def verify_devices(self, response):
# Verify new devices automatically for now.
changed_devices = response.changed
for user_id, device_dict in changed_devices.items():
for device in device_dict.values():
if device.deleted:
@ -76,6 +99,9 @@ class PanClient(AsyncClient):
"user {}".format(device.id, user_id))
self.verify_device(device)
message = DevicesMessage(self.user_id, response.changed)
await self.queue.put(message)
def undecrypted_event_cb(self, room, event):
loop = asyncio.get_event_loop()
@ -138,15 +164,23 @@ class PanClient(AsyncClient):
The loop can be stopped with the stop_loop() method.
"""
assert not self.task
logger.info(f"Starting sync loop for {self.user_id}")
loop = asyncio.get_event_loop()
task = loop.create_task(self.loop())
timeout = 30000
sync_filter = {
"room": {
"state": {"lazy_load_members": True}
}
}
task = loop.create_task(self.sync_forever(timeout, sync_filter))
self.task = task
return task
async def _to_device(self, message):
response = await self.to_device(message)
return message, response
async def confirm_sas(self, message):
user_id = message.user_id
device_id = message.device_id
@ -166,101 +200,15 @@ class PanClient(AsyncClient):
else:
await self.send_info(f"Waiting for {device.user_id} to confirm...")
async def send_to_device_messages(self):
if not self.outgoing_to_device_messages:
return
tasks = []
for message in self.outgoing_to_device_messages:
task = asyncio.create_task(self._to_device(message))
tasks.append(task)
await asyncio.gather(*tasks)
async def loop(self):
self.loop_running = True
self.loop_stopped.clear()
self.synced.clear()
logger.info(f"Starting sync loop for {self.user_id}")
while True:
try:
if not self.logged_in:
# TODO login
pass
response = await self.sync(
30000,
sync_filter={
"room": {
"state": {"lazy_load_members": True}
}
}
)
if response.transport_response.status != 200:
await asyncio.sleep(5)
continue
await self.send_to_device_messages()
try:
await asyncio.gather(*self.key_verificatins_tasks)
except LocalProtocolError as e:
logger.info(e)
self.key_verificatins_tasks = []
await asyncio.gather(*self.key_request_tasks)
if self.should_upload_keys:
await self.keys_upload()
if self.should_query_keys:
key_query_response = await self.keys_query()
if isinstance(key_query_response, KeysQueryResponse):
self.verify_devices(key_query_response.changed)
message = DevicesMessage(
self.user_id,
key_query_response.changed
)
await self.queue.put(message)
if not isinstance(response, SyncResponse):
# TODO error handling
pass
self.synced.set()
self.synced.clear()
except asyncio.CancelledError:
logger.info("Stopping the sync loop")
self._loop_stop()
break
except (
ClientConnectionError,
ConnectionRefusedError
):
try:
await asyncio.sleep(5)
except asyncio.CancelledError:
self._loop_stop()
break
def _loop_stop(self):
self.loop_running = False
self.loop_stopped.set()
async def loop_stop(self):
"""Stop the client loop."""
logger.info("Stopping the sync loop")
if not self.task or self.task.done():
return
self.task.cancel()
await self.loop_stopped.wait()
await self.task
async def encrypt(self, room_id, msgtype, content):
try: