daemon: Allow device verification over the dbus API.

This commit is contained in:
Damir Jelić 2019-04-18 13:01:10 +02:00
parent 1613a2fa5a
commit e32a73a011
4 changed files with 79 additions and 31 deletions

View File

@ -83,7 +83,6 @@ class PanClient(AsyncClient):
self.verify_devices(key_query_response.changed) self.verify_devices(key_query_response.changed)
message = DevicesMessage( message = DevicesMessage(
self.user_id, self.user_id,
self.device_id,
key_query_response.changed key_query_response.changed
) )
await self.queue.put(message) await self.queue.put(message)

View File

@ -25,20 +25,22 @@ from nio import EncryptionError, GroupEncryptionError, LoginResponse
from pantalaimon.client import PanClient from pantalaimon.client import PanClient
from pantalaimon.log import logger from pantalaimon.log import logger
from pantalaimon.store import ClientInfo, PanStore from pantalaimon.store import ClientInfo, PanStore
from pantalaimon.ui import glib_loop, shutdown_glib_loop from pantalaimon.ui import glib_loop, shutdown_glib_loop, DeviceVerifyMessage
@attr.s @attr.s
class ProxyDaemon: class ProxyDaemon:
homeserver = attr.ib() homeserver = attr.ib()
data_dir = attr.ib() data_dir = attr.ib()
queue = attr.ib() send_queue = attr.ib()
recv_queue = attr.ib()
proxy = attr.ib(default=None) proxy = attr.ib(default=None)
ssl = attr.ib(default=None) ssl = attr.ib(default=None)
store = attr.ib(type=PanStore, init=False) store = attr.ib(type=PanStore, init=False)
homeserver_url = attr.ib(init=False, default=attr.Factory(dict)) homeserver_url = attr.ib(init=False, default=attr.Factory(dict))
pan_clients = attr.ib(init=False, default=attr.Factory(dict)) pan_clients = attr.ib(init=False, default=attr.Factory(dict))
queue_task = attr.ib(init=False)
client_info = attr.ib( client_info = attr.ib(
init=False, init=False,
default=attr.Factory(dict), default=attr.Factory(dict),
@ -70,7 +72,7 @@ class ProxyDaemon:
pan_client = PanClient( pan_client = PanClient(
self.homeserver_url, self.homeserver_url,
self.queue, self.send_queue,
user_id, user_id,
device_id, device_id,
store_path=self.data_dir, store_path=self.data_dir,
@ -84,6 +86,36 @@ class ProxyDaemon:
pan_client.start_loop() pan_client.start_loop()
loop = asyncio.get_event_loop()
self.queue_task = loop.create_task(self.queue_loop())
async def queue_loop(self):
message = await self.recv_queue.get()
logger.debug(f"Daemon got message {message}")
if isinstance(message, DeviceVerifyMessage):
client = self.pan_clients.get(message.pan_user, None)
if not client:
return
device = client.device_store[message.user_id].get(
message.device_id,
None
)
if not device:
return
ret = client.verify_device(device)
if ret:
logger.info(f"Device {message.device_id} of user "
f"{message.user_id} succesfully verified")
else:
logger.info(f"Device {message.device_id} of user "
f"{message.user_id} already verified")
def get_access_token(self, request): def get_access_token(self, request):
# type: (aiohttp.web.BaseRequest) -> str # type: (aiohttp.web.BaseRequest) -> str
"""Extract the access token from the request. """Extract the access token from the request.
@ -180,7 +212,7 @@ class ProxyDaemon:
pan_client = PanClient( pan_client = PanClient(
self.homeserver_url, self.homeserver_url,
self.queue, self.send_queue,
user, user,
store_path=self.data_dir, store_path=self.data_dir,
ssl=self.ssl, ssl=self.ssl,
@ -425,8 +457,10 @@ class ProxyDaemon:
await self.default_session.close() await self.default_session.close()
self.default_session = None self.default_session = None
self.queue_task.cancel()
async def init(homeserver, http_proxy, ssl, queue):
async def init(homeserver, http_proxy, ssl, send_queue, recv_queue):
"""Initialize the proxy and the http server.""" """Initialize the proxy and the http server."""
data_dir = user_data_dir("pantalaimon", "") data_dir = user_data_dir("pantalaimon", "")
@ -438,7 +472,8 @@ async def init(homeserver, http_proxy, ssl, queue):
proxy = ProxyDaemon( proxy = ProxyDaemon(
homeserver, homeserver,
data_dir, data_dir,
queue=queue, send_queue=send_queue,
recv_queue=recv_queue,
proxy=http_proxy, proxy=http_proxy,
ssl=ssl, ssl=ssl,
) )
@ -639,19 +674,27 @@ def start(
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop) pan_queue = janus.Queue(loop=loop)
ui_queue = janus.Queue(loop=loop)
proxy, app = loop.run_until_complete(init( proxy, app = loop.run_until_complete(init(
homeserver, homeserver,
proxy.geturl() if proxy else None, proxy.geturl() if proxy else None,
ssl, ssl,
queue.async_q pan_queue.async_q,
ui_queue.async_q
)) ))
data_dir = user_data_dir("pantalaimon", "") data_dir = user_data_dir("pantalaimon", "")
fut = loop.run_in_executor(None, glib_loop, queue.sync_q, data_dir) fut = loop.run_in_executor(
None,
glib_loop,
pan_queue.sync_q,
ui_queue.sync_q,
data_dir
)
kill_glib = partial(shutdown_glib_loop, fut, queue.async_q) kill_glib = partial(shutdown_glib_loop, fut, pan_queue.async_q)
app.on_shutdown.append(proxy.shutdown) app.on_shutdown.append(proxy.shutdown)
app.on_shutdown.append(kill_glib) app.on_shutdown.append(kill_glib)

View File

@ -214,7 +214,7 @@ class PanStore:
@use_database @use_database
def load_all_devices(self): def load_all_devices(self):
# type (str, str) -> Dict[str, Dict[str, DeviceStore]] # type (str, str) -> Dict[str, Dict[str, DeviceStore]]
store = defaultdict(dict) store = dict()
query = Accounts.select() query = Accounts.select()
@ -239,6 +239,6 @@ class PanStore:
"trust_state": trust_state.name "trust_state": trust_state.name
}) })
store[account.user_id][account.device_id] = device_store store[account.user_id] = device_store
return store return store

View File

@ -27,22 +27,21 @@ class ShutDownMessage(Message):
@attr.s @attr.s
class DevicesMessage(Message): class DevicesMessage(Message):
user_id = attr.ib() user_id = attr.ib()
device_id = attr.ib()
devices = attr.ib() devices = attr.ib()
@attr.s @attr.s
class DeviceVerifyMessage(Message): class DeviceVerifyMessage(Message):
pan_user = attr.ib()
user_id = attr.ib() user_id = attr.ib()
device_id = attr.ib() device_id = attr.ib()
device_user = attr.ib()
device_device_id = attr.ib()
class Devices(dbus.service.Object): class Devices(dbus.service.Object):
def __init__(self, bus_name, device_list): def __init__(self, bus_name, queue, device_list):
super().__init__(bus_name, "/org/pantalaimon/Devices") super().__init__(bus_name, "/org/pantalaimon/Devices")
self.device_list = device_list self.device_list = device_list
self.queue = queue
@dbus.service.method("org.pantalaimon.devices.list", @dbus.service.method("org.pantalaimon.devices.list",
out_signature="a{sa{saa{ss}}}") out_signature="a{sa{saa{ss}}}")
@ -50,33 +49,39 @@ class Devices(dbus.service.Object):
return self.device_list return self.device_list
@dbus.service.method("org.pantalaimon.devices.verify", @dbus.service.method("org.pantalaimon.devices.verify",
in_signature="ssss") in_signature="sss")
def verify(self, user_id, device_id, devices_user, devices_id): def verify(self, pan_user, user_id, device_id):
device_store = self.device_list[user_id].get(device_id, None) device_store = self.device_list.get(pan_user)
if not device_store: if not device_store:
logger.debug(f"Not verifying device, no store found for user " logger.debug(f"Not verifying device, no store found for user "
f"{user_id}") f"{pan_user}")
return return
logger.debug(f"Verifying device {devices_user} {devices_id}") logger.debug(f"Verifying device {user_id} {device_id}")
message = DeviceVerifyMessage(
pan_user,
user_id,
device_id
)
self.queue.put(message)
return return
@dbus.service.method("org.pantalaimon.devices.start_verification", @dbus.service.method("org.pantalaimon.devices.start_verification",
in_signature="ssss") in_signature="sss")
def start_verify(self, user_id, device_id, devices_user, devices_id): def start_verify(self, pan_user, user_id, device_id):
device_store = self.device_list[user_id].get(device_id, None) device_store = self.device_list.get(pan_user)
if not device_store: if not device_store:
logger.info(f"Not verifying device, no store found for user " logger.info(f"Not verifying device, no store found for user "
f"{user_id}") f"{user_id}")
return return
logger.info(f"Verifying device {devices_user} {devices_id}") logger.info(f"Verifying device {user_id} {device_id}")
return return
def update_devices(self, message): def update_devices(self, message):
device_store = self.device_list[message.user_id][message.device_id] device_store = self.device_list[message.user_id]
for user_id, device_dict in message.devices.items(): for user_id, device_dict in message.devices.items():
for device in device_dict.values(): for device in device_dict.values():
@ -108,7 +113,7 @@ class Users(dbus.service.Object):
return return
def glib_loop(queue, data_dir): def glib_loop(receive_queue, send_queue, data_dir):
DBusGMainLoop(set_as_default=True) DBusGMainLoop(set_as_default=True)
loop = GLib.MainLoop() loop = GLib.MainLoop()
@ -122,23 +127,24 @@ def glib_loop(queue, data_dir):
# TODO update bus data if the asyncio thread tells us so. # TODO update bus data if the asyncio thread tells us so.
Users(bus_name, users) Users(bus_name, users)
device_bus = Devices(bus_name, devices) device_bus = Devices(bus_name, send_queue, devices)
def message_callback(): def message_callback():
try: try:
message = queue.get_nowait() message = receive_queue.get_nowait()
except Empty: except Empty:
return True return True
logger.info(f"Dbus loop received message {message}") logger.info(f"Dbus loop received message {message}")
if isinstance(message, ShutDownMessage): if isinstance(message, ShutDownMessage):
queue.task_done() receive_queue.task_done()
loop.quit() loop.quit()
return False return False
elif isinstance(message, DevicesMessage): elif isinstance(message, DevicesMessage):
device_bus.update_devices(message) device_bus.update_devices(message)
receive_queue.task_done()
return True return True