Merge remote-tracking branch 'origin/master' into 3999.structure-config-manipulation

This commit is contained in:
Itamar Turner-Trauring
2023-05-01 12:52:18 -04:00
26 changed files with 357 additions and 263 deletions

View File

@ -39,6 +39,8 @@ dockerhub-auth-template: &DOCKERHUB_AUTH
<<: *DOCKERHUB_CONTEXT <<: *DOCKERHUB_CONTEXT
- "build-image-ubuntu-20-04": - "build-image-ubuntu-20-04":
<<: *DOCKERHUB_CONTEXT <<: *DOCKERHUB_CONTEXT
- "build-image-ubuntu-22-04":
<<: *DOCKERHUB_CONTEXT
- "build-image-fedora-35": - "build-image-fedora-35":
<<: *DOCKERHUB_CONTEXT <<: *DOCKERHUB_CONTEXT
- "build-image-oraclelinux-8": - "build-image-oraclelinux-8":
@ -78,6 +80,9 @@ workflows:
- "ubuntu-20-04": - "ubuntu-20-04":
{} {}
- "ubuntu-22-04":
{}
# Equivalent to RHEL 8; CentOS 8 is dead. # Equivalent to RHEL 8; CentOS 8 is dead.
- "oraclelinux-8": - "oraclelinux-8":
{} {}
@ -88,6 +93,8 @@ workflows:
matrix: matrix:
parameters: parameters:
pythonVersion: pythonVersion:
- "python38"
- "python39"
- "python310" - "python310"
- "nixos": - "nixos":
@ -253,7 +260,7 @@ jobs:
name: "Submit coverage results" name: "Submit coverage results"
command: | command: |
if [ -n "${UPLOAD_COVERAGE}" ]; then if [ -n "${UPLOAD_COVERAGE}" ]; then
/tmp/venv/bin/codecov echo "TODO: Need a new coverage solution, see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/4011"
fi fi
docker: docker:
@ -333,6 +340,16 @@ jobs:
<<: *UTF_8_ENVIRONMENT <<: *UTF_8_ENVIRONMENT
TAHOE_LAFS_TOX_ENVIRONMENT: "py39" TAHOE_LAFS_TOX_ENVIRONMENT: "py39"
ubuntu-22-04:
<<: *DEBIAN
docker:
- <<: *DOCKERHUB_AUTH
image: "tahoelafsci/ubuntu:22.04-py3.10"
user: "nobody"
environment:
<<: *UTF_8_ENVIRONMENT
TAHOE_LAFS_TOX_ENVIRONMENT: "py310"
oraclelinux-8: &RHEL_DERIV oraclelinux-8: &RHEL_DERIV
docker: docker:
- <<: *DOCKERHUB_AUTH - <<: *DOCKERHUB_AUTH
@ -479,6 +496,15 @@ jobs:
PYTHON_VERSION: "3.9" PYTHON_VERSION: "3.9"
build-image-ubuntu-22-04:
<<: *BUILD_IMAGE
environment:
DISTRO: "ubuntu"
TAG: "22.04"
PYTHON_VERSION: "3.10"
build-image-oraclelinux-8: build-image-oraclelinux-8:
<<: *BUILD_IMAGE <<: *BUILD_IMAGE

View File

@ -47,3 +47,7 @@ export PIP_FIND_LINKS="file://${WHEELHOUSE_PATH}"
# above, it may still not be able to get us a compatible version unless we # above, it may still not be able to get us a compatible version unless we
# explicitly ask for one. # explicitly ask for one.
"${PIP}" install --upgrade setuptools==44.0.0 wheel "${PIP}" install --upgrade setuptools==44.0.0 wheel
# Just about every user of this image wants to use tox from the bootstrap
# virtualenv so go ahead and install it now.
"${PIP}" install "tox~=3.0"

View File

@ -3,18 +3,6 @@
# https://vaneyckt.io/posts/safer_bash_scripts_with_set_euxo_pipefail/ # https://vaneyckt.io/posts/safer_bash_scripts_with_set_euxo_pipefail/
set -euxo pipefail set -euxo pipefail
# Basic Python packages that you just need to have around to do anything,
# practically speaking.
BASIC_DEPS="pip wheel"
# Python packages we need to support the test infrastructure. *Not* packages
# Tahoe-LAFS itself (implementation or test suite) need.
TEST_DEPS="tox~=3.0 codecov"
# Python packages we need to generate test reports for CI infrastructure.
# *Not* packages Tahoe-LAFS itself (implement or test suite) need.
REPORTING_DEPS="python-subunit junitxml subunitreporter"
# The filesystem location of the wheelhouse which we'll populate with wheels # The filesystem location of the wheelhouse which we'll populate with wheels
# for all of our dependencies. # for all of our dependencies.
WHEELHOUSE_PATH="$1" WHEELHOUSE_PATH="$1"
@ -41,15 +29,5 @@ export PIP_FIND_LINKS="file://${WHEELHOUSE_PATH}"
LANG="en_US.UTF-8" "${PIP}" \ LANG="en_US.UTF-8" "${PIP}" \
wheel \ wheel \
--wheel-dir "${WHEELHOUSE_PATH}" \ --wheel-dir "${WHEELHOUSE_PATH}" \
"${PROJECT_ROOT}"[test] \ "${PROJECT_ROOT}"[testenv] \
${BASIC_DEPS} \ "${PROJECT_ROOT}"[test]
${TEST_DEPS} \
${REPORTING_DEPS}
# Not strictly wheelhouse population but ... Note we omit basic deps here.
# They're in the wheelhouse if Tahoe-LAFS wants to drag them in but it will
# have to ask.
"${PIP}" \
install \
${TEST_DEPS} \
${REPORTING_DEPS}

