Merge branch 'master' of github.com:tahoe-lafs/tahoe-lafs into 3816.improve-release-checklist

This commit is contained in:
fenn-cs 2022-01-24 15:46:09 +01:00
commit e34ebba4bc
29 changed files with 907 additions and 374 deletions

View File

@ -5,6 +5,20 @@ User-Visible Changes in Tahoe-LAFS
==================================
.. towncrier start line
Release 1.17.1 (2022-01-07)
'''''''''''''''''''''''''''
Bug Fixes
---------
- Fixed regression on Python 3 causing the JSON version of the Welcome page to sometimes produce a 500 error (`#3852 <https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3852>`_)
- Fixed regression on Python 3 where JSON HTTP POSTs failed to be processed. (`#3854 <https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3854>`_)
Misc/Other
----------
- `#3848 <https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3848>`_, `#3849 <https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3849>`_, `#3850 <https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3850>`_, `#3856 <https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3856>`_
Release 1.17.0 (2021-12-06)

View File

@ -28,15 +28,15 @@ To install Tahoe-LAFS on Windows:
3. Open the installer by double-clicking it. Select the **Add Python to PATH** check-box, then click **Install Now**.
4. Start PowerShell and enter the following command to verify python installation::
python --version
5. Enter the following command to install Tahoe-LAFS::
pip install tahoe-lafs
6. Verify installation by checking for the version::
tahoe --version
If you want to hack on Tahoe's source code, you can install Tahoe in a ``virtualenv`` on your Windows Machine. To learn more, see :doc:`install-on-windows`.
@ -56,13 +56,13 @@ If you are working on MacOS or a Linux distribution which does not have Tahoe-LA
* **pip**: Most python installations already include `pip`. However, if your installation does not, see `pip installation <https://pip.pypa.io/en/stable/installing/>`_.
2. Install Tahoe-LAFS using pip::
pip install tahoe-lafs
3. Verify installation by checking for the version::
tahoe --version
If you are looking to hack on the source code or run pre-release code, we recommend you install Tahoe-LAFS on a `virtualenv` instance. To learn more, see :doc:`install-on-linux`.
If you are looking to hack on the source code or run pre-release code, we recommend you install Tahoe-LAFS on a `virtualenv` instance. To learn more, see :doc:`install-on-linux`.
You can always write to the `tahoe-dev mailing list <https://lists.tahoe-lafs.org/mailman/listinfo/tahoe-dev>`_ or chat on the `Libera.chat IRC <irc://irc.libera.chat/%23tahoe-lafs>`_ if you are not able to get Tahoe-LAFS up and running on your deployment.

View File

@ -369,6 +369,19 @@ The authentication *type* used is ``Tahoe-LAFS``.
The swissnum from the NURL used to locate the storage service is used as the *credentials*.
If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives an ``401 UNAUTHORIZED`` response.
There are also, for some endpoints, secrets sent via ``X-Tahoe-Authorization`` headers.
If these are:
1. Missing.
2. The wrong length.
3. Not the expected kind of secret.
4. They are otherwise unparseable before they are actually semantically used.
the server will respond with ``400 BAD REQUEST``.
401 is not used because this isn't an authorization problem, this is a "you sent garbage and should know better" bug.
If authorization using the secret fails, then a ``401 UNAUTHORIZED`` response should be sent.
General
~~~~~~~

View File

@ -88,7 +88,6 @@ Create Branch and Apply Updates
- change the value given for `version` from `OLD.post1` to `NEW.post1`
- update "docs/known_issues.rst" if appropriate
- update "docs/Installation/install-tahoe.rst" references to the new release
- Push the branch to github
- Create a (draft) PR; this should trigger CI (note that github
doesn't let you create a PR without some changes on the branch so
@ -129,6 +128,11 @@ they will need to evaluate which contributors' signatures they trust.
- ``tox -e deprecations,upcoming-deprecations``
- clone to a clean, local checkout (to avoid extra files being included in the release)
- cd /tmp
- git clone /home/meejah/src/tahoe-lafs
- build tarballs
- tox -e tarballs
@ -176,14 +180,20 @@ need to be uploaded to https://tahoe-lafs.org in `~source/downloads`
- secure-copy all release artifacts to the download area on the
tahoe-lafs.org host machine. `~source/downloads` on there maps to
https://tahoe-lafs.org/downloads/ on the Web.
- scp dist/*1.15.0* username@tahoe-lafs.org:/home/source/downloads
https://tahoe-lafs.org/downloads/ on the Web:
- scp dist/*1.15.0* username@tahoe-lafs.org:/home/source/downloads
- the following developers have access to do this:
- exarkun
- meejah
- warner
Push the signed tag to the main repository:
- git push origin tahoe-lafs-1.17.1
For the actual release, the tarball and signature files need to be
uploaded to PyPI as well.

0
newsfragments/3859.minor Normal file
View File

View File

@ -7,7 +7,7 @@
, html5lib, pyutil, distro, configparser, klein, cbor2
}:
python.pkgs.buildPythonPackage rec {
# Most of the time this is not exactly the release version (eg 1.17.0).
# Most of the time this is not exactly the release version (eg 1.17.1).
# Give it a `post` component to make it look newer than the release version
# and we'll bump this up at the time of each release.
#
@ -20,7 +20,7 @@ python.pkgs.buildPythonPackage rec {
# is not a reproducable artifact (in the sense of "reproducable builds") so
# it is excluded from the source tree by default. When it is included, the
# package tends to be frequently spuriously rebuilt.
version = "1.17.0.post1";
version = "1.17.1.post1";
name = "tahoe-lafs-${version}";
src = lib.cleanSourceWith {
src = ../.;

View File

@ -1,6 +1,6 @@
ANNOUNCING Tahoe, the Least-Authority File Store, v1.17.0
ANNOUNCING Tahoe, the Least-Authority File Store, v1.17.1
The Tahoe-LAFS team is pleased to announce version 1.17.0 of
The Tahoe-LAFS team is pleased to announce version 1.17.1 of
Tahoe-LAFS, an extremely reliable decentralized storage
system. Get it with "pip install tahoe-lafs", or download a
tarball here:
@ -15,19 +15,12 @@ unique security and fault-tolerance properties:
https://tahoe-lafs.readthedocs.org/en/latest/about.html
The previous stable release of Tahoe-LAFS was v1.16.0, released on
October 19, 2021.
The previous stable release of Tahoe-LAFS was v1.17.0, released on
December 6, 2021.
This release fixes several security issues raised as part of an audit
by Cure53. We developed fixes for these issues in a private
repository. Shortly after this release, public tickets will be updated
with further information (along with, of course, all the code).
This release fixes two Python3-releated regressions and 4 minor bugs.
There is also OpenMetrics support now and several bug fixes.
In all, 46 issues have been fixed since the last release.
Please see ``NEWS.rst`` for a more complete list of changes.
Please see ``NEWS.rst`` [1] for a complete list of changes.
WHAT IS IT GOOD FOR?
@ -66,12 +59,12 @@ to v1.0 (which was released March 25, 2008). Clients from this
release can read files and directories produced by clients of
all versions since v1.0.
Network connections are limited by the Introducer protocol in
use. If the Introducer is running v1.10 or v1.11, then servers
from this release (v1.12) can serve clients of all versions
back to v1.0 . If it is running v1.12, then they can only
serve clients back to v1.10. Clients from this release can use
servers back to v1.10, but not older servers.
Network connections are limited by the Introducer protocol in use. If
the Introducer is running v1.10 or v1.11, then servers from this
release can serve clients of all versions back to v1.0 . If it is
running v1.12 or higher, then they can only serve clients back to
v1.10. Clients from this release can use servers back to v1.10, but
not older servers.
Except for the new optional MDMF format, we have not made any
intentional compatibility changes. However we do not yet have
@ -79,7 +72,7 @@ the test infrastructure to continuously verify that all new
versions are interoperable with previous versions. We intend
to build such an infrastructure in the future.
This is the twenty-first release in the version 1 series. This
This is the twenty-second release in the version 1 series. This
series of Tahoe-LAFS will be actively supported and maintained
for the foreseeable future, and future versions of Tahoe-LAFS
will retain the ability to read and write files compatible
@ -139,7 +132,7 @@ Of Fame" [13].
ACKNOWLEDGEMENTS
This is the eighteenth release of Tahoe-LAFS to be created
This is the nineteenth release of Tahoe-LAFS to be created
solely as a labor of love by volunteers. Thank you very much
to the team of "hackers in the public interest" who make
Tahoe-LAFS possible.
@ -147,16 +140,16 @@ Tahoe-LAFS possible.
meejah
on behalf of the Tahoe-LAFS team
December 6, 2021
January 7, 2022
Planet Earth
[1] https://github.com/tahoe-lafs/tahoe-lafs/blob/tahoe-lafs-1.17.0/NEWS.rst
[1] https://github.com/tahoe-lafs/tahoe-lafs/blob/tahoe-lafs-1.17.1/NEWS.rst
[2] https://github.com/tahoe-lafs/tahoe-lafs/blob/master/docs/known_issues.rst
[3] https://tahoe-lafs.org/trac/tahoe-lafs/wiki/RelatedProjects
[4] https://github.com/tahoe-lafs/tahoe-lafs/blob/tahoe-lafs-1.17.0/COPYING.GPL
[5] https://github.com/tahoe-lafs/tahoe-lafs/blob/tahoe-lafs-1.17.0/COPYING.TGPPL.rst
[6] https://tahoe-lafs.readthedocs.org/en/tahoe-lafs-1.17.0/INSTALL.html
[4] https://github.com/tahoe-lafs/tahoe-lafs/blob/tahoe-lafs-1.17.1/COPYING.GPL
[5] https://github.com/tahoe-lafs/tahoe-lafs/blob/tahoe-lafs-1.17.1/COPYING.TGPPL.rst
[6] https://tahoe-lafs.readthedocs.org/en/tahoe-lafs-1.17.1/INSTALL.html
[7] https://lists.tahoe-lafs.org/mailman/listinfo/tahoe-dev
[8] https://tahoe-lafs.org/trac/tahoe-lafs/roadmap
[9] https://github.com/tahoe-lafs/tahoe-lafs/blob/master/CREDITS

View File

@ -409,7 +409,9 @@ setup(name="tahoe-lafs", # also set in __init__.py
"html5lib",
"junitxml",
"tenacity",
"paramiko",
# Pin old version until
# https://github.com/paramiko/paramiko/issues/1961 is fixed.
"paramiko < 2.9",
"pytest-timeout",
# Does our OpenMetrics endpoint adhere to the spec:
"prometheus-client == 0.11.0",

View File

@ -36,7 +36,7 @@ from twisted.python.filepath import FilePath
import allmydata
from allmydata.crypto import rsa, ed25519
from allmydata.crypto.util import remove_prefix
from allmydata.storage.server import StorageServer
from allmydata.storage.server import StorageServer, FoolscapStorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
@ -834,7 +834,7 @@ class _Client(node.Node, pollmixin.PollMixin):
if anonymous_storage_enabled(self.config):
furl_file = self.config.get_private_path("storage.furl").encode(get_filesystem_encoding())
furl = self.tub.registerReference(ss, furlFile=furl_file)
furl = self.tub.registerReference(FoolscapStorageServer(ss), furlFile=furl_file)
announcement["anonymous-storage-FURL"] = furl
enabled_storage_servers = self._enable_storage_servers(

View File

@ -19,7 +19,7 @@ else:
from typing import Union
from treq.testing import StubTreq
import base64
from base64 import b64encode
# TODO Make sure to import Python version?
from cbor2 import loads
@ -44,7 +44,7 @@ def _decode_cbor(response):
def swissnum_auth_header(swissnum): # type: (bytes) -> bytes
"""Return value for ``Authentication`` header."""
return b"Tahoe-LAFS " + base64.b64encode(swissnum).strip()
return b"Tahoe-LAFS " + b64encode(swissnum).strip()
class StorageClient(object):
@ -68,12 +68,25 @@ class StorageClient(object):
)
return headers
def _request(self, method, url, secrets, **kwargs):
"""
Like ``treq.request()``, but additional argument of secrets mapping
``http_server.Secret`` to the bytes value of the secret.
"""
headers = self._get_headers()
for key, value in secrets.items():
headers.addRawHeader(
"X-Tahoe-Authorization",
b"%s %s" % (key.value.encode("ascii"), b64encode(value).strip())
)
return self._treq.request(method, url, headers=headers, **kwargs)
@inlineCallbacks
def get_version(self):
"""
Return the version metadata for the server.
"""
url = self._base_url.click("/v1/version")
response = yield self._treq.get(url, headers=self._get_headers())
response = yield self._request("GET", url, {})
decoded_response = yield _decode_cbor(response)
returnValue(decoded_response)

View File

@ -13,8 +13,12 @@ if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
else:
from typing import Dict, List, Set
from functools import wraps
from enum import Enum
from base64 import b64decode
from klein import Klein
from twisted.web import http
@ -24,40 +28,95 @@ from cbor2 import dumps
from .server import StorageServer
from .http_client import swissnum_auth_header
from ..util.hashutil import timing_safe_compare
def _authorization_decorator(f):
class Secrets(Enum):
"""Different kinds of secrets the client may send."""
LEASE_RENEW = "lease-renew-secret"
LEASE_CANCEL = "lease-cancel-secret"
UPLOAD = "upload-secret"
class ClientSecretsException(Exception):
"""The client did not send the appropriate secrets."""
def _extract_secrets(
header_values, required_secrets
): # type: (List[str], Set[Secrets]) -> Dict[Secrets, bytes]
"""
Check the ``Authorization`` header, and (TODO: in later revision of code)
extract ``X-Tahoe-Authorization`` headers and pass them in.
Given list of values of ``X-Tahoe-Authorization`` headers, and required
secrets, return dictionary mapping secrets to decoded values.
If too few secrets were given, or too many, a ``ClientSecretsException`` is
raised.
"""
string_key_to_enum = {e.value: e for e in Secrets}
result = {}
try:
for header_value in header_values:
string_key, string_value = header_value.strip().split(" ", 1)
key = string_key_to_enum[string_key]
value = b64decode(string_value)
if key in (Secrets.LEASE_CANCEL, Secrets.LEASE_RENEW) and len(value) != 32:
raise ClientSecretsException("Lease secrets must be 32 bytes long")
result[key] = value
except (ValueError, KeyError):
raise ClientSecretsException("Bad header value(s): {}".format(header_values))
if result.keys() != required_secrets:
raise ClientSecretsException(
"Expected {} secrets, got {}".format(required_secrets, result.keys())
)
return result
def _authorization_decorator(required_secrets):
"""
Check the ``Authorization`` header, and extract ``X-Tahoe-Authorization``
headers and pass them in.
"""
@wraps(f)
def route(self, request, *args, **kwargs):
if request.requestHeaders.getRawHeaders("Authorization", [None])[0] != str(
swissnum_auth_header(self._swissnum), "ascii"
):
request.setResponseCode(http.UNAUTHORIZED)
return b""
# authorization = request.requestHeaders.getRawHeaders("X-Tahoe-Authorization", [])
# For now, just a placeholder:
authorization = None
return f(self, request, authorization, *args, **kwargs)
def decorator(f):
@wraps(f)
def route(self, request, *args, **kwargs):
if not timing_safe_compare(
request.requestHeaders.getRawHeaders("Authorization", [None])[0].encode(
"utf-8"
),
swissnum_auth_header(self._swissnum),
):
request.setResponseCode(http.UNAUTHORIZED)
return b""
authorization = request.requestHeaders.getRawHeaders(
"X-Tahoe-Authorization", []
)
try:
secrets = _extract_secrets(authorization, required_secrets)
except ClientSecretsException:
request.setResponseCode(400)
return b""
return f(self, request, secrets, *args, **kwargs)
return route
return route
return decorator
def _authorized_route(app, *route_args, **route_kwargs):
def _authorized_route(app, required_secrets, *route_args, **route_kwargs):
"""
Like Klein's @route, but with additional support for checking the
``Authorization`` header as well as ``X-Tahoe-Authorization`` headers. The
latter will (TODO: in later revision of code) get passed in as second
argument to wrapped functions.
latter will get passed in as second argument to wrapped functions, a
dictionary mapping a ``Secret`` value to the uploaded secret.
:param required_secrets: Set of required ``Secret`` types.
"""
def decorator(f):
@app.route(*route_args, **route_kwargs)
@_authorization_decorator
@_authorization_decorator(required_secrets)
def handle_route(*args, **kwargs):
return f(*args, **kwargs)
@ -89,6 +148,9 @@ class HTTPServer(object):
# TODO if data is big, maybe want to use a temporary file eventually...
return dumps(data)
@_authorized_route(_app, "/v1/version", methods=["GET"])
##### Generic APIs #####
@_authorized_route(_app, set(), "/v1/version", methods=["GET"])
def version(self, request, authorization):
return self._cbor(request, self._storage_server.remote_get_version())
"""Return version information."""
return self._cbor(request, self._storage_server.get_version())

View File

@ -352,8 +352,10 @@ class ShareFile(object):
return space_freed
@implementer(RIBucketWriter)
class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
class BucketWriter(object):
"""
Keep track of the process of writing to a ShareFile.
"""
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
self.ss = ss
@ -373,7 +375,7 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def allocated_size(self):
return self._max_size
def remote_write(self, offset, data):
def write(self, offset, data):
# Delay the timeout, since we received data:
self._timeout.reset(30 * 60)
start = self._clock.seconds()
@ -397,9 +399,6 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self.ss.add_latency("write", self._clock.seconds() - start)
self.ss.count("write")
def remote_close(self):
self.close()
def close(self):
precondition(not self.closed)
self._timeout.cancel()
@ -451,13 +450,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
def remote_abort(self):
def abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
self.ss.count("abort")
def abort(self):
if self.closed:
return
@ -480,8 +476,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self._timeout.cancel()
@implementer(RIBucketReader)
class BucketReader(Referenceable): # type: ignore # warner/foolscap#78
@implementer(RIBucketWriter)
class FoolscapBucketWriter(Referenceable): # type: ignore # warner/foolscap#78
"""
Foolscap-specific BucketWriter.
"""
def __init__(self, bucket_writer):
self._bucket_writer = bucket_writer
def remote_write(self, offset, data):
return self._bucket_writer.write(offset, data)
def remote_close(self):
return self._bucket_writer.close()
def remote_abort(self):
return self._bucket_writer.abort()
class BucketReader(object):
"""
Manage the process for reading from a ``ShareFile``.
"""
def __init__(self, ss, sharefname, storage_index=None, shnum=None):
self.ss = ss
@ -496,15 +512,31 @@ class BucketReader(Referenceable): # type: ignore # warner/foolscap#78
),
self.shnum)
def remote_read(self, offset, length):
def read(self, offset, length):
start = time.time()
data = self._share_file.read_share_data(offset, length)
self.ss.add_latency("read", time.time() - start)
self.ss.count("read")
return data
def advise_corrupt_share(self, reason):
return self.ss.advise_corrupt_share(b"immutable",
self.storage_index,
self.shnum,
reason)
@implementer(RIBucketReader)
class FoolscapBucketReader(Referenceable): # type: ignore # warner/foolscap#78
"""
Foolscap wrapper for ``BucketReader``
"""
def __init__(self, bucket_reader):
self._bucket_reader = bucket_reader
def remote_read(self, offset, length):
return self._bucket_reader.read(offset, length)
def remote_advise_corrupt_share(self, reason):
return self.ss.remote_advise_corrupt_share(b"immutable",
self.storage_index,
self.shnum,
reason)
return self._bucket_reader.advise_corrupt_share(reason)

View File

@ -12,7 +12,7 @@ if PY2:
# strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else:
from typing import Dict
from typing import Dict, Tuple
import os, re
@ -32,7 +32,10 @@ from allmydata.storage.lease import LeaseInfo
from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
create_mutable_sharefile
from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
from allmydata.storage.immutable import (
ShareFile, BucketWriter, BucketReader, FoolscapBucketWriter,
FoolscapBucketReader,
)
from allmydata.storage.crawler import BucketCountingCrawler
from allmydata.storage.expirer import LeaseCheckingCrawler
@ -55,10 +58,10 @@ NUM_RE=re.compile("^[0-9]+$")
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
@implementer(IStatsProducer)
class StorageServer(service.MultiService):
"""
A filesystem-based implementation of ``RIStorageServer``.
Implement the business logic for the storage server.
"""
name = 'storage'
# only the tests change this to anything else
@ -125,16 +128,11 @@ class StorageServer(service.MultiService, Referenceable):
self.lease_checker.setServiceParent(self)
self._clock = clock
# Currently being-written Bucketwriters. For Foolscap, lifetime is tied
# to connection: when disconnection happens, the BucketWriters are
# removed. For HTTP, this makes no sense, so there will be
# timeout-based cleanup; see
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.
# Map in-progress filesystem path -> BucketWriter:
self._bucket_writers = {} # type: Dict[str,BucketWriter]
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
# These callables will be called with BucketWriters that closed:
self._call_on_bucket_writer_close = []
def stopService(self):
# Cancel any in-progress uploads:
@ -263,7 +261,7 @@ class StorageServer(service.MultiService, Referenceable):
space += bw.allocated_size()
return space
def remote_get_version(self):
def get_version(self):
remaining_space = self.get_available_space()
if remaining_space is None:
# We're on a platform that has no API to get disk stats.
@ -284,7 +282,7 @@ class StorageServer(service.MultiService, Referenceable):
}
return version
def _allocate_buckets(self, storage_index,
def allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
owner_num=0, renew_leases=True):
@ -370,21 +368,6 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("allocate", self._clock.seconds() - start)
return set(alreadygot), bucketwriters
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num, renew_leases=True,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f:
@ -400,8 +383,7 @@ class StorageServer(service.MultiService, Referenceable):
continue # non-sharefile
yield sf
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
owner_num=1):
def add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1):
start = self._clock.seconds()
self.count("add-lease")
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
@ -415,7 +397,7 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("add-lease", self._clock.seconds() - start)
return None
def remote_renew_lease(self, storage_index, renew_secret):
def renew_lease(self, storage_index, renew_secret):
start = self._clock.seconds()
self.count("renew")
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
@ -431,9 +413,14 @@ class StorageServer(service.MultiService, Referenceable):
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._bucket_writers[bw.incominghome]
if bw in self._bucket_writer_disconnect_markers:
canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
canary.dontNotifyOnDisconnect(disconnect_marker)
for handler in self._call_on_bucket_writer_close:
handler(bw)
def register_bucket_writer_close_handler(self, handler):
"""
The handler will be called with any ``BucketWriter`` that closes.
"""
self._call_on_bucket_writer_close.append(handler)
def _get_bucket_shares(self, storage_index):
"""Return a list of (shnum, pathname) tuples for files that hold
@ -449,7 +436,7 @@ class StorageServer(service.MultiService, Referenceable):
# Commonly caused by there being no buckets at all.
pass
def remote_get_buckets(self, storage_index):
def get_buckets(self, storage_index):
start = self._clock.seconds()
self.count("get")
si_s = si_b2a(storage_index)
@ -641,7 +628,7 @@ class StorageServer(service.MultiService, Referenceable):
secrets,
test_and_write_vectors,
read_vector,
renew_leases,
renew_leases=True,
):
"""
Read data from shares and conditionally write some data to them.
@ -699,18 +686,6 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("writev", self._clock.seconds() - start)
return (testv_is_good, read_data)
def remote_slot_testv_and_readv_and_writev(self, storage_index,
secrets,
test_and_write_vectors,
read_vector):
return self.slot_testv_and_readv_and_writev(
storage_index,
secrets,
test_and_write_vectors,
read_vector,
renew_leases=True,
)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
owner_num=0):
(write_enabler, renew_secret, cancel_secret) = secrets
@ -721,7 +696,7 @@ class StorageServer(service.MultiService, Referenceable):
self)
return share
def remote_slot_readv(self, storage_index, shares, readv):
def slot_readv(self, storage_index, shares, readv):
start = self._clock.seconds()
self.count("readv")
si_s = si_b2a(storage_index)
@ -763,8 +738,8 @@ class StorageServer(service.MultiService, Referenceable):
return True
return False
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
reason):
def advise_corrupt_share(self, share_type, storage_index, shnum,
reason):
# This is a remote API, I believe, so this has to be bytes for legacy
# protocol backwards compatibility reasons.
assert isinstance(share_type, bytes)
@ -804,6 +779,90 @@ class StorageServer(service.MultiService, Referenceable):
return None
@implementer(RIStorageServer)
class FoolscapStorageServer(Referenceable): # type: ignore # warner/foolscap#78
"""
A filesystem-based implementation of ``RIStorageServer``.
For Foolscap, BucketWriter lifetime is tied to connection: when
disconnection happens, the BucketWriters are removed.
"""
name = 'storage'
def __init__(self, storage_server): # type: (StorageServer) -> None
self._server = storage_server
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,Tuple[IRemoteReference, object]]
self._server.register_bucket_writer_close_handler(self._bucket_writer_closed)
def _bucket_writer_closed(self, bw):
if bw in self._bucket_writer_disconnect_markers:
canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
canary.dontNotifyOnDisconnect(disconnect_marker)
def remote_get_version(self):
return self._server.get_version()
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._server.allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num, renew_leases=True,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
# Wrap BucketWriters with Foolscap adapter:
bucketwriters = {
k: FoolscapBucketWriter(bw)
for (k, bw) in bucketwriters.items()
}
return alreadygot, bucketwriters
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
owner_num=1):
return self._server.add_lease(storage_index, renew_secret, cancel_secret)
def remote_renew_lease(self, storage_index, renew_secret):
return self._server.renew_lease(storage_index, renew_secret)
def remote_get_buckets(self, storage_index):
return {
k: FoolscapBucketReader(bucket)
for (k, bucket) in self._server.get_buckets(storage_index).items()
}
def remote_slot_testv_and_readv_and_writev(self, storage_index,
secrets,
test_and_write_vectors,
read_vector):
return self._server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
test_and_write_vectors,
read_vector,
renew_leases=True,
)
def remote_slot_readv(self, storage_index, shares, readv):
return self._server.slot_readv(storage_index, shares, readv)
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
reason):
return self._server.advise_corrupt_share(share_type, storage_index, shnum,
reason)
CORRUPTION_REPORT_FORMAT = """\
report: Share Corruption
type: {type}

View File

@ -1,8 +1,4 @@
from .common_util import (
FakeCanary,
)
def upload_immutable(storage_server, storage_index, renew_secret, cancel_secret, shares):
"""
Synchronously upload some immutable shares to a ``StorageServer``.
@ -20,17 +16,16 @@ def upload_immutable(storage_server, storage_index, renew_secret, cancel_secret,
:return: ``None``
"""
already, writers = storage_server.remote_allocate_buckets(
already, writers = storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
shares.keys(),
len(next(iter(shares.values()))),
canary=FakeCanary(),
)
for shnum, writer in writers.items():
writer.remote_write(0, shares[shnum])
writer.remote_close()
writer.write(0, shares[shnum])
writer.close()
def upload_mutable(storage_server, storage_index, secrets, shares):
@ -57,7 +52,7 @@ def upload_mutable(storage_server, storage_index, secrets, shares):
}
read_vector = []
storage_server.remote_slot_testv_and_readv_and_writev(
storage_server.slot_testv_and_readv_and_writev(
storage_index,
secrets,
test_and_write_vectors,

View File

@ -50,7 +50,9 @@ from allmydata.util.assertutil import _assert
from allmydata import uri as tahoe_uri
from allmydata.client import _Client
from allmydata.storage.server import StorageServer, storage_index_to_dir
from allmydata.storage.server import (
StorageServer, storage_index_to_dir, FoolscapStorageServer,
)
from allmydata.util import fileutil, idlib, hashutil
from allmydata.util.hashutil import permute_server_hash
from allmydata.util.fileutil import abspath_expanduser_unicode
@ -417,7 +419,7 @@ class NoNetworkGrid(service.MultiService):
ss.setServiceParent(middleman)
serverid = ss.my_nodeid
self.servers_by_number[i] = ss
wrapper = wrap_storage_server(ss)
wrapper = wrap_storage_server(FoolscapStorageServer(ss))
self.wrappers_by_id[serverid] = wrapper
self.proxies_by_id[serverid] = NoNetworkServer(serverid, wrapper)
self.rebuild_serverlist()

View File

@ -773,13 +773,13 @@ class AddLease(GridTestMixin, unittest.TestCase):
d.addCallback(_check_cr, "mutable-normal")
really_did_break = []
# now break the server's remote_add_lease call
# now break the server's add_lease call
def _break_add_lease(ign):
def broken_add_lease(*args, **kwargs):
really_did_break.append(1)
raise KeyError("intentional failure, should be ignored")
assert self.g.servers_by_number[0].remote_add_lease
self.g.servers_by_number[0].remote_add_lease = broken_add_lease
assert self.g.servers_by_number[0].add_lease
self.g.servers_by_number[0].add_lease = broken_add_lease
d.addCallback(_break_add_lease)
# and confirm that the files still look healthy

View File

@ -601,7 +601,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
"enabled = true\n")
c = yield client.create_client(basedir)
ss = c.getServiceNamed("storage")
verdict = ss.remote_get_version()
verdict = ss.get_version()
self.failUnlessReallyEqual(verdict[b"application-version"],
allmydata.__full_version__.encode("ascii"))
self.failIfEqual(str(allmydata.__version__), "unknown")

View File

@ -27,7 +27,7 @@ from allmydata.util import fileutil, hashutil, pollmixin
from allmydata.storage.server import StorageServer, si_b2a
from allmydata.storage.crawler import ShareCrawler, TimeSliceExceeded
from allmydata.test.common_util import StallMixin, FakeCanary
from allmydata.test.common_util import StallMixin
class BucketEnumeratingCrawler(ShareCrawler):
cpu_slice = 500 # make sure it can complete in a single slice
@ -124,12 +124,12 @@ class Basic(unittest.TestCase, StallMixin, pollmixin.PollMixin):
def write(self, i, ss, serverid, tail=0):
si = self.si(i)
si = si[:-1] + bytes(bytearray((tail,)))
had,made = ss.remote_allocate_buckets(si,
self.rs(i, serverid),
self.cs(i, serverid),
set([0]), 99, FakeCanary())
made[0].remote_write(0, b"data")
made[0].remote_close()
had,made = ss.allocate_buckets(si,
self.rs(i, serverid),
self.cs(i, serverid),
set([0]), 99)
made[0].write(0, b"data")
made[0].close()
return si_b2a(si)
def test_immediate(self):

View File

@ -39,6 +39,7 @@ from allmydata.crypto import aes
from allmydata.storage.server import (
si_b2a,
StorageServer,
FoolscapStorageServer,
)
from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable.layout import (
@ -427,7 +428,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase):
"""
storage_index = b"a" * 16
serverid = b"b" * 20
storage = StorageServer(self.mktemp(), serverid)
storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
rref_without_ueb = LocalWrapper(storage, fireNow)
yield write_bad_share(rref_without_ueb, storage_index)
server_without_ueb = NoNetworkServer(serverid, rref_without_ueb)
@ -451,7 +452,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase):
"""
storage_index = b"a" * 16
serverid = b"b" * 20
storage = StorageServer(self.mktemp(), serverid)
storage = FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
rref_with_ueb = LocalWrapper(storage, fireNow)
ueb = {
"needed_shares": 2,
@ -487,7 +488,7 @@ class CHKCheckerAndUEBFetcherTests(SyncTestCase):
in [b"b", b"c"]
)
storages = list(
StorageServer(self.mktemp(), serverid)
FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
for serverid
in serverids
)

View File

@ -73,7 +73,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
def _copy_share(self, share, to_server):
(sharenum, sharefile) = share
(id, ss) = to_server
shares_dir = os.path.join(ss.original.storedir, "shares")
shares_dir = os.path.join(ss.original._server.storedir, "shares")
si = uri.from_string(self.uri).get_storage_index()
si_dir = os.path.join(shares_dir, storage_index_to_dir(si))
if not os.path.exists(si_dir):
@ -82,7 +82,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
shutil.copy(sharefile, new_sharefile)
self.shares = self.find_uri_shares(self.uri)
# Make sure that the storage server has the share.
self.failUnless((sharenum, ss.original.my_nodeid, new_sharefile)
self.failUnless((sharenum, ss.original._server.my_nodeid, new_sharefile)
in self.shares)
def _corrupt_share(self, share, corruptor_func):

View File

@ -39,13 +39,18 @@ from hypothesis import given, strategies
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME
from allmydata.storage.server import (
StorageServer, DEFAULT_RENEWAL_TIME, FoolscapStorageServer,
)
from allmydata.storage.shares import get_share_file
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.mutable_schema import (
ALL_SCHEMAS as ALL_MUTABLE_SCHEMAS,
)
from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
from allmydata.storage.immutable import (
BucketWriter, BucketReader, ShareFile, FoolscapBucketWriter,
FoolscapBucketReader,
)
from allmydata.storage.immutable_schema import (
ALL_SCHEMAS as ALL_IMMUTABLE_SCHEMAS,
)
@ -157,25 +162,25 @@ class Bucket(unittest.TestCase):
def test_create(self):
incoming, final = self.make_workdir("test_create")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*25)
bw.remote_write(75, b"d"*7)
bw.remote_close()
bw.write(0, b"a"*25)
bw.write(25, b"b"*25)
bw.write(50, b"c"*25)
bw.write(75, b"d"*7)
bw.close()
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*7) # last block may be short
bw.remote_close()
bw.write(0, b"a"*25)
bw.write(25, b"b"*25)
bw.write(50, b"c"*7) # last block may be short
bw.close()
# now read from it
br = BucketReader(self, bw.finalhome)
self.failUnlessEqual(br.remote_read(0, 25), b"a"*25)
self.failUnlessEqual(br.remote_read(25, 25), b"b"*25)
self.failUnlessEqual(br.remote_read(50, 7), b"c"*7)
self.failUnlessEqual(br.read(0, 25), b"a"*25)
self.failUnlessEqual(br.read(25, 25), b"b"*25)
self.failUnlessEqual(br.read(50, 7), b"c"*7)
def test_write_past_size_errors(self):
"""Writing beyond the size of the bucket throws an exception."""
@ -185,7 +190,7 @@ class Bucket(unittest.TestCase):
)
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
with self.assertRaises(DataTooLargeError):
bw.remote_write(offset, b"a" * length)
bw.write(offset, b"a" * length)
@given(
maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
@ -205,25 +210,25 @@ class Bucket(unittest.TestCase):
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, expected_data[10:20])
bw.remote_write(30, expected_data[30:40])
bw.remote_write(50, expected_data[50:60])
bw.write(10, expected_data[10:20])
bw.write(30, expected_data[30:40])
bw.write(50, expected_data[50:60])
# Then, an overlapping write but with matching data:
bw.remote_write(
bw.write(
maybe_overlapping_offset,
expected_data[
maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
]
)
# Now fill in the holes:
bw.remote_write(0, expected_data[0:10])
bw.remote_write(20, expected_data[20:30])
bw.remote_write(40, expected_data[40:50])
bw.remote_write(60, expected_data[60:])
bw.remote_close()
bw.write(0, expected_data[0:10])
bw.write(20, expected_data[20:30])
bw.write(40, expected_data[40:50])
bw.write(60, expected_data[60:])
bw.close()
br = BucketReader(self, bw.finalhome)
self.assertEqual(br.remote_read(0, length), expected_data)
self.assertEqual(br.read(0, length), expected_data)
@given(
@ -243,21 +248,21 @@ class Bucket(unittest.TestCase):
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, b"1" * 10)
bw.remote_write(30, b"1" * 10)
bw.remote_write(50, b"1" * 10)
bw.write(10, b"1" * 10)
bw.write(30, b"1" * 10)
bw.write(50, b"1" * 10)
# Then, write something that might overlap with some of them, but
# conflicts. Then fill in holes left by first three writes. Conflict is
# inevitable.
with self.assertRaises(ConflictingWriteError):
bw.remote_write(
bw.write(
maybe_overlapping_offset,
b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
)
bw.remote_write(0, b"1" * 10)
bw.remote_write(20, b"1" * 10)
bw.remote_write(40, b"1" * 10)
bw.remote_write(60, b"1" * 40)
bw.write(0, b"1" * 10)
bw.write(20, b"1" * 10)
bw.write(40, b"1" * 10)
bw.write(60, b"1" * 40)
def test_read_past_end_of_share_data(self):
# test vector for immutable files (hard-coded contents of an immutable share
@ -302,15 +307,15 @@ class Bucket(unittest.TestCase):
# Now read from it.
br = BucketReader(mockstorageserver, final)
self.failUnlessEqual(br.remote_read(0, len(share_data)), share_data)
self.failUnlessEqual(br.read(0, len(share_data)), share_data)
# Read past the end of share data to get the cancel secret.
read_length = len(share_data) + len(ownernumber) + len(renewsecret) + len(cancelsecret)
result_of_read = br.remote_read(0, read_length)
result_of_read = br.read(0, read_length)
self.failUnlessEqual(result_of_read, share_data)
result_of_read = br.remote_read(0, len(share_data)+1)
result_of_read = br.read(0, len(share_data)+1)
self.failUnlessEqual(result_of_read, share_data)
def _assert_timeout_only_after_30_minutes(self, clock, bw):
@ -348,7 +353,7 @@ class Bucket(unittest.TestCase):
clock.advance(29 * 60)
# .. but we receive a write! So that should delay the timeout again to
# another 30 minutes.
bw.remote_write(0, b"hello")
bw.write(0, b"hello")
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_closing_cancels_timeout(self):
@ -402,7 +407,7 @@ class BucketProxy(unittest.TestCase):
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock())
rb = RemoteBucket(bw)
rb = RemoteBucket(FoolscapBucketWriter(bw))
return bw, rb, final
def make_lease(self):
@ -474,7 +479,7 @@ class BucketProxy(unittest.TestCase):
# now read everything back
def _start_reading(res):
br = BucketReader(self, sharefname)
rb = RemoteBucket(br)
rb = RemoteBucket(FoolscapBucketReader(br))
server = NoNetworkServer(b"abc", None)
rbp = rbp_class(rb, server, storage_index=b"")
self.failUnlessIn("to peer", repr(rbp))
@ -542,20 +547,20 @@ class Server(unittest.TestCase):
def test_declares_fixed_1528(self):
ss = self.create("test_declares_fixed_1528")
ver = ss.remote_get_version()
ver = ss.get_version()
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnless(sv1.get(b'prevents-read-past-end-of-share-data'), sv1)
def test_declares_maximum_share_sizes(self):
ss = self.create("test_declares_maximum_share_sizes")
ver = ss.remote_get_version()
ver = ss.get_version()
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn(b'maximum-immutable-share-size', sv1)
self.failUnlessIn(b'maximum-mutable-share-size', sv1)
def test_declares_available_space(self):
ss = self.create("test_declares_available_space")
ver = ss.remote_get_version()
ver = ss.get_version()
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn(b'available-space', sv1)
@ -566,7 +571,9 @@ class Server(unittest.TestCase):
"""
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
return ss._allocate_buckets(
if isinstance(ss, FoolscapStorageServer):
ss = ss._server
return ss.allocate_buckets(
storage_index,
renew_secret, cancel_secret,
sharenums, size,
@ -590,12 +597,12 @@ class Server(unittest.TestCase):
shnum, bucket = list(writers.items())[0]
# This test is going to hammer your filesystem if it doesn't make a sparse file for this. :-(
bucket.remote_write(2**32, b"ab")
bucket.remote_close()
bucket.write(2**32, b"ab")
bucket.close()
readers = ss.remote_get_buckets(b"allocate")
readers = ss.get_buckets(b"allocate")
reader = readers[shnum]
self.failUnlessEqual(reader.remote_read(2**32, 2), b"ab")
self.failUnlessEqual(reader.read(2**32, 2), b"ab")
def test_dont_overfill_dirs(self):
"""
@ -606,8 +613,8 @@ class Server(unittest.TestCase):
ss = self.create("test_dont_overfill_dirs")
already, writers = self.allocate(ss, b"storageindex", [0], 10)
for i, wb in writers.items():
wb.remote_write(0, b"%10d" % i)
wb.remote_close()
wb.write(0, b"%10d" % i)
wb.close()
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
"shares")
children_of_storedir = set(os.listdir(storedir))
@ -616,8 +623,8 @@ class Server(unittest.TestCase):
# chars the same as the first storageindex.
already, writers = self.allocate(ss, b"storageindey", [0], 10)
for i, wb in writers.items():
wb.remote_write(0, b"%10d" % i)
wb.remote_close()
wb.write(0, b"%10d" % i)
wb.close()
storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
"shares")
new_children_of_storedir = set(os.listdir(storedir))
@ -627,8 +634,8 @@ class Server(unittest.TestCase):
ss = self.create("test_remove_incoming")
already, writers = self.allocate(ss, b"vid", list(range(3)), 10)
for i,wb in writers.items():
wb.remote_write(0, b"%10d" % i)
wb.remote_close()
wb.write(0, b"%10d" % i)
wb.close()
incoming_share_dir = wb.incominghome
incoming_bucket_dir = os.path.dirname(incoming_share_dir)
incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
@ -647,32 +654,32 @@ class Server(unittest.TestCase):
# Now abort the writers.
for writer in writers.values():
writer.remote_abort()
writer.abort()
self.failUnlessEqual(ss.allocated_size(), 0)
def test_allocate(self):
ss = self.create("test_allocate")
self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {})
self.failUnlessEqual(ss.get_buckets(b"allocate"), {})
already,writers = self.allocate(ss, b"allocate", [0,1,2], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
# while the buckets are open, they should not count as readable
self.failUnlessEqual(ss.remote_get_buckets(b"allocate"), {})
self.failUnlessEqual(ss.get_buckets(b"allocate"), {})
# close the buckets
for i,wb in writers.items():
wb.remote_write(0, b"%25d" % i)
wb.remote_close()
wb.write(0, b"%25d" % i)
wb.close()
# aborting a bucket that was already closed is a no-op
wb.remote_abort()
wb.abort()
# now they should be readable
b = ss.remote_get_buckets(b"allocate")
b = ss.get_buckets(b"allocate")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
self.failUnlessEqual(b[0].remote_read(0, 25), b"%25d" % 0)
self.failUnlessEqual(b[0].read(0, 25), b"%25d" % 0)
b_str = str(b[0])
self.failUnlessIn("BucketReader", b_str)
self.failUnlessIn("mfwgy33dmf2g 0", b_str)
@ -693,15 +700,15 @@ class Server(unittest.TestCase):
# aborting the writes should remove the tempfiles
for i,wb in writers2.items():
wb.remote_abort()
wb.abort()
already2,writers2 = self.allocate(ss, b"allocate", [2,3,4,5], 75)
self.failUnlessEqual(already2, set([0,1,2]))
self.failUnlessEqual(set(writers2.keys()), set([5]))
for i,wb in writers2.items():
wb.remote_abort()
wb.abort()
for i,wb in writers.items():
wb.remote_abort()
wb.abort()
def test_allocate_without_lease_renewal(self):
"""
@ -724,8 +731,8 @@ class Server(unittest.TestCase):
ss, storage_index, [0], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
writer.write(0, b"x")
writer.close()
# It should have a lease granted at the current time.
shares = dict(ss._get_bucket_shares(storage_index))
@ -747,8 +754,8 @@ class Server(unittest.TestCase):
ss, storage_index, [1], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
writer.write(0, b"x")
writer.close()
# The first share's lease expiration time is unchanged.
shares = dict(ss._get_bucket_shares(storage_index))
@ -764,8 +771,8 @@ class Server(unittest.TestCase):
def test_bad_container_version(self):
ss = self.create("test_bad_container_version")
a,w = self.allocate(ss, b"si1", [0], 10)
w[0].remote_write(0, b"\xff"*10)
w[0].remote_close()
w[0].write(0, b"\xff"*10)
w[0].close()
fn = os.path.join(ss.sharedir, storage_index_to_dir(b"si1"), "0")
f = open(fn, "rb+")
@ -773,17 +780,17 @@ class Server(unittest.TestCase):
f.write(struct.pack(">L", 0)) # this is invalid: minimum used is v1
f.close()
ss.remote_get_buckets(b"allocate")
ss.get_buckets(b"allocate")
e = self.failUnlessRaises(UnknownImmutableContainerVersionError,
ss.remote_get_buckets, b"si1")
ss.get_buckets, b"si1")
self.assertEqual(e.filename, fn)
self.assertEqual(e.version, 0)
self.assertIn("had unexpected version 0", str(e))
def test_disconnect(self):
# simulate a disconnection
ss = self.create("test_disconnect")
ss = FoolscapStorageServer(self.create("test_disconnect"))
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
@ -831,7 +838,7 @@ class Server(unittest.TestCase):
renew_secret = b"R" * 32
cancel_secret = b"C" * 32
with self.assertRaises(interfaces.NoSpace):
ss.remote_add_lease(storage_index, renew_secret, cancel_secret)
ss.add_lease(storage_index, renew_secret, cancel_secret)
def test_reserved_space_mutable_lease(self):
"""
@ -864,13 +871,13 @@ class Server(unittest.TestCase):
# in the share header. Even if we're out of disk space, on a boring
# enough filesystem we can write these.
for i in range(3):
ss.remote_add_lease(storage_index, next(renew_secrets), cancel_secret)
ss.add_lease(storage_index, next(renew_secrets), cancel_secret)
# Having used all of the space for leases in the header, we would have
# to allocate storage for the next lease. Since there is no space
# available, this must fail instead.
with self.assertRaises(interfaces.NoSpace):
ss.remote_add_lease(storage_index, next(renew_secrets), cancel_secret)
ss.add_lease(storage_index, next(renew_secrets), cancel_secret)
def test_reserved_space(self):
@ -885,7 +892,7 @@ class Server(unittest.TestCase):
}
self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
ss = self.create("test_reserved_space", reserved_space=reserved)
ss = FoolscapStorageServer(self.create("test_reserved_space", reserved_space=reserved))
# 15k available, 10k reserved, leaves 5k for shares
# a newly created and filled share incurs this much overhead, beyond
@ -906,28 +913,28 @@ class Server(unittest.TestCase):
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
self.failUnlessEqual(len(ss._bucket_writers), 3)
self.failUnlessEqual(len(ss._server._bucket_writers), 3)
# allocating 1001-byte shares only leaves room for one
canary2 = FakeCanary()
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._bucket_writers), 4)
self.failUnlessEqual(len(ss._server._bucket_writers), 4)
# we abandon the first set, so their provisional allocation should be
# returned
canary.disconnected()
self.failUnlessEqual(len(ss._bucket_writers), 1)
self.failUnlessEqual(len(ss._server._bucket_writers), 1)
# now we have a provisional allocation of 1001 bytes
# and we close the second set, so their provisional allocation should
# become real, long-term allocation, and grows to include the
# overhead.
for bw in writers2.values():
bw.remote_write(0, b"a"*25)
bw.remote_close()
self.failUnlessEqual(len(ss._bucket_writers), 0)
bw.write(0, b"a"*25)
bw.close()
self.failUnlessEqual(len(ss._server._bucket_writers), 0)
# this also changes the amount reported as available by call_get_disk_stats
allocated = 1001 + OVERHEAD + LEASE_SIZE
@ -944,12 +951,12 @@ class Server(unittest.TestCase):
canary=canary3,
)
self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._bucket_writers), 39)
self.failUnlessEqual(len(ss._server._bucket_writers), 39)
canary3.disconnected()
self.failUnlessEqual(len(ss._bucket_writers), 0)
ss.disownServiceParent()
self.failUnlessEqual(len(ss._server._bucket_writers), 0)
ss._server.disownServiceParent()
del ss
def test_seek(self):
@ -978,24 +985,22 @@ class Server(unittest.TestCase):
Given a StorageServer, create a bucket with 5 shares and return renewal
and cancellation secrets.
"""
canary = FakeCanary()
sharenums = list(range(5))
size = 100
# Creating a bucket also creates a lease:
rs, cs = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
already, writers = ss.remote_allocate_buckets(storage_index, rs, cs,
sharenums, size, canary)
already, writers = ss.allocate_buckets(storage_index, rs, cs,
sharenums, size)
self.failUnlessEqual(len(already), expected_already)
self.failUnlessEqual(len(writers), expected_writers)
for wb in writers.values():
wb.remote_close()
wb.close()
return rs, cs
def test_leases(self):
ss = self.create("test_leases")
canary = FakeCanary()
sharenums = list(range(5))
size = 100
@ -1018,54 +1023,54 @@ class Server(unittest.TestCase):
# and a third lease, using add-lease
rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
ss.remote_add_lease(b"si1", rs2a, cs2a)
ss.add_lease(b"si1", rs2a, cs2a)
(lease1, lease2, lease3) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
self.assertTrue(lease3.is_renew_secret(rs2a))
# add-lease on a missing storage index is silently ignored
self.assertIsNone(ss.remote_add_lease(b"si18", b"", b""))
self.assertIsNone(ss.add_lease(b"si18", b"", b""))
# check that si0 is readable
readers = ss.remote_get_buckets(b"si0")
readers = ss.get_buckets(b"si0")
self.failUnlessEqual(len(readers), 5)
# renew the first lease. Only the proper renew_secret should work
ss.remote_renew_lease(b"si0", rs0)
self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", cs0)
self.failUnlessRaises(IndexError, ss.remote_renew_lease, b"si0", rs1)
ss.renew_lease(b"si0", rs0)
self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", cs0)
self.failUnlessRaises(IndexError, ss.renew_lease, b"si0", rs1)
# check that si0 is still readable
readers = ss.remote_get_buckets(b"si0")
readers = ss.get_buckets(b"si0")
self.failUnlessEqual(len(readers), 5)
# There is no such method as remote_cancel_lease for now -- see
# ticket #1528.
self.failIf(hasattr(ss, 'remote_cancel_lease'), \
"ss should not have a 'remote_cancel_lease' method/attribute")
self.failIf(hasattr(FoolscapStorageServer(ss), 'remote_cancel_lease'), \
"ss should not have a 'remote_cancel_lease' method/attribute")
# test overlapping uploads
rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si3", rs3, cs3,
sharenums, size, canary)
already,writers = ss.allocate_buckets(b"si3", rs3, cs3,
sharenums, size)
self.failUnlessEqual(len(already), 0)
self.failUnlessEqual(len(writers), 5)
already2,writers2 = ss.remote_allocate_buckets(b"si3", rs4, cs4,
sharenums, size, canary)
already2,writers2 = ss.allocate_buckets(b"si3", rs4, cs4,
sharenums, size)
self.failUnlessEqual(len(already2), 0)
self.failUnlessEqual(len(writers2), 0)
for wb in writers.values():
wb.remote_close()
wb.close()
leases = list(ss.get_leases(b"si3"))
self.failUnlessEqual(len(leases), 1)
already3,writers3 = ss.remote_allocate_buckets(b"si3", rs4, cs4,
sharenums, size, canary)
already3,writers3 = ss.allocate_buckets(b"si3", rs4, cs4,
sharenums, size)
self.failUnlessEqual(len(already3), 5)
self.failUnlessEqual(len(writers3), 0)
@ -1090,7 +1095,7 @@ class Server(unittest.TestCase):
clock.advance(123456)
# Adding a lease with matching renewal secret just renews it:
ss.remote_add_lease(b"si0", renewal_secret, cancel_secret)
ss.add_lease(b"si0", renewal_secret, cancel_secret)
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.get_expiration_time(), 123 + 123456 + DEFAULT_RENEWAL_TIME)
@ -1126,14 +1131,14 @@ class Server(unittest.TestCase):
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
for i,wb in writers.items():
wb.remote_write(0, b"%25d" % i)
wb.remote_close()
wb.write(0, b"%25d" % i)
wb.close()
# since we discard the data, the shares should be present but sparse.
# Since we write with some seeks, the data we read back will be all
# zeros.
b = ss.remote_get_buckets(b"vid")
b = ss.get_buckets(b"vid")
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
self.failUnlessEqual(b[0].remote_read(0, 25), b"\x00" * 25)
self.failUnlessEqual(b[0].read(0, 25), b"\x00" * 25)
def test_reserved_space_advise_corruption(self):
"""
@ -1148,8 +1153,8 @@ class Server(unittest.TestCase):
ss.setServiceParent(self.sparent)
upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
ss.remote_advise_corrupt_share(b"immutable", b"si0", 0,
b"This share smells funny.\n")
ss.advise_corrupt_share(b"immutable", b"si0", 0,
b"This share smells funny.\n")
self.assertEqual(
[],
@ -1163,8 +1168,8 @@ class Server(unittest.TestCase):
si0_s = base32.b2a(b"si0")
upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
ss.remote_advise_corrupt_share(b"immutable", b"si0", 0,
b"This share smells funny.\n")
ss.advise_corrupt_share(b"immutable", b"si0", 0,
b"This share smells funny.\n")
reportdir = os.path.join(workdir, "corruption-advisories")
reports = os.listdir(reportdir)
self.failUnlessEqual(len(reports), 1)
@ -1183,12 +1188,12 @@ class Server(unittest.TestCase):
already,writers = self.allocate(ss, b"si1", [1], 75)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([1]))
writers[1].remote_write(0, b"data")
writers[1].remote_close()
writers[1].write(0, b"data")
writers[1].close()
b = ss.remote_get_buckets(b"si1")
b = ss.get_buckets(b"si1")
self.failUnlessEqual(set(b.keys()), set([1]))
b[1].remote_advise_corrupt_share(b"This share tastes like dust.\n")
b[1].advise_corrupt_share(b"This share tastes like dust.\n")
reports = os.listdir(reportdir)
self.failUnlessEqual(len(reports), 2)
@ -1214,8 +1219,8 @@ class Server(unittest.TestCase):
upload_immutable(ss, b"si0", b"r" * 32, b"c" * 32, {0: b""})
# And try to submit a corruption advisory about a different share
ss.remote_advise_corrupt_share(b"immutable", b"si0", 1,
b"This share smells funny.\n")
ss.advise_corrupt_share(b"immutable", b"si0", 1,
b"This share smells funny.\n")
self.assertEqual(
[],
@ -1266,7 +1271,7 @@ class MutableServer(unittest.TestCase):
write_enabler = self.write_enabler(we_tag)
renew_secret = self.renew_secret(lease_tag)
cancel_secret = self.cancel_secret(lease_tag)
rstaraw = ss.remote_slot_testv_and_readv_and_writev
rstaraw = ss.slot_testv_and_readv_and_writev
testandwritev = dict( [ (shnum, ([], [], None) )
for shnum in sharenums ] )
readv = []
@ -1287,7 +1292,7 @@ class MutableServer(unittest.TestCase):
f.seek(0)
f.write(b"BAD MAGIC")
f.close()
read = ss.remote_slot_readv
read = ss.slot_readv
e = self.failUnlessRaises(UnknownMutableContainerVersionError,
read, b"si1", [0], [(0,10)])
self.assertEqual(e.filename, fn)
@ -1299,8 +1304,8 @@ class MutableServer(unittest.TestCase):
ss = self.create("test_container_size")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
set([0,1,2]), 100)
read = ss.remote_slot_readv
rstaraw = ss.remote_slot_testv_and_readv_and_writev
read = ss.slot_readv
rstaraw = ss.slot_testv_and_readv_and_writev
secrets = ( self.write_enabler(b"we1"),
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
@ -1380,7 +1385,7 @@ class MutableServer(unittest.TestCase):
# Also see if the server explicitly declares that it supports this
# feature.
ver = ss.remote_get_version()
ver = ss.get_version()
storage_v1_ver = ver[b"http://allmydata.org/tahoe/protocols/storage/v1"]
self.failUnless(storage_v1_ver.get(b"fills-holes-with-zero-bytes"))
@ -1398,7 +1403,7 @@ class MutableServer(unittest.TestCase):
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
set([0,1,2]), 100)
read = ss.remote_slot_readv
read = ss.slot_readv
self.failUnlessEqual(read(b"si1", [0], [(0, 10)]),
{0: [b""]})
self.failUnlessEqual(read(b"si1", [], [(0, 10)]),
@ -1411,7 +1416,7 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
write = ss.slot_testv_and_readv_and_writev
answer = write(b"si1", secrets,
{0: ([], [(0,data)], None)},
[])
@ -1421,7 +1426,7 @@ class MutableServer(unittest.TestCase):
{0: [b"00000000001111111111"]})
self.failUnlessEqual(read(b"si1", [0], [(95,10)]),
{0: [b"99999"]})
#self.failUnlessEqual(s0.remote_get_length(), 100)
#self.failUnlessEqual(s0.get_length(), 100)
bad_secrets = (b"bad write enabler", secrets[1], secrets[2])
f = self.failUnlessRaises(BadWriteEnablerError,
@ -1455,8 +1460,8 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
read = ss.remote_slot_readv
write = ss.slot_testv_and_readv_and_writev
read = ss.slot_readv
def reset():
write(b"si1", secrets,
@ -1500,8 +1505,8 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
read = ss.remote_slot_readv
write = ss.slot_testv_and_readv_and_writev
read = ss.slot_readv
data = [(b"%d" % i) * 100 for i in range(3)]
rc = write(b"si1", secrets,
{0: ([], [(0,data[0])], None),
@ -1543,8 +1548,8 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1-%d" % n),
self.cancel_secret(b"we1-%d" % n) )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
read = ss.remote_slot_readv
write = ss.slot_testv_and_readv_and_writev
read = ss.slot_readv
rc = write(b"si1", secrets(0), {0: ([], [(0,data)], None)}, [])
self.failUnlessEqual(rc, (True, {}))
@ -1560,7 +1565,7 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(len(list(s0.get_leases())), 1)
# add-lease on a missing storage index is silently ignored
self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None)
self.failUnlessEqual(ss.add_lease(b"si18", b"", b""), None)
# re-allocate the slots and use the same secrets, that should update
# the lease
@ -1568,7 +1573,7 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(len(list(s0.get_leases())), 1)
# renew it directly
ss.remote_renew_lease(b"si1", secrets(0)[1])
ss.renew_lease(b"si1", secrets(0)[1])
self.failUnlessEqual(len(list(s0.get_leases())), 1)
# now allocate them with a bunch of different secrets, to trigger the
@ -1576,7 +1581,7 @@ class MutableServer(unittest.TestCase):
write(b"si1", secrets(1), {0: ([], [(0,data)], None)}, [])
self.failUnlessEqual(len(list(s0.get_leases())), 2)
secrets2 = secrets(2)
ss.remote_add_lease(b"si1", secrets2[1], secrets2[2])
ss.add_lease(b"si1", secrets2[1], secrets2[2])
self.failUnlessEqual(len(list(s0.get_leases())), 3)
write(b"si1", secrets(3), {0: ([], [(0,data)], None)}, [])
write(b"si1", secrets(4), {0: ([], [(0,data)], None)}, [])
@ -1594,11 +1599,11 @@ class MutableServer(unittest.TestCase):
# read back the leases, make sure they're still intact.
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
ss.remote_renew_lease(b"si1", secrets(0)[1])
ss.remote_renew_lease(b"si1", secrets(1)[1])
ss.remote_renew_lease(b"si1", secrets(2)[1])
ss.remote_renew_lease(b"si1", secrets(3)[1])
ss.remote_renew_lease(b"si1", secrets(4)[1])
ss.renew_lease(b"si1", secrets(0)[1])
ss.renew_lease(b"si1", secrets(1)[1])
ss.renew_lease(b"si1", secrets(2)[1])
ss.renew_lease(b"si1", secrets(3)[1])
ss.renew_lease(b"si1", secrets(4)[1])
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
# get a new copy of the leases, with the current timestamps. Reading
# data and failing to renew/cancel leases should leave the timestamps
@ -1609,7 +1614,7 @@ class MutableServer(unittest.TestCase):
# examine the exception thus raised, make sure the old nodeid is
# present, to provide for share migration
e = self.failUnlessRaises(IndexError,
ss.remote_renew_lease, b"si1",
ss.renew_lease, b"si1",
secrets(20)[1])
e_s = str(e)
self.failUnlessIn("Unable to renew non-existent lease", e_s)
@ -1644,7 +1649,7 @@ class MutableServer(unittest.TestCase):
self.renew_secret(b"we1-%d" % n),
self.cancel_secret(b"we1-%d" % n) )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
write = ss.slot_testv_and_readv_and_writev
write_enabler, renew_secret, cancel_secret = secrets(0)
rc = write(b"si1", (write_enabler, renew_secret, cancel_secret),
{0: ([], [(0,data)], None)}, [])
@ -1660,7 +1665,7 @@ class MutableServer(unittest.TestCase):
clock.advance(835)
# Adding a lease renews it:
ss.remote_add_lease(b"si1", renew_secret, cancel_secret)
ss.add_lease(b"si1", renew_secret, cancel_secret)
[lease] = s0.get_leases()
self.assertEqual(lease.get_expiration_time(),
235 + 835 + DEFAULT_RENEWAL_TIME)
@ -1669,8 +1674,8 @@ class MutableServer(unittest.TestCase):
ss = self.create("test_remove")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),
set([0,1,2]), 100)
readv = ss.remote_slot_readv
writev = ss.remote_slot_testv_and_readv_and_writev
readv = ss.slot_readv
writev = ss.slot_testv_and_readv_and_writev
secrets = ( self.write_enabler(b"we1"),
self.renew_secret(b"we1"),
self.cancel_secret(b"we1") )
@ -1774,7 +1779,7 @@ class MutableServer(unittest.TestCase):
# We don't even need to create any shares to exercise this
# functionality. Just go straight to sending a truncate-to-zero
# write.
testv_is_good, read_data = ss.remote_slot_testv_and_readv_and_writev(
testv_is_good, read_data = ss.slot_testv_and_readv_and_writev(
storage_index=storage_index,
secrets=secrets,
test_and_write_vectors={
@ -1792,7 +1797,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
self.sparent = LoggingServiceParent()
self._lease_secret = itertools.count()
self.ss = self.create("MDMFProxies storage test server")
self.rref = RemoteBucket(self.ss)
self.rref = RemoteBucket(FoolscapStorageServer(self.ss))
self.storage_server = _StorageServer(lambda: self.rref)
self.secrets = (self.write_enabler(b"we_secret"),
self.renew_secret(b"renew_secret"),
@ -1959,7 +1964,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
If tail_segment=True, then I will write a share that has a
smaller tail segment than other segments.
"""
write = self.ss.remote_slot_testv_and_readv_and_writev
write = self.ss.slot_testv_and_readv_and_writev
data = self.build_test_mdmf_share(tail_segment, empty)
# Finally, we write the whole thing to the storage server in one
# pass.
@ -2027,7 +2032,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
empty=False):
# Some tests need SDMF shares to verify that we can still
# read them. This method writes one, which resembles but is not
write = self.ss.remote_slot_testv_and_readv_and_writev
write = self.ss.slot_testv_and_readv_and_writev
share = self.build_test_sdmf_share(empty)
testvs = [(0, 1, b"eq", b"")]
tws = {}
@ -2359,7 +2364,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
# blocks.
mw = self._make_new_mw(b"si1", 0)
# Test writing some blocks.
read = self.ss.remote_slot_readv
read = self.ss.slot_readv
expected_private_key_offset = struct.calcsize(MDMFHEADER)
expected_sharedata_offset = struct.calcsize(MDMFHEADER) + \
PRIVATE_KEY_SIZE + \
@ -3150,7 +3155,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
d = sdmfr.finish_publishing()
def _then(ignored):
self.failUnlessEqual(self.rref.write_count, 1)
read = self.ss.remote_slot_readv
read = self.ss.slot_readv
self.failUnlessEqual(read(b"si1", [0], [(0, len(data))]),
{0: [data]})
d.addCallback(_then)
@ -3207,7 +3212,7 @@ class MDMFProxies(unittest.TestCase, ShouldFailMixin):
sdmfw.finish_publishing())
def _then_again(results):
self.failUnless(results[0])
read = self.ss.remote_slot_readv
read = self.ss.slot_readv
self.failUnlessEqual(read(b"si1", [0], [(1, 8)]),
{0: [struct.pack(">Q", 1)]})
self.failUnlessEqual(read(b"si1", [0], [(9, len(data) - 9)]),

View File

@ -14,36 +14,217 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
from unittest import SkipTest
from base64 import b64encode
from twisted.trial.unittest import TestCase
from twisted.internet.defer import inlineCallbacks
from hypothesis import assume, given, strategies as st
from fixtures import Fixture, TempDir
from treq.testing import StubTreq
from klein import Klein
from hyperlink import DecodedURL
from .common import AsyncTestCase, SyncTestCase
from ..storage.server import StorageServer
from ..storage.http_server import HTTPServer
from ..storage.http_server import (
HTTPServer,
_extract_secrets,
Secrets,
ClientSecretsException,
_authorized_route,
)
from ..storage.http_client import StorageClient, ClientException
class HTTPTests(TestCase):
def _post_process(params):
secret_types, secrets = params
secrets = {t: s for (t, s) in zip(secret_types, secrets)}
headers = [
"{} {}".format(
secret_type.value, str(b64encode(secrets[secret_type]), "ascii").strip()
)
for secret_type in secret_types
]
return secrets, headers
# Creates a tuple of ({Secret enum value: secret_bytes}, [http headers with secrets]).
SECRETS_STRATEGY = (
st.sets(st.sampled_from(Secrets))
.flatmap(
lambda secret_types: st.tuples(
st.just(secret_types),
st.lists(
st.binary(min_size=32, max_size=32),
min_size=len(secret_types),
max_size=len(secret_types),
),
)
)
.map(_post_process)
)
class ExtractSecretsTests(SyncTestCase):
"""
Tests of HTTP client talking to the HTTP server.
Tests for ``_extract_secrets``.
"""
def setUp(self):
if PY2:
raise SkipTest("Not going to bother supporting Python 2")
self.storage_server = StorageServer(self.mktemp(), b"\x00" * 20)
# TODO what should the swissnum _actually_ be?
self._http_server = HTTPServer(self.storage_server, b"abcd")
self.skipTest("Not going to bother supporting Python 2")
super(ExtractSecretsTests, self).setUp()
@given(secrets_to_send=SECRETS_STRATEGY)
def test_extract_secrets(self, secrets_to_send):
"""
``_extract_secrets()`` returns a dictionary with the extracted secrets
if the input secrets match the required secrets.
"""
secrets, headers = secrets_to_send
# No secrets needed, none given:
self.assertEqual(_extract_secrets(headers, secrets.keys()), secrets)
@given(
secrets_to_send=SECRETS_STRATEGY,
secrets_to_require=st.sets(st.sampled_from(Secrets)),
)
def test_wrong_number_of_secrets(self, secrets_to_send, secrets_to_require):
"""
If the wrong number of secrets are passed to ``_extract_secrets``, a
``ClientSecretsException`` is raised.
"""
secrets_to_send, headers = secrets_to_send
assume(secrets_to_send.keys() != secrets_to_require)
with self.assertRaises(ClientSecretsException):
_extract_secrets(headers, secrets_to_require)
def test_bad_secret_missing_value(self):
"""
Missing value in ``_extract_secrets`` result in
``ClientSecretsException``.
"""
with self.assertRaises(ClientSecretsException):
_extract_secrets(["lease-renew-secret"], {Secrets.LEASE_RENEW})
def test_bad_secret_unknown_prefix(self):
"""
Missing value in ``_extract_secrets`` result in
``ClientSecretsException``.
"""
with self.assertRaises(ClientSecretsException):
_extract_secrets(["FOO eA=="], {})
def test_bad_secret_not_base64(self):
"""
A non-base64 value in ``_extract_secrets`` result in
``ClientSecretsException``.
"""
with self.assertRaises(ClientSecretsException):
_extract_secrets(["lease-renew-secret x"], {Secrets.LEASE_RENEW})
def test_bad_secret_wrong_length_lease_renew(self):
"""
Lease renewal secrets must be 32-bytes long.
"""
with self.assertRaises(ClientSecretsException):
_extract_secrets(["lease-renew-secret eA=="], {Secrets.LEASE_RENEW})
def test_bad_secret_wrong_length_lease_cancel(self):
"""
Lease cancel secrets must be 32-bytes long.
"""
with self.assertRaises(ClientSecretsException):
_extract_secrets(["lease-cancel-secret eA=="], {Secrets.LEASE_RENEW})
SWISSNUM_FOR_TEST = b"abcd"
class TestApp(object):
"""HTTP API for testing purposes."""
_app = Klein()
_swissnum = SWISSNUM_FOR_TEST # Match what the test client is using
@_authorized_route(_app, {Secrets.UPLOAD}, "/upload_secret", methods=["GET"])
def validate_upload_secret(self, request, authorization):
if authorization == {Secrets.UPLOAD: b"MAGIC"}:
return "GOOD SECRET"
else:
return "BAD: {}".format(authorization)
class RoutingTests(AsyncTestCase):
"""
Tests for the HTTP routing infrastructure.
"""
def setUp(self):
if PY2:
self.skipTest("Not going to bother supporting Python 2")
super(RoutingTests, self).setUp()
# Could be a fixture, but will only be used in this test class so not
# going to bother:
self._http_server = TestApp()
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"abcd",
treq=StubTreq(self._http_server.get_resource()),
SWISSNUM_FOR_TEST,
treq=StubTreq(self._http_server._app.resource()),
)
@inlineCallbacks
def test_authorization_enforcement(self):
"""
The requirement for secrets is enforced; if they are not given, a 400
response code is returned.
"""
# Without secret, get a 400 error.
response = yield self.client._request(
"GET", "http://127.0.0.1/upload_secret", {}
)
self.assertEqual(response.code, 400)
# With secret, we're good.
response = yield self.client._request(
"GET", "http://127.0.0.1/upload_secret", {Secrets.UPLOAD: b"MAGIC"}
)
self.assertEqual(response.code, 200)
self.assertEqual((yield response.content()), b"GOOD SECRET")
class HttpTestFixture(Fixture):
"""
Setup HTTP tests' infrastructure, the storage server and corresponding
client.
"""
def _setUp(self):
self.tempdir = self.useFixture(TempDir())
self.storage_server = StorageServer(self.tempdir.path, b"\x00" * 20)
# TODO what should the swissnum _actually_ be?
self.http_server = HTTPServer(self.storage_server, SWISSNUM_FOR_TEST)
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
SWISSNUM_FOR_TEST,
treq=StubTreq(self.http_server.get_resource()),
)
class GenericHTTPAPITests(AsyncTestCase):
"""
Tests of HTTP client talking to the HTTP server, for generic HTTP API
endpoints and concerns.
"""
def setUp(self):
if PY2:
self.skipTest("Not going to bother supporting Python 2")
super(GenericHTTPAPITests, self).setUp()
self.http = self.useFixture(HttpTestFixture())
@inlineCallbacks
def test_bad_authentication(self):
"""
@ -53,7 +234,7 @@ class HTTPTests(TestCase):
client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self._http_server.get_resource()),
treq=StubTreq(self.http.http_server.get_resource()),
)
with self.assertRaises(ClientException) as e:
yield client.get_version()
@ -67,14 +248,14 @@ class HTTPTests(TestCase):
We ignore available disk space and max immutable share size, since that
might change across calls.
"""
version = yield self.client.get_version()
version = yield self.http.client.get_version()
version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"available-space"
)
version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"maximum-immutable-share-size"
)
expected_version = self.storage_server.remote_get_version()
expected_version = self.http.storage_server.get_version()
expected_version[b"http://allmydata.org/tahoe/protocols/storage/v1"].pop(
b"available-space"
)

View File

@ -53,7 +53,6 @@ from allmydata.scripts.admin import (
from allmydata.scripts.runner import (
Options,
)
from .common_util import FakeCanary
from .common_web import (
render,
@ -304,28 +303,27 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin):
mutable_si_3, rs3, cs3, we3 = make_mutable(b"\x03" * 16)
rs3a, cs3a = make_extra_lease(mutable_si_3, 1)
sharenums = [0]
canary = FakeCanary()
# note: 'tahoe debug dump-share' will not handle this file, since the
# inner contents are not a valid CHK share
data = b"\xff" * 1000
a,w = ss.remote_allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1000, canary)
w[0].remote_write(0, data)
w[0].remote_close()
a,w = ss.allocate_buckets(immutable_si_0, rs0, cs0, sharenums,
1000)
w[0].write(0, data)
w[0].close()
a,w = ss.remote_allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1000, canary)
w[0].remote_write(0, data)
w[0].remote_close()
ss.remote_add_lease(immutable_si_1, rs1a, cs1a)
a,w = ss.allocate_buckets(immutable_si_1, rs1, cs1, sharenums,
1000)
w[0].write(0, data)
w[0].close()
ss.add_lease(immutable_si_1, rs1a, cs1a)
writev = ss.remote_slot_testv_and_readv_and_writev
writev = ss.slot_testv_and_readv_and_writev
writev(mutable_si_2, (we2, rs2, cs2),
{0: ([], [(0,data)], len(data))}, [])
writev(mutable_si_3, (we3, rs3, cs3),
{0: ([], [(0,data)], len(data))}, [])
ss.remote_add_lease(mutable_si_3, rs3a, cs3a)
ss.add_lease(mutable_si_3, rs3a, cs3a)
self.sis = [immutable_si_0, immutable_si_1, mutable_si_2, mutable_si_3]
self.renew_secrets = [rs0, rs1, rs1a, rs2, rs3, rs3a]

View File

@ -11,6 +11,7 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import time
import json
from urllib.parse import (
quote,
@ -24,14 +25,23 @@ from twisted.web.template import Tag
from twisted.web.test.requesthelper import DummyRequest
from twisted.application import service
from testtools.twistedsupport import succeeded
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import (
inlineCallbacks,
succeed,
)
from ...storage_client import (
NativeStorageServer,
StorageFarmBroker,
)
from ...web.root import RootElement
from ...web.root import (
RootElement,
Root,
)
from ...util.connection_status import ConnectionStatus
from ...crypto.ed25519 import (
create_signing_keypair,
)
from allmydata.web.root import URIHandler
from allmydata.client import _Client
@ -47,6 +57,7 @@ from ..common import (
from ..common import (
SyncTestCase,
AsyncTestCase,
)
from testtools.matchers import (
@ -138,3 +149,94 @@ class RenderServiceRow(SyncTestCase):
self.assertThat(item.slotData.get("version"), Equals(""))
self.assertThat(item.slotData.get("nickname"), Equals(""))
class RenderRoot(AsyncTestCase):
@inlineCallbacks
def test_root_json(self):
"""
The 'welcome' / root page renders properly with ?t=json when some
servers show None for available_space while others show a
valid int
See also https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3852
"""
ann = {
"anonymous-storage-FURL": "pb://w2hqnbaa25yw4qgcvghl5psa3srpfgw3@tcp:127.0.0.1:51309/vucto2z4fxment3vfxbqecblbf6zyp6x",
"permutation-seed-base32": "w2hqnbaa25yw4qgcvghl5psa3srpfgw3",
}
srv0 = NativeStorageServer(b"server_id0", ann, None, {}, EMPTY_CLIENT_CONFIG)
srv0.get_connection_status = lambda: ConnectionStatus(False, "summary0", {}, 0, 0)
srv1 = NativeStorageServer(b"server_id1", ann, None, {}, EMPTY_CLIENT_CONFIG)
srv1.get_connection_status = lambda: ConnectionStatus(False, "summary1", {}, 0, 0)
# arrange for this server to have some valid available space
srv1.get_available_space = lambda: 12345
class FakeClient(_Client):
history = []
stats_provider = object()
nickname = ""
nodeid = b"asdf"
_node_public_key = create_signing_keypair()[1]
introducer_clients = []
helper = None
def __init__(self):
service.MultiService.__init__(self)
self.storage_broker = StorageFarmBroker(
permute_peers=True,
tub_maker=None,
node_config=EMPTY_CLIENT_CONFIG,
)
self.storage_broker.test_add_server(b"test-srv0", srv0)
self.storage_broker.test_add_server(b"test-srv1", srv1)
root = Root(FakeClient(), now_fn=time.time)
lines = []
req = DummyRequest(b"")
req.fields = {}
req.args = {
b"t": [b"json"],
}
# for some reason, DummyRequest is already finished when we
# try to add a notifyFinish handler, so override that
# behavior.
def nop():
return succeed(None)
req.notifyFinish = nop
req.write = lines.append
yield root.render(req)
raw_js = b"".join(lines).decode("utf8")
js = json.loads(raw_js)
servers = js["servers"]
self.assertEquals(len(servers), 2)
self.assertIn(
{
"connection_status": "summary0",
"nodeid": "server_id0",
"last_received_data": 0,
"version": None,
"available_space": None,
"nickname": ""
},
servers
)
self.assertIn(
{
"connection_status": "summary1",
"nodeid": "server_id1",
"last_received_data": 0,
"version": None,
"available_space": 12345,
"nickname": ""
},
servers
)

View File

@ -820,29 +820,37 @@ class Web(WebMixin, WebErrorMixin, testutil.StallMixin, testutil.ReallyEqualMixi
"""
d = self.GET("/?t=json")
def _check(res):
"""
Check that the results are correct.
We can't depend on the order of servers in the output
"""
decoded = json.loads(res)
expected = {
u'introducers': {
u'statuses': [],
self.assertEqual(decoded['introducers'], {u'statuses': []})
actual_servers = decoded[u"servers"]
self.assertEquals(len(actual_servers), 2)
self.assertIn(
{
u"nodeid": u'other_nodeid',
u'available_space': 123456,
u'connection_status': u'summary',
u'last_received_data': 30,
u'nickname': u'other_nickname \u263b',
u'version': u'1.0',
},
u'servers': sorted([
{u"nodeid": u'other_nodeid',
u'available_space': 123456,
u'connection_status': u'summary',
u'last_received_data': 30,
u'nickname': u'other_nickname \u263b',
u'version': u'1.0',
},
{u"nodeid": u'disconnected_nodeid',
u'available_space': 123456,
u'connection_status': u'summary',
u'last_received_data': 35,
u'nickname': u'disconnected_nickname \u263b',
u'version': u'1.0',
},
], key=lambda o: sorted(o.items())),
}
self.assertEqual(expected, decoded)
actual_servers
)
self.assertIn(
{
u"nodeid": u'disconnected_nodeid',
u'available_space': 123456,
u'connection_status': u'summary',
u'last_received_data': 35,
u'nickname': u'disconnected_nickname \u263b',
u'version': u'1.0',
},
actual_servers
)
d.addCallback(_check)
return d

View File

@ -90,10 +90,11 @@ class TahoeLAFSRequestTests(SyncTestCase):
"""
self._fields_test(b"GET", {}, b"", Equals(None))
def test_form_fields(self):
def test_form_fields_if_filename_set(self):
"""
When a ``POST`` request is received, form fields are parsed into
``TahoeLAFSRequest.fields``.
``TahoeLAFSRequest.fields`` and the body is bytes (presuming ``filename``
is set).
"""
form_data, boundary = multipart_formdata([
[param(u"name", u"foo"),
@ -121,6 +122,49 @@ class TahoeLAFSRequestTests(SyncTestCase):
),
)
def test_form_fields_if_name_is_file(self):
"""
When a ``POST`` request is received, form fields are parsed into
``TahoeLAFSRequest.fields`` and the body is bytes when ``name``
is set to ``"file"``.
"""
form_data, boundary = multipart_formdata([
[param(u"name", u"foo"),
body(u"bar"),
],
[param(u"name", u"file"),
body(u"some file contents"),
],
])
self._fields_test(
b"POST",
{b"content-type": b"multipart/form-data; boundary=" + bytes(boundary, 'ascii')},
form_data.encode("ascii"),
AfterPreprocessing(
lambda fs: {
k: fs.getvalue(k)
for k
in fs.keys()
},
Equals({
"foo": "bar",
"file": b"some file contents",
}),
),
)
def test_form_fields_require_correct_mime_type(self):
"""
The body of a ``POST`` is not parsed into fields if its mime type is
not ``multipart/form-data``.
Reproducer for https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3854
"""
data = u'{"lalala": "lolo"}'
data = data.encode("utf-8")
self._fields_test(b"POST", {"content-type": "application/json"},
data, Equals(None))
class TahoeLAFSSiteTests(SyncTestCase):
"""

View File

@ -297,14 +297,12 @@ class Root(MultiFormatResource):
}
return json.dumps(result, indent=1) + "\n"
def _describe_known_servers(self, broker):
return sorted(list(
return list(
self._describe_server(server)
for server
in broker.get_known_servers()
), key=lambda o: sorted(o.items()))
)
def _describe_server(self, server):
status = server.get_connection_status()

View File

@ -114,7 +114,8 @@ class TahoeLAFSRequest(Request, object):
self.path, argstring = x
self.args = parse_qs(argstring, 1)
if self.method == b'POST':
content_type = (self.requestHeaders.getRawHeaders("content-type") or [""])[0]
if self.method == b'POST' and content_type.split(";")[0] in ("multipart/form-data", "application/x-www-form-urlencoded"):
# We use FieldStorage here because it performs better than
# cgi.parse_multipart(self.content, pdict) which is what
# twisted.web.http.Request uses.