Cleanups.

This commit is contained in:
Itamar Turner-Trauring 2022-03-25 15:46:42 -04:00
parent 23ce581405
commit 638154b2ad
2 changed files with 17 additions and 25 deletions

View File

@ -81,7 +81,11 @@ class _TLSContextFactory(CertificateOptions):
Originally implemented as part of Foolscap.
"""
def getContext(self, expected_spki_hash: bytes) -> SSL.Context:
def __init__(self, expected_spki_hash: bytes):
self.expected_spki_hash = expected_spki_hash
CertificateOptions.__init__(self)
def getContext(self) -> SSL.Context:
def always_validate(conn, cert, errno, depth, preverify_ok):
# This function is called to validate the certificate received by
# the other end. OpenSSL calls it multiple times, each time it
@ -105,15 +109,12 @@ class _TLSContextFactory(CertificateOptions):
)
# TODO can we do this once instead of multiple times?
if errno in things_are_ok and timing_safe_compare(
get_spki_hash(cert.to_cryptography()), expected_spki_hash
get_spki_hash(cert.to_cryptography()), self.expected_spki_hash
):
return 1
# TODO: log the details of the error, because otherwise they get
# lost in the PyOpenSSL exception that will eventually be raised
# (possibly OpenSSL.SSL.Error: certificate verify failed)
# I think that X509_V_ERR_CERT_SIGNATURE_FAILURE is the most
# obvious sign of hostile attack.
return 0
ctx = CertificateOptions.getContext(self)
@ -128,13 +129,8 @@ class _TLSContextFactory(CertificateOptions):
@attr.s
class _StorageClientHTTPSPolicy:
"""
A HTTPS policy that:
1. Makes sure the SPKI hash of the certificate matches a known hash (NEEDS TEST).
2. The certificate hasn't expired. (NEEDS TEST)
3. The server has a private key that matches the certificate (NEEDS TEST).
I.e. pinning-based validation.
A HTTPS policy that ensures the SPKI hash of the public key matches a known
hash, i.e. pinning-based validation.
"""
expected_spki_hash = attr.ib(type=bytes)
@ -146,7 +142,7 @@ class _StorageClientHTTPSPolicy:
# IOpenSSLClientConnectionCreator
def clientConnectionForTLS(self, tlsProtocol):
connection = SSL.Connection(
_TLSContextFactory().getContext(self.expected_spki_hash), None
_TLSContextFactory(self.expected_spki_hash).getContext(), None
)
connection.set_app_data(tlsProtocol)
return connection

View File

@ -90,15 +90,6 @@ class PinningHTTPSValidation(AsyncTestCase):
https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate
"""
# NEEDED TESTS
#
# Failure cases:
# DONE Self-signed cert has wrong hash. Cert+private key match each other.
# Self-signed cert has correct hash, is valid, but expired.
# Anonymous server, without certificate.
# Cert has correct hash, but is not self-signed.
# Certificate that isn't valid yet (i.e. from the future)? Or is that silly.
def to_file(self, key_or_cert) -> str:
"""
Write the given key or cert to a temporary file on disk, return the
@ -224,7 +215,8 @@ class PinningHTTPSValidation(AsyncTestCase):
async def test_server_certificate_expired(self):
"""
If the server's certificate has expired, the request to the server
fails even if the hash matches the one the client expects.
succeeds if the hash matches the one the client expects; expiration has
no effect.
"""
private_key = self.generate_private_key()
certificate = self.generate_certificate(private_key, expires_days=-10)
@ -232,5 +224,9 @@ class PinningHTTPSValidation(AsyncTestCase):
async with self.listen(
self.to_file(private_key), self.to_file(certificate)
) as url:
with self.assertRaises(ResponseNeverReceived):
await self.request(url, certificate)
response = await self.request(url, certificate)
self.assertEqual(await response.content(), b"YOYODYNE")
# TODO an obvious attack is a private key that doesn't match the
# certificate... but OpenSSL (quite rightly) won't let you listen with that
# so I don't know how to test that!