Merge branch 'master' of github.com:tahoe-lafs/tahoe-lafs into 3788.refactor-src.test.mutable

fenn-cs 2021-10-12 22:42:59 +01:00
commit bf670c0664
32 changed files with 1221 additions and 282 deletions


@ -42,6 +42,9 @@ workflows:
- "nixos-19-09": - "nixos-19-09":
{} {}
- "nixos-21-05":
{}
# Test against PyPy 2.7 # Test against PyPy 2.7
- "pypy27-buster": - "pypy27-buster":
{} {}
@ -438,8 +441,7 @@ jobs:
image: "tahoelafsci/fedora:29-py" image: "tahoelafsci/fedora:29-py"
user: "nobody" user: "nobody"
nixos-19-09: &NIXOS
nixos-19-09:
docker: docker:
# Run in a highly Nix-capable environment. # Run in a highly Nix-capable environment.
- <<: *DOCKERHUB_AUTH - <<: *DOCKERHUB_AUTH
@ -447,6 +449,7 @@ jobs:
environment: environment:
NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs-channels/archive/nixos-19.09-small.tar.gz" NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs-channels/archive/nixos-19.09-small.tar.gz"
SOURCE: "nix/"
steps: steps:
- "checkout" - "checkout"
@ -463,7 +466,17 @@ jobs:
# build a couple simple little dependencies that don't take # build a couple simple little dependencies that don't take
# advantage of multiple cores and we get a little speedup by doing # advantage of multiple cores and we get a little speedup by doing
# them in parallel. # them in parallel.
nix-build --cores 3 --max-jobs 2 nix/ nix-build --cores 3 --max-jobs 2 "$SOURCE"
nixos-21-05:
<<: *NIXOS
environment:
# Note this doesn't look more similar to the 19.09 NIX_PATH URL because
# there was some internal shuffling by the NixOS project about how they
# publish stable revisions.
NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs/archive/d32b07e6df276d78e3640eb43882b80c9b2b3459.tar.gz"
SOURCE: "nix/py3.nix"
typechecks: typechecks:
docker: docker:


@ -28,7 +28,7 @@ jobs:
- 3.9
include:
# On macOS don't bother with 3.6-3.8, just to get faster builds.
- os: macos-latest
- os: macos-10.15
python-version: 2.7
- os: macos-latest
python-version: 3.9
@ -168,7 +168,7 @@ jobs:
- 3.9
include:
# On macOS don't bother with 3.6, just to get faster builds.
- os: macos-latest
- os: macos-10.15
python-version: 2.7
- os: macos-latest
python-version: 3.9
@ -183,7 +183,7 @@ jobs:
# We have to use an older version of Tor for running integration
# tests on macOS.
- name: Install Tor [macOS, ${{ matrix.python-version }} ]
if: ${{ matrix.os == 'macos-latest' }}
if: ${{ contains(matrix.os, 'macos') }}
run: |
brew extract --version 0.4.5.8 tor homebrew/cask
brew install tor@0.4.5.8
@ -247,7 +247,7 @@ jobs:
fail-fast: false
matrix:
os:
- macos-latest
- macos-10.15
- windows-latest
- ubuntu-latest
python-version:


@ -482,8 +482,8 @@ The response includes ``already-have`` and ``allocated`` for two reasons:
This might be because a server has become unavailable and a remaining server needs to store more shares for the upload.
It could also just be that the client's preferred servers have changed.
``PUT /v1/immutable/:storage_index/:share_number``
``PATCH /v1/immutable/:storage_index/:share_number``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Write data for the indicated share.
The share number must belong to the storage index.
@ -497,11 +497,8 @@ If any one of these requests fails then at most 128KiB of upload work needs to b
The server must recognize when all of the data has been received and mark the share as complete
(which it can do because it was informed of the size when the storage index was initialized).
Clients should upload chunks in re-assembly order.
* When a chunk that does not complete the share is successfully uploaded the response is ``OK``.
* When the chunk that completes the share is successfully uploaded the response is ``CREATED``.
* If the *Content-Range* for a request covers part of the share that has already been uploaded the response is ``CONFLICT``.
The response body indicates the range of share data that has yet to be uploaded.
That is::
@ -514,6 +511,43 @@ Clients should upload chunks in re-assembly order.
]
}
* When the chunk that completes the share is successfully uploaded the response is ``CREATED``.
* If the *Content-Range* for a request covers part of the share that has already been uploaded,
and the data does not match the already written data,
the response is ``CONFLICT``.
At this point the only thing to do is abort the upload and start from scratch (see below).
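As an illustration only (not part of this specification), a client might drive this chunked upload roughly as follows, using the Python ``requests`` library; the base URL, chunk size, and error handling here are assumptions::

    import requests

    def upload_share(base_url, storage_index, share_number, share_data, chunk_size=128 * 1024):
        """Upload share_data in re-assembly order, one chunk per PATCH request."""
        total = len(share_data)
        url = "{}/v1/immutable/{}/{}".format(base_url, storage_index, share_number)
        status = None
        for start in range(0, total, chunk_size):
            chunk = share_data[start:start + chunk_size]
            end = start + len(chunk) - 1  # Content-Range uses inclusive end offsets
            response = requests.patch(
                url,
                data=chunk,
                headers={"Content-Range": "bytes {}-{}/{}".format(start, end, total)},
            )
            if response.status_code == 409:
                # CONFLICT: an overlapping range was already written with different
                # bytes; the only option is to abort and restart from scratch.
                raise RuntimeError("conflicting upload detected")
            response.raise_for_status()
            status = response.status_code
        # Intermediate chunks return 200 (OK); the chunk that completes the
        # share returns 201 (CREATED).
        return status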
``PUT /v1/immutable/:storage_index/:share_number/abort``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
This cancels an *in-progress* upload.
The response code:
* When the upload is still in progress and therefore the abort has succeeded,
the response is ``OK``.
Future uploads can start from scratch with no pre-existing upload state stored on the server.
* If the upload has already finished, the response is 405 (Method Not Allowed)
and no change is made.
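For example (again an illustrative sketch using ``requests``, with the base URL assumed), a client that gives up on a partially uploaded share might do::

    import requests

    def abort_upload(base_url, storage_index, share_number):
        url = "{}/v1/immutable/{}/{}/abort".format(base_url, storage_index, share_number)
        response = requests.put(url)
        if response.status_code == 200:
            return True   # in-progress upload discarded; a fresh upload can start over
        if response.status_code == 405:
            return False  # the upload had already finished; nothing was changed
        response.raise_for_status()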
Discussion
``````````
A ``PUT`` request is only supposed to replace the whole resource,
hence the use of ``PATCH`` here.
From RFC 7231::
An origin server that allows PUT on a given target resource MUST send
a 400 (Bad Request) response to a PUT request that contains a
Content-Range header field (Section 4.2 of [RFC7233]), since the
payload is likely to be partial content that has been mistakenly PUT
as a full representation. Partial content updates are possible by
targeting a separately identified resource with state that overlaps a
portion of the larger resource, or by using a different method that
has been specifically defined for partial updates (for example, the
PATCH method defined in [RFC5789]).
``POST /v1/immutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@ -600,7 +634,6 @@ For example::
"test": [{
"offset": 3,
"size": 5,
"operator": "eq",
"specimen": "hello"
}, ...],
"write": [{
@ -626,6 +659,9 @@ For example::
}
}
A test vector or read vector that reads beyond the boundaries of the existing data will return nothing for any bytes past the end.
As a result, if there is no data at all, an empty bytestring is returned no matter what the offset or length.
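For instance (illustrative values), with a share that currently holds only the seven bytes ``b"1234567"``::

    read vector [(0, 10)]  ->  [b"1234567"]   # truncated at the end of the data
    read vector [(9, 5)]   ->  [b""]          # entirely past the end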
Reading
~~~~~~~
@ -666,19 +702,19 @@ Immutable Data
#. Upload the content for immutable share ``7``::
PUT /v1/immutable/AAAAAAAAAAAAAAAA/7
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Content-Range: bytes 0-15/48
<first 16 bytes of share data>
200 OK
PUT /v1/immutable/AAAAAAAAAAAAAAAA/7
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Content-Range: bytes 16-31/48
<second 16 bytes of share data>
200 OK
PUT /v1/immutable/AAAAAAAAAAAAAAAA/7
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Content-Range: bytes 32-47/48
<final 16 bytes of share data>
@ -701,7 +737,10 @@ Immutable Data
Mutable Data
~~~~~~~~~~~~
1. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``::
1. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``.
The special test vector of size 1 but empty bytes will only pass
if there is no existing share;
otherwise it will read a byte, which won't match ``b""``::
POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
{
@ -715,7 +754,6 @@ Mutable Data
"test": [{
"offset": 0,
"size": 1,
"operator": "eq",
"specimen": ""
}],
"write": [{
@ -747,8 +785,7 @@ Mutable Data
3: {
"test": [{
"offset": 0,
"size": <checkstring size>,
"size": <length of checkstring>,
"operator": "eq",
"specimen": "<checkstring>"
}],
"write": [{


@ -0,0 +1 @@
tahoe-lafs now also provides its statistics in OpenMetrics format (for Prometheus et al.) at `/statistics?t=openmetrics`.
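For example, the new endpoint can be scraped and parsed with the ``prometheus_client`` OpenMetrics parser (a sketch; the node's web port is an assumption)::

    import requests
    from prometheus_client.openmetrics.parser import text_string_to_metric_families

    text = requests.get("http://127.0.0.1:3456/statistics?t=openmetrics").text
    for family in text_string_to_metric_families(text):
        for sample in family.samples:
            print(sample.name, sample.value)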

newsfragments/3793.minor (new empty file)
newsfragments/3795.minor (new empty file)
newsfragments/3797.minor (new empty file)
newsfragments/3798.minor (new empty file)
newsfragments/3799.minor (new empty file)


@ -0,0 +1 @@
When uploading an immutable, overlapping writes that include conflicting data are rejected. In practice, this likely didn't happen in real-world usage.

newsfragments/3805.minor (new empty file)
newsfragments/3806.minor (new empty file)


@ -0,0 +1 @@
Tahoe-LAFS now supports running on NixOS 21.05 with Python 3.

newsfragments/3810.minor (new empty file)


@ -0,0 +1,19 @@
{ lib, buildPythonPackage, fetchPypi }:
buildPythonPackage rec {
pname = "collections-extended";
version = "1.0.3";
src = fetchPypi {
inherit pname version;
sha256 = "0lb69x23asd68n0dgw6lzxfclavrp2764xsnh45jm97njdplznkw";
};
# Tests aren't in tarball, for 1.0.3 at least.
doCheck = false;
meta = with lib; {
homepage = https://github.com/mlenzen/collections-extended;
description = "Extra Python Collections - bags (multisets), setlists (unique list / indexed set), RangeMap and IndexedDict";
license = licenses.asl20;
};
}


@ -2,22 +2,32 @@ self: super: {
python27 = super.python27.override {
packageOverrides = python-self: python-super: {
# eliot is not part of nixpkgs at all at this time.
eliot = python-self.callPackage ./eliot.nix { };
eliot = python-self.pythonPackages.callPackage ./eliot.nix { };
# NixOS autobahn package has trollius as a dependency, although
# it is optional. Trollius is unmaintained and fails on CI.
autobahn = python-super.callPackage ./autobahn.nix { };
autobahn = python-super.pythonPackages.callPackage ./autobahn.nix { };
# Porting to Python 3 is greatly aided by the future package. A
# slightly newer version than appears in nixos 19.09 is helpful.
future = python-super.callPackage ./future.nix { };
future = python-super.pythonPackages.callPackage ./future.nix { };
# Need version of pyutil that supports Python 3. The version in 19.09
# is too old.
pyutil = python-super.callPackage ./pyutil.nix { };
pyutil = python-super.pythonPackages.callPackage ./pyutil.nix { };
# Need a newer version of Twisted, too.
twisted = python-super.callPackage ./twisted.nix { };
twisted = python-super.pythonPackages.callPackage ./twisted.nix { };
# collections-extended is not part of nixpkgs at this time.
collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { };
};
};
python39 = super.python39.override {
packageOverrides = python-self: python-super: {
# collections-extended is not part of nixpkgs at this time.
collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { };
};
};
}

nix/py3.nix (new file)

@ -0,0 +1,7 @@
# This is the main entrypoint for the Tahoe-LAFS derivation.
{ pkgs ? import <nixpkgs> { } }:
# Add our Python packages to nixpkgs to simplify the expression for the
# Tahoe-LAFS derivation.
let pkgs' = pkgs.extend (import ./overlays.nix);
# Evaluate the expression for our Tahoe-LAFS derivation.
in pkgs'.python39.pkgs.callPackage ./tahoe-lafs.nix { }


@ -97,7 +97,7 @@ EOF
setuptoolsTrial pyasn1 zope_interface
service-identity pyyaml magic-wormhole treq
eliot autobahn cryptography netifaces setuptools
future pyutil distro configparser
future pyutil distro configparser collections-extended
];
checkInputs = with python.pkgs; [
@ -107,6 +107,7 @@ EOF
beautifulsoup4
html5lib
tenacity
prometheus_client
];
checkPhase = ''


@ -137,6 +137,9 @@ install_requires = [
# Backported configparser for Python 2:
"configparser ; python_version < '3.0'",
# For the RangeMap datastructure.
"collections-extended",
]
setup_requires = [
@ -404,6 +407,8 @@ setup(name="tahoe-lafs", # also set in __init__.py
"tenacity",
"paramiko",
"pytest-timeout",
# Does our OpenMetrics endpoint adhere to the spec:
"prometheus-client == 0.11.0",
] + tor_requires + i2p_requires,
"tor": tor_requires,
"i2p": i2p_requires,


@ -53,6 +53,14 @@ LeaseRenewSecret = Hash # used to protect lease renewal requests
LeaseCancelSecret = Hash # was used to protect lease cancellation requests
class DataTooLargeError(Exception):
"""The write went past the expected size of the bucket."""
class ConflictingWriteError(Exception):
"""Two writes happened to same immutable with different data."""
class RIBucketWriter(RemoteInterface):
""" Objects of this kind live on the server side. """
def write(offset=Offset, data=ShareData):
@ -91,9 +99,9 @@ class RIBucketReader(RemoteInterface):
TestVector = ListOf(TupleOf(Offset, ReadSize, bytes, bytes))
# elements are (offset, length, operator, specimen)
# operator is one of "lt, le, eq, ne, ge, gt"
# nop always passes and is used to fetch data while writing.
# you should use length==len(specimen) for everything except nop
# operator must be b"eq", typically length==len(specimen), but one can ensure
# writes don't happen to empty shares by setting length to 1 and specimen to
# b"". The operator is still used for wire compatibility with old versions.
DataVector = ListOf(TupleOf(Offset, ShareData))
# (offset, data). This limits us to 30 writes of 1MiB each per call
TestAndWriteVectorsForShares = DictOf(int,
@ -351,6 +359,12 @@ class IStorageServer(Interface):
):
"""
:see: ``RIStorageServer.slot_testv_readv_and_writev``
While the interface mostly matches, test vectors are simplified.
Instead of a tuple ``(offset, read_size, operator, expected_data)`` in
the original, for this method you need only pass in
``(offset, read_size, expected_data)``, with the operator implicitly
being ``b"eq"``.
"""
def advise_corrupt_share(


@ -309,7 +309,7 @@ class SDMFSlotWriteProxy(object):
salt)
else:
checkstring = checkstring_or_seqnum
self._testvs = [(0, len(checkstring), b"eq", checkstring)]
self._testvs = [(0, len(checkstring), checkstring)]
def get_checkstring(self):
@ -318,7 +318,7 @@ class SDMFSlotWriteProxy(object):
server.
"""
if self._testvs:
return self._testvs[0][3]
return self._testvs[0][2]
return b""
@ -548,9 +548,9 @@ class SDMFSlotWriteProxy(object):
if not self._testvs:
# Our caller has not provided us with another checkstring
# yet, so we assume that we are writing a new share, and set
# a test vector that will allow a new share to be written.
# a test vector that will only allow a new share to be written.
self._testvs = []
self._testvs.append(tuple([0, 1, b"eq", b""]))
self._testvs.append(tuple([0, 1, b""]))
tw_vectors = {}
tw_vectors[self.shnum] = (self._testvs, datavs, None)
@ -889,7 +889,7 @@ class MDMFSlotWriteProxy(object):
self._testvs = []
else:
self._testvs = []
self._testvs.append((0, len(checkstring), b"eq", checkstring))
self._testvs.append((0, len(checkstring), checkstring))
def __repr__(self):
@ -1161,8 +1161,10 @@ class MDMFSlotWriteProxy(object):
"""I write the data vectors in datavs to the remote slot."""
tw_vectors = {}
if not self._testvs:
# Make sure we will only successfully write if the share didn't
# previously exist.
self._testvs = []
self._testvs.append(tuple([0, 1, b"eq", b""]))
self._testvs.append(tuple([0, 1, b""]))
if not self._written:
# Write a new checkstring to the share when we write it, so
# that we have something to check later.
@ -1170,7 +1172,7 @@ class MDMFSlotWriteProxy(object):
datavs.append((0, new_checkstring))
def _first_write():
self._written = True
self._testvs = [(0, len(new_checkstring), b"eq", new_checkstring)]
self._testvs = [(0, len(new_checkstring), new_checkstring)]
on_success = _first_write
tw_vectors[self.shnum] = (self._testvs, datavs, None)
d = self._storage_server.slot_testv_and_readv_and_writev(


@ -13,8 +13,9 @@ if PY2:
import os.path
from allmydata.util import base32
class DataTooLargeError(Exception):
pass
# Backwards compatibility.
from allmydata.interfaces import DataTooLargeError  # noqa: F401
class UnknownMutableContainerVersionError(Exception):
pass
class UnknownImmutableContainerVersionError(Exception):


@ -13,16 +13,20 @@ if PY2:
import os, stat, struct, time
from collections_extended import RangeMap
from foolscap.api import Referenceable
from zope.interface import implementer
from allmydata.interfaces import RIBucketWriter, RIBucketReader
from allmydata.interfaces import (
RIBucketWriter, RIBucketReader, ConflictingWriteError,
DataTooLargeError,
)
from allmydata.util import base32, fileutil, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import timing_safe_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownImmutableContainerVersionError, \
DataTooLargeError
from allmydata.storage.common import UnknownImmutableContainerVersionError
# each share file (in storage/shares/$SI/$SHNUM) contains lease information
# and share data. The share data is accessed by RIBucketWriter.write and
@ -204,19 +208,18 @@ class ShareFile(object):
@implementer(RIBucketWriter)
class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
def __init__(self, ss, incominghome, finalhome, max_size, lease_info):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
self._max_size = max_size # don't allow the client to write more than this
self._canary = canary
self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
self.closed = False
self.throw_out_all_data = False
self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
# also, add our lease to the file now, so that other ones can be
# added by simultaneous uploaders
self._sharefile.add_lease(lease_info)
self._already_written = RangeMap()
def allocated_size(self):
return self._max_size
@ -226,7 +229,20 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
precondition(not self.closed)
if self.throw_out_all_data:
return
# Make sure we're not conflicting with existing data:
end = offset + len(data)
for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end):
chunk_len = chunk_stop - chunk_start
actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len)
writing_chunk = data[chunk_start - offset:chunk_stop - offset]
if actual_chunk != writing_chunk:
raise ConflictingWriteError(
"Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop)
)
self._sharefile.write_share_data(offset, data)
self._already_written.set(True, offset, end)
self.ss.add_latency("write", time.time() - start)
self.ss.count("write")
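The ``RangeMap`` bookkeeping above can be sketched in isolation; this assumes (as the new ``BucketWriter`` code does) that ``ranges(start, stop)`` yields only ranges that have actually been set, clipped to the queried window::

    from collections_extended import RangeMap

    already_written = RangeMap()
    already_written.set(True, 0, 10)   # bytes [0, 10) written so far

    # A new write covering [5, 25) overlaps the already-written chunk only in
    # [5, 10); those are the bytes that must match what is already on disk.
    for (chunk_start, chunk_stop, _) in already_written.ranges(5, 25):
        print("must match existing data in [{}, {})".format(chunk_start, chunk_stop))

    # Once the write is accepted, record the newly covered range.
    already_written.set(True, 5, 25)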
@ -262,22 +278,19 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
pass
self._sharefile = None
self.closed = True
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
self.ss.add_latency("close", time.time() - start)
self.ss.count("close")
def _disconnected(self):
def disconnected(self):
if not self.closed:
self._abort()
def remote_abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
if not self.closed:
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
self._abort()
self.ss.count("abort")


@ -434,20 +434,9 @@ class MutableShareFile(object):
# self._change_container_size() here.
def testv_compare(a, op, b):
assert op in (b"lt", b"le", b"eq", b"ne", b"ge", b"gt")
assert op == b"eq"
if op == b"lt":
return a < b
if op == b"le":
return a <= b
if op == b"eq":
return a == b
if op == b"ne":
return a != b
if op == b"ge":
return a >= b
if op == b"gt":
return a > b
# never reached
class EmptyShare(object):


@ -11,13 +11,14 @@ if PY2:
# Omit open() to get native behavior where open("w") always accepts native
# strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else:
from typing import Dict
import os, re, struct, time
import weakref
import six
from foolscap.api import Referenceable
from foolscap.ipb import IRemoteReference
from twisted.application import service
from zope.interface import implementer
@ -89,7 +90,6 @@ class StorageServer(service.MultiService, Referenceable):
self.incomingdir = os.path.join(sharedir, 'incoming')
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
log.msg("StorageServer created", facility="tahoe.storage")
if reserved_space:
@ -121,6 +121,17 @@ class StorageServer(service.MultiService, Referenceable):
self.lease_checker.setServiceParent(self)
self._get_current_time = get_current_time
# Currently being-written Bucketwriters. For Foolscap, lifetime is tied
# to connection: when disconnection happens, the BucketWriters are
# removed. For HTTP, this makes no sense, so there will be
# timeout-based cleanup; see
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.
# Map in-progress filesystem path -> BucketWriter:
self._bucket_writers = {} # type: Dict[str,BucketWriter]
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@ -238,7 +249,7 @@ class StorageServer(service.MultiService, Referenceable):
def allocated_size(self):
space = 0
for bw in self._active_writers:
for bw in self._bucket_writers.values():
space += bw.allocated_size()
return space
@ -263,10 +274,13 @@ class StorageServer(service.MultiService, Referenceable):
}
return version
def remote_allocate_buckets(self, storage_index,
def _allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
owner_num=0):
"""
Generic bucket allocation API.
"""
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
@ -315,7 +329,7 @@ class StorageServer(service.MultiService, Referenceable):
# great! we already have it. easy.
pass
elif os.path.exists(incominghome):
# Note that we don't create BucketWriters for shnums that
# For Foolscap we don't create BucketWriters for shnums that
# have a partial share (in incoming/), so if a second upload
# occurs while the first is still in progress, the second
# uploader will use different storage servers.
@ -323,11 +337,11 @@ class StorageServer(service.MultiService, Referenceable):
elif (not limited) or (remaining_space >= max_space_per_bucket):
# ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome,
max_space_per_bucket, lease_info, canary)
max_space_per_bucket, lease_info)
if self.no_storage:
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
self._active_writers[bw] = 1
self._bucket_writers[incominghome] = bw
if limited:
remaining_space -= max_space_per_bucket
else:
@ -340,6 +354,21 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("allocate", self._get_current_time() - start)
return alreadygot, bucketwriters
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f:
@ -383,7 +412,10 @@ class StorageServer(service.MultiService, Referenceable):
def bucket_writer_closed(self, bw, consumed_size):
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._active_writers[bw]
del self._bucket_writers[bw.incominghome]
if bw in self._bucket_writer_disconnect_markers:
canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
canary.dontNotifyOnDisconnect(disconnect_marker)
def _get_bucket_shares(self, storage_index):
"""Return a list of (shnum, pathname) tuples for files that hold


@ -994,11 +994,19 @@ class _StorageServer(object):
tw_vectors,
r_vector,
):
# Match the wire protocol, which requires 4-tuples for test vectors.
wire_format_tw_vectors = {
key: (
[(start, length, b"eq", data) for (start, length, data) in value[0]],
value[1],
value[2],
) for (key, value) in tw_vectors.items()
}
return self._rref.callRemote(
"slot_testv_and_readv_and_writev",
storage_index,
secrets,
tw_vectors,
wire_format_tw_vectors,
r_vector,
)
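For instance, a simplified client-side test-and-write vector like the one below is expanded back to the 4-tuple wire form (with an explicit ``b"eq"`` operator) before the Foolscap call; the values are illustrative::

    tw_vectors = {
        0: ([(0, 7, b"1" * 7)], [(0, b"2" * 7)], 7),
    }
    # becomes, on the wire:
    wire_format_tw_vectors = {
        0: ([(0, 7, b"eq", b"1" * 7)], [(0, b"2" * 7)], 7),
    }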


@ -314,6 +314,16 @@ class FakeCanary(object):
def getPeer(self):
return "<fake>"
def disconnected(self):
"""Disconnect the canary, to be called by test code.
Can only happen once.
"""
if self.disconnectors is not None:
for (f, args, kwargs) in list(self.disconnectors.values()):
f(*args, **kwargs)
self.disconnectors = None
class ShouldFailMixin(object):


@ -149,7 +149,7 @@ class FakeStorageServer(object):
readv = {}
for shnum, (testv, writev, new_length) in list(tw_vectors.items()):
for (offset, length, op, specimen) in testv:
assert op in (b"le", b"eq", b"ge")
assert op == b"eq"
# TODO: this isn't right, the read is controlled by read_vector,
# not by testv
readv[shnum] = [ specimen

View File

@ -20,11 +20,9 @@ if PY2:
from random import Random from random import Random
from testtools import skipIf
from twisted.internet.defer import inlineCallbacks
from foolscap.api import Referenceable
from foolscap.api import Referenceable, RemoteException
from allmydata.interfaces import IStorageServer
from .common_system import SystemTestMixin
@ -77,6 +75,10 @@ class IStorageServerImmutableAPIsTestsMixin(object):
Tests for ``IStorageServer``'s immutable APIs.
``self.storage_server`` is expected to provide ``IStorageServer``.
``self.disconnect()`` should disconnect and then reconnect, creating a new
``self.storage_server``. Some implementations may wish to skip tests using
this; HTTP has no notion of disconnection.
""" """
@inlineCallbacks
@ -98,13 +100,10 @@ class IStorageServerImmutableAPIsTestsMixin(object):
# We validate the bucket objects' interface in a later test.
@inlineCallbacks
@skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793")
def test_allocate_buckets_repeat(self):
"""
allocate_buckets() with the same storage index returns the same result,
because the shares have not been written to.
This fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793
``IStorageServer.allocate_buckets()`` with the same storage index does not return
work-in-progress buckets, but will add any newly added buckets.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
@ -115,7 +114,7 @@ class IStorageServerImmutableAPIsTestsMixin(object):
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(5)),
sharenums=set(range(4)),
allocated_size=1024,
canary=Referenceable(),
)
@ -128,40 +127,64 @@ class IStorageServerImmutableAPIsTestsMixin(object):
Referenceable(),
)
self.assertEqual(already_got, already_got2)
self.assertEqual(set(allocated.keys()), set(allocated2.keys()))
self.assertEqual(set(allocated2.keys()), {4})
@skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793")
@inlineCallbacks
def test_allocate_buckets_more_sharenums(self):
"""
allocate_buckets() with the same storage index but more sharenums
acknowledges the extra shares don't exist.
Fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793
"""
def abort_or_disconnect_half_way(self, abort_or_disconnect):
"""
If we disconnect/abort in the middle of writing to a bucket, all data
is wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
``abort_or_disconnect`` is a callback that takes a bucket and aborts the
upload, or perhaps disconnects the whole connection.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
yield self.storage_server.allocate_buckets(
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(5)),
sharenums={0},
allocated_size=1024,
canary=Referenceable(),
)
(already_got2, allocated2) = yield self.storage_server.allocate_buckets(
# Bucket 1 is fully written in one go.
yield allocated[0].callRemote("write", 0, b"1" * 1024)
# Disconnect or abort, depending on the test:
yield abort_or_disconnect(allocated[0])
# Write different data with no complaint:
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(7)),
sharenums={0},
allocated_size=1024,
canary=Referenceable(),
)
self.assertEqual(already_got2, set()) # none were fully written
yield allocated[0].callRemote("write", 0, b"2" * 1024)
self.assertEqual(set(allocated2.keys()), set(range(7)))
def test_disconnection(self):
"""
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
HTTP protocol should skip this test, since disconnection is a meaningless
concept there; this is more about testing that the implicit contract the
Foolscap implementation depends on doesn't change as we refactor things.
"""
return self.abort_or_disconnect_half_way(lambda _: self.disconnect())
@inlineCallbacks
def test_written_shares_are_allocated(self):
@ -248,26 +271,529 @@ class IStorageServerImmutableAPIsTestsMixin(object):
(yield buckets[2].callRemote("read", 0, 1024)), b"3" * 512 + b"4" * 512
)
@skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801")
def test_overlapping_writes(self):
"""
The policy for overlapping writes is TBD:
https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801
"""
@inlineCallbacks
def test_non_matching_overlapping_writes(self):
"""
When doing overlapping writes in immutable uploads, non-matching writes
fail.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums={0},
allocated_size=30,
canary=Referenceable(),
)
yield allocated[0].callRemote("write", 0, b"1" * 25)
# Overlapping write that doesn't match:
with self.assertRaises(RemoteException):
yield allocated[0].callRemote("write", 20, b"2" * 10)
@inlineCallbacks
def test_matching_overlapping_writes(self):
"""
When doing overlapping writes in immutable uploads, matching writes
succeed.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums={0},
allocated_size=25,
canary=Referenceable(),
)
yield allocated[0].callRemote("write", 0, b"1" * 10)
# Overlapping write that matches:
yield allocated[0].callRemote("write", 5, b"1" * 20)
yield allocated[0].callRemote("close")
buckets = yield self.storage_server.get_buckets(storage_index)
self.assertEqual(set(buckets.keys()), {0})
self.assertEqual((yield buckets[0].callRemote("read", 0, 25)), b"1" * 25)
def test_abort(self):
"""
If we call ``abort`` on the ``RIBucketWriter`` to disconnect in the
middle of writing to a bucket, all data is wiped, and it's even
possible to write different data to the bucket.
(In the real world one probably wouldn't do that, but writing different
data is a good way to test that the original data really was wiped.)
"""
return self.abort_or_disconnect_half_way(
lambda bucket: bucket.callRemote("abort")
)
@inlineCallbacks
def test_get_buckets_skips_unfinished_buckets(self):
"""
Buckets that are not fully written are not returned by
``IStorageServer.get_buckets()`` implementations.
"""
storage_index = new_storage_index()
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret=new_secret(),
cancel_secret=new_secret(),
sharenums=set(range(5)),
allocated_size=10,
canary=Referenceable(),
)
# Bucket 1 is fully written
yield allocated[1].callRemote("write", 0, b"1" * 10)
yield allocated[1].callRemote("close")
# Bucket 2 is partially written
yield allocated[2].callRemote("write", 0, b"1" * 5)
buckets = yield self.storage_server.get_buckets(storage_index)
self.assertEqual(set(buckets.keys()), {1})
@inlineCallbacks
def test_read_bucket_at_offset(self):
"""
Given a read bucket returned from ``IStorageServer.get_buckets()``, it
is possible to read at different offsets and lengths, with reads past
the end resulting in empty bytes.
"""
length = 256 * 17
storage_index = new_storage_index()
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret=new_secret(),
cancel_secret=new_secret(),
sharenums=set(range(1)),
allocated_size=length,
canary=Referenceable(),
)
total_data = _randbytes(256 * 17)
yield allocated[0].callRemote("write", 0, total_data)
yield allocated[0].callRemote("close")
buckets = yield self.storage_server.get_buckets(storage_index)
bucket = buckets[0]
for start, to_read in [
(0, 250), # fraction
(0, length), # whole thing
(100, 1024), # offset fraction
(length + 1, 100), # completely out of bounds
(length - 100, 200), # partially out of bounds
]:
data = yield bucket.callRemote("read", start, to_read)
self.assertEqual(
data,
total_data[start : start + to_read],
"Didn't match for start {}, length {}".format(start, to_read),
)
@inlineCallbacks
def test_bucket_advise_corrupt_share(self):
"""
Calling ``advise_corrupt_share()`` on a bucket returned by
``IStorageServer.get_buckets()`` does not result in error (other
behavior is opaque at this level of abstraction).
"""
storage_index = new_storage_index()
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret=new_secret(),
cancel_secret=new_secret(),
sharenums=set(range(1)),
allocated_size=10,
canary=Referenceable(),
)
yield allocated[0].callRemote("write", 0, b"0123456789")
yield allocated[0].callRemote("close")
buckets = yield self.storage_server.get_buckets(storage_index)
yield buckets[0].callRemote("advise_corrupt_share", b"OH NO")
class IStorageServerMutableAPIsTestsMixin(object):
"""
Tests for ``IStorageServer``'s mutable APIs.
``self.storage_server`` is expected to provide ``IStorageServer``.
``STARAW`` is short for ``slot_testv_and_readv_and_writev``.
"""
def new_secrets(self):
"""Return a 3-tuple of secrets for STARAW calls."""
return (new_secret(), new_secret(), new_secret())
def staraw(self, *args, **kwargs):
"""Like ``slot_testv_and_readv_and_writev``, but less typing."""
return self.storage_server.slot_testv_and_readv_and_writev(*args, **kwargs)
@inlineCallbacks
def test_STARAW_reads_after_write(self):
"""
When data is written with
``IStorageServer.slot_testv_and_readv_and_writev``, it can then be read
by a separate call using that API.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
1: ([], [(0, b"0123"), (4, b"456")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
# Whole thing, partial, going beyond the edge, completely outside
# range:
r_vector=[(0, 7), (2, 3), (6, 8), (100, 10)],
)
self.assertEqual(
reads,
{0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]},
)
@inlineCallbacks
def test_SATRAW_reads_happen_before_writes_in_single_query(self):
"""
If a ``IStorageServer.slot_testv_and_readv_and_writev`` command
contains both reads and writes, the read returns results that precede
the write.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Read and write in same command; read happens before write:
(written, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"X" * 7)], 7),
},
r_vector=[(0, 7)],
)
self.assertEqual(written, True)
self.assertEqual(reads, {0: [b"abcdefg"]})
# The write is available in next read:
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"X" * 7]})
@inlineCallbacks
def test_SATRAW_writes_happens_only_if_test_matches(self):
"""
If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes both a
test and a write, the write succeeds if the test matches, and fails if
the test does not match.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Test matches, so write happens:
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: (
[(0, 3, b"1" * 3), (3, 4, b"1" * 4)],
[(0, b"2" * 7)],
7,
),
},
r_vector=[],
)
self.assertEqual(written, True)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"2" * 7]})
# Test does not match, so write does not happen:
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([(0, 7, b"1" * 7)], [(0, b"3" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, False)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"2" * 7]})
@inlineCallbacks
def test_SATRAW_tests_past_end_of_data(self):
"""
If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes a test
vector that reads past the end of the data, the result is limited to
actual available data.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
# Since there is no data on server, the test vector will return empty
# string, which matches expected result, so write will succeed.
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([(0, 10, b"")], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Now the test vector is a 10-read off of a 7-byte value, but expected
# value is still 7 bytes, so the write will again succeed.
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([(0, 10, b"1" * 7)], [(0, b"2" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
@inlineCallbacks
def test_SATRAW_reads_past_end_of_data(self):
"""
If a ``IStorageServer.slot_testv_and_readv_and_writev`` reads past the
end of the data, the result is limited to actual available data.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
# Write some data
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"12345")], 5),
},
r_vector=[],
)
self.assertEqual(written, True)
# Reads past end.
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 100), (2, 50)],
)
self.assertEqual(reads, {0: [b"12345", b"345"]})
@inlineCallbacks
def test_STARAW_write_enabler_must_match(self):
"""
If the write enabler secret passed to
``IStorageServer.slot_testv_and_readv_and_writev`` doesn't match
previous writes, the write fails.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Write enabler secret does not match, so write does not happen:
bad_secrets = (new_secret(),) + secrets[1:]
with self.assertRaises(RemoteException):
yield self.staraw(
storage_index,
bad_secrets,
tw_vectors={
0: ([], [(0, b"2" * 7)], 7),
},
r_vector=[],
)
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {0: [b"1" * 7]})
@inlineCallbacks
def test_STARAW_zero_new_length_deletes(self):
"""
A zero new length passed to
``IStorageServer.slot_testv_and_readv_and_writev`` deletes the share.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
# Write with new length of 0:
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"1" * 7)], 0),
},
r_vector=[],
)
self.assertEqual(written, True)
# It's gone!
(_, reads) = yield self.staraw(
storage_index,
secrets,
tw_vectors={},
r_vector=[(0, 7)],
)
self.assertEqual(reads, {})
@inlineCallbacks
def test_slot_readv(self):
"""
Data written with ``IStorageServer.slot_testv_and_readv_and_writev()``
can be read using ``IStorageServer.slot_readv()``. Reads can't go past
the end of the data.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
1: ([], [(0, b"0123"), (4, b"456")], 7),
# This will never get read from, just here to show we only read
# from shares explicitly requested by slot_readv:
2: ([], [(0, b"XYZW")], 4),
},
r_vector=[],
)
self.assertEqual(written, True)
reads = yield self.storage_server.slot_readv(
storage_index,
shares=[0, 1],
# Whole thing, partial, going beyond the edge, completely outside
# range:
readv=[(0, 7), (2, 3), (6, 8), (100, 10)],
)
self.assertEqual(
reads,
{0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]},
)
@inlineCallbacks
def test_slot_readv_no_shares(self):
"""
With no shares given, ``IStorageServer.slot_readv()`` reads from all shares.
"""
secrets = self.new_secrets()
storage_index = new_storage_index()
(written, _) = yield self.staraw(
storage_index,
secrets,
tw_vectors={
0: ([], [(0, b"abcdefg")], 7),
1: ([], [(0, b"0123456")], 7),
2: ([], [(0, b"9876543")], 7),
},
r_vector=[],
)
self.assertEqual(written, True)
reads = yield self.storage_server.slot_readv(
storage_index,
shares=[],
readv=[(0, 7)],
)
self.assertEqual(
reads,
{0: [b"abcdefg"], 1: [b"0123456"], 2: [b"9876543"]},
)
class _FoolscapMixin(SystemTestMixin):
"""Run tests on the Foolscap version of ``IStorageServer``."""
def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))
@inlineCallbacks
def setUp(self):
AsyncTestCase.setUp(self)
self.basedir = "test_istorageserver/" + self.id()
yield SystemTestMixin.setUp(self)
yield self.set_up_nodes(1)
self.storage_server = next(
iter(self.clients[0].storage_broker.get_known_servers())
).get_storage_server()
self.storage_server = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(self.storage_server))
@inlineCallbacks
@ -275,6 +801,16 @@ class _FoolscapMixin(SystemTestMixin):
AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_server
yield self.bounce_client(0)
self.storage_server = self._get_native_server().get_storage_server()
assert self.storage_server is not current
class FoolscapSharedAPIsTests(
_FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase
@ -286,3 +822,9 @@ class FoolscapImmutableAPIsTests(
_FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""
class FoolscapMutableAPIsTests(
_FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase
):
"""Foolscap-specific tests for immutable ``IStorageServer`` APIs."""


@ -0,0 +1,273 @@
"""
Tests for ``/statistics?t=openmetrics``.
Ported to Python 3.
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
from prometheus_client.openmetrics import parser
from treq.testing import RequestTraversalAgent
from twisted.web.http import OK
from twisted.web.client import readBody
from twisted.web.resource import Resource
from testtools.twistedsupport import succeeded
from testtools.matchers import (
AfterPreprocessing,
Equals,
MatchesAll,
MatchesStructure,
MatchesPredicate,
)
from testtools.content import text_content
from allmydata.web.status import Statistics
from allmydata.test.common import SyncTestCase
class FakeStatsProvider(object):
"""
A stats provider that hands back a canned collection of performance
statistics.
"""
def get_stats(self):
# Parsed into a dict from a running tahoe's /statistics?t=json
stats = {
"stats": {
"storage_server.latencies.get.99_9_percentile": None,
"storage_server.latencies.close.10_0_percentile": 0.00021910667419433594,
"storage_server.latencies.read.01_0_percentile": 2.8848648071289062e-05,
"storage_server.latencies.writev.99_9_percentile": None,
"storage_server.latencies.read.99_9_percentile": None,
"storage_server.latencies.allocate.99_0_percentile": 0.000988006591796875,
"storage_server.latencies.writev.mean": 0.00045332245070571654,
"storage_server.latencies.close.99_9_percentile": None,
"cpu_monitor.15min_avg": 0.00017592000079223033,
"storage_server.disk_free_for_root": 103289454592,
"storage_server.latencies.get.99_0_percentile": 0.000347137451171875,
"storage_server.latencies.get.mean": 0.00021158285060171353,
"storage_server.latencies.read.90_0_percentile": 8.893013000488281e-05,
"storage_server.latencies.write.01_0_percentile": 3.600120544433594e-05,
"storage_server.latencies.write.99_9_percentile": 0.00017690658569335938,
"storage_server.latencies.close.90_0_percentile": 0.00033211708068847656,
"storage_server.disk_total": 103497859072,
"storage_server.latencies.close.95_0_percentile": 0.0003509521484375,
"storage_server.latencies.readv.samplesize": 1000,
"storage_server.disk_free_for_nonroot": 103289454592,
"storage_server.latencies.close.mean": 0.0002715024480059103,
"storage_server.latencies.writev.95_0_percentile": 0.0007410049438476562,
"storage_server.latencies.readv.90_0_percentile": 0.0003781318664550781,
"storage_server.latencies.readv.99_0_percentile": 0.0004050731658935547,
"storage_server.latencies.allocate.mean": 0.0007128627429454784,
"storage_server.latencies.close.samplesize": 326,
"storage_server.latencies.get.50_0_percentile": 0.0001819133758544922,
"storage_server.latencies.write.50_0_percentile": 4.482269287109375e-05,
"storage_server.latencies.readv.01_0_percentile": 0.0002970695495605469,
"storage_server.latencies.get.10_0_percentile": 0.00015687942504882812,
"storage_server.latencies.allocate.90_0_percentile": 0.0008189678192138672,
"storage_server.latencies.get.samplesize": 472,
"storage_server.total_bucket_count": 393,
"storage_server.latencies.read.mean": 5.936201880959903e-05,
"storage_server.latencies.allocate.01_0_percentile": 0.0004208087921142578,
"storage_server.latencies.allocate.99_9_percentile": None,
"storage_server.latencies.readv.mean": 0.00034061360359191893,
"storage_server.disk_used": 208404480,
"storage_server.latencies.allocate.50_0_percentile": 0.0007410049438476562,
"storage_server.latencies.read.99_0_percentile": 0.00011992454528808594,
"node.uptime": 3805759.8545179367,
"storage_server.latencies.writev.10_0_percentile": 0.00035190582275390625,
"storage_server.latencies.writev.90_0_percentile": 0.0006821155548095703,
"storage_server.latencies.close.01_0_percentile": 0.00021505355834960938,
"storage_server.latencies.close.50_0_percentile": 0.0002579689025878906,
"cpu_monitor.1min_avg": 0.0002130000000003444,
"storage_server.latencies.writev.50_0_percentile": 0.0004138946533203125,
"storage_server.latencies.read.95_0_percentile": 9.107589721679688e-05,
"storage_server.latencies.readv.95_0_percentile": 0.0003859996795654297,
"storage_server.latencies.write.10_0_percentile": 3.719329833984375e-05,
"storage_server.accepting_immutable_shares": 1,
"storage_server.latencies.writev.samplesize": 309,
"storage_server.latencies.get.95_0_percentile": 0.0003190040588378906,
"storage_server.latencies.readv.10_0_percentile": 0.00032210350036621094,
"storage_server.latencies.get.90_0_percentile": 0.0002999305725097656,
"storage_server.latencies.get.01_0_percentile": 0.0001239776611328125,
"cpu_monitor.total": 641.4941180000001,
"storage_server.latencies.write.samplesize": 1000,
"storage_server.latencies.write.95_0_percentile": 9.489059448242188e-05,
"storage_server.latencies.read.50_0_percentile": 6.890296936035156e-05,
"storage_server.latencies.writev.01_0_percentile": 0.00033211708068847656,
"storage_server.latencies.read.10_0_percentile": 3.0994415283203125e-05,
"storage_server.latencies.allocate.10_0_percentile": 0.0004949569702148438,
"storage_server.reserved_space": 0,
"storage_server.disk_avail": 103289454592,
"storage_server.latencies.write.99_0_percentile": 0.00011301040649414062,
"storage_server.latencies.write.90_0_percentile": 9.083747863769531e-05,
"cpu_monitor.5min_avg": 0.0002370666691157502,
"storage_server.latencies.write.mean": 5.8008909225463864e-05,
"storage_server.latencies.readv.50_0_percentile": 0.00033020973205566406,
"storage_server.latencies.close.99_0_percentile": 0.0004038810729980469,
"storage_server.allocated": 0,
"storage_server.latencies.writev.99_0_percentile": 0.0007710456848144531,
"storage_server.latencies.readv.99_9_percentile": 0.0004780292510986328,
"storage_server.latencies.read.samplesize": 170,
"storage_server.latencies.allocate.samplesize": 406,
"storage_server.latencies.allocate.95_0_percentile": 0.0008411407470703125,
},
"counters": {
"storage_server.writev": 309,
"storage_server.bytes_added": 197836146,
"storage_server.close": 326,
"storage_server.readv": 14299,
"storage_server.allocate": 406,
"storage_server.read": 170,
"storage_server.write": 3775,
"storage_server.get": 472,
},
}
return stats
class HackItResource(Resource, object):
"""
A bridge between ``RequestTraversalAgent`` and ``MultiFormatResource``
(used by ``Statistics``). ``MultiFormatResource`` expects the request
object to have a ``fields`` attribute but Twisted's ``IRequest`` has no
such attribute. Create it here.
"""
def getChildWithDefault(self, path, request):
request.fields = None
return Resource.getChildWithDefault(self, path, request)
class OpenMetrics(SyncTestCase):
"""
Tests for ``/statistics?t=openmetrics``.
"""
def test_spec_compliance(self):
"""
Does our output adhere to the `OpenMetrics <https://openmetrics.io/>` spec?
https://github.com/OpenObservability/OpenMetrics/
https://prometheus.io/docs/instrumenting/exposition_formats/
"""
root = HackItResource()
root.putChild(b"", Statistics(FakeStatsProvider()))
rta = RequestTraversalAgent(root)
d = rta.request(b"GET", b"http://localhost/?t=openmetrics")
self.assertThat(d, succeeded(matches_stats(self)))
def matches_stats(testcase):
"""
Create a matcher that matches a response that conforms to the OpenMetrics
specification.
* The ``Content-Type`` is **application/openmetrics-text; version=1.0.0; charset=utf-8**.
* The status is **OK**.
* The body can be parsed by an OpenMetrics parser.
* The metric families in the body are grouped and sorted.
* At least one of the expected families appears in the body.
:param testtools.TestCase testcase: The case to which to add detail about the matching process.
:return: A matcher.
"""
return MatchesAll(
MatchesStructure(
code=Equals(OK),
# "The content type MUST be..."
headers=has_header(
"content-type",
"application/openmetrics-text; version=1.0.0; charset=utf-8",
),
),
AfterPreprocessing(
readBodyText,
succeeded(
MatchesAll(
MatchesPredicate(add_detail(testcase, "response body"), "%s dummy"),
parses_as_openmetrics(),
)
),
),
)
def add_detail(testcase, name):
"""
Create a predicate (for use with ``MatchesPredicate``) that always matches
and, as a side effect, adds the matched value as a detail to the testcase.
:param testtools.TestCase testcase: The case to which to add the detail.
:param str name: The name under which to record the detail.
:return: A one-argument predicate.
"""
def predicate(value):
testcase.addDetail(name, text_content(value))
return True
return predicate
def readBodyText(response):
"""
Read the response body and decode it using UTF-8.
:param twisted.web.iweb.IResponse response: The response from which to
read the body.
:return: A ``Deferred`` that fires with the ``str`` body.
"""
d = readBody(response)
d.addCallback(lambda body: body.decode("utf-8"))
return d
def has_header(name, value):
"""
Create a matcher that matches a response object that includes the given
name / value pair.
:param str name: The name of the item in the HTTP header to match.
:param str value: The value of the item in the HTTP header to match by equality.
:return: A matcher.
"""
return AfterPreprocessing(
lambda headers: headers.getRawHeaders(name),
Equals([value]),
)
def parses_as_openmetrics():
"""
Create a matcher that matches a ``str`` string that can be parsed as an
OpenMetrics response and includes a certain well-known value expected by
the tests.
:return: A matcher.
"""
# The parser throws if it does not like its input.
# Wrapped in a list() to drain the generator.
return AfterPreprocessing(
lambda body: list(parser.text_string_to_metric_families(body)),
AfterPreprocessing(
lambda families: families[-1].name,
Equals("tahoe_stats_storage_server_total_bucket_count"),
),
)
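For reference, here is a rough sketch of the kind of body that satisfies this matcher, fed through the same ``prometheus_client`` parser the test uses. The sample lines and values are illustrative only; the final family name matches the one asserted above because stats keys are emitted in sorted order and ``storage_server.total_bucket_count`` sorts last.

# Illustrative only: a minimal OpenMetrics exposition in the shape the
# endpoint emits (no HELP/TYPE metadata, one sample per family, terminated
# by "# EOF"), parsed the same way parses_as_openmetrics() does.
from prometheus_client.openmetrics import parser

sample_body = (
    "tahoe_counters_storage_server_get 472\n"
    "tahoe_stats_storage_server_total_bucket_count 393\n"
    "# EOF\n"
)
families = list(parser.text_string_to_metric_families(sample_body))
assert families[-1].name == "tahoe_stats_storage_server_total_bucket_count"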

View File

@ -8,7 +8,7 @@ from __future__ import division
from __future__ import print_function from __future__ import print_function
from __future__ import unicode_literals from __future__ import unicode_literals
from future.utils import native_str, PY2, bytes_to_native_str from future.utils import native_str, PY2, bytes_to_native_str, bchr
if PY2: if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six import ensure_str from six import ensure_str
@ -19,13 +19,15 @@ import platform
import stat import stat
import struct import struct
import shutil import shutil
import gc from uuid import uuid4
from twisted.trial import unittest from twisted.trial import unittest
from twisted.internet import defer from twisted.internet import defer
from twisted.internet.task import Clock from twisted.internet.task import Clock
from hypothesis import given, strategies
import itertools import itertools
from allmydata import interfaces from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32 from allmydata.util import fileutil, hashutil, base32
@ -33,7 +35,7 @@ from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME
from allmydata.storage.shares import get_share_file from allmydata.storage.shares import get_share_file
from allmydata.storage.mutable import MutableShareFile from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \ from allmydata.storage.common import storage_index_to_dir, \
UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \ UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \
si_b2a, si_a2b si_b2a, si_a2b
from allmydata.storage.lease import LeaseInfo from allmydata.storage.lease import LeaseInfo
@ -47,7 +49,9 @@ from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
SIGNATURE_SIZE, \ SIGNATURE_SIZE, \
VERIFICATION_KEY_SIZE, \ VERIFICATION_KEY_SIZE, \
SHARE_HASH_CHAIN_SIZE SHARE_HASH_CHAIN_SIZE
from allmydata.interfaces import BadWriteEnablerError from allmydata.interfaces import (
BadWriteEnablerError, DataTooLargeError, ConflictingWriteError,
)
from allmydata.test.no_network import NoNetworkServer from allmydata.test.no_network import NoNetworkServer
from allmydata.storage_client import ( from allmydata.storage_client import (
_StorageServer, _StorageServer,
@ -124,8 +128,7 @@ class Bucket(unittest.TestCase):
def test_create(self): def test_create(self):
incoming, final = self.make_workdir("test_create") incoming, final = self.make_workdir("test_create")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), bw = BucketWriter(self, incoming, final, 200, self.make_lease())
FakeCanary())
bw.remote_write(0, b"a"*25) bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25) bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*25) bw.remote_write(50, b"c"*25)
@ -134,8 +137,7 @@ class Bucket(unittest.TestCase):
def test_readwrite(self): def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite") incoming, final = self.make_workdir("test_readwrite")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), bw = BucketWriter(self, incoming, final, 200, self.make_lease())
FakeCanary())
bw.remote_write(0, b"a"*25) bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25) bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*7) # last block may be short bw.remote_write(50, b"c"*7) # last block may be short
@ -147,6 +149,88 @@ class Bucket(unittest.TestCase):
self.failUnlessEqual(br.remote_read(25, 25), b"b"*25) self.failUnlessEqual(br.remote_read(25, 25), b"b"*25)
self.failUnlessEqual(br.remote_read(50, 7), b"c"*7) self.failUnlessEqual(br.remote_read(50, 7), b"c"*7)
def test_write_past_size_errors(self):
"""Writing beyond the size of the bucket throws an exception."""
for (i, (offset, length)) in enumerate([(0, 201), (10, 191), (202, 34)]):
incoming, final = self.make_workdir(
"test_write_past_size_errors-{}".format(i)
)
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
with self.assertRaises(DataTooLargeError):
bw.remote_write(offset, b"a" * length)
@given(
maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
)
def test_overlapping_writes_ok_if_matching(
self, maybe_overlapping_offset, maybe_overlapping_length
):
"""
Writes that overlap with previous writes are OK when the content is the
same.
"""
length = 100
expected_data = b"".join(bchr(i) for i in range(100))
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, expected_data[10:20])
bw.remote_write(30, expected_data[30:40])
bw.remote_write(50, expected_data[50:60])
# Then, an overlapping write but with matching data:
bw.remote_write(
maybe_overlapping_offset,
expected_data[
maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
]
)
# Now fill in the holes:
bw.remote_write(0, expected_data[0:10])
bw.remote_write(20, expected_data[20:30])
bw.remote_write(40, expected_data[40:50])
bw.remote_write(60, expected_data[60:])
bw.remote_close()
br = BucketReader(self, bw.finalhome)
self.assertEqual(br.remote_read(0, length), expected_data)
@given(
maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
)
def test_overlapping_writes_not_ok_if_different(
self, maybe_overlapping_offset, maybe_overlapping_length
):
"""
Writes that overlap with previous writes fail with an exception if the
contents don't match.
"""
length = 100
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, b"1" * 10)
bw.remote_write(30, b"1" * 10)
bw.remote_write(50, b"1" * 10)
# Then write something that may overlap some of them but carries different
# data, and fill in the holes left by the first three writes. Either the
# overlapping write or one of the fills is guaranteed to conflict.
with self.assertRaises(ConflictingWriteError):
bw.remote_write(
maybe_overlapping_offset,
b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
)
bw.remote_write(0, b"1" * 10)
bw.remote_write(20, b"1" * 10)
bw.remote_write(40, b"1" * 10)
bw.remote_write(60, b"1" * 40)
def test_read_past_end_of_share_data(self): def test_read_past_end_of_share_data(self):
# test vector for immutable files (hard-coded contents of an immutable share # test vector for immutable files (hard-coded contents of an immutable share
# file): # file):
@ -228,8 +312,7 @@ class BucketProxy(unittest.TestCase):
final = os.path.join(basedir, "bucket") final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir) fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp")) fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease(), bw = BucketWriter(self, incoming, final, size, self.make_lease())
FakeCanary())
rb = RemoteBucket(bw) rb = RemoteBucket(bw)
return bw, rb, final return bw, rb, final
@ -579,26 +662,24 @@ class Server(unittest.TestCase):
# the size we request. # the size we request.
OVERHEAD = 3*4 OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4 LEASE_SIZE = 4+32+32+4
canary = FakeCanary(True) canary = FakeCanary()
already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary) already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
self.failUnlessEqual(len(writers), 3) self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally # now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed # allocated, allowing only 2000 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3) self.failUnlessEqual(len(ss._bucket_writers), 3)
# allocating 1001-byte shares only leaves room for one # allocating 1001-byte shares only leaves room for one
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary) canary2 = FakeCanary()
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4) self.failUnlessEqual(len(ss._bucket_writers), 4)
# we abandon the first set, so their provisional allocation should be # we abandon the first set, so their provisional allocation should be
# returned # returned
canary.disconnected()
del already self.failUnlessEqual(len(ss._bucket_writers), 1)
del writers
gc.collect()
self.failUnlessEqual(len(ss._active_writers), 1)
# now we have a provisional allocation of 1001 bytes # now we have a provisional allocation of 1001 bytes
# and we close the second set, so their provisional allocation should # and we close the second set, so their provisional allocation should
@ -607,25 +688,21 @@ class Server(unittest.TestCase):
for bw in writers2.values(): for bw in writers2.values():
bw.remote_write(0, b"a"*25) bw.remote_write(0, b"a"*25)
bw.remote_close() bw.remote_close()
del already2 self.failUnlessEqual(len(ss._bucket_writers), 0)
del writers2
del bw
self.failUnlessEqual(len(ss._active_writers), 0)
# this also changes the amount reported as available by call_get_disk_stats # this also changes the amount reported as available by call_get_disk_stats
allocated = 1001 + OVERHEAD + LEASE_SIZE allocated = 1001 + OVERHEAD + LEASE_SIZE
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares # 5000-1085=3915 free, therefore we can fit 39 100byte shares
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary) canary3 = FakeCanary()
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
self.failUnlessEqual(len(writers3), 39) self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._active_writers), 39) self.failUnlessEqual(len(ss._bucket_writers), 39)
del already3 canary3.disconnected()
del writers3
gc.collect()
self.failUnlessEqual(len(ss._active_writers), 0) self.failUnlessEqual(len(ss._bucket_writers), 0)
ss.disownServiceParent() ss.disownServiceParent()
del ss del ss
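The rewritten assertions above no longer depend on garbage collection: abandoning an upload is expressed by firing the canary's disconnect notification, after which the server is expected to drop the provisional allocations tracked in ``_bucket_writers``. A rough sketch of how such a canary test double can behave (the method names here are assumptions for illustration, not necessarily the exact ``FakeCanary`` API):

# Rough sketch of a canary test double; names are illustrative.
class _SketchCanary(object):
    def __init__(self):
        self._disconnect_handlers = []

    def notifyOnDisconnect(self, f, *args, **kwargs):
        # The server registers cleanup (e.g. aborting a BucketWriter) here.
        self._disconnect_handlers.append((f, args, kwargs))

    def disconnected(self):
        # Simulate the connection dropping: run every handler so provisional
        # allocations are released immediately, without any gc.collect().
        for (f, args, kwargs) in self._disconnect_handlers:
            f(*args, **kwargs)
        del self._disconnect_handlers[:]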
@ -1074,23 +1151,6 @@ class MutableServer(unittest.TestCase):
})) }))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]}) self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
# as should this one
answer = write(b"si1", secrets,
{0: ([(10, 5, b"lt", b"11111"),
],
[(0, b"x"*100)],
None),
},
[(10,5)],
)
self.failUnlessEqual(answer, (False,
{0: [b"11111"],
1: [b""],
2: [b""]},
))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
def test_operators(self): def test_operators(self):
# test operators, the data we're comparing is '11111' in all cases. # test operators, the data we're comparing is '11111' in all cases.
# test both fail+pass, reset data after each one. # test both fail+pass, reset data after each one.
@ -1110,63 +1170,6 @@ class MutableServer(unittest.TestCase):
reset() reset()
# lt
answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11110"),
],
[(0, b"x"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
self.failUnlessEqual(read(b"si1", [], [(0,100)]), {0: [data]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11111"),
],
[(0, b"x"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11112"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset()
# le
answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11110"),
],
[(0, b"x"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11111"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11112"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset()
# eq # eq
answer = write(b"si1", secrets, {0: ([(10, 5, b"eq", b"11112"), answer = write(b"si1", secrets, {0: ([(10, 5, b"eq", b"11112"),
], ],
@ -1186,81 +1189,6 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]}) self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset() reset()
# ne
answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11111"),
],
[(0, b"x"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11112"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset()
# ge
answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11110"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11111"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11112"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
reset()
# gt
answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11110"),
],
[(0, b"y"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11111"),
],
[(0, b"x"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
reset()
answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11112"),
],
[(0, b"x"*100)],
None,
)}, [(10,5)])
self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
reset()
# finally, test some operators against empty shares # finally, test some operators against empty shares
answer = write(b"si1", secrets, {1: ([(10, 5, b"eq", b"11112"), answer = write(b"si1", secrets, {1: ([(10, 5, b"eq", b"11112"),
], ],

View File

@ -14,6 +14,7 @@ from past.builtins import long
import itertools import itertools
import hashlib import hashlib
import re
from twisted.internet import defer from twisted.internet import defer
from twisted.python.filepath import FilePath from twisted.python.filepath import FilePath
from twisted.web.resource import Resource from twisted.web.resource import Resource
@ -1551,6 +1552,37 @@ class Statistics(MultiFormatResource):
req.setHeader("content-type", "text/plain") req.setHeader("content-type", "text/plain")
return json.dumps(stats, indent=1) + "\n" return json.dumps(stats, indent=1) + "\n"
@render_exception
def render_OPENMETRICS(self, req):
"""
Render our stats in `OpenMetrics <https://openmetrics.io/>` format.
For example, Prometheus and VictoriaMetrics can parse this.
Point the scraper to ``/statistics?t=openmetrics`` (instead of the
default ``/metrics``).
"""
req.setHeader("content-type", "application/openmetrics-text; version=1.0.0; charset=utf-8")
stats = self._provider.get_stats()
ret = []
def mangle_name(name):
return re.sub(
u"_(\d\d)_(\d)_percentile",
u'{quantile="0.\g<1>\g<2>"}',
name.replace(u".", u"_")
)
def mangle_value(val):
return str(val) if val is not None else u"NaN"
for (k, v) in sorted(stats['counters'].items()):
ret.append(u"tahoe_counters_%s %s" % (mangle_name(k), mangle_value(v)))
for (k, v) in sorted(stats['stats'].items()):
ret.append(u"tahoe_stats_%s %s" % (mangle_name(k), mangle_value(v)))
ret.append(u"# EOF\n")
return u"\n".join(ret)
class StatisticsElement(Element): class StatisticsElement(Element):
loader = XMLFile(FilePath(__file__).sibling("statistics.xhtml")) loader = XMLFile(FilePath(__file__).sibling("statistics.xhtml"))