client: Use coroutines for the event callbacks.

This commit is contained in:
Damir Jelić 2019-07-03 09:50:45 +02:00
parent 3e32b9544b
commit fa1c2bb694

View File

@ -175,11 +175,7 @@ class PanClient(AsyncClient):
RoomNameEvent,
),
)
self.key_verificatins_tasks = []
self.key_request_tasks = []
self.add_response_callback(self.keys_query_cb, KeysQueryResponse)
self.add_response_callback(self.sync_tasks, SyncResponse)
def store_message_cb(self, room, event):
@ -304,16 +300,6 @@ class PanClient(AsyncClient):
return
async def sync_tasks(self, response):
try:
await asyncio.gather(*self.key_verificatins_tasks)
except LocalProtocolError as e:
logger.info(e)
await asyncio.gather(*self.key_request_tasks)
self.key_verificatins_tasks = []
self.key_request_tasks = []
await self.index.commit_events()
self.pan_store.save_token(self.server_name, self.user_id, self.next_batch)
@ -337,9 +323,7 @@ class PanClient(AsyncClient):
if response.changed:
await self.send_update_devices(response.changed)
def undecrypted_event_cb(self, room, event):
loop = asyncio.get_event_loop()
async def undecrypted_event_cb(self, room, event):
logger.info(
"Unable to decrypt event from {} via {}.".format(
event.sender, event.device_id
@ -348,13 +332,15 @@ class PanClient(AsyncClient):
if event.session_id not in self.outgoing_key_requests:
logger.info("Requesting room key for undecrypted event.")
task = loop.create_task(self.request_room_key(event))
self.key_request_tasks.append(task)
def key_verification_cb(self, event):
# TODO we may want to retry this
try:
await self.request_room_key(event)
except ClientConnectionError:
pass
async def key_verification_cb(self, event):
logger.info("Received key verification event: {}".format(event))
loop = asyncio.get_event_loop()
if isinstance(event, KeyVerificationStart):
logger.info(
f"{event.sender} via {event.from_device} has started "
@ -365,8 +351,7 @@ class PanClient(AsyncClient):
self.user_id, event.sender, event.from_device, event.transaction_id
)
task = loop.create_task(self.queue.put(message))
self.key_verificatins_tasks.append(task)
await self.queue.put(message)
elif isinstance(event, KeyVerificationKey):
sas = self.key_verifications.get(event.transaction_id, None)
@ -380,8 +365,7 @@ class PanClient(AsyncClient):
self.user_id, device.user_id, device.id, sas.transaction_id, emoji
)
task = loop.create_task(self.queue.put(message))
self.key_verificatins_tasks.append(task)
await self.queue.put(message)
elif isinstance(event, KeyVerificationMac):
sas = self.key_verifications.get(event.transaction_id, None)
@ -390,16 +374,12 @@ class PanClient(AsyncClient):
device = sas.other_olm_device
if sas.verified:
task = loop.create_task(
self.send_message(
SasDoneSignal(
self.user_id, device.user_id, device.id, sas.transaction_id
)
await self.send_message(
SasDoneSignal(
self.user_id, device.user_id, device.id, sas.transaction_id
)
)
self.key_verificatins_tasks.append(task)
task = loop.create_task(self.send_update_device(device))
self.key_verificatins_tasks.append(task)
await self.send_update_device(device)
def start_loop(self):
"""Start a loop that runs forever and keeps on syncing with the server.