Added paper message generator

This commit is contained in:
Mark Qvist 2022-11-19 17:54:29 +01:00
parent f75ea842d9
commit 3d31620160

View File

@ -2,6 +2,7 @@ import RNS
import RNS.vendor.umsgpack as msgpack import RNS.vendor.umsgpack as msgpack
import time import time
import base64
from .LXMF import APP_NAME from .LXMF import APP_NAME
@ -22,7 +23,8 @@ class LXMessage:
OPPORTUNISTIC = 0x01 OPPORTUNISTIC = 0x01
DIRECT = 0x02 DIRECT = 0x02
PROPAGATED = 0x03 PROPAGATED = 0x03
valid_methods = [OPPORTUNISTIC, DIRECT, PROPAGATED] PAPER = 0x05
valid_methods = [OPPORTUNISTIC, DIRECT, PROPAGATED, PAPER]
SOURCE_UNKNOWN = 0x01 SOURCE_UNKNOWN = 0x01
SIGNATURE_INVALID = 0x02 SIGNATURE_INVALID = 0x02
@ -76,6 +78,12 @@ class LXMessage:
ENCRYPTION_DESCRIPTION_EC = "Curve25519" ENCRYPTION_DESCRIPTION_EC = "Curve25519"
ENCRYPTION_DESCRIPTION_UNENCRYPTED = "Unencrypted" ENCRYPTION_DESCRIPTION_UNENCRYPTED = "Unencrypted"
# Constants for QR/URL encoding LXMs
URL_PROTO_SPECIFIER = "lxm"
QR_ERROR_CORRECTION = "ERROR_CORRECT_L"
QR_MAX_STORAGE = 2953
PAPER_MDU = ((QR_MAX_STORAGE-(len(URL_PROTO_SPECIFIER)+len("://")))*6)//8
def __str__(self): def __str__(self):
if self.hash != None: if self.hash != None:
return "<LXMessage "+RNS.hexrep(self.hash, delimit=False)+">" return "<LXMessage "+RNS.hexrep(self.hash, delimit=False)+">"
@ -196,6 +204,7 @@ class LXMessage:
self.timestamp = time.time() self.timestamp = time.time()
self.propagation_packed = None self.propagation_packed = None
self.paper_packed = None
self.payload = [self.timestamp, self.title, self.content, self.fields] self.payload = [self.timestamp, self.title, self.content, self.fields]
@ -263,6 +272,19 @@ class LXMessage:
self.method = self.desired_method self.method = self.desired_method
self.representation = LXMessage.RESOURCE self.representation = LXMessage.RESOURCE
elif self.desired_method == LXMessage.PAPER:
paper_content_limit = LXMessage.PAPER_MDU
encrypted_data = self.__destination.encrypt(self.packed[LXMessage.DESTINATION_LENGTH:])
self.paper_packed = msgpack.packb(self.packed[:LXMessage.DESTINATION_LENGTH]+encrypted_data)
content_size = len(self.paper_packed)
if content_size <= paper_content_limit:
self.method = self.desired_method
self.representation = LXMessage.PAPER
else:
raise TypeError("LXMessage desired paper delivery method, but content exceeds paper message maximum size.")
else: else:
raise ValueError("Attempt to re-pack LXMessage "+str(self)+" that was already packed") raise ValueError("Attempt to re-pack LXMessage "+str(self)+" that was already packed")
@ -327,6 +349,16 @@ class LXMessage:
else: else:
self.transport_encrypted = False self.transport_encrypted = False
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_UNENCRYPTED self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_UNENCRYPTED
elif self.method == LXMessage.PAPER:
if self.__destination.type == RNS.Destination.SINGLE:
self.transport_encrypted = True
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_EC
elif self.__destination.type == RNS.Destination.GROUP:
self.transport_encrypted = True
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_AES
else:
self.transport_encrypted = False
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_UNENCRYPTED
else: else:
self.transport_encrypted = False self.transport_encrypted = False
self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_UNENCRYPTED self.transport_encryption = LXMessage.ENCRYPTION_DESCRIPTION_UNENCRYPTED
@ -351,6 +383,16 @@ class LXMessage:
except Exception as e: except Exception as e:
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR) RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
def __mark_paper_generated(self, receipt = None):
RNS.log("Paper message generation succeeded for "+str(self), RNS.LOG_DEBUG)
self.state = LXMessage.PAPER
if self.__delivery_callback != None and callable(self.__delivery_callback):
try:
self.__delivery_callback(self)
except Exception as e:
RNS.log("An error occurred in the external delivery callback for "+str(message), RNS.LOG_ERROR)
def __resource_concluded(self, resource): def __resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
self.__mark_delivered() self.__mark_delivered()
@ -441,6 +483,56 @@ class LXMessage:
RNS.log("Error while writing LXMF message to file \""+str(file_path)+"\". The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error while writing LXMF message to file \""+str(file_path)+"\". The contained exception was: "+str(e), RNS.LOG_ERROR)
return None return None
def as_url(self, finalise=True):
self.determine_transport_encryption()
if not self.packed:
self.pack()
if self.desired_method == LXMessage.PAPER and self.paper_packed != None:
# Encode packed LXM with URL-safe base64 and remove padding
encoded_bytes = base64.urlsafe_b64encode(self.paper_packed)
# Add protocol specifier and return
lxm_url = LXMessage.URL_PROTO_SPECIFIER+"://"+encoded_bytes.decode("utf-8").replace("=","")
# TODO: Remove
# RNS.log(str(len(lxm_url))+" byte LXM URL: "+str(lxm_url), RNS.LOG_EXTREME)
if finalise:
self.__mark_paper_generated()
return lxm_url
else:
raise TypeError("Attempt to represent LXM with non-paper delivery method as URL")
def as_qr(self):
self.determine_transport_encryption()
if not self.packed:
self.pack()
if self.desired_method == LXMessage.PAPER and self.paper_packed != None:
import importlib
if importlib.util.find_spec('qrcode') != None:
import qrcode
qr = qrcode.make(
error_correction = qrcode.constants.__dict__[LXMessage.QR_ERROR_CORRECTION],
border = 1,
data = self.as_url(finalise=False),
)
self.__mark_paper_generated()
return qr
else:
RNS.log("Generating QR-code representanions of LXMs requires the \"qrcode\" module to be installed.", RNS.LOG_CRITICAL)
RNS.log("You can install it with the command: python3 -m pip install qrcode", RNS.LOG_CRITICAL)
return None
else:
raise TypeError("Attempt to represent LXM with non-paper delivery method as QR-code")
@staticmethod @staticmethod
def unpack_from_bytes(lxmf_bytes): def unpack_from_bytes(lxmf_bytes):