diff --git a/.circleci/config.yml b/.circleci/config.yml index 62d1bd752..2fc8e88e7 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -42,6 +42,9 @@ workflows: - "nixos-19-09": {} + - "nixos-21-05": + {} + # Test against PyPy 2.7 - "pypy27-buster": {} @@ -438,8 +441,7 @@ jobs: image: "tahoelafsci/fedora:29-py" user: "nobody" - - nixos-19-09: + nixos-19-09: &NIXOS docker: # Run in a highly Nix-capable environment. - <<: *DOCKERHUB_AUTH @@ -447,6 +449,7 @@ jobs: environment: NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs-channels/archive/nixos-19.09-small.tar.gz" + SOURCE: "nix/" steps: - "checkout" @@ -463,7 +466,17 @@ jobs: # build a couple simple little dependencies that don't take # advantage of multiple cores and we get a little speedup by doing # them in parallel. - nix-build --cores 3 --max-jobs 2 nix/ + nix-build --cores 3 --max-jobs 2 "$SOURCE" + + nixos-21-05: + <<: *NIXOS + + environment: + # Note this doesn't look more similar to the 19.09 NIX_PATH URL because + # there was some internal shuffling by the NixOS project about how they + # publish stable revisions. + NIX_PATH: "nixpkgs=https://github.com/NixOS/nixpkgs/archive/d32b07e6df276d78e3640eb43882b80c9b2b3459.tar.gz" + SOURCE: "nix/py3.nix" typechecks: docker: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e161ec243..45b2986a3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: - 3.9 include: # On macOS don't bother with 3.6-3.8, just to get faster builds. - - os: macos-latest + - os: macos-10.15 python-version: 2.7 - os: macos-latest python-version: 3.9 @@ -168,7 +168,7 @@ jobs: - 3.9 include: # On macOS don't bother with 3.6, just to get faster builds. - - os: macos-latest + - os: macos-10.15 python-version: 2.7 - os: macos-latest python-version: 3.9 @@ -183,7 +183,7 @@ jobs: # We have to use an older version of Tor for running integration # tests on macOS. - name: Install Tor [macOS, ${{ matrix.python-version }} ] - if: ${{ matrix.os == 'macos-latest' }} + if: ${{ contains(matrix.os, 'macos') }} run: | brew extract --version 0.4.5.8 tor homebrew/cask brew install tor@0.4.5.8 @@ -247,7 +247,7 @@ jobs: fail-fast: false matrix: os: - - macos-latest + - macos-10.15 - windows-latest - ubuntu-latest python-version: diff --git a/docs/proposed/http-storage-node-protocol.rst b/docs/proposed/http-storage-node-protocol.rst index a84d62176..521bf476d 100644 --- a/docs/proposed/http-storage-node-protocol.rst +++ b/docs/proposed/http-storage-node-protocol.rst @@ -482,8 +482,8 @@ The response includes ``already-have`` and ``allocated`` for two reasons: This might be because a server has become unavailable and a remaining server needs to store more shares for the upload. It could also just be that the client's preferred servers have changed. -``PUT /v1/immutable/:storage_index/:share_number`` -!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +``PATCH /v1/immutable/:storage_index/:share_number`` +!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Write data for the indicated share. The share number must belong to the storage index. @@ -497,11 +497,8 @@ If any one of these requests fails then at most 128KiB of upload work needs to b The server must recognize when all of the data has been received and mark the share as complete (which it can do because it was informed of the size when the storage index was initialized). -Clients should upload chunks in re-assembly order. 
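For illustration, a non-normative client-side sketch of the chunked upload described above
(assuming the ``requests`` library; ``base_url`` is a placeholder and authentication
headers are omitted)::

    import requests

    def upload_share(base_url, storage_index, share_number, share_data):
        """Upload an immutable share in 128 KiB chunks via PATCH."""
        chunk_size = 128 * 1024
        total = len(share_data)
        for start in range(0, total, chunk_size):
            chunk = share_data[start:start + chunk_size]
            response = requests.patch(
                "{}/v1/immutable/{}/{}".format(base_url, storage_index, share_number),
                data=chunk,
                # Content-Range uses inclusive byte positions, e.g. "bytes 0-15/48".
                headers={"Content-Range": "bytes {}-{}/{}".format(
                    start, start + len(chunk) - 1, total)},
            )
            # 200 (OK) for an intermediate chunk, 201 (CREATED) for the chunk
            # that completes the share; anything else aborts the upload.
            response.raise_for_status()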
* When a chunk that does not complete the share is successfully uploaded the response is ``OK``. -* When the chunk that completes the share is successfully uploaded the response is ``CREATED``. -* If the *Content-Range* for a request covers part of the share that has already been uploaded the response is ``CONFLICT``. The response body indicates the range of share data that has yet to be uploaded. That is:: @@ -514,6 +511,43 @@ Clients should upload chunks in re-assembly order. ] } +* When the chunk that completes the share is successfully uploaded the response is ``CREATED``. +* If the *Content-Range* for a request covers part of the share that has already, + and the data does not match already written data, + the response is ``CONFLICT``. + At this point the only thing to do is abort the upload and start from scratch (see below). + +``PUT /v1/immutable/:storage_index/:share_number/abort`` +!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! + +This cancels an *in-progress* upload. + +The response code: + +* When the upload is still in progress and therefore the abort has succeeded, + the response is ``OK``. + Future uploads can start from scratch with no pre-existing upload state stored on the server. +* If the uploaded has already finished, the response is 405 (Method Not Allowed) + and no change is made. + + +Discussion +`````````` + +``PUT`` verbs are only supposed to be used to replace the whole resource, +thus the use of ``PATCH``. +From RFC 7231:: + + An origin server that allows PUT on a given target resource MUST send + a 400 (Bad Request) response to a PUT request that contains a + Content-Range header field (Section 4.2 of [RFC7233]), since the + payload is likely to be partial content that has been mistakenly PUT + as a full representation. Partial content updates are possible by + targeting a separately identified resource with state that overlaps a + portion of the larger resource, or by using a different method that + has been specifically defined for partial updates (for example, the + PATCH method defined in [RFC5789]). + ``POST /v1/immutable/:storage_index/:share_number/corrupt`` !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! @@ -600,7 +634,6 @@ For example:: "test": [{ "offset": 3, "size": 5, - "operator": "eq", "specimen": "hello" }, ...], "write": [{ @@ -626,6 +659,9 @@ For example:: } } +A test vector or read vector that read beyond the boundaries of existing data will return nothing for any bytes past the end. +As a result, if there is no data at all, an empty bytestring is returned no matter what the offset or length. + Reading ~~~~~~~ @@ -666,19 +702,19 @@ Immutable Data #. Upload the content for immutable share ``7``:: - PUT /v1/immutable/AAAAAAAAAAAAAAAA/7 + PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 Content-Range: bytes 0-15/48 200 OK - PUT /v1/immutable/AAAAAAAAAAAAAAAA/7 + PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 Content-Range: bytes 16-31/48 200 OK - PUT /v1/immutable/AAAAAAAAAAAAAAAA/7 + PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 Content-Range: bytes 32-47/48 @@ -701,7 +737,10 @@ Immutable Data Mutable Data ~~~~~~~~~~~~ -1. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``:: +1. Create mutable share number ``3`` with ``10`` bytes of data in slot ``BBBBBBBBBBBBBBBB``. 
+The special test vector of size 1 but empty bytes will only pass +if there is no existing share, +otherwise it will read a byte which won't match `b""`:: POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write { @@ -715,7 +754,6 @@ Mutable Data "test": [{ "offset": 0, "size": 1, - "operator": "eq", "specimen": "" }], "write": [{ @@ -747,8 +785,7 @@ Mutable Data 3: { "test": [{ "offset": 0, - "size": , - "operator": "eq", + "size": , "specimen": "" }], "write": [{ diff --git a/newsfragments/3786.feature b/newsfragments/3786.feature new file mode 100644 index 000000000..ecbfc0372 --- /dev/null +++ b/newsfragments/3786.feature @@ -0,0 +1 @@ +tahoe-lafs now provides its statistics also in OpenMetrics format (for Prometheus et. al.) at `/statistics?t=openmetrics`. diff --git a/newsfragments/3793.minor b/newsfragments/3793.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3795.minor b/newsfragments/3795.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3797.minor b/newsfragments/3797.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3799.minor b/newsfragments/3799.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3801.bugfix b/newsfragments/3801.bugfix new file mode 100644 index 000000000..504b3999d --- /dev/null +++ b/newsfragments/3801.bugfix @@ -0,0 +1 @@ +When uploading an immutable, overlapping writes that include conflicting data are rejected. In practice, this likely didn't happen in real-world usage. \ No newline at end of file diff --git a/newsfragments/3805.minor b/newsfragments/3805.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3806.minor b/newsfragments/3806.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3808.installation b/newsfragments/3808.installation new file mode 100644 index 000000000..157f08a0c --- /dev/null +++ b/newsfragments/3808.installation @@ -0,0 +1 @@ +Tahoe-LAFS now supports running on NixOS 21.05 with Python 3. diff --git a/newsfragments/3810.minor b/newsfragments/3810.minor new file mode 100644 index 000000000..e69de29bb diff --git a/nix/collections-extended.nix b/nix/collections-extended.nix new file mode 100644 index 000000000..3f1ad165a --- /dev/null +++ b/nix/collections-extended.nix @@ -0,0 +1,19 @@ +{ lib, buildPythonPackage, fetchPypi }: +buildPythonPackage rec { + pname = "collections-extended"; + version = "1.0.3"; + + src = fetchPypi { + inherit pname version; + sha256 = "0lb69x23asd68n0dgw6lzxfclavrp2764xsnh45jm97njdplznkw"; + }; + + # Tests aren't in tarball, for 1.0.3 at least. + doCheck = false; + + meta = with lib; { + homepage = https://github.com/mlenzen/collections-extended; + description = "Extra Python Collections - bags (multisets), setlists (unique list / indexed set), RangeMap and IndexedDict"; + license = licenses.asl20; + }; +} diff --git a/nix/overlays.nix b/nix/overlays.nix index 2bf58575e..fbd0ce3bb 100644 --- a/nix/overlays.nix +++ b/nix/overlays.nix @@ -2,22 +2,32 @@ self: super: { python27 = super.python27.override { packageOverrides = python-self: python-super: { # eliot is not part of nixpkgs at all at this time. - eliot = python-self.callPackage ./eliot.nix { }; + eliot = python-self.pythonPackages.callPackage ./eliot.nix { }; # NixOS autobahn package has trollius as a dependency, although # it is optional. Trollius is unmaintained and fails on CI. 
- autobahn = python-super.callPackage ./autobahn.nix { }; + autobahn = python-super.pythonPackages.callPackage ./autobahn.nix { }; # Porting to Python 3 is greatly aided by the future package. A # slightly newer version than appears in nixos 19.09 is helpful. - future = python-super.callPackage ./future.nix { }; + future = python-super.pythonPackages.callPackage ./future.nix { }; # Need version of pyutil that supports Python 3. The version in 19.09 # is too old. - pyutil = python-super.callPackage ./pyutil.nix { }; + pyutil = python-super.pythonPackages.callPackage ./pyutil.nix { }; # Need a newer version of Twisted, too. - twisted = python-super.callPackage ./twisted.nix { }; + twisted = python-super.pythonPackages.callPackage ./twisted.nix { }; + + # collections-extended is not part of nixpkgs at this time. + collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { }; + }; + }; + + python39 = super.python39.override { + packageOverrides = python-self: python-super: { + # collections-extended is not part of nixpkgs at this time. + collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { }; }; }; } diff --git a/nix/py3.nix b/nix/py3.nix new file mode 100644 index 000000000..34ede49dd --- /dev/null +++ b/nix/py3.nix @@ -0,0 +1,7 @@ +# This is the main entrypoint for the Tahoe-LAFS derivation. +{ pkgs ? import { } }: +# Add our Python packages to nixpkgs to simplify the expression for the +# Tahoe-LAFS derivation. +let pkgs' = pkgs.extend (import ./overlays.nix); +# Evaluate the expression for our Tahoe-LAFS derivation. +in pkgs'.python39.pkgs.callPackage ./tahoe-lafs.nix { } diff --git a/nix/tahoe-lafs.nix b/nix/tahoe-lafs.nix index 35b29f1cc..c7db6c583 100644 --- a/nix/tahoe-lafs.nix +++ b/nix/tahoe-lafs.nix @@ -97,7 +97,7 @@ EOF setuptoolsTrial pyasn1 zope_interface service-identity pyyaml magic-wormhole treq eliot autobahn cryptography netifaces setuptools - future pyutil distro configparser + future pyutil distro configparser collections-extended ]; checkInputs = with python.pkgs; [ @@ -107,6 +107,7 @@ EOF beautifulsoup4 html5lib tenacity + prometheus_client ]; checkPhase = '' diff --git a/setup.py b/setup.py index e1d711ccf..8c6396937 100644 --- a/setup.py +++ b/setup.py @@ -137,6 +137,9 @@ install_requires = [ # Backported configparser for Python 2: "configparser ; python_version < '3.0'", + + # For the RangeMap datastructure. + "collections-extended", ] setup_requires = [ @@ -404,6 +407,8 @@ setup(name="tahoe-lafs", # also set in __init__.py "tenacity", "paramiko", "pytest-timeout", + # Does our OpenMetrics endpoint adhere to the spec: + "prometheus-client == 0.11.0", ] + tor_requires + i2p_requires, "tor": tor_requires, "i2p": i2p_requires, diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 1c64bce8a..5522663ee 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -53,6 +53,14 @@ LeaseRenewSecret = Hash # used to protect lease renewal requests LeaseCancelSecret = Hash # was used to protect lease cancellation requests +class DataTooLargeError(Exception): + """The write went past the expected size of the bucket.""" + + +class ConflictingWriteError(Exception): + """Two writes happened to same immutable with different data.""" + + class RIBucketWriter(RemoteInterface): """ Objects of this kind live on the server side. 
""" def write(offset=Offset, data=ShareData): @@ -91,9 +99,9 @@ class RIBucketReader(RemoteInterface): TestVector = ListOf(TupleOf(Offset, ReadSize, bytes, bytes)) # elements are (offset, length, operator, specimen) -# operator is one of "lt, le, eq, ne, ge, gt" -# nop always passes and is used to fetch data while writing. -# you should use length==len(specimen) for everything except nop +# operator must be b"eq", typically length==len(specimen), but one can ensure +# writes don't happen to empty shares by setting length to 1 and specimen to +# b"". The operator is still used for wire compatibility with old versions. DataVector = ListOf(TupleOf(Offset, ShareData)) # (offset, data). This limits us to 30 writes of 1MiB each per call TestAndWriteVectorsForShares = DictOf(int, @@ -351,6 +359,12 @@ class IStorageServer(Interface): ): """ :see: ``RIStorageServer.slot_testv_readv_and_writev`` + + While the interface mostly matches, test vectors are simplified. + Instead of a tuple ``(offset, read_size, operator, expected_data)`` in + the original, for this method you need only pass in + ``(offset, read_size, expected_data)``, with the operator implicitly + being ``b"eq"``. """ def advise_corrupt_share( diff --git a/src/allmydata/mutable/layout.py b/src/allmydata/mutable/layout.py index ce51a8833..8bb2f3083 100644 --- a/src/allmydata/mutable/layout.py +++ b/src/allmydata/mutable/layout.py @@ -309,7 +309,7 @@ class SDMFSlotWriteProxy(object): salt) else: checkstring = checkstring_or_seqnum - self._testvs = [(0, len(checkstring), b"eq", checkstring)] + self._testvs = [(0, len(checkstring), checkstring)] def get_checkstring(self): @@ -318,7 +318,7 @@ class SDMFSlotWriteProxy(object): server. """ if self._testvs: - return self._testvs[0][3] + return self._testvs[0][2] return b"" @@ -548,9 +548,9 @@ class SDMFSlotWriteProxy(object): if not self._testvs: # Our caller has not provided us with another checkstring # yet, so we assume that we are writing a new share, and set - # a test vector that will allow a new share to be written. + # a test vector that will only allow a new share to be written. self._testvs = [] - self._testvs.append(tuple([0, 1, b"eq", b""])) + self._testvs.append(tuple([0, 1, b""])) tw_vectors = {} tw_vectors[self.shnum] = (self._testvs, datavs, None) @@ -889,7 +889,7 @@ class MDMFSlotWriteProxy(object): self._testvs = [] else: self._testvs = [] - self._testvs.append((0, len(checkstring), b"eq", checkstring)) + self._testvs.append((0, len(checkstring), checkstring)) def __repr__(self): @@ -1161,8 +1161,10 @@ class MDMFSlotWriteProxy(object): """I write the data vectors in datavs to the remote slot.""" tw_vectors = {} if not self._testvs: + # Make sure we will only successfully write if the share didn't + # previously exist. self._testvs = [] - self._testvs.append(tuple([0, 1, b"eq", b""])) + self._testvs.append(tuple([0, 1, b""])) if not self._written: # Write a new checkstring to the share when we write it, so # that we have something to check later. 
@@ -1170,7 +1172,7 @@ class MDMFSlotWriteProxy(object): datavs.append((0, new_checkstring)) def _first_write(): self._written = True - self._testvs = [(0, len(new_checkstring), b"eq", new_checkstring)] + self._testvs = [(0, len(new_checkstring), new_checkstring)] on_success = _first_write tw_vectors[self.shnum] = (self._testvs, datavs, None) d = self._storage_server.slot_testv_and_readv_and_writev( diff --git a/src/allmydata/storage/common.py b/src/allmydata/storage/common.py index cb6116e5b..e5563647f 100644 --- a/src/allmydata/storage/common.py +++ b/src/allmydata/storage/common.py @@ -13,8 +13,9 @@ if PY2: import os.path from allmydata.util import base32 -class DataTooLargeError(Exception): - pass +# Backwards compatibility. +from allmydata.interfaces import DataTooLargeError # noqa: F401 + class UnknownMutableContainerVersionError(Exception): pass class UnknownImmutableContainerVersionError(Exception): diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 4b60d79f1..b8b18f140 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -13,16 +13,20 @@ if PY2: import os, stat, struct, time +from collections_extended import RangeMap + from foolscap.api import Referenceable from zope.interface import implementer -from allmydata.interfaces import RIBucketWriter, RIBucketReader +from allmydata.interfaces import ( + RIBucketWriter, RIBucketReader, ConflictingWriteError, + DataTooLargeError, +) from allmydata.util import base32, fileutil, log from allmydata.util.assertutil import precondition from allmydata.util.hashutil import timing_safe_compare from allmydata.storage.lease import LeaseInfo -from allmydata.storage.common import UnknownImmutableContainerVersionError, \ - DataTooLargeError +from allmydata.storage.common import UnknownImmutableContainerVersionError # each share file (in storage/shares/$SI/$SHNUM) contains lease information # and share data. 
The share data is accessed by RIBucketWriter.write and @@ -204,19 +208,18 @@ class ShareFile(object): @implementer(RIBucketWriter) class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 - def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary): + def __init__(self, ss, incominghome, finalhome, max_size, lease_info): self.ss = ss self.incominghome = incominghome self.finalhome = finalhome self._max_size = max_size # don't allow the client to write more than this - self._canary = canary - self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) self.closed = False self.throw_out_all_data = False self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) # also, add our lease to the file now, so that other ones can be # added by simultaneous uploaders self._sharefile.add_lease(lease_info) + self._already_written = RangeMap() def allocated_size(self): return self._max_size @@ -226,7 +229,20 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 precondition(not self.closed) if self.throw_out_all_data: return + + # Make sure we're not conflicting with existing data: + end = offset + len(data) + for (chunk_start, chunk_stop, _) in self._already_written.ranges(offset, end): + chunk_len = chunk_stop - chunk_start + actual_chunk = self._sharefile.read_share_data(chunk_start, chunk_len) + writing_chunk = data[chunk_start - offset:chunk_stop - offset] + if actual_chunk != writing_chunk: + raise ConflictingWriteError( + "Chunk {}-{} doesn't match already written data.".format(chunk_start, chunk_stop) + ) self._sharefile.write_share_data(offset, data) + + self._already_written.set(True, offset, end) self.ss.add_latency("write", time.time() - start) self.ss.count("write") @@ -262,22 +278,19 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 pass self._sharefile = None self.closed = True - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) self.ss.add_latency("close", time.time() - start) self.ss.count("close") - def _disconnected(self): + def disconnected(self): if not self.closed: self._abort() def remote_abort(self): log.msg("storage: aborting sharefile %s" % self.incominghome, facility="tahoe.storage", level=log.UNUSUAL) - if not self.closed: - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) self._abort() self.ss.count("abort") diff --git a/src/allmydata/storage/mutable.py b/src/allmydata/storage/mutable.py index a44a2e18d..2ef0c3215 100644 --- a/src/allmydata/storage/mutable.py +++ b/src/allmydata/storage/mutable.py @@ -434,20 +434,9 @@ class MutableShareFile(object): # self._change_container_size() here. def testv_compare(a, op, b): - assert op in (b"lt", b"le", b"eq", b"ne", b"ge", b"gt") - if op == b"lt": - return a < b - if op == b"le": - return a <= b - if op == b"eq": - return a == b - if op == b"ne": - return a != b - if op == b"ge": - return a >= b - if op == b"gt": - return a > b - # never reached + assert op == b"eq" + return a == b + class EmptyShare(object): diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index f4996756e..041783a4e 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -11,13 +11,14 @@ if PY2: # Omit open() to get native behavior where open("w") always accepts native # strings. Omit bytes so we don't leak future's custom bytes. 
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 - +else: + from typing import Dict import os, re, struct, time -import weakref import six from foolscap.api import Referenceable +from foolscap.ipb import IRemoteReference from twisted.application import service from zope.interface import implementer @@ -89,7 +90,6 @@ class StorageServer(service.MultiService, Referenceable): self.incomingdir = os.path.join(sharedir, 'incoming') self._clean_incomplete() fileutil.make_dirs(self.incomingdir) - self._active_writers = weakref.WeakKeyDictionary() log.msg("StorageServer created", facility="tahoe.storage") if reserved_space: @@ -121,6 +121,17 @@ class StorageServer(service.MultiService, Referenceable): self.lease_checker.setServiceParent(self) self._get_current_time = get_current_time + # Currently being-written Bucketwriters. For Foolscap, lifetime is tied + # to connection: when disconnection happens, the BucketWriters are + # removed. For HTTP, this makes no sense, so there will be + # timeout-based cleanup; see + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807. + + # Map in-progress filesystem path -> BucketWriter: + self._bucket_writers = {} # type: Dict[str,BucketWriter] + # Canaries and disconnect markers for BucketWriters created via Foolscap: + self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] + def __repr__(self): return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) @@ -238,7 +249,7 @@ class StorageServer(service.MultiService, Referenceable): def allocated_size(self): space = 0 - for bw in self._active_writers: + for bw in self._bucket_writers.values(): space += bw.allocated_size() return space @@ -263,10 +274,13 @@ class StorageServer(service.MultiService, Referenceable): } return version - def remote_allocate_buckets(self, storage_index, - renew_secret, cancel_secret, - sharenums, allocated_size, - canary, owner_num=0): + def _allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + owner_num=0): + """ + Generic bucket allocation API. + """ # owner_num is not for clients to set, but rather it should be # curried into the PersonalStorageServer instance that is dedicated # to a particular owner. @@ -315,7 +329,7 @@ class StorageServer(service.MultiService, Referenceable): # great! we already have it. easy. pass elif os.path.exists(incominghome): - # Note that we don't create BucketWriters for shnums that + # For Foolscap we don't create BucketWriters for shnums that # have a partial share (in incoming/), so if a second upload # occurs while the first is still in progress, the second # uploader will use different storage servers. @@ -323,11 +337,11 @@ class StorageServer(service.MultiService, Referenceable): elif (not limited) or (remaining_space >= max_space_per_bucket): # ok! we need to create the new share file. 
bw = BucketWriter(self, incominghome, finalhome, - max_space_per_bucket, lease_info, canary) + max_space_per_bucket, lease_info) if self.no_storage: bw.throw_out_all_data = True bucketwriters[shnum] = bw - self._active_writers[bw] = 1 + self._bucket_writers[incominghome] = bw if limited: remaining_space -= max_space_per_bucket else: @@ -340,6 +354,21 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("allocate", self._get_current_time() - start) return alreadygot, bucketwriters + def remote_allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + canary, owner_num=0): + """Foolscap-specific ``allocate_buckets()`` API.""" + alreadygot, bucketwriters = self._allocate_buckets( + storage_index, renew_secret, cancel_secret, sharenums, allocated_size, + owner_num=owner_num, + ) + # Abort BucketWriters if disconnection happens. + for bw in bucketwriters.values(): + disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) + self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) + return alreadygot, bucketwriters + def _iter_share_files(self, storage_index): for shnum, filename in self._get_bucket_shares(storage_index): with open(filename, 'rb') as f: @@ -383,7 +412,10 @@ class StorageServer(service.MultiService, Referenceable): def bucket_writer_closed(self, bw, consumed_size): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) - del self._active_writers[bw] + del self._bucket_writers[bw.incominghome] + if bw in self._bucket_writer_disconnect_markers: + canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) + canary.dontNotifyOnDisconnect(disconnect_marker) def _get_bucket_shares(self, storage_index): """Return a list of (shnum, pathname) tuples for files that hold diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py index e9dc8c84c..ac6c107d5 100644 --- a/src/allmydata/storage_client.py +++ b/src/allmydata/storage_client.py @@ -994,11 +994,19 @@ class _StorageServer(object): tw_vectors, r_vector, ): + # Match the wire protocol, which requires 4-tuples for test vectors. + wire_format_tw_vectors = { + key: ( + [(start, length, b"eq", data) for (start, length, data) in value[0]], + value[1], + value[2], + ) for (key, value) in tw_vectors.items() + } return self._rref.callRemote( "slot_testv_and_readv_and_writev", storage_index, secrets, - tw_vectors, + wire_format_tw_vectors, r_vector, ) diff --git a/src/allmydata/test/common_util.py b/src/allmydata/test/common_util.py index b5229ca11..d2d20916d 100644 --- a/src/allmydata/test/common_util.py +++ b/src/allmydata/test/common_util.py @@ -314,6 +314,16 @@ class FakeCanary(object): def getPeer(self): return "" + def disconnected(self): + """Disconnect the canary, to be called by test code. + + Can only happen once. 
+ """ + if self.disconnectors is not None: + for (f, args, kwargs) in list(self.disconnectors.values()): + f(*args, **kwargs) + self.disconnectors = None + class ShouldFailMixin(object): diff --git a/src/allmydata/test/mutable/util.py b/src/allmydata/test/mutable/util.py index 7e3bd3ec7..dac61a6e3 100644 --- a/src/allmydata/test/mutable/util.py +++ b/src/allmydata/test/mutable/util.py @@ -149,7 +149,7 @@ class FakeStorageServer(object): readv = {} for shnum, (testv, writev, new_length) in list(tw_vectors.items()): for (offset, length, op, specimen) in testv: - assert op in (b"le", b"eq", b"ge") + assert op == b"eq" # TODO: this isn't right, the read is controlled by read_vector, # not by testv readv[shnum] = [ specimen diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 328000489..5b704e6e0 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -20,11 +20,9 @@ if PY2: from random import Random -from testtools import skipIf - from twisted.internet.defer import inlineCallbacks -from foolscap.api import Referenceable +from foolscap.api import Referenceable, RemoteException from allmydata.interfaces import IStorageServer from .common_system import SystemTestMixin @@ -77,6 +75,10 @@ class IStorageServerImmutableAPIsTestsMixin(object): Tests for ``IStorageServer``'s immutable APIs. ``self.storage_server`` is expected to provide ``IStorageServer``. + + ``self.disconnect()`` should disconnect and then reconnect, creating a new + ``self.storage_server``. Some implementations may wish to skip tests using + this; HTTP has no notion of disconnection. """ @inlineCallbacks @@ -98,13 +100,10 @@ class IStorageServerImmutableAPIsTestsMixin(object): # We validate the bucket objects' interface in a later test. @inlineCallbacks - @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793") def test_allocate_buckets_repeat(self): """ - allocate_buckets() with the same storage index returns the same result, - because the shares have not been written to. - - This fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793 + ``IStorageServer.allocate_buckets()`` with the same storage index does not return + work-in-progress buckets, but will add any newly added buckets. """ storage_index, renew_secret, cancel_secret = ( new_storage_index(), @@ -115,7 +114,7 @@ class IStorageServerImmutableAPIsTestsMixin(object): storage_index, renew_secret, cancel_secret, - sharenums=set(range(5)), + sharenums=set(range(4)), allocated_size=1024, canary=Referenceable(), ) @@ -128,40 +127,51 @@ class IStorageServerImmutableAPIsTestsMixin(object): Referenceable(), ) self.assertEqual(already_got, already_got2) - self.assertEqual(set(allocated.keys()), set(allocated2.keys())) + self.assertEqual(set(allocated2.keys()), {4}) - @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793") @inlineCallbacks - def test_allocate_buckets_more_sharenums(self): + def test_disconnection(self): """ - allocate_buckets() with the same storage index but more sharenums - acknowledges the extra shares don't exist. + If we disconnect in the middle of writing to a bucket, all data is + wiped, and it's even possible to write different data to the bucket. - Fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793 + (In the real world one shouldn't do that, but writing different data is + a good way to test that the original data really was wiped.) 
+ + HTTP protocol should skip this test, since disconnection is meaningless + concept; this is more about testing implicit contract the Foolscap + implementation depends on doesn't change as we refactor things. """ storage_index, renew_secret, cancel_secret = ( new_storage_index(), new_secret(), new_secret(), ) - yield self.storage_server.allocate_buckets( + (_, allocated) = yield self.storage_server.allocate_buckets( storage_index, renew_secret, cancel_secret, - sharenums=set(range(5)), + sharenums={0}, allocated_size=1024, canary=Referenceable(), ) - (already_got2, allocated2) = yield self.storage_server.allocate_buckets( + + # Bucket 1 is fully written in one go. + yield allocated[0].callRemote("write", 0, b"1" * 1024) + + # Disconnect: + yield self.disconnect() + + # Write different data with no complaint: + (_, allocated) = yield self.storage_server.allocate_buckets( storage_index, renew_secret, cancel_secret, - sharenums=set(range(7)), + sharenums={0}, allocated_size=1024, canary=Referenceable(), ) - self.assertEqual(already_got2, set()) # none were fully written - self.assertEqual(set(allocated2.keys()), set(range(7))) + yield allocated[0].callRemote("write", 0, b"2" * 1024) @inlineCallbacks def test_written_shares_are_allocated(self): @@ -248,26 +258,516 @@ class IStorageServerImmutableAPIsTestsMixin(object): (yield buckets[2].callRemote("read", 0, 1024)), b"3" * 512 + b"4" * 512 ) - @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801") - def test_overlapping_writes(self): + @inlineCallbacks + def test_non_matching_overlapping_writes(self): """ - The policy for overlapping writes is TBD: - https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3801 + When doing overlapping writes in immutable uploads, non-matching writes + fail. """ + storage_index, renew_secret, cancel_secret = ( + new_storage_index(), + new_secret(), + new_secret(), + ) + (_, allocated) = yield self.storage_server.allocate_buckets( + storage_index, + renew_secret, + cancel_secret, + sharenums={0}, + allocated_size=30, + canary=Referenceable(), + ) + + yield allocated[0].callRemote("write", 0, b"1" * 25) + # Overlapping write that doesn't match: + with self.assertRaises(RemoteException): + yield allocated[0].callRemote("write", 20, b"2" * 10) + + @inlineCallbacks + def test_matching_overlapping_writes(self): + """ + When doing overlapping writes in immutable uploads, matching writes + succeed. + """ + storage_index, renew_secret, cancel_secret = ( + new_storage_index(), + new_secret(), + new_secret(), + ) + (_, allocated) = yield self.storage_server.allocate_buckets( + storage_index, + renew_secret, + cancel_secret, + sharenums={0}, + allocated_size=25, + canary=Referenceable(), + ) + + yield allocated[0].callRemote("write", 0, b"1" * 10) + # Overlapping write that matches: + yield allocated[0].callRemote("write", 5, b"1" * 20) + yield allocated[0].callRemote("close") + + buckets = yield self.storage_server.get_buckets(storage_index) + self.assertEqual(set(buckets.keys()), {0}) + + self.assertEqual((yield buckets[0].callRemote("read", 0, 25)), b"1" * 25) + + @inlineCallbacks + def test_get_buckets_skips_unfinished_buckets(self): + """ + Buckets that are not fully written are not returned by + ``IStorageServer.get_buckets()`` implementations. 
+ """ + storage_index = new_storage_index() + (_, allocated) = yield self.storage_server.allocate_buckets( + storage_index, + renew_secret=new_secret(), + cancel_secret=new_secret(), + sharenums=set(range(5)), + allocated_size=10, + canary=Referenceable(), + ) + + # Bucket 1 is fully written + yield allocated[1].callRemote("write", 0, b"1" * 10) + yield allocated[1].callRemote("close") + + # Bucket 2 is partially written + yield allocated[2].callRemote("write", 0, b"1" * 5) + + buckets = yield self.storage_server.get_buckets(storage_index) + self.assertEqual(set(buckets.keys()), {1}) + + @inlineCallbacks + def test_read_bucket_at_offset(self): + """ + Given a read bucket returned from ``IStorageServer.get_buckets()``, it + is possible to read at different offsets and lengths, with reads past + the end resulting in empty bytes. + """ + length = 256 * 17 + + storage_index = new_storage_index() + (_, allocated) = yield self.storage_server.allocate_buckets( + storage_index, + renew_secret=new_secret(), + cancel_secret=new_secret(), + sharenums=set(range(1)), + allocated_size=length, + canary=Referenceable(), + ) + + total_data = _randbytes(256 * 17) + yield allocated[0].callRemote("write", 0, total_data) + yield allocated[0].callRemote("close") + + buckets = yield self.storage_server.get_buckets(storage_index) + bucket = buckets[0] + for start, to_read in [ + (0, 250), # fraction + (0, length), # whole thing + (100, 1024), # offset fraction + (length + 1, 100), # completely out of bounds + (length - 100, 200), # partially out of bounds + ]: + data = yield bucket.callRemote("read", start, to_read) + self.assertEqual( + data, + total_data[start : start + to_read], + "Didn't match for start {}, length {}".format(start, to_read), + ) + + @inlineCallbacks + def test_bucket_advise_corrupt_share(self): + """ + Calling ``advise_corrupt_share()`` on a bucket returned by + ``IStorageServer.get_buckets()`` does not result in error (other + behavior is opaque at this level of abstraction). + """ + storage_index = new_storage_index() + (_, allocated) = yield self.storage_server.allocate_buckets( + storage_index, + renew_secret=new_secret(), + cancel_secret=new_secret(), + sharenums=set(range(1)), + allocated_size=10, + canary=Referenceable(), + ) + + yield allocated[0].callRemote("write", 0, b"0123456789") + yield allocated[0].callRemote("close") + + buckets = yield self.storage_server.get_buckets(storage_index) + yield buckets[0].callRemote("advise_corrupt_share", b"OH NO") + + +class IStorageServerMutableAPIsTestsMixin(object): + """ + Tests for ``IStorageServer``'s mutable APIs. + + ``self.storage_server`` is expected to provide ``IStorageServer``. + + ``STARAW`` is short for ``slot_testv_and_readv_and_writev``. + """ + + def new_secrets(self): + """Return a 3-tuple of secrets for STARAW calls.""" + return (new_secret(), new_secret(), new_secret()) + + def staraw(self, *args, **kwargs): + """Like ``slot_testv_and_readv_and_writev``, but less typing.""" + return self.storage_server.slot_testv_and_readv_and_writev(*args, **kwargs) + + @inlineCallbacks + def test_STARAW_reads_after_write(self): + """ + When data is written with + ``IStorageServer.slot_testv_and_readv_and_writev``, it can then be read + by a separate call using that API. 
+ """ + secrets = self.new_secrets() + storage_index = new_storage_index() + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"abcdefg")], 7), + 1: ([], [(0, b"0123"), (4, b"456")], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + (_, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={}, + # Whole thing, partial, going beyond the edge, completely outside + # range: + r_vector=[(0, 7), (2, 3), (6, 8), (100, 10)], + ) + self.assertEqual( + reads, + {0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]}, + ) + + @inlineCallbacks + def test_SATRAW_reads_happen_before_writes_in_single_query(self): + """ + If a ``IStorageServer.slot_testv_and_readv_and_writev`` command + contains both reads and writes, the read returns results that precede + the write. + """ + secrets = self.new_secrets() + storage_index = new_storage_index() + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"abcdefg")], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + # Read and write in same command; read happens before write: + (written, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"X" * 7)], 7), + }, + r_vector=[(0, 7)], + ) + self.assertEqual(written, True) + self.assertEqual(reads, {0: [b"abcdefg"]}) + + # The write is available in next read: + (_, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={}, + r_vector=[(0, 7)], + ) + self.assertEqual(reads, {0: [b"X" * 7]}) + + @inlineCallbacks + def test_SATRAW_writes_happens_only_if_test_matches(self): + """ + If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes both a + test and a write, the write succeeds if the test matches, and fails if + the test does not match. + """ + secrets = self.new_secrets() + storage_index = new_storage_index() + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"1" * 7)], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + # Test matches, so write happens: + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ( + [(0, 3, b"1" * 3), (3, 4, b"1" * 4)], + [(0, b"2" * 7)], + 7, + ), + }, + r_vector=[], + ) + self.assertEqual(written, True) + (_, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={}, + r_vector=[(0, 7)], + ) + self.assertEqual(reads, {0: [b"2" * 7]}) + + # Test does not match, so write does not happen: + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([(0, 7, b"1" * 7)], [(0, b"3" * 7)], 7), + }, + r_vector=[], + ) + self.assertEqual(written, False) + (_, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={}, + r_vector=[(0, 7)], + ) + self.assertEqual(reads, {0: [b"2" * 7]}) + + @inlineCallbacks + def test_SATRAW_tests_past_end_of_data(self): + """ + If a ``IStorageServer.slot_testv_and_readv_and_writev`` includes a test + vector that reads past the end of the data, the result is limited to + actual available data. + """ + secrets = self.new_secrets() + storage_index = new_storage_index() + + # Since there is no data on server, the test vector will return empty + # string, which matches expected result, so write will succeed. 
+ (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([(0, 10, b"")], [(0, b"1" * 7)], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + # Now the test vector is a 10-read off of a 7-byte value, but expected + # value is still 7 bytes, so the write will again succeed. + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([(0, 10, b"1" * 7)], [(0, b"2" * 7)], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + @inlineCallbacks + def test_SATRAW_reads_past_end_of_data(self): + """ + If a ``IStorageServer.slot_testv_and_readv_and_writev`` reads past the + end of the data, the result is limited to actual available data. + """ + secrets = self.new_secrets() + storage_index = new_storage_index() + + # Write some data + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"12345")], 5), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + # Reads past end. + (_, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={}, + r_vector=[(0, 100), (2, 50)], + ) + self.assertEqual(reads, {0: [b"12345", b"345"]}) + + @inlineCallbacks + def test_STARAW_write_enabler_must_match(self): + """ + If the write enabler secret passed to + ``IStorageServer.slot_testv_and_readv_and_writev`` doesn't match + previous writes, the write fails. + """ + secrets = self.new_secrets() + storage_index = new_storage_index() + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"1" * 7)], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + # Write enabler secret does not match, so write does not happen: + bad_secrets = (new_secret(),) + secrets[1:] + with self.assertRaises(RemoteException): + yield self.staraw( + storage_index, + bad_secrets, + tw_vectors={ + 0: ([], [(0, b"2" * 7)], 7), + }, + r_vector=[], + ) + (_, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={}, + r_vector=[(0, 7)], + ) + self.assertEqual(reads, {0: [b"1" * 7]}) + + @inlineCallbacks + def test_STARAW_zero_new_length_deletes(self): + """ + A zero new length passed to + ``IStorageServer.slot_testv_and_readv_and_writev`` deletes the share. + """ + secrets = self.new_secrets() + storage_index = new_storage_index() + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"1" * 7)], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + # Write with new length of 0: + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"1" * 7)], 0), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + # It's gone! + (_, reads) = yield self.staraw( + storage_index, + secrets, + tw_vectors={}, + r_vector=[(0, 7)], + ) + self.assertEqual(reads, {}) + + @inlineCallbacks + def test_slot_readv(self): + """ + Data written with ``IStorageServer.slot_testv_and_readv_and_writev()`` + can be read using ``IStorageServer.slot_readv()``. Reads can't go past + the end of the data. 
+ """ + secrets = self.new_secrets() + storage_index = new_storage_index() + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"abcdefg")], 7), + 1: ([], [(0, b"0123"), (4, b"456")], 7), + # This will never get read from, just here to show we only read + # from shares explicitly requested by slot_readv: + 2: ([], [(0, b"XYZW")], 4), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + reads = yield self.storage_server.slot_readv( + storage_index, + shares=[0, 1], + # Whole thing, partial, going beyond the edge, completely outside + # range: + readv=[(0, 7), (2, 3), (6, 8), (100, 10)], + ) + self.assertEqual( + reads, + {0: [b"abcdefg", b"cde", b"g", b""], 1: [b"0123456", b"234", b"6", b""]}, + ) + + @inlineCallbacks + def test_slot_readv_no_shares(self): + """ + With no shares given, ``IStorageServer.slot_readv()`` reads from all shares. + """ + secrets = self.new_secrets() + storage_index = new_storage_index() + (written, _) = yield self.staraw( + storage_index, + secrets, + tw_vectors={ + 0: ([], [(0, b"abcdefg")], 7), + 1: ([], [(0, b"0123456")], 7), + 2: ([], [(0, b"9876543")], 7), + }, + r_vector=[], + ) + self.assertEqual(written, True) + + reads = yield self.storage_server.slot_readv( + storage_index, + shares=[], + readv=[(0, 7)], + ) + self.assertEqual( + reads, + {0: [b"abcdefg"], 1: [b"0123456"], 2: [b"9876543"]}, + ) class _FoolscapMixin(SystemTestMixin): """Run tests on Foolscap version of ``IStorageServer.""" + def _get_native_server(self): + return next(iter(self.clients[0].storage_broker.get_known_servers())) + @inlineCallbacks def setUp(self): AsyncTestCase.setUp(self) self.basedir = "test_istorageserver/" + self.id() yield SystemTestMixin.setUp(self) yield self.set_up_nodes(1) - self.storage_server = next( - iter(self.clients[0].storage_broker.get_known_servers()) - ).get_storage_server() + self.storage_server = self._get_native_server().get_storage_server() self.assertTrue(IStorageServer.providedBy(self.storage_server)) @inlineCallbacks @@ -275,6 +775,16 @@ class _FoolscapMixin(SystemTestMixin): AsyncTestCase.tearDown(self) yield SystemTestMixin.tearDown(self) + @inlineCallbacks + def disconnect(self): + """ + Disconnect and then reconnect with a new ``IStorageServer``. + """ + current = self.storage_server + yield self.bounce_client(0) + self.storage_server = self._get_native_server().get_storage_server() + assert self.storage_server is not current + class FoolscapSharedAPIsTests( _FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase @@ -286,3 +796,9 @@ class FoolscapImmutableAPIsTests( _FoolscapMixin, IStorageServerImmutableAPIsTestsMixin, AsyncTestCase ): """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" + + +class FoolscapMutableAPIsTests( + _FoolscapMixin, IStorageServerMutableAPIsTestsMixin, AsyncTestCase +): + """Foolscap-specific tests for immutable ``IStorageServer`` APIs.""" diff --git a/src/allmydata/test/test_openmetrics.py b/src/allmydata/test/test_openmetrics.py new file mode 100644 index 000000000..66cbc7dec --- /dev/null +++ b/src/allmydata/test/test_openmetrics.py @@ -0,0 +1,273 @@ +""" +Tests for ``/statistics?t=openmetrics``. + +Ported to Python 3. 
+""" + +from __future__ import print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +from future.utils import PY2 + +if PY2: + # fmt: off + from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 + # fmt: on + +from prometheus_client.openmetrics import parser + +from treq.testing import RequestTraversalAgent + +from twisted.web.http import OK +from twisted.web.client import readBody +from twisted.web.resource import Resource + +from testtools.twistedsupport import succeeded +from testtools.matchers import ( + AfterPreprocessing, + Equals, + MatchesAll, + MatchesStructure, + MatchesPredicate, +) +from testtools.content import text_content + +from allmydata.web.status import Statistics +from allmydata.test.common import SyncTestCase + + +class FakeStatsProvider(object): + """ + A stats provider that hands backed a canned collection of performance + statistics. + """ + + def get_stats(self): + # Parsed into a dict from a running tahoe's /statistics?t=json + stats = { + "stats": { + "storage_server.latencies.get.99_9_percentile": None, + "storage_server.latencies.close.10_0_percentile": 0.00021910667419433594, + "storage_server.latencies.read.01_0_percentile": 2.8848648071289062e-05, + "storage_server.latencies.writev.99_9_percentile": None, + "storage_server.latencies.read.99_9_percentile": None, + "storage_server.latencies.allocate.99_0_percentile": 0.000988006591796875, + "storage_server.latencies.writev.mean": 0.00045332245070571654, + "storage_server.latencies.close.99_9_percentile": None, + "cpu_monitor.15min_avg": 0.00017592000079223033, + "storage_server.disk_free_for_root": 103289454592, + "storage_server.latencies.get.99_0_percentile": 0.000347137451171875, + "storage_server.latencies.get.mean": 0.00021158285060171353, + "storage_server.latencies.read.90_0_percentile": 8.893013000488281e-05, + "storage_server.latencies.write.01_0_percentile": 3.600120544433594e-05, + "storage_server.latencies.write.99_9_percentile": 0.00017690658569335938, + "storage_server.latencies.close.90_0_percentile": 0.00033211708068847656, + "storage_server.disk_total": 103497859072, + "storage_server.latencies.close.95_0_percentile": 0.0003509521484375, + "storage_server.latencies.readv.samplesize": 1000, + "storage_server.disk_free_for_nonroot": 103289454592, + "storage_server.latencies.close.mean": 0.0002715024480059103, + "storage_server.latencies.writev.95_0_percentile": 0.0007410049438476562, + "storage_server.latencies.readv.90_0_percentile": 0.0003781318664550781, + "storage_server.latencies.readv.99_0_percentile": 0.0004050731658935547, + "storage_server.latencies.allocate.mean": 0.0007128627429454784, + "storage_server.latencies.close.samplesize": 326, + "storage_server.latencies.get.50_0_percentile": 0.0001819133758544922, + "storage_server.latencies.write.50_0_percentile": 4.482269287109375e-05, + "storage_server.latencies.readv.01_0_percentile": 0.0002970695495605469, + "storage_server.latencies.get.10_0_percentile": 0.00015687942504882812, + "storage_server.latencies.allocate.90_0_percentile": 0.0008189678192138672, + "storage_server.latencies.get.samplesize": 472, + "storage_server.total_bucket_count": 393, + "storage_server.latencies.read.mean": 5.936201880959903e-05, + "storage_server.latencies.allocate.01_0_percentile": 0.0004208087921142578, + "storage_server.latencies.allocate.99_9_percentile": None, + 
"storage_server.latencies.readv.mean": 0.00034061360359191893, + "storage_server.disk_used": 208404480, + "storage_server.latencies.allocate.50_0_percentile": 0.0007410049438476562, + "storage_server.latencies.read.99_0_percentile": 0.00011992454528808594, + "node.uptime": 3805759.8545179367, + "storage_server.latencies.writev.10_0_percentile": 0.00035190582275390625, + "storage_server.latencies.writev.90_0_percentile": 0.0006821155548095703, + "storage_server.latencies.close.01_0_percentile": 0.00021505355834960938, + "storage_server.latencies.close.50_0_percentile": 0.0002579689025878906, + "cpu_monitor.1min_avg": 0.0002130000000003444, + "storage_server.latencies.writev.50_0_percentile": 0.0004138946533203125, + "storage_server.latencies.read.95_0_percentile": 9.107589721679688e-05, + "storage_server.latencies.readv.95_0_percentile": 0.0003859996795654297, + "storage_server.latencies.write.10_0_percentile": 3.719329833984375e-05, + "storage_server.accepting_immutable_shares": 1, + "storage_server.latencies.writev.samplesize": 309, + "storage_server.latencies.get.95_0_percentile": 0.0003190040588378906, + "storage_server.latencies.readv.10_0_percentile": 0.00032210350036621094, + "storage_server.latencies.get.90_0_percentile": 0.0002999305725097656, + "storage_server.latencies.get.01_0_percentile": 0.0001239776611328125, + "cpu_monitor.total": 641.4941180000001, + "storage_server.latencies.write.samplesize": 1000, + "storage_server.latencies.write.95_0_percentile": 9.489059448242188e-05, + "storage_server.latencies.read.50_0_percentile": 6.890296936035156e-05, + "storage_server.latencies.writev.01_0_percentile": 0.00033211708068847656, + "storage_server.latencies.read.10_0_percentile": 3.0994415283203125e-05, + "storage_server.latencies.allocate.10_0_percentile": 0.0004949569702148438, + "storage_server.reserved_space": 0, + "storage_server.disk_avail": 103289454592, + "storage_server.latencies.write.99_0_percentile": 0.00011301040649414062, + "storage_server.latencies.write.90_0_percentile": 9.083747863769531e-05, + "cpu_monitor.5min_avg": 0.0002370666691157502, + "storage_server.latencies.write.mean": 5.8008909225463864e-05, + "storage_server.latencies.readv.50_0_percentile": 0.00033020973205566406, + "storage_server.latencies.close.99_0_percentile": 0.0004038810729980469, + "storage_server.allocated": 0, + "storage_server.latencies.writev.99_0_percentile": 0.0007710456848144531, + "storage_server.latencies.readv.99_9_percentile": 0.0004780292510986328, + "storage_server.latencies.read.samplesize": 170, + "storage_server.latencies.allocate.samplesize": 406, + "storage_server.latencies.allocate.95_0_percentile": 0.0008411407470703125, + }, + "counters": { + "storage_server.writev": 309, + "storage_server.bytes_added": 197836146, + "storage_server.close": 326, + "storage_server.readv": 14299, + "storage_server.allocate": 406, + "storage_server.read": 170, + "storage_server.write": 3775, + "storage_server.get": 472, + }, + } + return stats + + +class HackItResource(Resource, object): + """ + A bridge between ``RequestTraversalAgent`` and ``MultiFormatResource`` + (used by ``Statistics``). ``MultiFormatResource`` expects the request + object to have a ``fields`` attribute but Twisted's ``IRequest`` has no + such attribute. Create it here. + """ + + def getChildWithDefault(self, path, request): + request.fields = None + return Resource.getChildWithDefault(self, path, request) + + +class OpenMetrics(SyncTestCase): + """ + Tests for ``/statistics?t=openmetrics``. 
+ """ + + def test_spec_compliance(self): + """ + Does our output adhere to the `OpenMetrics ` spec? + https://github.com/OpenObservability/OpenMetrics/ + https://prometheus.io/docs/instrumenting/exposition_formats/ + """ + root = HackItResource() + root.putChild(b"", Statistics(FakeStatsProvider())) + rta = RequestTraversalAgent(root) + d = rta.request(b"GET", b"http://localhost/?t=openmetrics") + self.assertThat(d, succeeded(matches_stats(self))) + + +def matches_stats(testcase): + """ + Create a matcher that matches a response that confirms to the OpenMetrics + specification. + + * The ``Content-Type`` is **application/openmetrics-text; version=1.0.0; charset=utf-8**. + * The status is **OK**. + * The body can be parsed by an OpenMetrics parser. + * The metric families in the body are grouped and sorted. + * At least one of the expected families appears in the body. + + :param testtools.TestCase testcase: The case to which to add detail about the matching process. + + :return: A matcher. + """ + return MatchesAll( + MatchesStructure( + code=Equals(OK), + # "The content type MUST be..." + headers=has_header( + "content-type", + "application/openmetrics-text; version=1.0.0; charset=utf-8", + ), + ), + AfterPreprocessing( + readBodyText, + succeeded( + MatchesAll( + MatchesPredicate(add_detail(testcase, "response body"), "%s dummy"), + parses_as_openmetrics(), + ) + ), + ), + ) + + +def add_detail(testcase, name): + """ + Create a matcher that always matches and as a side-effect adds the matched + value as detail to the testcase. + + :param testtools.TestCase testcase: The case to which to add the detail. + + :return: A matcher. + """ + + def predicate(value): + testcase.addDetail(name, text_content(value)) + return True + + return predicate + + +def readBodyText(response): + """ + Read the response body and decode it using UTF-8. + + :param twisted.web.iweb.IResponse response: The response from which to + read the body. + + :return: A ``Deferred`` that fires with the ``str`` body. + """ + d = readBody(response) + d.addCallback(lambda body: body.decode("utf-8")) + return d + + +def has_header(name, value): + """ + Create a matcher that matches a response object that includes the given + name / value pair. + + :param str name: The name of the item in the HTTP header to match. + :param str value: The value of the item in the HTTP header to match by equality. + + :return: A matcher. + """ + return AfterPreprocessing( + lambda headers: headers.getRawHeaders(name), + Equals([value]), + ) + + +def parses_as_openmetrics(): + """ + Create a matcher that matches a ``str`` string that can be parsed as an + OpenMetrics response and includes a certain well-known value expected by + the tests. + + :return: A matcher. + """ + # The parser throws if it does not like its input. + # Wrapped in a list() to drain the generator. 
+    return AfterPreprocessing(
+        lambda body: list(parser.text_string_to_metric_families(body)),
+        AfterPreprocessing(
+            lambda families: families[-1].name,
+            Equals("tahoe_stats_storage_server_total_bucket_count"),
+        ),
+    )
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index bd0ab80f3..d18960a1e 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -8,7 +8,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
-from future.utils import native_str, PY2, bytes_to_native_str
+from future.utils import native_str, PY2, bytes_to_native_str, bchr
 if PY2:
     from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
     from six import ensure_str
@@ -19,13 +19,15 @@ import platform
 import stat
 import struct
 import shutil
-import gc
+from uuid import uuid4
 
 from twisted.trial import unittest
 from twisted.internet import defer
 from twisted.internet.task import Clock
 
+from hypothesis import given, strategies
+
 import itertools
 from allmydata import interfaces
 from allmydata.util import fileutil, hashutil, base32
@@ -33,7 +35,7 @@ from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME
 from allmydata.storage.shares import get_share_file
 from allmydata.storage.mutable import MutableShareFile
 from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
-from allmydata.storage.common import DataTooLargeError, storage_index_to_dir, \
+from allmydata.storage.common import storage_index_to_dir, \
     UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError, \
     si_b2a, si_a2b
 from allmydata.storage.lease import LeaseInfo
@@ -47,7 +49,9 @@ from allmydata.mutable.layout import MDMFSlotWriteProxy, MDMFSlotReadProxy, \
     SIGNATURE_SIZE, \
     VERIFICATION_KEY_SIZE, \
     SHARE_HASH_CHAIN_SIZE
-from allmydata.interfaces import BadWriteEnablerError
+from allmydata.interfaces import (
+    BadWriteEnablerError, DataTooLargeError, ConflictingWriteError,
+)
 from allmydata.test.no_network import NoNetworkServer
 from allmydata.storage_client import (
     _StorageServer,
@@ -124,8 +128,7 @@ class Bucket(unittest.TestCase):
 
     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
-                          FakeCanary())
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
         bw.remote_write(0, b"a"*25)
         bw.remote_write(25, b"b"*25)
         bw.remote_write(50, b"c"*25)
@@ -134,8 +137,7 @@ class Bucket(unittest.TestCase):
 
     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
-                          FakeCanary())
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
         bw.remote_write(0, b"a"*25)
         bw.remote_write(25, b"b"*25)
         bw.remote_write(50, b"c"*7) # last block may be short
@@ -147,6 +149,88 @@ class Bucket(unittest.TestCase):
         self.failUnlessEqual(br.remote_read(25, 25), b"b"*25)
         self.failUnlessEqual(br.remote_read(50, 7), b"c"*7)
 
+    def test_write_past_size_errors(self):
+        """Writing beyond the size of the bucket throws an exception."""
+        for (i, (offset, length)) in enumerate([(0, 201), (10, 191), (202, 34)]):
+            incoming, final = self.make_workdir(
+                "test_write_past_size_errors-{}".format(i)
+            )
+            bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+            with self.assertRaises(DataTooLargeError):
+                bw.remote_write(offset, b"a" * length)
+
+    @given(
+        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
+        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
+    )
+    def test_overlapping_writes_ok_if_matching(
+        self, maybe_overlapping_offset, maybe_overlapping_length
+    ):
+        """
+        Writes that overlap with previous writes are OK when the content is the
+        same.
+        """
+        length = 100
+        expected_data = b"".join(bchr(i) for i in range(100))
+        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
+        bw = BucketWriter(
+            self, incoming, final, length, self.make_lease(),
+        )
+        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
+        bw.remote_write(10, expected_data[10:20])
+        bw.remote_write(30, expected_data[30:40])
+        bw.remote_write(50, expected_data[50:60])
+        # Then, an overlapping write but with matching data:
+        bw.remote_write(
+            maybe_overlapping_offset,
+            expected_data[
+                maybe_overlapping_offset:maybe_overlapping_offset + maybe_overlapping_length
+            ]
+        )
+        # Now fill in the holes:
+        bw.remote_write(0, expected_data[0:10])
+        bw.remote_write(20, expected_data[20:30])
+        bw.remote_write(40, expected_data[40:50])
+        bw.remote_write(60, expected_data[60:])
+        bw.remote_close()
+
+        br = BucketReader(self, bw.finalhome)
+        self.assertEqual(br.remote_read(0, length), expected_data)
+
+
+    @given(
+        maybe_overlapping_offset=strategies.integers(min_value=0, max_value=98),
+        maybe_overlapping_length=strategies.integers(min_value=1, max_value=100),
+    )
+    def test_overlapping_writes_not_ok_if_different(
+        self, maybe_overlapping_offset, maybe_overlapping_length
+    ):
+        """
+        Writes that overlap with previous writes fail with an exception if the
+        contents don't match.
+        """
+        length = 100
+        incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
+        bw = BucketWriter(
+            self, incoming, final, length, self.make_lease(),
+        )
+        # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
+        bw.remote_write(10, b"1" * 10)
+        bw.remote_write(30, b"1" * 10)
+        bw.remote_write(50, b"1" * 10)
+        # Then, write something that might overlap with some of them, but
+        # conflicts. Then fill in holes left by first three writes. Conflict is
+        # inevitable.
+        with self.assertRaises(ConflictingWriteError):
+            bw.remote_write(
+                maybe_overlapping_offset,
+                b'X' * min(maybe_overlapping_length, length - maybe_overlapping_offset),
+            )
+        bw.remote_write(0, b"1" * 10)
+        bw.remote_write(20, b"1" * 10)
+        bw.remote_write(40, b"1" * 10)
+        bw.remote_write(60, b"1" * 40)
+
     def test_read_past_end_of_share_data(self):
         # test vector for immutable files (hard-coded contents of an immutable share
         # file):
@@ -228,8 +312,7 @@ class BucketProxy(unittest.TestCase):
         final = os.path.join(basedir, "bucket")
         fileutil.make_dirs(basedir)
         fileutil.make_dirs(os.path.join(basedir, "tmp"))
-        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
-                          FakeCanary())
+        bw = BucketWriter(self, incoming, final, size, self.make_lease())
         rb = RemoteBucket(bw)
         return bw, rb, final
 
@@ -579,26 +662,24 @@ class Server(unittest.TestCase):
         # the size we request.
         OVERHEAD = 3*4
         LEASE_SIZE = 4+32+32+4
-        canary = FakeCanary(True)
+        canary = FakeCanary()
         already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
         self.failUnlessEqual(len(writers), 3)
         # now the StorageServer should have 3000 bytes provisionally
         # allocated, allowing only 2000 more to be claimed
-        self.failUnlessEqual(len(ss._active_writers), 3)
+        self.failUnlessEqual(len(ss._bucket_writers), 3)
 
         # allocating 1001-byte shares only leaves room for one
-        already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary)
+        canary2 = FakeCanary()
+        already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
         self.failUnlessEqual(len(writers2), 1)
-        self.failUnlessEqual(len(ss._active_writers), 4)
+        self.failUnlessEqual(len(ss._bucket_writers), 4)
 
         # we abandon the first set, so their provisional allocation should be
         # returned
+        canary.disconnected()
 
-        del already
-        del writers
-        gc.collect()
-
-        self.failUnlessEqual(len(ss._active_writers), 1)
+        self.failUnlessEqual(len(ss._bucket_writers), 1)
 
         # now we have a provisional allocation of 1001 bytes
         # and we close the second set, so their provisional allocation should
@@ -607,25 +688,21 @@ class Server(unittest.TestCase):
         for bw in writers2.values():
             bw.remote_write(0, b"a"*25)
             bw.remote_close()
-        del already2
-        del writers2
-        del bw
-        self.failUnlessEqual(len(ss._active_writers), 0)
+        self.failUnlessEqual(len(ss._bucket_writers), 0)
 
         # this also changes the amount reported as available by call_get_disk_stats
         allocated = 1001 + OVERHEAD + LEASE_SIZE
 
         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
-        already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary)
+        canary3 = FakeCanary()
+        already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
         self.failUnlessEqual(len(writers3), 39)
-        self.failUnlessEqual(len(ss._active_writers), 39)
+        self.failUnlessEqual(len(ss._bucket_writers), 39)
 
-        del already3
-        del writers3
-        gc.collect()
+        canary3.disconnected()
 
-        self.failUnlessEqual(len(ss._active_writers), 0)
+        self.failUnlessEqual(len(ss._bucket_writers), 0)
         ss.disownServiceParent()
         del ss
 
@@ -1074,23 +1151,6 @@ class MutableServer(unittest.TestCase):
                                          }))
         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
 
-        # as should this one
-        answer = write(b"si1", secrets,
-                       {0: ([(10, 5, b"lt", b"11111"),
-                             ],
-                            [(0, b"x"*100)],
-                            None),
-                        },
-                       [(10,5)],
-                       )
-        self.failUnlessEqual(answer, (False,
-                                      {0: [b"11111"],
-                                       1: [b""],
-                                       2: [b""]},
-                                      ))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
-
-
     def test_operators(self):
         # test operators, the data we're comparing is '11111' in all cases.
         # test both fail+pass, reset data after each one.
@@ -1110,63 +1170,6 @@ class MutableServer(unittest.TestCase):
         reset()
 
-        # lt
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11110"),
-                                              ],
-                                             [(0, b"x"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
-        self.failUnlessEqual(read(b"si1", [], [(0,100)]), {0: [data]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11111"),
-                                              ],
-                                             [(0, b"x"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"lt", b"11112"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
-        reset()
-
-        # le
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11110"),
-                                              ],
-                                             [(0, b"x"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11111"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"le", b"11112"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
-        reset()
-
         # eq
         answer = write(b"si1", secrets, {0: ([(10, 5, b"eq", b"11112"),
                                               ],
@@ -1186,81 +1189,6 @@
         self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
         reset()
 
-        # ne
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11111"),
-                                              ],
-                                             [(0, b"x"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"ne", b"11112"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
-        reset()
-
-        # ge
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11110"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11111"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"ge", b"11112"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (False, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]})
-        reset()
-
-        # gt
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11110"),
-                                              ],
-                                             [(0, b"y"*100)],
-                                             None,
-                                             )}, [(10,5)])
-        self.failUnlessEqual(answer, (True, {0: [b"11111"]}))
-        self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [b"y"*100]})
-        reset()
-
-        answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11111"),
-                                              ],
-                                             [(0, b"x"*100)],
-                                             None,
-                                             )}, [(10,5)])
[b"11111"]})) - self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]}) - reset() - - answer = write(b"si1", secrets, {0: ([(10, 5, b"gt", b"11112"), - ], - [(0, b"x"*100)], - None, - )}, [(10,5)]) - self.failUnlessEqual(answer, (False, {0: [b"11111"]})) - self.failUnlessEqual(read(b"si1", [0], [(0,100)]), {0: [data]}) - reset() - # finally, test some operators against empty shares answer = write(b"si1", secrets, {1: ([(10, 5, b"eq", b"11112"), ], diff --git a/src/allmydata/web/status.py b/src/allmydata/web/status.py index 158d897f9..65647f491 100644 --- a/src/allmydata/web/status.py +++ b/src/allmydata/web/status.py @@ -14,6 +14,7 @@ from past.builtins import long import itertools import hashlib +import re from twisted.internet import defer from twisted.python.filepath import FilePath from twisted.web.resource import Resource @@ -1551,6 +1552,37 @@ class Statistics(MultiFormatResource): req.setHeader("content-type", "text/plain") return json.dumps(stats, indent=1) + "\n" + @render_exception + def render_OPENMETRICS(self, req): + """ + Render our stats in `OpenMetrics ` format. + For example Prometheus and Victoriametrics can parse this. + Point the scraper to ``/statistics?t=openmetrics`` (instead of the + default ``/metrics``). + """ + req.setHeader("content-type", "application/openmetrics-text; version=1.0.0; charset=utf-8") + stats = self._provider.get_stats() + ret = [] + + def mangle_name(name): + return re.sub( + u"_(\d\d)_(\d)_percentile", + u'{quantile="0.\g<1>\g<2>"}', + name.replace(u".", u"_") + ) + + def mangle_value(val): + return str(val) if val is not None else u"NaN" + + for (k, v) in sorted(stats['counters'].items()): + ret.append(u"tahoe_counters_%s %s" % (mangle_name(k), mangle_value(v))) + for (k, v) in sorted(stats['stats'].items()): + ret.append(u"tahoe_stats_%s %s" % (mangle_name(k), mangle_value(v))) + + ret.append(u"# EOF\n") + + return u"\n".join(ret) + class StatisticsElement(Element): loader = XMLFile(FilePath(__file__).sibling("statistics.xhtml"))