mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-22 11:45:15 +00:00
provide IEncryptor and IDecryptor interfaces
This commit is contained in:
parent
ac583ebc8d
commit
56cf188c90
@ -17,27 +17,52 @@ from cryptography.hazmat.primitives.ciphers import (
|
||||
modes,
|
||||
CipherContext,
|
||||
)
|
||||
from zope.interface import (
|
||||
Interface,
|
||||
directlyProvides,
|
||||
providedBy,
|
||||
)
|
||||
|
||||
|
||||
DEFAULT_IV = b'\x00' * 16
|
||||
|
||||
|
||||
class IEncryptor(Interface):
|
||||
"""
|
||||
An object which can encrypt data.
|
||||
|
||||
Create one using :func:`create_encryptor` and use it with
|
||||
:func:`encrypt_data`
|
||||
"""
|
||||
|
||||
|
||||
class IDecryptor(Interface):
|
||||
"""
|
||||
An object which can decrypt data.
|
||||
|
||||
Create one using :func:`create_decryptor` and use it with
|
||||
:func:`decrypt_data`
|
||||
"""
|
||||
|
||||
|
||||
def create_encryptor(key, iv=None):
|
||||
"""
|
||||
Create and return a new object which can do AES encryptions with
|
||||
the given key and initialization vector (IV). The default IV is 16
|
||||
zero-bytes.
|
||||
|
||||
The returned object is suitable for use with `encrypt_data`
|
||||
:param bytes key: the key bytes, should be 128 or 256 bits (16 or
|
||||
32 bytes)
|
||||
|
||||
:param bytes iv: the Initialization Vector consisting of 16 bytes,
|
||||
or None for the default (which is 16 zero bytes)
|
||||
|
||||
:returns: an object suitable for use with :func:`encrypt_data` (an
|
||||
:class:`IEncryptor`)
|
||||
"""
|
||||
key = _validate_key(key)
|
||||
iv = _validate_iv(iv)
|
||||
cipher = Cipher(
|
||||
algorithms.AES(key),
|
||||
modes.CTR(iv),
|
||||
backend=default_backend()
|
||||
)
|
||||
return cipher.encryptor()
|
||||
cryptor = _create_cryptor(key, iv)
|
||||
directlyProvides(cryptor, IEncryptor)
|
||||
return cryptor
|
||||
|
||||
|
||||
def encrypt_data(encryptor, plaintext):
|
||||
@ -51,26 +76,80 @@ def encrypt_data(encryptor, plaintext):
|
||||
:returns: bytes of ciphertext
|
||||
"""
|
||||
|
||||
_validate_encryptor(encryptor)
|
||||
_validate_cryptor(encryptor, encrypt=True)
|
||||
if not isinstance(plaintext, six.binary_type):
|
||||
raise ValueError('Plaintext was not bytes')
|
||||
raise ValueError('Plaintext must be bytes')
|
||||
|
||||
return encryptor.update(plaintext)
|
||||
|
||||
|
||||
create_decryptor = create_encryptor
|
||||
|
||||
|
||||
decrypt_data = encrypt_data
|
||||
|
||||
|
||||
def _validate_encryptor(encryptor):
|
||||
def create_decryptor(key, iv=None):
|
||||
"""
|
||||
raise ValueError if `encryptor` is not a valid object
|
||||
Create and return a new object which can do AES decryptions with
|
||||
the given key and initialization vector (IV). The default IV is 16
|
||||
zero-bytes.
|
||||
|
||||
:param bytes key: the key bytes, should be 128 or 256 bits (16 or
|
||||
32 bytes)
|
||||
|
||||
:param bytes iv: the Initialization Vector consisting of 16 bytes,
|
||||
or None for the default (which is 16 zero bytes)
|
||||
|
||||
:returns: an object suitable for use with :func:`decrypt_data` (an
|
||||
:class:`IDecryptor`)
|
||||
"""
|
||||
if not isinstance(encryptor, CipherContext):
|
||||
cryptor = _create_cryptor(key, iv)
|
||||
directlyProvides(cryptor, IDecryptor)
|
||||
return cryptor
|
||||
|
||||
|
||||
def decrypt_data(decryptor, plaintext):
|
||||
"""
|
||||
AES-decrypt `plaintext` with the given `decryptor`.
|
||||
|
||||
:param decryptor: an instance previously returned from `create_decryptor`
|
||||
|
||||
:param bytes plaintext: the data to decrypt
|
||||
|
||||
:returns: bytes of ciphertext
|
||||
"""
|
||||
|
||||
_validate_cryptor(decryptor, encrypt=False)
|
||||
if not isinstance(plaintext, six.binary_type):
|
||||
raise ValueError('Plaintext must be bytes')
|
||||
|
||||
return decryptor.update(plaintext)
|
||||
|
||||
|
||||
def _create_cryptor(key, iv):
|
||||
"""
|
||||
Internal helper.
|
||||
|
||||
See :func:`create_encryptor` or :func:`create_decryptor`.
|
||||
"""
|
||||
key = _validate_key(key)
|
||||
iv = _validate_iv(iv)
|
||||
cipher = Cipher(
|
||||
algorithms.AES(key),
|
||||
modes.CTR(iv),
|
||||
backend=default_backend()
|
||||
)
|
||||
return cipher.encryptor()
|
||||
|
||||
|
||||
def _validate_cryptor(cryptor, encrypt=True):
|
||||
"""
|
||||
raise ValueError if `cryptor` is not a valid object
|
||||
"""
|
||||
klass = IEncryptor if encrypt else IDecryptor
|
||||
name = "encryptor" if encrypt else "decryptor"
|
||||
if not isinstance(cryptor, CipherContext):
|
||||
raise ValueError(
|
||||
"'encryptor' must be a CipherContext"
|
||||
"'{}' must be a CipherContext".format(name)
|
||||
)
|
||||
if not klass.providedBy(cryptor):
|
||||
raise ValueError(
|
||||
"'{}' must be created with create_{}()".format(name, name)
|
||||
)
|
||||
|
||||
|
||||
@ -79,9 +158,9 @@ def _validate_key(key):
|
||||
confirm `key` is suitable for AES encryption, or raise ValueError
|
||||
"""
|
||||
if not isinstance(key, six.binary_type):
|
||||
raise TypeError('Key was not bytes')
|
||||
raise TypeError('Key must be bytes')
|
||||
if len(key) not in (16, 32):
|
||||
raise ValueError('Key was not 16 or 32 bytes long')
|
||||
raise ValueError('Key must be 16 or 32 bytes long')
|
||||
return key
|
||||
|
||||
|
||||
@ -94,7 +173,7 @@ def _validate_iv(iv):
|
||||
if iv is None:
|
||||
return DEFAULT_IV
|
||||
if not isinstance(iv, six.binary_type):
|
||||
raise TypeError('IV was not bytes')
|
||||
raise TypeError('IV must be bytes')
|
||||
if len(iv) != 16:
|
||||
raise ValueError('IV was not 16 bytes long')
|
||||
raise ValueError('IV must be 16 bytes long')
|
||||
return iv
|
||||
|
@ -54,21 +54,21 @@ class TestRegression(unittest.TestCase):
|
||||
This was the old startup test run at import time in `pycryptopp.cipher.aes`.
|
||||
"""
|
||||
enc0 = b"dc95c078a2408989ad48a21492842087530f8afbc74536b9a963b4f1c4cb738b"
|
||||
cryptor = aes.create_encryptor(key=b"\x00" * 32)
|
||||
cryptor = aes.create_decryptor(key=b"\x00" * 32)
|
||||
ct = aes.decrypt_data(cryptor, b"\x00" * 32)
|
||||
self.assertEqual(enc0, b2a_hex(ct))
|
||||
|
||||
cryptor = aes.create_encryptor(key=b"\x00" * 32)
|
||||
cryptor = aes.create_decryptor(key=b"\x00" * 32)
|
||||
ct1 = aes.decrypt_data(cryptor, b"\x00" * 15)
|
||||
ct2 = aes.decrypt_data(cryptor, b"\x00" * 17)
|
||||
self.assertEqual(enc0, b2a_hex(ct1+ct2))
|
||||
|
||||
enc0 = b"66e94bd4ef8a2c3b884cfa59ca342b2e"
|
||||
cryptor = aes.create_encryptor(key=b"\x00" * 16)
|
||||
cryptor = aes.create_decryptor(key=b"\x00" * 16)
|
||||
ct = aes.decrypt_data(cryptor, b"\x00" * 16)
|
||||
self.assertEqual(enc0, b2a_hex(ct))
|
||||
|
||||
cryptor = aes.create_encryptor(key=b"\x00" * 16)
|
||||
cryptor = aes.create_decryptor(key=b"\x00" * 16)
|
||||
ct1 = aes.decrypt_data(cryptor, b"\x00" * 8)
|
||||
ct2 = aes.decrypt_data(cryptor, b"\x00" * 8)
|
||||
self.assertEqual(enc0, b2a_hex(ct1+ct2))
|
||||
@ -107,7 +107,7 @@ class TestRegression(unittest.TestCase):
|
||||
plaintext = b'test'
|
||||
expected_ciphertext = b'\x7fEK\\'
|
||||
|
||||
k = aes.create_encryptor(self.AES_KEY)
|
||||
k = aes.create_decryptor(self.AES_KEY)
|
||||
ciphertext = aes.decrypt_data(k, plaintext)
|
||||
|
||||
self.assertEqual(ciphertext, expected_ciphertext)
|
||||
@ -129,7 +129,7 @@ class TestRegression(unittest.TestCase):
|
||||
b'\x1f\xa1|\xd2$E\xb5\xe7\x9d\xae\xd1\x1f)\xe4\xc7\x83\xb8\xd5|dHhU\xc8\x9a\xb1\x10\xed'
|
||||
b'\xd1\xe7|\xd1')
|
||||
|
||||
k = aes.create_encryptor(self.AES_KEY)
|
||||
k = aes.create_decryptor(self.AES_KEY)
|
||||
ciphertext = aes.decrypt_data(k, plaintext)
|
||||
|
||||
self.assertEqual(ciphertext, expected_ciphertext)
|
||||
@ -148,7 +148,7 @@ class TestRegression(unittest.TestCase):
|
||||
plaintext = b'test'
|
||||
expected_ciphertext = b'\x82\x0e\rt'
|
||||
|
||||
k = aes.create_encryptor(self.AES_KEY, iv=self.IV)
|
||||
k = aes.create_decryptor(self.AES_KEY, iv=self.IV)
|
||||
ciphertext = aes.decrypt_data(k, plaintext)
|
||||
|
||||
self.assertEqual(ciphertext, expected_ciphertext)
|
||||
@ -170,7 +170,7 @@ class TestRegression(unittest.TestCase):
|
||||
b'\x97a\xdc\x100?\xf5L\x9f\xd9\xeeO\x98\xda\xf5g\x93\xa7q\xe1\xb1~\xf8\x1b\xe8[\\s'
|
||||
b'\x144$\x86\xeaC^f')
|
||||
|
||||
k = aes.create_encryptor(self.AES_KEY, iv=self.IV)
|
||||
k = aes.create_decryptor(self.AES_KEY, iv=self.IV)
|
||||
ciphertext = aes.decrypt_data(k, plaintext)
|
||||
|
||||
self.assertEqual(ciphertext, expected_ciphertext)
|
||||
@ -230,7 +230,7 @@ class TestRegression(unittest.TestCase):
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
aes.encrypt_data(encryptor, u"not bytes")
|
||||
self.assertIn(
|
||||
"was not bytes",
|
||||
"must be bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
@ -254,7 +254,7 @@ class TestRegression(unittest.TestCase):
|
||||
with self.assertRaises(TypeError) as ctx:
|
||||
aes.create_encryptor(key, iv=u"1234567890abcdef")
|
||||
self.assertIn(
|
||||
"was not bytes",
|
||||
"must be bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
|
@ -189,12 +189,12 @@ class AssistedUpload(unittest.TestCase):
|
||||
|
||||
key = hashutil.convergence_hash(k, n, segsize, DATA, "test convergence string")
|
||||
assert len(key) == 16
|
||||
encryptor = aes.create_encryptor(key)
|
||||
decryptor = aes.create_decryptor(key)
|
||||
SI = hashutil.storage_index_hash(key)
|
||||
SI_s = si_b2a(SI)
|
||||
encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
|
||||
f = open(encfile, "wb")
|
||||
f.write(aes.decrypt_data(encryptor, DATA))
|
||||
f.write(aes.decrypt_data(decryptor, DATA))
|
||||
f.close()
|
||||
|
||||
u = upload.Uploader(self.helper_furl)
|
||||
|
Loading…
x
Reference in New Issue
Block a user