Merge pull request #1141 from LeastAuthority/3814.remove-control-port

Remove the "control port" and its associated Tub

Fixes: ticket:3814
This commit is contained in:
Jean-Paul Calderone 2021-10-25 20:57:23 -04:00 committed by GitHub
commit 84dfb360ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 10 additions and 1151 deletions

1
.gitignore vendored
View File

@ -30,7 +30,6 @@ zope.interface-*.egg
/src/allmydata/test/plugins/dropin.cache
/_trial_temp*
/_test_memory/
/tmp*
/*.patch
/dist/

View File

@ -142,31 +142,6 @@ count-lines:
# src/allmydata/test/bench_dirnode.py
# The check-speed and check-grid targets are disabled, since they depend upon
# the pre-located $(TAHOE) executable that was removed when we switched to
# tox. They will eventually be resurrected as dedicated tox environments.
# The check-speed target uses a pre-established client node to run a canned
# set of performance tests against a test network that is also
# pre-established (probably on a remote machine). Provide it with the path to
# a local directory where this client node has been created (and populated
# with the necessary FURLs of the test network). This target will start that
# client with the current code and then run the tests. Afterwards it will
# stop the client.
#
# The 'sleep 5' is in there to give the new client a chance to connect to its
# storageservers, since check_speed.py has no good way of doing that itself.
##.PHONY: check-speed
##check-speed: .built
## if [ -z '$(TESTCLIENTDIR)' ]; then exit 1; fi
## @echo "stopping any leftover client code"
## -$(TAHOE) stop $(TESTCLIENTDIR)
## $(TAHOE) start $(TESTCLIENTDIR)
## sleep 5
## $(TAHOE) @src/allmydata/test/check_speed.py $(TESTCLIENTDIR)
## $(TAHOE) stop $(TESTCLIENTDIR)
# The check-grid target also uses a pre-established client node, along with a
# long-term directory that contains some well-known files. See the docstring
# in src/allmydata/test/check_grid.py to see how to set this up.
@ -195,12 +170,11 @@ test-clean:
# Use 'make distclean' instead to delete all generated files.
.PHONY: clean
clean:
rm -rf build _trial_temp _test_memory .built
rm -rf build _trial_temp .built
rm -f `find src *.egg -name '*.so' -or -name '*.pyc'`
rm -rf support dist
rm -rf `ls -d *.egg | grep -vEe"setuptools-|setuptools_darcs-|darcsver-"`
rm -rf *.pyc
rm -f bin/tahoe bin/tahoe.pyscript
rm -f *.pkg
.PHONY: distclean

View File

@ -1,522 +0,0 @@
from __future__ import print_function
import os, shutil, sys, urllib, time, stat, urlparse
# Python 2 compatibility
from future.utils import PY2
if PY2:
from future.builtins import str # noqa: F401
from six.moves import cStringIO as StringIO
from twisted.python.filepath import (
FilePath,
)
from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
from twisted.web import client as tw_client
from twisted.python import log, procutils
from foolscap.api import Tub, fireEventually, flushEventualQueue
from allmydata import client, introducer
from allmydata.immutable import upload
from allmydata.scripts import create_node
from allmydata.util import fileutil, pollmixin
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.scripts.common import (
write_introducer,
)
class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter, object):
full_speed_ahead = False
_bytes_so_far = 0
stalled = None
def handleResponsePart(self, data):
self._bytes_so_far += len(data)
if not self.factory.do_stall:
return
if self.full_speed_ahead:
return
if self._bytes_so_far > 1e6+100:
if not self.stalled:
print("STALLING")
self.transport.pauseProducing()
self.stalled = reactor.callLater(10.0, self._resume_speed)
def _resume_speed(self):
print("RESUME SPEED")
self.stalled = None
self.full_speed_ahead = True
self.transport.resumeProducing()
def handleResponseEnd(self):
if self.stalled:
print("CANCEL")
self.stalled.cancel()
self.stalled = None
return tw_client.HTTPPageGetter.handleResponseEnd(self)
class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory, object):
protocol = StallableHTTPGetterDiscarder
def discardPage(url, stall=False, *args, **kwargs):
"""Start fetching the URL, but stall our pipe after the first 1MB.
Wait 10 seconds, then resume downloading (and discarding) everything.
"""
# adapted from twisted.web.client.getPage . We can't just wrap or
# subclass because it provides no way to override the HTTPClientFactory
# that it creates.
scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
assert scheme == 'http'
host, port = netloc, 80
if ":" in host:
host, port = host.split(":")
port = int(port)
factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
factory.do_stall = stall
reactor.connectTCP(host, port, factory)
return factory.deferred
class ChildDidNotStartError(Exception):
pass
class SystemFramework(pollmixin.PollMixin):
numnodes = 7
def __init__(self, basedir, mode):
self.basedir = basedir = abspath_expanduser_unicode(str(basedir))
if not (basedir + os.path.sep).startswith(abspath_expanduser_unicode(u".") + os.path.sep):
raise AssertionError("safety issue: basedir must be a subdir")
self.testdir = testdir = os.path.join(basedir, "test")
if os.path.exists(testdir):
shutil.rmtree(testdir)
fileutil.make_dirs(testdir)
self.sparent = service.MultiService()
self.sparent.startService()
self.proc = None
self.tub = Tub()
self.tub.setOption("expose-remote-exception-types", False)
self.tub.setServiceParent(self.sparent)
self.mode = mode
self.failed = False
self.keepalive_file = None
def run(self):
framelog = os.path.join(self.basedir, "driver.log")
log.startLogging(open(framelog, "a"), setStdout=False)
log.msg("CHECK_MEMORY(mode=%s) STARTING" % self.mode)
#logfile = open(os.path.join(self.testdir, "log"), "w")
#flo = log.FileLogObserver(logfile)
#log.startLoggingWithObserver(flo.emit, setStdout=False)
d = fireEventually()
d.addCallback(lambda res: self.setUp())
d.addCallback(lambda res: self.record_initial_memusage())
d.addCallback(lambda res: self.make_nodes())
d.addCallback(lambda res: self.wait_for_client_connected())
d.addCallback(lambda res: self.do_test())
d.addBoth(self.tearDown)
def _err(err):
self.failed = err
log.err(err)
print(err)
d.addErrback(_err)
def _done(res):
reactor.stop()
return res
d.addBoth(_done)
reactor.run()
if self.failed:
# raiseException doesn't work for CopiedFailures
self.failed.raiseException()
def setUp(self):
#print("STARTING")
self.stats = {}
self.statsfile = open(os.path.join(self.basedir, "stats.out"), "a")
self.make_introducer()
d = self.start_client()
def _record_control_furl(control_furl):
self.control_furl = control_furl
#print("OBTAINING '%s'" % (control_furl,))
return self.tub.getReference(self.control_furl)
d.addCallback(_record_control_furl)
def _record_control(control_rref):
self.control_rref = control_rref
d.addCallback(_record_control)
def _ready(res):
#print("CLIENT READY")
pass
d.addCallback(_ready)
return d
def record_initial_memusage(self):
print()
print("Client started (no connections yet)")
d = self._print_usage()
d.addCallback(self.stash_stats, "init")
return d
def wait_for_client_connected(self):
print()
print("Client connecting to other nodes..")
return self.control_rref.callRemote("wait_for_client_connections",
self.numnodes+1)
def tearDown(self, passthrough):
# the client node will shut down in a few seconds
#os.remove(os.path.join(self.clientdir, client.Client.EXIT_TRIGGER_FILE))
log.msg("shutting down SystemTest services")
if self.keepalive_file and os.path.exists(self.keepalive_file):
age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
log.msg("keepalive file at shutdown was %ds old" % age)
d = defer.succeed(None)
if self.proc:
d.addCallback(lambda res: self.kill_client())
d.addCallback(lambda res: self.sparent.stopService())
d.addCallback(lambda res: flushEventualQueue())
def _close_statsfile(res):
self.statsfile.close()
d.addCallback(_close_statsfile)
d.addCallback(lambda res: passthrough)
return d
def make_introducer(self):
iv_basedir = os.path.join(self.testdir, "introducer")
os.mkdir(iv_basedir)
self.introducer = introducer.IntroducerNode(basedir=iv_basedir)
self.introducer.setServiceParent(self)
self.introducer_furl = self.introducer.introducer_url
def make_nodes(self):
root = FilePath(self.testdir)
self.nodes = []
for i in range(self.numnodes):
nodedir = root.child("node%d" % (i,))
private = nodedir.child("private")
private.makedirs()
write_introducer(nodedir, "default", self.introducer_url)
config = (
"[client]\n"
"shares.happy = 1\n"
"[storage]\n"
)
# the only tests for which we want the internal nodes to actually
# retain shares are the ones where somebody's going to download
# them.
if self.mode in ("download", "download-GET", "download-GET-slow"):
# retain shares
pass
else:
# for these tests, we tell the storage servers to pretend to
# accept shares, but really just throw them out, since we're
# only testing upload and not download.
config += "debug_discard = true\n"
if self.mode in ("receive",):
# for this mode, the client-under-test gets all the shares,
# so our internal nodes can refuse requests
config += "readonly = true\n"
nodedir.child("tahoe.cfg").setContent(config)
c = client.Client(basedir=nodedir.path)
c.setServiceParent(self)
self.nodes.append(c)
# the peers will start running, eventually they will connect to each
# other and the introducer
def touch_keepalive(self):
if os.path.exists(self.keepalive_file):
age = time.time() - os.stat(self.keepalive_file)[stat.ST_MTIME]
log.msg("touching keepalive file, was %ds old" % age)
f = open(self.keepalive_file, "w")
f.write("""\
If the node notices this file at startup, it will poll every 5 seconds and
terminate if the file is more than 10 seconds old, or if it has been deleted.
If the test harness has an internal failure and neglects to kill off the node
itself, this helps to avoid leaving processes lying around. The contents of
this file are ignored.
""")
f.close()
def start_client(self):
# this returns a Deferred that fires with the client's control.furl
log.msg("MAKING CLIENT")
# self.testdir is an absolute Unicode path
clientdir = self.clientdir = os.path.join(self.testdir, u"client")
clientdir_str = clientdir.encode(get_filesystem_encoding())
quiet = StringIO()
create_node.create_node({'basedir': clientdir}, out=quiet)
log.msg("DONE MAKING CLIENT")
write_introducer(clientdir, "default", self.introducer_furl)
# now replace tahoe.cfg
# set webport=0 and then ask the node what port it picked.
f = open(os.path.join(clientdir, "tahoe.cfg"), "w")
f.write("[node]\n"
"web.port = tcp:0:interface=127.0.0.1\n"
"[client]\n"
"shares.happy = 1\n"
"[storage]\n"
)
if self.mode in ("upload-self", "receive"):
# accept and store shares, to trigger the memory consumption bugs
pass
else:
# don't accept any shares
f.write("readonly = true\n")
## also, if we do receive any shares, throw them away
#f.write("debug_discard = true")
if self.mode == "upload-self":
pass
f.close()
self.keepalive_file = os.path.join(clientdir,
client.Client.EXIT_TRIGGER_FILE)
# now start updating the mtime.
self.touch_keepalive()
ts = internet.TimerService(1.0, self.touch_keepalive)
ts.setServiceParent(self.sparent)
pp = ClientWatcher()
self.proc_done = pp.d = defer.Deferred()
logfile = os.path.join(self.basedir, "client.log")
tahoes = procutils.which("tahoe")
if not tahoes:
raise RuntimeError("unable to find a 'tahoe' executable")
cmd = [tahoes[0], "run", ".", "-l", logfile]
env = os.environ.copy()
self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir_str)
log.msg("CLIENT STARTED")
# now we wait for the client to get started. we're looking for the
# control.furl file to appear.
furl_file = os.path.join(clientdir, "private", "control.furl")
url_file = os.path.join(clientdir, "node.url")
def _check():
if pp.ended and pp.ended.value.status != 0:
# the twistd process ends normally (with rc=0) if the child
# is successfully launched. It ends abnormally (with rc!=0)
# if the child cannot be launched.
raise ChildDidNotStartError("process ended while waiting for startup")
return os.path.exists(furl_file)
d = self.poll(_check, 0.1)
# once it exists, wait a moment before we read from it, just in case
# it hasn't finished writing the whole thing. Ideally control.furl
# would be created in some atomic fashion, or made non-readable until
# it's ready, but I can't think of an easy way to do that, and I
# think the chances that we'll observe a half-write are pretty low.
def _stall(res):
d2 = defer.Deferred()
reactor.callLater(0.1, d2.callback, None)
return d2
d.addCallback(_stall)
def _read(res):
# read the node's URL
self.webish_url = open(url_file, "r").read().strip()
if self.webish_url[-1] == "/":
# trim trailing slash, since the rest of the code wants it gone
self.webish_url = self.webish_url[:-1]
f = open(furl_file, "r")
furl = f.read()
return furl.strip()
d.addCallback(_read)
return d
def kill_client(self):
# returns a Deferred that fires when the process exits. This may only
# be called once.
try:
self.proc.signalProcess("INT")
except error.ProcessExitedAlready:
pass
return self.proc_done
def create_data(self, name, size):
filename = os.path.join(self.testdir, name + ".data")
f = open(filename, "wb")
block = "a" * 8192
while size > 0:
l = min(size, 8192)
f.write(block[:l])
size -= l
return filename
def stash_stats(self, stats, name):
self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
self.statsfile.flush()
self.stats[name] = stats['VmPeak']
def POST(self, urlpath, **fields):
url = self.webish_url + urlpath
sepbase = "boogabooga"
sep = "--" + sepbase
form = []
form.append(sep)
form.append('Content-Disposition: form-data; name="_charset"')
form.append('')
form.append('UTF-8')
form.append(sep)
for name, value in fields.iteritems():
if isinstance(value, tuple):
filename, value = value
form.append('Content-Disposition: form-data; name="%s"; '
'filename="%s"' % (name, filename))
else:
form.append('Content-Disposition: form-data; name="%s"' % name)
form.append('')
form.append(value)
form.append(sep)
form[-1] += "--"
body = "\r\n".join(form) + "\r\n"
headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
}
return tw_client.getPage(url, method="POST", postdata=body,
headers=headers, followRedirect=False)
def GET_discard(self, urlpath, stall):
url = self.webish_url + urlpath + "?filename=dummy-get.out"
return discardPage(url, stall)
def _print_usage(self, res=None):
d = self.control_rref.callRemote("get_memory_usage")
def _print(stats):
print("VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
stats["VmPeak"]))
return stats
d.addCallback(_print)
return d
def _do_upload(self, res, size, files, uris):
name = '%d' % size
print()
print("uploading %s" % name)
if self.mode in ("upload", "upload-self"):
d = self.control_rref.callRemote("upload_random_data_from_file",
size,
convergence="check-memory")
elif self.mode == "upload-POST":
data = "a" * size
url = "/uri"
d = self.POST(url, t="upload", file=("%d.data" % size, data))
elif self.mode in ("receive",
"download", "download-GET", "download-GET-slow"):
# mode=receive: upload the data from a local peer, so that the
# client-under-test receives and stores the shares
#
# mode=download*: upload the data from a local peer, then have
# the client-under-test download it.
#
# we need to wait until the uploading node has connected to all
# peers, since the wait_for_client_connections() above doesn't
# pay attention to our self.nodes[] and their connections.
files[name] = self.create_data(name, size)
u = self.nodes[0].getServiceNamed("uploader")
d = self.nodes[0].debug_wait_for_client_connections(self.numnodes+1)
d.addCallback(lambda res:
u.upload(upload.FileName(files[name],
convergence="check-memory")))
d.addCallback(lambda results: results.get_uri())
else:
raise ValueError("unknown mode=%s" % self.mode)
def _complete(uri):
uris[name] = uri
print("uploaded %s" % name)
d.addCallback(_complete)
return d
def _do_download(self, res, size, uris):
if self.mode not in ("download", "download-GET", "download-GET-slow"):
return
name = '%d' % size
print("downloading %s" % name)
uri = uris[name]
if self.mode == "download":
d = self.control_rref.callRemote("download_to_tempfile_and_delete",
uri)
elif self.mode == "download-GET":
url = "/uri/%s" % uri
d = self.GET_discard(urllib.quote(url), stall=False)
elif self.mode == "download-GET-slow":
url = "/uri/%s" % uri
d = self.GET_discard(urllib.quote(url), stall=True)
def _complete(res):
print("downloaded %s" % name)
return res
d.addCallback(_complete)
return d
def do_test(self):
#print("CLIENT STARTED")
#print("FURL", self.control_furl)
#print("RREF", self.control_rref)
#print()
kB = 1000; MB = 1000*1000
files = {}
uris = {}
d = self._print_usage()
d.addCallback(self.stash_stats, "0B")
for i in range(10):
d.addCallback(self._do_upload, 10*kB+i, files, uris)
d.addCallback(self._do_download, 10*kB+i, uris)
d.addCallback(self._print_usage)
d.addCallback(self.stash_stats, "10kB")
for i in range(3):
d.addCallback(self._do_upload, 10*MB+i, files, uris)
d.addCallback(self._do_download, 10*MB+i, uris)
d.addCallback(self._print_usage)
d.addCallback(self.stash_stats, "10MB")
for i in range(1):
d.addCallback(self._do_upload, 50*MB+i, files, uris)
d.addCallback(self._do_download, 50*MB+i, uris)
d.addCallback(self._print_usage)
d.addCallback(self.stash_stats, "50MB")
#for i in range(1):
# d.addCallback(self._do_upload, 100*MB+i, files, uris)
# d.addCallback(self._do_download, 100*MB+i, uris)
# d.addCallback(self._print_usage)
#d.addCallback(self.stash_stats, "100MB")
#d.addCallback(self.stall)
def _done(res):
print("FINISHING")
d.addCallback(_done)
return d
def stall(self, res):
d = defer.Deferred()
reactor.callLater(5, d.callback, None)
return d
class ClientWatcher(protocol.ProcessProtocol, object):
ended = False
def outReceived(self, data):
print("OUT:", data)
def errReceived(self, data):
print("ERR:", data)
def processEnded(self, reason):
self.ended = reason
self.d.callback(None)
if __name__ == '__main__':
mode = "upload"
if len(sys.argv) > 1:
mode = sys.argv[1]
if sys.maxsize == 2147483647:
bits = "32"
elif sys.maxsize == 9223372036854775807:
bits = "64"
else:
bits = "?"
print("%s-bit system (sys.maxsize=%d)" % (bits, sys.maxsize))
# put the logfile and stats.out in _test_memory/ . These stick around.
# put the nodes and other files in _test_memory/test/ . These are
# removed each time we run.
sf = SystemFramework("_test_memory", mode)
sf.run()

View File

@ -1,234 +0,0 @@
from __future__ import print_function
import os, sys
from twisted.internet import reactor, defer
from twisted.python import log
from twisted.application import service
from foolscap.api import Tub, fireEventually
MB = 1000000
class SpeedTest(object):
DO_IMMUTABLE = True
DO_MUTABLE_CREATE = True
DO_MUTABLE = True
def __init__(self, test_client_dir):
#self.real_stderr = sys.stderr
log.startLogging(open("st.log", "a"), setStdout=False)
f = open(os.path.join(test_client_dir, "private", "control.furl"), "r")
self.control_furl = f.read().strip()
f.close()
self.base_service = service.MultiService()
self.failed = None
self.upload_times = {}
self.download_times = {}
def run(self):
print("STARTING")
d = fireEventually()
d.addCallback(lambda res: self.setUp())
d.addCallback(lambda res: self.do_test())
d.addBoth(self.tearDown)
def _err(err):
self.failed = err
log.err(err)
print(err)
d.addErrback(_err)
def _done(res):
reactor.stop()
return res
d.addBoth(_done)
reactor.run()
if self.failed:
print("EXCEPTION")
print(self.failed)
sys.exit(1)
def setUp(self):
self.base_service.startService()
self.tub = Tub()
self.tub.setOption("expose-remote-exception-types", False)
self.tub.setServiceParent(self.base_service)
d = self.tub.getReference(self.control_furl)
def _gotref(rref):
self.client_rref = rref
print("Got Client Control reference")
return self.stall(5)
d.addCallback(_gotref)
return d
def stall(self, delay, result=None):
d = defer.Deferred()
reactor.callLater(delay, d.callback, result)
return d
def record_times(self, times, key):
print("TIME (%s): %s up, %s down" % (key, times[0], times[1]))
self.upload_times[key], self.download_times[key] = times
def one_test(self, res, name, count, size, mutable):
# values for 'mutable':
# False (upload a different CHK file for each 'count')
# "create" (upload different contents into a new SSK file)
# "upload" (upload different contents into the same SSK file. The
# time consumed does not include the creation of the file)
d = self.client_rref.callRemote("speed_test", count, size, mutable)
d.addCallback(self.record_times, name)
return d
def measure_rtt(self, res):
# use RIClient.get_nodeid() to measure the foolscap-level RTT
d = self.client_rref.callRemote("measure_peer_response_time")
def _got(res):
assert len(res) # need at least one peer
times = res.values()
self.total_rtt = sum(times)
self.average_rtt = sum(times) / len(times)
self.max_rtt = max(times)
print("num-peers: %d" % len(times))
print("total-RTT: %f" % self.total_rtt)
print("average-RTT: %f" % self.average_rtt)
print("max-RTT: %f" % self.max_rtt)
d.addCallback(_got)
return d
def do_test(self):
print("doing test")
d = defer.succeed(None)
d.addCallback(self.one_test, "startup", 1, 1000, False) #ignore this one
d.addCallback(self.measure_rtt)
if self.DO_IMMUTABLE:
# immutable files
d.addCallback(self.one_test, "1x 200B", 1, 200, False)
d.addCallback(self.one_test, "10x 200B", 10, 200, False)
def _maybe_do_100x_200B(res):
if self.upload_times["10x 200B"] < 5:
print("10x 200B test went too fast, doing 100x 200B test")
return self.one_test(None, "100x 200B", 100, 200, False)
return
d.addCallback(_maybe_do_100x_200B)
d.addCallback(self.one_test, "1MB", 1, 1*MB, False)
d.addCallback(self.one_test, "10MB", 1, 10*MB, False)
def _maybe_do_100MB(res):
if self.upload_times["10MB"] > 30:
print("10MB test took too long, skipping 100MB test")
return
return self.one_test(None, "100MB", 1, 100*MB, False)
d.addCallback(_maybe_do_100MB)
if self.DO_MUTABLE_CREATE:
# mutable file creation
d.addCallback(self.one_test, "10x 200B SSK creation", 10, 200,
"create")
if self.DO_MUTABLE:
# mutable file upload/download
d.addCallback(self.one_test, "10x 200B SSK", 10, 200, "upload")
def _maybe_do_100x_200B_SSK(res):
if self.upload_times["10x 200B SSK"] < 5:
print("10x 200B SSK test went too fast, doing 100x 200B SSK")
return self.one_test(None, "100x 200B SSK", 100, 200,
"upload")
return
d.addCallback(_maybe_do_100x_200B_SSK)
d.addCallback(self.one_test, "1MB SSK", 1, 1*MB, "upload")
d.addCallback(self.calculate_speeds)
return d
def calculate_speeds(self, res):
# time = A*size+B
# we assume that A*200bytes is negligible
if self.DO_IMMUTABLE:
# upload
if "100x 200B" in self.upload_times:
B = self.upload_times["100x 200B"] / 100
else:
B = self.upload_times["10x 200B"] / 10
print("upload per-file time: %.3fs" % B)
print("upload per-file times-avg-RTT: %f" % (B / self.average_rtt))
print("upload per-file times-total-RTT: %f" % (B / self.total_rtt))
A1 = 1*MB / (self.upload_times["1MB"] - B) # in bytes per second
print("upload speed (1MB):", self.number(A1, "Bps"))
A2 = 10*MB / (self.upload_times["10MB"] - B)
print("upload speed (10MB):", self.number(A2, "Bps"))
if "100MB" in self.upload_times:
A3 = 100*MB / (self.upload_times["100MB"] - B)
print("upload speed (100MB):", self.number(A3, "Bps"))
# download
if "100x 200B" in self.download_times:
B = self.download_times["100x 200B"] / 100
else:
B = self.download_times["10x 200B"] / 10
print("download per-file time: %.3fs" % B)
print("download per-file times-avg-RTT: %f" % (B / self.average_rtt))
print("download per-file times-total-RTT: %f" % (B / self.total_rtt))
A1 = 1*MB / (self.download_times["1MB"] - B) # in bytes per second
print("download speed (1MB):", self.number(A1, "Bps"))
A2 = 10*MB / (self.download_times["10MB"] - B)
print("download speed (10MB):", self.number(A2, "Bps"))
if "100MB" in self.download_times:
A3 = 100*MB / (self.download_times["100MB"] - B)
print("download speed (100MB):", self.number(A3, "Bps"))
if self.DO_MUTABLE_CREATE:
# SSK creation
B = self.upload_times["10x 200B SSK creation"] / 10
print("create per-file time SSK: %.3fs" % B)
if self.DO_MUTABLE:
# upload SSK
if "100x 200B SSK" in self.upload_times:
B = self.upload_times["100x 200B SSK"] / 100
else:
B = self.upload_times["10x 200B SSK"] / 10
print("upload per-file time SSK: %.3fs" % B)
A1 = 1*MB / (self.upload_times["1MB SSK"] - B) # in bytes per second
print("upload speed SSK (1MB):", self.number(A1, "Bps"))
# download SSK
if "100x 200B SSK" in self.download_times:
B = self.download_times["100x 200B SSK"] / 100
else:
B = self.download_times["10x 200B SSK"] / 10
print("download per-file time SSK: %.3fs" % B)
A1 = 1*MB / (self.download_times["1MB SSK"] - B) # in bytes per
# second
print("download speed SSK (1MB):", self.number(A1, "Bps"))
def number(self, value, suffix=""):
scaling = 1
if value < 1:
fmt = "%1.2g%s"
elif value < 100:
fmt = "%.1f%s"
elif value < 1000:
fmt = "%d%s"
elif value < 1e6:
fmt = "%.2fk%s"; scaling = 1e3
elif value < 1e9:
fmt = "%.2fM%s"; scaling = 1e6
elif value < 1e12:
fmt = "%.2fG%s"; scaling = 1e9
elif value < 1e15:
fmt = "%.2fT%s"; scaling = 1e12
elif value < 1e18:
fmt = "%.2fP%s"; scaling = 1e15
else:
fmt = "huge! %g%s"
return fmt % (value / scaling, suffix)
def tearDown(self, res):
d = self.base_service.stopService()
d.addCallback(lambda ignored: res)
return d
if __name__ == '__main__':
test_client_dir = sys.argv[1]
st = SpeedTest(test_client_dir)
st.run()

View File

@ -1,20 +0,0 @@
#! /usr/bin/env python
from __future__ import print_function
from foolscap import Tub
from foolscap.eventual import eventually
import sys
from twisted.internet import reactor
def go():
t = Tub()
d = t.getReference(sys.argv[1])
d.addCallback(lambda rref: rref.callRemote("get_memory_usage"))
def _got(res):
print(res)
reactor.stop()
d.addCallback(_got)
eventually(go)
reactor.run()

View File

@ -0,0 +1 @@
The little-used "control port" has been removed from all node types.

View File

@ -40,7 +40,6 @@ from allmydata.storage.server import StorageServer
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import (
hashutil, base32, pollmixin, log, idlib,
@ -283,7 +282,6 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = node.create_control_tub()
introducer_clients = create_introducer_clients(config, main_tub, _introducer_factory)
storage_broker = create_storage_farm_broker(
@ -294,7 +292,6 @@ def create_client_from_config(config, _client_factory=None, _introducer_factory=
client = _client_factory(
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
introducer_clients,
@ -631,12 +628,12 @@ class _Client(node.Node, pollmixin.PollMixin):
"max_segment_size": DEFAULT_MAX_SEGMENT_SIZE,
}
def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider, introducer_clients,
def __init__(self, config, main_tub, i2p_provider, tor_provider, introducer_clients,
storage_farm_broker):
"""
Use :func:`allmydata.client.create_client` to instantiate one of these.
"""
node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider)
node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)
self.started_timestamp = time.time()
self.logSource = "Client"
@ -648,7 +645,6 @@ class _Client(node.Node, pollmixin.PollMixin):
self.init_stats_provider()
self.init_secrets()
self.init_node_key()
self.init_control()
self._key_generator = KeyGenerator()
key_gen_furl = config.get_config("client", "key_generator.furl", None)
if key_gen_furl:
@ -985,12 +981,6 @@ class _Client(node.Node, pollmixin.PollMixin):
def get_history(self):
return self.history
def init_control(self):
c = ControlServer()
c.setServiceParent(self)
control_url = self.control_tub.registerReference(c)
self.config.write_private_config("control.furl", control_url + "\n")
def init_helper(self):
self.helper = Helper(self.config.get_config_path("helper"),
self.storage_broker, self._secret_holder,

View File

@ -1,273 +0,0 @@
"""Ported to Python 3.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from future.utils import PY2
if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401
import os, time, tempfile
from zope.interface import implementer
from twisted.application import service
from twisted.internet import defer
from twisted.internet.interfaces import IConsumer
from foolscap.api import Referenceable
from allmydata.interfaces import RIControlClient, IFileNode
from allmydata.util import fileutil, mathutil
from allmydata.immutable import upload
from allmydata.mutable.publish import MutableData
from twisted.python import log
def get_memory_usage():
# this is obviously linux-specific
stat_names = (b"VmPeak",
b"VmSize",
#b"VmHWM",
b"VmData")
stats = {}
try:
with open("/proc/self/status", "rb") as f:
for line in f:
name, right = line.split(b":",2)
if name in stat_names:
assert right.endswith(b" kB\n")
right = right[:-4]
stats[name] = int(right) * 1024
except:
# Probably not on (a compatible version of) Linux
stats['VmSize'] = 0
stats['VmPeak'] = 0
return stats
def log_memory_usage(where=""):
stats = get_memory_usage()
log.msg("VmSize: %9d VmPeak: %9d %s" % (stats[b"VmSize"],
stats[b"VmPeak"],
where))
@implementer(IConsumer)
class FileWritingConsumer(object):
def __init__(self, filename):
self.done = False
self.f = open(filename, "wb")
def registerProducer(self, p, streaming):
if streaming:
p.resumeProducing()
else:
while not self.done:
p.resumeProducing()
def write(self, data):
self.f.write(data)
def unregisterProducer(self):
self.done = True
self.f.close()
@implementer(RIControlClient)
class ControlServer(Referenceable, service.Service):
def remote_wait_for_client_connections(self, num_clients):
return self.parent.debug_wait_for_client_connections(num_clients)
def remote_upload_random_data_from_file(self, size, convergence):
tempdir = tempfile.mkdtemp()
filename = os.path.join(tempdir, "data")
f = open(filename, "wb")
block = b"a" * 8192
while size > 0:
l = min(size, 8192)
f.write(block[:l])
size -= l
f.close()
uploader = self.parent.getServiceNamed("uploader")
u = upload.FileName(filename, convergence=convergence)
# XXX should pass reactor arg
d = uploader.upload(u)
d.addCallback(lambda results: results.get_uri())
def _done(uri):
os.remove(filename)
os.rmdir(tempdir)
return uri
d.addCallback(_done)
return d
def remote_download_to_tempfile_and_delete(self, uri):
tempdir = tempfile.mkdtemp()
filename = os.path.join(tempdir, "data")
filenode = self.parent.create_node_from_uri(uri, name=filename)
if not IFileNode.providedBy(filenode):
raise AssertionError("The URI does not reference a file.")
c = FileWritingConsumer(filename)
d = filenode.read(c)
def _done(res):
os.remove(filename)
os.rmdir(tempdir)
return None
d.addCallback(_done)
return d
def remote_speed_test(self, count, size, mutable):
assert size > 8
log.msg("speed_test: count=%d, size=%d, mutable=%s" % (count, size,
mutable))
st = SpeedTest(self.parent, count, size, mutable)
return st.run()
def remote_get_memory_usage(self):
return get_memory_usage()
def remote_measure_peer_response_time(self):
# I'd like to average together several pings, but I don't want this
# phase to take more than 10 seconds. Expect worst-case latency to be
# 300ms.
results = {}
sb = self.parent.get_storage_broker()
everyone = sb.get_connected_servers()
num_pings = int(mathutil.div_ceil(10, (len(everyone) * 0.3)))
everyone = list(everyone) * num_pings
d = self._do_one_ping(None, everyone, results)
return d
def _do_one_ping(self, res, everyone_left, results):
if not everyone_left:
return results
server = everyone_left.pop(0)
server_name = server.get_longname()
storage_server = server.get_storage_server()
start = time.time()
d = storage_server.get_buckets(b"\x00" * 16)
def _done(ignored):
stop = time.time()
elapsed = stop - start
if server_name in results:
results[server_name].append(elapsed)
else:
results[server_name] = [elapsed]
d.addCallback(_done)
d.addCallback(self._do_one_ping, everyone_left, results)
def _average(res):
averaged = {}
for server_name,times in results.items():
averaged[server_name] = sum(times) / len(times)
return averaged
d.addCallback(_average)
return d
class SpeedTest(object):
def __init__(self, parent, count, size, mutable):
self.parent = parent
self.count = count
self.size = size
self.mutable_mode = mutable
self.uris = {}
self.basedir = self.parent.config.get_config_path("_speed_test_data")
def run(self):
self.create_data()
d = self.do_upload()
d.addCallback(lambda res: self.do_download())
d.addBoth(self.do_cleanup)
d.addCallback(lambda res: (self.upload_time, self.download_time))
return d
def create_data(self):
fileutil.make_dirs(self.basedir)
for i in range(self.count):
s = self.size
fn = os.path.join(self.basedir, str(i))
if os.path.exists(fn):
os.unlink(fn)
f = open(fn, "wb")
f.write(os.urandom(8))
s -= 8
while s > 0:
chunk = min(s, 4096)
f.write(b"\x00" * chunk)
s -= chunk
f.close()
def do_upload(self):
d = defer.succeed(None)
def _create_slot(res):
d1 = self.parent.create_mutable_file(b"")
def _created(n):
self._n = n
d1.addCallback(_created)
return d1
if self.mutable_mode == "upload":
d.addCallback(_create_slot)
def _start(res):
self._start = time.time()
d.addCallback(_start)
def _record_uri(uri, i):
self.uris[i] = uri
def _upload_one_file(ignored, i):
if i >= self.count:
return
fn = os.path.join(self.basedir, str(i))
if self.mutable_mode == "create":
data = open(fn,"rb").read()
d1 = self.parent.create_mutable_file(data)
d1.addCallback(lambda n: n.get_uri())
elif self.mutable_mode == "upload":
data = open(fn,"rb").read()
d1 = self._n.overwrite(MutableData(data))
d1.addCallback(lambda res: self._n.get_uri())
else:
up = upload.FileName(fn, convergence=None)
d1 = self.parent.upload(up)
d1.addCallback(lambda results: results.get_uri())
d1.addCallback(_record_uri, i)
d1.addCallback(_upload_one_file, i+1)
return d1
d.addCallback(_upload_one_file, 0)
def _upload_done(ignored):
stop = time.time()
self.upload_time = stop - self._start
d.addCallback(_upload_done)
return d
def do_download(self):
start = time.time()
d = defer.succeed(None)
def _download_one_file(ignored, i):
if i >= self.count:
return
n = self.parent.create_node_from_uri(self.uris[i])
if not IFileNode.providedBy(n):
raise AssertionError("The URI does not reference a file.")
if n.is_mutable():
d1 = n.download_best_version()
else:
d1 = n.read(DiscardingConsumer())
d1.addCallback(_download_one_file, i+1)
return d1
d.addCallback(_download_one_file, 0)
def _download_done(ignored):
stop = time.time()
self.download_time = stop - start
d.addCallback(_download_done)
return d
def do_cleanup(self, res):
for i in range(self.count):
fn = os.path.join(self.basedir, str(i))
os.unlink(fn)
return res
@implementer(IConsumer)
class DiscardingConsumer(object):
def __init__(self):
self.done = False
def registerProducer(self, p, streaming):
if streaming:
p.resumeProducing()
else:
while not self.done:
p.resumeProducing()
def write(self, data):
pass
def unregisterProducer(self):
self.done = True

View File

@ -39,7 +39,6 @@ from allmydata.introducer.common import unsign_from_foolscap, \
from allmydata.node import read_config
from allmydata.node import create_node_dir
from allmydata.node import create_connection_handlers
from allmydata.node import create_control_tub
from allmydata.node import create_tub_options
from allmydata.node import create_main_tub
@ -88,12 +87,10 @@ def create_introducer(basedir=u"."):
config, tub_options, default_connection_handlers,
foolscap_connection_handlers, i2p_provider, tor_provider,
)
control_tub = create_control_tub()
node = _IntroducerNode(
config,
main_tub,
control_tub,
i2p_provider,
tor_provider,
)
@ -105,8 +102,8 @@ def create_introducer(basedir=u"."):
class _IntroducerNode(node.Node):
NODETYPE = "introducer"
def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider):
node.Node.__init__(self, config, main_tub, control_tub, i2p_provider, tor_provider)
def __init__(self, config, main_tub, i2p_provider, tor_provider):
node.Node.__init__(self, config, main_tub, i2p_provider, tor_provider)
self.init_introducer()
webport = self.get_config("node", "web.port", None)
if webport:

View File

@ -919,18 +919,6 @@ def create_main_tub(config, tub_options,
return tub
def create_control_tub():
"""
Creates a Foolscap Tub for use by the control port. This is a
localhost-only ephemeral Tub, with no control over the listening
port or location
"""
control_tub = Tub()
portnum = iputil.listenOnUnused(control_tub)
log.msg("Control Tub location set to 127.0.0.1:%s" % (portnum,))
return control_tub
class Node(service.MultiService):
"""
This class implements common functionality of both Client nodes and Introducer nodes.
@ -938,7 +926,7 @@ class Node(service.MultiService):
NODETYPE = "unknown NODETYPE"
CERTFILE = "node.pem"
def __init__(self, config, main_tub, control_tub, i2p_provider, tor_provider):
def __init__(self, config, main_tub, i2p_provider, tor_provider):
"""
Initialize the node with the given configuration. Its base directory
is the current directory by default.
@ -967,10 +955,6 @@ class Node(service.MultiService):
else:
self.nodeid = self.short_nodeid = None
self.control_tub = control_tub
if self.control_tub is not None:
self.control_tub.setServiceParent(self)
self.log("Node constructed. " + __full_version__)
iputil.increase_rlimits()

View File

@ -17,8 +17,7 @@ from __future__ import unicode_literals
# This should be useful for tests which want to examine and/or manipulate the
# uploaded shares, checker/verifier/repairer tests, etc. The clients have no
# Tubs, so it is not useful for tests that involve a Helper or the
# control.furl .
# Tubs, so it is not useful for tests that involve a Helper.
from future.utils import PY2
if PY2:
@ -251,7 +250,6 @@ def create_no_network_client(basedir):
client = _NoNetworkClient(
config,
main_tub=None,
control_tub=None,
i2p_provider=None,
tor_provider=None,
introducer_clients=[],
@ -274,8 +272,6 @@ class _NoNetworkClient(_Client): # type: ignore # tahoe-lafs/ticket/3573
pass
def init_introducer_client(self):
pass
def create_control_tub(self):
pass
def create_log_tub(self):
pass
def setup_logging(self):
@ -284,8 +280,6 @@ class _NoNetworkClient(_Client): # type: ignore # tahoe-lafs/ticket/3573
service.MultiService.startService(self)
def stopService(self):
return service.MultiService.stopService(self)
def init_control(self):
pass
def init_helper(self):
pass
def init_key_gen(self):

View File

@ -12,7 +12,7 @@ if PY2:
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, dict, list, object, range, max, min, str # noqa: F401
from past.builtins import chr as byteschr, long
from six import ensure_text, ensure_str
from six import ensure_text
import os, re, sys, time, json
@ -780,7 +780,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
d.addCallback(self._check_publish_private)
d.addCallback(self.log, "did _check_publish_private")
d.addCallback(self._test_web)
d.addCallback(self._test_control)
d.addCallback(self._test_cli)
# P now has four top-level children:
# P/personal/sekrit data
@ -1343,25 +1342,6 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
if line.startswith("CHK %s " % storage_index_s)]
self.failUnlessEqual(len(matching), 10)
def _test_control(self, res):
# exercise the remote-control-the-client foolscap interfaces in
# allmydata.control (mostly used for performance tests)
c0 = self.clients[0]
control_furl_file = c0.config.get_private_path("control.furl")
control_furl = ensure_str(open(control_furl_file, "r").read().strip())
# it doesn't really matter which Tub we use to connect to the client,
# so let's just use our IntroducerNode's
d = self.introducer.tub.getReference(control_furl)
d.addCallback(self._test_control2, control_furl_file)
return d
def _test_control2(self, rref, filename):
d = defer.succeed(None)
d.addCallback(lambda res: rref.callRemote("speed_test", 1, 200, False))
if sys.platform in ("linux2", "linux3"):
d.addCallback(lambda res: rref.callRemote("get_memory_usage"))
d.addCallback(lambda res: rref.callRemote("measure_peer_response_time"))
return d
def _test_cli(self, res):
# run various CLI commands (in a thread, since they use blocking
# network calls)

View File

@ -211,7 +211,7 @@ class IntroducerRootTests(SyncTestCase):
main_tub = Tub()
main_tub.listenOn(b"tcp:0")
main_tub.setLocation(b"tcp:127.0.0.1:1")
introducer_node = _IntroducerNode(config, main_tub, None, None, None)
introducer_node = _IntroducerNode(config, main_tub, None, None)
introducer_service = introducer_node.getServiceNamed("introducer")
for n in range(2):

11
tox.ini
View File

@ -206,17 +206,6 @@ commands =
flogtool --version
python misc/build_helpers/run-deprecations.py --package allmydata --warnings={env:TAHOE_LAFS_WARNINGS_LOG:_trial_temp/deprecation-warnings.log} trial {env:TAHOE_LAFS_TRIAL_ARGS:--rterrors} {posargs:allmydata}
[testenv:checkmemory]
commands =
rm -rf _test_memory
python src/allmydata/test/check_memory.py upload
python src/allmydata/test/check_memory.py upload-self
python src/allmydata/test/check_memory.py upload-POST
python src/allmydata/test/check_memory.py download
python src/allmydata/test/check_memory.py download-GET
python src/allmydata/test/check_memory.py download-GET-slow
python src/allmydata/test/check_memory.py receive
# Use 'tox -e docs' to check formatting and cross-references in docs .rst
# files. The published docs are built by code run over at readthedocs.org,
# which does not use this target (but does something similar).