mirror of
https://github.com/markqvist/Reticulum.git
synced 2025-01-18 10:57:07 -05:00
148 lines
4.3 KiB
Python
148 lines
4.3 KiB
Python
|
from base64 import b64decode, b64encode, b32encode
|
||
|
from hashlib import sha256
|
||
|
import struct
|
||
|
import re
|
||
|
|
||
|
|
||
|
I2P_B64_CHARS = "-~"
|
||
|
|
||
|
def i2p_b64encode(x):
|
||
|
"""Encode I2P destination"""
|
||
|
return b64encode(x, altchars=I2P_B64_CHARS.encode()).decode()
|
||
|
|
||
|
def i2p_b64decode(x):
|
||
|
"""Decode I2P destination"""
|
||
|
return b64decode(x, altchars=I2P_B64_CHARS, validate=True)
|
||
|
|
||
|
SAM_BUFSIZE = 4096
|
||
|
DEFAULT_ADDRESS = ("127.0.0.1", 7656)
|
||
|
DEFAULT_MIN_VER = "3.1"
|
||
|
DEFAULT_MAX_VER = "3.1"
|
||
|
TRANSIENT_DESTINATION = "TRANSIENT"
|
||
|
|
||
|
VALID_BASE32_ADDRESS = re.compile(r"^([a-zA-Z0-9]{52}).b32.i2p$")
|
||
|
VALID_BASE64_ADDRESS = re.compile(r"^([a-zA-Z0-9-~=]{516,528})$")
|
||
|
|
||
|
class Message(object):
|
||
|
"""Parse SAM message to an object"""
|
||
|
def __init__(self, s):
|
||
|
self.opts = {}
|
||
|
if type(s) != str:
|
||
|
self._reply_string = s.decode().strip()
|
||
|
else:
|
||
|
self._reply_string = s
|
||
|
|
||
|
self.cmd, self.action, opts = self._reply_string.split(" ", 2)
|
||
|
for v in opts.split(" "):
|
||
|
data = v.split("=", 1) if "=" in v else (v, True)
|
||
|
self.opts[data[0]] = data[1]
|
||
|
|
||
|
def __getitem__(self, key):
|
||
|
return self.opts[key]
|
||
|
|
||
|
@property
|
||
|
def ok(self):
|
||
|
return self["RESULT"] == "OK"
|
||
|
|
||
|
def __repr__(self):
|
||
|
return self._reply_string
|
||
|
|
||
|
|
||
|
# SAM request messages
|
||
|
|
||
|
def hello(min_version, max_version):
|
||
|
return "HELLO VERSION MIN={} MAX={}\n".format(min_version,
|
||
|
max_version).encode()
|
||
|
|
||
|
def session_create(style, session_id, destination, options=""):
|
||
|
return "SESSION CREATE STYLE={} ID={} DESTINATION={} {}\n".format(
|
||
|
style, session_id, destination, options).encode()
|
||
|
|
||
|
|
||
|
def stream_connect(session_id, destination, silent="false"):
|
||
|
return "STREAM CONNECT ID={} DESTINATION={} SILENT={}\n".format(
|
||
|
session_id, destination, silent).encode()
|
||
|
|
||
|
def stream_accept(session_id, silent="false"):
|
||
|
return "STREAM ACCEPT ID={} SILENT={}\n".format(session_id, silent).encode()
|
||
|
|
||
|
def stream_forward(session_id, port, options=""):
|
||
|
return "STREAM FORWARD ID={} PORT={} {}\n".format(
|
||
|
session_id, port, options).encode()
|
||
|
|
||
|
|
||
|
|
||
|
def naming_lookup(name):
|
||
|
return "NAMING LOOKUP NAME={}\n".format(name).encode()
|
||
|
|
||
|
def dest_generate(signature_type):
|
||
|
return "DEST GENERATE SIGNATURE_TYPE={}\n".format(signature_type).encode()
|
||
|
|
||
|
class Destination(object):
|
||
|
"""I2P destination
|
||
|
|
||
|
https://geti2p.net/spec/common-structures#destination
|
||
|
|
||
|
:param data: (optional) Base64 encoded data or binary data
|
||
|
:param path: (optional) A path to a file with binary data
|
||
|
:param has_private_key: (optional) Does data have a private key?
|
||
|
"""
|
||
|
|
||
|
ECDSA_SHA256_P256 = 1
|
||
|
ECDSA_SHA384_P384 = 2
|
||
|
ECDSA_SHA512_P521 = 3
|
||
|
EdDSA_SHA512_Ed25519 = 7
|
||
|
|
||
|
default_sig_type = EdDSA_SHA512_Ed25519
|
||
|
|
||
|
_pubkey_size = 256
|
||
|
_signkey_size = 128
|
||
|
_min_cert_size = 3
|
||
|
|
||
|
def __init__(self, data=None, path=None, has_private_key=False):
|
||
|
#: Binary destination
|
||
|
self.data = bytes()
|
||
|
#: Base64 encoded destination
|
||
|
self.base64 = ""
|
||
|
#: :class:`RNS.vendor.i2plib.PrivateKey` instance or None
|
||
|
self.private_key = None
|
||
|
|
||
|
if path:
|
||
|
with open(path, "rb") as f: data = f.read()
|
||
|
|
||
|
if data and has_private_key:
|
||
|
self.private_key = PrivateKey(data)
|
||
|
|
||
|
cert_len = struct.unpack("!H", self.private_key.data[385:387])[0]
|
||
|
data = self.private_key.data[:387+cert_len]
|
||
|
|
||
|
if not data:
|
||
|
raise Exception("Can't create a destination with no data")
|
||
|
|
||
|
self.data = data if type(data) == bytes else i2p_b64decode(data)
|
||
|
self.base64 = data if type(data) == str else i2p_b64encode(data)
|
||
|
|
||
|
def __repr__(self):
|
||
|
return "<Destination: {}>".format(self.base32)
|
||
|
|
||
|
@property
|
||
|
def base32(self):
|
||
|
"""Base32 destination hash of this destination"""
|
||
|
desthash = sha256(self.data).digest()
|
||
|
return b32encode(desthash).decode()[:52].lower()
|
||
|
|
||
|
class PrivateKey(object):
|
||
|
"""I2P private key
|
||
|
|
||
|
https://geti2p.net/spec/common-structures#keysandcert
|
||
|
|
||
|
:param data: Base64 encoded data or binary data
|
||
|
"""
|
||
|
|
||
|
def __init__(self, data):
|
||
|
#: Binary private key
|
||
|
self.data = data if type(data) == bytes else i2p_b64decode(data)
|
||
|
#: Base64 encoded private key
|
||
|
self.base64 = data if type(data) == str else i2p_b64encode(data)
|
||
|
|