mirror of
https://github.com/matrix-org/pantalaimon.git
synced 2024-10-01 03:35:38 -04:00
pacntl: Switch to argparse and add more functionality.
This commit is contained in:
parent
6adedbaa0b
commit
a560462da7
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
import attr
|
import attr
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import argparse
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
from typing import List
|
from typing import List
|
||||||
@ -9,7 +10,8 @@ from typing import List
|
|||||||
from prompt_toolkit import PromptSession
|
from prompt_toolkit import PromptSession
|
||||||
from prompt_toolkit.eventloop.defaults import use_asyncio_event_loop
|
from prompt_toolkit.eventloop.defaults import use_asyncio_event_loop
|
||||||
from prompt_toolkit.patch_stdout import patch_stdout
|
from prompt_toolkit.patch_stdout import patch_stdout
|
||||||
from prompt_toolkit.completion import Completer, Completion
|
from prompt_toolkit.completion import Completer, Completion, PathCompleter
|
||||||
|
from prompt_toolkit.document import Document
|
||||||
|
|
||||||
import dbus
|
import dbus
|
||||||
from gi.repository import GLib
|
from gi.repository import GLib
|
||||||
@ -20,6 +22,68 @@ DBusGMainLoop(set_as_default=True)
|
|||||||
use_asyncio_event_loop()
|
use_asyncio_event_loop()
|
||||||
|
|
||||||
|
|
||||||
|
class ParseError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class PanctlArgParse(argparse.ArgumentParser):
|
||||||
|
def print_usage(self, file=None):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def error(self, message):
|
||||||
|
message = (
|
||||||
|
f"Error: {message} "
|
||||||
|
f"(see help)"
|
||||||
|
)
|
||||||
|
print(message)
|
||||||
|
raise ParseError
|
||||||
|
|
||||||
|
|
||||||
|
class PanctlParser():
|
||||||
|
def __init__(self):
|
||||||
|
self.parser = PanctlArgParse()
|
||||||
|
subparsers = self.parser.add_subparsers(dest="subcommand")
|
||||||
|
subparsers.add_parser("list-users")
|
||||||
|
|
||||||
|
start = subparsers.add_parser("start-verification")
|
||||||
|
start.add_argument("pan_user", type=str)
|
||||||
|
start.add_argument("user_id", type=str)
|
||||||
|
start.add_argument("device_id", type=str)
|
||||||
|
|
||||||
|
accept = subparsers.add_parser("accept-verification")
|
||||||
|
accept.add_argument("pan_user", type=str)
|
||||||
|
accept.add_argument("user_id", type=str)
|
||||||
|
accept.add_argument("device_id", type=str)
|
||||||
|
|
||||||
|
confirm = subparsers.add_parser("confirm-verification")
|
||||||
|
confirm.add_argument("pan_user", type=str)
|
||||||
|
confirm.add_argument("user_id", type=str)
|
||||||
|
confirm.add_argument("device_id", type=str)
|
||||||
|
|
||||||
|
verify = subparsers.add_parser("verify-device")
|
||||||
|
verify.add_argument("pan_user", type=str)
|
||||||
|
verify.add_argument("user_id", type=str)
|
||||||
|
verify.add_argument("device_id", type=str)
|
||||||
|
|
||||||
|
unverify = subparsers.add_parser("verify-device")
|
||||||
|
unverify.add_argument("pan_user", type=str)
|
||||||
|
unverify.add_argument("user_id", type=str)
|
||||||
|
unverify.add_argument("device_id", type=str)
|
||||||
|
|
||||||
|
import_keys = subparsers.add_parser("import-keys")
|
||||||
|
import_keys.add_argument("pan_user", type=str)
|
||||||
|
import_keys.add_argument("path", type=str)
|
||||||
|
import_keys.add_argument("passphrase", type=str)
|
||||||
|
|
||||||
|
export_keys = subparsers.add_parser("export-keys")
|
||||||
|
export_keys.add_argument("pan_user", type=str)
|
||||||
|
export_keys.add_argument("path", type=str)
|
||||||
|
export_keys.add_argument("passphrase", type=str)
|
||||||
|
|
||||||
|
def parse_args(self, argv):
|
||||||
|
return self.parser.parse_args(argv)
|
||||||
|
|
||||||
|
|
||||||
@attr.s
|
@attr.s
|
||||||
class PanCompleter(Completer):
|
class PanCompleter(Completer):
|
||||||
"""Completer for panctl commands."""
|
"""Completer for panctl commands."""
|
||||||
@ -27,6 +91,7 @@ class PanCompleter(Completer):
|
|||||||
commands = attr.ib(type=List[str])
|
commands = attr.ib(type=List[str])
|
||||||
ctl = attr.ib()
|
ctl = attr.ib()
|
||||||
devices = attr.ib()
|
devices = attr.ib()
|
||||||
|
path_completer = PathCompleter(expanduser=True)
|
||||||
|
|
||||||
def complete_commands(self, last_word):
|
def complete_commands(self, last_word):
|
||||||
"""Complete the available commands."""
|
"""Complete the available commands."""
|
||||||
@ -70,8 +135,7 @@ class PanCompleter(Completer):
|
|||||||
|
|
||||||
return compl_words
|
return compl_words
|
||||||
|
|
||||||
def complete_verification(self, command, last_word, words):
|
def complete_pan_users(self, last_word):
|
||||||
def complete_pan_users():
|
|
||||||
users = self.ctl.list_users(
|
users = self.ctl.list_users(
|
||||||
dbus_interface="org.pantalaimon.control"
|
dbus_interface="org.pantalaimon.control"
|
||||||
)
|
)
|
||||||
@ -80,8 +144,9 @@ class PanCompleter(Completer):
|
|||||||
for compl_word in compl_words:
|
for compl_word in compl_words:
|
||||||
yield Completion(compl_word, -len(last_word))
|
yield Completion(compl_word, -len(last_word))
|
||||||
|
|
||||||
|
def complete_verification(self, command, last_word, words):
|
||||||
if len(words) == 2:
|
if len(words) == 2:
|
||||||
return complete_pan_users()
|
return self.complete_pan_users(last_word)
|
||||||
elif len(words) == 3:
|
elif len(words) == 3:
|
||||||
pan_user = words[1]
|
pan_user = words[1]
|
||||||
return self.complete_users(last_word, pan_user)
|
return self.complete_users(last_word, pan_user)
|
||||||
@ -92,6 +157,24 @@ class PanCompleter(Completer):
|
|||||||
|
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
def complete_key_file_cmds(
|
||||||
|
self,
|
||||||
|
document,
|
||||||
|
complete_event,
|
||||||
|
command,
|
||||||
|
last_word,
|
||||||
|
words
|
||||||
|
):
|
||||||
|
if len(words) == 2:
|
||||||
|
return self.complete_pan_users(last_word)
|
||||||
|
if len(words) == 3:
|
||||||
|
return self.path_completer.get_completions(
|
||||||
|
Document(last_word),
|
||||||
|
complete_event
|
||||||
|
)
|
||||||
|
|
||||||
|
return ""
|
||||||
|
|
||||||
def get_completions(self, document, complete_event):
|
def get_completions(self, document, complete_event):
|
||||||
"""Build the completions."""
|
"""Build the completions."""
|
||||||
text_before_cursor = document.text_before_cursor
|
text_before_cursor = document.text_before_cursor
|
||||||
@ -116,6 +199,18 @@ class PanCompleter(Completer):
|
|||||||
]:
|
]:
|
||||||
return self.complete_verification(command, last_word, words)
|
return self.complete_verification(command, last_word, words)
|
||||||
|
|
||||||
|
elif command in [
|
||||||
|
"export-keys",
|
||||||
|
"import-keys",
|
||||||
|
]:
|
||||||
|
return self.complete_key_file_cmds(
|
||||||
|
document,
|
||||||
|
complete_event,
|
||||||
|
command,
|
||||||
|
last_word,
|
||||||
|
words
|
||||||
|
)
|
||||||
|
|
||||||
return ""
|
return ""
|
||||||
|
|
||||||
|
|
||||||
@ -216,44 +311,26 @@ class PanCtl:
|
|||||||
print(" ", user, device)
|
print(" ", user, device)
|
||||||
|
|
||||||
def import_keys(self, args):
|
def import_keys(self, args):
|
||||||
try:
|
|
||||||
user, filepath, passphrase = args
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid arguments for command")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.ctl.import_keys(
|
self.ctl.import_keys(
|
||||||
user,
|
args.pan_user,
|
||||||
filepath,
|
args.path,
|
||||||
passphrase,
|
args.passphrase,
|
||||||
dbus_interface="org.pantalaimon.control"
|
dbus_interface="org.pantalaimon.control"
|
||||||
)
|
)
|
||||||
|
|
||||||
def export_keys(self, args):
|
def export_keys(self, args):
|
||||||
try:
|
|
||||||
user, filepath, passphrase = args
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid arguments for command")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.ctl.export_keys(
|
self.ctl.export_keys(
|
||||||
user,
|
args.pan_user,
|
||||||
filepath,
|
args.path,
|
||||||
passphrase,
|
args.passphrase,
|
||||||
dbus_interface="org.pantalaimon.control"
|
dbus_interface="org.pantalaimon.control"
|
||||||
)
|
)
|
||||||
|
|
||||||
def confirm_sas(self, args):
|
def confirm_sas(self, args):
|
||||||
try:
|
|
||||||
pan_user, user, device = args
|
|
||||||
except ValueError:
|
|
||||||
print("Invalid arguments for command")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.devices.confirm_sas(
|
self.devices.confirm_sas(
|
||||||
pan_user,
|
args.pan_user,
|
||||||
user,
|
args.user_id,
|
||||||
device,
|
args.device_id,
|
||||||
dbus_interface="org.pantalaimon.devices"
|
dbus_interface="org.pantalaimon.devices"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -269,36 +346,32 @@ class PanCtl:
|
|||||||
except EOFError:
|
except EOFError:
|
||||||
break
|
break
|
||||||
|
|
||||||
words = result.split(" ")
|
if not result:
|
||||||
|
|
||||||
if not words:
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
command = words[0]
|
parser = PanctlParser()
|
||||||
|
|
||||||
|
try:
|
||||||
|
parsed_args = parser.parse_args(result.split())
|
||||||
|
except ParseError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
command = parsed_args.subcommand
|
||||||
|
|
||||||
if command == "list-users":
|
if command == "list-users":
|
||||||
self.list_users()
|
self.list_users()
|
||||||
|
|
||||||
elif command == "export-keys":
|
elif command == "export-keys":
|
||||||
args = words[1:]
|
self.export_keys(parsed_args)
|
||||||
self.export_keys(args)
|
|
||||||
|
|
||||||
elif command == "import-keys":
|
elif command == "import-keys":
|
||||||
args = words[1:]
|
self.import_keys(parsed_args)
|
||||||
self.import_keys(args)
|
|
||||||
|
|
||||||
elif command == "accept-verification":
|
elif command == "accept-verification":
|
||||||
pass
|
pass
|
||||||
|
|
||||||
elif command == "confirm-verification":
|
elif command == "confirm-verification":
|
||||||
args = words[1:]
|
self.confirm_sas(parsed_args)
|
||||||
self.confirm_sas(args)
|
|
||||||
|
|
||||||
elif not command:
|
|
||||||
continue
|
|
||||||
|
|
||||||
else:
|
|
||||||
print(f"Unknown command {command}.")
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
Loading…
Reference in New Issue
Block a user