mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-23 10:40:23 +00:00
Merge pull request #1284 from tahoe-lafs/3880-http-storage-logging
HTTP storage logging, part 1 Fixes ticket:3880
This commit is contained in:
commit
174609116e
0
newsfragments/3880.minor
Normal file
0
newsfragments/3880.minor
Normal file
@ -4,6 +4,7 @@ HTTP client that talks to the HTTP storage server.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from eliot import start_action, register_exception_extractor
|
||||
from typing import Union, Optional, Sequence, Mapping, BinaryIO
|
||||
from base64 import b64encode
|
||||
from io import BytesIO
|
||||
@ -18,7 +19,7 @@ from collections_extended import RangeMap
|
||||
from werkzeug.datastructures import Range, ContentRange
|
||||
from twisted.web.http_headers import Headers
|
||||
from twisted.web import http
|
||||
from twisted.web.iweb import IPolicyForHTTPS
|
||||
from twisted.web.iweb import IPolicyForHTTPS, IResponse
|
||||
from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred, succeed
|
||||
from twisted.internet.interfaces import (
|
||||
IOpenSSLClientConnectionCreator,
|
||||
@ -63,6 +64,9 @@ class ClientException(Exception):
|
||||
self.code = code
|
||||
|
||||
|
||||
register_exception_extractor(ClientException, lambda e: {"response_code": e.code})
|
||||
|
||||
|
||||
# Schemas for server responses.
|
||||
#
|
||||
# Tags are of the form #6.nnn, where the number is documented at
|
||||
@ -337,7 +341,7 @@ class StorageClient(object):
|
||||
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
|
||||
return cls(https_url, swissnum, treq_client, reactor)
|
||||
|
||||
def relative_url(self, path):
|
||||
def relative_url(self, path: str) -> DecodedURL:
|
||||
"""Get a URL relative to the base URL."""
|
||||
return self._base_url.click(path)
|
||||
|
||||
@ -351,19 +355,20 @@ class StorageClient(object):
|
||||
)
|
||||
return headers
|
||||
|
||||
def request(
|
||||
@async_to_deferred
|
||||
async def request(
|
||||
self,
|
||||
method,
|
||||
url,
|
||||
lease_renew_secret=None,
|
||||
lease_cancel_secret=None,
|
||||
upload_secret=None,
|
||||
write_enabler_secret=None,
|
||||
headers=None,
|
||||
message_to_serialize=None,
|
||||
method: str,
|
||||
url: DecodedURL,
|
||||
lease_renew_secret: Optional[bytes] = None,
|
||||
lease_cancel_secret: Optional[bytes] = None,
|
||||
upload_secret: Optional[bytes] = None,
|
||||
write_enabler_secret: Optional[bytes] = None,
|
||||
headers: Optional[Headers] = None,
|
||||
message_to_serialize: object = None,
|
||||
timeout: float = 60,
|
||||
**kwargs,
|
||||
):
|
||||
) -> IResponse:
|
||||
"""
|
||||
Like ``treq.request()``, but with optional secrets that get translated
|
||||
into corresponding HTTP headers.
|
||||
@ -373,6 +378,41 @@ class StorageClient(object):
|
||||
|
||||
Default timeout is 60 seconds.
|
||||
"""
|
||||
with start_action(
|
||||
action_type="allmydata:storage:http-client:request",
|
||||
method=method,
|
||||
url=url.to_text(),
|
||||
timeout=timeout,
|
||||
) as ctx:
|
||||
response = await self._request(
|
||||
method,
|
||||
url,
|
||||
lease_renew_secret,
|
||||
lease_cancel_secret,
|
||||
upload_secret,
|
||||
write_enabler_secret,
|
||||
headers,
|
||||
message_to_serialize,
|
||||
timeout,
|
||||
**kwargs,
|
||||
)
|
||||
ctx.add_success_fields(response_code=response.code)
|
||||
return response
|
||||
|
||||
async def _request(
|
||||
self,
|
||||
method: str,
|
||||
url: DecodedURL,
|
||||
lease_renew_secret: Optional[bytes] = None,
|
||||
lease_cancel_secret: Optional[bytes] = None,
|
||||
upload_secret: Optional[bytes] = None,
|
||||
write_enabler_secret: Optional[bytes] = None,
|
||||
headers: Optional[Headers] = None,
|
||||
message_to_serialize: object = None,
|
||||
timeout: float = 60,
|
||||
**kwargs,
|
||||
) -> IResponse:
|
||||
"""The implementation of request()."""
|
||||
headers = self._get_headers(headers)
|
||||
|
||||
# Add secrets:
|
||||
@ -403,7 +443,7 @@ class StorageClient(object):
|
||||
kwargs["data"] = dumps(message_to_serialize)
|
||||
headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)
|
||||
|
||||
return self._treq.request(
|
||||
return await self._treq.request(
|
||||
method, url, headers=headers, timeout=timeout, **kwargs
|
||||
)
|
||||
|
||||
@ -448,12 +488,14 @@ class StorageClientGeneral(object):
|
||||
# Add some features we know are true because the HTTP API
|
||||
# specification requires them and because other parts of the storage
|
||||
# client implementation assumes they will be present.
|
||||
decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"].update({
|
||||
b'tolerates-immutable-read-overrun': True,
|
||||
b'delete-mutable-shares-with-zero-length-writev': True,
|
||||
b'fills-holes-with-zero-bytes': True,
|
||||
b'prevents-read-past-end-of-share-data': True,
|
||||
})
|
||||
decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"].update(
|
||||
{
|
||||
b"tolerates-immutable-read-overrun": True,
|
||||
b"delete-mutable-shares-with-zero-length-writev": True,
|
||||
b"fills-holes-with-zero-bytes": True,
|
||||
b"prevents-read-past-end-of-share-data": True,
|
||||
}
|
||||
)
|
||||
returnValue(decoded_response)
|
||||
|
||||
@inlineCallbacks
|
||||
|
@ -12,6 +12,7 @@ from tempfile import TemporaryFile
|
||||
from os import SEEK_END, SEEK_SET
|
||||
import mmap
|
||||
|
||||
from eliot import start_action
|
||||
from cryptography.x509 import Certificate as CryptoCertificate
|
||||
from zope.interface import implementer
|
||||
from klein import Klein
|
||||
@ -97,30 +98,50 @@ def _extract_secrets(
|
||||
|
||||
def _authorization_decorator(required_secrets):
|
||||
"""
|
||||
Check the ``Authorization`` header, and extract ``X-Tahoe-Authorization``
|
||||
headers and pass them in.
|
||||
1. Check the ``Authorization`` header matches server swissnum.
|
||||
2. Extract ``X-Tahoe-Authorization`` headers and pass them in.
|
||||
3. Log the request and response.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def route(self, request, *args, **kwargs):
|
||||
if not timing_safe_compare(
|
||||
request.requestHeaders.getRawHeaders("Authorization", [""])[0].encode(
|
||||
"utf-8"
|
||||
),
|
||||
swissnum_auth_header(self._swissnum),
|
||||
):
|
||||
request.setResponseCode(http.UNAUTHORIZED)
|
||||
return b""
|
||||
authorization = request.requestHeaders.getRawHeaders(
|
||||
"X-Tahoe-Authorization", []
|
||||
)
|
||||
try:
|
||||
secrets = _extract_secrets(authorization, required_secrets)
|
||||
except ClientSecretsException:
|
||||
request.setResponseCode(http.BAD_REQUEST)
|
||||
return b"Missing required secrets"
|
||||
return f(self, request, secrets, *args, **kwargs)
|
||||
with start_action(
|
||||
action_type="allmydata:storage:http-server:handle-request",
|
||||
method=request.method,
|
||||
path=request.path,
|
||||
) as ctx:
|
||||
try:
|
||||
# Check Authorization header:
|
||||
if not timing_safe_compare(
|
||||
request.requestHeaders.getRawHeaders("Authorization", [""])[0].encode(
|
||||
"utf-8"
|
||||
),
|
||||
swissnum_auth_header(self._swissnum),
|
||||
):
|
||||
raise _HTTPError(http.UNAUTHORIZED)
|
||||
|
||||
# Check secrets:
|
||||
authorization = request.requestHeaders.getRawHeaders(
|
||||
"X-Tahoe-Authorization", []
|
||||
)
|
||||
try:
|
||||
secrets = _extract_secrets(authorization, required_secrets)
|
||||
except ClientSecretsException:
|
||||
raise _HTTPError(http.BAD_REQUEST)
|
||||
|
||||
# Run the business logic:
|
||||
result = f(self, request, secrets, *args, **kwargs)
|
||||
except _HTTPError as e:
|
||||
# This isn't an error necessarily for logging purposes,
|
||||
# it's an implementation detail, an easier way to set
|
||||
# response codes.
|
||||
ctx.add_success_fields(response_code=e.code)
|
||||
ctx.finish()
|
||||
raise
|
||||
else:
|
||||
ctx.add_success_fields(response_code=request.code)
|
||||
return result
|
||||
|
||||
return route
|
||||
|
||||
@ -468,6 +489,21 @@ def read_range(
|
||||
return d
|
||||
|
||||
|
||||
def _add_error_handling(app: Klein):
|
||||
"""Add exception handlers to a Klein app."""
|
||||
@app.handle_errors(_HTTPError)
|
||||
def _http_error(_, request, failure):
|
||||
"""Handle ``_HTTPError`` exceptions."""
|
||||
request.setResponseCode(failure.value.code)
|
||||
return b""
|
||||
|
||||
@app.handle_errors(CDDLValidationError)
|
||||
def _cddl_validation_error(_, request, failure):
|
||||
"""Handle CDDL validation errors."""
|
||||
request.setResponseCode(http.BAD_REQUEST)
|
||||
return str(failure.value).encode("utf-8")
|
||||
|
||||
|
||||
class HTTPServer(object):
|
||||
"""
|
||||
A HTTP interface to the storage server.
|
||||
@ -475,18 +511,7 @@ class HTTPServer(object):
|
||||
|
||||
_app = Klein()
|
||||
_app.url_map.converters["storage_index"] = StorageIndexConverter
|
||||
|
||||
@_app.handle_errors(_HTTPError)
|
||||
def _http_error(self, request, failure):
|
||||
"""Handle ``_HTTPError`` exceptions."""
|
||||
request.setResponseCode(failure.value.code)
|
||||
return b""
|
||||
|
||||
@_app.handle_errors(CDDLValidationError)
|
||||
def _cddl_validation_error(self, request, failure):
|
||||
"""Handle CDDL validation errors."""
|
||||
request.setResponseCode(http.BAD_REQUEST)
|
||||
return str(failure.value).encode("utf-8")
|
||||
_add_error_handling(_app)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -54,6 +54,7 @@ from ..storage.http_server import (
|
||||
ClientSecretsException,
|
||||
_authorized_route,
|
||||
StorageIndexConverter,
|
||||
_add_error_handling,
|
||||
)
|
||||
from ..storage.http_client import (
|
||||
StorageClient,
|
||||
@ -253,6 +254,7 @@ class TestApp(object):
|
||||
|
||||
clock: IReactorTime
|
||||
_app = Klein()
|
||||
_add_error_handling(_app)
|
||||
_swissnum = SWISSNUM_FOR_TEST # Match what the test client is using
|
||||
|
||||
@_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
|
||||
@ -346,7 +348,7 @@ class CustomHTTPServerTests(SyncTestCase):
|
||||
response = result_of(
|
||||
self.client.request(
|
||||
"GET",
|
||||
"http://127.0.0.1/upload_secret",
|
||||
DecodedURL.from_text("http://127.0.0.1/upload_secret"),
|
||||
)
|
||||
)
|
||||
self.assertEqual(response.code, 400)
|
||||
@ -354,7 +356,9 @@ class CustomHTTPServerTests(SyncTestCase):
|
||||
# With secret, we're good.
|
||||
response = result_of(
|
||||
self.client.request(
|
||||
"GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC"
|
||||
"GET",
|
||||
DecodedURL.from_text("http://127.0.0.1/upload_secret"),
|
||||
upload_secret=b"MAGIC",
|
||||
)
|
||||
)
|
||||
self.assertEqual(response.code, 200)
|
||||
@ -378,7 +382,7 @@ class CustomHTTPServerTests(SyncTestCase):
|
||||
response = result_of(
|
||||
self.client.request(
|
||||
"GET",
|
||||
f"http://127.0.0.1/bytes/{length}",
|
||||
DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
|
||||
)
|
||||
)
|
||||
|
||||
@ -399,7 +403,7 @@ class CustomHTTPServerTests(SyncTestCase):
|
||||
response = result_of(
|
||||
self.client.request(
|
||||
"GET",
|
||||
f"http://127.0.0.1/bytes/{length}",
|
||||
DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
|
||||
)
|
||||
)
|
||||
|
||||
@ -414,7 +418,7 @@ class CustomHTTPServerTests(SyncTestCase):
|
||||
response = result_of(
|
||||
self.client.request(
|
||||
"GET",
|
||||
"http://127.0.0.1/slowly_never_finish_result",
|
||||
DecodedURL.from_text("http://127.0.0.1/slowly_never_finish_result"),
|
||||
)
|
||||
)
|
||||
|
||||
@ -442,7 +446,7 @@ class CustomHTTPServerTests(SyncTestCase):
|
||||
response = result_of(
|
||||
self.client.request(
|
||||
"GET",
|
||||
"http://127.0.0.1/die",
|
||||
DecodedURL.from_text("http://127.0.0.1/die"),
|
||||
)
|
||||
)
|
||||
|
||||
@ -459,6 +463,7 @@ class Reactor(Clock):
|
||||
|
||||
Advancing the clock also runs any callbacks scheduled via callFromThread.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
Clock.__init__(self)
|
||||
self._queue = Queue()
|
||||
@ -499,7 +504,9 @@ class HttpTestFixture(Fixture):
|
||||
self.storage_server = StorageServer(
|
||||
self.tempdir.path, b"\x00" * 20, clock=self.clock
|
||||
)
|
||||
self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST)
|
||||
self.http_server = HTTPServer(
|
||||
self.clock, self.storage_server, SWISSNUM_FOR_TEST
|
||||
)
|
||||
self.treq = StubTreq(self.http_server.get_resource())
|
||||
self.client = StorageClient(
|
||||
DecodedURL.from_text("http://127.0.0.1"),
|
||||
|
Loading…
x
Reference in New Issue
Block a user