Added config parsing and announce jobs to lxmd

This commit is contained in:
Mark Qvist 2022-10-22 20:12:54 +02:00
parent 8cdf12e866
commit 8bb21688e5

View File

@ -22,39 +22,137 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
DEFFERED_JOBS_DELAY = 10
JOBS_INTERVAL = 5
import RNS
import LXMF
import argparse
import threading
import time
import os
from LXMF._version import __version__
from RNS.vendor.configobj import ConfigObj
configpath = None
ignoredpath = None
identitypath = None
storagedir = None
targetloglevel = None
identity = None
lxmd_config = None
message_router = None
lxmf_destination = None
active_configuration = {}
last_peer_announce = None
last_node_announce = None
def create_default_config(configpath):
lxmd_config = ConfigObj(__default_lxmd_config__.splitlines())
lxmd_config.filename = configpath
lxmd_config.write()
def apply_config():
# TODO: Apply configuration
active_configuration["display_name"] = "Anonymous Peer"
active_configuration["enable_propagation_node"] = True
active_configuration["message_storage_limit"] = 2000
active_configuration["prioritised_lxmf_destinations"] = []
active_configuration["ignored_lxmf_destinations"] = []
global active_configuration, targetloglevel
try:
# Load peer settings
if "lxmf" in lxmd_config and "display_name" in lxmd_config["lxmf"]:
active_configuration["display_name"] = lxmd_config["lxmf"]["display_name"]
else:
active_configuration["display_name"] = "Anonymous Peer"
if "lxmf" in lxmd_config and "announce_at_start" in lxmd_config["lxmf"]:
active_configuration["peer_announce_at_start"] = lxmd_config["lxmf"].as_bool("announce_at_start")
else:
active_configuration["peer_announce_at_start"] = False
if "lxmf" in lxmd_config and "announce_interval" in lxmd_config["lxmf"]:
active_configuration["peer_announce_interval"] = lxmd_config["lxmf"].as_int("announce_interval")*60
else:
active_configuration["peer_announce_interval"] = None
if "lxmf" in lxmd_config and "on_inbound" in lxmd_config["lxmf"]:
active_configuration["on_inbound"] = lxmd_config["lxmf"]["on_inbound"]
else:
active_configuration["on_inbound"] = None
# Load propagation node settings
if "propagation" in lxmd_config and "enable_node" in lxmd_config["propagation"]:
active_configuration["enable_propagation_node"] = lxmd_config["propagation"].as_bool("enable_node")
else:
active_configuration["enable_propagation_node"] = False
if "propagation" in lxmd_config and "announce_at_start" in lxmd_config["propagation"]:
active_configuration["node_announce_at_start"] = lxmd_config["propagation"].as_bool("announce_at_start")
else:
active_configuration["node_announce_at_start"] = False
if "propagation" in lxmd_config and "announce_interval" in lxmd_config["propagation"]:
active_configuration["node_announce_interval"] = lxmd_config["propagation"].as_int("announce_interval")*60
else:
active_configuration["node_announce_interval"] = None
if "propagation" in lxmd_config and "message_storage_limit" in lxmd_config["propagation"]:
active_configuration["message_storage_limit"] = lxmd_config["propagation"].as_float("message_storage_limit")
if active_configuration["message_storage_limit"] < 0.005:
active_configuration["message_storage_limit"] = 0.005
else:
active_configuration["message_storage_limit"] = 2000
if "propagation" in lxmd_config and "prioritise_destinations" in lxmd_config["propagation"]:
active_configuration["prioritised_lxmf_destinations"] = lxmd_config["propagation"].as_list("prioritise_destinations")
else:
active_configuration["prioritised_lxmf_destinations"] = []
# Load various settings
if "logging" in lxmd_config and "loglevel" in lxmd_config["logging"]:
targetloglevel = lxmd_config["logging"].as_int("loglevel")
active_configuration["ignored_lxmf_destinations"] = []
if os.path.isfile(ignoredpath):
try:
fh = open(ignoredpath, "rb")
ignored_input = fh.read()
fh.close()
ignored_hash_strs = ignored_input.splitlines()
for hash_str in ignored_hash_strs:
if len(hash_str) == RNS.Identity.TRUNCATED_HASHLENGTH//8*2:
try:
ignored_hash = bytes.fromhex(hash_str.decode("utf-8"))
active_configuration["ignored_lxmf_destinations"].append(ignored_hash)
except Exception as e:
RNS.log("Could not decode RNS Identity hash from: "+str(hash_str), RNS.LOG_DEBUG)
RNS.log("The contained exception was: "+str(e), RNS.LOG_DEBUG)
except Exception as e:
RNS.log("Error while loading list of ignored destinations: "+str(e), RNS.LOG_ERROR)
except Exception as e:
RNS.log("Could not apply LXM Daemon configuration. The contained exception was: "+str(e), RNS.LOG_ERROR)
raise e
exit(3)
def lxmf_delivery(lxm):
# TODO: Implement delivery callback
pass
global active_configuration
RNS.log("Received "+str(lxm), RNS.LOG_DEBUG)
if active_configuration["on_inbound"]:
RNS.log("Calling external program to handle message", RNS.LOG_DEBUG)
else:
RNS.log("No action defined for inbound messages, ignoring", RNS.LOG_EXTREME)
def program_setup(configdir = None, rnsconfigdir = None, run_pn = False, on_inbound = None, verbosity = 0, quietness = 0, service = False):
global configpath, ignoredpath, identitypath, storagedir
global lxmd_config, active_configuration, targetloglevel
global message_router, lxmf_destination
def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0, service = False):
targetloglevel = 3+verbosity-quietness
if service:
@ -73,6 +171,7 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0,
configdir = RNS.Reticulum.userdir+"/.lxmd"
configpath = configdir+"/config"
ignoredpath = configdir+"/ignored"
identitypath = configdir+"/identity"
storagedir = configdir+"/storage"
@ -107,7 +206,7 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0,
RNS.log("Loaded Primary Identity %s" % (str(identity)))
else:
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
nomadnet.panic()
exit(4)
except Exception as e:
RNS.log("Could not load the Primary Identity from "+identitypath, RNS.LOG_ERROR)
RNS.log("The contained exception was: %s" % (str(e)), RNS.LOG_ERROR)
@ -141,7 +240,7 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0,
RNS.log("LXMF Router ready to receive on "+RNS.prettyhexrep(lxmf_destination.hash))
if active_configuration["enable_propagation_node"]:
if run_pn or active_configuration["enable_propagation_node"]:
message_router.set_message_storage_limit(megabytes=active_configuration["message_storage_limit"])
for dest_str in active_configuration["prioritised_lxmf_destinations"]:
try:
@ -158,9 +257,49 @@ def program_setup(configdir, rnsconfigdir, run_pn, verbosity = 0, quietness = 0,
RNS.log("Started lxmd version {version}".format(version=__version__), RNS.LOG_NOTICE)
threading.Thread(target=deferred_start_jobs, daemon=True).start()
while True:
time.sleep(1)
def jobs():
global active_configuration, last_peer_announce, last_node_announce
global message_router, lxmf_destination
while True:
try:
if time.time() > last_peer_announce + active_configuration["peer_announce_interval"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
message_router.announce(lxmf_destination.hash)
last_peer_announce = time.time()
if time.time() > last_node_announce + active_configuration["node_announce_interval"]:
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
message_router.announce_propagation_node()
last_node_announce = time.time()
except Exception as e:
RNS.log("An error occurred while running periodic jobs. The contained exception was: "+str(e), RNS.LOG_ERROR)
time.sleep(JOBS_INTERVAL)
def deferred_start_jobs():
global active_configuration, last_peer_announce, last_node_announce
global message_router, lxmf_destination
time.sleep(DEFFERED_JOBS_DELAY)
RNS.log("Running deferred start jobs")
if active_configuration["peer_announce_at_start"]:
RNS.log("Sending announce for LXMF delivery destination", RNS.LOG_EXTREME)
message_router.announce(lxmf_destination.hash)
if active_configuration["node_announce_at_start"]:
RNS.log("Sending announce for LXMF Propagation Node", RNS.LOG_EXTREME)
message_router.announce_propagation_node()
last_peer_announce = time.time()
last_node_announce = time.time()
threading.Thread(target=jobs, daemon=True).start()
def main():
try:
parser = argparse.ArgumentParser(description="Lightweight Extensible Messaging Daemon")
@ -184,6 +323,7 @@ def main():
configdir = args.config,
rnsconfigdir=args.rnsconfig,
run_pn=args.propagation_node,
on_inbound=args.on_inbound,
verbosity=args.verbose,
quietness=args.quiet,
service=args.service
@ -245,6 +385,19 @@ display_name = Anonymous Peer
# destination when the LXM Daemon starts up.
announce_at_start = no
# You can also announce the delivery destination
# at a specified interval. This is not enabled by
# default.
# announce_interval = 360
# You can configure an external program to be run
# every time a message is received. The program
# will receive as an argument the full path to the
# message saved as a file. The example below will
# simply result in the message getting deleted as
# soon as it has been received.
# on_inbound = rm
[logging]
# Valid log levels are 0 through 7: