Sketch of first HTTPS client logic test.

This commit is contained in:
Itamar Turner-Trauring 2022-03-25 13:49:11 -04:00
parent 6f86675766
commit 712f4f1385

View File

@ -6,14 +6,30 @@ server authentication logic, which may one day apply outside of HTTP Storage
Protocol.
"""
from cryptography.x509 import load_pem_x509_certificate
import datetime
from functools import wraps
from contextlib import asynccontextmanager
from .common import SyncTestCase
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization, hashes
from twisted.internet.endpoints import quoteStringArgument, serverFromString
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.web.server import Site
from twisted.web.static import Data
from twisted.web.client import Agent, HTTPConnectionPool
from treq.client import HTTPClient
from .common import SyncTestCase, AsyncTestCase
from ..storage.http_common import get_spki_hash
from ..storage.http_client import _StorageClientHTTPSPolicy
class HTTPFurlTests(SyncTestCase):
"""Tests for HTTP furls."""
class HTTPSFurlTests(SyncTestCase):
"""Tests for HTTPS furls."""
def test_spki_hash(self):
"""The output of ``get_spki_hash()`` matches the semantics of RFC 7469.
@ -48,5 +64,139 @@ PudP0aniyq/gbPhxq0tYF628IBvhDAnr/2kqEmVF2TDr2Sm/Y3PDBuPY6MeIxjnr
ox5zO3LrQmQw11OaIAs2/kviKAoKTFFxeyYcpS5RuKNDZfHQCXlLwt9bySxG
-----END CERTIFICATE-----
"""
certificate = load_pem_x509_certificate(certificate_text)
certificate = x509.load_pem_x509_certificate(certificate_text)
self.assertEqual(get_spki_hash(certificate), expected_hash)
def async_to_deferred(f):
"""
Wrap an async function to return a Deferred instead.
"""
@wraps(f)
def not_async(*args, **kwargs):
return Deferred.fromCoroutine(f(*args, **kwargs))
return not_async
class PinningHTTPSValidation(AsyncTestCase):
"""
Test client-side validation logic of HTTPS certificates that uses
Tahoe-LAFS's pinning-based scheme instead of the traditional certificate
authority scheme.
https://cryptography.io/en/latest/x509/tutorial/#creating-a-self-signed-certificate
"""
# NEEDED TESTS
#
# Success case
#
# Failure cases:
# Self-signed cert has wrong hash. Cert+private key match each other.
# Self-signed cert has correct hash, but doesn't match private key, so is invalid cert.
# Self-signed cert has correct hash, is valid, but expired.
# Anonymous server, without certificate.
# Cert has correct hash, but is not self-signed.
# Certificate that isn't valid yet (i.e. from the future)? Or is that silly.
def to_file(self, key_or_cert) -> str:
"""
Write the given key or cert to a temporary file on disk, return the
path.
"""
path = self.mktemp()
with open(path, "wb") as f:
if isinstance(key_or_cert, x509.Certificate):
data = key_or_cert.public_bytes(serialization.Encoding.PEM)
else:
data = key_or_cert.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)
f.write(data)
return path
def generate_private_key(self):
"""Create a RSA private key."""
return rsa.generate_private_key(public_exponent=65537, key_size=2048)
def generate_certificate(self, private_key, expires_days: int):
"""Generate a certificate from a RSA private key."""
subject = issuer = x509.Name(
[x509.NameAttribute(NameOID.ORGANIZATION_NAME, "Yoyodyne")]
)
return (
x509.CertificateBuilder()
.subject_name(subject)
.issuer_name(issuer)
.public_key(private_key.public_key())
.serial_number(x509.random_serial_number())
.not_valid_before(datetime.datetime.utcnow())
.not_valid_after(
datetime.datetime.utcnow() + datetime.timedelta(days=expires_days)
)
.add_extension(
x509.SubjectAlternativeName([x509.DNSName("localhost")]),
critical=False,
# Sign our certificate with our private key
)
.sign(private_key, hashes.SHA256())
)
@asynccontextmanager
async def listen(self, private_key_path, cert_path) -> str:
"""
Context manager that runs a HTTPS server with the given private key
and certificate.
Returns a URL that will connect to the server.
"""
endpoint = serverFromString(
reactor,
"ssl:privateKey={}:certKey={}:port=0:interface=127.0.0.1".format(
quoteStringArgument(str(private_key_path)),
quoteStringArgument(str(cert_path)),
),
)
root = Data(b"YOYODYNE", "text/plain")
root.isLeaf = True
listening_port = await endpoint.listen(Site(root))
try:
yield f"https://127.0.0.1:{listening_port.getHost().port}/"
finally:
await listening_port.stopListening()
def request(self, url: str, expected_certificate: x509.Certificate):
"""
Send a HTTPS request to the given URL, ensuring that the given
certificate is the one used via SPKI-hash-based pinning comparison.
"""
# No persistent connections, so we don't have dirty reactor at the end
# of the test.
treq_client = HTTPClient(
Agent(
reactor,
_StorageClientHTTPSPolicy(
expected_spki_hash=get_spki_hash(expected_certificate)
),
pool=HTTPConnectionPool(reactor, persistent=False),
)
)
return treq_client.get(url)
@async_to_deferred
async def test_success(self):
"""
If all conditions are met, a TLS client using the Tahoe-LAFS policy can
connect to the server.
"""
private_key = self.generate_private_key()
certificate = self.generate_certificate(private_key, 10)
async with self.listen(
self.to_file(private_key), self.to_file(certificate)
) as url:
response = await self.request(url, certificate)
self.assertEqual(await response.content(), b"YOYODYNE")