View File

@ -79,9 +79,10 @@ else
alternative="false" alternative="false"
fi fi
WORKDIR=/tmp/tahoe-lafs.tox
${TIMEOUT} ${BOOTSTRAP_VENV}/bin/tox \ ${TIMEOUT} ${BOOTSTRAP_VENV}/bin/tox \
-c ${PROJECT_ROOT}/tox.ini \ -c ${PROJECT_ROOT}/tox.ini \
--workdir /tmp/tahoe-lafs.tox \ --workdir "${WORKDIR}" \
-e "${TAHOE_LAFS_TOX_ENVIRONMENT}" \ -e "${TAHOE_LAFS_TOX_ENVIRONMENT}" \
${TAHOE_LAFS_TOX_ARGS} || "${alternative}" ${TAHOE_LAFS_TOX_ARGS} || "${alternative}"
@ -93,5 +94,6 @@ if [ -n "${ARTIFACTS}" ]; then
# Create a junitxml results area. # Create a junitxml results area.
mkdir -p "$(dirname "${JUNITXML}")" mkdir -p "$(dirname "${JUNITXML}")"
"${BOOTSTRAP_VENV}"/bin/subunit2junitxml < "${SUBUNIT2}" > "${JUNITXML}" || "${alternative}"
"${WORKDIR}/${TAHOE_LAFS_TOX_ENVIRONMENT}/bin/subunit2junitxml" < "${SUBUNIT2}" > "${JUNITXML}" || "${alternative}"
fi fi

View File

@ -26,12 +26,7 @@ shift || :
# Tell pip where it can find any existing wheels. # Tell pip where it can find any existing wheels.
export PIP_FIND_LINKS="file://${WHEELHOUSE_PATH}" export PIP_FIND_LINKS="file://${WHEELHOUSE_PATH}"
export PIP_NO_INDEX="1"
# It is tempting to also set PIP_NO_INDEX=1 but (a) that will cause problems
# between the time dependencies change and the images are re-built and (b) the
# upcoming-deprecations job wants to install some dependencies from github and
# it's awkward to get that done any earlier than the tox run. So, we don't
# set it.
# Get everything else installed in it, too. # Get everything else installed in it, too.
"${BOOTSTRAP_VENV}"/bin/tox \ "${BOOTSTRAP_VENV}"/bin/tox \

View File

@ -46,7 +46,6 @@ jobs:
matrix: matrix:
os: os:
- windows-latest - windows-latest
- ubuntu-latest
python-version: python-version:
- "3.8" - "3.8"
- "3.9" - "3.9"
@ -80,7 +79,7 @@ jobs:
- name: Install Python packages - name: Install Python packages
run: | run: |
pip install --upgrade codecov "tox<4" tox-gh-actions setuptools pip install --upgrade "tox<4" tox-gh-actions setuptools
pip list pip list
- name: Display tool versions - name: Display tool versions

2
.gitignore vendored
View File

@ -53,3 +53,5 @@ zope.interface-*.egg
# This is the plaintext of the private environment needed for some CircleCI # This is the plaintext of the private environment needed for some CircleCI
# operations. It's never supposed to be checked in. # operations. It's never supposed to be checked in.
secret-env-plain secret-env-plain
.ruff_cache

12
.ruff.toml Normal file
View File

@ -0,0 +1,12 @@
select = [
# Pyflakes checks
"F",
# Prohibit tabs:
"W191",
# No trailing whitespace:
"W291",
"W293",
# Make sure we bind closure variables in a loop (equivalent to pylint
# cell-var-from-loop):
"B023",
]

View File

@ -4,6 +4,7 @@ Ported to Python 3.
from __future__ import annotations from __future__ import annotations
import os
import sys import sys
import shutil import shutil
from time import sleep from time import sleep
@ -49,6 +50,11 @@ from .util import (
) )
from allmydata.node import read_config from allmydata.node import read_config
# No reason for HTTP requests to take longer than two minutes in the
# integration tests. See allmydata/scripts/common_http.py for usage.
os.environ["__TAHOE_CLI_HTTP_TIMEOUT"] = "120"
# pytest customization hooks # pytest customization hooks
def pytest_addoption(parser): def pytest_addoption(parser):

View File

@ -1,5 +1,3 @@
from __future__ import print_function
""" """
this is a load-generating client program. It does all of its work through a this is a load-generating client program. It does all of its work through a
given tahoe node (specified by URL), and performs random reads and writes given tahoe node (specified by URL), and performs random reads and writes

View File

@ -1,44 +0,0 @@
#!/usr/bin/env python
from __future__ import print_function
import os, sys
from twisted.python import usage
class Options(usage.Options):
optFlags = [
("recursive", "r", "Search for .py files recursively"),
]
def parseArgs(self, *starting_points):
self.starting_points = starting_points
found = [False]
def check(fn):
f = open(fn, "r")
for i,line in enumerate(f.readlines()):
if line == "\n":
continue
if line[-1] == "\n":
line = line[:-1]
if line.rstrip() != line:
# the %s:%d:%d: lets emacs' compile-mode jump to those locations
print("%s:%d:%d: trailing whitespace" % (fn, i+1, len(line)+1))
found[0] = True
f.close()
o = Options()
o.parseOptions()
if o['recursive']:
for starting_point in o.starting_points:
for root, dirs, files in os.walk(starting_point):
for fn in [f for f in files if f.endswith(".py")]:
fn = os.path.join(root, fn)
check(fn)
else:
for fn in o.starting_points:
check(fn)
if found[0]:
sys.exit(1)
sys.exit(0)

0
newsfragments/3880.minor Normal file
View File

0
newsfragments/4005.minor Normal file
View File

0
newsfragments/4006.minor Normal file
View File

