Test for server-side secret handling.

This commit is contained in:
Itamar Turner-Trauring 2021-12-16 11:17:32 -05:00
parent 87fa9ac2a8
commit da52a9aede

View File

@ -21,10 +21,14 @@ from twisted.trial.unittest import TestCase
from twisted.internet.defer import inlineCallbacks from twisted.internet.defer import inlineCallbacks
from treq.testing import StubTreq from treq.testing import StubTreq
from klein import Klein
from hyperlink import DecodedURL from hyperlink import DecodedURL
from ..storage.server import StorageServer from ..storage.server import StorageServer
from ..storage.http_server import HTTPServer, _extract_secrets, Secrets, ClientSecretsException from ..storage.http_server import (
HTTPServer, _extract_secrets, Secrets, ClientSecretsException,
_authorized_route,
)
from ..storage.http_client import StorageClient, ClientException from ..storage.http_client import StorageClient, ClientException
@ -98,22 +102,80 @@ class ExtractSecretsTests(TestCase):
_extract_secrets(["lease-renew-secret x"], {Secrets.LEASE_RENEW}) _extract_secrets(["lease-renew-secret x"], {Secrets.LEASE_RENEW})
class HTTPTests(TestCase): SWISSNUM_FOR_TEST = b"abcd"
class TestApp(object):
"""HTTP API for testing purposes."""
_app = Klein()
_swissnum = SWISSNUM_FOR_TEST # Match what the test client is using
@_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
def validate_upload_secret(self, request, authorization):
if authorization == {Secrets.UPLOAD: b"abc"}:
return "OK"
else:
return "BAD: {}".format(authorization)
class RoutingTests(TestCase):
""" """
Tests of HTTP client talking to the HTTP server. Tests for the HTTP routing infrastructure.
"""
def setUp(self):
self._http_server = TestApp()
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=StubTreq(self._http_server._app.resource()),
)
@inlineCallbacks
def test_authorization_enforcement(self):
"""
The requirement for secrets is enforced; if they are not given, a 400
response code is returned.
"""
secret = b"abc"
# Without secret, get a 400 error.
response = yield self.client._request(
"GET", "http://127.0.0.1/upload_secret", {}
)
self.assertEqual(response.code, 400)
# With secret, we're good.
response = yield self.client._request(
"GET", "http://127.0.0.1/upload_secret", {Secrets.UPLOAD, b"abc"}
)
self.assertEqual(response.code, 200)
def setup_http_test(self):
"""
Setup HTTP tests; call from ``setUp``.
"""
if PY2:
raise SkipTest("Not going to bother supporting Python 2")
self.storage_server = StorageServer(self.mktemp(), b"\x00" * 20)
# TODO what should the swissnum _actually_ be?
self._http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=StubTreq(self._http_server.get_resource()),
)
class GenericHTTPAPITests(TestCase):
"""
Tests of HTTP client talking to the HTTP server, for generic HTTP API
endpoints and concerns.
""" """
def setUp(self): def setUp(self):
if PY2: setup_http_test(self)
raise SkipTest("Not going to bother supporting Python 2")
self.storage_server = StorageServer(self.mktemp(), b"\x00" * 20)
# TODO what should the swissnum _actually_ be?
self._http_server = HTTPServer(self.storage_server, b"abcd")
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"abcd",
treq=StubTreq(self._http_server.get_resource()),
)
@inlineCallbacks @inlineCallbacks
def test_bad_authentication(self): def test_bad_authentication(self):