From 437da99d63b0d6c1b279c9810604f4cdb4e600d4 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Tue, 6 May 2025 16:12:15 +0200 Subject: [PATCH] Handle AES-256 compatibility in Identity --- RNS/Cryptography/Token.py | 52 +++++++--------------------- RNS/Identity.py | 72 +++++++++++++++++++++++++++------------ 2 files changed, 63 insertions(+), 61 deletions(-) diff --git a/RNS/Cryptography/Token.py b/RNS/Cryptography/Token.py index 1bb7bec..53cc27a 100644 --- a/RNS/Cryptography/Token.py +++ b/RNS/Cryptography/Token.py @@ -37,7 +37,8 @@ from RNS.Cryptography import AES from RNS.Cryptography.AES import AES_128_CBC from RNS.Cryptography.AES import AES_256_CBC -# import RNS # TODO: Remove +# TODO: Remove after migration +import RNS class Token(): """ @@ -66,23 +67,11 @@ class Token(): self._signing_key = key[:16] self._encryption_key = key[16:] - #################################################################### - self.mode_legacy = AES_128_CBC # TODO: Remove after migration - self._signing_key_128 = key[:16] # TODO: Remove after migration - self._encryption_key_128 = key[16:] # TODO: Remove after migration - #################################################################### - elif len(key) == 64: self.mode = AES_256_CBC self._signing_key = key[:32] self._encryption_key = key[32:] - #################################################################### - self.mode_legacy = AES_128_CBC # TODO: Remove after migration - self._signing_key_128 = key[:16] # TODO: Remove after migration - self._encryption_key_128 = key[16:32] # TODO: Remove after migration - #################################################################### - else: raise ValueError("Token key must be 128 or 256 bits, not "+str(len(key)*8)) else: raise TypeError(f"Invalid token mode: {mode}") @@ -93,11 +82,8 @@ class Token(): else: received_hmac = token[-32:] expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest() - expected_hmac_128 = HMAC.new(self._signing_key_128, token[:-32]).digest() # TODO: Remove after migration - # TODO: Reset after migration - # if received_hmac == expected_hmac: return True - if received_hmac == expected_hmac or received_hmac == expected_hmac_128: return True + if received_hmac == expected_hmac: return True else: return False @@ -105,6 +91,7 @@ class Token(): if not isinstance(data, bytes): raise TypeError("Token plaintext input must be bytes") iv = os.urandom(16) + # RNS.log(f"Encrypting with {self.mode}") # TODO: Remove ciphertext = self.mode.encrypt( plaintext = PKCS7.pad(data), key = self._encryption_key, @@ -116,6 +103,7 @@ class Token(): def decrypt(self, token = None): + # RNS.log(f"Trying decryption with {self.mode}") # TODO: Remove if not isinstance(token, bytes): raise TypeError("Token must be bytes") if not self.verify_hmac(token): raise ValueError("Token HMAC was invalid") @@ -123,30 +111,14 @@ class Token(): ciphertext = token[16:-32] try: - try: - # RNS.log(f"Trying decryption with {self.mode}", RNS.LOG_DEBUG) # TODO: Remove - plaintext = PKCS7.unpad( - self.mode.decrypt( - ciphertext = ciphertext, - key = self._encryption_key, - iv = iv)) + plaintext = PKCS7.unpad( + self.mode.decrypt( + ciphertext = ciphertext, + key = self._encryption_key, + iv = iv)) - # RNS.log(f"Decrypted packet with {self.mode}", RNS.LOG_DEBUG) # TODO: Remove - return plaintext - - # TODO: Remove after migration ############################ - except Exception as e: - # RNS.log(f"{self.mode} decryption failed", RNS.LOG_DEBUG) # TODO: Remove - # RNS.log(f"Trying decryption with {self.mode_legacy}", RNS.LOG_DEBUG) # TODO: Remove - plaintext = PKCS7.unpad( - self.mode_legacy.decrypt( - ciphertext = ciphertext, - key = self._encryption_key_128, - iv = iv)) - - # RNS.log(f"Decrypted packet with {self.mode_legacy}", RNS.LOG_DEBUG) # TODO: Remove - return plaintext - ########################################################### + # RNS.log(f"Decrypted packet with {self.mode}") # TODO: Remove + return plaintext except Exception as e: RNS.trace_exception(e) # TODO: Remove after migration diff --git a/RNS/Identity.py b/RNS/Identity.py index 148454c..c1e17aa 100644 --- a/RNS/Identity.py +++ b/RNS/Identity.py @@ -79,13 +79,16 @@ class Identity: HASHLENGTH = 256 # In bits SIGLENGTH = KEYSIZE # In bits - NAME_HASH_LENGTH = 80 - TRUNCATED_HASHLENGTH = RNS.Reticulum.TRUNCATED_HASHLENGTH + NAME_HASH_LENGTH = 80 + TRUNCATED_HASHLENGTH = RNS.Reticulum.TRUNCATED_HASHLENGTH """ Constant specifying the truncated hash length (in bits) used by Reticulum for addressable hashes and other purposes. Non-configurable. """ + DERIVED_KEY_LENGTH = 512//8 + DERIVED_KEY_LENGTH_LEGACY = 256//8 + # Storage known_destinations = {} known_ratchets = {} @@ -544,8 +547,6 @@ class Identity: RNS.log("The contained exception was: "+str(e)) def __init__(self,create_keys=True): - self.derived_key_length = 64 - # Initialize keys to none self.prv = None self.prv_bytes = None @@ -678,8 +679,20 @@ class Identity: shared_key = ephemeral_key.exchange(target_public_key) + # TODO: Reset after migration + # derived_key = RNS.Cryptography.hkdf( + # length=Identity.DERIVED_KEY_LENGTH, + # derive_from=shared_key, + # salt=self.get_salt(), + # context=self.get_context(), + # ) + + # Use legacy derived key length (AES-128) during migration by + # default. This allows AES-256 capable instances on RNS 0.9.5 + # to still communicate with older versions. This migration + # handling will be removed in RNS 0.9.6. derived_key = RNS.Cryptography.hkdf( - length=self.derived_key_length, + length=Identity.DERIVED_KEY_LENGTH_LEGACY, derive_from=shared_key, salt=self.get_salt(), context=self.get_context(), @@ -702,6 +715,37 @@ class Identity: :returns: Plaintext as *bytes*, or *None* if decryption fails. :raises: *KeyError* if the instance does not hold a private key. """ + + # This handles decryption during migration to AES-256 where + # older instances may still use AES-128. If decryption fails + # initially, AES-128 will be attempted as a fallback mode. + # This handler will be removed in RNS 0.9.6. + def migration_decrypt(shared_key, ciphertext): + try: + derived_key = RNS.Cryptography.hkdf( + length=Identity.DERIVED_KEY_LENGTH, + derive_from=shared_key, + salt=self.get_salt(), + context=self.get_context()) + + token = Token(derived_key) + plaintext = token.decrypt(ciphertext) + + # TODO: Remove after migration + # If decryption fails, try legacy decryption mode + except Exception as e: + RNS.log("Decryption failed, attempting legacy mode fallback", RNS.LOG_DEBUG) + derived_key = RNS.Cryptography.hkdf( + length=Identity.DERIVED_KEY_LENGTH_LEGACY, + derive_from=shared_key, + salt=self.get_salt(), + context=self.get_context()) + + token = Token(derived_key) + plaintext = token.decrypt(ciphertext) + + return plaintext + if self.prv != None: if len(ciphertext_token) > Identity.KEYSIZE//8//2: plaintext = None @@ -716,15 +760,8 @@ class Identity: ratchet_prv = X25519PrivateKey.from_private_bytes(ratchet) ratchet_id = Identity._get_ratchet_id(ratchet_prv.public_key().public_bytes()) shared_key = ratchet_prv.exchange(peer_pub) - derived_key = RNS.Cryptography.hkdf( - length=self.derived_key_length, - derive_from=shared_key, - salt=self.get_salt(), - context=self.get_context(), - ) + plaintext = migration_decrypt(shared_key, ciphertext) - token = Token(derived_key) - plaintext = token.decrypt(ciphertext) if ratchet_id_receiver: ratchet_id_receiver.latest_ratchet_id = ratchet_id @@ -741,15 +778,8 @@ class Identity: if plaintext == None: shared_key = self.prv.exchange(peer_pub) - derived_key = RNS.Cryptography.hkdf( - length=self.derived_key_length, - derive_from=shared_key, - salt=self.get_salt(), - context=self.get_context(), - ) + plaintext = migration_decrypt(shared_key, ciphertext) - token = Token(derived_key) - plaintext = token.decrypt(ciphertext) if ratchet_id_receiver: ratchet_id_receiver.latest_ratchet_id = None