Merge remote-tracking branch 'origin/master' into 3902-listen-storage-http

This commit is contained in:
Itamar Turner-Trauring 2022-07-29 09:45:17 -04:00
commit 21bb9e50f6
10 changed files with 189 additions and 43 deletions

View File

@ -654,6 +654,11 @@ The ``Range`` header may be used to request exactly one ``bytes`` range, in whic
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported.
If the response reads beyond the end of the data, the response may be shorter than the requested range.
The resulting ``Content-Range`` header will be consistent with the returned data.
If the response to a query is an empty range, the ``NO CONTENT`` (204) response code will be used.
Discussion
``````````
@ -754,6 +759,11 @@ The ``Range`` header may be used to request exactly one ``bytes`` range, in whic
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
Multiple ranges in a single request are *not* supported; open-ended ranges are also not supported.
If the response reads beyond the end of the data, the response may be shorter than the requested range.
The resulting ``Content-Range`` header will be consistent with the returned data.
If the response to a query is an empty range, the ``NO CONTENT`` (204) response code will be used.
``POST /v1/mutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

0
newsfragments/3709.minor Normal file
View File

0
newsfragments/3909.minor Normal file
View File

View File

@ -133,7 +133,8 @@ install_requires = [
# HTTP server and client
"klein",
"werkzeug",
# 2.2.0 has a bug: https://github.com/pallets/werkzeug/issues/2465
"werkzeug != 2.2.0",
"treq",
"cbor2",
"pycddl",

View File

@ -7,6 +7,7 @@ from __future__ import annotations
from typing import Union, Optional, Sequence, Mapping, BinaryIO
from base64 import b64encode
from io import BytesIO
from os import SEEK_END
from attrs import define, asdict, frozen, field
@ -29,6 +30,7 @@ from treq.client import HTTPClient
from treq.testing import StubTreq
from OpenSSL import SSL
from cryptography.hazmat.bindings.openssl.binding import Binding
from werkzeug.http import parse_content_range_header
from .http_common import (
swissnum_auth_header,
@ -461,13 +463,48 @@ def read_share_chunk(
"GET",
url,
headers=Headers(
# Ranges in HTTP are _inclusive_, Python's convention is exclusive,
# but Range constructor does that the conversion for us.
{"range": [Range("bytes", [(offset, offset + length)]).to_header()]}
),
)
if response.code == http.NO_CONTENT:
return b""
if response.code == http.PARTIAL_CONTENT:
body = yield response.content()
returnValue(body)
content_range = parse_content_range_header(
response.headers.getRawHeaders("content-range")[0] or ""
)
if (
content_range is None
or content_range.stop is None
or content_range.start is None
):
raise ValueError(
"Content-Range was missing, invalid, or in format we don't support"
)
supposed_length = content_range.stop - content_range.start
if supposed_length > length:
raise ValueError("Server sent more than we asked for?!")
# It might also send less than we asked for. That's (probably) OK, e.g.
# if we went past the end of the file.
body = yield limited_content(response, supposed_length)
body.seek(0, SEEK_END)
actual_length = body.tell()
if actual_length != supposed_length:
# Most likely a mutable that got changed out from under us, but
# conceivably could be a bug...
raise ValueError(
f"Length of response sent from server ({actual_length}) "
+ f"didn't match Content-Range header ({supposed_length})"
)
body.seek(0)
return body.read()
else:
# Technically HTTP allows sending an OK with full body under these
# circumstances, but the server is not designed to do that so we ignore
# that possibility for now...
raise ClientException(response.code)

View File

@ -349,35 +349,31 @@ class _ReadRangeProducer:
result: Deferred
start: int
remaining: int
first_read: bool = field(default=True)
def resumeProducing(self):
to_read = min(self.remaining, 65536)
data = self.read_data(self.start, to_read)
assert len(data) <= to_read
if self.first_read and self.remaining > 0:
# For empty bodies the content-range header makes no sense since
# the end of the range is inclusive.
#
# TODO this is wrong for requests that go beyond the end of the
# share. This will be fixed in
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907 by making that
# edge case not happen.
self.request.setHeader(
"content-range",
ContentRange(
"bytes", self.start, self.start + self.remaining
).to_header(),
)
self.first_read = False
if not data and self.remaining > 0:
# TODO Either data is missing locally (storage issue?) or a bug,
# abort response with error? Until
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907 is implemented
# we continue anyway.
pass
d, self.result = self.result, None
d.errback(
ValueError(
f"Should be {self.remaining} bytes left, but we got an empty read"
)
)
self.stopProducing()
return
if len(data) > self.remaining:
d, self.result = self.result, None
d.errback(
ValueError(
f"Should be {self.remaining} bytes left, but we got more than that ({len(data)})!"
)
)
self.stopProducing()
return
self.start += len(data)
self.remaining -= len(data)
@ -385,22 +381,25 @@ class _ReadRangeProducer:
self.request.write(data)
# TODO remove the second clause in https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3907
if self.remaining == 0 or not data:
self.request.unregisterProducer()
d = self.result
del self.result
d.callback(b"")
return
if self.remaining == 0:
self.stopProducing()
def pauseProducing(self):
pass
def stopProducing(self):
pass
if self.request is not None:
self.request.unregisterProducer()
self.request = None
if self.result is not None:
d = self.result
self.result = None
d.callback(b"")
def read_range(request: Request, read_data: ReadData) -> Union[Deferred, bytes]:
def read_range(
request: Request, read_data: ReadData, share_length: int
) -> Union[Deferred, bytes]:
"""
Read an optional ``Range`` header, reads data appropriately via the given
callable, writes the data to the request.
@ -436,10 +435,25 @@ def read_range(request: Request, read_data: ReadData) -> Union[Deferred, bytes]:
):
raise _HTTPError(http.REQUESTED_RANGE_NOT_SATISFIABLE)
# TODO if end is beyond the end of the share, either return error, or maybe
# just return what we can...
offset, end = range_header.ranges[0]
# If we're being ask to read beyond the length of the share, just read
# less:
end = min(end, share_length)
if offset >= end:
# Basically we'd need to return an empty body. However, the
# Content-Range header can't actually represent empty lengths... so
# (mis)use 204 response code to indicate that.
raise _HTTPError(http.NO_CONTENT)
request.setResponseCode(http.PARTIAL_CONTENT)
# Actual conversion from Python's exclusive ranges to inclusive ranges is
# handled by werkzeug.
request.setHeader(
"content-range",
ContentRange("bytes", offset, end).to_header(),
)
d = Deferred()
request.registerProducer(
_ReadRangeProducer(
@ -681,7 +695,7 @@ class HTTPServer(object):
request.setResponseCode(http.NOT_FOUND)
return b""
return read_range(request, bucket.read)
return read_range(request, bucket.read, bucket.get_length())
@_authorized_route(
_app,
@ -769,6 +783,13 @@ class HTTPServer(object):
def read_mutable_chunk(self, request, authorization, storage_index, share_number):
"""Read a chunk from a mutable."""
try:
share_length = self._storage_server.get_mutable_share_length(
storage_index, share_number
)
except KeyError:
raise _HTTPError(http.NOT_FOUND)
def read_data(offset, length):
try:
return self._storage_server.slot_readv(
@ -777,7 +798,7 @@ class HTTPServer(object):
except KeyError:
raise _HTTPError(http.NOT_FOUND)
return read_range(request, read_data)
return read_range(request, read_data, share_length)
@_authorized_route(
_app, set(), "/v1/mutable/<storage_index:storage_index>/shares", methods=["GET"]

View File

@ -199,8 +199,16 @@ class ShareFile(object):
raise UnknownImmutableContainerVersionError(filename, version)
self._num_leases = num_leases
self._lease_offset = filesize - (num_leases * self.LEASE_SIZE)
self._length = filesize - 0xc - (num_leases * self.LEASE_SIZE)
self._data_offset = 0xc
def get_length(self):
"""
Return the length of the data in the share, if we're reading.
"""
return self._length
def unlink(self):
os.unlink(self.home)
@ -544,6 +552,12 @@ class BucketReader(object):
self.shnum,
reason)
def get_length(self):
"""
Return the length of the data in the share.
"""
return self._share_file.get_length()
@implementer(RIBucketReader)
class FoolscapBucketReader(Referenceable): # type: ignore # warner/foolscap#78

View File

@ -412,11 +412,14 @@ class MutableShareFile(object):
datav.append(self._read_share_data(f, offset, length))
return datav
# def remote_get_length(self):
# f = open(self.home, 'rb')
# data_length = self._read_data_length(f)
# f.close()
# return data_length
def get_length(self):
"""
Return the length of the data in the share.
"""
f = open(self.home, 'rb')
data_length = self._read_data_length(f)
f.close()
return data_length
def check_write_enabler(self, write_enabler, si_s):
with open(self.home, 'rb+') as f:

View File

@ -794,6 +794,20 @@ class StorageServer(service.MultiService):
return None
def get_immutable_share_length(self, storage_index: bytes, share_number: int) -> int:
"""Returns the length (in bytes) of an immutable."""
si_dir = storage_index_to_dir(storage_index)
path = os.path.join(self.sharedir, si_dir, str(share_number))
return ShareFile(path).get_length()
def get_mutable_share_length(self, storage_index: bytes, share_number: int) -> int:
"""Returns the length (in bytes) of a mutable."""
si_dir = storage_index_to_dir(storage_index)
path = os.path.join(self.sharedir, si_dir, str(share_number))
if not os.path.exists(path):
raise KeyError("No such storage index or share number")
return MutableShareFile(path).get_length()
@implementer(RIStorageServer)
class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78

View File

@ -688,6 +688,19 @@ class Server(unittest.TestCase):
writer.abort()
self.failUnlessEqual(ss.allocated_size(), 0)
def test_immutable_length(self):
"""
``get_immutable_share_length()`` returns the length of an immutable
share, as does ``BucketWriter.get_length()``..
"""
ss = self.create("test_immutable_length")
_, writers = self.allocate(ss, b"allocate", [22], 75)
bucket = writers[22]
bucket.write(0, b"X" * 75)
bucket.close()
self.assertEqual(ss.get_immutable_share_length(b"allocate", 22), 75)
self.assertEqual(ss.get_buckets(b"allocate")[22].get_length(), 75)
def test_allocate(self):
ss = self.create("test_allocate")
@ -1340,6 +1353,39 @@ class MutableServer(unittest.TestCase):
(set(), {0, 1, 2, 4}, {0, 1, 4})
)
def test_mutable_share_length(self):
"""``get_mutable_share_length()`` returns the length of the share."""
ss = self.create("test_mutable_share_length")
self.allocate(ss, b"si1", b"we1", b"le1", [16], 23)
ss.slot_testv_and_readv_and_writev(
b"si1", (self.write_enabler(b"we1"),
self.renew_secret(b"le1"),
self.cancel_secret(b"le1")),
{16: ([], [(0, b"x" * 23)], None)},
[]
)
self.assertEqual(ss.get_mutable_share_length(b"si1", 16), 23)
def test_mutable_share_length_unknown(self):
"""
``get_mutable_share_length()`` raises a ``KeyError`` on unknown shares.
"""
ss = self.create("test_mutable_share_length_unknown")
self.allocate(ss, b"si1", b"we1", b"le1", [16], 23)
ss.slot_testv_and_readv_and_writev(
b"si1", (self.write_enabler(b"we1"),
self.renew_secret(b"le1"),
self.cancel_secret(b"le1")),
{16: ([], [(0, b"x" * 23)], None)},
[]
)
with self.assertRaises(KeyError):
# Wrong share number.
ss.get_mutable_share_length(b"si1", 17)
with self.assertRaises(KeyError):
# Wrong storage index
ss.get_mutable_share_length(b"unknown", 16)
def test_bad_magic(self):
ss = self.create("test_bad_magic")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret), set([0]), 10)