Merge remote-tracking branch 'origin/master' into 3779-istorageserver-with-fewer-assumptions

Itamar Turner-Trauring 2021-09-01 14:15:14 -04:00
commit 597c9adebd
16 changed files with 474 additions and 223 deletions


@ -369,7 +369,7 @@ For example::
``PUT /v1/lease/:storage_index``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Create a new lease on the bucket addressed by ``storage_index``.
Either renew or create a new lease on the bucket addressed by ``storage_index``.
The details of the lease are encoded in the request body.
For example::
@ -400,37 +400,11 @@ Several behaviors here are blindly copied from the Foolscap-based storage server
* There is a cancel secret but there is no API to use it to cancel a lease (see ticket:3768).
* The lease period is hard-coded at 31 days.
* There are separate **add** and **renew** lease APIs (see ticket:3773).
These are not necessarily ideal behaviors
but they are adopted to avoid any *semantic* changes between the Foolscap- and HTTP-based protocols.
It is expected that some or all of these behaviors may change in a future revision of the HTTP-based protocol.
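A client-side sketch of this renew-or-create call, assuming only the request
shape documented here plus a hypothetical ``requests``-based helper::

    import requests

    def renew_or_create_lease(base_url, storage_index, renew_secret, cancel_secret):
        # PUT either creates a new lease or renews the lease whose
        # renew-secret matches (see the behavior notes above).
        response = requests.put(
            "{}/v1/lease/{}".format(base_url, storage_index),
            json={"renew-secret": renew_secret, "cancel-secret": cancel_secret},
        )
        assert response.status_code == 204  # NO CONTENT on success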
``POST /v1/lease/:storage_index``
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
Renew an existing lease for all shares for the given storage index.
The details of the lease are encoded in the request body.
For example::
{"renew-secret": "abcd"}
If there are no shares for the given ``storage_index``
then ``NOT FOUND`` is returned.
If there is no lease with a matching ``renew-secret`` value on the given storage index
then ``NOT FOUND`` is returned.
In this case,
if the storage index refers to mutable data
then the response also includes a list of nodeids where the lease can be renewed.
For example::
{"nodeids": ["aaa...", "bbb..."]}
Otherwise,
the matching lease's expiration time is changed to be 31 days from the time of this operation
and ``NO CONTENT`` is returned.
Immutable
---------
@ -676,8 +650,8 @@ Immutable Data
#. Renew the lease on all immutable shares in bucket ``AAAAAAAAAAAAAAAA``::
POST /v1/lease/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh"}
PUT /v1/lease/AAAAAAAAAAAAAAAA
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
204 NO CONTENT
@ -757,8 +731,8 @@ Mutable Data
#. Renew the lease on previously uploaded mutable share in slot ``BBBBBBBBBBBBBBBB``::
POST /v1/lease/BBBBBBBBBBBBBBBB
{"renew-secret": "efgh"}
PUT /v1/lease/BBBBBBBBBBBBBBBB
{"renew-secret": "efgh", "cancel-secret": "ijkl"}
204 NO CONTENT

newsfragments/3528.minor (new empty file)
newsfragments/3773.minor (new empty file)


@ -154,25 +154,9 @@ class RIStorageServer(RemoteInterface):
"""
return Any() # returns None now, but future versions might change
def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret):
"""
Renew the lease on a given bucket, resetting the timer to 31 days.
Some networks will use this, some will not. If there is no bucket for
the given storage_index, IndexError will be raised.
For mutable shares, if the given renew_secret does not match an
existing lease, IndexError will be raised with a note listing the
server-nodeids on the existing leases, so leases on migrated shares
can be renewed. For immutable shares, IndexError (without the note)
will be raised.
"""
return Any()
def get_buckets(storage_index=StorageIndex):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
def slot_readv(storage_index=StorageIndex,
shares=ListOf(int), readv=ReadVector):
"""Read a vector from the numbered shares associated with the given
@ -343,14 +327,6 @@ class IStorageServer(Interface):
:see: ``RIStorageServer.add_lease``
"""
def renew_lease(
storage_index,
renew_secret,
):
"""
:see: ``RIStorageServer.renew_lease``
"""
def get_buckets(
storage_index,
):


@ -9,6 +9,7 @@ if PY2:
import os, sys
from six.moves import StringIO
from past.builtins import unicode
import six
try:
@ -115,23 +116,75 @@ for module in (create_node,):
def parse_options(argv, config=None):
if not config:
config = Options()
config.parseOptions(argv) # may raise usage.error
try:
config.parseOptions(argv)
except usage.error as e:
if six.PY2:
# On Python 2 the exception may hold non-ascii in a byte string.
# This makes it impossible to convert the exception to any kind of
# string using str() or unicode(). It could also hold non-ascii
# in a unicode string which still makes it difficult to convert it
# to a byte string later.
#
# So, reach inside and turn it into some entirely safe ascii byte
# strings that will survive being written to stdout without
# causing too much damage in the process.
#
# As a result, non-ascii will not be rendered correctly but
# instead as escape sequences. At least this can go away when
# we're done with Python 2 support.
raise usage.error(*(
arg.encode("ascii", errors="backslashreplace")
if isinstance(arg, unicode)
else arg.decode("utf-8").encode("ascii", errors="backslashreplace")
for arg
in e.args
))
raise
return config
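A standalone illustration of the ``backslashreplace`` conversion used above;
non-ascii survives only as escape sequences in safe ascii byte strings::

    tricky = u"\u00F6"
    # unicode -> ascii bytes, escaping anything that does not fit
    assert tricky.encode("ascii", errors="backslashreplace") == b"\\xf6"
    # utf-8 bytes -> unicode -> ascii bytes, with the same escaping
    assert b"\xc3\xb6".decode("utf-8").encode("ascii", errors="backslashreplace") == b"\\xf6"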
def parse_or_exit(config, argv, stdout, stderr):
"""
Parse Tahoe-LAFS CLI arguments and return a configuration object if they
are valid.
def parse_or_exit_with_explanation(argv, stdout=sys.stdout):
config = Options()
If they are invalid, write an explanation to ``stdout`` and exit.
:param allmydata.scripts.runner.Options config: An instance of the
argument-parsing class to use.
:param [unicode] argv: The argument list to parse, including the name of the
program being run as ``argv[0]``.
:param stdout: The file-like object to use as stdout.
:param stderr: The file-like object to use as stderr.
:raise SystemExit: If there is an argument-parsing problem.
:return: ``config``, after using it to parse the argument list.
"""
try:
parse_options(argv, config=config)
parse_options(argv[1:], config=config)
except usage.error as e:
# `parse_options` may have the side-effect of initializing a
# "sub-option" of the given configuration, even if it ultimately
# raises an exception. For example, `tahoe run --invalid-option` will
# set `config.subOptions` to an instance of
# `allmydata.scripts.tahoe_run.RunOptions` and then raise a
# `usage.error` because `RunOptions` does not recognize
# `--invalid-option`.  If `run` itself had sub-options then the same
# thing could happen but with another layer of nesting. We can
# present the user with the most precise information about their usage
# error possible by finding the most "sub" of the sub-options and then
# showing that to the user along with the usage error.
c = config
while hasattr(c, 'subOptions'):
c = c.subOptions
print(str(c), file=stdout)
# On Python 2 the string may turn into a unicode string, e.g. the error
# may be unicode, in which case it will print funny. Once we're on
# Python 3 we can just drop the ensure_str().
print(six.ensure_str("%s: %s\n" % (sys.argv[0], e)), file=stdout)
exc_str = str(e)
exc_bytes = six.ensure_binary(exc_str, "utf-8")
msg_bytes = b"%s: %s\n" % (six.ensure_binary(argv[0]), exc_bytes)
print(six.ensure_text(msg_bytes, "utf-8"), file=stdout)
sys.exit(1)
return config
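A usage sketch (the test helper changes below drive it the same way); the
argv shown is illustrative::

    import sys
    from allmydata.scripts.runner import Options, parse_or_exit

    # Returns the parsed config on success; raises SystemExit (after
    # printing an explanation to stdout) on a usage error.
    config = parse_or_exit(Options(), ["tahoe", "run"], sys.stdout, sys.stderr)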
@ -186,28 +239,66 @@ def _maybe_enable_eliot_logging(options, reactor):
PYTHON_3_WARNING = ("Support for Python 3 is an incomplete work-in-progress."
" Use at your own risk.")
def run():
if six.PY3:
print(PYTHON_3_WARNING, file=sys.stderr)
def run(configFactory=Options, argv=sys.argv, stdout=sys.stdout, stderr=sys.stderr):
"""
Run a Tahoe-LAFS node.
:param configFactory: A zero-argument callable which creates the config
object to use to parse the argument list.
:param [str] argv: The argument list to use to configure the run.
:param stdout: The file-like object to use for stdout.
:param stderr: The file-like object to use for stderr.
:raise SystemExit: Always raised after the run is complete.
"""
if six.PY3:
print(PYTHON_3_WARNING, file=stderr)
if sys.platform == "win32":
from allmydata.windows.fixups import initialize
initialize()
# doesn't return: calls sys.exit(rc)
task.react(_run_with_reactor)
task.react(
lambda reactor: _run_with_reactor(
reactor,
configFactory(),
argv,
stdout,
stderr,
),
)
def _setup_coverage(reactor):
def _setup_coverage(reactor, argv):
"""
Arrange for coverage to be collected if the 'coverage' package is
installed
If coverage measurement was requested, start collecting coverage
measurements and arrange to record those measurements when the process is
done.
Coverage measurement is considered requested if ``"--coverage"`` is in
``argv`` (and it will be removed from ``argv`` if it is found). There
should be a ``.coveragerc`` file in the working directory if coverage
measurement is requested.
This is only necessary to support multi-process coverage measurement,
typically when the test suite is running, and with the pytest-based
*integration* test suite (at ``integration/`` in the root of the source
tree) foremost in mind. The idea is that if you are running Tahoe-LAFS in
a configuration where multiple processes are involved - for example, a
test process and a client node process - and you only measure coverage
from the test process, then you will fail to observe most of the
Tahoe-LAFS code that is being run.
This function arranges to have any Tahoe-LAFS process (such as that
client node process) collect and report coverage measurements as well.
"""
# can we put this _setup_coverage call after we hit
# argument-parsing?
# ensure_str() only necessary on Python 2.
if six.ensure_str('--coverage') not in sys.argv:
return
sys.argv.remove('--coverage')
argv.remove('--coverage')
try:
import coverage
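The collector setup is elided from this hunk; a sketch of the multi-process
pattern with the public coverage.py API (an assumption about the elided body,
not a copy of it)::

    import coverage

    # data_suffix=True writes per-process data files
    # (.coverage.<host>.<pid>.<random>) so concurrent processes do not
    # clobber one another; `coverage combine` merges them afterwards.
    cov = coverage.Coverage(data_suffix=True)
    cov.start()

    def write_coverage_data():
        cov.stop()
        cov.save()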
@ -238,14 +329,37 @@ def _setup_coverage(reactor):
reactor.addSystemEventTrigger('after', 'shutdown', write_coverage_data)
def _run_with_reactor(reactor):
def _run_with_reactor(reactor, config, argv, stdout, stderr):
"""
Run a Tahoe-LAFS node using the given reactor.
_setup_coverage(reactor)
:param reactor: The reactor to use. This implementation largely ignores
this and lets the rest of the implementation pick its own reactor.
Oops.
argv = list(map(argv_to_unicode, sys.argv[1:]))
d = defer.maybeDeferred(parse_or_exit_with_explanation, argv)
:param twisted.python.usage.Options config: The config object to use to
parse the argument list.
:param [str] argv: The argument list to parse, *excluding* the name of the
program being run.
:param stdout: See ``run``.
:param stderr: See ``run``.
:return: A ``Deferred`` that fires when the run is complete.
"""
_setup_coverage(reactor, argv)
argv = list(map(argv_to_unicode, argv))
d = defer.maybeDeferred(
parse_or_exit,
config,
argv,
stdout,
stderr,
)
d.addCallback(_maybe_enable_eliot_logging, reactor)
d.addCallback(dispatch)
d.addCallback(dispatch, stdout=stdout, stderr=stderr)
def _show_exception(f):
# when task.react() notices a non-SystemExit exception, it does
# log.err() with the failure and then exits with rc=1. We want this
@ -253,7 +367,7 @@ def _run_with_reactor(reactor):
# weren't using react().
if f.check(SystemExit):
return f # dispatch function handled it
f.printTraceback(file=sys.stderr)
f.printTraceback(file=stderr)
sys.exit(1)
d.addErrback(_show_exception)
return d


@ -192,7 +192,7 @@ class DaemonizeTahoeNodePlugin(object):
return DaemonizeTheRealService(self.nodetype, self.basedir, so)
def run(config):
def run(config, runApp=twistd.runApp):
"""
Runs a Tahoe-LAFS node in the foreground.
@ -212,10 +212,11 @@ def run(config):
if not nodetype:
print("%s is not a recognizable node directory" % quoted_basedir, file=err)
return 1
# Now prepare to turn into a twistd process. This os.chdir is the point
# of no return.
os.chdir(basedir)
twistd_args = ["--nodaemon"]
twistd_args = ["--nodaemon", "--rundir", basedir]
if sys.platform != "win32":
pidfile = get_pidfile(basedir)
twistd_args.extend(["--pidfile", pidfile])
twistd_args.extend(config.twistd_args)
twistd_args.append("DaemonizeTahoeNode") # point at our DaemonizeTahoeNodePlugin
@ -232,12 +233,11 @@ def run(config):
twistd_config.loadedPlugins = {"DaemonizeTahoeNode": DaemonizeTahoeNodePlugin(nodetype, basedir)}
# handle invalid PID file (twistd might not start otherwise)
pidfile = get_pidfile(basedir)
if get_pid_from_pidfile(pidfile) == -1:
if sys.platform != "win32" and get_pid_from_pidfile(pidfile) == -1:
print("found invalid PID file in %s - deleting it" % basedir, file=err)
os.remove(pidfile)
# We always pass --nodaemon so twistd.runApp does not daemonize.
print("running node in %s" % (quoted_basedir,), file=out)
twistd.runApp(twistd_config)
runApp(twistd_config)
return 0
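Assembled across the hunks above, the twistd argument list ends up roughly
as follows (basedir path hypothetical)::

    twistd_args = [
        "--nodaemon",
        "--rundir", "/path/to/basedir",
        "--pidfile", "/path/to/basedir/twistd.pid",  # skipped on win32
        "DaemonizeTahoeNode",  # resolved via twistd_config.loadedPlugins
    ]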


@ -49,6 +49,10 @@ from allmydata.storage.expirer import LeaseCheckingCrawler
NUM_RE=re.compile("^[0-9]+$")
# Number of seconds to add to expiration time on lease renewal.
# For now it's not actually configurable, but maybe someday.
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
@ -62,7 +66,8 @@ class StorageServer(service.MultiService, Referenceable):
expiration_mode="age",
expiration_override_lease_duration=None,
expiration_cutoff_date=None,
expiration_sharetypes=("mutable", "immutable")):
expiration_sharetypes=("mutable", "immutable"),
get_current_time=time.time):
service.MultiService.__init__(self)
assert isinstance(nodeid, bytes)
assert len(nodeid) == 20
@ -114,6 +119,7 @@ class StorageServer(service.MultiService, Referenceable):
expiration_cutoff_date,
expiration_sharetypes)
self.lease_checker.setServiceParent(self)
self._get_current_time = get_current_time
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@ -264,7 +270,7 @@ class StorageServer(service.MultiService, Referenceable):
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
start = time.time()
start = self._get_current_time()
self.count("allocate")
alreadygot = set()
bucketwriters = {} # k: shnum, v: BucketWriter
@ -277,7 +283,7 @@ class StorageServer(service.MultiService, Referenceable):
# goes into the share files themselves. It could also be put into a
# separate database. Note that the lease should not be added until
# the BucketWriter has been closed.
expire_time = time.time() + 31*24*60*60
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -331,7 +337,7 @@ class StorageServer(service.MultiService, Referenceable):
if bucketwriters:
fileutil.make_dirs(os.path.join(self.sharedir, si_dir))
self.add_latency("allocate", time.time() - start)
self.add_latency("allocate", self._get_current_time() - start)
return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
@ -351,26 +357,26 @@ class StorageServer(service.MultiService, Referenceable):
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
owner_num=1):
start = time.time()
start = self._get_current_time()
self.count("add-lease")
new_expire_time = time.time() + 31*24*60*60
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(owner_num,
renew_secret, cancel_secret,
new_expire_time, self.my_nodeid)
for sf in self._iter_share_files(storage_index):
sf.add_or_renew_lease(lease_info)
self.add_latency("add-lease", time.time() - start)
self.add_latency("add-lease", self._get_current_time() - start)
return None
def remote_renew_lease(self, storage_index, renew_secret):
start = time.time()
start = self._get_current_time()
self.count("renew")
new_expire_time = time.time() + 31*24*60*60
new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
found_buckets = False
for sf in self._iter_share_files(storage_index):
found_buckets = True
sf.renew_lease(renew_secret, new_expire_time)
self.add_latency("renew", time.time() - start)
self.add_latency("renew", self._get_current_time() - start)
if not found_buckets:
raise IndexError("no such lease to renew")
@ -394,7 +400,7 @@ class StorageServer(service.MultiService, Referenceable):
pass
def remote_get_buckets(self, storage_index):
start = time.time()
start = self._get_current_time()
self.count("get")
si_s = si_b2a(storage_index)
log.msg("storage: get_buckets %r" % si_s)
@ -402,7 +408,7 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index):
bucketreaders[shnum] = BucketReader(self, filename,
storage_index, shnum)
self.add_latency("get", time.time() - start)
self.add_latency("get", self._get_current_time() - start)
return bucketreaders
def get_leases(self, storage_index):
@ -563,7 +569,7 @@ class StorageServer(service.MultiService, Referenceable):
:return LeaseInfo: Information for a new lease for a share.
"""
ownerid = 1 # TODO
expire_time = time.time() + 31*24*60*60 # one month
expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
lease_info = LeaseInfo(ownerid,
renew_secret, cancel_secret,
expire_time, self.my_nodeid)
@ -599,7 +605,7 @@ class StorageServer(service.MultiService, Referenceable):
See ``allmydata.interfaces.RIStorageServer`` for details about other
parameters and return value.
"""
start = time.time()
start = self._get_current_time()
self.count("writev")
si_s = si_b2a(storage_index)
log.msg("storage: slot_writev %r" % si_s)
@ -640,7 +646,7 @@ class StorageServer(service.MultiService, Referenceable):
self._add_or_renew_leases(remaining_shares, lease_info)
# all done
self.add_latency("writev", time.time() - start)
self.add_latency("writev", self._get_current_time() - start)
return (testv_is_good, read_data)
def remote_slot_testv_and_readv_and_writev(self, storage_index,
@ -666,7 +672,7 @@ class StorageServer(service.MultiService, Referenceable):
return share
def remote_slot_readv(self, storage_index, shares, readv):
start = time.time()
start = self._get_current_time()
self.count("readv")
si_s = si_b2a(storage_index)
lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
@ -675,7 +681,7 @@ class StorageServer(service.MultiService, Referenceable):
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
if not os.path.isdir(bucketdir):
self.add_latency("readv", time.time() - start)
self.add_latency("readv", self._get_current_time() - start)
return {}
datavs = {}
for sharenum_s in os.listdir(bucketdir):
@ -689,7 +695,7 @@ class StorageServer(service.MultiService, Referenceable):
datavs[sharenum] = msf.readv(readv)
log.msg("returning shares %s" % (list(datavs.keys()),),
facility="tahoe.storage", level=log.NOISY, parent=lp)
self.add_latency("readv", time.time() - start)
self.add_latency("readv", self._get_current_time() - start)
return datavs
def remote_advise_corrupt_share(self, share_type, storage_index, shnum,


@ -965,17 +965,6 @@ class _StorageServer(object):
cancel_secret,
)
def renew_lease(
self,
storage_index,
renew_secret,
):
return self._rref.callRemote(
"renew_lease",
storage_index,
renew_secret,
)
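With ``renew_lease`` gone from this wrapper, renewal rides on ``add_lease``:
per the server change above, adding a lease whose renew secret matches an
existing one simply renews it. A sketch of the replacement call::

    # Re-adding with the same secrets renews in place (see
    # remote_add_lease and test_immutable_add_lease_renews).
    d = storage_server.add_lease(storage_index, renew_secret, cancel_secret)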
def get_buckets(
self,
storage_index,


@ -11,23 +11,22 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
from six.moves import cStringIO as StringIO
from six import ensure_text, ensure_str
import re
from six import ensure_text
import os.path
import sys
import re
from mock import patch, Mock
from urllib.parse import quote as url_quote
from twisted.trial import unittest
from twisted.python.monkey import MonkeyPatcher
from twisted.internet import task
from twisted.python.filepath import FilePath
from twisted.internet.testing import (
MemoryReactor,
)
from twisted.internet.test.modulehelpers import (
AlternateReactor,
)
import allmydata
from allmydata.crypto import ed25519
from allmydata.util import fileutil, hashutil, base32
from allmydata.util.namespace import Namespace
from allmydata import uri
from allmydata.immutable import upload
from allmydata.dirnode import normalize
@ -524,42 +523,34 @@ class CLI(CLITestMixin, unittest.TestCase):
self.failUnlessIn(normalize(file), filenames)
def test_exception_catcher(self):
"""
An exception that is otherwise unhandled during argument dispatch is
written to stderr and causes the process to exit with code 1.
"""
self.basedir = "cli/exception_catcher"
stderr = StringIO()
exc = Exception("canary")
ns = Namespace()
class BrokenOptions(object):
def parseOptions(self, argv):
raise exc
ns.parse_called = False
def call_parse_or_exit(args):
ns.parse_called = True
raise exc
stderr = StringIO()
ns.sys_exit_called = False
def call_sys_exit(exitcode):
ns.sys_exit_called = True
self.failUnlessEqual(exitcode, 1)
reactor = MemoryReactor()
def fake_react(f):
reactor = Mock()
d = f(reactor)
# normally this Deferred would be errbacked with SystemExit, but
# since we mocked out sys.exit, it will be fired with None. So
# it's safe to drop it on the floor.
del d
with AlternateReactor(reactor):
with self.assertRaises(SystemExit) as ctx:
runner.run(
configFactory=BrokenOptions,
argv=["tahoe"],
stderr=stderr,
)
patcher = MonkeyPatcher((runner, 'parse_or_exit_with_explanation',
call_parse_or_exit),
(sys, 'argv', ["tahoe"]),
(sys, 'exit', call_sys_exit),
(sys, 'stderr', stderr),
(task, 'react', fake_react),
)
patcher.runWithPatches(runner.run)
self.assertTrue(reactor.hasRun)
self.assertFalse(reactor.running)
self.failUnless(ns.parse_called)
self.failUnless(ns.sys_exit_called)
self.failUnlessIn(str(exc), stderr.getvalue())
self.assertEqual(1, ctx.exception.code)
class Help(unittest.TestCase):
@ -1331,30 +1322,3 @@ class Options(ReallyEqualMixin, unittest.TestCase):
["--node-directory=there", "run", some_twistd_option])
self.failUnlessRaises(usage.UsageError, self.parse,
["run", "--basedir=here", some_twistd_option])
class Run(unittest.TestCase):
@patch('allmydata.scripts.tahoe_run.os.chdir')
@patch('allmydata.scripts.tahoe_run.twistd')
def test_non_numeric_pid(self, mock_twistd, chdir):
"""
If the pidfile exists but does not contain a numeric value, a complaint to
this effect is written to stderr.
"""
basedir = FilePath(ensure_str(self.mktemp()))
basedir.makedirs()
basedir.child(u"twistd.pid").setContent(b"foo")
basedir.child(u"tahoe-client.tac").setContent(b"")
config = tahoe_run.RunOptions()
config.stdout = StringIO()
config.stderr = StringIO()
config['basedir'] = ensure_text(basedir.path)
config.twistd_args = []
result_code = tahoe_run.run(config)
self.assertIn("invalid PID file", config.stderr.getvalue())
self.assertTrue(len(mock_twistd.mock_calls), 1)
self.assertEqual(mock_twistd.mock_calls[0][0], 'runApp')
self.assertEqual(0, result_code)


@ -16,11 +16,19 @@ from six.moves import (
StringIO,
)
from testtools import (
skipIf,
)
from testtools.matchers import (
Contains,
Equals,
HasLength,
)
from twisted.python.runtime import (
platform,
)
from twisted.python.filepath import (
FilePath,
)
@ -33,6 +41,8 @@ from twisted.internet.test.modulehelpers import (
from ...scripts.tahoe_run import (
DaemonizeTheRealService,
RunOptions,
run,
)
from ...scripts.runner import (
@ -135,3 +145,40 @@ class DaemonizeTheRealServiceTests(SyncTestCase):
""",
"Privacy requested",
)
class RunTests(SyncTestCase):
"""
Tests for ``run``.
"""
@skipIf(platform.isWindows(), "There are no PID files on Windows.")
def test_non_numeric_pid(self):
"""
If the pidfile exists but does not contain a numeric value, a complaint to
this effect is written to stderr.
"""
basedir = FilePath(self.mktemp()).asTextMode()
basedir.makedirs()
basedir.child(u"twistd.pid").setContent(b"foo")
basedir.child(u"tahoe-client.tac").setContent(b"")
config = RunOptions()
config.stdout = StringIO()
config.stderr = StringIO()
config['basedir'] = basedir.path
config.twistd_args = []
runs = []
result_code = run(config, runApp=runs.append)
self.assertThat(
config.stderr.getvalue(),
Contains("found invalid PID file in"),
)
self.assertThat(
runs,
HasLength(1),
)
self.assertThat(
result_code,
Equals(0),
)


@ -35,6 +35,9 @@ from twisted.internet.error import (
from twisted.internet.interfaces import (
IProcessProtocol,
)
from twisted.python.log import (
msg,
)
from twisted.python.filepath import (
FilePath,
)
@ -99,7 +102,10 @@ class _ProcessProtocolAdapter(ProcessProtocol, object):
try:
proto = self._fds[childFD]
except KeyError:
pass
msg(format="Received unhandled output on %(fd)s: %(output)s",
fd=childFD,
output=data,
)
else:
proto.dataReceived(data)
@ -158,6 +164,9 @@ class CLINodeAPI(object):
u"-m",
u"allmydata.scripts.runner",
] + argv
msg(format="Executing %(argv)s",
argv=argv,
)
return self.reactor.spawnProcess(
processProtocol=process_protocol,
executable=exe,


@ -15,6 +15,9 @@ import os
import sys
import time
import signal
from functools import (
partial,
)
from random import randrange
if PY2:
from StringIO import StringIO
@ -98,7 +101,7 @@ def run_cli_native(verb, *args, **kwargs):
args=args,
nodeargs=nodeargs,
)
argv = nodeargs + [verb] + list(args)
argv = ["tahoe"] + nodeargs + [verb] + list(args)
stdin = kwargs.get("stdin", "")
if PY2:
# The original behavior, the Python 2 behavior, is to accept either
@ -128,10 +131,20 @@ def run_cli_native(verb, *args, **kwargs):
stdout = TextIOWrapper(BytesIO(), encoding)
stderr = TextIOWrapper(BytesIO(), encoding)
d = defer.succeed(argv)
d.addCallback(runner.parse_or_exit_with_explanation, stdout=stdout)
d.addCallback(runner.dispatch,
stdin=stdin,
stdout=stdout, stderr=stderr)
d.addCallback(
partial(
runner.parse_or_exit,
runner.Options(),
),
stdout=stdout,
stderr=stderr,
)
d.addCallback(
runner.dispatch,
stdin=stdin,
stdout=stdout,
stderr=stderr,
)
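``partial`` front-loads the config so the Deferred's result (the argv list)
arrives as the second positional argument of ``parse_or_exit``; schematically::

    from functools import partial

    f = partial(runner.parse_or_exit, runner.Options())
    # addCallback then effectively invokes
    #   f(argv, stdout=stdout, stderr=stderr)
    # which is
    #   runner.parse_or_exit(Options(), argv, stdout=stdout, stderr=stderr)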
def _done(rc, stdout=stdout, stderr=stderr):
if return_bytes and PY3:
stdout = stdout.buffer


@ -19,6 +19,21 @@ import os.path, re, sys
from os import linesep
import locale
import six
from testtools import (
skipUnless,
)
from testtools.matchers import (
MatchesListwise,
MatchesAny,
Contains,
Equals,
Always,
)
from testtools.twistedsupport import (
succeeded,
)
from eliot import (
log_call,
)
@ -39,6 +54,10 @@ from allmydata.util import fileutil, pollmixin
from allmydata.util.encodingutil import unicode_to_argv
from allmydata.test import common_util
import allmydata
from allmydata.scripts.runner import (
parse_options,
)
from .common import (
PIPE,
Popen,
@ -46,6 +65,7 @@ from .common import (
from .common_util import (
parse_cli,
run_cli,
run_cli_unicode,
)
from .cli_node_api import (
CLINodeAPI,
@ -56,6 +76,9 @@ from .cli_node_api import (
from ..util.eliotutil import (
inline_callbacks,
)
from .common import (
SyncTestCase,
)
def get_root_from_file(src):
srcdir = os.path.dirname(os.path.dirname(os.path.normcase(os.path.realpath(src))))
@ -74,6 +97,56 @@ srcfile = allmydata.__file__
rootdir = get_root_from_file(srcfile)
class ParseOptionsTests(SyncTestCase):
"""
Tests for ``parse_options``.
"""
@skipUnless(six.PY2, "Only Python 2 exceptions must stringify to bytes.")
def test_nonascii_unknown_subcommand_python2(self):
"""
When ``parse_options`` is called with an argv indicating a subcommand that
does not exist and which also contains non-ascii characters, the
exception it raises includes the subcommand with its non-ascii characters
replaced by ascii escape sequences (``backslashreplace``).
"""
tricky = u"\u00F6"
try:
parse_options([tricky])
except usage.error as e:
self.assertEqual(
b"Unknown command: \\xf6",
b"{}".format(e),
)
else:
self.fail("parse_options did not reject the unknown non-ascii subcommand")
class ParseOrExitTests(SyncTestCase):
"""
Tests for ``parse_or_exit``.
"""
def test_nonascii_error_content(self):
"""
``parse_or_exit`` can report errors that include non-ascii content.
"""
tricky = u"\u00F6"
self.assertThat(
run_cli_unicode(tricky, [], encoding="utf-8"),
succeeded(
MatchesListwise([
# returncode
Equals(1),
# stdout
MatchesAny(
# Python 2
Contains(u"Unknown command: \\xf6"),
# Python 3
Contains(u"Unknown command: \xf6"),
),
# stderr,
Always()
]),
),
)
@log_call(action_type="run-bin-tahoe")
def run_bintahoe(extra_argv, python_options=None):
"""
@ -110,8 +183,16 @@ class BinTahoe(common_util.SignalMixin, unittest.TestCase):
"""
tricky = u"\u00F6"
out, err, returncode = run_bintahoe([tricky])
if PY2:
expected = u"Unknown command: \\xf6"
else:
expected = u"Unknown command: \xf6"
self.assertEqual(returncode, 1)
self.assertIn(u"Unknown command: " + tricky, out)
self.assertIn(
expected,
out,
"expected {!r} not found in {!r}\nstderr: {!r}".format(expected, out, err),
)
def test_with_python_options(self):
"""
@ -305,7 +386,12 @@ class RunNode(common_util.SignalMixin, unittest.TestCase, pollmixin.PollMixin):
u"--hostname", u"127.0.0.1",
])
self.assertEqual(returncode, 0)
self.assertEqual(
returncode,
0,
"stdout: {!r}\n"
"stderr: {!r}\n".format(out, err),
)
# This makes sure that node.url is written, which allows us to
# detect when the introducer restarts in _node_has_restarted below.


@ -24,11 +24,12 @@ import gc
from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet.task import Clock
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil, base32
from allmydata.storage.server import StorageServer
from allmydata.storage.server import StorageServer, DEFAULT_RENEWAL_TIME
from allmydata.storage.shares import get_share_file
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import BucketWriter, BucketReader, ShareFile
@ -168,7 +169,7 @@ class Bucket(unittest.TestCase):
assert len(renewsecret) == 32
cancelsecret = b'THIS LETS ME KILL YOUR FILE HAHA'
assert len(cancelsecret) == 32
expirationtime = struct.pack('>L', 60*60*24*31) # 31 days in seconds
expirationtime = struct.pack('>L', DEFAULT_RENEWAL_TIME) # 31 days in seconds
lease_data = ownernumber + renewsecret + cancelsecret + expirationtime
@ -354,10 +355,11 @@ class Server(unittest.TestCase):
basedir = os.path.join("storage", "Server", name)
return basedir
def create(self, name, reserved_space=0, klass=StorageServer):
def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time):
workdir = self.workdir(name)
ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
stats_provider=FakeStatsProvider())
stats_provider=FakeStatsProvider(),
get_current_time=get_current_time)
ss.setServiceParent(self.sparent)
return ss
@ -384,8 +386,8 @@ class Server(unittest.TestCase):
self.failUnlessIn(b'available-space', sv1)
def allocate(self, ss, storage_index, sharenums, size, canary=None):
renew_secret = hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret))
cancel_secret = hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret))
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
if not canary:
canary = FakeCanary()
return ss.remote_allocate_buckets(storage_index,
@ -646,6 +648,27 @@ class Server(unittest.TestCase):
f2 = open(filename, "rb")
self.failUnlessEqual(f2.read(5), b"start")
def create_bucket_5_shares(
self, ss, storage_index, expected_already=0, expected_writers=5
):
"""
Given a StorageServer, create a bucket with 5 shares and return renewal
and cancellation secrets.
"""
canary = FakeCanary()
sharenums = list(range(5))
size = 100
# Creating a bucket also creates a lease:
rs, cs = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
already, writers = ss.remote_allocate_buckets(storage_index, rs, cs,
sharenums, size, canary)
self.failUnlessEqual(len(already), expected_already)
self.failUnlessEqual(len(writers), expected_writers)
for wb in writers.values():
wb.remote_close()
return rs, cs
def test_leases(self):
ss = self.create("test_leases")
@ -653,41 +676,23 @@ class Server(unittest.TestCase):
sharenums = list(range(5))
size = 100
rs0,cs0 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si0", rs0, cs0,
sharenums, size, canary)
self.failUnlessEqual(len(already), 0)
self.failUnlessEqual(len(writers), 5)
for wb in writers.values():
wb.remote_close()
# Create a bucket:
rs0, cs0 = self.create_bucket_5_shares(ss, b"si0")
leases = list(ss.get_leases(b"si0"))
self.failUnlessEqual(len(leases), 1)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
rs1,cs1 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si1", rs1, cs1,
sharenums, size, canary)
for wb in writers.values():
wb.remote_close()
rs1, cs1 = self.create_bucket_5_shares(ss, b"si1")
# take out a second lease on si1
rs2,cs2 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si1", rs2, cs2,
sharenums, size, canary)
self.failUnlessEqual(len(already), 5)
self.failUnlessEqual(len(writers), 0)
rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 2)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
# and a third lease, using add-lease
rs2a,cs2a = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
ss.remote_add_lease(b"si1", rs2a, cs2a)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 3)
@ -715,10 +720,10 @@ class Server(unittest.TestCase):
"ss should not have a 'remote_cancel_lease' method/attribute")
# test overlapping uploads
rs3,cs3 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
rs4,cs4 = (hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)),
hashutil.tagged_hash(b"blah", b"%d" % next(self._lease_secret)))
rs3,cs3 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
rs4,cs4 = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
already,writers = ss.remote_allocate_buckets(b"si3", rs3, cs3,
sharenums, size, canary)
self.failUnlessEqual(len(already), 0)
@ -741,6 +746,28 @@ class Server(unittest.TestCase):
leases = list(ss.get_leases(b"si3"))
self.failUnlessEqual(len(leases), 2)
def test_immutable_add_lease_renews(self):
"""
Adding a lease on an already leased immutable with the same secret just
renews it.
"""
clock = Clock()
clock.advance(123)
ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds)
# Start out with single lease created with bucket:
renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + DEFAULT_RENEWAL_TIME)
# Time passes:
clock.advance(123456)
# Adding a lease with matching renewal secret just renews it:
ss.remote_add_lease(b"si0", renewal_secret, cancel_secret)
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + 123456 + DEFAULT_RENEWAL_TIME)
def test_have_shares(self):
"""By default the StorageServer has no shares."""
workdir = self.workdir("test_have_shares")
@ -840,9 +867,10 @@ class MutableServer(unittest.TestCase):
basedir = os.path.join("storage", "MutableServer", name)
return basedir
def create(self, name):
def create(self, name, get_current_time=time.time):
workdir = self.workdir(name)
ss = StorageServer(workdir, b"\x00" * 20)
ss = StorageServer(workdir, b"\x00" * 20,
get_current_time=get_current_time)
ss.setServiceParent(self.sparent)
return ss
@ -1379,6 +1407,41 @@ class MutableServer(unittest.TestCase):
{0: ([], [(500, b"make me really bigger")], None)}, [])
self.compare_leases_without_timestamps(all_leases, list(s0.get_leases()))
def test_mutable_add_lease_renews(self):
"""
Adding a lease on an already leased mutable with the same secret just
renews it.
"""
clock = Clock()
clock.advance(235)
ss = self.create("test_mutable_add_lease_renews",
get_current_time=clock.seconds)
def secrets(n):
return ( self.write_enabler(b"we1"),
self.renew_secret(b"we1-%d" % n),
self.cancel_secret(b"we1-%d" % n) )
data = b"".join([ (b"%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
write_enabler, renew_secret, cancel_secret = secrets(0)
rc = write(b"si1", (write_enabler, renew_secret, cancel_secret),
{0: ([], [(0,data)], None)}, [])
self.failUnlessEqual(rc, (True, {}))
bucket_dir = os.path.join(self.workdir("test_mutable_add_lease_renews"),
"shares", storage_index_to_dir(b"si1"))
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time, 235 + DEFAULT_RENEWAL_TIME)
# Time passes...
clock.advance(835)
# Adding a lease renews it:
ss.remote_add_lease(b"si1", renew_secret, cancel_secret)
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time,
235 + 835 + DEFAULT_RENEWAL_TIME)
def test_remove(self):
ss = self.create("test_remove")
self.allocate(ss, b"si1", b"we1", next(self._lease_secret),

View File

@ -127,7 +127,7 @@ def argv_to_abspath(s, **kwargs):
return abspath_expanduser_unicode(decoded, **kwargs)
def unicode_to_argv(s, mangle=False):
def unicode_to_argv(s):
"""
Make the given unicode string suitable for use in an argv list.


@ -188,7 +188,17 @@ def initialize():
# for example, the Python interpreter or any options passed to it, or runner
# scripts such as 'coverage run'. It works even if there are no such arguments,
# as in the case of a frozen executable created by bb-freeze or similar.
sys.argv = argv[-len(sys.argv):]
#
# Also, modify sys.argv in place. If any code has already taken a
# reference to the original argument list object then this ensures that
# code sees the new values. This reliance on mutation of shared state is,
# of course, awful. Why does this function even modify sys.argv? Why not
# have a function that *returns* the properly initialized argv as a new
# list? I don't know.
#
# At least Python 3 gets sys.argv correct so before very much longer we
# should be able to fix this bad design by deleting it.
sys.argv[:] = argv[-len(sys.argv):]
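A minimal illustration of why the slice assignment matters::

    import sys

    alias = sys.argv                    # another module kept a reference
    sys.argv[:] = ["tahoe", "run"]      # mutate the existing list object
    assert alias == ["tahoe", "run"]    # the change is visible via the alias
    # `sys.argv = ["tahoe", "run"]` would instead rebind the name and leave
    # `alias` pointing at the stale list.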
def a_console(handle):