Handle AES-256 compatibility in Identity

This commit is contained in:
Mark Qvist 2025-05-06 16:12:15 +02:00
parent 3cbcbec942
commit 437da99d63
2 changed files with 63 additions and 61 deletions

View file

@ -37,7 +37,8 @@ from RNS.Cryptography import AES
from RNS.Cryptography.AES import AES_128_CBC from RNS.Cryptography.AES import AES_128_CBC
from RNS.Cryptography.AES import AES_256_CBC from RNS.Cryptography.AES import AES_256_CBC
# import RNS # TODO: Remove # TODO: Remove after migration
import RNS
class Token(): class Token():
""" """
@ -66,23 +67,11 @@ class Token():
self._signing_key = key[:16] self._signing_key = key[:16]
self._encryption_key = key[16:] self._encryption_key = key[16:]
####################################################################
self.mode_legacy = AES_128_CBC # TODO: Remove after migration
self._signing_key_128 = key[:16] # TODO: Remove after migration
self._encryption_key_128 = key[16:] # TODO: Remove after migration
####################################################################
elif len(key) == 64: elif len(key) == 64:
self.mode = AES_256_CBC self.mode = AES_256_CBC
self._signing_key = key[:32] self._signing_key = key[:32]
self._encryption_key = key[32:] self._encryption_key = key[32:]
####################################################################
self.mode_legacy = AES_128_CBC # TODO: Remove after migration
self._signing_key_128 = key[:16] # TODO: Remove after migration
self._encryption_key_128 = key[16:32] # TODO: Remove after migration
####################################################################
else: raise ValueError("Token key must be 128 or 256 bits, not "+str(len(key)*8)) else: raise ValueError("Token key must be 128 or 256 bits, not "+str(len(key)*8))
else: raise TypeError(f"Invalid token mode: {mode}") else: raise TypeError(f"Invalid token mode: {mode}")
@ -93,11 +82,8 @@ class Token():
else: else:
received_hmac = token[-32:] received_hmac = token[-32:]
expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest() expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest()
expected_hmac_128 = HMAC.new(self._signing_key_128, token[:-32]).digest() # TODO: Remove after migration
# TODO: Reset after migration if received_hmac == expected_hmac: return True
# if received_hmac == expected_hmac: return True
if received_hmac == expected_hmac or received_hmac == expected_hmac_128: return True
else: return False else: return False
@ -105,6 +91,7 @@ class Token():
if not isinstance(data, bytes): raise TypeError("Token plaintext input must be bytes") if not isinstance(data, bytes): raise TypeError("Token plaintext input must be bytes")
iv = os.urandom(16) iv = os.urandom(16)
# RNS.log(f"Encrypting with {self.mode}") # TODO: Remove
ciphertext = self.mode.encrypt( ciphertext = self.mode.encrypt(
plaintext = PKCS7.pad(data), plaintext = PKCS7.pad(data),
key = self._encryption_key, key = self._encryption_key,
@ -116,6 +103,7 @@ class Token():
def decrypt(self, token = None): def decrypt(self, token = None):
# RNS.log(f"Trying decryption with {self.mode}") # TODO: Remove
if not isinstance(token, bytes): raise TypeError("Token must be bytes") if not isinstance(token, bytes): raise TypeError("Token must be bytes")
if not self.verify_hmac(token): raise ValueError("Token HMAC was invalid") if not self.verify_hmac(token): raise ValueError("Token HMAC was invalid")
@ -123,30 +111,14 @@ class Token():
ciphertext = token[16:-32] ciphertext = token[16:-32]
try: try:
try: plaintext = PKCS7.unpad(
# RNS.log(f"Trying decryption with {self.mode}", RNS.LOG_DEBUG) # TODO: Remove self.mode.decrypt(
plaintext = PKCS7.unpad( ciphertext = ciphertext,
self.mode.decrypt( key = self._encryption_key,
ciphertext = ciphertext, iv = iv))
key = self._encryption_key,
iv = iv))
# RNS.log(f"Decrypted packet with {self.mode}", RNS.LOG_DEBUG) # TODO: Remove # RNS.log(f"Decrypted packet with {self.mode}") # TODO: Remove
return plaintext return plaintext
# TODO: Remove after migration ############################
except Exception as e:
# RNS.log(f"{self.mode} decryption failed", RNS.LOG_DEBUG) # TODO: Remove
# RNS.log(f"Trying decryption with {self.mode_legacy}", RNS.LOG_DEBUG) # TODO: Remove
plaintext = PKCS7.unpad(
self.mode_legacy.decrypt(
ciphertext = ciphertext,
key = self._encryption_key_128,
iv = iv))
# RNS.log(f"Decrypted packet with {self.mode_legacy}", RNS.LOG_DEBUG) # TODO: Remove
return plaintext
###########################################################
except Exception as e: except Exception as e:
RNS.trace_exception(e) # TODO: Remove after migration RNS.trace_exception(e) # TODO: Remove after migration

View file

@ -79,13 +79,16 @@ class Identity:
HASHLENGTH = 256 # In bits HASHLENGTH = 256 # In bits
SIGLENGTH = KEYSIZE # In bits SIGLENGTH = KEYSIZE # In bits
NAME_HASH_LENGTH = 80 NAME_HASH_LENGTH = 80
TRUNCATED_HASHLENGTH = RNS.Reticulum.TRUNCATED_HASHLENGTH TRUNCATED_HASHLENGTH = RNS.Reticulum.TRUNCATED_HASHLENGTH
""" """
Constant specifying the truncated hash length (in bits) used by Reticulum Constant specifying the truncated hash length (in bits) used by Reticulum
for addressable hashes and other purposes. Non-configurable. for addressable hashes and other purposes. Non-configurable.
""" """
DERIVED_KEY_LENGTH = 512//8
DERIVED_KEY_LENGTH_LEGACY = 256//8
# Storage # Storage
known_destinations = {} known_destinations = {}
known_ratchets = {} known_ratchets = {}
@ -544,8 +547,6 @@ class Identity:
RNS.log("The contained exception was: "+str(e)) RNS.log("The contained exception was: "+str(e))
def __init__(self,create_keys=True): def __init__(self,create_keys=True):
self.derived_key_length = 64
# Initialize keys to none # Initialize keys to none
self.prv = None self.prv = None
self.prv_bytes = None self.prv_bytes = None
@ -678,8 +679,20 @@ class Identity:
shared_key = ephemeral_key.exchange(target_public_key) shared_key = ephemeral_key.exchange(target_public_key)
# TODO: Reset after migration
# derived_key = RNS.Cryptography.hkdf(
# length=Identity.DERIVED_KEY_LENGTH,
# derive_from=shared_key,
# salt=self.get_salt(),
# context=self.get_context(),
# )
# Use legacy derived key length (AES-128) during migration by
# default. This allows AES-256 capable instances on RNS 0.9.5
# to still communicate with older versions. This migration
# handling will be removed in RNS 0.9.6.
derived_key = RNS.Cryptography.hkdf( derived_key = RNS.Cryptography.hkdf(
length=self.derived_key_length, length=Identity.DERIVED_KEY_LENGTH_LEGACY,
derive_from=shared_key, derive_from=shared_key,
salt=self.get_salt(), salt=self.get_salt(),
context=self.get_context(), context=self.get_context(),
@ -702,6 +715,37 @@ class Identity:
:returns: Plaintext as *bytes*, or *None* if decryption fails. :returns: Plaintext as *bytes*, or *None* if decryption fails.
:raises: *KeyError* if the instance does not hold a private key. :raises: *KeyError* if the instance does not hold a private key.
""" """
# This handles decryption during migration to AES-256 where
# older instances may still use AES-128. If decryption fails
# initially, AES-128 will be attempted as a fallback mode.
# This handler will be removed in RNS 0.9.6.
def migration_decrypt(shared_key, ciphertext):
try:
derived_key = RNS.Cryptography.hkdf(
length=Identity.DERIVED_KEY_LENGTH,
derive_from=shared_key,
salt=self.get_salt(),
context=self.get_context())
token = Token(derived_key)
plaintext = token.decrypt(ciphertext)
# TODO: Remove after migration
# If decryption fails, try legacy decryption mode
except Exception as e:
RNS.log("Decryption failed, attempting legacy mode fallback", RNS.LOG_DEBUG)
derived_key = RNS.Cryptography.hkdf(
length=Identity.DERIVED_KEY_LENGTH_LEGACY,
derive_from=shared_key,
salt=self.get_salt(),
context=self.get_context())
token = Token(derived_key)
plaintext = token.decrypt(ciphertext)
return plaintext
if self.prv != None: if self.prv != None:
if len(ciphertext_token) > Identity.KEYSIZE//8//2: if len(ciphertext_token) > Identity.KEYSIZE//8//2:
plaintext = None plaintext = None
@ -716,15 +760,8 @@ class Identity:
ratchet_prv = X25519PrivateKey.from_private_bytes(ratchet) ratchet_prv = X25519PrivateKey.from_private_bytes(ratchet)
ratchet_id = Identity._get_ratchet_id(ratchet_prv.public_key().public_bytes()) ratchet_id = Identity._get_ratchet_id(ratchet_prv.public_key().public_bytes())
shared_key = ratchet_prv.exchange(peer_pub) shared_key = ratchet_prv.exchange(peer_pub)
derived_key = RNS.Cryptography.hkdf( plaintext = migration_decrypt(shared_key, ciphertext)
length=self.derived_key_length,
derive_from=shared_key,
salt=self.get_salt(),
context=self.get_context(),
)
token = Token(derived_key)
plaintext = token.decrypt(ciphertext)
if ratchet_id_receiver: if ratchet_id_receiver:
ratchet_id_receiver.latest_ratchet_id = ratchet_id ratchet_id_receiver.latest_ratchet_id = ratchet_id
@ -741,15 +778,8 @@ class Identity:
if plaintext == None: if plaintext == None:
shared_key = self.prv.exchange(peer_pub) shared_key = self.prv.exchange(peer_pub)
derived_key = RNS.Cryptography.hkdf( plaintext = migration_decrypt(shared_key, ciphertext)
length=self.derived_key_length,
derive_from=shared_key,
salt=self.get_salt(),
context=self.get_context(),
)
token = Token(derived_key)
plaintext = token.decrypt(ciphertext)
if ratchet_id_receiver: if ratchet_id_receiver:
ratchet_id_receiver.latest_ratchet_id = None ratchet_id_receiver.latest_ratchet_id = None