Merge branch 'master' of github.com:tahoe-lafs/tahoe-lafs into 3758.refactor.web-tests-grid-logs-root

This commit is contained in:
fenn-cs 2021-09-08 00:09:33 +01:00
commit 0902dbfac8
23 changed files with 705 additions and 250 deletions

View File

@ -24,11 +24,21 @@ Glossary
storage server
a Tahoe-LAFS process configured to offer storage and reachable over the network for store and retrieve operations
storage service
a Python object held in memory in the storage server which provides the implementation of the storage protocol
introducer
a Tahoe-LAFS process at a known location configured to re-publish announcements about the location of storage servers
fURL
a self-authenticating URL-like string which can be used to locate a remote object using the Foolscap protocol
(the storage service is an example of such an object)
NURL
a self-authenticating URL-like string almost exactly like a fURL but without being tied to Foolscap
swissnum
a short random string which is part of a fURL and which acts as a shared secret to authorize clients to use a storage service
lease
state associated with a share informing a storage server of the duration of storage desired by a client
@ -45,7 +55,7 @@ Glossary
(sometimes "slot" is considered a synonym for "storage index of a slot")
storage index
a short string which can address a slot or a bucket
a 16 byte string which can address a slot or a bucket
(in practice, derived by hashing the encryption key associated with contents of that slot or bucket)
write enabler
@ -128,6 +138,8 @@ The Foolscap-based protocol offers:
* A careful configuration of the TLS connection parameters *may* also offer **forward secrecy**.
However, Tahoe-LAFS' use of Foolscap takes no steps to ensure this is the case.
* **Storage authorization** by way of a capability contained in the fURL addressing a storage service.
Discussion
!!!!!!!!!!
@ -158,6 +170,10 @@ there is no way to write data which appears legitimate to a legitimate client).
Therefore, **message confidentiality** is necessary when exchanging these secrets.
**Forward secrecy** is preferred so that an attacker recording an exchange today cannot launch this attack at some future point after compromising the necessary keys.
A storage service offers service only to some clients.
A client proves their authorization to use the storage service by presenting a shared secret taken from the fURL.
In this way **storage authorization** is performed to prevent disallowed parties from consuming any storage resources.
Functionality
-------------
@ -214,6 +230,10 @@ Additionally,
by continuing to interact using TLS,
Bob's client and Alice's storage node are assured of both **message authentication** and **message confidentiality**.
Bob's client further inspects the fURL for the *swissnum*.
When Bob's client issues HTTP requests to Alice's storage node it includes the *swissnum* in its requests.
**Storage authorization** has been achieved.
.. note::
Foolscap TubIDs are 20 bytes (SHA1 digest of the certificate).
@ -343,6 +363,12 @@ one branch contains all of the share data;
another branch contains all of the lease data;
etc.
Authorization is required for all endpoints.
The standard HTTP authorization protocol is used.
The authentication *type* used is ``Tahoe-LAFS``.
The swissnum from the NURL used to locate the storage service is used as the *credentials*.
If credentials are not presented or the swissnum is not associated with a storage service then no storage processing is performed and the request receives an ``UNAUTHORIZED`` response.
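For example,
a request might carry an ``Authorization`` header like the following,
where the endpoint and swissnum value are illustrative placeholders::

GET /v1/version HTTP/1.1
Authorization: Tahoe-LAFS y5bwq33mlokaagfyg5bphzs6wi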
General
~~~~~~~
@ -369,7 +395,7 @@ For example::
``PUT /v1/lease/:storage_index``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Create a new lease on the bucket addressed by ``storage_index``.
Either renew or create a new lease on the bucket addressed by ``storage_index``.
The details of the lease are encoded in the request body.
For example::
@ -380,6 +406,10 @@ then the expiration time of that lease will be changed to 31 days after the time
If it does not match an existing lease
then a new lease will be created with this ``renew-secret`` which expires 31 days after the time of this operation.
``renew-secret`` and ``cancel-secret`` values must be 32 bytes long.
The server treats them as opaque values.
:ref:`Share Leases` gives details about how the Tahoe-LAFS storage client constructs these values.
In these cases the response is ``NO CONTENT`` with an empty body.
It is possible that the storage server will have no shares for the given ``storage_index`` because:
@ -400,37 +430,11 @@ Several behaviors here are blindly copied from the Foolscap-based storage server
* There is a cancel secret but there is no API to use it to cancel a lease (see ticket:3768).
* The lease period is hard-coded at 31 days.
* There are separate **add** and **renew** lease APIs (see ticket:3773).
These are not necessarily ideal behaviors
but they are adopted to avoid any *semantic* changes between the Foolscap- and HTTP-based protocols.
It is expected that some or all of these behaviors may change in a future revision of the HTTP-based protocol.
``POST /v1/lease/:storage_index``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Renew an existing lease for all shares for the given storage index.
The details of the lease are encoded in the request body.
For example::
{"renew-secret": "abcd"}
If there are no shares for the given ``storage_index``
then ``NOT FOUND`` is returned.
If there is no lease with a matching ``renew-secret`` value on the given storage index
then ``NOT FOUND`` is returned.
In this case,
if the storage index refers to mutable data
then the response also includes a list of nodeids where the lease can be renewed.
For example::
{"nodeids": ["aaa...", "bbb..."]}
Otherwise,
the matching lease's expiration time is changed to be 31 days from the time of this operation
and ``NO CONTENT`` is returned.
Immutable
---------
@ -484,17 +488,32 @@ The response includes ``already-have`` and ``allocated`` for two reasons:
Write data for the indicated share.
The share number must belong to the storage index.
The request body is the raw share data (i.e., ``application/octet-stream``).
*Content-Range* requests are encouraged for large transfers.
*Content-Range* requests are encouraged for large transfers to allow partially complete uploads to be resumed.
For example,
for a 1MiB share the data can be broken into 8 128KiB chunks.
Each chunk can be *PUT* separately with the appropriate *Content-Range* header.
a 1MiB share can be divided into eight separate 128KiB chunks.
Each chunk can be uploaded in a separate request.
Each request can include a *Content-Range* value indicating its placement within the complete share.
If any one of these requests fails then at most 128KiB of upload work needs to be retried.
The server must recognize when all of the data has been received and mark the share as complete
(which it can do because it was informed of the size when the storage index was initialized).
Clients should upload chunks in re-assembly order.
Servers may reject out-of-order chunks for implementation simplicity.
If an individual *PUT* fails then only a limited amount of effort is wasted on the necessary retry.
.. think about copying https://developers.google.com/drive/api/v2/resumable-upload
* When a chunk that does not complete the share is successfully uploaded the response is ``OK``.
* When the chunk that completes the share is successfully uploaded the response is ``CREATED``.
* If the *Content-Range* for a request covers part of the share that has already been uploaded the response is ``CONFLICT``.
The response body indicates the range of share data that has yet to be uploaded.
That is::
{ "required":
[ { "begin": <byte position, inclusive>
, "end": <byte position, exclusive>
}
,
...
]
}
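To illustrate,
a minimal client-side upload loop consistent with these rules might look like the sketch below.
The ``requests`` library and the exact upload path are assumptions made for this example::

import requests  # assumed HTTP client, not part of this specification

CHUNK_SIZE = 128 * 1024

def upload_share(base_url, storage_index, share_number, share_data, swissnum):
    # Upload one immutable share in 128KiB pieces, one PUT per piece,
    # marking each piece's position with a Content-Range header.
    url = "{}/v1/immutable/{}/{}".format(base_url, storage_index, share_number)
    auth = {"Authorization": "Tahoe-LAFS {}".format(swissnum)}
    total = len(share_data)
    for start in range(0, total, CHUNK_SIZE):
        chunk = share_data[start:start + CHUNK_SIZE]
        headers = dict(auth)
        headers["Content-Range"] = "bytes {}-{}/{}".format(
            start, start + len(chunk) - 1, total,
        )
        response = requests.put(url, data=chunk, headers=headers)
        if response.status_code == 201:
            # CREATED: the server saw the final chunk; the share is complete.
            return
        if response.status_code == 409:
            # CONFLICT: this range overlaps already-uploaded data; the
            # response body describes the ranges still required.
            raise ValueError(response.json()["required"])
        # Otherwise expect 200 OK and continue with the next chunk.
        response.raise_for_status()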
``POST /v1/immutable/:storage_index/:share_number/corrupt``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
@ -520,31 +539,20 @@ For example::
[1, 5]
``GET /v1/immutable/:storage_index?share=:s0&share=:sN&offset=o1&size=z0&offset=oN&size=zN``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
``GET /v1/immutable/:storage_index/:share_number``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Read data from the indicated immutable shares.
If ``share`` query parameters are given, select only those shares for reading.
Otherwise, select all shares present.
If ``size`` and ``offset`` query parameters are given,
only the portions thus identified of the selected shares are returned.
Otherwise, all data from the selected shares is returned.
The response body contains a mapping giving the read data.
For example::
{
3: ["foo", "bar"],
7: ["baz", "quux"]
}
Read a contiguous sequence of bytes from one share in one bucket.
The response body is the raw share data (i.e., ``application/octet-stream``).
The ``Range`` header may be used to request exactly one ``bytes`` range.
Interpretation and response behavior is as specified in RFC 7233 § 4.1.
Multiple ranges in a single request are *not* supported.
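For example,
a client reading the first 64KiB of share 7 in bucket ``AAAAAAAAAAAAAAAA`` might send
(values illustrative)::

GET /v1/immutable/AAAAAAAAAAAAAAAA/7 HTTP/1.1
Authorization: Tahoe-LAFS y5bwq33mlokaagfyg5bphzs6wi
Range: bytes=0-65535

and expect a ``PARTIAL CONTENT`` response carrying the raw bytes of that range,
per RFC 7233.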
Discussion
``````````
Offset and size of the requested data are specified here as query arguments.
Instead, this information could be present in a ``Range`` header in the request.
This is the more obvious choice and leverages an HTTP feature built for exactly this use-case.
However, HTTP requires that the ``Content-Type`` of the response to "range requests" be ``multipart/...``.
Multiple ``bytes`` ranges are not supported.
HTTP requires that the ``Content-Type`` of the response in that case be ``multipart/...``.
The ``multipart`` major type brings along string sentinel delimiting as a means to frame the different response parts.
There are many drawbacks to this framing technique:
@ -552,6 +560,15 @@ There are many drawbacks to this framing technique:
2. It is resource-intensive to parse.
3. It is complex to parse safely [#]_ [#]_ [#]_ [#]_.
A previous revision of this specification allowed requesting one or more contiguous sequences from one or more shares.
This *superficially* mirrored the Foolscap-based interface somewhat closely.
The interface was simplified to the present form because it is all that is required to let clients retrieve any desired information.
It only requires that the client issue multiple requests.
This can be done with pipelining or parallel requests to avoid an additional latency penalty.
In the future,
if there are performance goals,
benchmarks can demonstrate whether they are achieved by a more complicated interface or some other change.
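For instance,
a client needing several ranges from several shares can issue the single-range requests concurrently.
A minimal sketch, assuming the ``requests`` library and the read endpoint described above::

from concurrent.futures import ThreadPoolExecutor

import requests  # assumed HTTP client, not part of this specification

def read_range(base_url, storage_index, share_number, offset, size, swissnum):
    # One single-range read, as specified above.
    response = requests.get(
        "{}/v1/immutable/{}/{}".format(base_url, storage_index, share_number),
        headers={
            "Authorization": "Tahoe-LAFS {}".format(swissnum),
            "Range": "bytes={}-{}".format(offset, offset + size - 1),
        },
    )
    response.raise_for_status()
    return response.content

def read_many(base_url, storage_index, reads, swissnum):
    # Issue the reads in parallel to avoid paying one round trip per range.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [
            pool.submit(read_range, base_url, storage_index, shnum, off, size, swissnum)
            for (shnum, off, size) in reads
        ]
        return [f.result() for f in futures]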
Mutable
-------
@ -676,8 +693,8 @@ Immutable Data
#. Renew the lease on all immutable shares in bucket ``AAAAAAAAAAAAAAAA``::
POST /v1/lease/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh"}
PUT /v1/lease/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
204 NO CONTENT
@ -757,8 +774,8 @@ Mutable Data
#. Renew the lease on previously uploaded mutable share in slot ``BBBBBBBBBBBBBBBB``::
POST /v1/lease/BBBBBBBBBBBBBBBB
{"renew-secret": "efgh"}
PUT /v1/lease/BBBBBBBBBBBBBBBB
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
204 NO CONTENT

View File

@ -0,0 +1,87 @@
"""
This is a reference implementation of the lease renewal secret derivation
protocol in use by Tahoe-LAFS clients as of 1.16.0.
"""
from allmydata.util.base32 import (
    a2b as b32decode,
    b2a as b32encode,
)
from allmydata.util.hashutil import (
    tagged_hash,
    tagged_pair_hash,
)
def derive_renewal_secret(lease_secret: bytes, storage_index: bytes, tubid: bytes) -> bytes:
    assert len(lease_secret) == 32
    assert len(storage_index) == 16
    assert len(tubid) == 20

    bucket_renewal_tag = b"allmydata_bucket_renewal_secret_v1"
    file_renewal_tag = b"allmydata_file_renewal_secret_v1"
    client_renewal_tag = b"allmydata_client_renewal_secret_v1"

    client_renewal_secret = tagged_hash(lease_secret, client_renewal_tag)
    file_renewal_secret = tagged_pair_hash(
        file_renewal_tag,
        client_renewal_secret,
        storage_index,
    )
    peer_id = tubid

    return tagged_pair_hash(bucket_renewal_tag, file_renewal_secret, peer_id)
def demo():
    secret = b32encode(derive_renewal_secret(
        b"lease secretxxxxxxxxxxxxxxxxxxxx",
        b"storage indexxxx",
        b"tub idxxxxxxxxxxxxxx",
    )).decode("ascii")
    print("An example renewal secret: {}".format(secret))
def test():
    # These test vectors created by instrumenting Tahoe-LAFS
    # bb57fcfb50d4e01bbc4de2e23dbbf7a60c004031 to emit `self.renew_secret` in
    # allmydata.immutable.upload.ServerTracker.query and then uploading a
    # couple files to a couple different storage servers.
    test_vector = [
        dict(lease_secret=b"boity2cdh7jvl3ltaeebuiobbspjmbuopnwbde2yeh4k6x7jioga",
             storage_index=b"vrttmwlicrzbt7gh5qsooogr7u",
             tubid=b"v67jiisoty6ooyxlql5fuucitqiok2ic",
             expected=b"osd6wmc5vz4g3ukg64sitmzlfiaaordutrez7oxdp5kkze7zp5zq",
        ),
        dict(lease_secret=b"boity2cdh7jvl3ltaeebuiobbspjmbuopnwbde2yeh4k6x7jioga",
             storage_index=b"75gmmfts772ww4beiewc234o5e",
             tubid=b"v67jiisoty6ooyxlql5fuucitqiok2ic",
             expected=b"35itmusj7qm2pfimh62snbyxp3imreofhx4djr7i2fweta75szda",
        ),
        dict(lease_secret=b"boity2cdh7jvl3ltaeebuiobbspjmbuopnwbde2yeh4k6x7jioga",
             storage_index=b"75gmmfts772ww4beiewc234o5e",
             tubid=b"lh5fhobkjrmkqjmkxhy3yaonoociggpz",
             expected=b"srrlruge47ws3lm53vgdxprgqb6bz7cdblnuovdgtfkqrygrjm4q",
        ),
        dict(lease_secret=b"vacviff4xfqxsbp64tdr3frg3xnkcsuwt5jpyat2qxcm44bwu75a",
             storage_index=b"75gmmfts772ww4beiewc234o5e",
             tubid=b"lh5fhobkjrmkqjmkxhy3yaonoociggpz",
             expected=b"b4jledjiqjqekbm2erekzqumqzblegxi23i5ojva7g7xmqqnl5pq",
        ),
    ]
    for n, item in enumerate(test_vector):
        derived = b32encode(derive_renewal_secret(
            b32decode(item["lease_secret"]),
            b32decode(item["storage_index"]),
            b32decode(item["tubid"]),
        ))
        assert derived == item["expected"], \
            "Test vector {} failed: {} (expected) != {} (derived)".format(
                n,
                item["expected"],
                derived,
            )
    print("{} test vectors validated".format(len(test_vector)))


test()
demo()

View File

@ -14,5 +14,6 @@ the data formats used by Tahoe.
URI-extension
mutable
dirnodes
lease
servers-of-happiness
backends/raic

View File

@ -0,0 +1,69 @@
.. -*- coding: utf-8 -*-
.. _share leases:
Share Leases
============
A lease is a marker attached to a share indicating that some client has asked for that share to be retained for some amount of time.
The intent is to allow clients and servers to collaborate to determine which data should still be retained and which can be discarded to reclaim storage space.
Zero or more leases may be attached to any particular share.
Renewal Secrets
---------------
Each lease is uniquely identified by its **renewal secret**.
This is a 32 byte string which can be used to extend the validity period of that lease.
To a storage server a renewal secret is an opaque value which is only ever compared to other renewal secrets to determine equality.
Storage clients will typically want to follow a scheme to deterministically derive the renewal secret for a particular share from information the client already holds about that share.
This allows a client to maintain and renew a single long-lived lease without maintaining additional local state.
The scheme in use in Tahoe-LAFS as of 1.16.0 is as follows.
* The **netstring encoding** of a byte string is the concatenation of:
* the ascii encoding of the base 10 representation of the length of the string
* ``":"``
* the string itself
* ``","``
* The **sha256d digest** is the **sha256 digest** of the **sha256 digest** of a string.
* The **sha256d tagged digest** is the **sha256d digest** of the concatenation of the **netstring encoding** of one string with one other unmodified string.
* The **sha256d tagged pair digest** is the **sha256d digest** of the concatenation of the **netstring encodings** of each of three strings.
* The **bucket renewal tag** is ``"allmydata_bucket_renewal_secret_v1"``.
* The **file renewal tag** is ``"allmydata_file_renewal_secret_v1"``.
* The **client renewal tag** is ``"allmydata_client_renewal_secret_v1"``.
* The **lease secret** is a 32 byte string, typically randomly generated once and then persisted for all future uses.
* The **client renewal secret** is the **sha256d tagged digest** of (**lease secret**, **client renewal tag**).
* The **storage index** is constructed using a capability-type-specific scheme.
See ``storage_index_hash`` and ``ssk_storage_index_hash`` calls in ``src/allmydata/uri.py``.
* The **file renewal secret** is the **sha256d tagged pair digest** of (**file renewal tag**, **client renewal secret**, **storage index**).
* The **base32 encoding** is ``base64.b32encode`` lowercased and with trailing ``=`` stripped.
* The **peer id** is the **base32 encoding** of the SHA1 digest of the server's x509 certificate.
* The **renewal secret** is the **sha256d tagged pair digest** of (**bucket renewal tag**, **file renewal secret**, **peer id**).
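The digest primitives above can be restated as a short sketch in terms of Python's ``hashlib``.
This is an illustrative restatement of the definitions only;
the implementation uses equivalent helpers from ``allmydata.util.hashutil``::

from hashlib import sha256

def netstring(s: bytes) -> bytes:
    # "<length in base 10>:<string>," as defined above.
    return b"%d:%s," % (len(s), s)

def sha256d(s: bytes) -> bytes:
    # The sha256 digest of the sha256 digest of the string.
    return sha256(sha256(s).digest()).digest()

def tagged_digest(s1: bytes, s2: bytes) -> bytes:
    # The netstring encoding of one string concatenated with one other
    # unmodified string, then hashed.
    return sha256d(netstring(s1) + s2)

def tagged_pair_digest(s1: bytes, s2: bytes, s3: bytes) -> bytes:
    # The concatenation of the netstring encodings of each of three
    # strings, then hashed.
    return sha256d(netstring(s1) + netstring(s2) + netstring(s3))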
A reference implementation is available.
.. literalinclude:: derive_renewal_secret.py
:language: python
:linenos:
Cancel Secrets
--------------
Lease cancellation is unimplemented.
Nevertheless,
a cancel secret is sent by storage clients to storage servers and stored in lease records.
The scheme for deriving the **cancel secret** in use in Tahoe-LAFS as of 1.16.0 is similar to that used to derive the **renewal secret**.
The differences are:
* Use of **client renewal tag** is replaced by use of **client cancel tag**.
* Use of **file renewal tag** is replaced by use of **file cancel tag**.
* Use of **bucket renewal tag** is replaced by use of **bucket cancel tag**.
* **client cancel tag** is ``"allmydata_client_cancel_secret_v1"``.
* **file cancel tag** is ``"allmydata_file_cancel_secret_v1"``.
* **bucket cancel tag** is ``"allmydata_bucket_cancel_secret_v1"``.
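Expressed with the same helpers as the reference implementation above,
the derivation might read as follows
(an illustrative sketch only, since cancellation is unimplemented)::

def derive_cancel_secret(lease_secret: bytes, storage_index: bytes, tubid: bytes) -> bytes:
    # Identical in shape to derive_renewal_secret, with the cancel tags
    # substituted for the renewal tags.
    bucket_cancel_tag = b"allmydata_bucket_cancel_secret_v1"
    file_cancel_tag = b"allmydata_file_cancel_secret_v1"
    client_cancel_tag = b"allmydata_client_cancel_secret_v1"

    client_cancel_secret = tagged_hash(lease_secret, client_cancel_tag)
    file_cancel_secret = tagged_pair_hash(
        file_cancel_tag,
        client_cancel_secret,
        storage_index,
    )
    return tagged_pair_hash(bucket_cancel_tag, file_cancel_secret, tubid)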

0
newsfragments/3528.minor Normal file
View File

View File

@ -0,0 +1 @@
The Great Black Swamp specification now allows parallel upload of immutable share data.

0
newsfragments/3773.minor Normal file
View File

View File

@ -0,0 +1 @@
There is now a specification for the scheme which Tahoe-LAFS storage clients use to derive their lease renewal secrets.

View File

@ -0,0 +1 @@
The Great Black Swamp proposed specification now has a simplified interface for reading data from immutable shares.

View File

@ -0,0 +1 @@
The Great Black Swamp specification now describes the required authorization scheme.

View File

@ -154,25 +154,9 @@ class RIStorageServer(RemoteInterface):
"""
return Any() # returns None now, but future versions might change
def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret):
"""
Renew the lease on a given bucket, resetting the timer to 31 days.
Some networks will use this, some will not. If there is no bucket for
the given storage_index, IndexError will be raised.
For mutable shares, if the given renew_secret does not match an
existing lease, IndexError will be raised with a note listing the
server-nodeids on the existing leases, so leases on migrated shares
can be renewed. For immutable shares, IndexError (without the note)
will be raised.
"""
return Any()
def get_buckets(storage_index=StorageIndex):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
def slot_readv(storage_index=StorageIndex,
shares=ListOf(int), readv=ReadVector):
"""Read a vector from the numbered shares associated with the given
@ -343,14 +327,6 @@ class IStorageServer(Interface):
:see: ``RIStorageServer.add_lease``
"""
def renew_lease(
storage_index,
renew_secret,
):
"""
:see: ``RIStorageServer.renew_lease``
"""
def get_buckets(
storage_index,
):

View File

@ -9,6 +9,7 @@ if PY2:
import os, sys
from six.moves import StringIO
from past.builtins import unicode
import six
try:
@ -115,23 +116,75 @@ for module in (create_node,):
def parse_options(argv, config=None):
if not config:
config = Options()
config.parseOptions(argv) # may raise usage.error
try:
config.parseOptions(argv)
except usage.error as e:
if six.PY2:
# On Python 2 the exception may hold non-ascii in a byte string.
# This makes it impossible to convert the exception to any kind of
# string using str() or unicode(). It could also hold non-ascii
# in a unicode string which still makes it difficult to convert it
# to a byte string later.
#
# So, reach inside and turn it into some entirely safe ascii byte
# strings that will survive being written to stdout without
# causing too much damage in the process.
#
# As a result, non-ascii will not be rendered correctly but
# instead as escape sequences. At least this can go away when
# we're done with Python 2 support.
raise usage.error(*(
arg.encode("ascii", errors="backslashreplace")
if isinstance(arg, unicode)
else arg.decode("utf-8").encode("ascii", errors="backslashreplace")
for arg
in e.args
))
raise
return config
def parse_or_exit(config, argv, stdout, stderr):
"""
Parse Tahoe-LAFS CLI arguments and return a configuration object if they
are valid.
def parse_or_exit_with_explanation(argv, stdout=sys.stdout):
config = Options()
If they are invalid, write an explanation to ``stdout`` and exit.
:param allmydata.scripts.runner.Options config: An instance of the
argument-parsing class to use.
:param [unicode] argv: The argument list to parse, including the name of the
program being run as ``argv[0]``.
:param stdout: The file-like object to use as stdout.
:param stderr: The file-like object to use as stderr.
:raise SystemExit: If there is an argument-parsing problem.
:return: ``config``, after using it to parse the argument list.
"""
try:
parse_options(argv, config=config)
parse_options(argv[1:], config=config)
except usage.error as e:
# `parse_options` may have the side-effect of initializing a
# "sub-option" of the given configuration, even if it ultimately
# raises an exception. For example, `tahoe run --invalid-option` will
# set `config.subOptions` to an instance of
# `allmydata.scripts.tahoe_run.RunOptions` and then raise a
# `usage.error` because `RunOptions` does not recognize
# `--invalid-option`. If `run` itself had a sub-options then the same
# thing could happen but with another layer of nesting. We can
# present the user with the most precise information about their usage
# error possible by finding the most "sub" of the sub-options and then
# showing that to the user along with the usage error.
c = config
while hasattr(c, 'subOptions'):
c = c.subOptions
print(str(c), file=stdout)
# On Python 2 the string may turn into a unicode string, e.g. the error
# may be unicode, in which case it will print funny. Once we're on
# Python 3 we can just drop the ensure_str().
print(six.ensure_str("%s: %s\n" % (sys.argv[0], e)), file=stdout)
exc_str = str(e)
exc_bytes = six.ensure_binary(exc_str, "utf-8")
msg_bytes = b"%s: %s\n" % (six.ensure_binary(argv[0]), exc_bytes)
print(six.ensure_text(msg_bytes, "utf-8"), file=stdout)
sys.exit(1)
return config
@ -186,28 +239,66 @@ def _maybe_enable_eliot_logging(options, reactor):
PYTHON_3_WARNING = ("Support for Python 3 is an incomplete work-in-progress."
" Use at your own risk.")
def run():
if six.PY3:
print(PYTHON_3_WARNING, file=sys.stderr)
def run(configFactory=Options, argv=sys.argv, stdout=sys.stdout, stderr=sys.stderr):
"""
Run a Tahoe-LAFS node.
:param configFactory: A zero-argument callable which creates the config
object to use to parse the argument list.
:param [str] argv: The argument list to use to configure the run.
:param stdout: The file-like object to use for stdout.
:param stderr: The file-like object to use for stderr.
:raise SystemExit: Always raised after the run is complete.
"""
if six.PY3:
print(PYTHON_3_WARNING, file=stderr)
if sys.platform == "win32":
from allmydata.windows.fixups import initialize
initialize()
# doesn't return: calls sys.exit(rc)
task.react(_run_with_reactor)
task.react(
lambda reactor: _run_with_reactor(
reactor,
configFactory(),
argv,
stdout,
stderr,
),
)
def _setup_coverage(reactor):
def _setup_coverage(reactor, argv):
"""
Arrange for coverage to be collected if the 'coverage' package is
installed
If coverage measurement was requested, start collecting coverage
measurements and arrange to record those measurements when the process is
done.
Coverage measurement is considered requested if ``"--coverage"`` is in
``argv`` (and it will be removed from ``argv`` if it is found). There
should be a ``.coveragerc`` file in the working directory if coverage
measurement is requested.
This is only necessary to support multi-process coverage measurement,
typically when the test suite is running, and with the pytest-based
*integration* test suite (at ``integration/`` in the root of the source
tree) foremost in mind. The idea is that if you are running Tahoe-LAFS in
a configuration where multiple processes are involved (for example, a
test process and a client node process) and you only measure coverage from
the test process, then you will fail to observe most Tahoe-LAFS code that
is being run.
This function arranges to have any Tahoe-LAFS process (such as that
client node process) collect and report coverage measurements as well.
"""
# can we put this _setup_coverage call after we hit
# argument-parsing?
# ensure_str() only necessary on Python 2.
if six.ensure_str('--coverage') not in sys.argv:
return
sys.argv.remove('--coverage')
argv.remove('--coverage')
try:
import coverage
@ -238,14 +329,37 @@ def _setup_coverage(reactor):
reactor.addSystemEventTrigger('after', 'shutdown', write_coverage_data)
def _run_with_reactor(reactor):
def _run_with_reactor(reactor, config, argv, stdout, stderr):
"""
Run a Tahoe-LAFS node using the given reactor.
_setup_coverage(reactor)
:param reactor: The reactor to use. This implementation largely ignores
this and lets the rest of the implementation pick its own reactor.
Oops.
argv = list(map(argv_to_unicode, sys.argv[1:]))
d = defer.maybeDeferred(parse_or_exit_with_explanation, argv)
:param twisted.python.usage.Options config: The config object to use to
parse the argument list.
:param [str] argv: The argument list to parse, *excluding* the name of the
program being run.
:param stdout: See ``run``.
:param stderr: See ``run``.
:return: A ``Deferred`` that fires when the run is complete.
"""
_setup_coverage(reactor, argv)
argv = list(map(argv_to_unicode, argv))
d = defer.maybeDeferred(
parse_or_exit,
config,
argv,
stdout,
stderr,
)
d.addCallback(_maybe_enable_eliot_logging, reactor)
d.addCallback(dispatch)
d.addCallback(dispatch, stdout=stdout, stderr=stderr)
def _show_exception(f):
# when task.react() notices a non-SystemExit exception, it does
# log.err() with the failure and then exits with rc=1. We want this
@ -253,7 +367,7 @@ def _run_with_reactor(reactor):
# weren't using react().
if f.check(SystemExit):
return f # dispatch function handled it
f.printTraceback(file=sys.stderr)
f.printTraceback(file=stderr)
sys.exit(1)
d.addErrback(_show_exception)
return d

View File

@ -192,7 +192,7 @@ class DaemonizeTahoeNodePlugin(object):
return DaemonizeTheRealService(self.nodetype, self.basedir, so)
def run(config):
def run(config, runApp=twistd.runApp):
"""
Runs a Tahoe-LAFS node in the foreground.
@ -212,10 +212,11 @@ def run(config):
if not nodetype:
print("%s is not a recognizable node directory" % quoted_basedir, file=err)
return 1
# Now prepare to turn into a twistd process. This os.chdir is the point
# of no return.
os.chdir(basedir)
twistd_args = ["--nodaemon"]
twistd_args = ["--nodaemon", "--rundir", basedir]
if sys.platform != "win32":
pidfile = get_pidfile(basedir)
twistd_args.extend(["--pidfile", pidfile])
twistd_args.extend(config.twistd_args)
twistd_args.append("DaemonizeTahoeNode") # point at our DaemonizeTahoeNodePlugin
@ -232,12 +233,11 @@ def run(config):
twistd_config.loadedPlugins = {"DaemonizeTahoeNode": DaemonizeTahoeNodePlugin(nodetype, basedir)}
# handle invalid PID file (twistd might not start otherwise)
pidfile = get_pidfile(basedir)
if get_pid_from_pidfile(pidfile) == -1:
if sys.platform != "win32" and get_pid_from_pidfile(pidfile) == -1:
print("found invalid PID file in %s - deleting it" % basedir, file=err)
os.remove(pidfile)
# We always pass --nodaemon so twistd.runApp does not daemonize.
print("running node in %s" % (quoted_basedir,), file=out)
twistd.runApp(twistd_config)
runApp(twistd_config)
return 0

View File

@ -49,6 +49,10 @@ from allmydata.storage.expirer import LeaseCheckingCrawler
NUM_RE=re.compile("^[0-9]+$")
# Number of seconds to add to expiration time on lease renewal.
# For now it's not actually configurable, but maybe someday.
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
@ -62,7 +66,8 @@ class StorageServer(service.MultiService, Referenceable):
expiration_mode="age",
expiration_override_lease_duration=None,
expiration_cutoff_date=None,
expiration_sharetypes=("mutable", "immutable")):
expiration_sharetypes=("mutable", "immutable"),
get_current_time=time.time):
service.MultiService.__init__(self)
assert isinstance(nodeid, bytes)
assert len(nodeid) == 20
@ -114,6 +119,7 @@ class StorageServer(service.MultiService, Referenceable):
expiration_cutoff_date,
expiration_sharetypes)
self.lease_checker.setServiceParent(self)
self._get_current_time = get_current_time
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@ -264,7 +270,7 @@ class StorageServer(service.MultiService, Referenceable):
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
start = time.time()
start = self._get_current_time()
self.count("allocate")
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
@ -277,7 +283,7 @@ class StorageServer(service.MultiService, Referenceable):
# goes into the share files themselves. It could also be put into a
# separate database. Note that the lease should not be added until
# the BucketWriter has been closed.
expire_time = time.time() + 31*24*60*60
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -331,7 +337,7 @@ class StorageServer(service.MultiService, Referenceable):
if bucketwriters:
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
self.add_latency("allocate", time.time() - start)
self.add_latency("allocate", self._get_current_time() - start)
return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
@ -351,26 +357,26 @@ class StorageServer(service.MultiService, Referenceable):
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
owner_num=1):
start = time.time()
start = self._get_current_time()
self.count("add-lease")
new_expire_time = time.time() + 31*24*60*60
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
new_expire_time, self.my_nodeid)
for sf in self._iter_share_files(storage_index):
sf.add_or_renew_lease(lease_info)
self.add_latency("add-lease", time.time() - start)
self.add_latency("add-lease", self._get_current_time() - start)
return None
def remote_renew_lease(self, storage_index, renew_secret):
start = time.time()
start = self._get_current_time()
self.count("renew")
new_expire_time = time.time() + 31*24*60*60
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
found_buckets = False
for sf in self._iter_share_files(storage_index):
found_buckets = True
sf.renew_lease(renew_secret, new_expire_time)
self.add_latency("renew", time.time() - start)
self.add_latency("renew", self._get_current_time() - start)
if not found_buckets:
raise IndexError("no such lease to renew")
@ -394,7 +400,7 @@ class StorageServer(service.MultiService, Referenceable):
pass
def remote_get_buckets(self, storage_index):
start = time.time()
start = self._get_current_time()
self.count("get")
si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %r" % si_s)
@ -402,7 +408,7 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index):
bucketreaders[shnum] = BucketReader(self, filename,
storage_index, shnum)
self.add_latency("get", time.time() - start)
self.add_latency("get", self._get_current_time() - start)
return bucketreaders
def get_leases(self, storage_index):
@ -563,7 +569,7 @@ class StorageServer(service.MultiService, Referenceable):
:return LeaseInfo: Information for a new lease for a share.
"""
ownerid = 1 # TODO
expire_time = time.time() + 31*24*60*60 # one month
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(ownerid,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -599,7 +605,7 @@ class StorageServer(service.MultiService, Referenceable):
See ``allmydata.interfaces.RIStorageServer`` for details about other
parameters and return value.
"""
start = time.time()
start = self._get_current_time()
self.count("writev")
si_s = si_b2a(storage_index)
log.msg("storage: slot_writev %r" % si_s)
@ -640,7 +646,7 @@ class StorageServer(service.MultiService, Referenceable):
self._add_or_renew_leases(remaining_shares, lease_info)
# all done
self.add_latency("writev", time.time() - start)
self.add_latency("writev", self._get_current_time() - start)
return (testv_is_good, read_data)
def remote_slot_testv_and_readv_and_writev(self, storage_index,
@ -666,7 +672,7 @@ class StorageServer(service.MultiService, Referenceable):
return share
def remote_slot_readv(self, storage_index, shares, readv):
start = time.time()
start = self._get_current_time()
self.count("readv")
si_s = si_b2a(storage_index)
lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
@ -675,7 +681,7 @@ class StorageServer(service.MultiService, Referenceable):
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
if not os.path.isdir(bucketdir):
self.add_latency("readv", time.time() - start)
self.add_latency("readv", self._get_current_time() - start)
return {}
datavs = {}
for sharenum_s in os.listdir(bucketdir):
@ -689,7 +695,7 @@ class StorageServer(service.MultiService, Referenceable):
datavs[sharenum] = msf.readv(readv)
log.msg("returning shares %s" % (list(datavs.keys()),),
facility="tahoe.storage", level=log.NOISY, parent=lp)
self.add_latency("readv", time.time() - start)
self.add_latency("readv", self._get_current_time() - start)
return datavs
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,

View File

@ -965,17 +965,6 @@ class _StorageServer(object):
cancel_secret,
)
def renew_lease(
self,
storage_index,
renew_secret,
):
return self._rref.callRemote(
"renew_lease",
storage_index,
renew_secret,
)
def get_buckets(
self,
storage_index,

View File

@ -11,23 +11,22 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six.moves import cStringIO as StringIO
from six import ensure_text, ensure_str
import re
from six import ensure_text
import os.path
import sys
import re
from mock import patch, Mock
from urllib.parse import quote as url_quote
from twisted.trial import unittest
from twisted.python.monkey import MonkeyPatcher
from twisted.internet import task
from twisted.python.filepath import FilePath
from twisted.internet.testing import (
MemoryReactor,
)
from twisted.internet.test.modulehelpers import (
AlternateReactor,
)
import allmydata
from allmydata.crypto import ed25519
from allmydata.util import fileutil, hashutil, base32
from allmydata.util.namespace import Namespace
from allmydata import uri
from allmydata.immutable import upload
from allmydata.dirnode import normalize
@ -524,42 +523,34 @@ class CLI(CLITestMixin, unittest.TestCase):
self.failUnlessIn(normalize(file), filenames)
def test_exception_catcher(self):
"""
An exception that is otherwise unhandled during argument dispatch is
written to stderr and causes the process to exit with code 1.
"""
self.basedir = "cli/exception_catcher"
stderr = StringIO()
exc = Exception("canary")
ns = Namespace()
ns.parse_called = False
def call_parse_or_exit(args):
ns.parse_called = True
class BrokenOptions(object):
def parseOptions(self, argv):
raise exc
ns.sys_exit_called = False
def call_sys_exit(exitcode):
ns.sys_exit_called = True
self.failUnlessEqual(exitcode, 1)
stderr = StringIO()
def fake_react(f):
reactor = Mock()
d = f(reactor)
# normally this Deferred would be errbacked with SystemExit, but
# since we mocked out sys.exit, it will be fired with None. So
# it's safe to drop it on the floor.
del d
reactor = MemoryReactor()
patcher = MonkeyPatcher((runner, 'parse_or_exit_with_explanation',
call_parse_or_exit),
(sys, 'argv', ["tahoe"]),
(sys, 'exit', call_sys_exit),
(sys, 'stderr', stderr),
(task, 'react', fake_react),
with AlternateReactor(reactor):
with self.assertRaises(SystemExit) as ctx:
runner.run(
configFactory=BrokenOptions,
argv=["tahoe"],
stderr=stderr,
)
patcher.runWithPatches(runner.run)
self.failUnless(ns.parse_called)
self.failUnless(ns.sys_exit_called)
self.assertTrue(reactor.hasRun)
self.assertFalse(reactor.running)
self.failUnlessIn(str(exc), stderr.getvalue())
self.assertEqual(1, ctx.exception.code)
class Help(unittest.TestCase):
@ -1331,30 +1322,3 @@ class Options(ReallyEqualMixin, unittest.TestCase):
["--node-directory=there", "run", some_twistd_option])
self.failUnlessRaises(usage.UsageError, self.parse,
["run", "--basedir=here", some_twistd_option])
class Run(unittest.TestCase):
@patch('allmydata.scripts.tahoe_run.os.chdir')
@patch('allmydata.scripts.tahoe_run.twistd')
def test_non_numeric_pid(self, mock_twistd, chdir):
"""
If the pidfile exists but does not contain a numeric value, a complaint to
this effect is written to stderr.
"""
basedir = FilePath(ensure_str(self.mktemp()))
basedir.makedirs()
basedir.child(u"twistd.pid").setContent(b"foo")
basedir.child(u"tahoe-client.tac").setContent(b"")
config = tahoe_run.RunOptions()
config.stdout = StringIO()
config.stderr = StringIO()
config['basedir'] = ensure_text(basedir.path)
config.twistd_args = []
result_code = tahoe_run.run(config)
self.assertIn("invalid PID file", config.stderr.getvalue())
self.assertTrue(len(mock_twistd.mock_calls), 1)
self.assertEqual(mock_twistd.mock_calls[0][0], 'runApp')
self.assertEqual(0, result_code)

View File

@ -16,11 +16,19 @@ from six.moves import (
StringIO,
)
from testtools import (
skipIf,
)
from testtools.matchers import (
Contains,
Equals,
HasLength,
)
from twisted.python.runtime import (
platform,
)
from twisted.python.filepath import (
FilePath,
)
@ -33,6 +41,8 @@ from twisted.internet.test.modulehelpers import (
from ...scripts.tahoe_run import (
DaemonizeTheRealService,
RunOptions,
run,
)
from ...scripts.runner import (
@ -135,3 +145,40 @@ class DaemonizeTheRealServiceTests(SyncTestCase):
""",
"Privacy requested",
)
class RunTests(SyncTestCase):
"""
Tests for ``run``.
"""
@skipIf(platform.isWindows(), "There are no PID files on Windows.")
def test_non_numeric_pid(self):
"""
If the pidfile exists but does not contain a numeric value, a complaint to
this effect is written to stderr.
"""
basedir = FilePath(self.mktemp()).asTextMode()
basedir.makedirs()
basedir.child(u"twistd.pid").setContent(b"foo")
basedir.child(u"tahoe-client.tac").setContent(b"")
config = RunOptions()
config.stdout = StringIO()
config.stderr = StringIO()
config['basedir'] = basedir.path
config.twistd_args = []
runs = []
result_code = run(config, runApp=runs.append)
self.assertThat(
config.stderr.getvalue(),
Contains("found invalid PID file in"),
)
self.assertThat(
runs,
HasLength(1),
)
self.assertThat(
result_code,
Equals(0),
)

View File

@ -35,6 +35,9 @@ from twisted.internet.error import (
from twisted.internet.interfaces import (
IProcessProtocol,
)
from twisted.python.log import (
msg,
)
from twisted.python.filepath import (
FilePath,
)
@ -99,7 +102,10 @@ class _ProcessProtocolAdapter(ProcessProtocol, object):
try:
proto = self._fds[childFD]
except KeyError:
pass
msg(format="Received unhandled output on %(fd)s: %(output)s",
fd=childFD,
output=data,
)
else:
proto.dataReceived(data)
@ -158,6 +164,9 @@ class CLINodeAPI(object):
u"-m",
u"allmydata.scripts.runner",
] + argv
msg(format="Executing %(argv)s",
argv=argv,
)
return self.reactor.spawnProcess(
processProtocol=process_protocol,
executable=exe,

View File

@ -15,6 +15,9 @@ import os
import sys
import time
import signal
from functools import (
partial,
)
from random import randrange
if PY2:
from StringIO import StringIO
@ -98,7 +101,7 @@ def run_cli_native(verb, *args, **kwargs):
args=args,
nodeargs=nodeargs,
)
argv = nodeargs + [verb] + list(args)
argv = ["tahoe"] + nodeargs + [verb] + list(args)
stdin = kwargs.get("stdin", "")
if PY2:
# The original behavior, the Python 2 behavior, is to accept either
@ -128,10 +131,20 @@ def run_cli_native(verb, *args, **kwargs):
stdout = TextIOWrapper(BytesIO(), encoding)
stderr = TextIOWrapper(BytesIO(), encoding)
d = defer.succeed(argv)
d.addCallback(runner.parse_or_exit_with_explanation, stdout=stdout)
d.addCallback(runner.dispatch,
d.addCallback(
partial(
runner.parse_or_exit,
runner.Options(),
),
stdout=stdout,
stderr=stderr,
)
d.addCallback(
runner.dispatch,
stdin=stdin,
stdout=stdout, stderr=stderr)
stdout=stdout,
stderr=stderr,
)
def _done(rc, stdout=stdout, stderr=stderr):
if return_bytes and PY3:
stdout = stdout.buffer

View File

@ -19,6 +19,21 @@ import os.path, re, sys
from os import linesep
import locale
import six
from testtools import (
skipUnless,
)
from testtools.matchers import (
MatchesListwise,
MatchesAny,
Contains,
Equals,
Always,
)
from testtools.twistedsupport import (
succeeded,
)
from eliot import (
log_call,
)
@ -39,6 +54,10 @@ from allmydata.util import fileutil, pollmixin
from allmydata.util.encodingutil import unicode_to_argv
from allmydata.test import common_util
import allmydata
from allmydata.scripts.runner import (
parse_options,
)
from .common import (
PIPE,
Popen,
@ -46,6 +65,7 @@ from .common import (
from .common_util import (
parse_cli,
run_cli,
run_cli_unicode,
)
from .cli_node_api import (
CLINodeAPI,
@ -56,6 +76,9 @@ from .cli_node_api import (
from ..util.eliotutil import (
inline_callbacks,
)
from .common import (
SyncTestCase,
)
def get_root_from_file(src):
srcdir = os.path.dirname(os.path.dirname(os.path.normcase(os.path.realpath(src))))
@ -74,6 +97,56 @@ srcfile = allmydata.__file__
rootdir = get_root_from_file(srcfile)
class ParseOptionsTests(SyncTestCase):
"""
Tests for ``parse_options``.
"""
@skipUnless(six.PY2, "Only Python 2 exceptions must stringify to bytes.")
def test_nonascii_unknown_subcommand_python2(self):
"""
When ``parse_options`` is called with an argv indicating a subcommand that
does not exist and which also contains non-ascii characters, the
exception it raises includes the subcommand encoded as UTF-8.
"""
tricky = u"\u00F6"
try:
parse_options([tricky])
except usage.error as e:
self.assertEqual(
b"Unknown command: \\xf6",
b"{}".format(e),
)
class ParseOrExitTests(SyncTestCase):
"""
Tests for ``parse_or_exit``.
"""
def test_nonascii_error_content(self):
"""
``parse_or_exit`` can report errors that include non-ascii content.
"""
tricky = u"\u00F6"
self.assertThat(
run_cli_unicode(tricky, [], encoding="utf-8"),
succeeded(
MatchesListwise([
# returncode
Equals(1),
# stdout
MatchesAny(
# Python 2
Contains(u"Unknown command: \\xf6"),
# Python 3
Contains(u"Unknown command: \xf6"),
),
# stderr,
Always()
]),
),
)
@log_call(action_type="run-bin-tahoe")
def run_bintahoe(extra_argv, python_options=None):
"""
@ -110,8 +183,16 @@ class BinTahoe(common_util.SignalMixin, unittest.TestCase):
"""
tricky = u"\u00F6"
out, err, returncode = run_bintahoe([tricky])
if PY2:
expected = u"Unknown command: \\xf6"
else:
expected = u"Unknown command: \xf6"
self.assertEqual(returncode, 1)
self.assertIn(u"Unknown command: " + tricky, out)
self.assertIn(
expected,
out,
"expected {!r} not found in {!r}\nstderr: {!r}".format(expected, out, err),
)
def test_with_python_options(self):
"""
@ -305,7 +386,12 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
u"--hostname", u"127.0.0.1",
])
self.assertEqual(returncode, 0)
self.assertEqual(
returncode,
0,
"stdout: {!r}\n"
"stderr: {!r}\n",
)
# This makes sure that node.url is written, which allows us to
# detect when the introducer restarts in _node_has_restarted below.

View File

@ -24,11 +24,12 @@ import gc
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet.task import Clock
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
from allmydata.storage.server import StorageServer
from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME
from allmydata.storage.shares import get_share_file
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
@ -168,7 +169,7 @@ class Bucket(unittest.TestCase):
assert len(renewsecret) == 32
cancelsecret = b'THIS LETS ME KILL YOUR FILE HAHA'
assert len(cancelsecret) == 32
expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
expirationtime = struct.pack('>L', DEFAULT_RENEWAL_TIME) # 31 days in seconds
lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
@ -354,10 +355,11 @@ class Server(unittest.TestCase):
basedir = os.path.join("storage", "Server", name)
return basedir
def create(self, name, reserved_space=0, klass=StorageServer):
def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time):
workdir = self.workdir(name)
ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
stats_provider=FakeStatsProvider())
stats_provider=FakeStatsProvider(),
get_current_time=get_current_time)
ss.setServiceParent(self.sparent)
return ss
@ -384,8 +386,8 @@ class Server(unittest.TestCase):
self.failUnlessIn(b'available-space', sv1)
def allocate(self, ss, storage_index, sharenums, size, canary=None):
renew_secret = hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret))
cancel_secret = hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret))
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
if not canary:
canary = FakeCanary()
return ss.remote_allocate_buckets(storage_index,
@ -646,6 +648,27 @@ class Server(unittest.TestCase):
f2 = open(filename, "rb")
self.failUnlessEqual(f2.read(5), b"start")
def create_bucket_5_shares(
self, ss, storage_index, expected_already=0, expected_writers=5
):
"""
Given a StorageServer, create a bucket with 5 shares and return renewal
and cancellation secrets.
"""
canary = FakeCanary()
sharenums = list(range(5))
size = 100
# Creating a bucket also creates a lease:
rs, cs = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
already, writers = ss.remote_allocate_buckets(storage_index, rs, cs,
sharenums, size, canary)
self.failUnlessEqual(len(already), expected_already)
self.failUnlessEqual(len(writers), expected_writers)
for wb in writers.values():
wb.remote_close()
return rs, cs
def test_leases(self):
ss = self.create("test_leases")
@ -653,41 +676,23 @@ class Server(unittest.TestCase):
sharenums = list(range(5))
size = 100
rs0,cs0 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si0", rs0, cs0,
sharenums, size, canary)
self.failUnlessEqual(len(already), 0)
self.failUnlessEqual(len(writers), 5)
for wb in writers.values():
wb.remote_close()
# Create a bucket:
rs0, cs0 = self.create_bucket_5_shares(ss, b"si0")
leases = list(ss.get_leases(b"si0"))
self.failUnlessEqual(len(leases), 1)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
rs1,cs1 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si1", rs1, cs1,
sharenums, size, canary)
for wb in writers.values():
wb.remote_close()
rs1, cs1 = self.create_bucket_5_shares(ss, b"si1")
# take out a second lease on si1
rs2,cs2 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si1", rs2, cs2,
sharenums, size, canary)
self.failUnlessEqual(len(already), 5)
self.failUnlessEqual(len(writers), 0)
rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 2)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
# and a third lease, using add-lease
rs2a,cs2a = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
ss.remote_add_lease(b"si1", rs2a, cs2a)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 3)
@ -715,10 +720,10 @@ class Server(unittest.TestCase):
"ss should not have a 'remote_cancel_lease' method/attribute")
# test overlapping uploads
rs3,cs3 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
rs4,cs4 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si3", rs3, cs3,
sharenums, size, canary)
self.failUnlessEqual(len(already), 0)
@ -741,6 +746,28 @@ class Server(unittest.TestCase):
leases = list(ss.get_leases(b"si3"))
self.failUnlessEqual(len(leases), 2)
def test_immutable_add_lease_renews(self):
"""
Adding a lease on an already leased immutable with the same secret just
renews it.
"""
clock = Clock()
clock.advance(123)
ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds)
# Start out with single lease created with bucket:
renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + DEFAULT_RENEWAL_TIME)
# Time passes:
clock.advance(123456)
# Adding a lease with matching renewal secret just renews it:
ss.remote_add_lease(b"si0", renewal_secret, cancel_secret)
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + 123456 + DEFAULT_RENEWAL_TIME)
def test_have_shares(self):
"""By default the StorageServer has no shares."""
workdir = self.workdir("test_have_shares")
@ -840,9 +867,10 @@ class MutableServer(unittest.TestCase):
basedir = os.path.join("storage", "MutableServer", name)
return basedir
def create(self, name):
def create(self, name, get_current_time=time.time):
workdir = self.workdir(name)
ss = StorageServer(workdir, b"\x00" * 20)
ss = StorageServer(workdir, b"\x00" * 20,
get_current_time=get_current_time)
ss.setServiceParent(self.sparent)
return ss
@ -1379,6 +1407,41 @@ class MutableServer(unittest.TestCase):
{0: ([], [(500, b"make me really bigger")], None)}, [])
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
def test_mutable_add_lease_renews(self):
"""
Adding a lease on an already leased mutable with the same secret just
renews it.
"""
clock = Clock()
clock.advance(235)
ss = self.create("test_mutable_add_lease_renews",
get_current_time=clock.seconds)
def secrets(n):
return ( self.write_enabler(b"we1"),
self.renew_secret(b"we1-%d" % n),
self.cancel_secret(b"we1-%d" % n) )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
write_enabler, renew_secret, cancel_secret = secrets(0)
rc = write(b"si1", (write_enabler, renew_secret, cancel_secret),
{0: ([], [(0,data)], None)}, [])
self.failUnlessEqual(rc, (True, {}))
bucket_dir = os.path.join(self.workdir("test_mutable_add_lease_renews"),
"shares", storage_index_to_dir(b"si1"))
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time, 235 + DEFAULT_RENEWAL_TIME)
# Time passes...
clock.advance(835)
# Adding a lease renews it:
ss.remote_add_lease(b"si1", renew_secret, cancel_secret)
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time,
235 + 835 + DEFAULT_RENEWAL_TIME)
def test_remove(self):
ss = self.create("test_remove")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),

View File

@ -127,7 +127,7 @@ def argv_to_abspath(s, **kwargs):
return abspath_expanduser_unicode(decoded, **kwargs)
def unicode_to_argv(s, mangle=False):
def unicode_to_argv(s):
"""
Make the given unicode string suitable for use in an argv list.

View File

@ -188,7 +188,17 @@ def initialize():
# for example, the Python interpreter or any options passed to it, or runner
# scripts such as 'coverage run'. It works even if there are no such arguments,
# as in the case of a frozen executable created by bb-freeze or similar.
sys.argv = argv[-len(sys.argv):]
#
# Also, modify sys.argv in place. If any code has already taken a
# reference to the original argument list object then this ensures that
# code sees the new values. This reliance on mutation of shared state is,
# of course, awful. Why does this function even modify sys.argv? Why not
# have a function that *returns* the properly initialized argv as a new
# list? I don't know.
#
# At least Python 3 gets sys.argv correct so before very much longer we
# should be able to fix this bad design by deleting it.
sys.argv[:] = argv[-len(sys.argv):]
def a_console(handle):