Merge remote-tracking branch 'origin/master' into 3758.refactor.web-tests-grid-logs-root

This commit is contained in:
Jean-Paul Calderone 2021-12-02 08:36:35 -05:00
commit 82099f04c4
44 changed files with 1528 additions and 404 deletions

2
.gitignore vendored
View File

@ -29,7 +29,7 @@ zope.interface-*.egg
.pc
/src/allmydata/test/plugins/dropin.cache
/_trial_temp*
**/_trial_temp*
/tmp*
/*.patch
/dist/

View File

@ -363,11 +363,11 @@ one branch contains all of the share data;
another branch contains all of the lease data;
etc.
Authorization is required for all endpoints.
An ``Authorization`` header in requests is required for all endpoints.
The standard HTTP authorization protocol is used.
The authentication *type* used is ``Tahoe-LAFS``.
The swissnum from the NURL used to locate the storage service is used as the *credentials*.
If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives an ``UNAUTHORIZED`` response.
If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives an ``401 UNAUTHORIZED`` response.
General
~~~~~~~
@ -396,17 +396,19 @@ For example::
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Either renew or create a new lease on the bucket addressed by ``storage_index``.
The details of the lease are encoded in the request body.
The renew secret and cancellation secret should be included as ``X-Tahoe-Authorization`` headers.
For example::
{"renew-secret": "abcd", "cancel-secret": "efgh"}
X-Tahoe-Authorization: lease-renew-secret <base64-lease-renew-secret>
X-Tahoe-Authorization: lease-cancel-secret <base64-lease-cancel-secret>
If the ``renew-secret`` value matches an existing lease
If the ``lease-renew-secret`` value matches an existing lease
then the expiration time of that lease will be changed to 31 days after the time of this operation.
If it does not match an existing lease
then a new lease will be created with this ``renew-secret`` which expires 31 days after the time of this operation.
then a new lease will be created with this ``lease-renew-secret`` which expires 31 days after the time of this operation.
``renew-secret`` and ``cancel-secret`` values must be 32 bytes long.
``lease-renew-secret`` and ``lease-cancel-secret`` values must be 32 bytes long.
The server treats them as opaque values.
:ref:`Share Leases` gives details about how the Tahoe-LAFS storage client constructs these values.
@ -423,8 +425,10 @@ In these cases the server takes no action and returns ``NOT FOUND``.
Discussion
``````````
We considered an alternative where ``renew-secret`` and ``cancel-secret`` are placed in query arguments on the request path.
We chose to put these values into the request body to make the URL simpler.
We considered an alternative where ``lease-renew-secret`` and ``lease-cancel-secret`` are placed in query arguments on the request path.
This increases chances of leaking secrets in logs.
Putting the secrets in the body reduces the chances of leaking secrets,
but eventually we chose headers as the least likely information to be logged.
Several behaviors here are blindly copied from the Foolscap-based storage server protocol.
@ -450,14 +454,22 @@ A lease is also created for the shares.
Details of the buckets to create are encoded in the request body.
For example::
{"renew-secret": "efgh", "cancel-secret": "ijkl",
"share-numbers": [1, 7, ...], "allocated-size": 12345}
{"share-numbers": [1, 7, ...], "allocated-size": 12345}
The request must include ``X-Tahoe-Authorization`` HTTP headers that set the various secrets—upload, lease renewal, lease cancellation—that will be later used to authorize various operations.
For example::
X-Tahoe-Authorization: lease-renew-secret <base64-lease-renew-secret>
X-Tahoe-Authorization: lease-cancel-secret <base64-lease-cancel-secret>
X-Tahoe-Authorization: upload-secret <base64-upload-secret>
The response body includes encoded information about the created buckets.
For example::
{"already-have": [1, ...], "allocated": [7, ...]}
The upload secret is an opaque _byte_ string.
Discussion
``````````
@ -482,6 +494,20 @@ The response includes ``already-have`` and ``allocated`` for two reasons:
This might be because a server has become unavailable and a remaining server needs to store more shares for the upload.
It could also just be that the client's preferred servers have changed.
Regarding upload secrets,
the goal is for uploading and aborting (see next sections) to be authenticated by more than just the storage index.
In the future, we may want to generate them in a way that allows resuming/canceling when the client has issues.
In the short term, they can just be a random byte string.
The primary security constraint is that each upload to each server has its own unique upload key,
tied to uploading that particular storage index to this particular server.
Rejected designs for upload secrets:
* Upload secret per share number.
In order to make the secret unguessable by attackers, which includes other servers,
it must contain randomness.
Randomness means there is no need to have a secret per share, since adding share-specific content to randomness doesn't actually make the secret any better.
``PATCH /v1/immutable/:storage_index/:share_number``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@ -498,6 +524,12 @@ If any one of these requests fails then at most 128KiB of upload work needs to b
The server must recognize when all of the data has been received and mark the share as complete
(which it can do because it was informed of the size when the storage index was initialized).
The request must include a ``X-Tahoe-Authorization`` header that includes the upload secret::
X-Tahoe-Authorization: upload-secret <base64-upload-secret>
Responses:
* When a chunk that does not complete the share is successfully uploaded the response is ``OK``.
The response body indicates the range of share data that has yet to be uploaded.
That is::
@ -522,6 +554,10 @@ The server must recognize when all of the data has been received and mark the sh
This cancels an *in-progress* upload.
The request must include a ``X-Tahoe-Authorization`` header that includes the upload secret::
X-Tahoe-Authorization: upload-secret <base64-upload-secret>
The response code:
* When the upload is still in progress and therefore the abort has succeeded,
@ -619,16 +655,16 @@ The first write operation on a mutable storage index creates it
(that is,
there is no separate "create this storage index" operation as there is for the immutable storage index type).
The request body includes the secrets necessary to rewrite to the shares
along with test, read, and write vectors for the operation.
The request must include ``X-Tahoe-Authorization`` headers with write enabler and lease secrets::
X-Tahoe-Authorization: write-enabler <base64-write-enabler-secret>
X-Tahoe-Authorization: lease-cancel-secret <base64-lease-cancel-secret>
X-Tahoe-Authorization: lease-renew-secret <base64-lease-renew-secret>
The request body includes test, read, and write vectors for the operation.
For example::
{
"secrets": {
"write-enabler": "abcd",
"lease-renew": "efgh",
"lease-cancel": "ijkl"
},
"test-write-vectors": {
0: {
"test": [{
@ -694,8 +730,12 @@ Immutable Data
1. Create a bucket for storage index ``AAAAAAAAAAAAAAAA`` to hold two immutable shares, discovering that share ``1`` was already uploaded::
POST /v1/immutable/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh", "cancel-secret": "ijkl",
"share-numbers": [1, 7], "allocated-size": 48}
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: lease-renew-secret efgh
X-Tahoe-Authorization: lease-cancel-secret jjkl
X-Tahoe-Authorization: upload-secret xyzf
{"share-numbers": [1, 7], "allocated-size": 48}
200 OK
{"already-have": [1], "allocated": [7]}
@ -703,26 +743,34 @@ Immutable Data
#. Upload the content for immutable share ``7``::
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Authorization: Tahoe-LAFS nurl-swissnum
Content-Range: bytes 0-15/48
X-Tahoe-Authorization: upload-secret xyzf
<first 16 bytes of share data>
200 OK
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Authorization: Tahoe-LAFS nurl-swissnum
Content-Range: bytes 16-31/48
X-Tahoe-Authorization: upload-secret xyzf
<second 16 bytes of share data>
200 OK
PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7
Authorization: Tahoe-LAFS nurl-swissnum
Content-Range: bytes 32-47/48
X-Tahoe-Authorization: upload-secret xyzf
<final 16 bytes of share data>
201 CREATED
#. Download the content of the previously uploaded immutable share ``7``::
GET /v1/immutable/AAAAAAAAAAAAAAAA?share=7&offset=0&size=48
GET /v1/immutable/AAAAAAAAAAAAAAAA?share=7
Authorization: Tahoe-LAFS nurl-swissnum
Range: bytes=0-47
200 OK
<complete 48 bytes of previously uploaded data>
@ -730,7 +778,9 @@ Immutable Data
#. Renew the lease on all immutable shares in bucket ``AAAAAAAAAAAAAAAA``::
PUT /v1/lease/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: lease-cancel-secret jjkl
X-Tahoe-Authorization: lease-renew-secret efgh
204 NO CONTENT
@ -743,12 +793,12 @@ if there is no existing share,
otherwise it will read a byte which won't match `b""`::
POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: write-enabler abcd
X-Tahoe-Authorization: lease-cancel-secret efgh
X-Tahoe-Authorization: lease-renew-secret ijkl
{
"secrets": {
"write-enabler": "abcd",
"lease-renew": "efgh",
"lease-cancel": "ijkl"
},
"test-write-vectors": {
3: {
"test": [{
@ -775,12 +825,12 @@ otherwise it will read a byte which won't match `b""`::
#. Safely rewrite the contents of a known version of mutable share number ``3`` (or fail)::
POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: write-enabler abcd
X-Tahoe-Authorization: lease-cancel-secret efgh
X-Tahoe-Authorization: lease-renew-secret ijkl
{
"secrets": {
"write-enabler": "abcd",
"lease-renew": "efgh",
"lease-cancel": "ijkl"
},
"test-write-vectors": {
3: {
"test": [{
@ -807,12 +857,16 @@ otherwise it will read a byte which won't match `b""`::
#. Download the contents of share number ``3``::
GET /v1/mutable/BBBBBBBBBBBBBBBB?share=3&offset=0&size=10
Authorization: Tahoe-LAFS nurl-swissnum
<complete 16 bytes of previously uploaded data>
#. Renew the lease on previously uploaded mutable share in slot ``BBBBBBBBBBBBBBBB``::
PUT /v1/lease/BBBBBBBBBBBBBBBB
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
Authorization: Tahoe-LAFS nurl-swissnum
X-Tahoe-Authorization: lease-cancel-secret efgh
X-Tahoe-Authorization: lease-renew-secret ijkl
204 NO CONTENT

View File

@ -35,6 +35,9 @@ from allmydata.test.common import (
if sys.platform.startswith('win'):
pytest.skip('Skipping Tor tests on Windows', allow_module_level=True)
if PY2:
pytest.skip('Skipping Tor tests on Python 2 because dependencies are hard to come by', allow_module_level=True)
@pytest_twisted.inlineCallbacks
def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)

0
newsfragments/3527.minor Normal file
View File

View File

@ -0,0 +1 @@
If uploading an immutable hasn't had a write for 30 minutes, the storage server will abort the upload.

0
newsfragments/3820.minor Normal file
View File

0
newsfragments/3833.minor Normal file
View File

0
newsfragments/3834.minor Normal file
View File

0
newsfragments/3836.minor Normal file
View File

1
newsfragments/3837.other Normal file
View File

@ -0,0 +1 @@
Tahoe-LAFS no longer runs its Tor integration test suite on Python 2 due to the increased complexity of obtaining compatible versions of necessary dependencies.

0
newsfragments/3838.minor Normal file
View File

0
newsfragments/3842.minor Normal file
View File

0
newsfragments/3843.minor Normal file
View File

0
newsfragments/3847.minor Normal file
View File

20
nix/cbor2.nix Normal file
View File

@ -0,0 +1,20 @@
{ lib, buildPythonPackage, fetchPypi, setuptools_scm }:
buildPythonPackage rec {
pname = "cbor2";
version = "5.2.0";
src = fetchPypi {
sha256 = "1gwlgjl70vlv35cgkcw3cg7b5qsmws36hs4mmh0l9msgagjs4fm3";
inherit pname version;
};
doCheck = false;
propagatedBuildInputs = [ setuptools_scm ];
meta = with lib; {
homepage = https://github.com/agronholm/cbor2;
description = "CBOR encoder/decoder";
license = licenses.mit;
};
}

View File

@ -21,6 +21,9 @@ self: super: {
# collections-extended is not part of nixpkgs at this time.
collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { };
# cbor2 is not part of nixpkgs at this time.
cbor2 = python-super.pythonPackages.callPackage ./cbor2.nix { };
};
};

View File

@ -4,7 +4,7 @@
, setuptools, setuptoolsTrial, pyasn1, zope_interface
, service-identity, pyyaml, magic-wormhole, treq, appdirs
, beautifulsoup4, eliot, autobahn, cryptography, netifaces
, html5lib, pyutil, distro, configparser
, html5lib, pyutil, distro, configparser, klein, cbor2
}:
python.pkgs.buildPythonPackage rec {
# Most of the time this is not exactly the release version (eg 1.16.0).
@ -95,9 +95,10 @@ EOF
propagatedBuildInputs = with python.pkgs; [
twisted foolscap zfec appdirs
setuptoolsTrial pyasn1 zope_interface
service-identity pyyaml magic-wormhole treq
service-identity pyyaml magic-wormhole
eliot autobahn cryptography netifaces setuptools
future pyutil distro configparser collections-extended
klein cbor2 treq
];
checkInputs = with python.pkgs; [

View File

@ -140,6 +140,11 @@ install_requires = [
# For the RangeMap datastructure.
"collections-extended",
# HTTP server and client
"klein",
"treq",
"cbor2"
]
setup_requires = [
@ -397,7 +402,6 @@ setup(name="tahoe-lafs", # also set in __init__.py
# Python 2.7.
"decorator < 5",
"hypothesis >= 3.6.1",
"treq",
"towncrier",
"testtools",
"fixtures",

View File

@ -141,7 +141,9 @@ def write_introducer(basedir, petname, furl):
"""
if isinstance(furl, bytes):
furl = furl.decode("utf-8")
basedir.child(b"private").child(b"introducers.yaml").setContent(
private = basedir.child(b"private")
private.makedirs(ignoreExistingDirectory=True)
private.child(b"introducers.yaml").setContent(
safe_dump({
"introducers": {
petname: {

View File

@ -15,15 +15,22 @@ try:
except ImportError:
pass
# do not import any allmydata modules at this level. Do that from inside
# individual functions instead.
import struct, time, os, sys
from twisted.python import usage, failure
from twisted.internet import defer
from foolscap.logging import cli as foolscap_cli
from allmydata.scripts.common import BaseOptions
from allmydata.scripts.common import BaseOptions
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.layout import MDMFSlotReadProxy
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
class DumpOptions(BaseOptions):
def getSynopsis(self):
@ -56,13 +63,11 @@ def dump_share(options):
# check the version, to see if we have a mutable or immutable share
print("share filename: %s" % quote_output(options['filename']), file=out)
f = open(options['filename'], "rb")
prefix = f.read(32)
f.close()
if prefix == MutableShareFile.MAGIC:
return dump_mutable_share(options)
# otherwise assume it's immutable
return dump_immutable_share(options)
with open(options['filename'], "rb") as f:
if MutableShareFile.is_valid_header(f.read(32)):
return dump_mutable_share(options)
# otherwise assume it's immutable
return dump_immutable_share(options)
def dump_immutable_share(options):
from allmydata.storage.immutable import ShareFile
@ -712,125 +717,122 @@ def call(c, *args, **kwargs):
return results[0]
def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
import struct
f = open(abs_sharefile, "rb")
prefix = f.read(32)
if prefix == MutableShareFile.MAGIC:
# mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.get_expiration_time()
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
share_type = "unknown"
f.seek(m.DATA_OFFSET)
version = f.read(1)
if version == b"\x00":
# this slot contains an SMDF share
share_type = "SDMF"
elif version == b"\x01":
share_type = "MDMF"
if share_type == "SDMF":
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, 2000))
try:
pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
print("SDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
from allmydata.mutable.layout import MDMFSlotReadProxy
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
p = ShareDumper(None, "fake-si", fake_shnum)
def extract(func):
stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
with open(abs_sharefile, "rb") as f:
prefix = f.read(32)
if MutableShareFile.is_valid_header(prefix):
_describe_mutable_share(abs_sharefile, f, now, si_s, out)
elif ShareFile.is_valid_header(prefix):
_describe_immutable_share(abs_sharefile, now, si_s, out)
else:
print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out)
print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out)
elif struct.unpack(">L", prefix[:4]) == (1,):
# immutable
def _describe_mutable_share(abs_sharefile, f, now, si_s, out):
# mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.get_expiration_time()
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
share_type = "unknown"
f.seek(m.DATA_OFFSET)
version = f.read(1)
if version == b"\x00":
# this slot contains an SMDF share
share_type = "SDMF"
elif version == b"\x01":
share_type = "MDMF"
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
if share_type == "SDMF":
f.seek(m.DATA_OFFSET)
expiration_time = min( [lease.get_expiration_time()
for lease in sf.get_leases()] )
expiration = max(0, expiration_time - now)
# Read at least the mutable header length, if possible. If there's
# less data than that in the share, don't try to read more (we won't
# be able to unpack the header in this case but we surely don't want
# to try to unpack bytes *following* the data section as if they were
# header data). Rather than 2000 we could use HEADER_LENGTH from
# allmydata/mutable/layout.py, probably.
data = f.read(min(data_length, 2000))
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
try:
pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
ueb_hash = unpacked["UEB_hash"]
print("SDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize,
str(ueb_hash, "utf-8"), expiration,
quote_output(abs_sharefile)), file=out)
p = ShareDumper(None, "fake-si", fake_shnum)
def extract(func):
stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
else:
print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out)
print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out)
def _describe_immutable_share(abs_sharefile, now, si_s, out):
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
expiration_time = min(lease.get_expiration_time()
for lease in sf.get_leases())
expiration = max(0, expiration_time - now)
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
ueb_hash = unpacked["UEB_hash"]
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize,
str(ueb_hash, "utf-8"), expiration,
quote_output(abs_sharefile)), file=out)
f.close()
def catalog_shares(options):
from allmydata.util.encodingutil import listdir_unicode, quote_output
@ -933,34 +935,35 @@ def corrupt_share(options):
f.write(d)
f.close()
f = open(fn, "rb")
prefix = f.read(32)
f.close()
if prefix == MutableShareFile.MAGIC:
# mutable
m = MutableShareFile(fn)
f = open(fn, "rb")
f.seek(m.DATA_OFFSET)
data = f.read(2000)
# make sure this slot contains an SMDF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
with open(fn, "rb") as f:
prefix = f.read(32)
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
ig_datalen, offsets) = unpack_header(data)
if MutableShareFile.is_valid_header(prefix):
# mutable
m = MutableShareFile(fn)
with open(fn, "rb") as f:
f.seek(m.DATA_OFFSET)
# Read enough data to get a mutable header to unpack.
data = f.read(2000)
# make sure this slot contains an SMDF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
assert version == 0, "we only handle v0 SDMF files"
start = m.DATA_OFFSET + offsets["share_data"]
end = m.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end)
else:
# otherwise assume it's immutable
f = ShareFile(fn)
bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
flip_bit(start, end)
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
ig_datalen, offsets) = unpack_header(data)
assert version == 0, "we only handle v0 SDMF files"
start = m.DATA_OFFSET + offsets["share_data"]
end = m.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end)
else:
# otherwise assume it's immutable
f = ShareFile(fn)
bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
flip_bit(start, end)

View File

@ -0,0 +1,79 @@
"""
HTTP client that talks to the HTTP storage server.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
else:
# typing module not available in Python 2, and we only do type checking in
# Python 3 anyway.
from typing import Union
from treq.testing import StubTreq
import base64
# TODO Make sure to import Python version?
from cbor2 import loads
from twisted.web.http_headers import Headers
from twisted.internet.defer import inlineCallbacks, returnValue, fail
from hyperlink import DecodedURL
import treq
class ClientException(Exception):
"""An unexpected error."""
def _decode_cbor(response):
"""Given HTTP response, return decoded CBOR body."""
if response.code > 199 and response.code < 300:
return treq.content(response).addCallback(loads)
return fail(ClientException(response.code, response.phrase))
def swissnum_auth_header(swissnum): # type: (bytes) -> bytes
"""Return value for ``Authentication`` header."""
return b"Tahoe-LAFS " + base64.b64encode(swissnum).strip()
class StorageClient(object):
"""
HTTP client that talks to the HTTP storage server.
"""
def __init__(
self, url, swissnum, treq=treq
): # type: (DecodedURL, bytes, Union[treq,StubTreq]) -> None
self._base_url = url
self._swissnum = swissnum
self._treq = treq
def _get_headers(self): # type: () -> Headers
"""Return the basic headers to be used by default."""
headers = Headers()
headers.addRawHeader(
"Authorization",
swissnum_auth_header(self._swissnum),
)
return headers
@inlineCallbacks
def get_version(self):
"""
Return the version metadata for the server.
"""
url = self._base_url.click("/v1/version")
response = yield self._treq.get(url, headers=self._get_headers())
decoded_response = yield _decode_cbor(response)
returnValue(decoded_response)

View File

@ -0,0 +1,94 @@
"""
HTTP server for storage.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
from functools import wraps
from klein import Klein
from twisted.web import http
# TODO Make sure to use pure Python versions?
from cbor2 import dumps
from .server import StorageServer
from .http_client import swissnum_auth_header
def _authorization_decorator(f):
"""
Check the ``Authorization`` header, and (TODO: in later revision of code)
extract ``X-Tahoe-Authorization`` headers and pass them in.
"""
@wraps(f)
def route(self, request, *args, **kwargs):
if request.requestHeaders.getRawHeaders("Authorization", [None])[0] != str(
swissnum_auth_header(self._swissnum), "ascii"
):
request.setResponseCode(http.UNAUTHORIZED)
return b""
# authorization = request.requestHeaders.getRawHeaders("X-Tahoe-Authorization", [])
# For now, just a placeholder:
authorization = None
return f(self, request, authorization, *args, **kwargs)
return route
def _authorized_route(app, *route_args, **route_kwargs):
"""
Like Klein's @route, but with additional support for checking the
``Authorization`` header as well as ``X-Tahoe-Authorization`` headers. The
latter will (TODO: in later revision of code) get passed in as second
argument to wrapped functions.
"""
def decorator(f):
@app.route(*route_args, **route_kwargs)
@_authorization_decorator
def handle_route(*args, **kwargs):
return f(*args, **kwargs)
return handle_route
return decorator
class HTTPServer(object):
"""
A HTTP interface to the storage server.
"""
_app = Klein()
def __init__(
self, storage_server, swissnum
): # type: (StorageServer, bytes) -> None
self._storage_server = storage_server
self._swissnum = swissnum
def get_resource(self):
"""Return twisted.web ``Resource`` for this object."""
return self._app.resource()
def _cbor(self, request, data):
"""Return CBOR-encoded data."""
request.setHeader("Content-Type", "application/cbor")
# TODO if data is big, maybe want to use a temporary file eventually...
return dumps(data)
@_authorized_route(_app, "/v1/version", methods=["GET"])
def version(self, request, authorization):
return self._cbor(request, self._storage_server.remote_get_version())

View File

@ -24,7 +24,6 @@ from allmydata.interfaces import (
)
from allmydata.util import base32, fileutil, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import timing_safe_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownImmutableContainerVersionError
@ -57,6 +56,21 @@ class ShareFile(object):
LEASE_SIZE = struct.calcsize(">L32s32sL")
sharetype = "immutable"
@classmethod
def is_valid_header(cls, header):
# type: (bytes) -> bool
"""
Determine if the given bytes constitute a valid header for this type of
container.
:param header: Some bytes from the beginning of a container.
:return: ``True`` if the bytes could belong to this container,
``False`` otherwise.
"""
(version,) = struct.unpack(">L", header[:4])
return version == 1
def __init__(self, filename, max_size=None, create=False):
""" If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """
precondition((max_size is not None) or (not create), max_size, create)
@ -165,7 +179,7 @@ class ShareFile(object):
secret.
"""
for i,lease in enumerate(self.get_leases()):
if timing_safe_compare(lease.renew_secret, renew_secret):
if lease.is_renew_secret(renew_secret):
# yup. See if we need to update the owner time.
if allow_backdate or new_expire_time > lease.get_expiration_time():
# yes
@ -194,7 +208,7 @@ class ShareFile(object):
leases = list(self.get_leases())
num_leases_removed = 0
for i,lease in enumerate(leases):
if timing_safe_compare(lease.cancel_secret, cancel_secret):
if lease.is_cancel_secret(cancel_secret):
leases[i] = None
num_leases_removed += 1
if not num_leases_removed:
@ -219,7 +233,7 @@ class ShareFile(object):
@implementer(RIBucketWriter)
class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def __init__(self, ss, incominghome, finalhome, max_size, lease_info):
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
@ -231,12 +245,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
# added by simultaneous uploaders
self._sharefile.add_lease(lease_info)
self._already_written = RangeMap()
self._clock = clock
self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout)
def allocated_size(self):
return self._max_size
def remote_write(self, offset, data):
start = time.time()
# Delay the timeout, since we received data:
self._timeout.reset(30 * 60)
start = self._clock.seconds()
precondition(not self.closed)
if self.throw_out_all_data:
return
@ -254,12 +272,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self._sharefile.write_share_data(offset, data)
self._already_written.set(True, offset, end)
self.ss.add_latency("write", time.time() - start)
self.ss.add_latency("write", self._clock.seconds() - start)
self.ss.count("write")
def remote_close(self):
self.close()
def close(self):
precondition(not self.closed)
start = time.time()
self._timeout.cancel()
start = self._clock.seconds()
fileutil.make_dirs(os.path.dirname(self.finalhome))
fileutil.rename(self.incominghome, self.finalhome)
@ -292,20 +314,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
self.ss.add_latency("close", time.time() - start)
self.ss.add_latency("close", self._clock.seconds() - start)
self.ss.count("close")
def disconnected(self):
if not self.closed:
self._abort()
self.abort()
def _abort_due_to_timeout(self):
"""
Called if we run out of time.
"""
log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
def remote_abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
self._abort()
self.abort()
self.ss.count("abort")
def _abort(self):
def abort(self):
if self.closed:
return
@ -323,6 +353,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self.closed = True
self.ss.bucket_writer_closed(self, 0)
# Cancel timeout if it wasn't already cancelled.
if self._timeout.active():
self._timeout.cancel()
@implementer(RIBucketReader)
class BucketReader(Referenceable): # type: ignore # warner/foolscap#78

View File

@ -15,6 +15,8 @@ import struct, time
import attr
from allmydata.util.hashutil import timing_safe_compare
@attr.s(frozen=True)
class LeaseInfo(object):
"""
@ -68,6 +70,26 @@ class LeaseInfo(object):
_expiration_time=new_expire_time,
)
def is_renew_secret(self, candidate_secret):
# type: (bytes) -> bool
"""
Check a string to see if it is the correct renew secret.
:return: ``True`` if it is the correct renew secret, ``False``
otherwise.
"""
return timing_safe_compare(self.renew_secret, candidate_secret)
def is_cancel_secret(self, candidate_secret):
# type: (bytes) -> bool
"""
Check a string to see if it is the correct cancel secret.
:return: ``True`` if it is the correct cancel secret, ``False``
otherwise.
"""
return timing_safe_compare(self.cancel_secret, candidate_secret)
def get_grant_renew_time_time(self):
# hack, based upon fixed 31day expiration period
return self._expiration_time - 31*24*60*60

View File

@ -67,6 +67,20 @@ class MutableShareFile(object):
MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
# TODO: decide upon a policy for max share size
@classmethod
def is_valid_header(cls, header):
# type: (bytes) -> bool
"""
Determine if the given bytes constitute a valid header for this type of
container.
:param header: Some bytes from the beginning of a container.
:return: ``True`` if the bytes could belong to this container,
``False`` otherwise.
"""
return header.startswith(cls.MAGIC)
def __init__(self, filename, parent=None):
self.home = filename
if os.path.exists(self.home):
@ -77,7 +91,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data)
if magic != self.MAGIC:
if not self.is_valid_header(data):
msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
(filename, magic, self.MAGIC)
raise UnknownMutableContainerVersionError(msg)
@ -313,7 +327,7 @@ class MutableShareFile(object):
accepting_nodeids = set()
with open(self.home, 'rb+') as f:
for (leasenum,lease) in self._enumerate_leases(f):
if timing_safe_compare(lease.renew_secret, renew_secret):
if lease.is_renew_secret(renew_secret):
# yup. See if we need to update the owner time.
if allow_backdate or new_expire_time > lease.get_expiration_time():
# yes
@ -357,7 +371,7 @@ class MutableShareFile(object):
with open(self.home, 'rb+') as f:
for (leasenum,lease) in self._enumerate_leases(f):
accepting_nodeids.add(lease.nodeid)
if timing_safe_compare(lease.cancel_secret, cancel_secret):
if lease.is_cancel_secret(cancel_secret):
self._write_lease_record(f, leasenum, blank_lease)
modified += 1
else:
@ -388,7 +402,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data)
assert magic == self.MAGIC
assert self.is_valid_header(data)
return (write_enabler, write_enabler_nodeid)
def readv(self, readv):

View File

@ -14,12 +14,13 @@ if PY2:
else:
from typing import Dict
import os, re, struct, time
import os, re
import six
from foolscap.api import Referenceable
from foolscap.ipb import IRemoteReference
from twisted.application import service
from twisted.internet import reactor
from zope.interface import implementer
from allmydata.interfaces import RIStorageServer, IStatsProducer
@ -57,6 +58,9 @@ DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
"""
A filesystem-based implementation of ``RIStorageServer``.
"""
name = 'storage'
LeaseCheckerClass = LeaseCheckingCrawler
@ -68,7 +72,7 @@ class StorageServer(service.MultiService, Referenceable):
expiration_override_lease_duration=None,
expiration_cutoff_date=None,
expiration_sharetypes=("mutable", "immutable"),
get_current_time=time.time):
clock=reactor):
service.MultiService.__init__(self)
assert isinstance(nodeid, bytes)
assert len(nodeid) == 20
@ -119,7 +123,7 @@ class StorageServer(service.MultiService, Referenceable):
expiration_cutoff_date,
expiration_sharetypes)
self.lease_checker.setServiceParent(self)
self._get_current_time = get_current_time
self._clock = clock
# Currently being-written Bucketwriters. For Foolscap, lifetime is tied
# to connection: when disconnection happens, the BucketWriters are
@ -132,6 +136,12 @@ class StorageServer(service.MultiService, Referenceable):
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
def stopService(self):
# Cancel any in-progress uploads:
for bw in list(self._bucket_writers.values()):
bw.disconnected()
return service.MultiService.stopService(self)
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@ -277,14 +287,19 @@ class StorageServer(service.MultiService, Referenceable):
def _allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
owner_num=0):
owner_num=0, renew_leases=True):
"""
Generic bucket allocation API.
:param bool renew_leases: If and only if this is ``True`` then renew a
secret-matching lease on (or, if none match, add a new lease to)
existing shares in this bucket. Any *new* shares are given a new
lease regardless.
"""
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
start = self._get_current_time()
start = self._clock.seconds()
self.count("allocate")
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
@ -297,7 +312,7 @@ class StorageServer(service.MultiService, Referenceable):
# goes into the share files themselves. It could also be put into a
# separate database. Note that the lease should not be added until
# the BucketWriter has been closed.
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -319,8 +334,9 @@ class StorageServer(service.MultiService, Referenceable):
# file, they'll want us to hold leases for this file.
for (shnum, fn) in self._get_bucket_shares(storage_index):
alreadygot.add(shnum)
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
if renew_leases:
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
for shnum in sharenums:
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
@ -337,7 +353,8 @@ class StorageServer(service.MultiService, Referenceable):
elif (not limited) or (remaining_space >= max_space_per_bucket):
# ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome,
max_space_per_bucket, lease_info)
max_space_per_bucket, lease_info,
clock=self._clock)
if self.no_storage:
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
@ -351,7 +368,7 @@ class StorageServer(service.MultiService, Referenceable):
if bucketwriters:
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
self.add_latency("allocate", self._get_current_time() - start)
self.add_latency("allocate", self._clock.seconds() - start)
return alreadygot, bucketwriters
def remote_allocate_buckets(self, storage_index,
@ -361,7 +378,7 @@ class StorageServer(service.MultiService, Referenceable):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num,
owner_num=owner_num, renew_leases=True,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
@ -373,12 +390,12 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f:
header = f.read(32)
if header[:32] == MutableShareFile.MAGIC:
if MutableShareFile.is_valid_header(header):
sf = MutableShareFile(filename, self)
# note: if the share has been migrated, the renew_lease()
# call will throw an exception, with information to help the
# client update the lease.
elif header[:4] == struct.pack(">L", 1):
elif ShareFile.is_valid_header(header):
sf = ShareFile(filename)
else:
continue # non-sharefile
@ -386,26 +403,26 @@ class StorageServer(service.MultiService, Referenceable):
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
owner_num=1):
start = self._get_current_time()
start = self._clock.seconds()
self.count("add-lease")
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
new_expire_time, self.my_nodeid)
for sf in self._iter_share_files(storage_index):
sf.add_or_renew_lease(lease_info)
self.add_latency("add-lease", self._get_current_time() - start)
self.add_latency("add-lease", self._clock.seconds() - start)
return None
def remote_renew_lease(self, storage_index, renew_secret):
start = self._get_current_time()
start = self._clock.seconds()
self.count("renew")
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
found_buckets = False
for sf in self._iter_share_files(storage_index):
found_buckets = True
sf.renew_lease(renew_secret, new_expire_time)
self.add_latency("renew", self._get_current_time() - start)
self.add_latency("renew", self._clock.seconds() - start)
if not found_buckets:
raise IndexError("no such lease to renew")
@ -432,7 +449,7 @@ class StorageServer(service.MultiService, Referenceable):
pass
def remote_get_buckets(self, storage_index):
start = self._get_current_time()
start = self._clock.seconds()
self.count("get")
si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %r" % si_s)
@ -440,7 +457,7 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index):
bucketreaders[shnum] = BucketReader(self, filename,
storage_index, shnum)
self.add_latency("get", self._get_current_time() - start)
self.add_latency("get", self._clock.seconds() - start)
return bucketreaders
def get_leases(self, storage_index):
@ -579,10 +596,8 @@ class StorageServer(service.MultiService, Referenceable):
else:
if sharenum not in shares:
# allocate a new share
allocated_size = 2000 # arbitrary, really
share = self._allocate_slot_share(bucketdir, secrets,
sharenum,
allocated_size,
owner_num=0)
shares[sharenum] = share
shares[sharenum].writev(datav, new_length)
@ -601,7 +616,7 @@ class StorageServer(service.MultiService, Referenceable):
:return LeaseInfo: Information for a new lease for a share.
"""
ownerid = 1 # TODO
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(ownerid,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -631,13 +646,15 @@ class StorageServer(service.MultiService, Referenceable):
Read data from shares and conditionally write some data to them.
:param bool renew_leases: If and only if this is ``True`` and the test
vectors pass then shares in this slot will also have an updated
lease applied to them.
vectors pass then shares mentioned in ``test_and_write_vectors``
that still exist after the changes are made will also have a
secret-matching lease renewed (or, if none match, a new lease
added).
See ``allmydata.interfaces.RIStorageServer`` for details about other
parameters and return value.
"""
start = self._get_current_time()
start = self._clock.seconds()
self.count("writev")
si_s = si_b2a(storage_index)
log.msg("storage: slot_writev %r" % si_s)
@ -678,7 +695,7 @@ class StorageServer(service.MultiService, Referenceable):
self._add_or_renew_leases(remaining_shares, lease_info)
# all done
self.add_latency("writev", self._get_current_time() - start)
self.add_latency("writev", self._clock.seconds() - start)
return (testv_is_good, read_data)
def remote_slot_testv_and_readv_and_writev(self, storage_index,
@ -694,7 +711,7 @@ class StorageServer(service.MultiService, Referenceable):
)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
allocated_size, owner_num=0):
owner_num=0):
(write_enabler, renew_secret, cancel_secret) = secrets
my_nodeid = self.my_nodeid
fileutil.make_dirs(bucketdir)
@ -704,7 +721,7 @@ class StorageServer(service.MultiService, Referenceable):
return share
def remote_slot_readv(self, storage_index, shares, readv):
start = self._get_current_time()
start = self._clock.seconds()
self.count("readv")
si_s = si_b2a(storage_index)
lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
@ -713,7 +730,7 @@ class StorageServer(service.MultiService, Referenceable):
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
if not os.path.isdir(bucketdir):
self.add_latency("readv", self._get_current_time() - start)
self.add_latency("readv", self._clock.seconds() - start)
return {}
datavs = {}
for sharenum_s in os.listdir(bucketdir):
@ -727,7 +744,7 @@ class StorageServer(service.MultiService, Referenceable):
datavs[sharenum] = msf.readv(readv)
log.msg("returning shares %s" % (list(datavs.keys()),),
facility="tahoe.storage", level=log.NOISY, parent=lp)
self.add_latency("readv", self._get_current_time() - start)
self.add_latency("readv", self._clock.seconds() - start)
return datavs
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,

View File

@ -17,8 +17,7 @@ from allmydata.storage.immutable import ShareFile
def get_share_file(filename):
with open(filename, "rb") as f:
prefix = f.read(32)
if prefix == MutableShareFile.MAGIC:
if MutableShareFile.is_valid_header(prefix):
return MutableShareFile(filename)
# otherwise assume it's immutable
return ShareFile(filename)

View File

@ -125,5 +125,5 @@ if sys.platform == "win32":
initialize()
from eliot import to_file
from allmydata.util.jsonbytes import AnyBytesJSONEncoder
to_file(open("eliot.log", "wb"), encoder=AnyBytesJSONEncoder)
from allmydata.util.eliotutil import eliot_json_encoder
to_file(open("eliot.log", "wb"), encoder=eliot_json_encoder)

View File

@ -11,16 +11,24 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import os
import mock
try:
from typing import Any, List, Tuple
except ImportError:
pass
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python import usage
from allmydata.util import configutil
from allmydata.util import tor_provider, i2p_provider
from ..common_util import run_cli, parse_cli
from ..common import (
disable_modules,
)
from ...scripts import create_node
from ... import client
def read_config(basedir):
tahoe_cfg = os.path.join(basedir, "tahoe.cfg")
config = configutil.get_config(tahoe_cfg)
@ -105,11 +113,12 @@ class Config(unittest.TestCase):
@defer.inlineCallbacks
def test_client_hide_ip_no_i2p_txtorcon(self):
# hmm, I must be doing something weird, these don't work as
# @mock.patch decorators for some reason
txi2p = mock.patch('allmydata.util.i2p_provider._import_txi2p', return_value=None)
txtorcon = mock.patch('allmydata.util.tor_provider._import_txtorcon', return_value=None)
with txi2p, txtorcon:
"""
The ``create-client`` sub-command tells the user to install the necessary
dependencies if they have neither tor nor i2p support installed and
they request network location privacy with the ``--hide-ip`` flag.
"""
with disable_modules("txi2p", "txtorcon"):
basedir = self.mktemp()
rc, out, err = yield run_cli("create-client", "--hide-ip", basedir)
self.assertTrue(rc != 0, out)
@ -118,8 +127,7 @@ class Config(unittest.TestCase):
@defer.inlineCallbacks
def test_client_i2p_option_no_txi2p(self):
txi2p = mock.patch('allmydata.util.i2p_provider._import_txi2p', return_value=None)
with txi2p:
with disable_modules("txi2p"):
basedir = self.mktemp()
rc, out, err = yield run_cli("create-node", "--listen=i2p", "--i2p-launch", basedir)
self.assertTrue(rc != 0)
@ -127,8 +135,7 @@ class Config(unittest.TestCase):
@defer.inlineCallbacks
def test_client_tor_option_no_txtorcon(self):
txtorcon = mock.patch('allmydata.util.tor_provider._import_txtorcon', return_value=None)
with txtorcon:
with disable_modules("txtorcon"):
basedir = self.mktemp()
rc, out, err = yield run_cli("create-node", "--listen=tor", "--tor-launch", basedir)
self.assertTrue(rc != 0)
@ -145,9 +152,7 @@ class Config(unittest.TestCase):
@defer.inlineCallbacks
def test_client_hide_ip_no_txtorcon(self):
txtorcon = mock.patch('allmydata.util.tor_provider._import_txtorcon',
return_value=None)
with txtorcon:
with disable_modules("txtorcon"):
basedir = self.mktemp()
rc, out, err = yield run_cli("create-client", "--hide-ip", basedir)
self.assertEqual(0, rc)
@ -295,11 +300,10 @@ class Config(unittest.TestCase):
def test_node_slow_tor(self):
basedir = self.mktemp()
d = defer.Deferred()
with mock.patch("allmydata.util.tor_provider.create_config",
return_value=d):
d2 = run_cli("create-node", "--listen=tor", basedir)
d.callback(({}, "port", "location"))
rc, out, err = yield d2
self.patch(tor_provider, "create_config", lambda *a, **kw: d)
d2 = run_cli("create-node", "--listen=tor", basedir)
d.callback(({}, "port", "location"))
rc, out, err = yield d2
self.assertEqual(rc, 0)
self.assertIn("Node created", out)
self.assertEqual(err, "")
@ -308,11 +312,10 @@ class Config(unittest.TestCase):
def test_node_slow_i2p(self):
basedir = self.mktemp()
d = defer.Deferred()
with mock.patch("allmydata.util.i2p_provider.create_config",
return_value=d):
d2 = run_cli("create-node", "--listen=i2p", basedir)
d.callback(({}, "port", "location"))
rc, out, err = yield d2
self.patch(i2p_provider, "create_config", lambda *a, **kw: d)
d2 = run_cli("create-node", "--listen=i2p", basedir)
d.callback(({}, "port", "location"))
rc, out, err = yield d2
self.assertEqual(rc, 0)
self.assertIn("Node created", out)
self.assertEqual(err, "")
@ -353,6 +356,27 @@ class Config(unittest.TestCase):
self.assertIn("is not empty", err)
self.assertIn("To avoid clobbering anything, I am going to quit now", err)
def fake_config(testcase, module, result):
# type: (unittest.TestCase, Any, Any) -> List[Tuple]
"""
Monkey-patch a fake configuration function into the given module.
:param testcase: The test case to use to do the monkey-patching.
:param module: The module into which to patch the fake function.
:param result: The return value for the fake function.
:return: A list of tuples of the arguments the fake function was called
with.
"""
calls = []
def fake_config(reactor, cli_config):
calls.append((reactor, cli_config))
return result
testcase.patch(module, "create_config", fake_config)
return calls
class Tor(unittest.TestCase):
def test_default(self):
basedir = self.mktemp()
@ -360,12 +384,14 @@ class Tor(unittest.TestCase):
tor_port = "ghi"
tor_location = "jkl"
config_d = defer.succeed( (tor_config, tor_port, tor_location) )
with mock.patch("allmydata.util.tor_provider.create_config",
return_value=config_d) as co:
rc, out, err = self.successResultOf(
run_cli("create-node", "--listen=tor", basedir))
self.assertEqual(len(co.mock_calls), 1)
args = co.mock_calls[0][1]
calls = fake_config(self, tor_provider, config_d)
rc, out, err = self.successResultOf(
run_cli("create-node", "--listen=tor", basedir),
)
self.assertEqual(len(calls), 1)
args = calls[0]
self.assertIdentical(args[0], reactor)
self.assertIsInstance(args[1], create_node.CreateNodeOptions)
self.assertEqual(args[1]["listen"], "tor")
@ -380,12 +406,15 @@ class Tor(unittest.TestCase):
tor_port = "ghi"
tor_location = "jkl"
config_d = defer.succeed( (tor_config, tor_port, tor_location) )
with mock.patch("allmydata.util.tor_provider.create_config",
return_value=config_d) as co:
rc, out, err = self.successResultOf(
run_cli("create-node", "--listen=tor", "--tor-launch",
basedir))
args = co.mock_calls[0][1]
calls = fake_config(self, tor_provider, config_d)
rc, out, err = self.successResultOf(
run_cli(
"create-node", "--listen=tor", "--tor-launch",
basedir,
),
)
args = calls[0]
self.assertEqual(args[1]["listen"], "tor")
self.assertEqual(args[1]["tor-launch"], True)
self.assertEqual(args[1]["tor-control-port"], None)
@ -396,12 +425,15 @@ class Tor(unittest.TestCase):
tor_port = "ghi"
tor_location = "jkl"
config_d = defer.succeed( (tor_config, tor_port, tor_location) )
with mock.patch("allmydata.util.tor_provider.create_config",
return_value=config_d) as co:
rc, out, err = self.successResultOf(
run_cli("create-node", "--listen=tor", "--tor-control-port=mno",
basedir))
args = co.mock_calls[0][1]
calls = fake_config(self, tor_provider, config_d)
rc, out, err = self.successResultOf(
run_cli(
"create-node", "--listen=tor", "--tor-control-port=mno",
basedir,
),
)
args = calls[0]
self.assertEqual(args[1]["listen"], "tor")
self.assertEqual(args[1]["tor-launch"], False)
self.assertEqual(args[1]["tor-control-port"], "mno")
@ -434,12 +466,13 @@ class I2P(unittest.TestCase):
i2p_port = "ghi"
i2p_location = "jkl"
dest_d = defer.succeed( (i2p_config, i2p_port, i2p_location) )
with mock.patch("allmydata.util.i2p_provider.create_config",
return_value=dest_d) as co:
rc, out, err = self.successResultOf(
run_cli("create-node", "--listen=i2p", basedir))
self.assertEqual(len(co.mock_calls), 1)
args = co.mock_calls[0][1]
calls = fake_config(self, i2p_provider, dest_d)
rc, out, err = self.successResultOf(
run_cli("create-node", "--listen=i2p", basedir),
)
self.assertEqual(len(calls), 1)
args = calls[0]
self.assertIdentical(args[0], reactor)
self.assertIsInstance(args[1], create_node.CreateNodeOptions)
self.assertEqual(args[1]["listen"], "i2p")
@ -461,12 +494,15 @@ class I2P(unittest.TestCase):
i2p_port = "ghi"
i2p_location = "jkl"
dest_d = defer.succeed( (i2p_config, i2p_port, i2p_location) )
with mock.patch("allmydata.util.i2p_provider.create_config",
return_value=dest_d) as co:
rc, out, err = self.successResultOf(
run_cli("create-node", "--listen=i2p", "--i2p-sam-port=mno",
basedir))
args = co.mock_calls[0][1]
calls = fake_config(self, i2p_provider, dest_d)
rc, out, err = self.successResultOf(
run_cli(
"create-node", "--listen=i2p", "--i2p-sam-port=mno",
basedir,
),
)
args = calls[0]
self.assertEqual(args[1]["listen"], "i2p")
self.assertEqual(args[1]["i2p-launch"], False)
self.assertEqual(args[1]["i2p-sam-port"], "mno")

View File

@ -28,6 +28,7 @@ __all__ = [
import sys
import os, random, struct
from contextlib import contextmanager
import six
import tempfile
from tempfile import mktemp
@ -267,8 +268,12 @@ class UseNode(object):
node_config = attr.ib(default=attr.Factory(dict))
config = attr.ib(default=None)
reactor = attr.ib(default=None)
def setUp(self):
self.assigner = SameProcessStreamEndpointAssigner()
self.assigner.setUp()
def format_config_items(config):
return "\n".join(
" = ".join((key, value))
@ -292,6 +297,23 @@ class UseNode(object):
"default",
self.introducer_furl,
)
node_config = self.node_config.copy()
if "tub.port" not in node_config:
if "tub.location" in node_config:
raise ValueError(
"UseNode fixture does not support specifying tub.location "
"without tub.port"
)
# Don't use the normal port auto-assignment logic. It produces
# collisions and makes tests fail spuriously.
tub_location, tub_endpoint = self.assigner.assign(self.reactor)
node_config.update({
"tub.port": tub_endpoint,
"tub.location": tub_location,
})
self.config = config_from_string(
self.basedir.asTextMode().path,
"tub.port",
@ -304,7 +326,7 @@ storage.plugins = {storage_plugin}
{plugin_config_section}
""".format(
storage_plugin=self.storage_plugin,
node_config=format_config_items(self.node_config),
node_config=format_config_items(node_config),
plugin_config_section=plugin_config_section,
)
)
@ -316,7 +338,7 @@ storage.plugins = {storage_plugin}
)
def cleanUp(self):
pass
self.assigner.tearDown()
def getDetails(self):
@ -1068,7 +1090,7 @@ def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
def _corrupt_mutable_share_data(data, debug=False):
prefix = data[:32]
assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
assert MutableShareFile.is_valid_header(prefix), "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
data_offset = MutableShareFile.DATA_OFFSET
sharetype = data[data_offset:data_offset+1]
assert sharetype == b"\x00", "non-SDMF mutable shares not supported"
@ -1213,6 +1235,29 @@ class ConstantAddresses(object):
raise Exception("{!r} has no client endpoint.")
return self._handler
@contextmanager
def disable_modules(*names):
"""
A context manager which makes modules appear to be missing while it is
active.
:param *names: The names of the modules to disappear. Only top-level
modules are supported (that is, "." is not allowed in any names).
This is an implementation shortcoming which could be lifted if
desired.
"""
if any("." in name for name in names):
raise ValueError("Names containing '.' are not supported.")
missing = object()
modules = list(sys.modules.get(n, missing) for n in names)
for n in names:
sys.modules[n] = None
yield
for n, original in zip(names, modules):
if original is missing:
del sys.modules[n]
else:
sys.modules[n] = original
class _TestCaseMixin(object):
"""

View File

@ -672,11 +672,14 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
"""
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
_, port_endpoint = self.port_assigner.assign(reactor)
_, web_port_endpoint = self.port_assigner.assign(reactor)
main_location_hint, main_port_endpoint = self.port_assigner.assign(reactor)
introducer_config = (
u"[node]\n"
u"nickname = introducer \N{BLACK SMILING FACE}\n" +
u"web.port = {}\n".format(port_endpoint)
u"web.port = {}\n".format(web_port_endpoint) +
u"tub.port = {}\n".format(main_port_endpoint) +
u"tub.location = {}\n".format(main_location_hint)
).encode("utf-8")
fileutil.make_dirs(iv_dir)
@ -764,13 +767,15 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def _generate_config(self, which, basedir):
config = {}
except1 = set(range(self.numclients)) - {1}
allclients = set(range(self.numclients))
except1 = allclients - {1}
feature_matrix = {
("client", "nickname"): except1,
# client 1 has to auto-assign an address.
("node", "tub.port"): except1,
("node", "tub.location"): except1,
# Auto-assigning addresses is extremely failure prone and not
# amenable to automated testing in _this_ manner.
("node", "tub.port"): allclients,
("node", "tub.location"): allclients,
# client 0 runs a webserver and a helper
# client 3 runs a webserver but no helper
@ -852,7 +857,13 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# connection-lost code
basedir = FilePath(self.getdir("client%d" % client_num))
basedir.makedirs()
config = "[client]\n"
config = (
"[node]\n"
"tub.location = {}\n"
"tub.port = {}\n"
"[client]\n"
).format(*self.port_assigner.assign(reactor))
if helper_furl:
config += "helper.furl = %s\n" % helper_furl
basedir.child("tahoe.cfg").setContent(config.encode("utf-8"))

View File

@ -42,7 +42,6 @@ from zope.interface import (
from eliot import (
ActionType,
Field,
MemoryLogger,
ILogger,
)
from eliot.testing import (
@ -54,8 +53,9 @@ from twisted.python.monkey import (
MonkeyPatcher,
)
from ..util.jsonbytes import AnyBytesJSONEncoder
from ..util.eliotutil import (
MemoryLogger,
)
_NAME = Field.for_types(
u"name",
@ -71,14 +71,6 @@ RUN_TEST = ActionType(
)
# On Python 3, we want to use our custom JSON encoder when validating messages
# can be encoded to JSON:
if PY2:
_memory_logger = MemoryLogger
else:
_memory_logger = lambda: MemoryLogger(encoder=AnyBytesJSONEncoder)
@attr.s
class EliotLoggedRunTest(object):
"""
@ -170,7 +162,7 @@ def with_logging(
"""
@wraps(test_method)
def run_with_logging(*args, **kwargs):
validating_logger = _memory_logger()
validating_logger = MemoryLogger()
original = swap_logger(None)
try:
swap_logger(_TwoLoggers(original, validating_logger))

View File

@ -89,6 +89,7 @@ from .common import (
UseTestPlugins,
MemoryIntroducerClient,
get_published_announcements,
UseNode,
)
from .matchers import (
MatchesSameElements,
@ -953,13 +954,14 @@ class Run(unittest.TestCase, testutil.StallMixin):
@defer.inlineCallbacks
def test_reloadable(self):
basedir = FilePath("test_client.Run.test_reloadable")
private = basedir.child("private")
private.makedirs()
from twisted.internet import reactor
dummy = "pb://wl74cyahejagspqgy4x5ukrvfnevlknt@127.0.0.1:58889/bogus"
write_introducer(basedir, "someintroducer", dummy)
basedir.child("tahoe.cfg").setContent(BASECONFIG. encode("ascii"))
c1 = yield client.create_client(basedir.path)
fixture = UseNode(None, None, FilePath(self.mktemp()), dummy, reactor=reactor)
fixture.setUp()
self.addCleanup(fixture.cleanUp)
c1 = yield fixture.create_node()
c1.setServiceParent(self.sparent)
# delay to let the service start up completely. I'm not entirely sure
@ -981,7 +983,7 @@ class Run(unittest.TestCase, testutil.StallMixin):
# also change _check_exit_trigger to use it instead of a raw
# reactor.stop, also instrument the shutdown event in an
# attribute that we can check.)
c2 = yield client.create_client(basedir.path)
c2 = yield fixture.create_node()
c2.setServiceParent(self.sparent)
yield c2.disownServiceParent()

View File

@ -10,16 +10,30 @@ from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import sys
import random
import unittest
from hypothesis import given
from hypothesis.strategies import lists, sampled_from
from testtools.matchers import Equals
from twisted.python.reflect import (
ModuleNotFound,
namedAny,
)
from .common import (
SyncTestCase,
disable_modules,
)
from allmydata.test.common_util import flip_one_bit
class TestFlipOneBit(unittest.TestCase):
class TestFlipOneBit(SyncTestCase):
def setUp(self):
random.seed(42) # I tried using version=1 on PY3 to avoid the if below, to no avail.
super(TestFlipOneBit, self).setUp()
# I tried using version=1 on PY3 to avoid the if below, to no avail.
random.seed(42)
def test_accepts_byte_string(self):
actual = flip_one_bit(b'foo')
@ -27,3 +41,61 @@ class TestFlipOneBit(unittest.TestCase):
def test_rejects_unicode_string(self):
self.assertRaises(AssertionError, flip_one_bit, u'foo')
def some_existing_modules():
"""
Build the names of modules (as native strings) that exist and can be
imported.
"""
candidates = sorted(
name
for name
in sys.modules
if "." not in name
and sys.modules[name] is not None
)
return sampled_from(candidates)
class DisableModulesTests(SyncTestCase):
"""
Tests for ``disable_modules``.
"""
def setup_example(self):
return sys.modules.copy()
def teardown_example(self, safe_modules):
sys.modules.update(safe_modules)
@given(lists(some_existing_modules(), unique=True))
def test_importerror(self, module_names):
"""
While the ``disable_modules`` context manager is active any import of the
modules identified by the names passed to it result in ``ImportError``
being raised.
"""
def get_modules():
return list(
namedAny(name)
for name
in module_names
)
before_modules = get_modules()
with disable_modules(*module_names):
for name in module_names:
with self.assertRaises(ModuleNotFound):
namedAny(name)
after_modules = get_modules()
self.assertThat(before_modules, Equals(after_modules))
def test_dotted_names_rejected(self):
"""
If names with "." in them are passed to ``disable_modules`` then
``ValueError`` is raised.
"""
with self.assertRaises(ValueError):
with disable_modules("foo.bar"):
pass

View File

@ -27,13 +27,12 @@ from fixtures import (
)
from testtools import (
TestCase,
)
from testtools import (
TestResult,
)
from testtools.matchers import (
Is,
IsInstance,
Not,
MatchesStructure,
Equals,
HasLength,
@ -65,11 +64,11 @@ from twisted.internet.task import deferLater
from twisted.internet import reactor
from ..util.eliotutil import (
eliot_json_encoder,
log_call_deferred,
_parse_destination_description,
_EliotLogging,
)
from ..util.jsonbytes import AnyBytesJSONEncoder
from .common import (
SyncTestCase,
@ -77,24 +76,105 @@ from .common import (
)
class EliotLoggedTestTests(AsyncTestCase):
def passes():
"""
Create a matcher that matches a ``TestCase`` that runs without failures or
errors.
"""
def run(case):
result = TestResult()
case.run(result)
return result.wasSuccessful()
return AfterPreprocessing(run, Equals(True))
class EliotLoggedTestTests(TestCase):
"""
Tests for the automatic log-related provided by ``AsyncTestCase``.
This class uses ``testtools.TestCase`` because it is inconvenient to nest
``AsyncTestCase`` inside ``AsyncTestCase`` (in particular, Eliot messages
emitted by the inner test case get observed by the outer test case and if
an inner case emits invalid messages they cause the outer test case to
fail).
"""
def test_fails(self):
"""
A test method of an ``AsyncTestCase`` subclass can fail.
"""
class UnderTest(AsyncTestCase):
def test_it(self):
self.fail("make sure it can fail")
self.assertThat(UnderTest("test_it"), Not(passes()))
def test_unserializable_fails(self):
"""
A test method of an ``AsyncTestCase`` subclass that logs an unserializable
value with Eliot fails.
"""
class world(object):
"""
an unserializable object
"""
class UnderTest(AsyncTestCase):
def test_it(self):
Message.log(hello=world)
self.assertThat(UnderTest("test_it"), Not(passes()))
def test_logs_non_utf_8_byte(self):
"""
A test method of an ``AsyncTestCase`` subclass can log a message that
contains a non-UTF-8 byte string and return ``None`` and pass.
"""
class UnderTest(AsyncTestCase):
def test_it(self):
Message.log(hello=b"\xFF")
self.assertThat(UnderTest("test_it"), passes())
def test_returns_none(self):
Message.log(hello="world")
"""
A test method of an ``AsyncTestCase`` subclass can log a message and
return ``None`` and pass.
"""
class UnderTest(AsyncTestCase):
def test_it(self):
Message.log(hello="world")
self.assertThat(UnderTest("test_it"), passes())
def test_returns_fired_deferred(self):
Message.log(hello="world")
return succeed(None)
"""
A test method of an ``AsyncTestCase`` subclass can log a message and
return an already-fired ``Deferred`` and pass.
"""
class UnderTest(AsyncTestCase):
def test_it(self):
Message.log(hello="world")
return succeed(None)
self.assertThat(UnderTest("test_it"), passes())
def test_returns_unfired_deferred(self):
Message.log(hello="world")
# @eliot_logged_test automatically gives us an action context but it's
# still our responsibility to maintain it across stack-busting
# operations.
d = DeferredContext(deferLater(reactor, 0.0, lambda: None))
d.addCallback(lambda ignored: Message.log(goodbye="world"))
# We didn't start an action. We're not finishing an action.
return d.result
"""
A test method of an ``AsyncTestCase`` subclass can log a message and
return an unfired ``Deferred`` and pass when the ``Deferred`` fires.
"""
class UnderTest(AsyncTestCase):
def test_it(self):
Message.log(hello="world")
# @eliot_logged_test automatically gives us an action context
# but it's still our responsibility to maintain it across
# stack-busting operations.
d = DeferredContext(deferLater(reactor, 0.0, lambda: None))
d.addCallback(lambda ignored: Message.log(goodbye="world"))
# We didn't start an action. We're not finishing an action.
return d.result
self.assertThat(UnderTest("test_it"), passes())
class ParseDestinationDescriptionTests(SyncTestCase):
@ -109,7 +189,7 @@ class ParseDestinationDescriptionTests(SyncTestCase):
reactor = object()
self.assertThat(
_parse_destination_description("file:-")(reactor),
Equals(FileDestination(stdout, encoder=AnyBytesJSONEncoder)),
Equals(FileDestination(stdout, encoder=eliot_json_encoder)),
)

View File

@ -21,6 +21,7 @@ if PY2:
from random import Random
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.task import Clock
from foolscap.api import Referenceable, RemoteException
@ -1017,16 +1018,17 @@ class _FoolscapMixin(SystemTestMixin):
self.server = s
break
assert self.server is not None, "Couldn't find StorageServer"
self._current_time = 123456
self.server._get_current_time = self.fake_time
self._clock = Clock()
self._clock.advance(123456)
self.server._clock = self._clock
def fake_time(self):
"""Return the current fake, test-controlled, time."""
return self._current_time
return self._clock.seconds()
def fake_sleep(self, seconds):
"""Advance the fake time by the given number of seconds."""
self._current_time += seconds
self._clock.advance(seconds)
@inlineCallbacks
def tearDown(self):

View File

@ -69,6 +69,8 @@ import allmydata.test.common_util as testutil
from .common import (
ConstantAddresses,
SameProcessStreamEndpointAssigner,
UseNode,
)
def port_numbers():
@ -80,11 +82,10 @@ class LoggingMultiService(service.MultiService):
# see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946
def testing_tub(config_data=''):
def testing_tub(reactor, config_data=''):
"""
Creates a 'main' Tub for testing purposes, from config data
"""
from twisted.internet import reactor
basedir = 'dummy_basedir'
config = config_from_string(basedir, 'DEFAULT_PORTNUMFILE_BLANK', config_data)
fileutil.make_dirs(os.path.join(basedir, 'private'))
@ -112,6 +113,9 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
# try to bind the port. We'll use a low-numbered one that's likely to
# conflict with another service to prove it.
self._available_port = 22
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
def _test_location(
self,
@ -137,11 +141,23 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
:param local_addresses: If not ``None`` then a list of addresses to
supply to the system under test as local addresses.
"""
from twisted.internet import reactor
basedir = self.mktemp()
create_node_dir(basedir, "testing")
if tub_port is None:
# Always configure a usable tub.port address instead of relying on
# the automatic port assignment. The automatic port assignment is
# prone to collisions and spurious test failures.
_, tub_port = self.port_assigner.assign(reactor)
config_data = "[node]\n"
if tub_port:
config_data += "tub.port = {}\n".format(tub_port)
config_data += "tub.port = {}\n".format(tub_port)
# If they wanted a certain location, go for it. This probably won't
# agree with the tub.port value we set but that only matters if
# anything tries to use this to establish a connection ... which
# nothing in this test suite will.
if tub_location is not None:
config_data += "tub.location = {}\n".format(tub_location)
@ -149,7 +165,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.patch(iputil, 'get_local_addresses_sync',
lambda: local_addresses)
tub = testing_tub(config_data)
tub = testing_tub(reactor, config_data)
class Foo(object):
pass
@ -431,7 +447,12 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
@defer.inlineCallbacks
def test_logdir_is_str(self):
basedir = "test_node/test_logdir_is_str"
from twisted.internet import reactor
basedir = FilePath(self.mktemp())
fixture = UseNode(None, None, basedir, "pb://introducer/furl", {}, reactor=reactor)
fixture.setUp()
self.addCleanup(fixture.cleanUp)
ns = Namespace()
ns.called = False
@ -440,8 +461,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.failUnless(isinstance(logdir, str), logdir)
self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
create_node_dir(basedir, "nothing to see here")
yield client.create_client(basedir)
yield fixture.create_node()
self.failUnless(ns.called)
def test_set_config_unescaped_furl_hash(self):

View File

@ -128,7 +128,7 @@ class Bucket(unittest.TestCase):
def test_create(self):
incoming, final = self.make_workdir("test_create")
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*25)
@ -137,7 +137,7 @@ class Bucket(unittest.TestCase):
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*7) # last block may be short
@ -155,7 +155,7 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir(
"test_write_past_size_errors-{}".format(i)
)
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
with self.assertRaises(DataTooLargeError):
bw.remote_write(offset, b"a" * length)
@ -174,7 +174,7 @@ class Bucket(unittest.TestCase):
expected_data = b"".join(bchr(i) for i in range(100))
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, expected_data[10:20])
@ -212,7 +212,7 @@ class Bucket(unittest.TestCase):
length = 100
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
self, incoming, final, length, self.make_lease(), Clock()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, b"1" * 10)
@ -285,6 +285,67 @@ class Bucket(unittest.TestCase):
result_of_read = br.remote_read(0, len(share_data)+1)
self.failUnlessEqual(result_of_read, share_data)
def _assert_timeout_only_after_30_minutes(self, clock, bw):
"""
The ``BucketWriter`` times out and is closed after 30 minutes, but not
sooner.
"""
self.assertFalse(bw.closed)
# 29 minutes pass. Everything is fine.
for i in range(29):
clock.advance(60)
self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,))
# After the 30th minute, the bucket is closed due to lack of writes.
clock.advance(60)
self.assertTrue(bw.closed)
def test_bucket_expires_if_no_writes_for_30_minutes(self):
"""
If a ``BucketWriter`` receives no writes for 30 minutes, it is removed.
"""
incoming, final = self.make_workdir("test_bucket_expires")
clock = Clock()
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_writes_delay_timeout(self):
"""
So long as the ``BucketWriter`` receives writes, the the removal
timeout is put off.
"""
incoming, final = self.make_workdir("test_bucket_writes_delay_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
# 29 minutes pass, getting close to the timeout...
clock.advance(29 * 60)
# .. but we receive a write! So that should delay the timeout again to
# another 30 minutes.
bw.remote_write(0, b"hello")
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_closing_cancels_timeout(self):
"""
Closing cancels the ``BucketWriter`` timeout.
"""
incoming, final = self.make_workdir("test_bucket_close_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
self.assertTrue(clock.getDelayedCalls())
bw.close()
self.assertFalse(clock.getDelayedCalls())
def test_bucket_aborting_cancels_timeout(self):
"""
Closing cancels the ``BucketWriter`` timeout.
"""
incoming, final = self.make_workdir("test_bucket_abort_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
self.assertTrue(clock.getDelayedCalls())
bw.abort()
self.assertFalse(clock.getDelayedCalls())
class RemoteBucket(object):
def __init__(self, target):
@ -312,7 +373,7 @@ class BucketProxy(unittest.TestCase):
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease())
bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock())
rb = RemoteBucket(bw)
return bw, rb, final
@ -438,11 +499,13 @@ class Server(unittest.TestCase):
basedir = os.path.join("storage", "Server", name)
return basedir
def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time):
def create(self, name, reserved_space=0, klass=StorageServer, clock=None):
if clock is None:
clock = Clock()
workdir = self.workdir(name)
ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
stats_provider=FakeStatsProvider(),
get_current_time=get_current_time)
clock=clock)
ss.setServiceParent(self.sparent)
return ss
@ -468,14 +531,19 @@ class Server(unittest.TestCase):
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn(b'available-space', sv1)
def allocate(self, ss, storage_index, sharenums, size, canary=None):
def allocate(self, ss, storage_index, sharenums, size, renew_leases=True):
"""
Call directly into the storage server's allocate_buckets implementation,
skipping the Foolscap layer.
"""
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
if not canary:
canary = FakeCanary()
return ss.remote_allocate_buckets(storage_index,
renew_secret, cancel_secret,
sharenums, size, canary)
return ss._allocate_buckets(
storage_index,
renew_secret, cancel_secret,
sharenums, size,
renew_leases=renew_leases,
)
def test_large_share(self):
syslow = platform.system().lower()
@ -554,7 +622,6 @@ class Server(unittest.TestCase):
writer.remote_abort()
self.failUnlessEqual(ss.allocated_size(), 0)
def test_allocate(self):
ss = self.create("test_allocate")
@ -608,6 +675,64 @@ class Server(unittest.TestCase):
for i,wb in writers.items():
wb.remote_abort()
def test_allocate_without_lease_renewal(self):
"""
``StorageServer._allocate_buckets`` does not renew leases on existing
shares if ``renew_leases`` is ``False``.
"""
first_lease = 456
second_lease = 543
storage_index = b"allocate"
clock = Clock()
clock.advance(first_lease)
ss = self.create(
"test_allocate_without_lease_renewal",
clock=clock,
)
# Put a share on there
already, writers = self.allocate(
ss, storage_index, [0], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
# It should have a lease granted at the current time.
shares = dict(ss._get_bucket_shares(storage_index))
self.assertEqual(
[first_lease],
list(
lease.get_grant_renew_time_time()
for lease
in ShareFile(shares[0]).get_leases()
),
)
# Let some time pass so we can tell if the lease on share 0 is
# renewed.
clock.advance(second_lease)
# Put another share on there.
already, writers = self.allocate(
ss, storage_index, [1], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
# The first share's lease expiration time is unchanged.
shares = dict(ss._get_bucket_shares(storage_index))
self.assertEqual(
[first_lease],
list(
lease.get_grant_renew_time_time()
for lease
in ShareFile(shares[0]).get_leases()
),
)
def test_bad_container_version(self):
ss = self.create("test_bad_container_version")
a,w = self.allocate(ss, b"si1", [0], 10)
@ -629,8 +754,17 @@ class Server(unittest.TestCase):
def test_disconnect(self):
# simulate a disconnection
ss = self.create("test_disconnect")
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
already,writers = self.allocate(ss, b"disconnect", [0,1,2], 75, canary)
already,writers = ss.remote_allocate_buckets(
b"disconnect",
renew_secret,
cancel_secret,
sharenums=[0,1,2],
allocated_size=75,
canary=canary,
)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
for (f,args,kwargs) in list(canary.disconnectors.values()):
@ -662,8 +796,17 @@ class Server(unittest.TestCase):
# the size we request.
OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
already, writers = ss.remote_allocate_buckets(
b"vid1",
renew_secret,
cancel_secret,
sharenums=[0,1,2],
allocated_size=1000,
canary=canary,
)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
@ -696,7 +839,14 @@ class Server(unittest.TestCase):
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
canary3 = FakeCanary()
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
already3, writers3 = ss.remote_allocate_buckets(
b"vid3",
renew_secret,
cancel_secret,
sharenums=list(range(100)),
allocated_size=100,
canary=canary3,
)
self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._bucket_writers), 39)
@ -755,28 +905,28 @@ class Server(unittest.TestCase):
# Create a bucket:
rs0, cs0 = self.create_bucket_5_shares(ss, b"si0")
leases = list(ss.get_leases(b"si0"))
self.failUnlessEqual(len(leases), 1)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
(lease,) = ss.get_leases(b"si0")
self.assertTrue(lease.is_renew_secret(rs0))
rs1, cs1 = self.create_bucket_5_shares(ss, b"si1")
# take out a second lease on si1
rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 2)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
(lease1, lease2) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
# and a third lease, using add-lease
rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
ss.remote_add_lease(b"si1", rs2a, cs2a)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 3)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
(lease1, lease2, lease3) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
self.assertTrue(lease3.is_renew_secret(rs2a))
# add-lease on a missing storage index is silently ignored
self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None)
self.assertIsNone(ss.remote_add_lease(b"si18", b"", b""))
# check that si0 is readable
readers = ss.remote_get_buckets(b"si0")
@ -830,7 +980,7 @@ class Server(unittest.TestCase):
"""
clock = Clock()
clock.advance(123)
ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds)
ss = self.create("test_immutable_add_lease_renews", clock=clock)
# Start out with single lease created with bucket:
renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
@ -944,10 +1094,12 @@ class MutableServer(unittest.TestCase):
basedir = os.path.join("storage", "MutableServer", name)
return basedir
def create(self, name, get_current_time=time.time):
def create(self, name, clock=None):
workdir = self.workdir(name)
if clock is None:
clock = Clock()
ss = StorageServer(workdir, b"\x00" * 20,
get_current_time=get_current_time)
clock=clock)
ss.setServiceParent(self.sparent)
return ss
@ -1332,7 +1484,7 @@ class MutableServer(unittest.TestCase):
clock = Clock()
clock.advance(235)
ss = self.create("test_mutable_add_lease_renews",
get_current_time=clock.seconds)
clock=clock)
def secrets(n):
return ( self.write_enabler(b"we1"),
self.renew_secret(b"we1-%d" % n),
@ -3028,3 +3180,102 @@ class ShareFileTests(unittest.TestCase):
sf = self.get_sharefile()
with self.assertRaises(IndexError):
sf.cancel_lease(b"garbage")
def test_renew_secret(self):
"""
A lease loaded from a share file can have its renew secret verified.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
expiration_time = 2 ** 31
sf = self.get_sharefile()
lease = LeaseInfo(
owner_num=0,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
expiration_time=expiration_time,
)
sf.add_lease(lease)
(loaded_lease,) = sf.get_leases()
self.assertTrue(loaded_lease.is_renew_secret(renew_secret))
def test_cancel_secret(self):
"""
A lease loaded from a share file can have its cancel secret verified.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
expiration_time = 2 ** 31
sf = self.get_sharefile()
lease = LeaseInfo(
owner_num=0,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
expiration_time=expiration_time,
)
sf.add_lease(lease)
(loaded_lease,) = sf.get_leases()
self.assertTrue(loaded_lease.is_cancel_secret(cancel_secret))
class LeaseInfoTests(unittest.TestCase):
"""
Tests for ``allmydata.storage.lease.LeaseInfo``.
"""
def test_is_renew_secret(self):
"""
``LeaseInfo.is_renew_secret`` returns ``True`` if the value given is the
renew secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertTrue(lease.is_renew_secret(renew_secret))
def test_is_not_renew_secret(self):
"""
``LeaseInfo.is_renew_secret`` returns ``False`` if the value given is not
the renew secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertFalse(lease.is_renew_secret(cancel_secret))
def test_is_cancel_secret(self):
"""
``LeaseInfo.is_cancel_secret`` returns ``True`` if the value given is the
cancel secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertTrue(lease.is_cancel_secret(cancel_secret))
def test_is_not_cancel_secret(self):
"""
``LeaseInfo.is_cancel_secret`` returns ``False`` if the value given is not
the cancel secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertFalse(lease.is_cancel_secret(renew_secret))

View File

@ -0,0 +1,69 @@
"""
Tests for HTTP storage client + server.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
# fmt: off
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
# fmt: on
from unittest import SkipTest
from twisted.trial.unittest import TestCase
from twisted.internet.defer import inlineCallbacks
from treq.testing import StubTreq
from hyperlink import DecodedURL
from ..storage.server import StorageServer
from ..storage.http_server import HTTPServer
from ..storage.http_client import StorageClient, ClientException
class HTTPTests(TestCase):
"""
Tests of HTTP client talking to the HTTP server.
"""
def setUp(self):
if PY2:
raise SkipTest("Not going to bother supporting Python 2")
self.storage_server = StorageServer(self.mktemp(), b"\x00" * 20)
# TODO what should the swissnum _actually_ be?
self._http_server = HTTPServer(self.storage_server, b"abcd")
self.client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"abcd",
treq=StubTreq(self._http_server.get_resource()),
)
@inlineCallbacks
def test_bad_authentication(self):
"""
If the wrong swissnum is used, an ``Unauthorized`` response code is
returned.
"""
client = StorageClient(
DecodedURL.from_text("http://127.0.0.1"),
b"something wrong",
treq=StubTreq(self._http_server.get_resource()),
)
with self.assertRaises(ClientException) as e:
yield client.get_version()
self.assertEqual(e.exception.args[0], 401)
@inlineCallbacks
def test_version(self):
"""
The client can return the version.
"""
version = yield self.client.get_version()
expected_version = self.storage_server.remote_get_version()
self.assertEqual(version, expected_version)

View File

@ -23,6 +23,7 @@ from twisted.internet import defer
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
@ -1290,9 +1291,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
# are sharefiles here
filename = os.path.join(dirpath, filenames[0])
# peek at the magic to see if it is a chk share
magic = open(filename, "rb").read(4)
if magic == b'\x00\x00\x00\x01':
break
with open(filename, "rb") as f:
if ShareFile.is_valid_header(f.read(32)):
break
else:
self.fail("unable to find any uri_extension files in %r"
% self.basedir)

View File

@ -553,11 +553,6 @@ class JSONBytes(unittest.TestCase):
o, cls=jsonbytes.AnyBytesJSONEncoder)),
expected,
)
self.assertEqual(
json.loads(jsonbytes.dumps(o, any_bytes=True)),
expected
)
class FakeGetVersion(object):

View File

@ -83,12 +83,18 @@ def create_introducer_webish(reactor, port_assigner, basedir):
with the node and its webish service.
"""
node.create_node_dir(basedir, "testing")
_, port_endpoint = port_assigner.assign(reactor)
main_tub_location, main_tub_endpoint = port_assigner.assign(reactor)
_, web_port_endpoint = port_assigner.assign(reactor)
with open(join(basedir, "tahoe.cfg"), "w") as f:
f.write(
"[node]\n"
"tub.location = 127.0.0.1:1\n" +
"web.port = {}\n".format(port_endpoint)
"tub.port = {main_tub_endpoint}\n"
"tub.location = {main_tub_location}\n"
"web.port = {web_port_endpoint}\n".format(
main_tub_endpoint=main_tub_endpoint,
main_tub_location=main_tub_location,
web_port_endpoint=web_port_endpoint,
)
)
intro_node = yield create_introducer(basedir)

View File

@ -0,0 +1,195 @@
"""
Bring in some Eliot updates from newer versions of Eliot than we can
depend on in Python 2. The implementations are copied from Eliot 1.14 and
only changed enough to add Python 2 compatibility.
Every API in this module (except ``eliot_json_encoder``) should be obsolete as
soon as we depend on Eliot 1.14 or newer.
When that happens:
* replace ``capture_logging``
with ``partial(eliot.testing.capture_logging, encoder_=eliot_json_encoder)``
* replace ``validateLogging``
with ``partial(eliot.testing.validateLogging, encoder_=eliot_json_encoder)``
* replace ``MemoryLogger``
with ``partial(eliot.MemoryLogger, encoder=eliot_json_encoder)``
Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import json as pyjson
from functools import wraps, partial
from eliot import (
MemoryLogger as _MemoryLogger,
)
from eliot.testing import (
check_for_errors,
swap_logger,
)
from .jsonbytes import AnyBytesJSONEncoder
# There are currently a number of log messages that include non-UTF-8 bytes.
# Allow these, at least for now. Later when the whole test suite has been
# converted to our SyncTestCase or AsyncTestCase it will be easier to turn
# this off and then attribute log failures to specific codepaths so they can
# be fixed (and then not regressed later) because those instances will result
# in test failures instead of only garbage being written to the eliot log.
eliot_json_encoder = AnyBytesJSONEncoder
class _CustomEncoderMemoryLogger(_MemoryLogger):
"""
Override message validation from the Eliot-supplied ``MemoryLogger`` to
use our chosen JSON encoder.
This is only necessary on Python 2 where we use an old version of Eliot
that does not parameterize the encoder.
"""
def __init__(self, encoder=eliot_json_encoder):
"""
@param encoder: A JSONEncoder subclass to use when encoding JSON.
"""
self._encoder = encoder
super(_CustomEncoderMemoryLogger, self).__init__()
def _validate_message(self, dictionary, serializer):
"""Validate an individual message.
As a side-effect, the message is replaced with its serialized contents.
@param dictionary: A message C{dict} to be validated. Might be mutated
by the serializer!
@param serializer: C{None} or a serializer.
@raises TypeError: If a field name is not unicode, or the dictionary
fails to serialize to JSON.
@raises eliot.ValidationError: If serializer was given and validation
failed.
"""
if serializer is not None:
serializer.validate(dictionary)
for key in dictionary:
if not isinstance(key, str):
if isinstance(key, bytes):
key.decode("utf-8")
else:
raise TypeError(dictionary, "%r is not unicode" % (key,))
if serializer is not None:
serializer.serialize(dictionary)
try:
pyjson.dumps(dictionary, cls=self._encoder)
except Exception as e:
raise TypeError("Message %s doesn't encode to JSON: %s" % (dictionary, e))
if PY2:
MemoryLogger = partial(_CustomEncoderMemoryLogger, encoder=eliot_json_encoder)
else:
MemoryLogger = partial(_MemoryLogger, encoder=eliot_json_encoder)
def validateLogging(
assertion, *assertionArgs, **assertionKwargs
):
"""
Decorator factory for L{unittest.TestCase} methods to add logging
validation.
1. The decorated test method gets a C{logger} keyword argument, a
L{MemoryLogger}.
2. All messages logged to this logger will be validated at the end of
the test.
3. Any unflushed logged tracebacks will cause the test to fail.
For example:
from unittest import TestCase
from eliot.testing import assertContainsFields, validateLogging
class MyTests(TestCase):
def assertFooLogging(self, logger):
assertContainsFields(self, logger.messages[0], {"key": 123})
@param assertion: A callable that will be called with the
L{unittest.TestCase} instance, the logger and C{assertionArgs} and
C{assertionKwargs} once the actual test has run, allowing for extra
logging-related assertions on the effects of the test. Use L{None} if you
want the cleanup assertions registered but no custom assertions.
@param assertionArgs: Additional positional arguments to pass to
C{assertion}.
@param assertionKwargs: Additional keyword arguments to pass to
C{assertion}.
@param encoder_: C{json.JSONEncoder} subclass to use when validating JSON.
"""
encoder_ = assertionKwargs.pop("encoder_", eliot_json_encoder)
def decorator(function):
@wraps(function)
def wrapper(self, *args, **kwargs):
skipped = False
kwargs["logger"] = logger = MemoryLogger(encoder=encoder_)
self.addCleanup(check_for_errors, logger)
# TestCase runs cleanups in reverse order, and we want this to
# run *before* tracebacks are checked:
if assertion is not None:
self.addCleanup(
lambda: skipped
or assertion(self, logger, *assertionArgs, **assertionKwargs)
)
try:
return function(self, *args, **kwargs)
except self.skipException:
skipped = True
raise
return wrapper
return decorator
# PEP 8 variant:
validate_logging = validateLogging
def capture_logging(
assertion, *assertionArgs, **assertionKwargs
):
"""
Capture and validate all logging that doesn't specify a L{Logger}.
See L{validate_logging} for details on the rest of its behavior.
"""
encoder_ = assertionKwargs.pop("encoder_", eliot_json_encoder)
def decorator(function):
@validate_logging(
assertion, *assertionArgs, encoder_=encoder_, **assertionKwargs
)
@wraps(function)
def wrapper(self, *args, **kwargs):
logger = kwargs["logger"]
previous_logger = swap_logger(logger)
def cleanup():
swap_logger(previous_logger)
self.addCleanup(cleanup)
return function(self, *args, **kwargs)
return wrapper
return decorator

View File

@ -16,12 +16,14 @@ from __future__ import (
)
__all__ = [
"MemoryLogger",
"inline_callbacks",
"eliot_logging_service",
"opt_eliot_destination",
"opt_help_eliot_destinations",
"validateInstanceOf",
"validateSetMembership",
"capture_logging",
]
from future.utils import PY2
@ -32,7 +34,7 @@ from six import ensure_text
from sys import (
stdout,
)
from functools import wraps, partial
from functools import wraps
from logging import (
INFO,
Handler,
@ -66,8 +68,6 @@ from eliot.twisted import (
DeferredContext,
inline_callbacks,
)
from eliot.testing import capture_logging as eliot_capture_logging
from twisted.python.usage import (
UsageError,
)
@ -87,8 +87,11 @@ from twisted.internet.defer import (
)
from twisted.application.service import Service
from .jsonbytes import AnyBytesJSONEncoder
from ._eliot_updates import (
MemoryLogger,
eliot_json_encoder,
capture_logging,
)
def validateInstanceOf(t):
"""
@ -306,7 +309,7 @@ class _DestinationParser(object):
rotateLength=rotate_length,
maxRotatedFiles=max_rotated_files,
)
return lambda reactor: FileDestination(get_file(), AnyBytesJSONEncoder)
return lambda reactor: FileDestination(get_file(), eliot_json_encoder)
_parse_destination_description = _DestinationParser().parse
@ -327,10 +330,3 @@ def log_call_deferred(action_type):
return DeferredContext(d).addActionFinish()
return logged_f
return decorate_log_call_deferred
# On Python 3, encoding bytes to JSON doesn't work, so we have a custom JSON
# encoder we want to use when validating messages.
if PY2:
capture_logging = eliot_capture_logging
else:
capture_logging = partial(eliot_capture_logging, encoder_=AnyBytesJSONEncoder)