0
newsfragments/4010.minor Normal file
View File

0
newsfragments/4012.minor Normal file
View File

0
newsfragments/4014.minor Normal file
View File

0
newsfragments/4019.minor Normal file
View File

1
newsfragments/4020.minor Normal file
View File

@ -0,0 +1 @@

View File

@ -6,6 +6,9 @@ develop = update_version develop
bdist_egg = update_version bdist_egg bdist_egg = update_version bdist_egg
bdist_wheel = update_version bdist_wheel bdist_wheel = update_version bdist_wheel
# This has been replaced by ruff (see .ruff.toml), which has same checks as
# flake8 plus many more, and is also faster. However, we're keeping this config
# in case people still use flake8 in IDEs, etc..
[flake8] [flake8]
# Enforce all pyflakes constraints, and also prohibit tabs for indentation. # Enforce all pyflakes constraints, and also prohibit tabs for indentation.
# Reference: # Reference:

View File

@ -141,8 +141,10 @@ install_requires = [
# HTTP server and client # HTTP server and client
"klein", "klein",
# 2.2.0 has a bug: https://github.com/pallets/werkzeug/issues/2465 # 2.2.0 has a bug: https://github.com/pallets/werkzeug/issues/2465
"werkzeug != 2.2.0", # 2.3.x has an incompatibility with Klein: https://github.com/twisted/klein/pull/575
"werkzeug != 2.2.0, < 2.3",
"treq", "treq",
"cbor2", "cbor2",
@ -398,16 +400,31 @@ setup(name="tahoe-lafs", # also set in __init__.py
"dulwich", "dulwich",
"gpg", "gpg",
], ],
"test": [
"flake8", # Here are the dependencies required to set up a reproducible test
# Pin a specific pyflakes so we don't have different folks # environment. This could be for CI or local development. These
# disagreeing on what is or is not a lint issue. We can bump # are *not* library dependencies of the test suite itself. They are
# this version from time to time, but we will do it # the tools we use to run the test suite at all.
# intentionally. "testenv": [
"pyflakes == 3.0.1", # Pin all of these versions for the same reason you ever want to
# pin anything: to prevent new releases with regressions from
# introducing spurious failures into CI runs for whatever
# development work is happening at the time. The versions
# selected here are just the current versions at the time.
# Bumping them to keep up with future releases is fine as long
# as those releases are known to actually work.
"pip==22.0.3",
"wheel==0.37.1",
"setuptools==60.9.1",
"subunitreporter==22.2.0",
"python-subunit==1.4.2",
"junitxml==0.7",
"coverage ~= 5.0", "coverage ~= 5.0",
],
# Here are the library dependencies of the test suite.
"test": [
"mock", "mock",
"tox ~= 3.0",
"pytest", "pytest",
"pytest-twisted", "pytest-twisted",
"hypothesis >= 3.6.1", "hypothesis >= 3.6.1",
@ -416,7 +433,6 @@ setup(name="tahoe-lafs", # also set in __init__.py
"fixtures", "fixtures",
"beautifulsoup4", "beautifulsoup4",
"html5lib", "html5lib",
"junitxml",
# Pin old version until # Pin old version until
# https://github.com/paramiko/paramiko/issues/1961 is fixed. # https://github.com/paramiko/paramiko/issues/1961 is fixed.
"paramiko < 2.9", "paramiko < 2.9",

View File

@ -1,19 +1,11 @@
""" """
Ported to Python 3. Blocking HTTP client APIs.
""" """
from __future__ import unicode_literals
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import os import os
from io import BytesIO from io import BytesIO
from six.moves import urllib, http_client from http import client as http_client
import six import urllib
import allmydata # for __full_version__ import allmydata # for __full_version__
from allmydata.util.encodingutil import quote_output from allmydata.util.encodingutil import quote_output
@ -51,7 +43,7 @@ class BadResponse(object):
def do_http(method, url, body=b""): def do_http(method, url, body=b""):
if isinstance(body, bytes): if isinstance(body, bytes):
body = BytesIO(body) body = BytesIO(body)
elif isinstance(body, six.text_type): elif isinstance(body, str):
raise TypeError("do_http body must be a bytestring, not unicode") raise TypeError("do_http body must be a bytestring, not unicode")
else: else:
# We must give a Content-Length header to twisted.web, otherwise it # We must give a Content-Length header to twisted.web, otherwise it
@ -61,10 +53,17 @@ def do_http(method, url, body=b""):
assert body.seek assert body.seek
assert body.read assert body.read
scheme, host, port, path = parse_url(url) scheme, host, port, path = parse_url(url)
# For testing purposes, allow setting a timeout on HTTP requests. If this
# ever become a user-facing feature, this should probably be a CLI option?
timeout = os.environ.get("__TAHOE_CLI_HTTP_TIMEOUT", None)
if timeout is not None:
timeout = float(timeout)
if scheme == "http": if scheme == "http":
c = http_client.HTTPConnection(host, port) c = http_client.HTTPConnection(host, port, timeout=timeout, blocksize=65536)
elif scheme == "https": elif scheme == "https":
c = http_client.HTTPSConnection(host, port) c = http_client.HTTPSConnection(host, port, timeout=timeout, blocksize=65536)
else: else:
raise ValueError("unknown scheme '%s', need http or https" % scheme) raise ValueError("unknown scheme '%s', need http or https" % scheme)
c.putrequest(method, path) c.putrequest(method, path)
@ -85,7 +84,7 @@ def do_http(method, url, body=b""):
return BadResponse(url, err) return BadResponse(url, err)
while True: while True:
data = body.read(8192) data = body.read(65536)
if not data: if not data:
break break
c.send(data) c.send(data)
@ -94,16 +93,14 @@ def do_http(method, url, body=b""):
def format_http_success(resp): def format_http_success(resp):
# ensure_text() shouldn't be necessary when Python 2 is dropped.
return quote_output( return quote_output(
"%s %s" % (resp.status, six.ensure_text(resp.reason)), "%s %s" % (resp.status, resp.reason),
quotemarks=False) quotemarks=False)
def format_http_error(msg, resp): def format_http_error(msg, resp):
# ensure_text() shouldn't be necessary when Python 2 is dropped.
return quote_output( return quote_output(
"%s: %s %s\n%s" % (msg, resp.status, six.ensure_text(resp.reason), "%s: %s %s\n%r" % (msg, resp.status, resp.reason,
six.ensure_text(resp.read())), resp.read()),
quotemarks=False) quotemarks=False)
def check_http_error(resp, stderr): def check_http_error(resp, stderr):

View File

@ -4,7 +4,8 @@ HTTP client that talks to the HTTP storage server.
from __future__ import annotations from __future__ import annotations
from typing import Union, Optional, Sequence, Mapping, BinaryIO from eliot import start_action, register_exception_extractor
from typing import Union, Optional, Sequence, Mapping, BinaryIO, cast, TypedDict, Set
from base64 import b64encode from base64 import b64encode
from io import BytesIO from io import BytesIO
from os import SEEK_END from os import SEEK_END
@ -18,8 +19,8 @@ from collections_extended import RangeMap
from werkzeug.datastructures import Range, ContentRange from werkzeug.datastructures import Range, ContentRange
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from twisted.web import http from twisted.web import http
from twisted.web.iweb import IPolicyForHTTPS from twisted.web.iweb import IPolicyForHTTPS, IResponse
from twisted.internet.defer import inlineCallbacks, returnValue, fail, Deferred, succeed from twisted.internet.defer import inlineCallbacks, Deferred, succeed
from twisted.internet.interfaces import ( from twisted.internet.interfaces import (
IOpenSSLClientConnectionCreator, IOpenSSLClientConnectionCreator,
IReactorTime, IReactorTime,
@ -63,6 +64,9 @@ class ClientException(Exception):
self.code = code self.code = code
register_exception_extractor(ClientException, lambda e: {"response_code": e.code})
# Schemas for server responses. # Schemas for server responses.
# #
# Tags are of the form #6.nnn, where the number is documented at # Tags are of the form #6.nnn, where the number is documented at
@ -337,7 +341,7 @@ class StorageClient(object):
https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port) https_url = DecodedURL().replace(scheme="https", host=nurl.host, port=nurl.port)
return cls(https_url, swissnum, treq_client, reactor) return cls(https_url, swissnum, treq_client, reactor)
def relative_url(self, path): def relative_url(self, path: str) -> DecodedURL:
"""Get a URL relative to the base URL.""" """Get a URL relative to the base URL."""
return self._base_url.click(path) return self._base_url.click(path)
@ -351,19 +355,20 @@ class StorageClient(object):
) )
return headers return headers
def request( @async_to_deferred
async def request(
self, self,
method, method: str,
url, url: DecodedURL,
lease_renew_secret=None, lease_renew_secret: Optional[bytes] = None,
lease_cancel_secret=None, lease_cancel_secret: Optional[bytes] = None,
upload_secret=None, upload_secret: Optional[bytes] = None,
write_enabler_secret=None, write_enabler_secret: Optional[bytes] = None,
headers=None, headers: Optional[Headers] = None,
message_to_serialize=None, message_to_serialize: object = None,
timeout: float = 60, timeout: float = 60,
**kwargs, **kwargs,
): ) -> IResponse:
""" """
Like ``treq.request()``, but with optional secrets that get translated Like ``treq.request()``, but with optional secrets that get translated
into corresponding HTTP headers. into corresponding HTTP headers.
@ -373,6 +378,41 @@ class StorageClient(object):
Default timeout is 60 seconds. Default timeout is 60 seconds.
""" """
with start_action(
action_type="allmydata:storage:http-client:request",
method=method,
url=url.to_text(),
timeout=timeout,
) as ctx:
response = await self._request(
method,
url,
lease_renew_secret,
lease_cancel_secret,
upload_secret,
write_enabler_secret,
headers,
message_to_serialize,
timeout,
**kwargs,
)
ctx.add_success_fields(response_code=response.code)
return response
async def _request(
self,
method: str,
url: DecodedURL,
lease_renew_secret: Optional[bytes] = None,
lease_cancel_secret: Optional[bytes] = None,
upload_secret: Optional[bytes] = None,
write_enabler_secret: Optional[bytes] = None,
headers: Optional[Headers] = None,
message_to_serialize: object = None,
timeout: float = 60,
**kwargs,
) -> IResponse:
"""The implementation of request()."""
headers = self._get_headers(headers) headers = self._get_headers(headers)
# Add secrets: # Add secrets:
@ -403,28 +443,32 @@ class StorageClient(object):
kwargs["data"] = dumps(message_to_serialize) kwargs["data"] = dumps(message_to_serialize)
headers.addRawHeader("Content-Type", CBOR_MIME_TYPE) headers.addRawHeader("Content-Type", CBOR_MIME_TYPE)
return self._treq.request( return await self._treq.request(
method, url, headers=headers, timeout=timeout, **kwargs method, url, headers=headers, timeout=timeout, **kwargs
) )
def decode_cbor(self, response, schema: Schema): async def decode_cbor(self, response, schema: Schema) -> object:
"""Given HTTP response, return decoded CBOR body.""" """Given HTTP response, return decoded CBOR body."""
with start_action(action_type="allmydata:storage:http-client:decode-cbor"):
def got_content(f: BinaryIO): if response.code > 199 and response.code < 300:
data = f.read() content_type = get_content_type(response.headers)
schema.validate_cbor(data) if content_type == CBOR_MIME_TYPE:
return loads(data) f = await limited_content(response, self._clock)
data = f.read()
if response.code > 199 and response.code < 300: schema.validate_cbor(data)
content_type = get_content_type(response.headers) return loads(data)
if content_type == CBOR_MIME_TYPE: else:
return limited_content(response, self._clock).addCallback(got_content) raise ClientException(
-1,
"Server didn't send CBOR, content type is {}".format(
content_type
),
)
else: else:
raise ClientException(-1, "Server didn't send CBOR") data = (
else: await limited_content(response, self._clock, max_length=10_000)
return treq.content(response).addCallback( ).read()
lambda data: fail(ClientException(response.code, response.phrase, data)) raise ClientException(response.code, response.phrase, data)
)
@define(hash=True) @define(hash=True)
@ -435,26 +479,32 @@ class StorageClientGeneral(object):
_client: StorageClient _client: StorageClient
@inlineCallbacks @async_to_deferred
def get_version(self): async def get_version(self):
""" """
Return the version metadata for the server. Return the version metadata for the server.
""" """
url = self._client.relative_url("/storage/v1/version") url = self._client.relative_url("/storage/v1/version")
response = yield self._client.request("GET", url) response = await self._client.request("GET", url)
decoded_response = yield self._client.decode_cbor( decoded_response = cast(
response, _SCHEMAS["get_version"] Mapping[bytes, object],
await self._client.decode_cbor(response, _SCHEMAS["get_version"]),
) )
# Add some features we know are true because the HTTP API # Add some features we know are true because the HTTP API
# specification requires them and because other parts of the storage # specification requires them and because other parts of the storage
# client implementation assumes they will be present. # client implementation assumes they will be present.
decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"].update({ cast(
b'tolerates-immutable-read-overrun': True, Mapping[bytes, object],
b'delete-mutable-shares-with-zero-length-writev': True, decoded_response[b"http://allmydata.org/tahoe/protocols/storage/v1"],
b'fills-holes-with-zero-bytes': True, ).update(
b'prevents-read-past-end-of-share-data': True, {
}) b"tolerates-immutable-read-overrun": True,
returnValue(decoded_response) b"delete-mutable-shares-with-zero-length-writev": True,
b"fills-holes-with-zero-bytes": True,
b"prevents-read-past-end-of-share-data": True,
}
)
return decoded_response
@inlineCallbacks @inlineCallbacks
def add_or_renew_lease( def add_or_renew_lease(
@ -605,16 +655,16 @@ class StorageClientImmutables(object):
_client: StorageClient _client: StorageClient
@inlineCallbacks @async_to_deferred
def create( async def create(
self, self,
storage_index, storage_index: bytes,
share_numbers, share_numbers: set[int],
allocated_size, allocated_size: int,
upload_secret, upload_secret: bytes,
lease_renew_secret, lease_renew_secret: bytes,
lease_cancel_secret, lease_cancel_secret: bytes,
): # type: (bytes, set[int], int, bytes, bytes, bytes) -> Deferred[ImmutableCreateResult] ) -> ImmutableCreateResult:
""" """
Create a new storage index for an immutable. Create a new storage index for an immutable.
@ -633,7 +683,7 @@ class StorageClientImmutables(object):
) )
message = {"share-numbers": share_numbers, "allocated-size": allocated_size} message = {"share-numbers": share_numbers, "allocated-size": allocated_size}
response = yield self._client.request( response = await self._client.request(
"POST", "POST",
url, url,
lease_renew_secret=lease_renew_secret, lease_renew_secret=lease_renew_secret,
@ -641,14 +691,13 @@ class StorageClientImmutables(object):
upload_secret=upload_secret, upload_secret=upload_secret,
message_to_serialize=message, message_to_serialize=message,
) )
decoded_response = yield self._client.decode_cbor( decoded_response = cast(
response, _SCHEMAS["allocate_buckets"] Mapping[str, Set[int]],
await self._client.decode_cbor(response, _SCHEMAS["allocate_buckets"]),
) )
returnValue( return ImmutableCreateResult(
ImmutableCreateResult( already_have=decoded_response["already-have"],
already_have=decoded_response["already-have"], allocated=decoded_response["allocated"],
allocated=decoded_response["allocated"],
)
) )
@inlineCallbacks @inlineCallbacks
@ -674,10 +723,15 @@ class StorageClientImmutables(object):
response.code, response.code,
) )
@inlineCallbacks @async_to_deferred
def write_share_chunk( async def write_share_chunk(
self, storage_index, share_number, upload_secret, offset, data self,
): # type: (bytes, int, bytes, int, bytes) -> Deferred[UploadProgress] storage_index: bytes,
share_number: int,
upload_secret: bytes,
offset: int,
data: bytes,
) -> UploadProgress:
""" """
Upload a chunk of data for a specific share. Upload a chunk of data for a specific share.
@ -695,7 +749,7 @@ class StorageClientImmutables(object):
_encode_si(storage_index), share_number _encode_si(storage_index), share_number
) )
) )
response = yield self._client.request( response = await self._client.request(
"PATCH", "PATCH",
url, url,
upload_secret=upload_secret, upload_secret=upload_secret,
@ -719,13 +773,16 @@ class StorageClientImmutables(object):
raise ClientException( raise ClientException(
response.code, response.code,
) )
body = yield self._client.decode_cbor( body = cast(
response, _SCHEMAS["immutable_write_share_chunk"] Mapping[str, Sequence[Mapping[str, int]]],
await self._client.decode_cbor(
response, _SCHEMAS["immutable_write_share_chunk"]
),
) )
remaining = RangeMap() remaining = RangeMap()
for chunk in body["required"]: for chunk in body["required"]:
remaining.set(True, chunk["begin"], chunk["end"]) remaining.set(True, chunk["begin"], chunk["end"])
returnValue(UploadProgress(finished=finished, required=remaining)) return UploadProgress(finished=finished, required=remaining)
def read_share_chunk( def read_share_chunk(
self, storage_index, share_number, offset, length self, storage_index, share_number, offset, length
@ -737,21 +794,23 @@ class StorageClientImmutables(object):
self._client, "immutable", storage_index, share_number, offset, length self._client, "immutable", storage_index, share_number, offset, length
) )
@inlineCallbacks @async_to_deferred
def list_shares(self, storage_index: bytes) -> Deferred[set[int]]: async def list_shares(self, storage_index: bytes) -> Set[int]:
""" """
Return the set of shares for a given storage index. Return the set of shares for a given storage index.
""" """
url = self._client.relative_url( url = self._client.relative_url(
"/storage/v1/immutable/{}/shares".format(_encode_si(storage_index)) "/storage/v1/immutable/{}/shares".format(_encode_si(storage_index))
) )
response = yield self._client.request( response = await self._client.request(
"GET", "GET",
url, url,
) )
if response.code == http.OK: if response.code == http.OK:
body = yield self._client.decode_cbor(response, _SCHEMAS["list_shares"]) return cast(
returnValue(set(body)) Set[int],
await self._client.decode_cbor(response, _SCHEMAS["list_shares"]),
)
else: else:
raise ClientException(response.code) raise ClientException(response.code)
@ -821,6 +880,13 @@ class ReadTestWriteResult:
reads: Mapping[int, Sequence[bytes]] reads: Mapping[int, Sequence[bytes]]
# Result type for mutable read/test/write HTTP response. Can't just use
# dict[int,list[bytes]] because on Python 3.8 that will error out.
MUTABLE_RTW = TypedDict(
"MUTABLE_RTW", {"success": bool, "data": Mapping[int, Sequence[bytes]]}
)
@frozen @frozen
class StorageClientMutables: class StorageClientMutables:
""" """
@ -867,8 +933,11 @@ class StorageClientMutables:
message_to_serialize=message, message_to_serialize=message,
) )
if response.code == http.OK: if response.code == http.OK:
result = await self._client.decode_cbor( result = cast(
response, _SCHEMAS["mutable_read_test_write"] MUTABLE_RTW,
await self._client.decode_cbor(
response, _SCHEMAS["mutable_read_test_write"]
),
) )
return ReadTestWriteResult(success=result["success"], reads=result["data"]) return ReadTestWriteResult(success=result["success"], reads=result["data"])
else: else:
@ -889,7 +958,7 @@ class StorageClientMutables:
) )
@async_to_deferred @async_to_deferred
async def list_shares(self, storage_index: bytes) -> set[int]: async def list_shares(self, storage_index: bytes) -> Set[int]:
""" """
List the share numbers for a given storage index. List the share numbers for a given storage index.
""" """
@ -898,8 +967,11 @@ class StorageClientMutables:
) )
response = await self._client.request("GET", url) response = await self._client.request("GET", url)
if response.code == http.OK: if response.code == http.OK:
return await self._client.decode_cbor( return cast(
response, _SCHEMAS["mutable_list_shares"] Set[int],
await self._client.decode_cbor(
response, _SCHEMAS["mutable_list_shares"]
),
) )
else: else:
raise ClientException(response.code) raise ClientException(response.code)

