mirror of
https://github.com/markqvist/Reticulum.git
synced 2025-05-07 00:45:54 -04:00
Added AES_256_CBC support to Token
This commit is contained in:
parent
c4cafed6aa
commit
ef30d21b58
1 changed files with 35 additions and 34 deletions
|
@ -33,7 +33,9 @@ import time
|
||||||
|
|
||||||
from RNS.Cryptography import HMAC
|
from RNS.Cryptography import HMAC
|
||||||
from RNS.Cryptography import PKCS7
|
from RNS.Cryptography import PKCS7
|
||||||
|
from RNS.Cryptography import AES
|
||||||
from RNS.Cryptography.AES import AES_128_CBC
|
from RNS.Cryptography.AES import AES_128_CBC
|
||||||
|
from RNS.Cryptography.AES import AES_256_CBC
|
||||||
|
|
||||||
class Token():
|
class Token():
|
||||||
"""
|
"""
|
||||||
|
@ -48,45 +50,50 @@ class Token():
|
||||||
TOKEN_OVERHEAD = 48 # Bytes
|
TOKEN_OVERHEAD = 48 # Bytes
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def generate_key():
|
def generate_key(mode=AES_128_CBC):
|
||||||
return os.urandom(32)
|
if mode == AES_128_CBC: return os.urandom(32)
|
||||||
|
elif mode == AES_256_CBC: return os.urandom(64)
|
||||||
|
else: raise TypeError(f"Invalid token mode: {mode}")
|
||||||
|
|
||||||
def __init__(self, key = None):
|
def __init__(self, key=None, mode=AES):
|
||||||
if key == None:
|
if key == None: raise ValueError("Token key cannot be None")
|
||||||
raise ValueError("Token key cannot be None")
|
|
||||||
|
|
||||||
if len(key) != 32:
|
if mode == AES:
|
||||||
raise ValueError("Token key must be 32 bytes, not "+str(len(key)))
|
if len(key) == 32:
|
||||||
|
self.mode = AES_128_CBC
|
||||||
self._signing_key = key[:16]
|
self._signing_key = key[:16]
|
||||||
self._encryption_key = key[16:]
|
self._encryption_key = key[16:]
|
||||||
|
|
||||||
|
elif len(key) == 64:
|
||||||
|
self.mode = AES_256_CBC
|
||||||
|
self._signing_key = key[:32]
|
||||||
|
self._encryption_key = key[32:]
|
||||||
|
|
||||||
|
else: raise ValueError("Token key must be 128 or 256 bits, not "+str(len(key)*8))
|
||||||
|
|
||||||
|
else: raise TypeError(f"Invalid token mode: {mode}")
|
||||||
|
|
||||||
|
|
||||||
def verify_hmac(self, token):
|
def verify_hmac(self, token):
|
||||||
if len(token) <= 32:
|
if len(token) <= 32: raise ValueError("Cannot verify HMAC on token of only "+str(len(token))+" bytes")
|
||||||
raise ValueError("Cannot verify HMAC on token of only "+str(len(token))+" bytes")
|
|
||||||
else:
|
else:
|
||||||
received_hmac = token[-32:]
|
received_hmac = token[-32:]
|
||||||
expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest()
|
expected_hmac = HMAC.new(self._signing_key, token[:-32]).digest()
|
||||||
|
|
||||||
if received_hmac == expected_hmac:
|
if received_hmac == expected_hmac: return True
|
||||||
return True
|
else: return False
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def encrypt(self, data = None):
|
def encrypt(self, data = None):
|
||||||
iv = os.urandom(16)
|
iv = os.urandom(16)
|
||||||
current_time = int(time.time())
|
current_time = int(time.time())
|
||||||
|
|
||||||
if not isinstance(data, bytes):
|
if not isinstance(data, bytes): raise TypeError("Token plaintext input must be bytes")
|
||||||
raise TypeError("Token plaintext input must be bytes")
|
|
||||||
|
|
||||||
ciphertext = AES_128_CBC.encrypt(
|
ciphertext = self.mode.encrypt(
|
||||||
plaintext = PKCS7.pad(data),
|
plaintext = PKCS7.pad(data),
|
||||||
key = self._encryption_key,
|
key = self._encryption_key,
|
||||||
iv = iv,
|
iv = iv)
|
||||||
)
|
|
||||||
|
|
||||||
signed_parts = iv+ciphertext
|
signed_parts = iv+ciphertext
|
||||||
|
|
||||||
|
@ -94,25 +101,19 @@ class Token():
|
||||||
|
|
||||||
|
|
||||||
def decrypt(self, token = None):
|
def decrypt(self, token = None):
|
||||||
if not isinstance(token, bytes):
|
if not isinstance(token, bytes): raise TypeError("Token must be bytes")
|
||||||
raise TypeError("Token must be bytes")
|
if not self.verify_hmac(token): raise ValueError("Token HMAC was invalid")
|
||||||
|
|
||||||
if not self.verify_hmac(token):
|
|
||||||
raise ValueError("Token HMAC was invalid")
|
|
||||||
|
|
||||||
iv = token[:16]
|
iv = token[:16]
|
||||||
ciphertext = token[16:-32]
|
ciphertext = token[16:-32]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
plaintext = PKCS7.unpad(
|
plaintext = PKCS7.unpad(
|
||||||
AES_128_CBC.decrypt(
|
self.mode.decrypt(
|
||||||
ciphertext,
|
ciphertext = ciphertext,
|
||||||
self._encryption_key,
|
key = self._encryption_key,
|
||||||
iv,
|
iv = iv))
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
return plaintext
|
return plaintext
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e: raise ValueError("Could not decrypt token")
|
||||||
raise ValueError("Could not decrypt token")
|
|
Loading…
Add table
Add a link
Reference in a new issue