mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-20 11:38:52 +00:00
test all error-cases, and some fixups
This commit is contained in:
parent
802f1afde6
commit
a27a5ce581
@ -112,6 +112,7 @@ def verify_signature(public_key, alleged_signature, data):
|
||||
|
||||
:param bytes data: the data which was allegedly signed
|
||||
"""
|
||||
_validate_public_key(public_key)
|
||||
try:
|
||||
public_key.verify(
|
||||
alleged_signature,
|
||||
@ -133,7 +134,7 @@ def _validate_public_key(public_key):
|
||||
"""
|
||||
if not isinstance(public_key, rsa.RSAPublicKey):
|
||||
raise ValueError(
|
||||
"public_key not an RSAPublicKey"
|
||||
"public_key must be an RSAPublicKey"
|
||||
)
|
||||
|
||||
|
||||
@ -144,5 +145,5 @@ def _validate_private_key(private_key):
|
||||
"""
|
||||
if not isinstance(private_key, rsa.RSAPrivateKey):
|
||||
raise ValueError(
|
||||
"private_key not an RSAPrivateKey"
|
||||
"private_key must be an RSAPrivateKey"
|
||||
)
|
||||
|
@ -5,8 +5,14 @@ from base64 import b64decode
|
||||
from binascii import a2b_hex, b2a_hex
|
||||
from os import path
|
||||
|
||||
from allmydata.crypto import aes
|
||||
from allmydata.crypto import ed25519, rsa
|
||||
from allmydata.crypto import (
|
||||
aes,
|
||||
ed25519,
|
||||
rsa,
|
||||
remove_prefix,
|
||||
BadPrefixError
|
||||
)
|
||||
|
||||
|
||||
RESOURCE_DIR = path.join(path.dirname(__file__), 'data')
|
||||
|
||||
@ -219,6 +225,55 @@ class TestRegression(unittest.TestCase):
|
||||
priv_key, pub_key = rsa.create_signing_keypair_from_string(self.RSA_2048_PRIV_KEY)
|
||||
rsa.verify_signature(pub_key, self.RSA_2048_SIG, b'test')
|
||||
|
||||
def test_encrypt_data_not_bytes(self):
|
||||
'''
|
||||
only bytes can be encrypted
|
||||
'''
|
||||
key = '\x00' * 16
|
||||
encryptor = aes.create_encryptor(key)
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
aes.encrypt_data(encryptor, six.text_type("not bytes"))
|
||||
self.assertIn(
|
||||
"was not bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_key_incorrect_size(self):
|
||||
'''
|
||||
only bytes can be encrypted
|
||||
'''
|
||||
key = '\x00' * 12
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
encryptor = aes.create_encryptor(key)
|
||||
self.assertIn(
|
||||
"16 or 32 bytes long",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_iv_not_bytes(self):
|
||||
'''
|
||||
iv must be bytes
|
||||
'''
|
||||
key = '\x00' * 16
|
||||
with self.assertRaises(TypeError) as ctx:
|
||||
encryptor = aes.create_encryptor(key, iv=six.text_type("1234567890abcdef"))
|
||||
self.assertIn(
|
||||
"was not bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_incorrect_iv_size(self):
|
||||
'''
|
||||
iv must be 16 bytes
|
||||
'''
|
||||
key = '\x00' * 16
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
encryptor = aes.create_encryptor(key, iv='\x00' * 3)
|
||||
self.assertIn(
|
||||
"16 bytes long",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
|
||||
class TestEd25519(unittest.TestCase):
|
||||
|
||||
@ -252,6 +307,88 @@ class TestEd25519(unittest.TestCase):
|
||||
ed25519.bytes_from_verifying_key(public_key2),
|
||||
)
|
||||
|
||||
def test_deserialize_private_not_bytes(self):
|
||||
'''
|
||||
serialized key must be bytes
|
||||
'''
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
ed25519.signing_keypair_from_bytes(six.text_type("not bytes"))
|
||||
self.assertIn(
|
||||
"must be bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_deserialize_public_not_bytes(self):
|
||||
'''
|
||||
serialized key must be bytes
|
||||
'''
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
ed25519.verifying_key_from_bytes(six.text_type("not bytes"))
|
||||
self.assertIn(
|
||||
"must be bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_signed_data_not_bytes(self):
|
||||
'''
|
||||
data to sign must be bytes
|
||||
'''
|
||||
priv, pub = ed25519.create_signing_keypair()
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
ed25519.sign_data(priv, six.text_type("not bytes"))
|
||||
self.assertIn(
|
||||
"must be bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_signature_not_bytes(self):
|
||||
'''
|
||||
signature must be bytes
|
||||
'''
|
||||
priv, pub = ed25519.create_signing_keypair()
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
ed25519.verify_signature(pub, six.text_type("not bytes"), b"data")
|
||||
self.assertIn(
|
||||
"must be bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_signature_data_not_bytes(self):
|
||||
'''
|
||||
signature must be bytes
|
||||
'''
|
||||
priv, pub = ed25519.create_signing_keypair()
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
ed25519.verify_signature(pub, b"signature", six.text_type("not bytes"))
|
||||
self.assertIn(
|
||||
"must be bytes",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_sign_invalid_pubkey(self):
|
||||
'''
|
||||
pubkey must be correct
|
||||
'''
|
||||
priv, pub = ed25519.create_signing_keypair()
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
ed25519.sign_data(object(), b"data")
|
||||
self.assertIn(
|
||||
"must be an Ed25519PrivateKey",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_verify_invalid_pubkey(self):
|
||||
'''
|
||||
pubkey must be correct
|
||||
'''
|
||||
priv, pub = ed25519.create_signing_keypair()
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
ed25519.verify_signature(object(), b"signature", b"data")
|
||||
self.assertIn(
|
||||
"must be an Ed25519PublicKey",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
|
||||
class TestRsa(unittest.TestCase):
|
||||
|
||||
@ -279,3 +416,40 @@ class TestRsa(unittest.TestCase):
|
||||
# ..and a failed way
|
||||
with self.assertRaises(rsa.BadSignature):
|
||||
rsa.verify_signature(pub_key, sig1, data_to_sign + b"more")
|
||||
|
||||
def test_sign_invalid_pubkey(self):
|
||||
'''
|
||||
pubkey must be correct
|
||||
'''
|
||||
priv, pub = rsa.create_signing_keypair(1024)
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
rsa.sign_data(object(), b"data")
|
||||
self.assertIn(
|
||||
"must be an RSAPrivateKey",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
def test_verify_invalid_pubkey(self):
|
||||
'''
|
||||
pubkey must be correct
|
||||
'''
|
||||
priv, pub = rsa.create_signing_keypair(1024)
|
||||
with self.assertRaises(ValueError) as ctx:
|
||||
rsa.verify_signature(object(), b"signature", b"data")
|
||||
self.assertIn(
|
||||
"must be an RSAPublicKey",
|
||||
str(ctx.exception)
|
||||
)
|
||||
|
||||
|
||||
class TestUtil(unittest.TestCase):
|
||||
|
||||
def test_remove_prefix_good(self):
|
||||
self.assertEquals(
|
||||
remove_prefix(b"foobar", b"foo"),
|
||||
b"bar"
|
||||
)
|
||||
|
||||
def test_remove_prefix_bad(self):
|
||||
with self.assertRaises(BadPrefixError):
|
||||
remove_prefix(b"foobar", b"bar")
|
||||
|
@ -1012,6 +1012,27 @@ class Signatures(SyncTestCase):
|
||||
self.failUnlessRaises(UnknownKeyError,
|
||||
unsign_from_foolscap, (bad_msg, sig, "v999-key"))
|
||||
|
||||
def test_unsigned_announcement(self):
|
||||
from allmydata.crypto import ed25519
|
||||
ed25519.verifying_key_from_string("pub-v0-wodst6ly4f7i7akt2nxizsmmy2rlmer6apltl56zctn67wfyu5tq")
|
||||
mock_tub = Mock()
|
||||
ic = IntroducerClient(
|
||||
mock_tub,
|
||||
u"pb://",
|
||||
u"fake_nick",
|
||||
"0.0.0",
|
||||
"1.2.3",
|
||||
{},
|
||||
(0, u"i am a nonce"),
|
||||
"invalid",
|
||||
)
|
||||
self.assertEqual(0, ic._debug_counts["inbound_announcement"])
|
||||
ic.got_announcements([
|
||||
("message", "v0-aaaaaaa", "v0-wodst6ly4f7i7akt2nxizsmmy2rlmer6apltl56zctn67wfyu5tq")
|
||||
])
|
||||
# we should have rejected this announcement due to a bad signature
|
||||
self.assertEqual(0, ic._debug_counts["inbound_announcement"])
|
||||
|
||||
|
||||
# add tests of StorageFarmBroker: if it receives duplicate announcements, it
|
||||
# should leave the Reconnector in place, also if it receives
|
||||
|
@ -788,6 +788,14 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase):
|
||||
self.failUnlessFalse(symlinkinfo.isfile)
|
||||
self.failUnlessFalse(symlinkinfo.isdir)
|
||||
|
||||
def test_encrypted_tempfile(self):
|
||||
from allmydata.util.fileutil import EncryptedTemporaryFile
|
||||
f = EncryptedTemporaryFile()
|
||||
f.write("foobar")
|
||||
f.close()
|
||||
print(f.file)
|
||||
print(dir(f.file))
|
||||
|
||||
|
||||
class PollMixinTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
|
@ -112,7 +112,7 @@ class EncryptedTemporaryFile(object):
|
||||
cipher = aes.create_encryptor(self.key, iv)
|
||||
# this is just to advance the counter
|
||||
aes.encrypt_data(cipher, "\x00" * offset_small)
|
||||
return aes.encrypt_data(ciper, data)
|
||||
return aes.encrypt_data(cipher, data)
|
||||
|
||||
def close(self):
|
||||
self.file.close()
|
||||
|
Loading…
Reference in New Issue
Block a user