View File

@ -12,6 +12,7 @@ from tempfile import TemporaryFile
from os import SEEK_END, SEEK_SET from os import SEEK_END, SEEK_SET
import mmap import mmap
from eliot import start_action
from cryptography.x509 import Certificate as CryptoCertificate from cryptography.x509 import Certificate as CryptoCertificate
from zope.interface import implementer from zope.interface import implementer
from klein import Klein from klein import Klein
@ -97,30 +98,50 @@ def _extract_secrets(
def _authorization_decorator(required_secrets): def _authorization_decorator(required_secrets):
""" """
Check the ``Authorization`` header, and extract ``X-Tahoe-Authorization`` 1. Check the ``Authorization`` header matches server swissnum.
headers and pass them in. 2. Extract ``X-Tahoe-Authorization`` headers and pass them in.
3. Log the request and response.
""" """
def decorator(f): def decorator(f):
@wraps(f) @wraps(f)
def route(self, request, *args, **kwargs): def route(self, request, *args, **kwargs):
if not timing_safe_compare( with start_action(
request.requestHeaders.getRawHeaders("Authorization", [""])[0].encode( action_type="allmydata:storage:http-server:handle-request",
"utf-8" method=request.method,
), path=request.path,
swissnum_auth_header(self._swissnum), ) as ctx:
): try:
request.setResponseCode(http.UNAUTHORIZED) # Check Authorization header:
return b"" if not timing_safe_compare(
authorization = request.requestHeaders.getRawHeaders( request.requestHeaders.getRawHeaders("Authorization", [""])[0].encode(
"X-Tahoe-Authorization", [] "utf-8"
) ),
try: swissnum_auth_header(self._swissnum),
secrets = _extract_secrets(authorization, required_secrets) ):
except ClientSecretsException: raise _HTTPError(http.UNAUTHORIZED)
request.setResponseCode(http.BAD_REQUEST)
return b"Missing required secrets" # Check secrets:
return f(self, request, secrets, *args, **kwargs) authorization = request.requestHeaders.getRawHeaders(
"X-Tahoe-Authorization", []
)
try:
secrets = _extract_secrets(authorization, required_secrets)
except ClientSecretsException:
raise _HTTPError(http.BAD_REQUEST)
# Run the business logic:
result = f(self, request, secrets, *args, **kwargs)
except _HTTPError as e:
# This isn't an error necessarily for logging purposes,
# it's an implementation detail, an easier way to set
# response codes.
ctx.add_success_fields(response_code=e.code)
ctx.finish()
raise
else:
ctx.add_success_fields(response_code=request.code)
return result
return route return route
@ -468,6 +489,21 @@ def read_range(
return d return d
def _add_error_handling(app: Klein):
"""Add exception handlers to a Klein app."""
@app.handle_errors(_HTTPError)
def _http_error(_, request, failure):
"""Handle ``_HTTPError`` exceptions."""
request.setResponseCode(failure.value.code)
return b""
@app.handle_errors(CDDLValidationError)
def _cddl_validation_error(_, request, failure):
"""Handle CDDL validation errors."""
request.setResponseCode(http.BAD_REQUEST)
return str(failure.value).encode("utf-8")
class HTTPServer(object): class HTTPServer(object):
""" """
A HTTP interface to the storage server. A HTTP interface to the storage server.
@ -475,18 +511,7 @@ class HTTPServer(object):
_app = Klein() _app = Klein()
_app.url_map.converters["storage_index"] = StorageIndexConverter _app.url_map.converters["storage_index"] = StorageIndexConverter
_add_error_handling(_app)
@_app.handle_errors(_HTTPError)
def _http_error(self, request, failure):
"""Handle ``_HTTPError`` exceptions."""
request.setResponseCode(failure.value.code)
return b""
@_app.handle_errors(CDDLValidationError)
def _cddl_validation_error(self, request, failure):
"""Handle CDDL validation errors."""
request.setResponseCode(http.BAD_REQUEST)
return str(failure.value).encode("utf-8")
def __init__( def __init__(
self, self,

View File

@ -34,7 +34,7 @@ from hyperlink import DecodedURL
from collections_extended import RangeMap from collections_extended import RangeMap
from twisted.internet.task import Clock, Cooperator from twisted.internet.task import Clock, Cooperator
from twisted.internet.interfaces import IReactorTime, IReactorFromThreads from twisted.internet.interfaces import IReactorTime, IReactorFromThreads
from twisted.internet.defer import CancelledError, Deferred from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.web import http from twisted.web import http
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from werkzeug import routing from werkzeug import routing
@ -54,6 +54,7 @@ from ..storage.http_server import (
ClientSecretsException, ClientSecretsException,
_authorized_route, _authorized_route,
StorageIndexConverter, StorageIndexConverter,
_add_error_handling,
) )
from ..storage.http_client import ( from ..storage.http_client import (
StorageClient, StorageClient,
@ -253,6 +254,7 @@ class TestApp(object):
clock: IReactorTime clock: IReactorTime
_app = Klein() _app = Klein()
_add_error_handling(_app)
_swissnum = SWISSNUM_FOR_TEST # Match what the test client is using _swissnum = SWISSNUM_FOR_TEST # Match what the test client is using
@_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"]) @_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
@ -346,7 +348,7 @@ class CustomHTTPServerTests(SyncTestCase):
response = result_of( response = result_of(
self.client.request( self.client.request(
"GET", "GET",
"http://127.0.0.1/upload_secret", DecodedURL.from_text("http://127.0.0.1/upload_secret"),
) )
) )
self.assertEqual(response.code, 400) self.assertEqual(response.code, 400)
@ -354,7 +356,9 @@ class CustomHTTPServerTests(SyncTestCase):
# With secret, we're good. # With secret, we're good.
response = result_of( response = result_of(
self.client.request( self.client.request(
"GET", "http://127.0.0.1/upload_secret", upload_secret=b"MAGIC" "GET",
DecodedURL.from_text("http://127.0.0.1/upload_secret"),
upload_secret=b"MAGIC",
) )
) )
self.assertEqual(response.code, 200) self.assertEqual(response.code, 200)
@ -378,7 +382,7 @@ class CustomHTTPServerTests(SyncTestCase):
response = result_of( response = result_of(
self.client.request( self.client.request(
"GET", "GET",
f"http://127.0.0.1/bytes/{length}", DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
) )
) )
@ -399,7 +403,7 @@ class CustomHTTPServerTests(SyncTestCase):
response = result_of( response = result_of(
self.client.request( self.client.request(
"GET", "GET",
f"http://127.0.0.1/bytes/{length}", DecodedURL.from_text(f"http://127.0.0.1/bytes/{length}"),
) )
) )
@ -414,7 +418,7 @@ class CustomHTTPServerTests(SyncTestCase):
response = result_of( response = result_of(
self.client.request( self.client.request(
"GET", "GET",
"http://127.0.0.1/slowly_never_finish_result", DecodedURL.from_text("http://127.0.0.1/slowly_never_finish_result"),
) )
) )
@ -442,7 +446,7 @@ class CustomHTTPServerTests(SyncTestCase):
response = result_of( response = result_of(
self.client.request( self.client.request(
"GET", "GET",
"http://127.0.0.1/die", DecodedURL.from_text("http://127.0.0.1/die"),
) )
) )
@ -459,6 +463,7 @@ class Reactor(Clock):
Advancing the clock also runs any callbacks scheduled via callFromThread. Advancing the clock also runs any callbacks scheduled via callFromThread.
""" """
def __init__(self): def __init__(self):
Clock.__init__(self) Clock.__init__(self)
self._queue = Queue() self._queue = Queue()
@ -499,7 +504,9 @@ class HttpTestFixture(Fixture):
self.storage_server = StorageServer( self.storage_server = StorageServer(
self.tempdir.path, b"\x00" * 20, clock=self.clock self.tempdir.path, b"\x00" * 20, clock=self.clock
) )
self.http_server = HTTPServer(self.clock, self.storage_server, SWISSNUM_FOR_TEST) self.http_server = HTTPServer(
self.clock, self.storage_server, SWISSNUM_FOR_TEST
)
self.treq = StubTreq(self.http_server.get_resource()) self.treq = StubTreq(self.http_server.get_resource())
self.client = StorageClient( self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"), DecodedURL.from_text("http://127.0.0.1"),
@ -513,6 +520,7 @@ class HttpTestFixture(Fixture):
Like ``result_of``, but supports fake reactor and ``treq`` testing Like ``result_of``, but supports fake reactor and ``treq`` testing
infrastructure necessary to support asynchronous HTTP server endpoints. infrastructure necessary to support asynchronous HTTP server endpoints.
""" """
d = ensureDeferred(d)
result = [] result = []
error = [] error = []
d.addCallbacks(result.append, error.append) d.addCallbacks(result.append, error.append)

64
tox.ini
View File

@ -23,38 +23,34 @@ minversion = 2.4
[testenv] [testenv]
passenv = TAHOE_LAFS_* PIP_* SUBUNITREPORTER_* USERPROFILE HOMEDRIVE HOMEPATH passenv = TAHOE_LAFS_* PIP_* SUBUNITREPORTER_* USERPROFILE HOMEDRIVE HOMEPATH
# Get "certifi" to avoid bug #2913. Basically if a `setup_requires=...` causes
# a package to be installed (with setuptools) then it'll fail on certain
# platforms (travis's OX-X 10.12, Slackware 14.2) because PyPI's TLS
# requirements (TLS >= 1.2) are incompatible with the old TLS clients
# available to those systems. Installing it ahead of time (with pip) avoids
# this problem.
deps = deps =
# Pin all of these versions for the same reason you ever want to pin # We pull in certify *here* to avoid bug #2913. Basically if a
# anything: to prevent new releases with regressions from introducing # `setup_requires=...` causes a package to be installed (with setuptools)
# spurious failures into CI runs for whatever development work is # then it'll fail on certain platforms (travis's OX-X 10.12, Slackware
# happening at the time. The versions selected here are just the current # 14.2) because PyPI's TLS requirements (TLS >= 1.2) are incompatible with
# versions at the time. Bumping them to keep up with future releases is # the old TLS clients available to those systems. Installing it ahead of
# fine as long as those releases are known to actually work. # time (with pip) avoids this problem.
pip==22.0.3 #
setuptools==60.9.1 # We don't pin an exact version of it because it contains CA certificates
wheel==0.37.1 # which necessarily change over time. Pinning this is guaranteed to cause
subunitreporter==22.2.0 # things to break eventually as old certificates expire and as new ones
# As an exception, we don't pin certifi because it contains CA # are used in the wild that aren't present in whatever version we pin.
# certificates which necessarily change over time. Pinning this is # Hopefully there won't be functionality regressions in new releases of
# guaranteed to cause things to break eventually as old certificates # this package that cause us the kind of suffering we're trying to avoid
# expire and as new ones are used in the wild that aren't present in # with the above pins.
# whatever version we pin. Hopefully there won't be functionality certifi
# regressions in new releases of this package that cause us the kind of
# suffering we're trying to avoid with the above pins.
certifi
# We add usedevelop=False because testing against a true installation gives # We add usedevelop=False because testing against a true installation gives
# more useful results. # more useful results.
usedevelop = False usedevelop = False
# We use extras=test to get things like "mock" that are required for our unit
# tests. extras =
extras = test # Get general testing environment dependencies so we can run the tests
# how we like.
testenv
# And get all of the test suite's actual direct Python dependencies.
test
setenv = setenv =
# Define TEST_SUITE in the environment as an aid to constructing the # Define TEST_SUITE in the environment as an aid to constructing the
@ -99,10 +95,12 @@ commands =
[testenv:codechecks] [testenv:codechecks]
basepython = python3 basepython = python3
skip_install = true
deps = deps =
# Make sure we get a version of PyLint that respects config, and isn't too # Pin a specific version so we get consistent outcomes; update this
# old. # occasionally:
pylint < 2.18, >2.14 ruff == 0.0.263
towncrier
# On macOS, git inside of towncrier needs $HOME. # On macOS, git inside of towncrier needs $HOME.
passenv = HOME passenv = HOME
setenv = setenv =
@ -110,13 +108,9 @@ setenv =
# entire codebase, including various pieces of supporting code. # entire codebase, including various pieces of supporting code.
DEFAULT_FILES=src integration static misc setup.py DEFAULT_FILES=src integration static misc setup.py
commands = commands =
flake8 {posargs:{env:DEFAULT_FILES}} ruff check {posargs:{env:DEFAULT_FILES}}
python misc/coding_tools/check-umids.py {posargs:{env:DEFAULT_FILES}} python misc/coding_tools/check-umids.py {posargs:{env:DEFAULT_FILES}}
python misc/coding_tools/check-debugging.py {posargs:{env:DEFAULT_FILES}} python misc/coding_tools/check-debugging.py {posargs:{env:DEFAULT_FILES}}
python misc/coding_tools/find-trailing-spaces.py -r {posargs:{env:DEFAULT_FILES}}
# PyLint has other useful checks, might want to enable them:
# http://pylint.pycqa.org/en/latest/technical_reference/features.html
pylint --disable=all --enable=cell-var-from-loop {posargs:{env:DEFAULT_FILES}}
# If towncrier.check fails, you forgot to add a towncrier news # If towncrier.check fails, you forgot to add a towncrier news
# fragment explaining the change in this branch. Create one at # fragment explaining the change in this branch. Create one at