Merge pull request #1301 from tahoe-lafs/4027-invalid-unicode

Invalid unicode in Authorization header should give better response

Fixes ticket:4027
This commit is contained in:
Itamar Turner-Trauring 2023-05-23 14:53:22 -04:00 committed by GitHub
commit d510103f96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 71 additions and 11 deletions

0
newsfragments/4027.minor Normal file
View File

View File

@ -4,7 +4,7 @@ HTTP server for storage.
from __future__ import annotations
from typing import Any, Callable, Union, cast
from typing import Any, Callable, Union, cast, Optional
from functools import wraps
from base64 import b64decode
import binascii
@ -75,7 +75,7 @@ def _extract_secrets(
secrets, return dictionary mapping secrets to decoded values.
If too few secrets were given, or too many, a ``ClientSecretsException`` is
raised.
raised; its text is sent in the HTTP response.
"""
string_key_to_enum = {e.value: e for e in Secrets}
result = {}
@ -84,6 +84,10 @@ def _extract_secrets(
string_key, string_value = header_value.strip().split(" ", 1)
key = string_key_to_enum[string_key]
value = b64decode(string_value)
if value == b"":
raise ClientSecretsException(
"Failed to decode secret {}".format(string_key)
)
if key in (Secrets.LEASE_CANCEL, Secrets.LEASE_RENEW) and len(value) != 32:
raise ClientSecretsException("Lease secrets must be 32 bytes long")
result[key] = value
@ -91,7 +95,9 @@ def _extract_secrets(
raise ClientSecretsException("Bad header value(s): {}".format(header_values))
if result.keys() != required_secrets:
raise ClientSecretsException(
"Expected {} secrets, got {}".format(required_secrets, result.keys())
"Expected {} in X-Tahoe-Authorization headers, got {}".format(
[r.value for r in required_secrets], list(result.keys())
)
)
return result
@ -116,13 +122,19 @@ def _authorization_decorator(required_secrets):
) as ctx:
try:
# Check Authorization header:
try:
auth_header = request.requestHeaders.getRawHeaders(
"Authorization", [""]
)[0].encode("utf-8")
except UnicodeError:
raise _HTTPError(http.BAD_REQUEST, "Bad Authorization header")
if not timing_safe_compare(
request.requestHeaders.getRawHeaders("Authorization", [""])[
0
].encode("utf-8"),
auth_header,
swissnum_auth_header(self._swissnum),
):
raise _HTTPError(http.UNAUTHORIZED)
raise _HTTPError(
http.UNAUTHORIZED, "Wrong Authorization header"
)
# Check secrets:
authorization = request.requestHeaders.getRawHeaders(
@ -130,8 +142,8 @@ def _authorization_decorator(required_secrets):
)
try:
secrets = _extract_secrets(authorization, required_secrets)
except ClientSecretsException:
raise _HTTPError(http.BAD_REQUEST)
except ClientSecretsException as e:
raise _HTTPError(http.BAD_REQUEST, str(e))
# Run the business logic:
result = f(self, request, secrets, *args, **kwargs)
@ -272,8 +284,10 @@ class _HTTPError(Exception):
Raise from ``HTTPServer`` endpoint to return the given HTTP response code.
"""
def __init__(self, code: int):
def __init__(self, code: int, body: Optional[str] = None):
Exception.__init__(self, (code, body))
self.code = code
self.body = body
# CDDL schemas.
@ -499,7 +513,10 @@ def _add_error_handling(app: Klein):
def _http_error(_, request, failure):
"""Handle ``_HTTPError`` exceptions."""
request.setResponseCode(failure.value.code)
return b""
if failure.value.body is not None:
return failure.value.body
else:
return b""
@app.handle_errors(CDDLValidationError)
def _cddl_validation_error(_, request, failure):

View File

@ -257,6 +257,10 @@ class TestApp(object):
_add_error_handling(_app)
_swissnum = SWISSNUM_FOR_TEST # Match what the test client is using
@_authorized_route(_app, {}, "/noop", methods=["GET"])
def noop(self, request, authorization):
return "noop"
@_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
def validate_upload_secret(self, request, authorization):
if authorization == {Secrets.UPLOAD: b"MAGIC"}:
@ -340,10 +344,49 @@ class CustomHTTPServerTests(SyncTestCase):
)
self._http_server.clock = self.client._clock
def test_bad_swissnum_from_client(self) -> None:
"""
If the swissnum is invalid, a BAD REQUEST response code is returned.
"""
headers = Headers()
# The value is not UTF-8.
headers.addRawHeader("Authorization", b"\x00\xFF\x00\xFF")
response = result_of(
self.client._treq.request(
"GET",
DecodedURL.from_text("http://127.0.0.1/noop"),
headers=headers,
)
)
self.assertEqual(response.code, 400)
def test_bad_secret(self) -> None:
"""
If the secret is invalid (not base64), a BAD REQUEST
response code is returned.
"""
bad_secret = b"upload-secret []<>"
headers = Headers()
headers.addRawHeader(
"X-Tahoe-Authorization",
bad_secret,
)
response = result_of(
self.client.request(
"GET",
DecodedURL.from_text("http://127.0.0.1/upload_secret"),
headers=headers,
)
)
self.assertEqual(response.code, 400)
def test_authorization_enforcement(self):
"""
The requirement for secrets is enforced by the ``_authorized_route``
decorator; if they are not given, a 400 response code is returned.
Note that this refers to ``X-Tahoe-Authorization``, not the
``Authorization`` header used for the swissnum.
"""
# Without secret, get a 400 error.
response = result_of(