From e32a73a0117e4e66ce51e65a4f541910826f30bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 18 Apr 2019 13:01:10 +0200 Subject: [PATCH] daemon: Allow device verification over the dbus API. --- pantalaimon/client.py | 1 - pantalaimon/daemon.py | 63 ++++++++++++++++++++++++++++++++++++------- pantalaimon/store.py | 4 +-- pantalaimon/ui.py | 42 ++++++++++++++++------------- 4 files changed, 79 insertions(+), 31 deletions(-) diff --git a/pantalaimon/client.py b/pantalaimon/client.py index b8e6fc7..82413ec 100644 --- a/pantalaimon/client.py +++ b/pantalaimon/client.py @@ -83,7 +83,6 @@ class PanClient(AsyncClient): self.verify_devices(key_query_response.changed) message = DevicesMessage( self.user_id, - self.device_id, key_query_response.changed ) await self.queue.put(message) diff --git a/pantalaimon/daemon.py b/pantalaimon/daemon.py index 9916157..47a4a40 100755 --- a/pantalaimon/daemon.py +++ b/pantalaimon/daemon.py @@ -25,20 +25,22 @@ from nio import EncryptionError, GroupEncryptionError, LoginResponse from pantalaimon.client import PanClient from pantalaimon.log import logger from pantalaimon.store import ClientInfo, PanStore -from pantalaimon.ui import glib_loop, shutdown_glib_loop +from pantalaimon.ui import glib_loop, shutdown_glib_loop, DeviceVerifyMessage @attr.s class ProxyDaemon: homeserver = attr.ib() data_dir = attr.ib() - queue = attr.ib() + send_queue = attr.ib() + recv_queue = attr.ib() proxy = attr.ib(default=None) ssl = attr.ib(default=None) store = attr.ib(type=PanStore, init=False) homeserver_url = attr.ib(init=False, default=attr.Factory(dict)) pan_clients = attr.ib(init=False, default=attr.Factory(dict)) + queue_task = attr.ib(init=False) client_info = attr.ib( init=False, default=attr.Factory(dict), @@ -70,7 +72,7 @@ class ProxyDaemon: pan_client = PanClient( self.homeserver_url, - self.queue, + self.send_queue, user_id, device_id, store_path=self.data_dir, @@ -84,6 +86,36 @@ class ProxyDaemon: pan_client.start_loop() + loop = asyncio.get_event_loop() + self.queue_task = loop.create_task(self.queue_loop()) + + async def queue_loop(self): + message = await self.recv_queue.get() + logger.debug(f"Daemon got message {message}") + + if isinstance(message, DeviceVerifyMessage): + client = self.pan_clients.get(message.pan_user, None) + + if not client: + return + + device = client.device_store[message.user_id].get( + message.device_id, + None + ) + + if not device: + return + + ret = client.verify_device(device) + + if ret: + logger.info(f"Device {message.device_id} of user " + f"{message.user_id} succesfully verified") + else: + logger.info(f"Device {message.device_id} of user " + f"{message.user_id} already verified") + def get_access_token(self, request): # type: (aiohttp.web.BaseRequest) -> str """Extract the access token from the request. @@ -180,7 +212,7 @@ class ProxyDaemon: pan_client = PanClient( self.homeserver_url, - self.queue, + self.send_queue, user, store_path=self.data_dir, ssl=self.ssl, @@ -425,8 +457,10 @@ class ProxyDaemon: await self.default_session.close() self.default_session = None + self.queue_task.cancel() -async def init(homeserver, http_proxy, ssl, queue): + +async def init(homeserver, http_proxy, ssl, send_queue, recv_queue): """Initialize the proxy and the http server.""" data_dir = user_data_dir("pantalaimon", "") @@ -438,7 +472,8 @@ async def init(homeserver, http_proxy, ssl, queue): proxy = ProxyDaemon( homeserver, data_dir, - queue=queue, + send_queue=send_queue, + recv_queue=recv_queue, proxy=http_proxy, ssl=ssl, ) @@ -639,19 +674,27 @@ def start( loop = asyncio.get_event_loop() - queue = janus.Queue(loop=loop) + pan_queue = janus.Queue(loop=loop) + ui_queue = janus.Queue(loop=loop) proxy, app = loop.run_until_complete(init( homeserver, proxy.geturl() if proxy else None, ssl, - queue.async_q + pan_queue.async_q, + ui_queue.async_q )) data_dir = user_data_dir("pantalaimon", "") - fut = loop.run_in_executor(None, glib_loop, queue.sync_q, data_dir) + fut = loop.run_in_executor( + None, + glib_loop, + pan_queue.sync_q, + ui_queue.sync_q, + data_dir + ) - kill_glib = partial(shutdown_glib_loop, fut, queue.async_q) + kill_glib = partial(shutdown_glib_loop, fut, pan_queue.async_q) app.on_shutdown.append(proxy.shutdown) app.on_shutdown.append(kill_glib) diff --git a/pantalaimon/store.py b/pantalaimon/store.py index 4ff6eb8..dfe9aea 100644 --- a/pantalaimon/store.py +++ b/pantalaimon/store.py @@ -214,7 +214,7 @@ class PanStore: @use_database def load_all_devices(self): # type (str, str) -> Dict[str, Dict[str, DeviceStore]] - store = defaultdict(dict) + store = dict() query = Accounts.select() @@ -239,6 +239,6 @@ class PanStore: "trust_state": trust_state.name }) - store[account.user_id][account.device_id] = device_store + store[account.user_id] = device_store return store diff --git a/pantalaimon/ui.py b/pantalaimon/ui.py index 847136f..909cff9 100644 --- a/pantalaimon/ui.py +++ b/pantalaimon/ui.py @@ -27,22 +27,21 @@ class ShutDownMessage(Message): @attr.s class DevicesMessage(Message): user_id = attr.ib() - device_id = attr.ib() devices = attr.ib() @attr.s class DeviceVerifyMessage(Message): + pan_user = attr.ib() user_id = attr.ib() device_id = attr.ib() - device_user = attr.ib() - device_device_id = attr.ib() class Devices(dbus.service.Object): - def __init__(self, bus_name, device_list): + def __init__(self, bus_name, queue, device_list): super().__init__(bus_name, "/org/pantalaimon/Devices") self.device_list = device_list + self.queue = queue @dbus.service.method("org.pantalaimon.devices.list", out_signature="a{sa{saa{ss}}}") @@ -50,33 +49,39 @@ class Devices(dbus.service.Object): return self.device_list @dbus.service.method("org.pantalaimon.devices.verify", - in_signature="ssss") - def verify(self, user_id, device_id, devices_user, devices_id): - device_store = self.device_list[user_id].get(device_id, None) + in_signature="sss") + def verify(self, pan_user, user_id, device_id): + device_store = self.device_list.get(pan_user) if not device_store: logger.debug(f"Not verifying device, no store found for user " - f"{user_id}") + f"{pan_user}") return - logger.debug(f"Verifying device {devices_user} {devices_id}") + logger.debug(f"Verifying device {user_id} {device_id}") + message = DeviceVerifyMessage( + pan_user, + user_id, + device_id + ) + self.queue.put(message) return @dbus.service.method("org.pantalaimon.devices.start_verification", - in_signature="ssss") - def start_verify(self, user_id, device_id, devices_user, devices_id): - device_store = self.device_list[user_id].get(device_id, None) + in_signature="sss") + def start_verify(self, pan_user, user_id, device_id): + device_store = self.device_list.get(pan_user) if not device_store: logger.info(f"Not verifying device, no store found for user " f"{user_id}") return - logger.info(f"Verifying device {devices_user} {devices_id}") + logger.info(f"Verifying device {user_id} {device_id}") return def update_devices(self, message): - device_store = self.device_list[message.user_id][message.device_id] + device_store = self.device_list[message.user_id] for user_id, device_dict in message.devices.items(): for device in device_dict.values(): @@ -108,7 +113,7 @@ class Users(dbus.service.Object): return -def glib_loop(queue, data_dir): +def glib_loop(receive_queue, send_queue, data_dir): DBusGMainLoop(set_as_default=True) loop = GLib.MainLoop() @@ -122,23 +127,24 @@ def glib_loop(queue, data_dir): # TODO update bus data if the asyncio thread tells us so. Users(bus_name, users) - device_bus = Devices(bus_name, devices) + device_bus = Devices(bus_name, send_queue, devices) def message_callback(): try: - message = queue.get_nowait() + message = receive_queue.get_nowait() except Empty: return True logger.info(f"Dbus loop received message {message}") if isinstance(message, ShutDownMessage): - queue.task_done() + receive_queue.task_done() loop.quit() return False elif isinstance(message, DevicesMessage): device_bus.update_devices(message) + receive_queue.task_done() return True