Merge remote-tracking branch 'origin/master' into 3820.gbs-immutable-upload-session

Itamar Turner-Trauring 2021-11-10 14:42:56 -05:00
commit 68a27b1125
42 changed files with 668 additions and 1426 deletions

View File

@@ -6,6 +6,23 @@ on:
- "master"
pull_request:
# Control to what degree jobs in this workflow will run concurrently with
# other instances of themselves.
#
# https://docs.github.com/en/actions/learn-github-actions/workflow-syntax-for-github-actions#concurrency
concurrency:
# We want every revision on master to run the workflow completely.
# "head_ref" is not set for the "push" event but it is set for the
# "pull_request" event. If it is set then it is the name of the branch and
# we can use it to make sure each branch has only one active workflow at a
# time. If it is not set then we can compute a unique string that gives
# every master/push workflow its own group.
group: "${{ github.head_ref || format('{0}-{1}', github.run_number, github.run_attempt) }}"
# Then, we say that if a new workflow wants to start in the same group as a
# running workflow, the running workflow should be cancelled.
cancel-in-progress: true
env:
# Tell Hypothesis which configuration we want it to use.
TAHOE_LAFS_HYPOTHESIS_PROFILE: "ci"
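The comment above describes how the concurrency group is chosen: pull requests key on the branch name, while pushes to master get a unique group per run so nothing is cancelled. A minimal sketch of that fallback logic (not part of the workflow; the run numbers are made up):

def concurrency_group(head_ref, run_number, run_attempt):
    # GitHub's "||" operator falls back to the right side when the left is empty/unset.
    return head_ref or "{0}-{1}".format(run_number, run_attempt)

# pull_request event: one active workflow per branch
assert concurrency_group("3820.gbs-immutable-upload-session", 512, 1) == "3820.gbs-immutable-upload-session"
# push event: head_ref is unset, so every master run gets its own group
assert concurrency_group("", 512, 1) == "512-1"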

1
.gitignore vendored
View File

@@ -30,7 +30,6 @@ zope.interface-*.egg
/src/allmydata/test/plugins/dropin.cache
/_trial_temp*
/_test_memory/
/tmp*
/*.patch
/dist/

5
.readthedocs.yaml Normal file
View File

@@ -0,0 +1,5 @@
version: 2
python:
install:
- requirements: docs/requirements.txt

View File

@@ -142,31 +142,6 @@ count-lines:
# src/allmydata/test/bench_dirnode.py
# The check-speed and check-grid targets are disabled, since they depend upon
# the pre-located $(TAHOE) executable that was removed when we switched to
# tox. They will eventually be resurrected as dedicated tox environments.
# The check-speed target uses a pre-established client node to run a canned
# set of performance tests against a test network that is also
# pre-established (probably on a remote machine). Provide it with the path to
# a local directory where this client node has been created (and populated
# with the necessary FURLs of the test network). This target will start that
# client with the current code and then run the tests. Afterwards it will
# stop the client.
#
# The 'sleep 5' is in there to give the new client a chance to connect to its
# storageservers, since check_speed.py has no good way of doing that itself.
##.PHONY: check-speed
##check-speed: .built
## if [ -z '$(TESTCLIENTDIR)' ]; then exit 1; fi
## @echo "stopping any leftover client code"
## -$(TAHOE) stop $(TESTCLIENTDIR)
## $(TAHOE) start $(TESTCLIENTDIR)
## sleep 5
## $(TAHOE) @src/allmydata/test/check_speed.py $(TESTCLIENTDIR)
## $(TAHOE) stop $(TESTCLIENTDIR)
# The check-grid target also uses a pre-established client node, along with a
# long-term directory that contains some well-known files. See the docstring
# in src/allmydata/test/check_grid.py to see how to set this up.
@@ -195,12 +170,11 @@ test-clean:
# Use 'make distclean' instead to delete all generated files.
.PHONY: clean
clean:
rm -rf build _trial_temp _test_memory .built
rm -rf build _trial_temp .built
rm -f `find src *.egg -name '*.so' -or -name '*.pyc'`
rm -rf support dist
rm -rf `ls -d *.egg | grep -vEe"setuptools-|setuptools_darcs-|darcsver-"`
rm -rf *.pyc
rm -f bin/tahoe bin/tahoe.pyscript
rm -f *.pkg
.PHONY: distclean

4
docs/requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
sphinx
docutils<0.18 # https://github.com/sphinx-doc/sphinx/issues/9788
recommonmark
sphinx_rtd_theme
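With the new .readthedocs.yaml above pointing at this file, Read the Docs installs these packages before building the documentation. A minimal local equivalent, assuming the Sphinx sources live under docs/ as in this repository and that the requirements above have already been installed:

# Build the HTML docs locally after `pip install -r docs/requirements.txt`;
# the output directory is arbitrary.
from sphinx.cmd.build import build_main

status = build_main(["-b", "html", "docs", "docs/_build/html"])
raise SystemExit(status)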

View File

@@ -35,6 +35,9 @@ from allmydata.test.common import (
if sys.platform.startswith('win'):
pytest.skip('Skipping Tor tests on Windows', allow_module_level=True)
if PY2:
pytest.skip('Skipping Tor tests on Python 2 because dependencies are hard to come by', allow_module_level=True)
@pytest_twisted.inlineCallbacks
def test_onion_service_storage(reactor, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl):
yield _create_anonymous_node(reactor, 'carol', 8008, request, temp_dir, flog_gatherer, tor_network, tor_introducer_furl)

View File

@@ -1,522 +0,0 @@
from __future__ import print_function
import os, shutil, sys, urllib, time, stat, urlparse
# Python 2 compatibility
from future.utils import PY2
if PY2:
from future.builtins import str # noqa: F401
from six.moves import cStringIO as StringIO
from twisted.python.filepath import (
FilePath,
)
from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
from twisted.web import client as tw_client
from twisted.python import log, procutils
from foolscap.api import Tub, fireEventually, flushEventualQueue
from allmydata import client, introducer
from allmydata.immutable import upload
from allmydata.scripts import create_node
from allmydata.util import fileutil, pollmixin
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.scripts.common import (
write_introducer,
)
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter, object):
full_speed_ahead = False
_bytes_so_far = 0
stalled = None
def handleResponsePart(self, data):
self._bytes_so_far += len(data)
if not self.factory.do_stall:
return
if self.full_speed_ahead:
return
if self._bytes_so_far > 1e6+100:
if not self.stalled:
print("STALLING")
self.transport.pauseProducing()
self.stalled = reactor.callLater(10.0, self._resume_speed)
def _resume_speed(self):
print("RESUME SPEED")
self.stalled = None
self.full_speed_ahead = True
self.transport.resumeProducing()
def handleResponseEnd(self):
if self.stalled:
print("CANCEL")
self.stalled.cancel()
self.stalled = None
return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory, object):
protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
"""Start fetching the URL, but stall our pipe after the first 1MB.
Wait 10 seconds, then resume downloading (and discarding) everything.
"""
# adapted from twisted.web.client.getPage . We can't just wrap or
# subclass because it provides no way to override the HTTPClientFactory
# that it creates.
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
assert scheme == 'http'
host, port = netloc, 80
if ":" in host:
host, port = host.split(":")
port = int(port)
factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
factory.do_stall = stall
reactor.connectTCP(host, port, factory)
return factory.deferred
class ChildDidNotStartError(Exception):
pass
class SystemFramework(pollmixin.PollMixin):
numnodes = 7
def __init__(self, basedir, mode):
self.basedir = basedir = abspath_expanduser_unicode(str(basedir))
if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
raise AssertionError("safety issue: basedir must be a subdir")
self.testdir = testdir = os.path.join(basedir, "test")
if os.path.exists(testdir):
shutil.rmtree(testdir)
fileutil.make_dirs(testdir)
self.sparent = service.MultiService()
self.sparent.startService()
self.proc = None
self.tub = Tub()
self.tub.setOption("expose-remote-exception-types", False)
self.tub.setServiceParent(self.sparent)
self.mode = mode
self.failed = False
self.keepalive_file = None
def run(self):
framelog = os.path.join(self.basedir, "driver.log")
log.startLogging(open(framelog, "a"), setStdout=False)
log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
#logfile = open(os.path.join(self.testdir, "log"), "w")
#flo = log.FileLogObserver(logfile)
#log.startLoggingWithObserver(flo.emit, setStdout=False)
d = fireEventually()
d.addCallback(lambda res: self.setUp())
d.addCallback(lambda res: self.record_initial_memusage())
d.addCallback(lambda res: self.make_nodes())
d.addCallback(lambda res: self.wait_for_client_connected())
d.addCallback(lambda res: self.do_test())
d.addBoth(self.tearDown)
def _err(err):
self.failed = err
log.err(err)
print(err)
d.addErrback(_err)
def _done(res):
reactor.stop()
return res
d.addBoth(_done)
reactor.run()
if self.failed:
# raiseException doesn't work for CopiedFailures
self.failed.raiseException()
def setUp(self):
#print("STARTING")
self.stats = {}
self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
self.make_introducer()
d = self.start_client()
def _record_control_furl(control_furl):
self.control_furl = control_furl
#print("OBTAINING '%s'" % (control_furl,))
return self.tub.getReference(self.control_furl)
d.addCallback(_record_control_furl)
def _record_control(control_rref):
self.control_rref = control_rref
d.addCallback(_record_control)
def _ready(res):
#print("CLIENT READY")
pass
d.addCallback(_ready)
return d
def record_initial_memusage(self):
print()
print("Client started (no connections yet)")
d = self._print_usage()
d.addCallback(self.stash_stats, "init")
return d
def wait_for_client_connected(self):
print()
print("Client connecting to other nodes..")
return self.control_rref.callRemote("wait_for_client_connections",
self.numnodes+1)
def tearDown(self, passthrough):
# the client node will shut down in a few seconds
#os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
log.msg("shutting down SystemTest services")
if self.keepalive_file and os.path.exists(self.keepalive_file):
age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
log.msg("keepalive file at shutdown was %ds old" % age)
d = defer.succeed(None)
if self.proc:
d.addCallback(lambda res: self.kill_client())
d.addCallback(lambda res: self.sparent.stopService())
d.addCallback(lambda res: flushEventualQueue())
def _close_statsfile(res):
self.statsfile.close()
d.addCallback(_close_statsfile)
d.addCallback(lambda res: passthrough)
return d
def make_introducer(self):
iv_basedir = os.path.join(self.testdir, "introducer")
os.mkdir(iv_basedir)
self.introducer = introducer.IntroducerNode(basedir=iv_basedir)
self.introducer.setServiceParent(self)
self.introducer_furl = self.introducer.introducer_url
def make_nodes(self):
root = FilePath(self.testdir)
self.nodes = []
for i in range(self.numnodes):
nodedir = root.child("node%d" % (i,))
private = nodedir.child("private")
private.makedirs()
write_introducer(nodedir, "default", self.introducer_url)
config = (
"[client]\n"
"shares.happy = 1\n"
"[storage]\n"
)
# the only tests for which we want the internal nodes to actually
# retain shares are the ones where somebody's going to download
# them.
if self.mode in ("download", "download-GET", "download-GET-slow"):
# retain shares
pass
else:
# for these tests, we tell the storage servers to pretend to
# accept shares, but really just throw them out, since we're
# only testing upload and not download.
config += "debug_discard = true\n"
if self.mode in ("receive",):
# for this mode, the client-under-test gets all the shares,
# so our internal nodes can refuse requests
config += "readonly = true\n"
nodedir.child("tahoe.cfg").setContent(config)
c = client.Client(basedir=nodedir.path)
c.setServiceParent(self)
self.nodes.append(c)
# the peers will start running, eventually they will connect to each
# other and the introducer
def touch_keepalive(self):
if os.path.exists(self.keepalive_file):
age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
log.msg("touching keepalive file, was %ds old" % age)
f = open(self.keepalive_file, "w")
f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
""")
f.close()
def start_client(self):
# this returns a Deferred that fires with the client's control.furl
log.msg("MAKING CLIENT")
# self.testdir is an absolute Unicode path
clientdir = self.clientdir = os.path.join(self.testdir, u"client")
clientdir_str = clientdir.encode(get_filesystem_encoding())
quiet = StringIO()
create_node.create_node({'basedir': clientdir}, out=quiet)
log.msg("DONE MAKING CLIENT")
write_introducer(clientdir, "default", self.introducer_furl)
# now replace tahoe.cfg
# set webport=0 and then ask the node what port it picked.
f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
f.write("[node]\n"
"web.port = tcp:0:interface=127.0.0.1\n"
"[client]\n"
"shares.happy = 1\n"
"[storage]\n"
)
if self.mode in ("upload-self", "receive"):
# accept and store shares, to trigger the memory consumption bugs
pass
else:
# don't accept any shares
f.write("readonly = true\n")
## also, if we do receive any shares, throw them away
#f.write("debug_discard = true")
if self.mode == "upload-self":
pass
f.close()
self.keepalive_file = os.path.join(clientdir,
client.Client.EXIT_TRIGGER_FILE)
# now start updating the mtime.
self.touch_keepalive()
ts = internet.TimerService(1.0, self.touch_keepalive)
ts.setServiceParent(self.sparent)
pp = ClientWatcher()
self.proc_done = pp.d = defer.Deferred()
logfile = os.path.join(self.basedir, "client.log")
tahoes = procutils.which("tahoe")
if not tahoes:
raise RuntimeError("unable to find a 'tahoe' executable")
cmd = [tahoes[0], "run", ".", "-l", logfile]
env = os.environ.copy()
self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
log.msg("CLIENT STARTED")
# now we wait for the client to get started. we're looking for the
# control.furl file to appear.
furl_file = os.path.join(clientdir, "private", "control.furl")
url_file = os.path.join(clientdir, "node.url")
def _check():
if pp.ended and pp.ended.value.status != 0:
# the twistd process ends normally (with rc=0) if the child
# is successfully launched. It ends abnormally (with rc!=0)
# if the child cannot be launched.
raise ChildDidNotStartError("process ended while waiting for startup")
return os.path.exists(furl_file)
d = self.poll(_check, 0.1)
# once it exists, wait a moment before we read from it, just in case
# it hasn't finished writing the whole thing. Ideally control.furl
# would be created in some atomic fashion, or made non-readable until
# it's ready, but I can't think of an easy way to do that, and I
# think the chances that we'll observe a half-write are pretty low.
def _stall(res):
d2 = defer.Deferred()
reactor.callLater(0.1, d2.callback, None)
return d2
d.addCallback(_stall)
def _read(res):
# read the node's URL
self.webish_url = open(url_file, "r").read().strip()
if self.webish_url[-1] == "/":
# trim trailing slash, since the rest of the code wants it gone
self.webish_url = self.webish_url[:-1]
f = open(furl_file, "r")
furl = f.read()
return furl.strip()
d.addCallback(_read)
return d
def kill_client(self):
# returns a Deferred that fires when the process exits. This may only
# be called once.
try:
self.proc.signalProcess("INT")
except error.ProcessExitedAlready:
pass
return self.proc_done
def create_data(self, name, size):
filename = os.path.join(self.testdir, name + ".data")
f = open(filename, "wb")
block = "a" * 8192
while size > 0:
l = min(size, 8192)
f.write(block[:l])
size -= l
return filename
def stash_stats(self, stats, name):
self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
self.statsfile.flush()
self.stats[name] = stats['VmPeak']
def POST(self, urlpath, **fields):
url = self.webish_url + urlpath
sepbase = "boogabooga"
sep = "--" + sepbase
form = []
form.append(sep)
form.append('Content-Disposition: form-data; name="_charset"')
form.append('')
form.append('UTF-8')
form.append(sep)
for name, value in fields.iteritems():
if isinstance(value, tuple):
filename, value = value
form.append('Content-Disposition: form-data; name="%s"; '
'filename="%s"' % (name, filename))
else:
form.append('Content-Disposition: form-data; name="%s"' % name)
form.append('')
form.append(value)
form.append(sep)
form[-1] += "--"
body = "\r\n".join(form) + "\r\n"
headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
}
return tw_client.getPage(url, method="POST", postdata=body,
headers=headers, followRedirect=False)
def GET_discard(self, urlpath, stall):
url = self.webish_url + urlpath + "?filename=dummy-get.out"
return discardPage(url, stall)
def _print_usage(self, res=None):
d = self.control_rref.callRemote("get_memory_usage")
def _print(stats):
print("VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
stats["VmPeak"]))
return stats
d.addCallback(_print)
return d
def _do_upload(self, res, size, files, uris):
name = '%d' % size
print()
print("uploading %s" % name)
if self.mode in ("upload", "upload-self"):
d = self.control_rref.callRemote("upload_random_data_from_file",
size,
convergence="check-memory")
elif self.mode == "upload-POST":
data = "a" * size
url = "/uri"
d = self.POST(url, t="upload", file=("%d.data" % size, data))
elif self.mode in ("receive",
"download", "download-GET", "download-GET-slow"):
# mode=receive: upload the data from a local peer, so that the
# client-under-test receives and stores the shares
#
# mode=download*: upload the data from a local peer, then have
# the client-under-test download it.
#
# we need to wait until the uploading node has connected to all
# peers, since the wait_for_client_connections() above doesn't
# pay attention to our self.nodes[] and their connections.
files[name] = self.create_data(name, size)
u = self.nodes[0].getServiceNamed("uploader")
d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
d.addCallback(lambda res:
u.upload(upload.FileName(files[name],
convergence="check-memory")))
d.addCallback(lambda results: results.get_uri())
else:
raise ValueError("unknown mode=%s" % self.mode)
def _complete(uri):
uris[name] = uri
print("uploaded %s" % name)
d.addCallback(_complete)
return d
def _do_download(self, res, size, uris):
if self.mode not in ("download", "download-GET", "download-GET-slow"):
return
name = '%d' % size
print("downloading %s" % name)
uri = uris[name]
if self.mode == "download":
d = self.control_rref.callRemote("download_to_tempfile_and_delete",
uri)
elif self.mode == "download-GET":
url = "/uri/%s" % uri
d = self.GET_discard(urllib.quote(url), stall=False)
elif self.mode == "download-GET-slow":
url = "/uri/%s" % uri
d = self.GET_discard(urllib.quote(url), stall=True)
def _complete(res):
print("downloaded %s" % name)
return res
d.addCallback(_complete)
return d
def do_test(self):
#print("CLIENT STARTED")
#print("FURL", self.control_furl)
#print("RREF", self.control_rref)
#print()
kB = 1000; MB = 1000*1000
files = {}
uris = {}
d = self._print_usage()
d.addCallback(self.stash_stats, "0B")
for i in range(10):
d.addCallback(self._do_upload, 10*kB+i, files, uris)
d.addCallback(self._do_download, 10*kB+i, uris)
d.addCallback(self._print_usage)
d.addCallback(self.stash_stats, "10kB")
for i in range(3):
d.addCallback(self._do_upload, 10*MB+i, files, uris)
d.addCallback(self._do_download, 10*MB+i, uris)
d.addCallback(self._print_usage)
d.addCallback(self.stash_stats, "10MB")
for i in range(1):
d.addCallback(self._do_upload, 50*MB+i, files, uris)
d.addCallback(self._do_download, 50*MB+i, uris)
d.addCallback(self._print_usage)
d.addCallback(self.stash_stats, "50MB")
#for i in range(1):
# d.addCallback(self._do_upload, 100*MB+i, files, uris)
# d.addCallback(self._do_download, 100*MB+i, uris)
# d.addCallback(self._print_usage)
#d.addCallback(self.stash_stats, "100MB")
#d.addCallback(self.stall)
def _done(res):
print("FINISHING")
d.addCallback(_done)
return d
def stall(self, res):
d = defer.Deferred()
reactor.callLater(5, d.callback, None)
return d
class ClientWatcher(protocol.ProcessProtocol, object):
ended = False
def outReceived(self, data):
print("OUT:", data)
def errReceived(self, data):
print("ERR:", data)
def processEnded(self, reason):
self.ended = reason
self.d.callback(None)
if __name__ == '__main__':
mode = "upload"
if len(sys.argv) > 1:
mode = sys.argv[1]
if sys.maxsize == 2147483647:
bits = "32"
elif sys.maxsize == 9223372036854775807:
bits = "64"
else:
bits = "?"
print("%s-bit system (sys.maxsize=%d)" % (bits, sys.maxsize))
# put the logfile and stats.out in _test_memory/ . These stick around.
# put the nodes and other files in _test_memory/test/ . These are
# removed each time we run.
sf = SystemFramework("_test_memory", mode)
sf.run()

View File

@@ -1,234 +0,0 @@
from __future__ import print_function
import os, sys
from twisted.internet import reactor, defer
from twisted.python import log
from twisted.application import service
from foolscap.api import Tub, fireEventually
MB = 1000000
class SpeedTest(object):
DO_IMMUTABLE = True
DO_MUTABLE_CREATE = True
DO_MUTABLE = True
def __init__(self, test_client_dir):
#self.real_stderr = sys.stderr
log.startLogging(open("st.log", "a"), setStdout=False)
f = open(os.path.join(test_client_dir, "private", "control.furl"), "r")
self.control_furl = f.read().strip()
f.close()
self.base_service = service.MultiService()
self.failed = None
self.upload_times = {}
self.download_times = {}
def run(self):
print("STARTING")
d = fireEventually()
d.addCallback(lambda res: self.setUp())
d.addCallback(lambda res: self.do_test())
d.addBoth(self.tearDown)
def _err(err):
self.failed = err
log.err(err)
print(err)
d.addErrback(_err)
def _done(res):
reactor.stop()
return res
d.addBoth(_done)
reactor.run()
if self.failed:
print("EXCEPTION")
print(self.failed)
sys.exit(1)
def setUp(self):
self.base_service.startService()
self.tub = Tub()
self.tub.setOption("expose-remote-exception-types", False)
self.tub.setServiceParent(self.base_service)
d = self.tub.getReference(self.control_furl)
def _gotref(rref):
self.client_rref = rref
print("Got Client Control reference")
return self.stall(5)
d.addCallback(_gotref)
return d
def stall(self, delay, result=None):
d = defer.Deferred()
reactor.callLater(delay, d.callback, result)
return d
def record_times(self, times, key):
print("TIME (%s): %s up, %s down" % (key, times[0], times[1]))
self.upload_times[key], self.download_times[key] = times
def one_test(self, res, name, count, size, mutable):
# values for 'mutable':
# False (upload a different CHK file for each 'count')
# "create" (upload different contents into a new SSK file)
# "upload" (upload different contents into the same SSK file. The
# time consumed does not include the creation of the file)
d = self.client_rref.callRemote("speed_test", count, size, mutable)
d.addCallback(self.record_times, name)
return d
def measure_rtt(self, res):
# use RIClient.get_nodeid() to measure the foolscap-level RTT
d = self.client_rref.callRemote("measure_peer_response_time")
def _got(res):
assert len(res) # need at least one peer
times = res.values()
self.total_rtt = sum(times)
self.average_rtt = sum(times) / len(times)
self.max_rtt = max(times)
print("num-peers: %d" % len(times))
print("total-RTT: %f" % self.total_rtt)
print("average-RTT: %f" % self.average_rtt)
print("max-RTT: %f" % self.max_rtt)
d.addCallback(_got)
return d
def do_test(self):
print("doing test")
d = defer.succeed(None)
d.addCallback(self.one_test, "startup", 1, 1000, False) #ignore this one
d.addCallback(self.measure_rtt)
if self.DO_IMMUTABLE:
# immutable files
d.addCallback(self.one_test, "1x 200B", 1, 200, False)
d.addCallback(self.one_test, "10x 200B", 10, 200, False)
def _maybe_do_100x_200B(res):
if self.upload_times["10x 200B"] < 5:
print("10x 200B test went too fast, doing 100x 200B test")
return self.one_test(None, "100x 200B", 100, 200, False)
return
d.addCallback(_maybe_do_100x_200B)
d.addCallback(self.one_test, "1MB", 1, 1*MB, False)
d.addCallback(self.one_test, "10MB", 1, 10*MB, False)
def _maybe_do_100MB(res):
if self.upload_times["10MB"] > 30:
print("10MB test took too long, skipping 100MB test")
return
return self.one_test(None, "100MB", 1, 100*MB, False)
d.addCallback(_maybe_do_100MB)
if self.DO_MUTABLE_CREATE:
# mutable file creation
d.addCallback(self.one_test, "10x 200B SSK creation", 10, 200,
"create")
if self.DO_MUTABLE:
# mutable file upload/download
d.addCallback(self.one_test, "10x 200B SSK", 10, 200, "upload")
def _maybe_do_100x_200B_SSK(res):
if self.upload_times["10x 200B SSK"] < 5:
print("10x 200B SSK test went too fast, doing 100x 200B SSK")
return self.one_test(None, "100x 200B SSK", 100, 200,
"upload")
return
d.addCallback(_maybe_do_100x_200B_SSK)
d.addCallback(self.one_test, "1MB SSK", 1, 1*MB, "upload")
d.addCallback(self.calculate_speeds)
return d
def calculate_speeds(self, res):
# time = A*size+B
# we assume that A*200bytes is negligible
if self.DO_IMMUTABLE:
# upload
if "100x 200B" in self.upload_times:
B = self.upload_times["100x 200B"] / 100
else:
B = self.upload_times["10x 200B"] / 10
print("upload per-file time: %.3fs" % B)
print("upload per-file times-avg-RTT: %f" % (B / self.average_rtt))
print("upload per-file times-total-RTT: %f" % (B / self.total_rtt))
A1 = 1*MB / (self.upload_times["1MB"] - B) # in bytes per second
print("upload speed (1MB):", self.number(A1, "Bps"))
A2 = 10*MB / (self.upload_times["10MB"] - B)
print("upload speed (10MB):", self.number(A2, "Bps"))
if "100MB" in self.upload_times:
A3 = 100*MB / (self.upload_times["100MB"] - B)
print("upload speed (100MB):", self.number(A3, "Bps"))
# download
if "100x 200B" in self.download_times:
B = self.download_times["100x 200B"] / 100
else:
B = self.download_times["10x 200B"] / 10
print("download per-file time: %.3fs" % B)
print("download per-file times-avg-RTT: %f" % (B / self.average_rtt))
print("download per-file times-total-RTT: %f" % (B / self.total_rtt))
A1 = 1*MB / (self.download_times["1MB"] - B) # in bytes per second
print("download speed (1MB):", self.number(A1, "Bps"))
A2 = 10*MB / (self.download_times["10MB"] - B)
print("download speed (10MB):", self.number(A2, "Bps"))
if "100MB" in self.download_times:
A3 = 100*MB / (self.download_times["100MB"] - B)
print("download speed (100MB):", self.number(A3, "Bps"))
if self.DO_MUTABLE_CREATE:
# SSK creation
B = self.upload_times["10x 200B SSK creation"] / 10
print("create per-file time SSK: %.3fs" % B)
if self.DO_MUTABLE:
# upload SSK
if "100x 200B SSK" in self.upload_times:
B = self.upload_times["100x 200B SSK"] / 100
else:
B = self.upload_times["10x 200B SSK"] / 10
print("upload per-file time SSK: %.3fs" % B)
A1 = 1*MB / (self.upload_times["1MB SSK"] - B) # in bytes per second
print("upload speed SSK (1MB):", self.number(A1, "Bps"))
# download SSK
if "100x 200B SSK" in self.download_times:
B = self.download_times["100x 200B SSK"] / 100
else:
B = self.download_times["10x 200B SSK"] / 10
print("download per-file time SSK: %.3fs" % B)
A1 = 1*MB / (self.download_times["1MB SSK"] - B) # in bytes per
# second
print("download speed SSK (1MB):", self.number(A1, "Bps"))
def number(self, value, suffix=""):
scaling = 1
if value < 1:
fmt = "%1.2g%s"
elif value < 100:
fmt = "%.1f%s"
elif value < 1000:
fmt = "%d%s"
elif value < 1e6:
fmt = "%.2fk%s"; scaling = 1e3
elif value < 1e9:
fmt = "%.2fM%s"; scaling = 1e6
elif value < 1e12:
fmt = "%.2fG%s"; scaling = 1e9
elif value < 1e15:
fmt = "%.2fT%s"; scaling = 1e12
elif value < 1e18:
fmt = "%.2fP%s"; scaling = 1e15
else:
fmt = "huge! %g%s"
return fmt % (value / scaling, suffix)
def tearDown(self, res):
d = self.base_service.stopService()
d.addCallback(lambda ignored: res)
return d
if __name__ == '__main__':
test_client_dir = sys.argv[1]
st = SpeedTest(test_client_dir)
st.run()
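The calculate_speeds() method in this removed harness fits the simple model "time = A*size + B", where B is a fixed per-file overhead estimated from the many-small-files runs and the per-byte cost is recovered from the large-file runs after subtracting B. A minimal sketch of that arithmetic, with made-up timings:

MB = 1000000
# Hypothetical measurements, in seconds.
upload_times = {"100x 200B": 12.0, "1MB": 1.6, "10MB": 11.5}
B = upload_times["100x 200B"] / 100           # per-file overhead: 0.12 s
A1 = 1 * MB / (upload_times["1MB"] - B)       # ~676 kBps over the 1MB upload
A2 = 10 * MB / (upload_times["10MB"] - B)     # ~879 kBps over the 10MB upload
print("per-file: %.3fs, 1MB: %.0f Bps, 10MB: %.0f Bps" % (B, A1, A2))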

View File

@@ -1,20 +0,0 @@
#! /usr/bin/env python
from __future__ import print_function
from foolscap import Tub
from foolscap.eventual import eventually
import sys
from twisted.internet import reactor
def go():
t = Tub()
d = t.getReference(sys.argv[1])
d.addCallback(lambda rref: rref.callRemote("get_memory_usage"))
def _got(res):
print(res)
reactor.stop()
d.addCallback(_got)
eventually(go)
reactor.run()

View File

@@ -0,0 +1 @@
The little-used "control port" has been removed from all node types.

0
newsfragments/3829.minor Normal file
View File

0
newsfragments/3830.minor Normal file
View File

0
newsfragments/3831.minor Normal file
View File

0
newsfragments/3832.minor Normal file
View File

0
newsfragments/3833.minor Normal file
View File

0
newsfragments/3834.minor Normal file
View File

0
newsfragments/3835.minor Normal file
View File

0
newsfragments/3836.minor Normal file
View File

1
newsfragments/3837.other Normal file
View File

@@ -0,0 +1 @@
Tahoe-LAFS no longer runs its Tor integration test suite on Python 2 due to the increased complexity of obtaining compatible versions of necessary dependencies.

0
newsfragments/3838.minor Normal file
View File

View File

@@ -40,7 +40,6 @@ from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import (
hashutil, base32, pollmixin, log, idlib,
@@ -283,7 +282,6 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = node.create_control_tub()
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
storage_broker = create_storage_farm_broker(
@@ -294,7 +292,6 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
client = _client_factory(
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
introducer_clients,
@@ -631,12 +628,12 @@ class _Client(node.Node, pollmixin.PollMixin):
"max_segment_size": DEFAULT_MAX_SEGMENT_SIZE,
}
def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider, introducer_clients,
def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
storage_farm_broker):
"""
Use :func:`allmydata.client.create_client` to instantiate one of these.
"""
node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider)
node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)
self.started_timestamp = time.time()
self.logSource = "Client"
@@ -648,7 +645,6 @@ class _Client(node.Node, pollmixin.PollMixin):
self.init_stats_provider()
self.init_secrets()
self.init_node_key()
self.init_control()
self._key_generator = KeyGenerator()
key_gen_furl = config.get_config("client", "key_generator.furl", None)
if key_gen_furl:
@@ -985,12 +981,6 @@ class _Client(node.Node, pollmixin.PollMixin):
def get_history(self):
return self.history
def init_control(self):
c = ControlServer()
c.setServiceParent(self)
control_url = self.control_tub.registerReference(c)
self.config.write_private_config("control.furl", control_url + "\n")
def init_helper(self):
self.helper = Helper(self.config.get_config_path("helper"),
self.storage_broker, self._secret_holder,

View File

@@ -1,273 +0,0 @@
"""Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import os, time, tempfile
from zope.interface import implementer
from twisted.application import service
from twisted.internet import defer
from twisted.internet.interfaces import IConsumer
from foolscap.api import Referenceable
from allmydata.interfaces import RIControlClient, IFileNode
from allmydata.util import fileutil, mathutil
from allmydata.immutable import upload
from allmydata.mutable.publish import MutableData
from twisted.python import log
def get_memory_usage():
# this is obviously linux-specific
stat_names = (b"VmPeak",
b"VmSize",
#b"VmHWM",
b"VmData")
stats = {}
try:
with open("/proc/self/status", "rb") as f:
for line in f:
name, right = line.split(b":",2)
if name in stat_names:
assert right.endswith(b" kB\n")
right = right[:-4]
stats[name] = int(right) * 1024
except:
# Probably not on (a compatible version of) Linux
stats['VmSize'] = 0
stats['VmPeak'] = 0
return stats
def log_memory_usage(where=""):
stats = get_memory_usage()
log.msg("VmSize: %9d VmPeak: %9d %s" % (stats[b"VmSize"],
stats[b"VmPeak"],
where))
@implementer(IConsumer)
class FileWritingConsumer(object):
def __init__(self, filename):
self.done = False
self.f = open(filename, "wb")
def registerProducer(self, p, streaming):
if streaming:
p.resumeProducing()
else:
while not self.done:
p.resumeProducing()
def write(self, data):
self.f.write(data)
def unregisterProducer(self):
self.done = True
self.f.close()
@implementer(RIControlClient)
class ControlServer(Referenceable, service.Service):
def remote_wait_for_client_connections(self, num_clients):
return self.parent.debug_wait_for_client_connections(num_clients)
def remote_upload_random_data_from_file(self, size, convergence):
tempdir = tempfile.mkdtemp()
filename = os.path.join(tempdir, "data")
f = open(filename, "wb")
block = b"a" * 8192
while size > 0:
l = min(size, 8192)
f.write(block[:l])
size -= l
f.close()
uploader = self.parent.getServiceNamed("uploader")
u = upload.FileName(filename, convergence=convergence)
# XXX should pass reactor arg
d = uploader.upload(u)
d.addCallback(lambda results: results.get_uri())
def _done(uri):
os.remove(filename)
os.rmdir(tempdir)
return uri
d.addCallback(_done)
return d
def remote_download_to_tempfile_and_delete(self, uri):
tempdir = tempfile.mkdtemp()
filename = os.path.join(tempdir, "data")
filenode = self.parent.create_node_from_uri(uri, name=filename)
if not IFileNode.providedBy(filenode):
raise AssertionError("The URI does not reference a file.")
c = FileWritingConsumer(filename)
d = filenode.read(c)
def _done(res):
os.remove(filename)
os.rmdir(tempdir)
return None
d.addCallback(_done)
return d
def remote_speed_test(self, count, size, mutable):
assert size > 8
log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
mutable))
st = SpeedTest(self.parent, count, size, mutable)
return st.run()
def remote_get_memory_usage(self):
return get_memory_usage()
def remote_measure_peer_response_time(self):
# I'd like to average together several pings, but I don't want this
# phase to take more than 10 seconds. Expect worst-case latency to be
# 300ms.
results = {}
sb = self.parent.get_storage_broker()
everyone = sb.get_connected_servers()
num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
everyone = list(everyone) * num_pings
d = self._do_one_ping(None, everyone, results)
return d
def _do_one_ping(self, res, everyone_left, results):
if not everyone_left:
return results
server = everyone_left.pop(0)
server_name = server.get_longname()
storage_server = server.get_storage_server()
start = time.time()
d = storage_server.get_buckets(b"\x00" * 16)
def _done(ignored):
stop = time.time()
elapsed = stop - start
if server_name in results:
results[server_name].append(elapsed)
else:
results[server_name] = [elapsed]
d.addCallback(_done)
d.addCallback(self._do_one_ping, everyone_left, results)
def _average(res):
averaged = {}
for server_name,times in results.items():
averaged[server_name] = sum(times) / len(times)
return averaged
d.addCallback(_average)
return d
class SpeedTest(object):
def __init__(self, parent, count, size, mutable):
self.parent = parent
self.count = count
self.size = size
self.mutable_mode = mutable
self.uris = {}
self.basedir = self.parent.config.get_config_path("_speed_test_data")
def run(self):
self.create_data()
d = self.do_upload()
d.addCallback(lambda res: self.do_download())
d.addBoth(self.do_cleanup)
d.addCallback(lambda res: (self.upload_time, self.download_time))
return d
def create_data(self):
fileutil.make_dirs(self.basedir)
for i in range(self.count):
s = self.size
fn = os.path.join(self.basedir, str(i))
if os.path.exists(fn):
os.unlink(fn)
f = open(fn, "wb")
f.write(os.urandom(8))
s -= 8
while s > 0:
chunk = min(s, 4096)
f.write(b"\x00" * chunk)
s -= chunk
f.close()
def do_upload(self):
d = defer.succeed(None)
def _create_slot(res):
d1 = self.parent.create_mutable_file(b"")
def _created(n):
self._n = n
d1.addCallback(_created)
return d1
if self.mutable_mode == "upload":
d.addCallback(_create_slot)
def _start(res):
self._start = time.time()
d.addCallback(_start)
def _record_uri(uri, i):
self.uris[i] = uri
def _upload_one_file(ignored, i):
if i >= self.count:
return
fn = os.path.join(self.basedir, str(i))
if self.mutable_mode == "create":
data = open(fn,"rb").read()
d1 = self.parent.create_mutable_file(data)
d1.addCallback(lambda n: n.get_uri())
elif self.mutable_mode == "upload":
data = open(fn,"rb").read()
d1 = self._n.overwrite(MutableData(data))
d1.addCallback(lambda res: self._n.get_uri())
else:
up = upload.FileName(fn, convergence=None)
d1 = self.parent.upload(up)
d1.addCallback(lambda results: results.get_uri())
d1.addCallback(_record_uri, i)
d1.addCallback(_upload_one_file, i+1)
return d1
d.addCallback(_upload_one_file, 0)
def _upload_done(ignored):
stop = time.time()
self.upload_time = stop - self._start
d.addCallback(_upload_done)
return d
def do_download(self):
start = time.time()
d = defer.succeed(None)
def _download_one_file(ignored, i):
if i >= self.count:
return
n = self.parent.create_node_from_uri(self.uris[i])
if not IFileNode.providedBy(n):
raise AssertionError("The URI does not reference a file.")
if n.is_mutable():
d1 = n.download_best_version()
else:
d1 = n.read(DiscardingConsumer())
d1.addCallback(_download_one_file, i+1)
return d1
d.addCallback(_download_one_file, 0)
def _download_done(ignored):
stop = time.time()
self.download_time = stop - start
d.addCallback(_download_done)
return d
def do_cleanup(self, res):
for i in range(self.count):
fn = os.path.join(self.basedir, str(i))
os.unlink(fn)
return res
@implementer(IConsumer)
class DiscardingConsumer(object):
def __init__(self):
self.done = False
def registerProducer(self, p, streaming):
if streaming:
p.resumeProducing()
else:
while not self.done:
p.resumeProducing()
def write(self, data):
pass
def unregisterProducer(self):
self.done = True

View File

@@ -39,7 +39,6 @@ from allmydata.introducer.common import unsign_from_foolscap, \
from allmydata.node import read_config
from allmydata.node import create_node_dir
from allmydata.node import create_connection_handlers
from allmydata.node import create_control_tub
from allmydata.node import create_tub_options
from allmydata.node import create_main_tub
@@ -88,12 +87,10 @@ def create_introducer(basedir=u"."):
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = create_control_tub()
node = _IntroducerNode(
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
)
@@ -105,8 +102,8 @@ def create_introducer(basedir=u"."):
class _IntroducerNode(node.Node):
NODETYPE = "introducer"
def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider):
node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider)
def __init__(self, config, main_tub, i2p_provider, tor_provider):
node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)
self.init_introducer()
webport = self.get_config("node", "web.port", None)
if webport:

View File

@@ -919,18 +919,6 @@ def create_main_tub(config, tub_options,
return tub
def create_control_tub():
"""
Creates a Foolscap Tub for use by the control port. This is a
localhost-only ephemeral Tub, with no control over the listening
port or location
"""
control_tub = Tub()
portnum = iputil.listenOnUnused(control_tub)
log.msg("Control Tub location set to 127.0.0.1:%s" % (portnum,))
return control_tub
class Node(service.MultiService):
"""
This class implements common functionality of both Client nodes and Introducer nodes.
@@ -938,7 +926,7 @@ class Node(service.MultiService):
NODETYPE = "unknown NODETYPE"
CERTFILE = "node.pem"
def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider):
def __init__(self, config, main_tub, i2p_provider, tor_provider):
"""
Initialize the node with the given configuration. Its base directory
is the current directory by default.
@@ -967,10 +955,6 @@ class Node(service.MultiService):
else:
self.nodeid = self.short_nodeid = None
self.control_tub = control_tub
if self.control_tub is not None:
self.control_tub.setServiceParent(self)
self.log("Node constructed. " + __full_version__)
iputil.increase_rlimits()

View File

@@ -141,7 +141,9 @@ def write_introducer(basedir, petname, furl):
"""
if isinstance(furl, bytes):
furl = furl.decode("utf-8")
basedir.child(b"private").child(b"introducers.yaml").setContent(
private = basedir.child(b"private")
private.makedirs(ignoreExistingDirectory=True)
private.child(b"introducers.yaml").setContent(
safe_dump({
"introducers": {
petname: {
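The added makedirs call means write_introducer() no longer assumes the private/ directory already exists. A minimal sketch of the call it enables (the path and FURL below are hypothetical):

from twisted.python.filepath import FilePath
from allmydata.scripts.common import write_introducer

basedir = FilePath("/tmp/example-node")   # brand-new node directory, nothing created yet
write_introducer(
    basedir,
    "default",
    "pb://tubid@tcp:introducer.example:55555/swissnum",   # made-up introducer FURL
)
# basedir/private/introducers.yaml now exists; private/ was created on demand.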

View File

@@ -15,15 +15,22 @@ try:
except ImportError:
pass
# do not import any allmydata modules at this level. Do that from inside
# individual functions instead.
import struct, time, os, sys
from twisted.python import usage, failure
from twisted.internet import defer
from foolscap.logging import cli as foolscap_cli
from allmydata.scripts.common import BaseOptions
from allmydata.scripts.common import BaseOptions
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.layout import MDMFSlotReadProxy
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
class DumpOptions(BaseOptions):
def getSynopsis(self):
@@ -56,13 +63,11 @@ def dump_share(options):
# check the version, to see if we have a mutable or immutable share
print("share filename: %s" % quote_output(options['filename']), file=out)
f = open(options['filename'], "rb")
prefix = f.read(32)
f.close()
if prefix == MutableShareFile.MAGIC:
return dump_mutable_share(options)
# otherwise assume it's immutable
return dump_immutable_share(options)
with open(options['filename'], "rb") as f:
if MutableShareFile.is_valid_header(f.read(32)):
return dump_mutable_share(options)
# otherwise assume it's immutable
return dump_immutable_share(options)
def dump_immutable_share(options):
from allmydata.storage.immutable import ShareFile
@@ -170,7 +175,7 @@ def dump_immutable_lease_info(f, out):
leases = list(f.get_leases())
if leases:
for i,lease in enumerate(leases):
when = format_expiration_time(lease.expiration_time)
when = format_expiration_time(lease.get_expiration_time())
print(" Lease #%d: owner=%d, expire in %s" \
% (i, lease.owner_num, when), file=out)
else:
@@ -223,7 +228,7 @@ def dump_mutable_share(options):
print(file=out)
print(" Lease #%d:" % leasenum, file=out)
print(" ownerid: %d" % lease.owner_num, file=out)
when = format_expiration_time(lease.expiration_time)
when = format_expiration_time(lease.get_expiration_time())
print(" expires in %s" % when, file=out)
print(" renew_secret: %s" % str(base32.b2a(lease.renew_secret), "utf-8"), file=out)
print(" cancel_secret: %s" % str(base32.b2a(lease.cancel_secret), "utf-8"), file=out)
@@ -712,125 +717,122 @@ def call(c, *args, **kwargs):
return results[0]
def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.mutable.layout import unpack_share
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
import struct
f = open(abs_sharefile, "rb")
prefix = f.read(32)
if prefix == MutableShareFile.MAGIC:
# mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.expiration_time
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
share_type = "unknown"
f.seek(m.DATA_OFFSET)
version = f.read(1)
if version == b"\x00":
# this slot contains an SDMF share
share_type = "SDMF"
elif version == b"\x01":
share_type = "MDMF"
if share_type == "SDMF":
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, 2000))
try:
pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
print("SDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
from allmydata.mutable.layout import MDMFSlotReadProxy
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
p = ShareDumper(None, "fake-si", fake_shnum)
def extract(func):
stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
with open(abs_sharefile, "rb") as f:
prefix = f.read(32)
if MutableShareFile.is_valid_header(prefix):
_describe_mutable_share(abs_sharefile, f, now, si_s, out)
elif ShareFile.is_valid_header(prefix):
_describe_immutable_share(abs_sharefile, now, si_s, out)
else:
print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out)
print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out)
elif struct.unpack(">L", prefix[:4]) == (1,):
# immutable
def _describe_mutable_share(abs_sharefile, f, now, si_s, out):
# mutable share
m = MutableShareFile(abs_sharefile)
WE, nodeid = m._read_write_enabler_and_nodeid(f)
data_length = m._read_data_length(f)
expiration_time = min( [lease.get_expiration_time()
for (i,lease) in m._enumerate_leases(f)] )
expiration = max(0, expiration_time - now)
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
share_type = "unknown"
f.seek(m.DATA_OFFSET)
version = f.read(1)
if version == b"\x00":
# this slot contains an SDMF share
share_type = "SDMF"
elif version == b"\x01":
share_type = "MDMF"
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
if share_type == "SDMF":
f.seek(m.DATA_OFFSET)
expiration_time = min( [lease.expiration_time
for lease in sf.get_leases()] )
expiration = max(0, expiration_time - now)
# Read at least the mutable header length, if possible. If there's
# less data than that in the share, don't try to read more (we won't
# be able to unpack the header in this case but we surely don't want
# to try to unpack bytes *following* the data section as if they were
# header data). Rather than 2000 we could use HEADER_LENGTH from
# allmydata/mutable/layout.py, probably.
data = f.read(min(data_length, 2000))
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
try:
pieces = unpack_share(data)
except NeedMoreDataError as e:
# retry once with the larger size
size = e.needed_bytes
f.seek(m.DATA_OFFSET)
data = f.read(min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
share_data, enc_privkey) = pieces
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
ueb_hash = unpacked["UEB_hash"]
print("SDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
elif share_type == "MDMF":
fake_shnum = 0
# TODO: factor this out with dump_MDMF_share()
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
f.seek(m.DATA_OFFSET+where)
data.append(f.read(length))
return defer.succeed({fake_shnum: data})
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize,
str(ueb_hash, "utf-8"), expiration,
quote_output(abs_sharefile)), file=out)
p = ShareDumper(None, "fake-si", fake_shnum)
def extract(func):
stash = []
# these methods return Deferreds, but we happen to know that
# they run synchronously when not actually talking to a
# remote server
d = func()
d.addCallback(stash.append)
return stash[0]
verinfo = extract(p.get_verinfo)
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print("MDMF %s %d/%d %d #%d:%s %d %s" % \
(si_s, k, N, datalen,
seqnum, str(base32.b2a(root_hash), "utf-8"),
expiration, quote_output(abs_sharefile)), file=out)
else:
print("UNKNOWN really-unknown %s" % quote_output(abs_sharefile), file=out)
print("UNKNOWN mutable %s" % quote_output(abs_sharefile), file=out)
def _describe_immutable_share(abs_sharefile, now, si_s, out):
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, None, None, "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
return defer.succeed(sf.read_share_data(offset, size))
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
expiration_time = min(lease.get_expiration_time()
for lease in sf.get_leases())
expiration = max(0, expiration_time - now)
UEB_data = call(bp.get_uri_extension)
unpacked = uri.unpack_extension_readable(UEB_data)
k = unpacked["needed_shares"]
N = unpacked["total_shares"]
filesize = unpacked["size"]
ueb_hash = unpacked["UEB_hash"]
print("CHK %s %d/%d %d %s %d %s" % (si_s, k, N, filesize,
str(ueb_hash, "utf-8"), expiration,
quote_output(abs_sharefile)), file=out)
f.close()
def catalog_shares(options):
from allmydata.util.encodingutil import listdir_unicode, quote_output
@@ -933,34 +935,35 @@ def corrupt_share(options):
f.write(d)
f.close()
f = open(fn, "rb")
prefix = f.read(32)
f.close()
if prefix == MutableShareFile.MAGIC:
# mutable
m = MutableShareFile(fn)
f = open(fn, "rb")
f.seek(m.DATA_OFFSET)
data = f.read(2000)
# make sure this slot contains an SDMF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
with open(fn, "rb") as f:
prefix = f.read(32)
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
ig_datalen, offsets) = unpack_header(data)
if MutableShareFile.is_valid_header(prefix):
# mutable
m = MutableShareFile(fn)
with open(fn, "rb") as f:
f.seek(m.DATA_OFFSET)
# Read enough data to get a mutable header to unpack.
data = f.read(2000)
# make sure this slot contains an SDMF share
assert data[0:1] == b"\x00", "non-SDMF mutable shares not supported"
f.close()
assert version == 0, "we only handle v0 SDMF files"
start = m.DATA_OFFSET + offsets["share_data"]
end = m.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end)
else:
# otherwise assume it's immutable
f = ShareFile(fn)
bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
flip_bit(start, end)
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
ig_datalen, offsets) = unpack_header(data)
assert version == 0, "we only handle v0 SDMF files"
start = m.DATA_OFFSET + offsets["share_data"]
end = m.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end)
else:
# otherwise assume it's immutable
f = ShareFile(fn)
bp = ReadBucketProxy(None, None, '')
offsets = bp._parse_offsets(f.read_share_data(0, 0x24))
start = f._data_offset + offsets["data"]
end = f._data_offset + offsets["plaintext_hash_tree"]
flip_bit(start, end)

View File

@@ -24,7 +24,6 @@ from allmydata.interfaces import (
)
from allmydata.util import base32, fileutil, log
from allmydata.util.assertutil import precondition
from allmydata.util.hashutil import timing_safe_compare
from allmydata.storage.lease import LeaseInfo
from allmydata.storage.common import UnknownImmutableContainerVersionError
@@ -57,6 +56,21 @@ class ShareFile(object):
LEASE_SIZE = struct.calcsize(">L32s32sL")
sharetype = "immutable"
@classmethod
def is_valid_header(cls, header):
# type: (bytes) -> bool
"""
Determine if the given bytes constitute a valid header for this type of
container.
:param header: Some bytes from the beginning of a container.
:return: ``True`` if the bytes could belong to this container,
``False`` otherwise.
"""
(version,) = struct.unpack(">L", header[:4])
return version == 1
def __init__(self, filename, max_size=None, create=False):
""" If max_size is not None then I won't allow more than max_size to be written to me. If create=True then max_size must not be None. """
precondition((max_size is not None) or (not create), max_size, create)
@@ -144,7 +158,7 @@ class ShareFile(object):
for i in range(num_leases):
data = f.read(self.LEASE_SIZE)
if data:
yield LeaseInfo().from_immutable_data(data)
yield LeaseInfo.from_immutable_data(data)
def add_lease(self, lease_info):
with open(self.home, 'rb+') as f:
@@ -152,13 +166,24 @@ class ShareFile(object):
self._write_lease_record(f, num_leases, lease_info)
self._write_num_leases(f, num_leases+1)
def renew_lease(self, renew_secret, new_expire_time):
def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
# type: (bytes, int, bool) -> None
"""
Update the expiration time on an existing lease.
:param allow_backdate: If ``True`` then allow the new expiration time
to be before the current expiration time. Otherwise, make no
change when this is the case.
:raise IndexError: If there is no lease matching the given renew
secret.
"""
for i,lease in enumerate(self.get_leases()):
if timing_safe_compare(lease.renew_secret, renew_secret):
if lease.is_renew_secret(renew_secret):
# yup. See if we need to update the owner time.
if new_expire_time > lease.expiration_time:
if allow_backdate or new_expire_time > lease.get_expiration_time():
# yes
lease.expiration_time = new_expire_time
lease = lease.renew(new_expire_time)
with open(self.home, 'rb+') as f:
self._write_lease_record(f, i, lease)
return
@@ -167,7 +192,7 @@ class ShareFile(object):
def add_or_renew_lease(self, lease_info):
try:
self.renew_lease(lease_info.renew_secret,
lease_info.expiration_time)
lease_info.get_expiration_time())
except IndexError:
self.add_lease(lease_info)
@@ -183,7 +208,7 @@ class ShareFile(object):
leases = list(self.get_leases())
num_leases_removed = 0
for i,lease in enumerate(leases):
if timing_safe_compare(lease.cancel_secret, cancel_secret):
if lease.is_cancel_secret(cancel_secret):
leases[i] = None
num_leases_removed += 1
if not num_leases_removed:
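The new ShareFile.is_valid_header() classmethod (and its MutableShareFile counterpart later in this commit) lets callers sniff the container type from the first 32 bytes instead of comparing against MutableShareFile.MAGIC themselves, which is exactly what the reworked dump_share() and describe_share() above now do. A minimal sketch of that dispatch pattern, with a hypothetical helper and path:

from allmydata.storage.immutable import ShareFile
from allmydata.storage.mutable import MutableShareFile

def open_share(path):
    # Hypothetical helper, not part of this commit.
    with open(path, "rb") as f:
        header = f.read(32)
    if MutableShareFile.is_valid_header(header):
        return MutableShareFile(path)
    if ShareFile.is_valid_header(header):
        return ShareFile(path)
    raise ValueError("unrecognized share container in %s" % (path,))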

View File

@@ -13,52 +13,132 @@ if PY2:
import struct, time
import attr
from allmydata.util.hashutil import timing_safe_compare
@attr.s(frozen=True)
class LeaseInfo(object):
def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
expiration_time=None, nodeid=None):
self.owner_num = owner_num
self.renew_secret = renew_secret
self.cancel_secret = cancel_secret
self.expiration_time = expiration_time
if nodeid is not None:
assert isinstance(nodeid, bytes)
assert len(nodeid) == 20
self.nodeid = nodeid
"""
Represent the details of one lease, a marker which is intended to inform
the storage server how long to store a particular share.
"""
owner_num = attr.ib(default=None)
# Don't put secrets into the default string representation. This makes it
# slightly less likely the secrets will accidentally be leaked to
# someplace they're not meant to be.
renew_secret = attr.ib(default=None, repr=False)
cancel_secret = attr.ib(default=None, repr=False)
_expiration_time = attr.ib(default=None)
nodeid = attr.ib(default=None)
@nodeid.validator
def _validate_nodeid(self, attribute, value):
if value is not None:
if not isinstance(value, bytes):
raise ValueError(
"nodeid value must be bytes, not {!r}".format(value),
)
if len(value) != 20:
raise ValueError(
"nodeid value must be 20 bytes long, not {!r}".format(value),
)
return None
def get_expiration_time(self):
return self.expiration_time
# type: () -> float
"""
Retrieve a POSIX timestamp representing the time at which this lease is
set to expire.
"""
return self._expiration_time
def renew(self, new_expire_time):
# type: (float) -> LeaseInfo
"""
Create a new lease the same as this one but with a new expiration time.
:param new_expire_time: The new expiration time.
:return: The new lease info.
"""
return attr.assoc(
self,
_expiration_time=new_expire_time,
)
def is_renew_secret(self, candidate_secret):
# type: (bytes) -> bool
"""
Check a string to see if it is the correct renew secret.
:return: ``True`` if it is the correct renew secret, ``False``
otherwise.
"""
return timing_safe_compare(self.renew_secret, candidate_secret)
def is_cancel_secret(self, candidate_secret):
# type: (bytes) -> bool
"""
Check a string to see if it is the correct cancel secret.
:return: ``True`` if it is the correct cancel secret, ``False``
otherwise.
"""
return timing_safe_compare(self.cancel_secret, candidate_secret)
def get_grant_renew_time_time(self):
# hack, based upon fixed 31-day expiration period
return self.expiration_time - 31*24*60*60
return self._expiration_time - 31*24*60*60
def get_age(self):
return time.time() - self.get_grant_renew_time_time()
def from_immutable_data(self, data):
(self.owner_num,
self.renew_secret,
self.cancel_secret,
self.expiration_time) = struct.unpack(">L32s32sL", data)
self.nodeid = None
return self
@classmethod
def from_immutable_data(cls, data):
"""
Create a new instance from the encoded data given.
:param data: A lease serialized using the immutable-share-file format.
"""
names = [
"owner_num",
"renew_secret",
"cancel_secret",
"expiration_time",
]
values = struct.unpack(">L32s32sL", data)
return cls(nodeid=None, **dict(zip(names, values)))
def to_immutable_data(self):
return struct.pack(">L32s32sL",
self.owner_num,
self.renew_secret, self.cancel_secret,
int(self.expiration_time))
int(self._expiration_time))
def to_mutable_data(self):
return struct.pack(">LL32s32s20s",
self.owner_num,
int(self.expiration_time),
int(self._expiration_time),
self.renew_secret, self.cancel_secret,
self.nodeid)
def from_mutable_data(self, data):
(self.owner_num,
self.expiration_time,
self.renew_secret, self.cancel_secret,
self.nodeid) = struct.unpack(">LL32s32s20s", data)
return self
@classmethod
def from_mutable_data(cls, data):
"""
Create a new instance from the encoded data given.
:param data: A lease serialized using the mutable-share-file format.
"""
names = [
"owner_num",
"expiration_time",
"renew_secret",
"cancel_secret",
"nodeid",
]
values = struct.unpack(">LL32s32s20s", data)
return cls(**dict(zip(names, values)))
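# A brief round-trip sketch of the new frozen LeaseInfo API (the values here
# are illustrative assumptions): construct a lease, renew it to get a new
# instance, serialize it in the immutable-share format, and read it back.
example = LeaseInfo(
    owner_num=0,
    renew_secret=b"r" * 32,
    cancel_secret=b"c" * 32,
    expiration_time=2 ** 31,
)
renewed = example.renew(2 ** 31 + 60)          # returns a new LeaseInfo
assert renewed.get_expiration_time() == 2 ** 31 + 60
restored = LeaseInfo.from_immutable_data(renewed.to_immutable_data())
assert restored.is_renew_secret(b"r" * 32)
assert restored.is_cancel_secret(b"c" * 32)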

View File

@ -67,6 +67,20 @@ class MutableShareFile(object):
MAX_SIZE = MAX_MUTABLE_SHARE_SIZE
# TODO: decide upon a policy for max share size
@classmethod
def is_valid_header(cls, header):
# type: (bytes) -> bool
"""
Determine if the given bytes constitute a valid header for this type of
container.
:param header: Some bytes from the beginning of a container.
:return: ``True`` if the bytes could belong to this container,
``False`` otherwise.
"""
return header.startswith(cls.MAGIC)
def __init__(self, filename, parent=None):
self.home = filename
if os.path.exists(self.home):
@ -77,7 +91,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data)
if magic != self.MAGIC:
if not self.is_valid_header(data):
msg = "sharefile %s had magic '%r' but we wanted '%r'" % \
(filename, magic, self.MAGIC)
raise UnknownMutableContainerVersionError(msg)
@ -253,7 +267,7 @@ class MutableShareFile(object):
f.seek(offset)
assert f.tell() == offset
data = f.read(self.LEASE_SIZE)
lease_info = LeaseInfo().from_mutable_data(data)
lease_info = LeaseInfo.from_mutable_data(data)
if lease_info.owner_num == 0:
return None
return lease_info
@ -298,15 +312,26 @@ class MutableShareFile(object):
else:
self._write_lease_record(f, num_lease_slots, lease_info)
def renew_lease(self, renew_secret, new_expire_time):
def renew_lease(self, renew_secret, new_expire_time, allow_backdate=False):
# type: (bytes, int, bool) -> None
"""
Update the expiration time on an existing lease.
:param allow_backdate: If ``True`` then allow the new expiration time
to be before the current expiration time. Otherwise, make no
change when this is the case.
:raise IndexError: If there is no lease matching the given renew
secret.
"""
accepting_nodeids = set()
with open(self.home, 'rb+') as f:
for (leasenum,lease) in self._enumerate_leases(f):
if timing_safe_compare(lease.renew_secret, renew_secret):
if lease.is_renew_secret(renew_secret):
# yup. See if we need to update the owner time.
if new_expire_time > lease.expiration_time:
if allow_backdate or new_expire_time > lease.get_expiration_time():
# yes
lease.expiration_time = new_expire_time
lease = lease.renew(new_expire_time)
self._write_lease_record(f, leasenum, lease)
return
accepting_nodeids.add(lease.nodeid)
@ -324,7 +349,7 @@ class MutableShareFile(object):
precondition(lease_info.owner_num != 0) # 0 means "no lease here"
try:
self.renew_lease(lease_info.renew_secret,
lease_info.expiration_time)
lease_info.get_expiration_time())
except IndexError:
self.add_lease(lease_info)
@ -346,7 +371,7 @@ class MutableShareFile(object):
with open(self.home, 'rb+') as f:
for (leasenum,lease) in self._enumerate_leases(f):
accepting_nodeids.add(lease.nodeid)
if timing_safe_compare(lease.cancel_secret, cancel_secret):
if lease.is_cancel_secret(cancel_secret):
self._write_lease_record(f, leasenum, blank_lease)
modified += 1
else:
@ -377,7 +402,7 @@ class MutableShareFile(object):
write_enabler_nodeid, write_enabler,
data_length, extra_least_offset) = \
struct.unpack(">32s20s32sQQ", data)
assert magic == self.MAGIC
assert self.is_valid_header(data)
return (write_enabler, write_enabler_nodeid)
def readv(self, readv):
@ -454,4 +479,3 @@ def create_mutable_sharefile(filename, my_nodeid, write_enabler, parent):
ms.create(my_nodeid, write_enabler)
del ms
return MutableShareFile(filename, parent)

View File

@ -14,7 +14,7 @@ if PY2:
else:
from typing import Dict
import os, re, struct, time
import os, re, time
import six
from foolscap.api import Referenceable
@ -373,12 +373,12 @@ class StorageServer(service.MultiService, Referenceable):
for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f:
header = f.read(32)
if header[:32] == MutableShareFile.MAGIC:
if MutableShareFile.is_valid_header(header):
sf = MutableShareFile(filename, self)
# note: if the share has been migrated, the renew_lease()
# call will throw an exception, with information to help the
# client update the lease.
elif header[:4] == struct.pack(">L", 1):
elif ShareFile.is_valid_header(header):
sf = ShareFile(filename)
else:
continue # non-sharefile

View File

@ -17,8 +17,7 @@ from allmydata.storage.immutable import ShareFile
def get_share_file(filename):
with open(filename, "rb") as f:
prefix = f.read(32)
if prefix == MutableShareFile.MAGIC:
if MutableShareFile.is_valid_header(prefix):
return MutableShareFile(filename)
# otherwise assume it's immutable
return ShareFile(filename)

View File

@ -267,8 +267,12 @@ class UseNode(object):
node_config = attr.ib(default=attr.Factory(dict))
config = attr.ib(default=None)
reactor = attr.ib(default=None)
def setUp(self):
self.assigner = SameProcessStreamEndpointAssigner()
self.assigner.setUp()
def format_config_items(config):
return "\n".join(
" = ".join((key, value))
@ -292,6 +296,23 @@ class UseNode(object):
"default",
self.introducer_furl,
)
node_config = self.node_config.copy()
if "tub.port" not in node_config:
if "tub.location" in node_config:
raise ValueError(
"UseNode fixture does not support specifying tub.location "
"without tub.port"
)
# Don't use the normal port auto-assignment logic. It produces
# collisions and makes tests fail spuriously.
tub_location, tub_endpoint = self.assigner.assign(self.reactor)
node_config.update({
"tub.port": tub_endpoint,
"tub.location": tub_location,
})
self.config = config_from_string(
self.basedir.asTextMode().path,
"tub.port",
@ -304,7 +325,7 @@ storage.plugins = {storage_plugin}
{plugin_config_section}
""".format(
storage_plugin=self.storage_plugin,
node_config=format_config_items(self.node_config),
node_config=format_config_items(node_config),
plugin_config_section=plugin_config_section,
)
)
@ -316,7 +337,7 @@ storage.plugins = {storage_plugin}
)
def cleanUp(self):
pass
self.assigner.tearDown()
def getDetails(self):
@ -1068,7 +1089,7 @@ def _corrupt_offset_of_uri_extension_to_force_short_read(data, debug=False):
def _corrupt_mutable_share_data(data, debug=False):
prefix = data[:32]
assert prefix == MutableShareFile.MAGIC, "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
assert MutableShareFile.is_valid_header(prefix), "This function is designed to corrupt mutable shares of v1, and the magic number doesn't look right: %r vs %r" % (prefix, MutableShareFile.MAGIC)
data_offset = MutableShareFile.DATA_OFFSET
sharetype = data[data_offset:data_offset+1]
assert sharetype == b"\x00", "non-SDMF mutable shares not supported"

View File

@ -672,11 +672,14 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
"""
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
_, port_endpoint = self.port_assigner.assign(reactor)
_, web_port_endpoint = self.port_assigner.assign(reactor)
main_location_hint, main_port_endpoint = self.port_assigner.assign(reactor)
introducer_config = (
u"[node]\n"
u"nickname = introducer \N{BLACK SMILING FACE}\n" +
u"web.port = {}\n".format(port_endpoint)
u"web.port = {}\n".format(web_port_endpoint) +
u"tub.port = {}\n".format(main_port_endpoint) +
u"tub.location = {}\n".format(main_location_hint)
).encode("utf-8")
fileutil.make_dirs(iv_dir)
@ -764,13 +767,15 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
def _generate_config(self, which, basedir):
config = {}
except1 = set(range(self.numclients)) - {1}
allclients = set(range(self.numclients))
except1 = allclients - {1}
feature_matrix = {
("client", "nickname"): except1,
# client 1 has to auto-assign an address.
("node", "tub.port"): except1,
("node", "tub.location"): except1,
# Auto-assigning addresses is extremely failure prone and not
# amenable to automated testing in _this_ manner.
("node", "tub.port"): allclients,
("node", "tub.location"): allclients,
# client 0 runs a webserver and a helper
# client 3 runs a webserver but no helper
@ -852,7 +857,13 @@ class SystemTestMixin(pollmixin.PollMixin, testutil.StallMixin):
# connection-lost code
basedir = FilePath(self.getdir("client%d" % client_num))
basedir.makedirs()
config = "[client]\n"
config = (
"[node]\n"
"tub.location = {}\n"
"tub.port = {}\n"
"[client]\n"
).format(*self.port_assigner.assign(reactor))
if helper_furl:
config += "helper.furl = %s\n" % helper_furl
basedir.child("tahoe.cfg").setContent(config.encode("utf-8"))

View File

@ -17,8 +17,7 @@ from __future__ import unicode_literals
# This should be useful for tests which want to examine and/or manipulate the
# uploaded shares, checker/verifier/repairer tests, etc. The clients have no
# Tubs, so it is not useful for tests that involve a Helper or the
# control.furl .
# Tubs, so it is not useful for tests that involve a Helper.
from future.utils import PY2
if PY2:
@ -26,6 +25,11 @@ if PY2:
from past.builtins import unicode
from six import ensure_text
try:
from typing import Dict, Callable
except ImportError:
pass
import os
from base64 import b32encode
from functools import (
@ -251,7 +255,6 @@ def create_no_network_client(basedir):
client = _NoNetworkClient(
config,
main_tub=None,
control_tub=None,
i2p_provider=None,
tor_provider=None,
introducer_clients=[],
@ -274,8 +277,6 @@ class _NoNetworkClient(_Client): # type: ignore # tahoe-lafs/ticket/3573
pass
def init_introducer_client(self):
pass
def create_control_tub(self):
pass
def create_log_tub(self):
pass
def setup_logging(self):
@ -284,8 +285,6 @@ class _NoNetworkClient(_Client): # type: ignore # tahoe-lafs/ticket/3573
service.MultiService.startService(self)
def stopService(self):
return service.MultiService.stopService(self)
def init_control(self):
pass
def init_helper(self):
pass
def init_key_gen(self):
@ -485,6 +484,18 @@ class GridTestMixin(object):
def set_up_grid(self, num_clients=1, num_servers=10,
client_config_hooks={}, oneshare=False):
"""
Create a Tahoe-LAFS storage grid.
:param num_clients: See ``NoNetworkGrid``
:param num_servers: See ``NoNetworkGrid``
:param client_config_hooks: See ``NoNetworkGrid``
:param bool oneshare: If ``True`` then the first client node is
configured with ``n == k == happy == 1``.
:return: ``None``
"""
# self.basedir must be set
port_assigner = SameProcessStreamEndpointAssigner()
port_assigner.setUp()
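# A hedged sketch of how a test case elsewhere might drive set_up_grid; the
# import alias, class name, and basedir value below are illustrative
# assumptions, not part of this change.
from twisted.trial import unittest as _trial_unittest

class _ExampleGridTests(GridTestMixin, _trial_unittest.TestCase):
    def test_example(self):
        # self.basedir must be set before set_up_grid is called.
        self.basedir = "no_network/ExampleGridTests/test_example"
        self.set_up_grid(num_clients=1, num_servers=10, oneshare=True)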
@ -563,6 +574,15 @@ class GridTestMixin(object):
return sorted(shares)
def copy_shares(self, uri):
# type: (bytes) -> Dict[bytes, bytes]
"""
Read all of the share files for the given capability from the storage area
of the storage servers created by ``set_up_grid``.
:param bytes uri: A Tahoe-LAFS data capability.
:return: A ``dict`` mapping share file names to share file contents.
"""
shares = {}
for (shnum, serverid, sharefile) in self.find_uri_shares(uri):
with open(sharefile, "rb") as f:
@ -607,10 +627,15 @@ class GridTestMixin(object):
f.write(corruptdata)
def corrupt_all_shares(self, uri, corruptor, debug=False):
# type: (bytes, Callable[[bytes, bool], bytes], bool) -> None
"""
Apply ``corruptor`` to the contents of all share files associated with a
given capability and replace the share file contents with its result.
"""
for (i_shnum, i_serverid, i_sharefile) in self.find_uri_shares(uri):
with open(i_sharefile, "rb") as f:
sharedata = f.read()
corruptdata = corruptor(sharedata, debug=debug)
corruptdata = corruptor(sharedata, debug)
with open(i_sharefile, "wb") as f:
f.write(corruptdata)
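# A hedged sketch (illustrative, not part of this change) pairing the two
# helpers documented above: snapshot the pristine shares with copy_shares,
# apply a corruptor to every share file, then restore the originals.
def _corrupt_and_restore_example(self, uri):
    pristine = self.copy_shares(uri)           # {share file name: contents}
    # Any callable accepting (share bytes, debug flag) and returning new
    # bytes can serve as a corruptor; dropping the last byte is the simplest
    # possible example.
    self.corrupt_all_shares(uri, lambda data, debug: data[:-1])
    # ... download / verify steps would go here ...
    for sharefile, contents in pristine.items():
        with open(sharefile, "wb") as f:
            f.write(contents)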

View File

@ -89,6 +89,7 @@ from .common import (
UseTestPlugins,
MemoryIntroducerClient,
get_published_announcements,
UseNode,
)
from .matchers import (
MatchesSameElements,
@ -953,13 +954,14 @@ class Run(unittest.TestCase, testutil.StallMixin):
@defer.inlineCallbacks
def test_reloadable(self):
basedir = FilePath("test_client.Run.test_reloadable")
private = basedir.child("private")
private.makedirs()
from twisted.internet import reactor
dummy = "pb://wl74cyahejagspqgy4x5ukrvfnevlknt@127.0.0.1:58889/bogus"
write_introducer(basedir, "someintroducer", dummy)
basedir.child("tahoe.cfg").setContent(BASECONFIG. encode("ascii"))
c1 = yield client.create_client(basedir.path)
fixture = UseNode(None, None, FilePath(self.mktemp()), dummy, reactor=reactor)
fixture.setUp()
self.addCleanup(fixture.cleanUp)
c1 = yield fixture.create_node()
c1.setServiceParent(self.sparent)
# delay to let the service start up completely. I'm not entirely sure
@ -981,7 +983,7 @@ class Run(unittest.TestCase, testutil.StallMixin):
# also change _check_exit_trigger to use it instead of a raw
# reactor.stop, also instrument the shutdown event in an
# attribute that we can check.)
c2 = yield client.create_client(basedir.path)
c2 = yield fixture.create_node()
c2.setServiceParent(self.sparent)
yield c2.disownServiceParent()

View File

@ -14,6 +14,11 @@ if PY2:
# a previous run. This asserts that the current code is capable of decoding
# shares from a previous version.
try:
from typing import Any
except ImportError:
pass
import six
import os
from twisted.trial import unittest
@ -951,12 +956,52 @@ class Corruption(_Base, unittest.TestCase):
self.corrupt_shares_numbered(imm_uri, [2], _corruptor)
def _corrupt_set(self, ign, imm_uri, which, newvalue):
# type: (Any, bytes, int, int) -> None
"""
Replace a single byte of share file number 2 for the given capability with
a new byte.
:param imm_uri: Corrupt share number 2 belonging to this capability.
:param which: The byte position to replace.
:param newvalue: The new byte value to set in the share.
"""
log.msg("corrupt %d" % which)
def _corruptor(s, debug=False):
return s[:which] + bchr(newvalue) + s[which+1:]
self.corrupt_shares_numbered(imm_uri, [2], _corruptor)
def test_each_byte(self):
"""
Test share selection behavior of the downloader in the face of certain
kinds of data corruption.
1. upload a small share to the no-network grid
2. read all of the resulting share files out of the no-network storage servers
3. for each of
a. each byte of the share file version field
b. each byte of the immutable share version field
c. each byte of the immutable share data offset field
d. the most significant byte of the block_shares offset field
e. one of the bytes of one of the merkle trees
f. one of the bytes of the share hashes list
i. flip the least significant bit in all of the share files
ii. perform the download/check/restore process
4. add 2 ** 24 to the share file version number
5. perform the download/check/restore process
6. add 2 ** 24 to the share version number
7. perform the download/check/restore process
The download/check/restore process is:
1. attempt to download the data
2. assert that the recovered plaintext is correct
3. assert that only the "correct" share numbers were used to reconstruct the plaintext
4. restore all of the share files to their pristine condition
"""
# Setting catalog_detection=True performs an exhaustive test of the
# Downloader's response to corruption in the lsb of each byte of the
# 2070-byte share, with two goals: make sure we tolerate all forms of
@ -1145,8 +1190,18 @@ class Corruption(_Base, unittest.TestCase):
return d
def _corrupt_flip_all(self, ign, imm_uri, which):
# type: (Any, bytes, int) -> None
"""
Flip the least significant bit at a given byte position in all share files
for the given capability.
"""
def _corruptor(s, debug=False):
return s[:which] + bchr(ord(s[which:which+1])^0x01) + s[which+1:]
# type: (bytes, bool) -> bytes
before_corruption = s[:which]
after_corruption = s[which+1:]
original_byte = s[which:which+1]
corrupt_byte = bchr(ord(original_byte) ^ 0x01)
return b"".join([before_corruption, corrupt_byte, after_corruption])
self.corrupt_all_shares(imm_uri, _corruptor)
class DownloadV2(_Base, unittest.TestCase):

View File

@ -69,6 +69,8 @@ import allmydata.test.common_util as testutil
from .common import (
ConstantAddresses,
SameProcessStreamEndpointAssigner,
UseNode,
)
def port_numbers():
@ -80,11 +82,10 @@ class LoggingMultiService(service.MultiService):
# see https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2946
def testing_tub(config_data=''):
def testing_tub(reactor, config_data=''):
"""
Creates a 'main' Tub for testing purposes, from config data
"""
from twisted.internet import reactor
basedir = 'dummy_basedir'
config = config_from_string(basedir, 'DEFAULT_PORTNUMFILE_BLANK', config_data)
fileutil.make_dirs(os.path.join(basedir, 'private'))
@ -112,6 +113,9 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
# try to bind the port. We'll use a low-numbered one that's likely to
# conflict with another service to prove it.
self._available_port = 22
self.port_assigner = SameProcessStreamEndpointAssigner()
self.port_assigner.setUp()
self.addCleanup(self.port_assigner.tearDown)
def _test_location(
self,
@ -137,11 +141,23 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
:param local_addresses: If not ``None`` then a list of addresses to
supply to the system under test as local addresses.
"""
from twisted.internet import reactor
basedir = self.mktemp()
create_node_dir(basedir, "testing")
if tub_port is None:
# Always configure a usable tub.port address instead of relying on
# the automatic port assignment. The automatic port assignment is
# prone to collisions and spurious test failures.
_, tub_port = self.port_assigner.assign(reactor)
config_data = "[node]\n"
if tub_port:
config_data += "tub.port = {}\n".format(tub_port)
config_data += "tub.port = {}\n".format(tub_port)
# If they wanted a certain location, go for it. This probably won't
# agree with the tub.port value we set but that only matters if
# anything tries to use this to establish a connection ... which
# nothing in this test suite will.
if tub_location is not None:
config_data += "tub.location = {}\n".format(tub_location)
@ -149,7 +165,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.patch(iputil, 'get_local_addresses_sync',
lambda: local_addresses)
tub = testing_tub(config_data)
tub = testing_tub(reactor, config_data)
class Foo(object):
pass
@ -431,7 +447,12 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
@defer.inlineCallbacks
def test_logdir_is_str(self):
basedir = "test_node/test_logdir_is_str"
from twisted.internet import reactor
basedir = FilePath(self.mktemp())
fixture = UseNode(None, None, basedir, "pb://introducer/furl", {}, reactor=reactor)
fixture.setUp()
self.addCleanup(fixture.cleanUp)
ns = Namespace()
ns.called = False
@ -440,8 +461,7 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
self.failUnless(isinstance(logdir, str), logdir)
self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
create_node_dir(basedir, "nothing to see here")
yield client.create_client(basedir)
yield fixture.create_node()
self.failUnless(ns.called)
def test_set_config_unescaped_furl_hash(self):

View File

@ -755,28 +755,28 @@ class Server(unittest.TestCase):
# Create a bucket:
rs0, cs0 = self.create_bucket_5_shares(ss, b"si0")
leases = list(ss.get_leases(b"si0"))
self.failUnlessEqual(len(leases), 1)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
(lease,) = ss.get_leases(b"si0")
self.assertTrue(lease.is_renew_secret(rs0))
rs1, cs1 = self.create_bucket_5_shares(ss, b"si1")
# take out a second lease on si1
rs2, cs2 = self.create_bucket_5_shares(ss, b"si1", 5, 0)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 2)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
(lease1, lease2) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
# and a third lease, using add-lease
rs2a,cs2a = (hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret)),
hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret)))
ss.remote_add_lease(b"si1", rs2a, cs2a)
leases = list(ss.get_leases(b"si1"))
self.failUnlessEqual(len(leases), 3)
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
(lease1, lease2, lease3) = ss.get_leases(b"si1")
self.assertTrue(lease1.is_renew_secret(rs1))
self.assertTrue(lease2.is_renew_secret(rs2))
self.assertTrue(lease3.is_renew_secret(rs2a))
# add-lease on a missing storage index is silently ignored
self.failUnlessEqual(ss.remote_add_lease(b"si18", b"", b""), None)
self.assertIsNone(ss.remote_add_lease(b"si18", b"", b""))
# check that si0 is readable
readers = ss.remote_get_buckets(b"si0")
@ -835,7 +835,7 @@ class Server(unittest.TestCase):
# Start out with single lease created with bucket:
renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + DEFAULT_RENEWAL_TIME)
self.assertEqual(lease.get_expiration_time(), 123 + DEFAULT_RENEWAL_TIME)
# Time passes:
clock.advance(123456)
@ -843,7 +843,7 @@ class Server(unittest.TestCase):
# Adding a lease with matching renewal secret just renews it:
ss.remote_add_lease(b"si0", renewal_secret, cancel_secret)
[lease] = ss.get_leases(b"si0")
self.assertEqual(lease.expiration_time, 123 + 123456 + DEFAULT_RENEWAL_TIME)
self.assertEqual(lease.get_expiration_time(), 123 + 123456 + DEFAULT_RENEWAL_TIME)
def test_have_shares(self):
"""By default the StorageServer has no shares."""
@ -1230,17 +1230,6 @@ class MutableServer(unittest.TestCase):
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
self.failUnlessEqual(a.nodeid, b.nodeid)
def compare_leases(self, leases_a, leases_b):
self.failUnlessEqual(len(leases_a), len(leases_b))
for i in range(len(leases_a)):
a = leases_a[i]
b = leases_b[i]
self.failUnlessEqual(a.owner_num, b.owner_num)
self.failUnlessEqual(a.renew_secret, b.renew_secret)
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
self.failUnlessEqual(a.nodeid, b.nodeid)
self.failUnlessEqual(a.expiration_time, b.expiration_time)
def test_leases(self):
ss = self.create("test_leases")
def secrets(n):
@ -1321,11 +1310,11 @@ class MutableServer(unittest.TestCase):
self.failUnlessIn("I have leases accepted by nodeids:", e_s)
self.failUnlessIn("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' .", e_s)
self.compare_leases(all_leases, list(s0.get_leases()))
self.assertEqual(all_leases, list(s0.get_leases()))
# reading shares should not modify the timestamp
read(b"si1", [], [(0,200)])
self.compare_leases(all_leases, list(s0.get_leases()))
self.assertEqual(all_leases, list(s0.get_leases()))
write(b"si1", secrets(0),
{0: ([], [(200, b"make me bigger")], None)}, [])
@ -1359,7 +1348,7 @@ class MutableServer(unittest.TestCase):
"shares", storage_index_to_dir(b"si1"))
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time, 235 + DEFAULT_RENEWAL_TIME)
self.assertEqual(lease.get_expiration_time(), 235 + DEFAULT_RENEWAL_TIME)
# Time passes...
clock.advance(835)
@ -1367,7 +1356,7 @@ class MutableServer(unittest.TestCase):
# Adding a lease renews it:
ss.remote_add_lease(b"si1", renew_secret, cancel_secret)
[lease] = s0.get_leases()
self.assertEqual(lease.expiration_time,
self.assertEqual(lease.get_expiration_time(),
235 + 835 + DEFAULT_RENEWAL_TIME)
def test_remove(self):
@ -3039,3 +3028,102 @@ class ShareFileTests(unittest.TestCase):
sf = self.get_sharefile()
with self.assertRaises(IndexError):
sf.cancel_lease(b"garbage")
def test_renew_secret(self):
"""
A lease loaded from a share file can have its renew secret verified.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
expiration_time = 2 ** 31
sf = self.get_sharefile()
lease = LeaseInfo(
owner_num=0,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
expiration_time=expiration_time,
)
sf.add_lease(lease)
(loaded_lease,) = sf.get_leases()
self.assertTrue(loaded_lease.is_renew_secret(renew_secret))
def test_cancel_secret(self):
"""
A lease loaded from a share file can have its cancel secret verified.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
expiration_time = 2 ** 31
sf = self.get_sharefile()
lease = LeaseInfo(
owner_num=0,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
expiration_time=expiration_time,
)
sf.add_lease(lease)
(loaded_lease,) = sf.get_leases()
self.assertTrue(loaded_lease.is_cancel_secret(cancel_secret))
class LeaseInfoTests(unittest.TestCase):
"""
Tests for ``allmydata.storage.lease.LeaseInfo``.
"""
def test_is_renew_secret(self):
"""
``LeaseInfo.is_renew_secret`` returns ``True`` if the value given is the
renew secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertTrue(lease.is_renew_secret(renew_secret))
def test_is_not_renew_secret(self):
"""
``LeaseInfo.is_renew_secret`` returns ``False`` if the value given is not
the renew secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertFalse(lease.is_renew_secret(cancel_secret))
def test_is_cancel_secret(self):
"""
``LeaseInfo.is_cancel_secret`` returns ``True`` if the value given is the
cancel secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertTrue(lease.is_cancel_secret(cancel_secret))
def test_is_not_cancel_secret(self):
"""
``LeaseInfo.is_cancel_secret`` returns ``False`` if the value given is not
the cancel secret.
"""
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
lease = LeaseInfo(
owner_num=1,
renew_secret=renew_secret,
cancel_secret=cancel_secret,
)
self.assertFalse(lease.is_cancel_secret(renew_secret))

View File

@ -485,17 +485,7 @@ class LeaseCrawler(unittest.TestCase, pollmixin.PollMixin):
return d
def backdate_lease(self, sf, renew_secret, new_expire_time):
# ShareFile.renew_lease ignores attempts to back-date a lease (i.e.
# "renew" a lease with a new_expire_time that is older than what the
# current lease has), so we have to reach inside it.
for i,lease in enumerate(sf.get_leases()):
if lease.renew_secret == renew_secret:
lease.expiration_time = new_expire_time
f = open(sf.home, 'rb+')
sf._write_lease_record(f, i, lease)
f.close()
return
raise IndexError("unable to renew non-existent lease")
sf.renew_lease(renew_secret, new_expire_time, allow_backdate=True)
def test_expire_age(self):
basedir = "storage/LeaseCrawler/expire_age"

View File

@ -12,7 +12,7 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, dict, list, object, range, max, min, str # noqa: F401
from past.builtins import chr as byteschr, long
from six import ensure_text, ensure_str
from six import ensure_text
import os, re, sys, time, json
@ -23,6 +23,7 @@ from twisted.internet import defer
from allmydata import uri
from allmydata.storage.mutable import MutableShareFile
from allmydata.storage.immutable import ShareFile
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.immutable.literal import LiteralFileNode
@ -780,7 +781,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
d.addCallback(self._check_publish_private)
d.addCallback(self.log, "did _check_publish_private")
d.addCallback(self._test_web)
d.addCallback(self._test_control)
d.addCallback(self._test_cli)
# P now has four top-level children:
# P/personal/sekrit data
@ -1291,9 +1291,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
# are sharefiles here
filename = os.path.join(dirpath, filenames[0])
# peek at the magic to see if it is a chk share
magic = open(filename, "rb").read(4)
if magic == b'\x00\x00\x00\x01':
break
with open(filename, "rb") as f:
if ShareFile.is_valid_header(f.read(32)):
break
else:
self.fail("unable to find any uri_extension files in %r"
% self.basedir)
@ -1343,25 +1343,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
if line.startswith("CHK %s " % storage_index_s)]
self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
# exercise the remote-control-the-client foolscap interfaces in
# allmydata.control (mostly used for performance tests)
c0 = self.clients[0]
control_furl_file = c0.config.get_private_path("control.furl")
control_furl = ensure_str(open(control_furl_file, "r").read().strip())
# it doesn't really matter which Tub we use to connect to the client,
# so let's just use our IntroducerNode's
d = self.introducer.tub.getReference(control_furl)
d.addCallback(self._test_control2, control_furl_file)
return d
def _test_control2(self, rref, filename):
d = defer.succeed(None)
d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
if sys.platform in ("linux2", "linux3"):
d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
return d
def _test_cli(self, res):
# run various CLI commands (in a thread, since they use blocking
# network calls)

View File

@ -83,12 +83,18 @@ def create_introducer_webish(reactor, port_assigner, basedir):
with the node and its webish service.
"""
node.create_node_dir(basedir, "testing")
_, port_endpoint = port_assigner.assign(reactor)
main_tub_location, main_tub_endpoint = port_assigner.assign(reactor)
_, web_port_endpoint = port_assigner.assign(reactor)
with open(join(basedir, "tahoe.cfg"), "w") as f:
f.write(
"[node]\n"
"tub.location = 127.0.0.1:1\n" +
"web.port = {}\n".format(port_endpoint)
"tub.port = {main_tub_endpoint}\n"
"tub.location = {main_tub_location}\n"
"web.port = {web_port_endpoint}\n".format(
main_tub_endpoint=main_tub_endpoint,
main_tub_location=main_tub_location,
web_port_endpoint=web_port_endpoint,
)
)
intro_node = yield create_introducer(basedir)
@ -211,7 +217,7 @@ class IntroducerRootTests(SyncTestCase):
main_tub = Tub()
main_tub.listenOn(b"tcp:0")
main_tub.setLocation(b"tcp:127.0.0.1:1")
introducer_node = _IntroducerNode(config, main_tub, None, None, None)
introducer_node = _IntroducerNode(config, main_tub, None, None)
introducer_service = introducer_node.getServiceNamed("introducer")
for n in range(2):

18
tox.ini
View File

@ -206,17 +206,6 @@ commands =
flogtool --version
python misc/build_helpers/run-deprecations.py --package allmydata --warnings={env:TAHOE_LAFS_WARNINGS_LOG:_trial_temp/deprecation-warnings.log} trial {env:TAHOE_LAFS_TRIAL_ARGS:--rterrors} {posargs:allmydata}
[testenv:checkmemory]
commands =
rm -rf _test_memory
python src/allmydata/test/check_memory.py upload
python src/allmydata/test/check_memory.py upload-self
python src/allmydata/test/check_memory.py upload-POST
python src/allmydata/test/check_memory.py download
python src/allmydata/test/check_memory.py download-GET
python src/allmydata/test/check_memory.py download-GET-slow
python src/allmydata/test/check_memory.py receive
# Use 'tox -e docs' to check formatting and cross-references in docs .rst
# files. The published docs are built by code run over at readthedocs.org,
# which does not use this target (but does something similar).
@ -228,13 +217,8 @@ commands =
# your web browser.
[testenv:docs]
# we pin docutils because of https://sourceforge.net/p/docutils/bugs/301/
# which asserts when it reads links to .svg files (e.g. about.rst)
deps =
sphinx
docutils==0.12
recommonmark
sphinx_rtd_theme
-r docs/requirements.txt
# normal install is not needed for docs, and slows things down
skip_install = True
commands =