diff --git a/.gitignore b/.gitignore index 50a1352a2..7c7fa2afd 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,7 @@ zope.interface-*.egg .pc /src/allmydata/test/plugins/dropin.cache -/_trial_temp* +**/_trial_temp* /tmp* /*.patch /dist/ diff --git a/docs/proposed/http-storage-node-protocol.rst b/docs/proposed/http-storage-node-protocol.rst index 521bf476d..490d3f3ca 100644 --- a/docs/proposed/http-storage-node-protocol.rst +++ b/docs/proposed/http-storage-node-protocol.rst @@ -363,11 +363,11 @@ one branch contains all of the share data; another branch contains all of the lease data; etc. -Authorization is required for all endpoints. +An ``Authorization`` header in requests is required for all endpoints. The standard HTTP authorization protocol is used. The authentication *type* used is ``Tahoe-LAFS``. The swissnum from the NURL used to locate the storage service is used as the *credentials*. -If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives an ``UNAUTHORIZED`` response. +If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives a ``401 UNAUTHORIZED`` response. General ~~~~~~~ @@ -396,17 +396,19 @@ For example:: !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! Either renew or create a new lease on the bucket addressed by ``storage_index``. -The details of the lease are encoded in the request body. + +The renew secret and cancellation secret should be included as ``X-Tahoe-Authorization`` headers. For example:: - {"renew-secret": "abcd", "cancel-secret": "efgh"} + X-Tahoe-Authorization: lease-renew-secret + X-Tahoe-Authorization: lease-cancel-secret -If the ``renew-secret`` value matches an existing lease then the expiration time of that lease will be changed to 31 days after the time of this operation. If it does not match an existing lease -then a new lease will be created with this ``renew-secret`` which expires 31 days after the time of this operation. +then a new lease will be created with this ``lease-renew-secret`` which expires 31 days after the time of this operation. -``renew-secret`` and ``cancel-secret`` values must be 32 bytes long. +``lease-renew-secret`` and ``lease-cancel-secret`` values must be 32 bytes long. The server treats them as opaque values. :ref:`Share Leases` gives details about how the Tahoe-LAFS storage client constructs these values. @@ -423,8 +425,10 @@ In these cases the server takes no action and returns ``NOT FOUND``. Discussion `````````` -We considered an alternative where ``renew-secret`` and ``cancel-secret`` are placed in query arguments on the request path. -We chose to put these values into the request body to make the URL simpler. +We considered an alternative where ``lease-renew-secret`` and ``lease-cancel-secret`` are placed in query arguments on the request path. +This increases the chances of leaking secrets in logs. +Putting the secrets in the body reduces the chances of leaking secrets, +but we eventually chose headers as the information least likely to be logged. Several behaviors here are blindly copied from the Foolscap-based storage server protocol. @@ -450,14 +454,22 @@ A lease is also created for the shares. Details of the buckets to create are encoded in the request body.
For example:: - {"renew-secret": "efgh", "cancel-secret": "ijkl", - "share-numbers": [1, 7, ...], "allocated-size": 12345} + {"share-numbers": [1, 7, ...], "allocated-size": 12345} + +The request must include ``X-Tahoe-Authorization`` HTTP headers that set the various secrets (upload, lease renewal, lease cancellation) that will later be used to authorize various operations. +For example:: + + X-Tahoe-Authorization: lease-renew-secret + X-Tahoe-Authorization: lease-cancel-secret + X-Tahoe-Authorization: upload-secret The response body includes encoded information about the created buckets. For example:: {"already-have": [1, ...], "allocated": [7, ...]} +The upload secret is an opaque *byte* string. + Discussion `````````` @@ -482,6 +494,20 @@ The response includes ``already-have`` and ``allocated`` for two reasons: This might be because a server has become unavailable and a remaining server needs to store more shares for the upload. It could also just be that the client's preferred servers have changed. +Regarding upload secrets, +the goal is for uploading and aborting (see next sections) to be authenticated by more than just the storage index. +In the future, we may want to generate them in a way that allows resuming/canceling when the client has issues. +In the short term, they can just be a random byte string. +The primary security constraint is that each upload to each server has its own unique upload key, +tied to uploading that particular storage index to this particular server. + +Rejected designs for upload secrets: + +* Upload secret per share number. + In order to make the secret unguessable by attackers, which includes other servers, + it must contain randomness. + Randomness means there is no need to have a secret per share, since adding share-specific content to randomness doesn't actually make the secret any better. + ``PATCH /v1/immutable/:storage_index/:share_number`` !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! @@ -498,6 +524,12 @@ If any one of these requests fails then at most 128KiB of upload work needs to b The server must recognize when all of the data has been received and mark the share as complete (which it can do because it was informed of the size when the storage index was initialized). +The request must include an ``X-Tahoe-Authorization`` header that includes the upload secret:: + + X-Tahoe-Authorization: upload-secret + +Responses: + * When a chunk that does not complete the share is successfully uploaded the response is ``OK``. The response body indicates the range of share data that has yet to be uploaded. That is:: @@ -522,6 +554,10 @@ The server must recognize when all of the data has been received and mark the sh This cancels an *in-progress* upload. +The request must include an ``X-Tahoe-Authorization`` header that includes the upload secret:: + + X-Tahoe-Authorization: upload-secret + The response code: * When the upload is still in progress and therefore the abort has succeeded, @@ -619,16 +655,16 @@ The first write operation on a mutable storage index creates it (that is, there is no separate "create this storage index" operation as there is for the immutable storage index type). -The request body includes the secrets necessary to rewrite to the shares -along with test, read, and write vectors for the operation.
+The request must include ``X-Tahoe-Authorization`` headers with write enabler and lease secrets:: + + X-Tahoe-Authorization: write-enabler + X-Tahoe-Authorization: lease-cancel-secret + X-Tahoe-Authorization: lease-renew-secret + +The request body includes test, read, and write vectors for the operation. For example:: { - "secrets": { - "write-enabler": "abcd", - "lease-renew": "efgh", - "lease-cancel": "ijkl" - }, "test-write-vectors": { 0: { "test": [{ @@ -694,8 +730,12 @@ Immutable Data 1. Create a bucket for storage index ``AAAAAAAAAAAAAAAA`` to hold two immutable shares, discovering that share ``1`` was already uploaded:: POST /v1/immutable/AAAAAAAAAAAAAAAA - {"renew-secret": "efgh", "cancel-secret": "ijkl", - "share-numbers": [1, 7], "allocated-size": 48} + Authorization: Tahoe-LAFS nurl-swissnum + X-Tahoe-Authorization: lease-renew-secret efgh + X-Tahoe-Authorization: lease-cancel-secret jjkl + X-Tahoe-Authorization: upload-secret xyzf + + {"share-numbers": [1, 7], "allocated-size": 48} 200 OK {"already-have": [1], "allocated": [7]} @@ -703,26 +743,34 @@ Immutable Data #. Upload the content for immutable share ``7``:: PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 + Authorization: Tahoe-LAFS nurl-swissnum Content-Range: bytes 0-15/48 + X-Tahoe-Authorization: upload-secret xyzf 200 OK PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 + Authorization: Tahoe-LAFS nurl-swissnum Content-Range: bytes 16-31/48 + X-Tahoe-Authorization: upload-secret xyzf 200 OK PATCH /v1/immutable/AAAAAAAAAAAAAAAA/7 + Authorization: Tahoe-LAFS nurl-swissnum Content-Range: bytes 32-47/48 + X-Tahoe-Authorization: upload-secret xyzf 201 CREATED #. Download the content of the previously uploaded immutable share ``7``:: - GET /v1/immutable/AAAAAAAAAAAAAAAA?share=7&offset=0&size=48 + GET /v1/immutable/AAAAAAAAAAAAAAAA?share=7 + Authorization: Tahoe-LAFS nurl-swissnum + Range: bytes=0-47 200 OK @@ -730,7 +778,9 @@ Immutable Data #. Renew the lease on all immutable shares in bucket ``AAAAAAAAAAAAAAAA``:: PUT /v1/lease/AAAAAAAAAAAAAAAA - {"renew-secret": "efgh", "cancel-secret": "ijkl"} + Authorization: Tahoe-LAFS nurl-swissnum + X-Tahoe-Authorization: lease-cancel-secret jjkl + X-Tahoe-Authorization: lease-renew-secret efgh 204 NO CONTENT @@ -743,12 +793,12 @@ if there is no existing share, otherwise it will read a byte which won't match `b""`:: POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write + Authorization: Tahoe-LAFS nurl-swissnum + X-Tahoe-Authorization: write-enabler abcd + X-Tahoe-Authorization: lease-cancel-secret efgh + X-Tahoe-Authorization: lease-renew-secret ijkl + { - "secrets": { - "write-enabler": "abcd", - "lease-renew": "efgh", - "lease-cancel": "ijkl" - }, "test-write-vectors": { 3: { "test": [{ @@ -775,12 +825,12 @@ otherwise it will read a byte which won't match `b""`:: #. Safely rewrite the contents of a known version of mutable share number ``3`` (or fail):: POST /v1/mutable/BBBBBBBBBBBBBBBB/read-test-write + Authorization: Tahoe-LAFS nurl-swissnum + X-Tahoe-Authorization: write-enabler abcd + X-Tahoe-Authorization: lease-cancel-secret efgh + X-Tahoe-Authorization: lease-renew-secret ijkl + { - "secrets": { - "write-enabler": "abcd", - "lease-renew": "efgh", - "lease-cancel": "ijkl" - }, "test-write-vectors": { 3: { "test": [{ @@ -807,12 +857,16 @@ otherwise it will read a byte which won't match `b""`:: #. Download the contents of share number ``3``:: GET /v1/mutable/BBBBBBBBBBBBBBBB?share=3&offset=0&size=10 + Authorization: Tahoe-LAFS nurl-swissnum + #. 
Renew the lease on previously uploaded mutable share in slot ``BBBBBBBBBBBBBBBB``:: PUT /v1/lease/BBBBBBBBBBBBBBBB - {"renew-secret": "efgh", "cancel-secret": "ijkl"} + Authorization: Tahoe-LAFS nurl-swissnum + X-Tahoe-Authorization: lease-cancel-secret efgh + X-Tahoe-Authorization: lease-renew-secret ijkl 204 NO CONTENT diff --git a/integration/test_tor.py b/integration/test_tor.py index 15d888e36..b0419f0d2 100644 --- a/integration/test_tor.py +++ b/integration/test_tor.py @@ -35,6 +35,9 @@ from allmydata.test.common import ( if sys.platform.startswith('win'): pytest.skip('Skipping Tor tests on Windows', allow_module_level=True) +if PY2: + pytest.skip('Skipping Tor tests on Python 2 because dependencies are hard to come by', allow_module_level=True) + @pytest_twisted.inlineCallbacks def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl): yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl) diff --git a/newsfragments/3527.minor b/newsfragments/3527.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3807.feature b/newsfragments/3807.feature new file mode 100644 index 000000000..f82363ffd --- /dev/null +++ b/newsfragments/3807.feature @@ -0,0 +1 @@ +If uploading an immutable hasn't had a write for 30 minutes, the storage server will abort the upload. \ No newline at end of file diff --git a/newsfragments/3820.minor b/newsfragments/3820.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3833.minor b/newsfragments/3833.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3834.minor b/newsfragments/3834.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3836.minor b/newsfragments/3836.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3837.other b/newsfragments/3837.other new file mode 100644 index 000000000..a9e4e6986 --- /dev/null +++ b/newsfragments/3837.other @@ -0,0 +1 @@ +Tahoe-LAFS no longer runs its Tor integration test suite on Python 2 due to the increased complexity of obtaining compatible versions of necessary dependencies. diff --git a/newsfragments/3838.minor b/newsfragments/3838.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3842.minor b/newsfragments/3842.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3843.minor b/newsfragments/3843.minor new file mode 100644 index 000000000..e69de29bb diff --git a/newsfragments/3847.minor b/newsfragments/3847.minor new file mode 100644 index 000000000..e69de29bb diff --git a/nix/cbor2.nix b/nix/cbor2.nix new file mode 100644 index 000000000..16ca8ff63 --- /dev/null +++ b/nix/cbor2.nix @@ -0,0 +1,20 @@ +{ lib, buildPythonPackage, fetchPypi, setuptools_scm }: +buildPythonPackage rec { + pname = "cbor2"; + version = "5.2.0"; + + src = fetchPypi { + sha256 = "1gwlgjl70vlv35cgkcw3cg7b5qsmws36hs4mmh0l9msgagjs4fm3"; + inherit pname version; + }; + + doCheck = false; + + propagatedBuildInputs = [ setuptools_scm ]; + + meta = with lib; { + homepage = https://github.com/agronholm/cbor2; + description = "CBOR encoder/decoder"; + license = licenses.mit; + }; +} diff --git a/nix/overlays.nix b/nix/overlays.nix index fbd0ce3bb..92f36e93e 100644 --- a/nix/overlays.nix +++ b/nix/overlays.nix @@ -21,6 +21,9 @@ self: super: { # collections-extended is not part of nixpkgs at this time. 
collections-extended = python-super.pythonPackages.callPackage ./collections-extended.nix { }; + + # cbor2 is not part of nixpkgs at this time. + cbor2 = python-super.pythonPackages.callPackage ./cbor2.nix { }; }; }; diff --git a/nix/tahoe-lafs.nix b/nix/tahoe-lafs.nix index e42afc57f..59864d36d 100644 --- a/nix/tahoe-lafs.nix +++ b/nix/tahoe-lafs.nix @@ -4,7 +4,7 @@ , setuptools, setuptoolsTrial, pyasn1, zope_interface , service-identity, pyyaml, magic-wormhole, treq, appdirs , beautifulsoup4, eliot, autobahn, cryptography, netifaces -, html5lib, pyutil, distro, configparser +, html5lib, pyutil, distro, configparser, klein, cbor2 }: python.pkgs.buildPythonPackage rec { # Most of the time this is not exactly the release version (eg 1.16.0). @@ -95,9 +95,10 @@ EOF propagatedBuildInputs = with python.pkgs; [ twisted foolscap zfec appdirs setuptoolsTrial pyasn1 zope_interface - service-identity pyyaml magic-wormhole treq + service-identity pyyaml magic-wormhole eliot autobahn cryptography netifaces setuptools future pyutil distro configparser collections-extended + klein cbor2 treq ]; checkInputs = with python.pkgs; [ diff --git a/setup.py b/setup.py index 8c6396937..7e7a955c6 100644 --- a/setup.py +++ b/setup.py @@ -140,6 +140,11 @@ install_requires = [ # For the RangeMap datastructure. "collections-extended", + + # HTTP server and client + "klein", + "treq", + "cbor2" ] setup_requires = [ @@ -397,7 +402,6 @@ setup(name="tahoe-lafs", # also set in __init__.py # Python 2.7. "decorator < 5", "hypothesis >= 3.6.1", - "treq", "towncrier", "testtools", "fixtures", diff --git a/src/allmydata/scripts/common.py b/src/allmydata/scripts/common.py index 0a9ab8714..c9fc8e031 100644 --- a/src/allmydata/scripts/common.py +++ b/src/allmydata/scripts/common.py @@ -141,7 +141,9 @@ def write_introducer(basedir, petname, furl): """ if isinstance(furl, bytes): furl = furl.decode("utf-8") - basedir.child(b"private").child(b"introducers.yaml").setContent( + private = basedir.child(b"private") + private.makedirs(ignoreExistingDirectory=True) + private.child(b"introducers.yaml").setContent( safe_dump({ "introducers": { petname: { diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index 4d3f4cb21..260cca55b 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -15,15 +15,22 @@ try: except ImportError: pass - -# do not import any allmydata modules at this level. Do that from inside -# individual functions instead. 
import struct, time, os, sys + from twisted.python import usage, failure from twisted.internet import defer from foolscap.logging import cli as foolscap_cli -from allmydata.scripts.common import BaseOptions +from allmydata.scripts.common import BaseOptions +from allmydata import uri +from allmydata.storage.mutable import MutableShareFile +from allmydata.storage.immutable import ShareFile +from allmydata.mutable.layout import unpack_share +from allmydata.mutable.layout import MDMFSlotReadProxy +from allmydata.mutable.common import NeedMoreDataError +from allmydata.immutable.layout import ReadBucketProxy +from allmydata.util import base32 +from allmydata.util.encodingutil import quote_output class DumpOptions(BaseOptions): def getSynopsis(self): @@ -56,13 +63,11 @@ def dump_share(options): # check the version, to see if we have a mutable or immutable share print("share filename: %s" % quote_output(options['filename']), file=out) - f = open(options['filename'], "rb") - prefix = f.read(32) - f.close() - if prefix == MutableShareFile.MAGIC: - return dump_mutable_share(options) - # otherwise assume it's immutable - return dump_immutable_share(options) + with open(options['filename'], "rb") as f: + if MutableShareFile.is_valid_header(f.read(32)): + return dump_mutable_share(options) + # otherwise assume it's immutable + return dump_immutable_share(options) def dump_immutable_share(options): from allmydata.storage.immutable import ShareFile @@ -712,125 +717,122 @@ def call(c, *args, **kwargs): return results[0] def describe_share(abs_sharefile, si_s, shnum_s, now, out): - from allmydata import uri - from allmydata.storage.mutable import MutableShareFile - from allmydata.storage.immutable import ShareFile - from allmydata.mutable.layout import unpack_share - from allmydata.mutable.common import NeedMoreDataError - from allmydata.immutable.layout import ReadBucketProxy - from allmydata.util import base32 - from allmydata.util.encodingutil import quote_output - import struct - - f = open(abs_sharefile, "rb") - prefix = f.read(32) - - if prefix == MutableShareFile.MAGIC: - # mutable share - m = MutableShareFile(abs_sharefile) - WE, nodeid = m._read_write_enabler_and_nodeid(f) - data_length = m._read_data_length(f) - expiration_time = min( [lease.get_expiration_time() - for (i,lease) in m._enumerate_leases(f)] ) - expiration = max(0, expiration_time - now) - - share_type = "unknown" - f.seek(m.DATA_OFFSET) - version = f.read(1) - if version == b"\x00": - # this slot contains an SMDF share - share_type = "SDMF" - elif version == b"\x01": - share_type = "MDMF" - - if share_type == "SDMF": - f.seek(m.DATA_OFFSET) - data = f.read(min(data_length, 2000)) - - try: - pieces = unpack_share(data) - except NeedMoreDataError as e: - # retry once with the larger size - size = e.needed_bytes - f.seek(m.DATA_OFFSET) - data = f.read(min(data_length, size)) - pieces = unpack_share(data) - (seqnum, root_hash, IV, k, N, segsize, datalen, - pubkey, signature, share_hash_chain, block_hash_tree, - share_data, enc_privkey) = pieces - - print("SDMF %s %d/%d %d #%d:%s %d %s" % \ - (si_s, k, N, datalen, - seqnum, str(base32.b2a(root_hash), "utf-8"), - expiration, quote_output(abs_sharefile)), file=out) - elif share_type == "MDMF": - from allmydata.mutable.layout import MDMFSlotReadProxy - fake_shnum = 0 - # TODO: factor this out with dump_MDMF_share() - class ShareDumper(MDMFSlotReadProxy): - def _read(self, readvs, force_remote=False, queue=False): - data = [] - for (where,length) in readvs: - f.seek(m.DATA_OFFSET+where) - 
data.append(f.read(length)) - return defer.succeed({fake_shnum: data}) - - p = ShareDumper(None, "fake-si", fake_shnum) - def extract(func): - stash = [] - # these methods return Deferreds, but we happen to know that - # they run synchronously when not actually talking to a - # remote server - d = func() - d.addCallback(stash.append) - return stash[0] - - verinfo = extract(p.get_verinfo) - (seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix, - offsets) = verinfo - print("MDMF %s %d/%d %d #%d:%s %d %s" % \ - (si_s, k, N, datalen, - seqnum, str(base32.b2a(root_hash), "utf-8"), - expiration, quote_output(abs_sharefile)), file=out) + with open(abs_sharefile, "rb") as f: + prefix = f.read(32) + if MutableShareFile.is_valid_header(prefix): + _describe_mutable_share(abs_sharefile, f, now, si_s, out) + elif ShareFile.is_valid_header(prefix): + _describe_immutable_share(abs_sharefile, now, si_s, out) else: - print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out) + print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out) - elif struct.unpack(">L", prefix[:4]) == (1,): - # immutable +def _describe_mutable_share(abs_sharefile, f, now, si_s, out): + # mutable share + m = MutableShareFile(abs_sharefile) + WE, nodeid = m._read_write_enabler_and_nodeid(f) + data_length = m._read_data_length(f) + expiration_time = min( [lease.get_expiration_time() + for (i,lease) in m._enumerate_leases(f)] ) + expiration = max(0, expiration_time - now) - class ImmediateReadBucketProxy(ReadBucketProxy): - def __init__(self, sf): - self.sf = sf - ReadBucketProxy.__init__(self, None, None, "") - def __repr__(self): - return "" - def _read(self, offset, size): - return defer.succeed(sf.read_share_data(offset, size)) + share_type = "unknown" + f.seek(m.DATA_OFFSET) + version = f.read(1) + if version == b"\x00": + # this slot contains an SMDF share + share_type = "SDMF" + elif version == b"\x01": + share_type = "MDMF" - # use a ReadBucketProxy to parse the bucket and find the uri extension - sf = ShareFile(abs_sharefile) - bp = ImmediateReadBucketProxy(sf) + if share_type == "SDMF": + f.seek(m.DATA_OFFSET) - expiration_time = min( [lease.get_expiration_time() - for lease in sf.get_leases()] ) - expiration = max(0, expiration_time - now) + # Read at least the mutable header length, if possible. If there's + # less data than that in the share, don't try to read more (we won't + # be able to unpack the header in this case but we surely don't want + # to try to unpack bytes *following* the data section as if they were + # header data). Rather than 2000 we could use HEADER_LENGTH from + # allmydata/mutable/layout.py, probably. 
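        # For instance, the alternative mentioned in the comment above would look
        # roughly like this (an untested sketch, assuming HEADER_LENGTH is exported
        # by allmydata/mutable/layout.py as the comment suggests):
        #   from allmydata.mutable.layout import HEADER_LENGTH
        #   data = f.read(min(data_length, HEADER_LENGTH))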
+ data = f.read(min(data_length, 2000)) - UEB_data = call(bp.get_uri_extension) - unpacked = uri.unpack_extension_readable(UEB_data) + try: + pieces = unpack_share(data) + except NeedMoreDataError as e: + # retry once with the larger size + size = e.needed_bytes + f.seek(m.DATA_OFFSET) + data = f.read(min(data_length, size)) + pieces = unpack_share(data) + (seqnum, root_hash, IV, k, N, segsize, datalen, + pubkey, signature, share_hash_chain, block_hash_tree, + share_data, enc_privkey) = pieces - k = unpacked["needed_shares"] - N = unpacked["total_shares"] - filesize = unpacked["size"] - ueb_hash = unpacked["UEB_hash"] + print("SDMF %s %d/%d %d #%d:%s %d %s" % \ + (si_s, k, N, datalen, + seqnum, str(base32.b2a(root_hash), "utf-8"), + expiration, quote_output(abs_sharefile)), file=out) + elif share_type == "MDMF": + fake_shnum = 0 + # TODO: factor this out with dump_MDMF_share() + class ShareDumper(MDMFSlotReadProxy): + def _read(self, readvs, force_remote=False, queue=False): + data = [] + for (where,length) in readvs: + f.seek(m.DATA_OFFSET+where) + data.append(f.read(length)) + return defer.succeed({fake_shnum: data}) - print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize, - str(ueb_hash, "utf-8"), expiration, - quote_output(abs_sharefile)), file=out) + p = ShareDumper(None, "fake-si", fake_shnum) + def extract(func): + stash = [] + # these methods return Deferreds, but we happen to know that + # they run synchronously when not actually talking to a + # remote server + d = func() + d.addCallback(stash.append) + return stash[0] + verinfo = extract(p.get_verinfo) + (seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix, + offsets) = verinfo + print("MDMF %s %d/%d %d #%d:%s %d %s" % \ + (si_s, k, N, datalen, + seqnum, str(base32.b2a(root_hash), "utf-8"), + expiration, quote_output(abs_sharefile)), file=out) else: - print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out) + print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out) + + +def _describe_immutable_share(abs_sharefile, now, si_s, out): + class ImmediateReadBucketProxy(ReadBucketProxy): + def __init__(self, sf): + self.sf = sf + ReadBucketProxy.__init__(self, None, None, "") + def __repr__(self): + return "" + def _read(self, offset, size): + return defer.succeed(sf.read_share_data(offset, size)) + + # use a ReadBucketProxy to parse the bucket and find the uri extension + sf = ShareFile(abs_sharefile) + bp = ImmediateReadBucketProxy(sf) + + expiration_time = min(lease.get_expiration_time() + for lease in sf.get_leases()) + expiration = max(0, expiration_time - now) + + UEB_data = call(bp.get_uri_extension) + unpacked = uri.unpack_extension_readable(UEB_data) + + k = unpacked["needed_shares"] + N = unpacked["total_shares"] + filesize = unpacked["size"] + ueb_hash = unpacked["UEB_hash"] + + print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize, + str(ueb_hash, "utf-8"), expiration, + quote_output(abs_sharefile)), file=out) - f.close() def catalog_shares(options): from allmydata.util.encodingutil import listdir_unicode, quote_output @@ -933,34 +935,35 @@ def corrupt_share(options): f.write(d) f.close() - f = open(fn, "rb") - prefix = f.read(32) - f.close() - if prefix == MutableShareFile.MAGIC: - # mutable - m = MutableShareFile(fn) - f = open(fn, "rb") - f.seek(m.DATA_OFFSET) - data = f.read(2000) - # make sure this slot contains an SMDF share - assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported" - f.close() + with open(fn, "rb") as f: + prefix = f.read(32) - (version, 
ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize, - ig_datalen, offsets) = unpack_header(data) + if MutableShareFile.is_valid_header(prefix): + # mutable + m = MutableShareFile(fn) + with open(fn, "rb") as f: + f.seek(m.DATA_OFFSET) + # Read enough data to get a mutable header to unpack. + data = f.read(2000) + # make sure this slot contains an SMDF share + assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported" + f.close() - assert version == 0, "we only handle v0 SDMF files" - start = m.DATA_OFFSET + offsets["share_data"] - end = m.DATA_OFFSET + offsets["enc_privkey"] - flip_bit(start, end) - else: - # otherwise assume it's immutable - f = ShareFile(fn) - bp = ReadBucketProxy(None, None, '') - offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) - start = f._data_offset + offsets["data"] - end = f._data_offset + offsets["plaintext_hash_tree"] - flip_bit(start, end) + (version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize, + ig_datalen, offsets) = unpack_header(data) + + assert version == 0, "we only handle v0 SDMF files" + start = m.DATA_OFFSET + offsets["share_data"] + end = m.DATA_OFFSET + offsets["enc_privkey"] + flip_bit(start, end) + else: + # otherwise assume it's immutable + f = ShareFile(fn) + bp = ReadBucketProxy(None, None, '') + offsets = bp._parse_offsets(f.read_share_data(0, 0x24)) + start = f._data_offset + offsets["data"] + end = f._data_offset + offsets["plaintext_hash_tree"] + flip_bit(start, end) diff --git a/src/allmydata/storage/http_client.py b/src/allmydata/storage/http_client.py new file mode 100644 index 000000000..f8a7590aa --- /dev/null +++ b/src/allmydata/storage/http_client.py @@ -0,0 +1,79 @@ +""" +HTTP client that talks to the HTTP storage server. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 + +if PY2: + # fmt: off + from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 + # fmt: on +else: + # typing module not available in Python 2, and we only do type checking in + # Python 3 anyway. + from typing import Union + from treq.testing import StubTreq + +import base64 + +# TODO Make sure to import Python version? +from cbor2 import loads + + +from twisted.web.http_headers import Headers +from twisted.internet.defer import inlineCallbacks, returnValue, fail +from hyperlink import DecodedURL +import treq + + +class ClientException(Exception): + """An unexpected error.""" + + +def _decode_cbor(response): + """Given HTTP response, return decoded CBOR body.""" + if response.code > 199 and response.code < 300: + return treq.content(response).addCallback(loads) + return fail(ClientException(response.code, response.phrase)) + + +def swissnum_auth_header(swissnum): # type: (bytes) -> bytes + """Return value for ``Authentication`` header.""" + return b"Tahoe-LAFS " + base64.b64encode(swissnum).strip() + + +class StorageClient(object): + """ + HTTP client that talks to the HTTP storage server. 
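    Requests are authenticated by sending the NURL swissnum (encoded by
    ``swissnum_auth_header``) in the ``Authorization`` header; the ``treq``
    argument allows substituting a ``treq``-compatible object such as
    ``StubTreq`` so the client can be exercised without a real network
    connection.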
+ """ + + def __init__( + self, url, swissnum, treq=treq + ): # type: (DecodedURL, bytes, Union[treq,StubTreq]) -> None + self._base_url = url + self._swissnum = swissnum + self._treq = treq + + def _get_headers(self): # type: () -> Headers + """Return the basic headers to be used by default.""" + headers = Headers() + headers.addRawHeader( + "Authorization", + swissnum_auth_header(self._swissnum), + ) + return headers + + @inlineCallbacks + def get_version(self): + """ + Return the version metadata for the server. + """ + url = self._base_url.click("/v1/version") + response = yield self._treq.get(url, headers=self._get_headers()) + decoded_response = yield _decode_cbor(response) + returnValue(decoded_response) diff --git a/src/allmydata/storage/http_server.py b/src/allmydata/storage/http_server.py new file mode 100644 index 000000000..327892ecd --- /dev/null +++ b/src/allmydata/storage/http_server.py @@ -0,0 +1,94 @@ +""" +HTTP server for storage. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 + +if PY2: + # fmt: off + from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 + # fmt: on + +from functools import wraps + +from klein import Klein +from twisted.web import http + +# TODO Make sure to use pure Python versions? +from cbor2 import dumps + +from .server import StorageServer +from .http_client import swissnum_auth_header + + +def _authorization_decorator(f): + """ + Check the ``Authorization`` header, and (TODO: in later revision of code) + extract ``X-Tahoe-Authorization`` headers and pass them in. + """ + + @wraps(f) + def route(self, request, *args, **kwargs): + if request.requestHeaders.getRawHeaders("Authorization", [None])[0] != str( + swissnum_auth_header(self._swissnum), "ascii" + ): + request.setResponseCode(http.UNAUTHORIZED) + return b"" + # authorization = request.requestHeaders.getRawHeaders("X-Tahoe-Authorization", []) + # For now, just a placeholder: + authorization = None + return f(self, request, authorization, *args, **kwargs) + + return route + + +def _authorized_route(app, *route_args, **route_kwargs): + """ + Like Klein's @route, but with additional support for checking the + ``Authorization`` header as well as ``X-Tahoe-Authorization`` headers. The + latter will (TODO: in later revision of code) get passed in as second + argument to wrapped functions. + """ + + def decorator(f): + @app.route(*route_args, **route_kwargs) + @_authorization_decorator + def handle_route(*args, **kwargs): + return f(*args, **kwargs) + + return handle_route + + return decorator + + +class HTTPServer(object): + """ + A HTTP interface to the storage server. + """ + + _app = Klein() + + def __init__( + self, storage_server, swissnum + ): # type: (StorageServer, bytes) -> None + self._storage_server = storage_server + self._swissnum = swissnum + + def get_resource(self): + """Return twisted.web ``Resource`` for this object.""" + return self._app.resource() + + def _cbor(self, request, data): + """Return CBOR-encoded data.""" + request.setHeader("Content-Type", "application/cbor") + # TODO if data is big, maybe want to use a temporary file eventually... 
+ return dumps(data) + + @_authorized_route(_app, "/v1/version", methods=["GET"]) + def version(self, request, authorization): + return self._cbor(request, self._storage_server.remote_get_version()) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 7712e568a..08b83cd87 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -24,7 +24,6 @@ from allmydata.interfaces import ( ) from allmydata.util import base32, fileutil, log from allmydata.util.assertutil import precondition -from allmydata.util.hashutil import timing_safe_compare from allmydata.storage.lease import LeaseInfo from allmydata.storage.common import UnknownImmutableContainerVersionError @@ -57,6 +56,21 @@ class ShareFile(object): LEASE_SIZE = struct.calcsize(">L32s32sL") sharetype = "immutable" + @classmethod + def is_valid_header(cls, header): + # type: (bytes) -> bool + """ + Determine if the given bytes constitute a valid header for this type of + container. + + :param header: Some bytes from the beginning of a container. + + :return: ``True`` if the bytes could belong to this container, + ``False`` otherwise. + """ + (version,) = struct.unpack(">L", header[:4]) + return version == 1 + def __init__(self, filename, max_size=None, create=False): """ If max_size is not None then I won't allow more than max_size to be written to me. If create=True and max_size must not be None. """ precondition((max_size is not None) or (not create), max_size, create) @@ -165,7 +179,7 @@ class ShareFile(object): secret. """ for i,lease in enumerate(self.get_leases()): - if timing_safe_compare(lease.renew_secret, renew_secret): + if lease.is_renew_secret(renew_secret): # yup. See if we need to update the owner time. if allow_backdate or new_expire_time > lease.get_expiration_time(): # yes @@ -194,7 +208,7 @@ class ShareFile(object): leases = list(self.get_leases()) num_leases_removed = 0 for i,lease in enumerate(leases): - if timing_safe_compare(lease.cancel_secret, cancel_secret): + if lease.is_cancel_secret(cancel_secret): leases[i] = None num_leases_removed += 1 if not num_leases_removed: @@ -219,7 +233,7 @@ class ShareFile(object): @implementer(RIBucketWriter) class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 - def __init__(self, ss, incominghome, finalhome, max_size, lease_info): + def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock): self.ss = ss self.incominghome = incominghome self.finalhome = finalhome @@ -231,12 +245,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 # added by simultaneous uploaders self._sharefile.add_lease(lease_info) self._already_written = RangeMap() + self._clock = clock + self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout) def allocated_size(self): return self._max_size def remote_write(self, offset, data): - start = time.time() + # Delay the timeout, since we received data: + self._timeout.reset(30 * 60) + start = self._clock.seconds() precondition(not self.closed) if self.throw_out_all_data: return @@ -254,12 +272,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self._sharefile.write_share_data(offset, data) self._already_written.set(True, offset, end) - self.ss.add_latency("write", time.time() - start) + self.ss.add_latency("write", self._clock.seconds() - start) self.ss.count("write") def remote_close(self): + self.close() + + def close(self): precondition(not self.closed) - start = time.time() + self._timeout.cancel() + 
start = self._clock.seconds() fileutil.make_dirs(os.path.dirname(self.finalhome)) fileutil.rename(self.incominghome, self.finalhome) @@ -292,20 +314,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) - self.ss.add_latency("close", time.time() - start) + self.ss.add_latency("close", self._clock.seconds() - start) self.ss.count("close") def disconnected(self): if not self.closed: - self._abort() + self.abort() + + def _abort_due_to_timeout(self): + """ + Called if we run out of time. + """ + log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome, + facility="tahoe.storage", level=log.UNUSUAL) + self.abort() def remote_abort(self): log.msg("storage: aborting sharefile %s" % self.incominghome, facility="tahoe.storage", level=log.UNUSUAL) - self._abort() + self.abort() self.ss.count("abort") - def _abort(self): + def abort(self): if self.closed: return @@ -323,6 +353,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self.closed = True self.ss.bucket_writer_closed(self, 0) + # Cancel timeout if it wasn't already cancelled. + if self._timeout.active(): + self._timeout.cancel() + @implementer(RIBucketReader) class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 diff --git a/src/allmydata/storage/lease.py b/src/allmydata/storage/lease.py index 17683a888..ff96ebaf4 100644 --- a/src/allmydata/storage/lease.py +++ b/src/allmydata/storage/lease.py @@ -15,6 +15,8 @@ import struct, time import attr +from allmydata.util.hashutil import timing_safe_compare + @attr.s(frozen=True) class LeaseInfo(object): """ @@ -68,6 +70,26 @@ class LeaseInfo(object): _expiration_time=new_expire_time, ) + def is_renew_secret(self, candidate_secret): + # type: (bytes) -> bool + """ + Check a string to see if it is the correct renew secret. + + :return: ``True`` if it is the correct renew secret, ``False`` + otherwise. + """ + return timing_safe_compare(self.renew_secret, candidate_secret) + + def is_cancel_secret(self, candidate_secret): + # type: (bytes) -> bool + """ + Check a string to see if it is the correct cancel secret. + + :return: ``True`` if it is the correct cancel secret, ``False`` + otherwise. + """ + return timing_safe_compare(self.cancel_secret, candidate_secret) + def get_grant_renew_time_time(self): # hack, based upon fixed 31day expiration period return self._expiration_time - 31*24*60*60 diff --git a/src/allmydata/storage/mutable.py b/src/allmydata/storage/mutable.py index de840b89a..9480a3c03 100644 --- a/src/allmydata/storage/mutable.py +++ b/src/allmydata/storage/mutable.py @@ -67,6 +67,20 @@ class MutableShareFile(object): MAX_SIZE = MAX_MUTABLE_SHARE_SIZE # TODO: decide upon a policy for max share size + @classmethod + def is_valid_header(cls, header): + # type: (bytes) -> bool + """ + Determine if the given bytes constitute a valid header for this type of + container. + + :param header: Some bytes from the beginning of a container. + + :return: ``True`` if the bytes could belong to this container, + ``False`` otherwise. 
+ """ + return header.startswith(cls.MAGIC) + def __init__(self, filename, parent=None): self.home = filename if os.path.exists(self.home): @@ -77,7 +91,7 @@ class MutableShareFile(object): write_enabler_nodeid, write_enabler, data_length, extra_least_offset) = \ struct.unpack(">32s20s32sQQ", data) - if magic != self.MAGIC: + if not self.is_valid_header(data): msg = "sharefile %s had magic '%r' but we wanted '%r'" % \ (filename, magic, self.MAGIC) raise UnknownMutableContainerVersionError(msg) @@ -313,7 +327,7 @@ class MutableShareFile(object): accepting_nodeids = set() with open(self.home, 'rb+') as f: for (leasenum,lease) in self._enumerate_leases(f): - if timing_safe_compare(lease.renew_secret, renew_secret): + if lease.is_renew_secret(renew_secret): # yup. See if we need to update the owner time. if allow_backdate or new_expire_time > lease.get_expiration_time(): # yes @@ -357,7 +371,7 @@ class MutableShareFile(object): with open(self.home, 'rb+') as f: for (leasenum,lease) in self._enumerate_leases(f): accepting_nodeids.add(lease.nodeid) - if timing_safe_compare(lease.cancel_secret, cancel_secret): + if lease.is_cancel_secret(cancel_secret): self._write_lease_record(f, leasenum, blank_lease) modified += 1 else: @@ -388,7 +402,7 @@ class MutableShareFile(object): write_enabler_nodeid, write_enabler, data_length, extra_least_offset) = \ struct.unpack(">32s20s32sQQ", data) - assert magic == self.MAGIC + assert self.is_valid_header(data) return (write_enabler, write_enabler_nodeid) def readv(self, readv): diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 041783a4e..7dc277e39 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -14,12 +14,13 @@ if PY2: else: from typing import Dict -import os, re, struct, time +import os, re import six from foolscap.api import Referenceable from foolscap.ipb import IRemoteReference from twisted.application import service +from twisted.internet import reactor from zope.interface import implementer from allmydata.interfaces import RIStorageServer, IStatsProducer @@ -57,6 +58,9 @@ DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60 @implementer(RIStorageServer, IStatsProducer) class StorageServer(service.MultiService, Referenceable): + """ + A filesystem-based implementation of ``RIStorageServer``. + """ name = 'storage' LeaseCheckerClass = LeaseCheckingCrawler @@ -68,7 +72,7 @@ class StorageServer(service.MultiService, Referenceable): expiration_override_lease_duration=None, expiration_cutoff_date=None, expiration_sharetypes=("mutable", "immutable"), - get_current_time=time.time): + clock=reactor): service.MultiService.__init__(self) assert isinstance(nodeid, bytes) assert len(nodeid) == 20 @@ -119,7 +123,7 @@ class StorageServer(service.MultiService, Referenceable): expiration_cutoff_date, expiration_sharetypes) self.lease_checker.setServiceParent(self) - self._get_current_time = get_current_time + self._clock = clock # Currently being-written Bucketwriters. 
For Foolscap, lifetime is tied # to connection: when disconnection happens, the BucketWriters are @@ -132,6 +136,12 @@ class StorageServer(service.MultiService, Referenceable): # Canaries and disconnect markers for BucketWriters created via Foolscap: self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] + def stopService(self): + # Cancel any in-progress uploads: + for bw in list(self._bucket_writers.values()): + bw.disconnected() + return service.MultiService.stopService(self) + def __repr__(self): return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) @@ -277,14 +287,19 @@ class StorageServer(service.MultiService, Referenceable): def _allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - owner_num=0): + owner_num=0, renew_leases=True): """ Generic bucket allocation API. + + :param bool renew_leases: If and only if this is ``True`` then renew a + secret-matching lease on (or, if none match, add a new lease to) + existing shares in this bucket. Any *new* shares are given a new + lease regardless. """ # owner_num is not for clients to set, but rather it should be # curried into the PersonalStorageServer instance that is dedicated # to a particular owner. - start = self._get_current_time() + start = self._clock.seconds() self.count("allocate") alreadygot = set() bucketwriters = {} # k: shnum, v: BucketWriter @@ -297,7 +312,7 @@ class StorageServer(service.MultiService, Referenceable): # goes into the share files themselves. It could also be put into a # separate database. Note that the lease should not be added until # the BucketWriter has been closed. - expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME lease_info = LeaseInfo(owner_num, renew_secret, cancel_secret, expire_time, self.my_nodeid) @@ -319,8 +334,9 @@ class StorageServer(service.MultiService, Referenceable): # file, they'll want us to hold leases for this file. for (shnum, fn) in self._get_bucket_shares(storage_index): alreadygot.add(shnum) - sf = ShareFile(fn) - sf.add_or_renew_lease(lease_info) + if renew_leases: + sf = ShareFile(fn) + sf.add_or_renew_lease(lease_info) for shnum in sharenums: incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum) @@ -337,7 +353,8 @@ class StorageServer(service.MultiService, Referenceable): elif (not limited) or (remaining_space >= max_space_per_bucket): # ok! we need to create the new share file. bw = BucketWriter(self, incominghome, finalhome, - max_space_per_bucket, lease_info) + max_space_per_bucket, lease_info, + clock=self._clock) if self.no_storage: bw.throw_out_all_data = True bucketwriters[shnum] = bw @@ -351,7 +368,7 @@ class StorageServer(service.MultiService, Referenceable): if bucketwriters: fileutil.make_dirs(os.path.join(self.sharedir, si_dir)) - self.add_latency("allocate", self._get_current_time() - start) + self.add_latency("allocate", self._clock.seconds() - start) return alreadygot, bucketwriters def remote_allocate_buckets(self, storage_index, @@ -361,7 +378,7 @@ class StorageServer(service.MultiService, Referenceable): """Foolscap-specific ``allocate_buckets()`` API.""" alreadygot, bucketwriters = self._allocate_buckets( storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - owner_num=owner_num, + owner_num=owner_num, renew_leases=True, ) # Abort BucketWriters if disconnection happens. 
for bw in bucketwriters.values(): @@ -373,12 +390,12 @@ class StorageServer(service.MultiService, Referenceable): for shnum, filename in self._get_bucket_shares(storage_index): with open(filename, 'rb') as f: header = f.read(32) - if header[:32] == MutableShareFile.MAGIC: + if MutableShareFile.is_valid_header(header): sf = MutableShareFile(filename, self) # note: if the share has been migrated, the renew_lease() # call will throw an exception, with information to help the # client update the lease. - elif header[:4] == struct.pack(">L", 1): + elif ShareFile.is_valid_header(header): sf = ShareFile(filename) else: continue # non-sharefile @@ -386,26 +403,26 @@ class StorageServer(service.MultiService, Referenceable): def remote_add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1): - start = self._get_current_time() + start = self._clock.seconds() self.count("add-lease") - new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME lease_info = LeaseInfo(owner_num, renew_secret, cancel_secret, new_expire_time, self.my_nodeid) for sf in self._iter_share_files(storage_index): sf.add_or_renew_lease(lease_info) - self.add_latency("add-lease", self._get_current_time() - start) + self.add_latency("add-lease", self._clock.seconds() - start) return None def remote_renew_lease(self, storage_index, renew_secret): - start = self._get_current_time() + start = self._clock.seconds() self.count("renew") - new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME found_buckets = False for sf in self._iter_share_files(storage_index): found_buckets = True sf.renew_lease(renew_secret, new_expire_time) - self.add_latency("renew", self._get_current_time() - start) + self.add_latency("renew", self._clock.seconds() - start) if not found_buckets: raise IndexError("no such lease to renew") @@ -432,7 +449,7 @@ class StorageServer(service.MultiService, Referenceable): pass def remote_get_buckets(self, storage_index): - start = self._get_current_time() + start = self._clock.seconds() self.count("get") si_s = si_b2a(storage_index) log.msg("storage: get_buckets %r" % si_s) @@ -440,7 +457,7 @@ class StorageServer(service.MultiService, Referenceable): for shnum, filename in self._get_bucket_shares(storage_index): bucketreaders[shnum] = BucketReader(self, filename, storage_index, shnum) - self.add_latency("get", self._get_current_time() - start) + self.add_latency("get", self._clock.seconds() - start) return bucketreaders def get_leases(self, storage_index): @@ -579,10 +596,8 @@ class StorageServer(service.MultiService, Referenceable): else: if sharenum not in shares: # allocate a new share - allocated_size = 2000 # arbitrary, really share = self._allocate_slot_share(bucketdir, secrets, sharenum, - allocated_size, owner_num=0) shares[sharenum] = share shares[sharenum].writev(datav, new_length) @@ -601,7 +616,7 @@ class StorageServer(service.MultiService, Referenceable): :return LeaseInfo: Information for a new lease for a share. """ ownerid = 1 # TODO - expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME + expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME lease_info = LeaseInfo(ownerid, renew_secret, cancel_secret, expire_time, self.my_nodeid) @@ -631,13 +646,15 @@ class StorageServer(service.MultiService, Referenceable): Read data from shares and conditionally write some data to them. 
:param bool renew_leases: If and only if this is ``True`` and the test - vectors pass then shares in this slot will also have an updated - lease applied to them. + vectors pass then shares mentioned in ``test_and_write_vectors`` + that still exist after the changes are made will also have a + secret-matching lease renewed (or, if none match, a new lease + added). See ``allmydata.interfaces.RIStorageServer`` for details about other parameters and return value. """ - start = self._get_current_time() + start = self._clock.seconds() self.count("writev") si_s = si_b2a(storage_index) log.msg("storage: slot_writev %r" % si_s) @@ -678,7 +695,7 @@ class StorageServer(service.MultiService, Referenceable): self._add_or_renew_leases(remaining_shares, lease_info) # all done - self.add_latency("writev", self._get_current_time() - start) + self.add_latency("writev", self._clock.seconds() - start) return (testv_is_good, read_data) def remote_slot_testv_and_readv_and_writev(self, storage_index, @@ -694,7 +711,7 @@ class StorageServer(service.MultiService, Referenceable): ) def _allocate_slot_share(self, bucketdir, secrets, sharenum, - allocated_size, owner_num=0): + owner_num=0): (write_enabler, renew_secret, cancel_secret) = secrets my_nodeid = self.my_nodeid fileutil.make_dirs(bucketdir) @@ -704,7 +721,7 @@ class StorageServer(service.MultiService, Referenceable): return share def remote_slot_readv(self, storage_index, shares, readv): - start = self._get_current_time() + start = self._clock.seconds() self.count("readv") si_s = si_b2a(storage_index) lp = log.msg("storage: slot_readv %r %r" % (si_s, shares), @@ -713,7 +730,7 @@ class StorageServer(service.MultiService, Referenceable): # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_dir) if not os.path.isdir(bucketdir): - self.add_latency("readv", self._get_current_time() - start) + self.add_latency("readv", self._clock.seconds() - start) return {} datavs = {} for sharenum_s in os.listdir(bucketdir): @@ -727,7 +744,7 @@ class StorageServer(service.MultiService, Referenceable): datavs[sharenum] = msf.readv(readv) log.msg("returning shares %s" % (list(datavs.keys()),), facility="tahoe.storage", level=log.NOISY, parent=lp) - self.add_latency("readv", self._get_current_time() - start) + self.add_latency("readv", self._clock.seconds() - start) return datavs def remote_advise_corrupt_share(self, share_type, storage_index, shnum, diff --git a/src/allmydata/storage/shares.py b/src/allmydata/storage/shares.py index ec6c0a501..59e7b1539 100644 --- a/src/allmydata/storage/shares.py +++ b/src/allmydata/storage/shares.py @@ -17,8 +17,7 @@ from allmydata.storage.immutable import ShareFile def get_share_file(filename): with open(filename, "rb") as f: prefix = f.read(32) - if prefix == MutableShareFile.MAGIC: + if MutableShareFile.is_valid_header(prefix): return MutableShareFile(filename) # otherwise assume it's immutable return ShareFile(filename) - diff --git a/src/allmydata/test/__init__.py b/src/allmydata/test/__init__.py index 893aa15ce..ad245ca77 100644 --- a/src/allmydata/test/__init__.py +++ b/src/allmydata/test/__init__.py @@ -125,5 +125,5 @@ if sys.platform == "win32": initialize() from eliot import to_file -from allmydata.util.jsonbytes import AnyBytesJSONEncoder -to_file(open("eliot.log", "wb"), encoder=AnyBytesJSONEncoder) +from allmydata.util.eliotutil import eliot_json_encoder +to_file(open("eliot.log", "wb"), encoder=eliot_json_encoder) diff --git a/src/allmydata/test/cli/test_create.py 
b/src/allmydata/test/cli/test_create.py index 282f26163..609888fb3 100644 --- a/src/allmydata/test/cli/test_create.py +++ b/src/allmydata/test/cli/test_create.py @@ -11,16 +11,24 @@ if PY2: from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 import os -import mock + +try: + from typing import Any, List, Tuple +except ImportError: + pass + from twisted.trial import unittest from twisted.internet import defer, reactor from twisted.python import usage from allmydata.util import configutil +from allmydata.util import tor_provider, i2p_provider from ..common_util import run_cli, parse_cli +from ..common import ( + disable_modules, +) from ...scripts import create_node from ... import client - def read_config(basedir): tahoe_cfg = os.path.join(basedir, "tahoe.cfg") config = configutil.get_config(tahoe_cfg) @@ -105,11 +113,12 @@ class Config(unittest.TestCase): @defer.inlineCallbacks def test_client_hide_ip_no_i2p_txtorcon(self): - # hmm, I must be doing something weird, these don't work as - # @mock.patch decorators for some reason - txi2p = mock.patch('allmydata.util.i2p_provider._import_txi2p', return_value=None) - txtorcon = mock.patch('allmydata.util.tor_provider._import_txtorcon', return_value=None) - with txi2p, txtorcon: + """ + The ``create-client`` sub-command tells the user to install the necessary + dependencies if they have neither tor nor i2p support installed and + they request network location privacy with the ``--hide-ip`` flag. + """ + with disable_modules("txi2p", "txtorcon"): basedir = self.mktemp() rc, out, err = yield run_cli("create-client", "--hide-ip", basedir) self.assertTrue(rc != 0, out) @@ -118,8 +127,7 @@ class Config(unittest.TestCase): @defer.inlineCallbacks def test_client_i2p_option_no_txi2p(self): - txi2p = mock.patch('allmydata.util.i2p_provider._import_txi2p', return_value=None) - with txi2p: + with disable_modules("txi2p"): basedir = self.mktemp() rc, out, err = yield run_cli("create-node", "--listen=i2p", "--i2p-launch", basedir) self.assertTrue(rc != 0) @@ -127,8 +135,7 @@ class Config(unittest.TestCase): @defer.inlineCallbacks def test_client_tor_option_no_txtorcon(self): - txtorcon = mock.patch('allmydata.util.tor_provider._import_txtorcon', return_value=None) - with txtorcon: + with disable_modules("txtorcon"): basedir = self.mktemp() rc, out, err = yield run_cli("create-node", "--listen=tor", "--tor-launch", basedir) self.assertTrue(rc != 0) @@ -145,9 +152,7 @@ class Config(unittest.TestCase): @defer.inlineCallbacks def test_client_hide_ip_no_txtorcon(self): - txtorcon = mock.patch('allmydata.util.tor_provider._import_txtorcon', - return_value=None) - with txtorcon: + with disable_modules("txtorcon"): basedir = self.mktemp() rc, out, err = yield run_cli("create-client", "--hide-ip", basedir) self.assertEqual(0, rc) @@ -295,11 +300,10 @@ class Config(unittest.TestCase): def test_node_slow_tor(self): basedir = self.mktemp() d = defer.Deferred() - with mock.patch("allmydata.util.tor_provider.create_config", - return_value=d): - d2 = run_cli("create-node", "--listen=tor", basedir) - d.callback(({}, "port", "location")) - rc, out, err = yield d2 + self.patch(tor_provider, "create_config", lambda *a, **kw: d) + d2 = run_cli("create-node", "--listen=tor", basedir) + d.callback(({}, "port", "location")) + rc, out, err = yield d2 self.assertEqual(rc, 0) self.assertIn("Node created", out) self.assertEqual(err, "") @@ -308,11 +312,10 @@ class 
Config(unittest.TestCase): def test_node_slow_i2p(self): basedir = self.mktemp() d = defer.Deferred() - with mock.patch("allmydata.util.i2p_provider.create_config", - return_value=d): - d2 = run_cli("create-node", "--listen=i2p", basedir) - d.callback(({}, "port", "location")) - rc, out, err = yield d2 + self.patch(i2p_provider, "create_config", lambda *a, **kw: d) + d2 = run_cli("create-node", "--listen=i2p", basedir) + d.callback(({}, "port", "location")) + rc, out, err = yield d2 self.assertEqual(rc, 0) self.assertIn("Node created", out) self.assertEqual(err, "") @@ -353,6 +356,27 @@ class Config(unittest.TestCase): self.assertIn("is not empty", err) self.assertIn("To avoid clobbering anything, I am going to quit now", err) +def fake_config(testcase, module, result): + # type: (unittest.TestCase, Any, Any) -> List[Tuple] + """ + Monkey-patch a fake configuration function into the given module. + + :param testcase: The test case to use to do the monkey-patching. + + :param module: The module into which to patch the fake function. + + :param result: The return value for the fake function. + + :return: A list of tuples of the arguments the fake function was called + with. + """ + calls = [] + def fake_config(reactor, cli_config): + calls.append((reactor, cli_config)) + return result + testcase.patch(module, "create_config", fake_config) + return calls + class Tor(unittest.TestCase): def test_default(self): basedir = self.mktemp() @@ -360,12 +384,14 @@ class Tor(unittest.TestCase): tor_port = "ghi" tor_location = "jkl" config_d = defer.succeed( (tor_config, tor_port, tor_location) ) - with mock.patch("allmydata.util.tor_provider.create_config", - return_value=config_d) as co: - rc, out, err = self.successResultOf( - run_cli("create-node", "--listen=tor", basedir)) - self.assertEqual(len(co.mock_calls), 1) - args = co.mock_calls[0][1] + + calls = fake_config(self, tor_provider, config_d) + rc, out, err = self.successResultOf( + run_cli("create-node", "--listen=tor", basedir), + ) + + self.assertEqual(len(calls), 1) + args = calls[0] self.assertIdentical(args[0], reactor) self.assertIsInstance(args[1], create_node.CreateNodeOptions) self.assertEqual(args[1]["listen"], "tor") @@ -380,12 +406,15 @@ class Tor(unittest.TestCase): tor_port = "ghi" tor_location = "jkl" config_d = defer.succeed( (tor_config, tor_port, tor_location) ) - with mock.patch("allmydata.util.tor_provider.create_config", - return_value=config_d) as co: - rc, out, err = self.successResultOf( - run_cli("create-node", "--listen=tor", "--tor-launch", - basedir)) - args = co.mock_calls[0][1] + + calls = fake_config(self, tor_provider, config_d) + rc, out, err = self.successResultOf( + run_cli( + "create-node", "--listen=tor", "--tor-launch", + basedir, + ), + ) + args = calls[0] self.assertEqual(args[1]["listen"], "tor") self.assertEqual(args[1]["tor-launch"], True) self.assertEqual(args[1]["tor-control-port"], None) @@ -396,12 +425,15 @@ class Tor(unittest.TestCase): tor_port = "ghi" tor_location = "jkl" config_d = defer.succeed( (tor_config, tor_port, tor_location) ) - with mock.patch("allmydata.util.tor_provider.create_config", - return_value=config_d) as co: - rc, out, err = self.successResultOf( - run_cli("create-node", "--listen=tor", "--tor-control-port=mno", - basedir)) - args = co.mock_calls[0][1] + + calls = fake_config(self, tor_provider, config_d) + rc, out, err = self.successResultOf( + run_cli( + "create-node", "--listen=tor", "--tor-control-port=mno", + basedir, + ), + ) + args = calls[0] 
self.assertEqual(args[1]["listen"], "tor") self.assertEqual(args[1]["tor-launch"], False) self.assertEqual(args[1]["tor-control-port"], "mno") @@ -434,12 +466,13 @@ class I2P(unittest.TestCase): i2p_port = "ghi" i2p_location = "jkl" dest_d = defer.succeed( (i2p_config, i2p_port, i2p_location) ) - with mock.patch("allmydata.util.i2p_provider.create_config", - return_value=dest_d) as co: - rc, out, err = self.successResultOf( - run_cli("create-node", "--listen=i2p", basedir)) - self.assertEqual(len(co.mock_calls), 1) - args = co.mock_calls[0][1] + + calls = fake_config(self, i2p_provider, dest_d) + rc, out, err = self.successResultOf( + run_cli("create-node", "--listen=i2p", basedir), + ) + self.assertEqual(len(calls), 1) + args = calls[0] self.assertIdentical(args[0], reactor) self.assertIsInstance(args[1], create_node.CreateNodeOptions) self.assertEqual(args[1]["listen"], "i2p") @@ -461,12 +494,15 @@ class I2P(unittest.TestCase): i2p_port = "ghi" i2p_location = "jkl" dest_d = defer.succeed( (i2p_config, i2p_port, i2p_location) ) - with mock.patch("allmydata.util.i2p_provider.create_config", - return_value=dest_d) as co: - rc, out, err = self.successResultOf( - run_cli("create-node", "--listen=i2p", "--i2p-sam-port=mno", - basedir)) - args = co.mock_calls[0][1] + + calls = fake_config(self, i2p_provider, dest_d) + rc, out, err = self.successResultOf( + run_cli( + "create-node", "--listen=i2p", "--i2p-sam-port=mno", + basedir, + ), + ) + args = calls[0] self.assertEqual(args[1]["listen"], "i2p") self.assertEqual(args[1]["i2p-launch"], False) self.assertEqual(args[1]["i2p-sam-port"], "mno") diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 0f2dc7c62..fee13cca9 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -28,6 +28,7 @@ __all__ = [ import sys import os, random, struct +from contextlib import contextmanager import six import tempfile from tempfile import mktemp @@ -267,8 +268,12 @@ class UseNode(object): node_config = attr.ib(default=attr.Factory(dict)) config = attr.ib(default=None) + reactor = attr.ib(default=None) def setUp(self): + self.assigner = SameProcessStreamEndpointAssigner() + self.assigner.setUp() + def format_config_items(config): return "\n".join( " = ".join((key, value)) @@ -292,6 +297,23 @@ class UseNode(object): "default", self.introducer_furl, ) + + node_config = self.node_config.copy() + if "tub.port" not in node_config: + if "tub.location" in node_config: + raise ValueError( + "UseNode fixture does not support specifying tub.location " + "without tub.port" + ) + + # Don't use the normal port auto-assignment logic. It produces + # collisions and makes tests fail spuriously. 
+ tub_location, tub_endpoint = self.assigner.assign(self.reactor) + node_config.update({ + "tub.port": tub_endpoint, + "tub.location": tub_location, + }) + self.config = config_from_string( self.basedir.asTextMode().path, "tub.port", @@ -304,7 +326,7 @@ storage.plugins = {storage_plugin} {plugin_config_section} """.format( storage_plugin=self.storage_plugin, - node_config=format_config_items(self.node_config), + node_config=format_config_items(node_config), plugin_config_section=plugin_config_section, ) ) @@ -316,7 +338,7 @@ storage.plugins = {storage_plugin} ) def cleanUp(self): - pass + self.assigner.tearDown() def getDetails(self): @@ -1068,7 +1090,7 @@ def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False): def _corrupt_mutable_share_data(data, debug=False): prefix = data[:32] - assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC) + assert MutableShareFile.is_valid_header(prefix), "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC) data_offset = MutableShareFile.DATA_OFFSET sharetype = data[data_offset:data_offset+1] assert sharetype == b"\x00", "non-SDMF mutable shares not supported" @@ -1213,6 +1235,29 @@ class ConstantAddresses(object): raise Exception("{!r} has no client endpoint.") return self._handler +@contextmanager +def disable_modules(*names): + """ + A context manager which makes modules appear to be missing while it is + active. + + :param *names: The names of the modules to disappear. Only top-level + modules are supported (that is, "." is not allowed in any names). + This is an implementation shortcoming which could be lifted if + desired. + """ + if any("." in name for name in names): + raise ValueError("Names containing '.' are not supported.") + missing = object() + modules = list(sys.modules.get(n, missing) for n in names) + for n in names: + sys.modules[n] = None + yield + for n, original in zip(names, modules): + if original is missing: + del sys.modules[n] + else: + sys.modules[n] = original class _TestCaseMixin(object): """ diff --git a/src/allmydata/test/common_system.py b/src/allmydata/test/common_system.py index 9d14c8642..0c424136a 100644 --- a/src/allmydata/test/common_system.py +++ b/src/allmydata/test/common_system.py @@ -672,11 +672,14 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): """ iv_dir = self.getdir("introducer") if not os.path.isdir(iv_dir): - _, port_endpoint = self.port_assigner.assign(reactor) + _, web_port_endpoint = self.port_assigner.assign(reactor) + main_location_hint, main_port_endpoint = self.port_assigner.assign(reactor) introducer_config = ( u"[node]\n" u"nickname = introducer \N{BLACK SMILING FACE}\n" + - u"web.port = {}\n".format(port_endpoint) + u"web.port = {}\n".format(web_port_endpoint) + + u"tub.port = {}\n".format(main_port_endpoint) + + u"tub.location = {}\n".format(main_location_hint) ).encode("utf-8") fileutil.make_dirs(iv_dir) @@ -764,13 +767,15 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): def _generate_config(self, which, basedir): config = {} - except1 = set(range(self.numclients)) - {1} + allclients = set(range(self.numclients)) + except1 = allclients - {1} feature_matrix = { ("client", "nickname"): except1, - # client 1 has to auto-assign an address. 
- ("node", "tub.port"): except1, - ("node", "tub.location"): except1, + # Auto-assigning addresses is extremely failure prone and not + # amenable to automated testing in _this_ manner. + ("node", "tub.port"): allclients, + ("node", "tub.location"): allclients, # client 0 runs a webserver and a helper # client 3 runs a webserver but no helper @@ -852,7 +857,13 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin): # connection-lost code basedir = FilePath(self.getdir("client%d" % client_num)) basedir.makedirs() - config = "[client]\n" + config = ( + "[node]\n" + "tub.location = {}\n" + "tub.port = {}\n" + "[client]\n" + ).format(*self.port_assigner.assign(reactor)) + if helper_furl: config += "helper.furl = %s\n" % helper_furl basedir.child("tahoe.cfg").setContent(config.encode("utf-8")) diff --git a/src/allmydata/test/eliotutil.py b/src/allmydata/test/eliotutil.py index 1685744fd..dd21f1e9d 100644 --- a/src/allmydata/test/eliotutil.py +++ b/src/allmydata/test/eliotutil.py @@ -42,7 +42,6 @@ from zope.interface import ( from eliot import ( ActionType, Field, - MemoryLogger, ILogger, ) from eliot.testing import ( @@ -54,8 +53,9 @@ from twisted.python.monkey import ( MonkeyPatcher, ) -from ..util.jsonbytes import AnyBytesJSONEncoder - +from ..util.eliotutil import ( + MemoryLogger, +) _NAME = Field.for_types( u"name", @@ -71,14 +71,6 @@ RUN_TEST = ActionType( ) -# On Python 3, we want to use our custom JSON encoder when validating messages -# can be encoded to JSON: -if PY2: - _memory_logger = MemoryLogger -else: - _memory_logger = lambda: MemoryLogger(encoder=AnyBytesJSONEncoder) - - @attr.s class EliotLoggedRunTest(object): """ @@ -170,7 +162,7 @@ def with_logging( """ @wraps(test_method) def run_with_logging(*args, **kwargs): - validating_logger = _memory_logger() + validating_logger = MemoryLogger() original = swap_logger(None) try: swap_logger(_TwoLoggers(original, validating_logger)) diff --git a/src/allmydata/test/test_client.py b/src/allmydata/test/test_client.py index fd2837f1d..a2572e735 100644 --- a/src/allmydata/test/test_client.py +++ b/src/allmydata/test/test_client.py @@ -89,6 +89,7 @@ from .common import ( UseTestPlugins, MemoryIntroducerClient, get_published_announcements, + UseNode, ) from .matchers import ( MatchesSameElements, @@ -953,13 +954,14 @@ class Run(unittest.TestCase, testutil.StallMixin): @defer.inlineCallbacks def test_reloadable(self): - basedir = FilePath("test_client.Run.test_reloadable") - private = basedir.child("private") - private.makedirs() + from twisted.internet import reactor + dummy = "pb://wl74cyahejagspqgy4x5ukrvfnevlknt@127.0.0.1:58889/bogus" - write_introducer(basedir, "someintroducer", dummy) - basedir.child("tahoe.cfg").setContent(BASECONFIG. encode("ascii")) - c1 = yield client.create_client(basedir.path) + fixture = UseNode(None, None, FilePath(self.mktemp()), dummy, reactor=reactor) + fixture.setUp() + self.addCleanup(fixture.cleanUp) + + c1 = yield fixture.create_node() c1.setServiceParent(self.sparent) # delay to let the service start up completely. I'm not entirely sure @@ -981,7 +983,7 @@ class Run(unittest.TestCase, testutil.StallMixin): # also change _check_exit_trigger to use it instead of a raw # reactor.stop, also instrument the shutdown event in an # attribute that we can check.) 
- c2 = yield client.create_client(basedir.path) + c2 = yield fixture.create_node() c2.setServiceParent(self.sparent) yield c2.disownServiceParent() diff --git a/src/allmydata/test/test_common_util.py b/src/allmydata/test/test_common_util.py index 55986d123..c141adc8d 100644 --- a/src/allmydata/test/test_common_util.py +++ b/src/allmydata/test/test_common_util.py @@ -10,16 +10,30 @@ from future.utils import PY2 if PY2: from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 +import sys import random -import unittest +from hypothesis import given +from hypothesis.strategies import lists, sampled_from +from testtools.matchers import Equals +from twisted.python.reflect import ( + ModuleNotFound, + namedAny, +) + +from .common import ( + SyncTestCase, + disable_modules, +) from allmydata.test.common_util import flip_one_bit -class TestFlipOneBit(unittest.TestCase): +class TestFlipOneBit(SyncTestCase): def setUp(self): - random.seed(42) # I tried using version=1 on PY3 to avoid the if below, to no avail. + super(TestFlipOneBit, self).setUp() + # I tried using version=1 on PY3 to avoid the if below, to no avail. + random.seed(42) def test_accepts_byte_string(self): actual = flip_one_bit(b'foo') @@ -27,3 +41,61 @@ class TestFlipOneBit(unittest.TestCase): def test_rejects_unicode_string(self): self.assertRaises(AssertionError, flip_one_bit, u'foo') + + + +def some_existing_modules(): + """ + Build the names of modules (as native strings) that exist and can be + imported. + """ + candidates = sorted( + name + for name + in sys.modules + if "." not in name + and sys.modules[name] is not None + ) + return sampled_from(candidates) + +class DisableModulesTests(SyncTestCase): + """ + Tests for ``disable_modules``. + """ + def setup_example(self): + return sys.modules.copy() + + def teardown_example(self, safe_modules): + sys.modules.update(safe_modules) + + @given(lists(some_existing_modules(), unique=True)) + def test_importerror(self, module_names): + """ + While the ``disable_modules`` context manager is active any import of the + modules identified by the names passed to it result in ``ImportError`` + being raised. + """ + def get_modules(): + return list( + namedAny(name) + for name + in module_names + ) + before_modules = get_modules() + + with disable_modules(*module_names): + for name in module_names: + with self.assertRaises(ModuleNotFound): + namedAny(name) + + after_modules = get_modules() + self.assertThat(before_modules, Equals(after_modules)) + + def test_dotted_names_rejected(self): + """ + If names with "." in them are passed to ``disable_modules`` then + ``ValueError`` is raised. 
+ """ + with self.assertRaises(ValueError): + with disable_modules("foo.bar"): + pass diff --git a/src/allmydata/test/test_eliotutil.py b/src/allmydata/test/test_eliotutil.py index 3f915ecd2..cabe599b3 100644 --- a/src/allmydata/test/test_eliotutil.py +++ b/src/allmydata/test/test_eliotutil.py @@ -27,13 +27,12 @@ from fixtures import ( ) from testtools import ( TestCase, -) -from testtools import ( TestResult, ) from testtools.matchers import ( Is, IsInstance, + Not, MatchesStructure, Equals, HasLength, @@ -65,11 +64,11 @@ from twisted.internet.task import deferLater from twisted.internet import reactor from ..util.eliotutil import ( + eliot_json_encoder, log_call_deferred, _parse_destination_description, _EliotLogging, ) -from ..util.jsonbytes import AnyBytesJSONEncoder from .common import ( SyncTestCase, @@ -77,24 +76,105 @@ from .common import ( ) -class EliotLoggedTestTests(AsyncTestCase): +def passes(): + """ + Create a matcher that matches a ``TestCase`` that runs without failures or + errors. + """ + def run(case): + result = TestResult() + case.run(result) + return result.wasSuccessful() + return AfterPreprocessing(run, Equals(True)) + + +class EliotLoggedTestTests(TestCase): + """ + Tests for the automatic log-related provided by ``AsyncTestCase``. + + This class uses ``testtools.TestCase`` because it is inconvenient to nest + ``AsyncTestCase`` inside ``AsyncTestCase`` (in particular, Eliot messages + emitted by the inner test case get observed by the outer test case and if + an inner case emits invalid messages they cause the outer test case to + fail). + """ + def test_fails(self): + """ + A test method of an ``AsyncTestCase`` subclass can fail. + """ + class UnderTest(AsyncTestCase): + def test_it(self): + self.fail("make sure it can fail") + + self.assertThat(UnderTest("test_it"), Not(passes())) + + def test_unserializable_fails(self): + """ + A test method of an ``AsyncTestCase`` subclass that logs an unserializable + value with Eliot fails. + """ + class world(object): + """ + an unserializable object + """ + + class UnderTest(AsyncTestCase): + def test_it(self): + Message.log(hello=world) + + self.assertThat(UnderTest("test_it"), Not(passes())) + + def test_logs_non_utf_8_byte(self): + """ + A test method of an ``AsyncTestCase`` subclass can log a message that + contains a non-UTF-8 byte string and return ``None`` and pass. + """ + class UnderTest(AsyncTestCase): + def test_it(self): + Message.log(hello=b"\xFF") + + self.assertThat(UnderTest("test_it"), passes()) + def test_returns_none(self): - Message.log(hello="world") + """ + A test method of an ``AsyncTestCase`` subclass can log a message and + return ``None`` and pass. + """ + class UnderTest(AsyncTestCase): + def test_it(self): + Message.log(hello="world") + + self.assertThat(UnderTest("test_it"), passes()) def test_returns_fired_deferred(self): - Message.log(hello="world") - return succeed(None) + """ + A test method of an ``AsyncTestCase`` subclass can log a message and + return an already-fired ``Deferred`` and pass. + """ + class UnderTest(AsyncTestCase): + def test_it(self): + Message.log(hello="world") + return succeed(None) + + self.assertThat(UnderTest("test_it"), passes()) def test_returns_unfired_deferred(self): - Message.log(hello="world") - # @eliot_logged_test automatically gives us an action context but it's - # still our responsibility to maintain it across stack-busting - # operations. 
- d = DeferredContext(deferLater(reactor, 0.0, lambda: None)) - d.addCallback(lambda ignored: Message.log(goodbye="world")) - # We didn't start an action. We're not finishing an action. - return d.result + """ + A test method of an ``AsyncTestCase`` subclass can log a message and + return an unfired ``Deferred`` and pass when the ``Deferred`` fires. + """ + class UnderTest(AsyncTestCase): + def test_it(self): + Message.log(hello="world") + # @eliot_logged_test automatically gives us an action context + # but it's still our responsibility to maintain it across + # stack-busting operations. + d = DeferredContext(deferLater(reactor, 0.0, lambda: None)) + d.addCallback(lambda ignored: Message.log(goodbye="world")) + # We didn't start an action. We're not finishing an action. + return d.result + self.assertThat(UnderTest("test_it"), passes()) class ParseDestinationDescriptionTests(SyncTestCase): @@ -109,7 +189,7 @@ class ParseDestinationDescriptionTests(SyncTestCase): reactor = object() self.assertThat( _parse_destination_description("file:-")(reactor), - Equals(FileDestination(stdout, encoder=AnyBytesJSONEncoder)), + Equals(FileDestination(stdout, encoder=eliot_json_encoder)), ) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index fe494a9d4..a17264713 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -21,6 +21,7 @@ if PY2: from random import Random from twisted.internet.defer import inlineCallbacks, returnValue +from twisted.internet.task import Clock from foolscap.api import Referenceable, RemoteException @@ -1017,16 +1018,17 @@ class _FoolscapMixin(SystemTestMixin): self.server = s break assert self.server is not None, "Couldn't find StorageServer" - self._current_time = 123456 - self.server._get_current_time = self.fake_time + self._clock = Clock() + self._clock.advance(123456) + self.server._clock = self._clock def fake_time(self): """Return the current fake, test-controlled, time.""" - return self._current_time + return self._clock.seconds() def fake_sleep(self, seconds): """Advance the fake time by the given number of seconds.""" - self._current_time += seconds + self._clock.advance(seconds) @inlineCallbacks def tearDown(self): diff --git a/src/allmydata/test/test_node.py b/src/allmydata/test/test_node.py index cf5fa27f3..c6cff1bab 100644 --- a/src/allmydata/test/test_node.py +++ b/src/allmydata/test/test_node.py @@ -69,6 +69,8 @@ import allmydata.test.common_util as testutil from .common import ( ConstantAddresses, + SameProcessStreamEndpointAssigner, + UseNode, ) def port_numbers(): @@ -80,11 +82,10 @@ class LoggingMultiService(service.MultiService): # see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946 -def testing_tub(config_data=''): +def testing_tub(reactor, config_data=''): """ Creates a 'main' Tub for testing purposes, from config data """ - from twisted.internet import reactor basedir = 'dummy_basedir' config = config_from_string(basedir, 'DEFAULT_PORTNUMFILE_BLANK', config_data) fileutil.make_dirs(os.path.join(basedir, 'private')) @@ -112,6 +113,9 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): # try to bind the port. We'll use a low-numbered one that's likely to # conflict with another service to prove it. 
self._available_port = 22 + self.port_assigner = SameProcessStreamEndpointAssigner() + self.port_assigner.setUp() + self.addCleanup(self.port_assigner.tearDown) def _test_location( self, @@ -137,11 +141,23 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): :param local_addresses: If not ``None`` then a list of addresses to supply to the system under test as local addresses. """ + from twisted.internet import reactor + basedir = self.mktemp() create_node_dir(basedir, "testing") + if tub_port is None: + # Always configure a usable tub.port address instead of relying on + # the automatic port assignment. The automatic port assignment is + # prone to collisions and spurious test failures. + _, tub_port = self.port_assigner.assign(reactor) + config_data = "[node]\n" - if tub_port: - config_data += "tub.port = {}\n".format(tub_port) + config_data += "tub.port = {}\n".format(tub_port) + + # If they wanted a certain location, go for it. This probably won't + # agree with the tub.port value we set but that only matters if + # anything tries to use this to establish a connection ... which + # nothing in this test suite will. if tub_location is not None: config_data += "tub.location = {}\n".format(tub_location) @@ -149,7 +165,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): self.patch(iputil, 'get_local_addresses_sync', lambda: local_addresses) - tub = testing_tub(config_data) + tub = testing_tub(reactor, config_data) class Foo(object): pass @@ -431,7 +447,12 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): @defer.inlineCallbacks def test_logdir_is_str(self): - basedir = "test_node/test_logdir_is_str" + from twisted.internet import reactor + + basedir = FilePath(self.mktemp()) + fixture = UseNode(None, None, basedir, "pb://introducer/furl", {}, reactor=reactor) + fixture.setUp() + self.addCleanup(fixture.cleanUp) ns = Namespace() ns.called = False @@ -440,8 +461,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase): self.failUnless(isinstance(logdir, str), logdir) self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir) - create_node_dir(basedir, "nothing to see here") - yield client.create_client(basedir) + yield fixture.create_node() self.failUnless(ns.called) def test_set_config_unescaped_furl_hash(self): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 8123be2c5..bc87e168d 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -128,7 +128,7 @@ class Bucket(unittest.TestCase): def test_create(self): incoming, final = self.make_workdir("test_create") - bw = BucketWriter(self, incoming, final, 200, self.make_lease()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) bw.remote_write(0, b"a"*25) bw.remote_write(25, b"b"*25) bw.remote_write(50, b"c"*25) @@ -137,7 +137,7 @@ class Bucket(unittest.TestCase): def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = BucketWriter(self, incoming, final, 200, self.make_lease()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) bw.remote_write(0, b"a"*25) bw.remote_write(25, b"b"*25) bw.remote_write(50, b"c"*7) # last block may be short @@ -155,7 +155,7 @@ class Bucket(unittest.TestCase): incoming, final = self.make_workdir( "test_write_past_size_errors-{}".format(i) ) - bw = BucketWriter(self, incoming, final, 200, self.make_lease()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock()) with self.assertRaises(DataTooLargeError): 
bw.remote_write(offset, b"a" * length) @@ -174,7 +174,7 @@ class Bucket(unittest.TestCase): expected_data = b"".join(bchr(i) for i in range(100)) incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) bw = BucketWriter( - self, incoming, final, length, self.make_lease(), + self, incoming, final, length, self.make_lease(), Clock() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. bw.remote_write(10, expected_data[10:20]) @@ -212,7 +212,7 @@ class Bucket(unittest.TestCase): length = 100 incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) bw = BucketWriter( - self, incoming, final, length, self.make_lease(), + self, incoming, final, length, self.make_lease(), Clock() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. bw.remote_write(10, b"1" * 10) @@ -285,6 +285,67 @@ class Bucket(unittest.TestCase): result_of_read = br.remote_read(0, len(share_data)+1) self.failUnlessEqual(result_of_read, share_data) + def _assert_timeout_only_after_30_minutes(self, clock, bw): + """ + The ``BucketWriter`` times out and is closed after 30 minutes, but not + sooner. + """ + self.assertFalse(bw.closed) + # 29 minutes pass. Everything is fine. + for i in range(29): + clock.advance(60) + self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,)) + # After the 30th minute, the bucket is closed due to lack of writes. + clock.advance(60) + self.assertTrue(bw.closed) + + def test_bucket_expires_if_no_writes_for_30_minutes(self): + """ + If a ``BucketWriter`` receives no writes for 30 minutes, it is removed. + """ + incoming, final = self.make_workdir("test_bucket_expires") + clock = Clock() + bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock) + self._assert_timeout_only_after_30_minutes(clock, bw) + + def test_bucket_writes_delay_timeout(self): + """ + So long as the ``BucketWriter`` receives writes, the the removal + timeout is put off. + """ + incoming, final = self.make_workdir("test_bucket_writes_delay_timeout") + clock = Clock() + bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock) + # 29 minutes pass, getting close to the timeout... + clock.advance(29 * 60) + # .. but we receive a write! So that should delay the timeout again to + # another 30 minutes. + bw.remote_write(0, b"hello") + self._assert_timeout_only_after_30_minutes(clock, bw) + + def test_bucket_closing_cancels_timeout(self): + """ + Closing cancels the ``BucketWriter`` timeout. + """ + incoming, final = self.make_workdir("test_bucket_close_timeout") + clock = Clock() + bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock) + self.assertTrue(clock.getDelayedCalls()) + bw.close() + self.assertFalse(clock.getDelayedCalls()) + + def test_bucket_aborting_cancels_timeout(self): + """ + Closing cancels the ``BucketWriter`` timeout. 
+ """ + incoming, final = self.make_workdir("test_bucket_abort_timeout") + clock = Clock() + bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock) + self.assertTrue(clock.getDelayedCalls()) + bw.abort() + self.assertFalse(clock.getDelayedCalls()) + + class RemoteBucket(object): def __init__(self, target): @@ -312,7 +373,7 @@ class BucketProxy(unittest.TestCase): final = os.path.join(basedir, "bucket") fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) - bw = BucketWriter(self, incoming, final, size, self.make_lease()) + bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock()) rb = RemoteBucket(bw) return bw, rb, final @@ -438,11 +499,13 @@ class Server(unittest.TestCase): basedir = os.path.join("storage", "Server", name) return basedir - def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time): + def create(self, name, reserved_space=0, klass=StorageServer, clock=None): + if clock is None: + clock = Clock() workdir = self.workdir(name) ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space, stats_provider=FakeStatsProvider(), - get_current_time=get_current_time) + clock=clock) ss.setServiceParent(self.sparent) return ss @@ -468,14 +531,19 @@ class Server(unittest.TestCase): sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1'] self.failUnlessIn(b'available-space', sv1) - def allocate(self, ss, storage_index, sharenums, size, canary=None): + def allocate(self, ss, storage_index, sharenums, size, renew_leases=True): + """ + Call directly into the storage server's allocate_buckets implementation, + skipping the Foolscap layer. + """ renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)) cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)) - if not canary: - canary = FakeCanary() - return ss.remote_allocate_buckets(storage_index, - renew_secret, cancel_secret, - sharenums, size, canary) + return ss._allocate_buckets( + storage_index, + renew_secret, cancel_secret, + sharenums, size, + renew_leases=renew_leases, + ) def test_large_share(self): syslow = platform.system().lower() @@ -554,7 +622,6 @@ class Server(unittest.TestCase): writer.remote_abort() self.failUnlessEqual(ss.allocated_size(), 0) - def test_allocate(self): ss = self.create("test_allocate") @@ -608,6 +675,64 @@ class Server(unittest.TestCase): for i,wb in writers.items(): wb.remote_abort() + def test_allocate_without_lease_renewal(self): + """ + ``StorageServer._allocate_buckets`` does not renew leases on existing + shares if ``renew_leases`` is ``False``. + """ + first_lease = 456 + second_lease = 543 + storage_index = b"allocate" + + clock = Clock() + clock.advance(first_lease) + ss = self.create( + "test_allocate_without_lease_renewal", + clock=clock, + ) + + # Put a share on there + already, writers = self.allocate( + ss, storage_index, [0], 1, renew_leases=False, + ) + (writer,) = writers.values() + writer.remote_write(0, b"x") + writer.remote_close() + + # It should have a lease granted at the current time. + shares = dict(ss._get_bucket_shares(storage_index)) + self.assertEqual( + [first_lease], + list( + lease.get_grant_renew_time_time() + for lease + in ShareFile(shares[0]).get_leases() + ), + ) + + # Let some time pass so we can tell if the lease on share 0 is + # renewed. + clock.advance(second_lease) + + # Put another share on there. 
+ already, writers = self.allocate( + ss, storage_index, [1], 1, renew_leases=False, + ) + (writer,) = writers.values() + writer.remote_write(0, b"x") + writer.remote_close() + + # The first share's lease expiration time is unchanged. + shares = dict(ss._get_bucket_shares(storage_index)) + self.assertEqual( + [first_lease], + list( + lease.get_grant_renew_time_time() + for lease + in ShareFile(shares[0]).get_leases() + ), + ) + def test_bad_container_version(self): ss = self.create("test_bad_container_version") a,w = self.allocate(ss, b"si1", [0], 10) @@ -629,8 +754,17 @@ class Server(unittest.TestCase): def test_disconnect(self): # simulate a disconnection ss = self.create("test_disconnect") + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 canary = FakeCanary() - already,writers = self.allocate(ss, b"disconnect", [0,1,2], 75, canary) + already,writers = ss.remote_allocate_buckets( + b"disconnect", + renew_secret, + cancel_secret, + sharenums=[0,1,2], + allocated_size=75, + canary=canary, + ) self.failUnlessEqual(already, set()) self.failUnlessEqual(set(writers.keys()), set([0,1,2])) for (f,args,kwargs) in list(canary.disconnectors.values()): @@ -662,8 +796,17 @@ class Server(unittest.TestCase): # the size we request. OVERHEAD = 3*4 LEASE_SIZE = 4+32+32+4 + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 canary = FakeCanary() - already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary) + already, writers = ss.remote_allocate_buckets( + b"vid1", + renew_secret, + cancel_secret, + sharenums=[0,1,2], + allocated_size=1000, + canary=canary, + ) self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally # allocated, allowing only 2000 more to be claimed @@ -696,7 +839,14 @@ class Server(unittest.TestCase): # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and # 5000-1085=3915 free, therefore we can fit 39 100byte shares canary3 = FakeCanary() - already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3) + already3, writers3 = ss.remote_allocate_buckets( + b"vid3", + renew_secret, + cancel_secret, + sharenums=list(range(100)), + allocated_size=100, + canary=canary3, + ) self.failUnlessEqual(len(writers3), 39) self.failUnlessEqual(len(ss._bucket_writers), 39) @@ -755,28 +905,28 @@ class Server(unittest.TestCase): # Create a bucket: rs0, cs0 = self.create_bucket_5_shares(ss, b"si0") - leases = list(ss.get_leases(b"si0")) - self.failUnlessEqual(len(leases), 1) - self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0])) + (lease,) = ss.get_leases(b"si0") + self.assertTrue(lease.is_renew_secret(rs0)) rs1, cs1 = self.create_bucket_5_shares(ss, b"si1") # take out a second lease on si1 rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0) - leases = list(ss.get_leases(b"si1")) - self.failUnlessEqual(len(leases), 2) - self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2])) + (lease1, lease2) = ss.get_leases(b"si1") + self.assertTrue(lease1.is_renew_secret(rs1)) + self.assertTrue(lease2.is_renew_secret(rs2)) # and a third lease, using add-lease rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)), hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))) ss.remote_add_lease(b"si1", rs2a, cs2a) - leases = list(ss.get_leases(b"si1")) - self.failUnlessEqual(len(leases), 3) - self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a])) + (lease1, lease2, lease3) = ss.get_leases(b"si1") + 
self.assertTrue(lease1.is_renew_secret(rs1)) + self.assertTrue(lease2.is_renew_secret(rs2)) + self.assertTrue(lease3.is_renew_secret(rs2a)) # add-lease on a missing storage index is silently ignored - self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None) + self.assertIsNone(ss.remote_add_lease(b"si18", b"", b"")) # check that si0 is readable readers = ss.remote_get_buckets(b"si0") @@ -830,7 +980,7 @@ class Server(unittest.TestCase): """ clock = Clock() clock.advance(123) - ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds) + ss = self.create("test_immutable_add_lease_renews", clock=clock) # Start out with single lease created with bucket: renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0") @@ -944,10 +1094,12 @@ class MutableServer(unittest.TestCase): basedir = os.path.join("storage", "MutableServer", name) return basedir - def create(self, name, get_current_time=time.time): + def create(self, name, clock=None): workdir = self.workdir(name) + if clock is None: + clock = Clock() ss = StorageServer(workdir, b"\x00" * 20, - get_current_time=get_current_time) + clock=clock) ss.setServiceParent(self.sparent) return ss @@ -1332,7 +1484,7 @@ class MutableServer(unittest.TestCase): clock = Clock() clock.advance(235) ss = self.create("test_mutable_add_lease_renews", - get_current_time=clock.seconds) + clock=clock) def secrets(n): return ( self.write_enabler(b"we1"), self.renew_secret(b"we1-%d" % n), @@ -3028,3 +3180,102 @@ class ShareFileTests(unittest.TestCase): sf = self.get_sharefile() with self.assertRaises(IndexError): sf.cancel_lease(b"garbage") + + def test_renew_secret(self): + """ + A lease loaded from a share file can have its renew secret verified. + """ + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 + expiration_time = 2 ** 31 + + sf = self.get_sharefile() + lease = LeaseInfo( + owner_num=0, + renew_secret=renew_secret, + cancel_secret=cancel_secret, + expiration_time=expiration_time, + ) + sf.add_lease(lease) + (loaded_lease,) = sf.get_leases() + self.assertTrue(loaded_lease.is_renew_secret(renew_secret)) + + def test_cancel_secret(self): + """ + A lease loaded from a share file can have its cancel secret verified. + """ + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 + expiration_time = 2 ** 31 + + sf = self.get_sharefile() + lease = LeaseInfo( + owner_num=0, + renew_secret=renew_secret, + cancel_secret=cancel_secret, + expiration_time=expiration_time, + ) + sf.add_lease(lease) + (loaded_lease,) = sf.get_leases() + self.assertTrue(loaded_lease.is_cancel_secret(cancel_secret)) + + +class LeaseInfoTests(unittest.TestCase): + """ + Tests for ``allmydata.storage.lease.LeaseInfo``. + """ + def test_is_renew_secret(self): + """ + ``LeaseInfo.is_renew_secret`` returns ``True`` if the value given is the + renew secret. + """ + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 + lease = LeaseInfo( + owner_num=1, + renew_secret=renew_secret, + cancel_secret=cancel_secret, + ) + self.assertTrue(lease.is_renew_secret(renew_secret)) + + def test_is_not_renew_secret(self): + """ + ``LeaseInfo.is_renew_secret`` returns ``False`` if the value given is not + the renew secret. 
+ """ + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 + lease = LeaseInfo( + owner_num=1, + renew_secret=renew_secret, + cancel_secret=cancel_secret, + ) + self.assertFalse(lease.is_renew_secret(cancel_secret)) + + def test_is_cancel_secret(self): + """ + ``LeaseInfo.is_cancel_secret`` returns ``True`` if the value given is the + cancel secret. + """ + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 + lease = LeaseInfo( + owner_num=1, + renew_secret=renew_secret, + cancel_secret=cancel_secret, + ) + self.assertTrue(lease.is_cancel_secret(cancel_secret)) + + def test_is_not_cancel_secret(self): + """ + ``LeaseInfo.is_cancel_secret`` returns ``False`` if the value given is not + the cancel secret. + """ + renew_secret = b"r" * 32 + cancel_secret = b"c" * 32 + lease = LeaseInfo( + owner_num=1, + renew_secret=renew_secret, + cancel_secret=cancel_secret, + ) + self.assertFalse(lease.is_cancel_secret(renew_secret)) diff --git a/src/allmydata/test/test_storage_http.py b/src/allmydata/test/test_storage_http.py new file mode 100644 index 000000000..442e154a0 --- /dev/null +++ b/src/allmydata/test/test_storage_http.py @@ -0,0 +1,69 @@ +""" +Tests for HTTP storage client + server. +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 + +if PY2: + # fmt: off + from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 + # fmt: on + +from unittest import SkipTest + +from twisted.trial.unittest import TestCase +from twisted.internet.defer import inlineCallbacks + +from treq.testing import StubTreq +from hyperlink import DecodedURL + +from ..storage.server import StorageServer +from ..storage.http_server import HTTPServer +from ..storage.http_client import StorageClient, ClientException + + +class HTTPTests(TestCase): + """ + Tests of HTTP client talking to the HTTP server. + """ + + def setUp(self): + if PY2: + raise SkipTest("Not going to bother supporting Python 2") + self.storage_server = StorageServer(self.mktemp(), b"\x00" * 20) + # TODO what should the swissnum _actually_ be? + self._http_server = HTTPServer(self.storage_server, b"abcd") + self.client = StorageClient( + DecodedURL.from_text("http://127.0.0.1"), + b"abcd", + treq=StubTreq(self._http_server.get_resource()), + ) + + @inlineCallbacks + def test_bad_authentication(self): + """ + If the wrong swissnum is used, an ``Unauthorized`` response code is + returned. + """ + client = StorageClient( + DecodedURL.from_text("http://127.0.0.1"), + b"something wrong", + treq=StubTreq(self._http_server.get_resource()), + ) + with self.assertRaises(ClientException) as e: + yield client.get_version() + self.assertEqual(e.exception.args[0], 401) + + @inlineCallbacks + def test_version(self): + """ + The client can return the version. 
+ """ + version = yield self.client.get_version() + expected_version = self.storage_server.remote_get_version() + self.assertEqual(version, expected_version) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 087a1c634..d859a0e00 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -23,6 +23,7 @@ from twisted.internet import defer from allmydata import uri from allmydata.storage.mutable import MutableShareFile +from allmydata.storage.immutable import ShareFile from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload from allmydata.immutable.literal import LiteralFileNode @@ -1290,9 +1291,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase): # are sharefiles here filename = os.path.join(dirpath, filenames[0]) # peek at the magic to see if it is a chk share - magic = open(filename, "rb").read(4) - if magic == b'\x00\x00\x00\x01': - break + with open(filename, "rb") as f: + if ShareFile.is_valid_header(f.read(32)): + break else: self.fail("unable to find any uri_extension files in %r" % self.basedir) diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index a03845ed6..9a0af1e06 100644 --- a/src/allmydata/test/test_util.py +++ b/src/allmydata/test/test_util.py @@ -553,11 +553,6 @@ class JSONBytes(unittest.TestCase): o, cls=jsonbytes.AnyBytesJSONEncoder)), expected, ) - self.assertEqual( - json.loads(jsonbytes.dumps(o, any_bytes=True)), - expected - ) - class FakeGetVersion(object): diff --git a/src/allmydata/test/web/test_introducer.py b/src/allmydata/test/web/test_introducer.py index 4b5850cbc..69309d35b 100644 --- a/src/allmydata/test/web/test_introducer.py +++ b/src/allmydata/test/web/test_introducer.py @@ -83,12 +83,18 @@ def create_introducer_webish(reactor, port_assigner, basedir): with the node and its webish service. """ node.create_node_dir(basedir, "testing") - _, port_endpoint = port_assigner.assign(reactor) + main_tub_location, main_tub_endpoint = port_assigner.assign(reactor) + _, web_port_endpoint = port_assigner.assign(reactor) with open(join(basedir, "tahoe.cfg"), "w") as f: f.write( "[node]\n" - "tub.location = 127.0.0.1:1\n" + - "web.port = {}\n".format(port_endpoint) + "tub.port = {main_tub_endpoint}\n" + "tub.location = {main_tub_location}\n" + "web.port = {web_port_endpoint}\n".format( + main_tub_endpoint=main_tub_endpoint, + main_tub_location=main_tub_location, + web_port_endpoint=web_port_endpoint, + ) ) intro_node = yield create_introducer(basedir) diff --git a/src/allmydata/util/_eliot_updates.py b/src/allmydata/util/_eliot_updates.py new file mode 100644 index 000000000..81db566a4 --- /dev/null +++ b/src/allmydata/util/_eliot_updates.py @@ -0,0 +1,195 @@ +""" +Bring in some Eliot updates from newer versions of Eliot than we can +depend on in Python 2. The implementations are copied from Eliot 1.14 and +only changed enough to add Python 2 compatibility. + +Every API in this module (except ``eliot_json_encoder``) should be obsolete as +soon as we depend on Eliot 1.14 or newer. + +When that happens: + +* replace ``capture_logging`` + with ``partial(eliot.testing.capture_logging, encoder_=eliot_json_encoder)`` +* replace ``validateLogging`` + with ``partial(eliot.testing.validateLogging, encoder_=eliot_json_encoder)`` +* replace ``MemoryLogger`` + with ``partial(eliot.MemoryLogger, encoder=eliot_json_encoder)`` + +Ported to Python 3. 
+""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from future.utils import PY2 +if PY2: + from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401 + +import json as pyjson +from functools import wraps, partial + +from eliot import ( + MemoryLogger as _MemoryLogger, +) + +from eliot.testing import ( + check_for_errors, + swap_logger, +) + +from .jsonbytes import AnyBytesJSONEncoder + +# There are currently a number of log messages that include non-UTF-8 bytes. +# Allow these, at least for now. Later when the whole test suite has been +# converted to our SyncTestCase or AsyncTestCase it will be easier to turn +# this off and then attribute log failures to specific codepaths so they can +# be fixed (and then not regressed later) because those instances will result +# in test failures instead of only garbage being written to the eliot log. +eliot_json_encoder = AnyBytesJSONEncoder + +class _CustomEncoderMemoryLogger(_MemoryLogger): + """ + Override message validation from the Eliot-supplied ``MemoryLogger`` to + use our chosen JSON encoder. + + This is only necessary on Python 2 where we use an old version of Eliot + that does not parameterize the encoder. + """ + def __init__(self, encoder=eliot_json_encoder): + """ + @param encoder: A JSONEncoder subclass to use when encoding JSON. + """ + self._encoder = encoder + super(_CustomEncoderMemoryLogger, self).__init__() + + def _validate_message(self, dictionary, serializer): + """Validate an individual message. + + As a side-effect, the message is replaced with its serialized contents. + + @param dictionary: A message C{dict} to be validated. Might be mutated + by the serializer! + + @param serializer: C{None} or a serializer. + + @raises TypeError: If a field name is not unicode, or the dictionary + fails to serialize to JSON. + + @raises eliot.ValidationError: If serializer was given and validation + failed. + """ + if serializer is not None: + serializer.validate(dictionary) + for key in dictionary: + if not isinstance(key, str): + if isinstance(key, bytes): + key.decode("utf-8") + else: + raise TypeError(dictionary, "%r is not unicode" % (key,)) + if serializer is not None: + serializer.serialize(dictionary) + + try: + pyjson.dumps(dictionary, cls=self._encoder) + except Exception as e: + raise TypeError("Message %s doesn't encode to JSON: %s" % (dictionary, e)) + +if PY2: + MemoryLogger = partial(_CustomEncoderMemoryLogger, encoder=eliot_json_encoder) +else: + MemoryLogger = partial(_MemoryLogger, encoder=eliot_json_encoder) + +def validateLogging( + assertion, *assertionArgs, **assertionKwargs +): + """ + Decorator factory for L{unittest.TestCase} methods to add logging + validation. + + 1. The decorated test method gets a C{logger} keyword argument, a + L{MemoryLogger}. + 2. All messages logged to this logger will be validated at the end of + the test. + 3. Any unflushed logged tracebacks will cause the test to fail. 
+ + For example: + + from unittest import TestCase + from eliot.testing import assertContainsFields, validateLogging + + class MyTests(TestCase): + def assertFooLogging(self, logger): + assertContainsFields(self, logger.messages[0], {"key": 123}) + + + @param assertion: A callable that will be called with the + L{unittest.TestCase} instance, the logger and C{assertionArgs} and + C{assertionKwargs} once the actual test has run, allowing for extra + logging-related assertions on the effects of the test. Use L{None} if you + want the cleanup assertions registered but no custom assertions. + + @param assertionArgs: Additional positional arguments to pass to + C{assertion}. + + @param assertionKwargs: Additional keyword arguments to pass to + C{assertion}. + + @param encoder_: C{json.JSONEncoder} subclass to use when validating JSON. + """ + encoder_ = assertionKwargs.pop("encoder_", eliot_json_encoder) + def decorator(function): + @wraps(function) + def wrapper(self, *args, **kwargs): + skipped = False + + kwargs["logger"] = logger = MemoryLogger(encoder=encoder_) + self.addCleanup(check_for_errors, logger) + # TestCase runs cleanups in reverse order, and we want this to + # run *before* tracebacks are checked: + if assertion is not None: + self.addCleanup( + lambda: skipped + or assertion(self, logger, *assertionArgs, **assertionKwargs) + ) + try: + return function(self, *args, **kwargs) + except self.skipException: + skipped = True + raise + + return wrapper + + return decorator + +# PEP 8 variant: +validate_logging = validateLogging + +def capture_logging( + assertion, *assertionArgs, **assertionKwargs +): + """ + Capture and validate all logging that doesn't specify a L{Logger}. + + See L{validate_logging} for details on the rest of its behavior. + """ + encoder_ = assertionKwargs.pop("encoder_", eliot_json_encoder) + def decorator(function): + @validate_logging( + assertion, *assertionArgs, encoder_=encoder_, **assertionKwargs + ) + @wraps(function) + def wrapper(self, *args, **kwargs): + logger = kwargs["logger"] + previous_logger = swap_logger(logger) + + def cleanup(): + swap_logger(previous_logger) + + self.addCleanup(cleanup) + return function(self, *args, **kwargs) + + return wrapper + + return decorator diff --git a/src/allmydata/util/eliotutil.py b/src/allmydata/util/eliotutil.py index 4e48fbb9f..789ef38ff 100644 --- a/src/allmydata/util/eliotutil.py +++ b/src/allmydata/util/eliotutil.py @@ -16,12 +16,14 @@ from __future__ import ( ) __all__ = [ + "MemoryLogger", "inline_callbacks", "eliot_logging_service", "opt_eliot_destination", "opt_help_eliot_destinations", "validateInstanceOf", "validateSetMembership", + "capture_logging", ] from future.utils import PY2 @@ -32,7 +34,7 @@ from six import ensure_text from sys import ( stdout, ) -from functools import wraps, partial +from functools import wraps from logging import ( INFO, Handler, @@ -66,8 +68,6 @@ from eliot.twisted import ( DeferredContext, inline_callbacks, ) -from eliot.testing import capture_logging as eliot_capture_logging - from twisted.python.usage import ( UsageError, ) @@ -87,8 +87,11 @@ from twisted.internet.defer import ( ) from twisted.application.service import Service -from .jsonbytes import AnyBytesJSONEncoder - +from ._eliot_updates import ( + MemoryLogger, + eliot_json_encoder, + capture_logging, +) def validateInstanceOf(t): """ @@ -306,7 +309,7 @@ class _DestinationParser(object): rotateLength=rotate_length, maxRotatedFiles=max_rotated_files, ) - return lambda reactor: FileDestination(get_file(), 
AnyBytesJSONEncoder) + return lambda reactor: FileDestination(get_file(), eliot_json_encoder) _parse_destination_description = _DestinationParser().parse @@ -327,10 +330,3 @@ def log_call_deferred(action_type): return DeferredContext(d).addActionFinish() return logged_f return decorate_log_call_deferred - -# On Python 3, encoding bytes to JSON doesn't work, so we have a custom JSON -# encoder we want to use when validating messages. -if PY2: - capture_logging = eliot_capture_logging -else: - capture_logging = partial(eliot_capture_logging, encoder_=AnyBytesJSONEncoder)