Implemented compatibility handling for AES-256 migration

This commit is contained in:
Mark Qvist 2025-05-06 12:08:17 +02:00
parent 5dc8cdc6dc
commit d7791c60e2
4 changed files with 64 additions and 26 deletions

View file

@ -37,6 +37,8 @@ from RNS.Cryptography import AES
from RNS.Cryptography.AES import AES_128_CBC
from RNS.Cryptography.AES import AES_256_CBC
# import RNS # TODO: Remove
class Token():
"""
This class provides a slightly modified implementation of the Fernet spec
@ -64,11 +66,23 @@ class Token():
self._signing_key = key[:16]
self._encryption_key = key[16:]
####################################################################
self.mode_legacy = AES_128_CBC # TODO: Remove after migration
self._signing_key_128 = key[:16] # TODO: Remove after migration
self._encryption_key_128 = key[16:] # TODO: Remove after migration
####################################################################
elif len(key) == 64:
self.mode = AES_256_CBC
self._signing_key = key[:32]
self._encryption_key = key[32:]
####################################################################
self.mode_legacy = AES_128_CBC # TODO: Remove after migration
self._signing_key_128 = key[:16] # TODO: Remove after migration
self._encryption_key_128 = key[16:32] # TODO: Remove after migration
####################################################################
else: raise ValueError("Token key must be 128 or 256 bits, not "+str(len(key)*8))
else: raise TypeError(f"Invalid token mode: {mode}")
@ -79,16 +93,17 @@ class Token():
else:
received_hmac = token[-32:]
expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest()
expected_hmac_128 = HMAC.new(self._signing_key_128, token[:-32]).digest() # TODO: Remove after migration
if received_hmac == expected_hmac: return True
# TODO: Reset after migration
# if received_hmac == expected_hmac: return True
if received_hmac == expected_hmac or received_hmac == expected_hmac_128: return True
else: return False
def encrypt(self, data = None):
iv = os.urandom(16)
current_time = int(time.time())
if not isinstance(data, bytes): raise TypeError("Token plaintext input must be bytes")
iv = os.urandom(16)
ciphertext = self.mode.encrypt(
plaintext = PKCS7.pad(data),
@ -108,12 +123,31 @@ class Token():
ciphertext = token[16:-32]
try:
plaintext = PKCS7.unpad(
self.mode.decrypt(
ciphertext = ciphertext,
key = self._encryption_key,
iv = iv))
try:
# RNS.log(f"Trying decryption with {self.mode}", RNS.LOG_DEBUG) # TODO: Remove
plaintext = PKCS7.unpad(
self.mode.decrypt(
ciphertext = ciphertext,
key = self._encryption_key,
iv = iv))
return plaintext
# RNS.log(f"Decrypted packet with {self.mode}", RNS.LOG_DEBUG) # TODO: Remove
return plaintext
except Exception as e: raise ValueError("Could not decrypt token")
# TODO: Remove after migration ############################
except Exception as e:
# RNS.log(f"{self.mode} decryption failed", RNS.LOG_DEBUG) # TODO: Remove
# RNS.log(f"Trying decryption with {self.mode_legacy}", RNS.LOG_DEBUG) # TODO: Remove
plaintext = PKCS7.unpad(
self.mode_legacy.decrypt(
ciphertext = ciphertext,
key = self._encryption_key_128,
iv = iv))
# RNS.log(f"Decrypted packet with {self.mode_legacy}", RNS.LOG_DEBUG) # TODO: Remove
return plaintext
###########################################################
except Exception as e:
RNS.trace_exception(e) # TODO: Remove after migration
raise ValueError("Could not decrypt token")