2007-03-29 21:01:28 +00:00
|
|
|
#! /usr/bin/env python
|
2007-03-09 01:12:24 +00:00
|
|
|
|
2007-07-17 01:08:55 +00:00
|
|
|
import os, shutil, sys
|
|
|
|
from cStringIO import StringIO
|
2007-03-12 23:28:37 +00:00
|
|
|
from twisted.internet import defer, reactor, protocol, error
|
2007-05-25 00:34:42 +00:00
|
|
|
from twisted.application import service, internet
|
2007-07-17 03:17:51 +00:00
|
|
|
from twisted.web.client import getPage
|
2007-04-30 16:57:52 +00:00
|
|
|
from allmydata import client, introducer_and_vdrive
|
2007-07-17 01:08:55 +00:00
|
|
|
from allmydata.scripts import create_node
|
2007-05-30 00:39:39 +00:00
|
|
|
from allmydata.util import testutil
|
|
|
|
import foolscap
|
|
|
|
from foolscap import eventual
|
2007-03-12 23:28:37 +00:00
|
|
|
from twisted.python import log
|
|
|
|
|
2007-05-30 00:39:39 +00:00
|
|
|
class SystemFramework(testutil.PollMixin):
|
2007-03-12 23:28:37 +00:00
|
|
|
numnodes = 5
|
|
|
|
|
2007-07-17 01:08:55 +00:00
|
|
|
def __init__(self, basedir, mode):
|
2007-03-12 23:28:37 +00:00
|
|
|
self.basedir = basedir = os.path.abspath(basedir)
|
|
|
|
if not basedir.startswith(os.path.abspath(".")):
|
|
|
|
raise AssertionError("safety issue: basedir must be a subdir")
|
|
|
|
if os.path.exists(basedir):
|
|
|
|
shutil.rmtree(basedir)
|
|
|
|
os.mkdir(basedir)
|
|
|
|
self.sparent = service.MultiService()
|
|
|
|
self.sparent.startService()
|
2007-05-30 00:39:39 +00:00
|
|
|
self.proc = None
|
|
|
|
self.tub = foolscap.Tub()
|
|
|
|
self.tub.setServiceParent(self.sparent)
|
2007-07-17 01:08:55 +00:00
|
|
|
self.discard_shares = True
|
|
|
|
self.mode = mode
|
2007-03-12 23:28:37 +00:00
|
|
|
|
|
|
|
def run(self):
|
2007-05-30 00:39:39 +00:00
|
|
|
log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
|
|
|
|
setStdout=False)
|
|
|
|
#logfile = open(os.path.join(self.basedir, "log"), "w")
|
|
|
|
#flo = log.FileLogObserver(logfile)
|
|
|
|
#log.startLoggingWithObserver(flo.emit, setStdout=False)
|
|
|
|
d = eventual.fireEventually()
|
|
|
|
d.addCallback(lambda res: self.setUp())
|
|
|
|
d.addCallback(lambda res: self.do_test())
|
|
|
|
d.addBoth(self.tearDown)
|
|
|
|
def _err(err):
|
|
|
|
log.err(err)
|
|
|
|
print err
|
|
|
|
d.addErrback(_err)
|
|
|
|
d.addBoth(lambda res: reactor.stop())
|
2007-03-12 23:28:37 +00:00
|
|
|
reactor.run()
|
|
|
|
|
2007-05-30 00:39:39 +00:00
|
|
|
def setUp(self):
|
2007-07-17 01:08:55 +00:00
|
|
|
#print "STARTING"
|
|
|
|
self.stats = {}
|
|
|
|
self.statsfile = open(os.path.join(self.basedir, "stats.out"), "w")
|
2007-04-30 16:57:52 +00:00
|
|
|
d = self.make_introducer_and_vdrive()
|
2007-03-12 23:28:37 +00:00
|
|
|
def _more(res):
|
|
|
|
self.make_nodes()
|
2007-05-30 00:39:39 +00:00
|
|
|
return self.start_client()
|
2007-03-12 23:28:37 +00:00
|
|
|
d.addCallback(_more)
|
2007-05-30 00:39:39 +00:00
|
|
|
def _record_control_furl(control_furl):
|
|
|
|
self.control_furl = control_furl
|
2007-07-17 01:08:55 +00:00
|
|
|
#print "OBTAINING '%s'" % (control_furl,)
|
2007-05-30 00:39:39 +00:00
|
|
|
return self.tub.getReference(self.control_furl)
|
|
|
|
d.addCallback(_record_control_furl)
|
|
|
|
def _record_control(control_rref):
|
|
|
|
self.control_rref = control_rref
|
|
|
|
return control_rref.callRemote("wait_for_client_connections",
|
|
|
|
self.numnodes+1)
|
|
|
|
d.addCallback(_record_control)
|
|
|
|
def _ready(res):
|
2007-07-17 01:08:55 +00:00
|
|
|
#print "CLIENT READY"
|
|
|
|
pass
|
2007-05-30 00:39:39 +00:00
|
|
|
d.addCallback(_ready)
|
2007-03-12 23:28:37 +00:00
|
|
|
return d
|
|
|
|
|
2007-05-30 00:39:39 +00:00
|
|
|
def tearDown(self, passthrough):
|
2007-03-12 23:28:37 +00:00
|
|
|
# the client node will shut down in a few seconds
|
2007-05-30 00:39:39 +00:00
|
|
|
#os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
|
2007-03-12 23:28:37 +00:00
|
|
|
log.msg("shutting down SystemTest services")
|
2007-05-30 00:39:39 +00:00
|
|
|
d = defer.succeed(None)
|
|
|
|
if self.proc:
|
|
|
|
d.addCallback(lambda res: self.kill_client())
|
|
|
|
d.addCallback(lambda res: self.sparent.stopService())
|
|
|
|
d.addCallback(lambda res: eventual.flushEventualQueue())
|
2007-07-17 01:08:55 +00:00
|
|
|
def _close_statsfile(res):
|
|
|
|
self.statsfile.close()
|
|
|
|
d.addCallback(_close_statsfile)
|
2007-05-30 00:39:39 +00:00
|
|
|
d.addCallback(lambda res: passthrough)
|
2007-03-12 23:28:37 +00:00
|
|
|
return d
|
|
|
|
|
|
|
|
def add_service(self, s):
|
|
|
|
s.setServiceParent(self.sparent)
|
|
|
|
return s
|
|
|
|
|
2007-04-30 16:57:52 +00:00
|
|
|
def make_introducer_and_vdrive(self):
|
2007-05-25 00:34:42 +00:00
|
|
|
iv_basedir = os.path.join(self.basedir, "introducer_and_vdrive")
|
|
|
|
os.mkdir(iv_basedir)
|
|
|
|
iv = introducer_and_vdrive.IntroducerAndVdrive(basedir=iv_basedir)
|
|
|
|
self.introducer_and_vdrive = self.add_service(iv)
|
2007-04-30 16:57:52 +00:00
|
|
|
d = self.introducer_and_vdrive.when_tub_ready()
|
2007-03-12 23:28:37 +00:00
|
|
|
return d
|
|
|
|
|
|
|
|
def make_nodes(self):
|
2007-04-30 16:57:52 +00:00
|
|
|
q = self.introducer_and_vdrive
|
|
|
|
self.introducer_furl = q.urls["introducer"]
|
2007-07-17 02:34:52 +00:00
|
|
|
self.vdrive_furl = q.urls["vdrive"]
|
2007-03-12 23:28:37 +00:00
|
|
|
self.nodes = []
|
|
|
|
for i in range(self.numnodes):
|
|
|
|
nodedir = os.path.join(self.basedir, "node%d" % i)
|
|
|
|
os.mkdir(nodedir)
|
2007-03-27 23:12:11 +00:00
|
|
|
f = open(os.path.join(nodedir, "introducer.furl"), "w")
|
2007-04-30 16:57:52 +00:00
|
|
|
f.write(self.introducer_furl)
|
2007-03-27 23:12:11 +00:00
|
|
|
f.close()
|
|
|
|
f = open(os.path.join(nodedir, "vdrive.furl"), "w")
|
2007-07-17 02:34:52 +00:00
|
|
|
f.write(self.vdrive_furl)
|
2007-03-27 23:12:11 +00:00
|
|
|
f.close()
|
2007-07-17 01:08:55 +00:00
|
|
|
if self.discard_shares:
|
|
|
|
# for this test, we tell the storage servers to throw out all
|
|
|
|
# their stored data, since we're only testing upload and not
|
|
|
|
# download.
|
|
|
|
f = open(os.path.join(nodedir, "debug_no_storage"), "w")
|
|
|
|
f.write("no_storage\n")
|
|
|
|
f.close()
|
2007-03-12 23:28:37 +00:00
|
|
|
c = self.add_service(client.Client(basedir=nodedir))
|
|
|
|
self.nodes.append(c)
|
|
|
|
# the peers will start running, eventually they will connect to each
|
2007-04-30 16:57:52 +00:00
|
|
|
# other and the introducer_and_vdrive
|
2007-03-12 23:28:37 +00:00
|
|
|
|
2007-03-23 05:22:00 +00:00
|
|
|
def touch_keepalive(self):
|
|
|
|
f = open(self.keepalive_file, "w")
|
2007-05-25 00:34:42 +00:00
|
|
|
f.write("""\
|
|
|
|
If the node notices this file at startup, it will poll every 5 seconds and
|
|
|
|
terminate if the file is more than 10 seconds old, or if it has been deleted.
|
|
|
|
If the test harness has an internal failure and neglects to kill off the node
|
|
|
|
itself, this helps to avoid leaving processes lying around. The contents of
|
|
|
|
this file are ignored.
|
|
|
|
""")
|
2007-03-23 05:22:00 +00:00
|
|
|
f.close()
|
|
|
|
|
2007-03-12 23:28:37 +00:00
|
|
|
def start_client(self):
|
2007-05-30 00:39:39 +00:00
|
|
|
# this returns a Deferred that fires with the client's control.furl
|
2007-03-12 23:28:37 +00:00
|
|
|
log.msg("MAKING CLIENT")
|
|
|
|
clientdir = self.clientdir = os.path.join(self.basedir, "client")
|
2007-07-17 01:08:55 +00:00
|
|
|
quiet = StringIO()
|
|
|
|
create_node.create_client(clientdir, {}, out=quiet)
|
2007-03-12 23:28:37 +00:00
|
|
|
log.msg("DONE MAKING CLIENT")
|
2007-03-27 23:12:11 +00:00
|
|
|
f = open(os.path.join(clientdir, "introducer.furl"), "w")
|
2007-04-30 16:57:52 +00:00
|
|
|
f.write(self.introducer_furl + "\n")
|
2007-03-12 23:28:37 +00:00
|
|
|
f.close()
|
2007-05-25 00:34:42 +00:00
|
|
|
f = open(os.path.join(clientdir, "vdrive.furl"), "w")
|
2007-07-17 02:34:52 +00:00
|
|
|
f.write(self.vdrive_furl + "\n")
|
2007-05-25 00:34:42 +00:00
|
|
|
f.close()
|
2007-07-17 03:17:51 +00:00
|
|
|
f = open(os.path.join(clientdir, "webport"), "w")
|
|
|
|
# TODO: ideally we would set webport=0 and then ask the node what
|
|
|
|
# port it picked. But at the moment it is not convenient to do this,
|
|
|
|
# so we just pick a relatively unique one.
|
|
|
|
webport = max(os.getpid(), 2000)
|
|
|
|
f.write("tcp:%d:interface=127.0.0.1\n" % webport)
|
|
|
|
f.close()
|
|
|
|
self.webish_url = "http://localhost:%d" % webport
|
2007-07-17 01:08:55 +00:00
|
|
|
if self.discard_shares:
|
|
|
|
f = open(os.path.join(clientdir, "debug_no_storage"), "w")
|
|
|
|
f.write("no_storage\n")
|
|
|
|
f.close()
|
2007-08-10 01:30:24 +00:00
|
|
|
if self.mode == "upload-self":
|
|
|
|
f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
|
|
|
|
f.write("push_to_ourselves\n")
|
|
|
|
f.close()
|
2007-05-25 00:34:42 +00:00
|
|
|
self.keepalive_file = os.path.join(clientdir,
|
|
|
|
"suicide_prevention_hotline")
|
2007-03-23 05:22:00 +00:00
|
|
|
# now start updating the mtime.
|
2007-05-25 00:34:42 +00:00
|
|
|
self.touch_keepalive()
|
|
|
|
ts = internet.TimerService(4.0, self.touch_keepalive)
|
|
|
|
ts.setServiceParent(self.sparent)
|
2007-03-12 23:28:37 +00:00
|
|
|
|
|
|
|
pp = ClientWatcher()
|
2007-05-30 00:39:39 +00:00
|
|
|
self.proc_done = pp.d = defer.Deferred()
|
2007-03-12 23:28:37 +00:00
|
|
|
cmd = ["twistd", "-y", "client.tac"]
|
|
|
|
env = os.environ.copy()
|
|
|
|
self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
|
|
|
|
log.msg("CLIENT STARTED")
|
|
|
|
|
2007-05-30 00:39:39 +00:00
|
|
|
# now we wait for the client to get started. we're looking for the
|
|
|
|
# control.furl file to appear.
|
|
|
|
furl_file = os.path.join(clientdir, "control.furl")
|
|
|
|
def _check():
|
2007-09-15 03:16:57 +00:00
|
|
|
if pp.ended:
|
|
|
|
raise RuntimeError("process ended while waiting for startup")
|
2007-05-30 00:39:39 +00:00
|
|
|
return os.path.exists(furl_file)
|
|
|
|
d = self.poll(_check, 0.1)
|
|
|
|
# once it exists, wait a moment before we read from it, just in case
|
|
|
|
# it hasn't finished writing the whole thing. Ideally control.furl
|
|
|
|
# would be created in some atomic fashion, or made non-readable until
|
|
|
|
# it's ready, but I can't think of an easy way to do that, and I
|
|
|
|
# think the chances that we'll observe a half-write are pretty low.
|
|
|
|
def _stall(res):
|
|
|
|
d2 = defer.Deferred()
|
|
|
|
reactor.callLater(0.1, d2.callback, None)
|
|
|
|
return d2
|
|
|
|
d.addCallback(_stall)
|
|
|
|
def _read(res):
|
|
|
|
f = open(furl_file, "r")
|
|
|
|
furl = f.read()
|
|
|
|
return furl.strip()
|
|
|
|
d.addCallback(_read)
|
|
|
|
return d
|
|
|
|
|
|
|
|
|
2007-03-12 23:28:37 +00:00
|
|
|
def kill_client(self):
|
2007-05-30 00:39:39 +00:00
|
|
|
# returns a Deferred that fires when the process exits. This may only
|
|
|
|
# be called once.
|
2007-03-12 23:28:37 +00:00
|
|
|
try:
|
|
|
|
self.proc.signalProcess("KILL")
|
|
|
|
except error.ProcessExitedAlready:
|
|
|
|
pass
|
2007-05-30 00:39:39 +00:00
|
|
|
return self.proc_done
|
|
|
|
|
|
|
|
|
|
|
|
def create_data(self, name, size):
|
|
|
|
filename = os.path.join(self.basedir, name + ".data")
|
|
|
|
f = open(filename, "wb")
|
|
|
|
block = "a" * 8192
|
|
|
|
while size > 0:
|
|
|
|
l = min(size, 8192)
|
|
|
|
f.write(block[:l])
|
|
|
|
size -= l
|
|
|
|
return filename
|
|
|
|
|
2007-07-17 01:08:55 +00:00
|
|
|
def stash_stats(self, stats, name):
|
|
|
|
self.statsfile.write("%s %s: %d\n" % (self.mode, name, stats['VmPeak']))
|
|
|
|
self.stats[name] = stats['VmPeak']
|
|
|
|
|
2007-07-17 03:17:51 +00:00
|
|
|
def POST(self, urlpath, **fields):
|
|
|
|
url = self.webish_url + urlpath
|
|
|
|
sepbase = "boogabooga"
|
|
|
|
sep = "--" + sepbase
|
|
|
|
form = []
|
|
|
|
form.append(sep)
|
|
|
|
form.append('Content-Disposition: form-data; name="_charset"')
|
|
|
|
form.append('')
|
|
|
|
form.append('UTF-8')
|
|
|
|
form.append(sep)
|
|
|
|
for name, value in fields.iteritems():
|
|
|
|
if isinstance(value, tuple):
|
|
|
|
filename, value = value
|
|
|
|
form.append('Content-Disposition: form-data; name="%s"; '
|
|
|
|
'filename="%s"' % (name, filename))
|
|
|
|
else:
|
|
|
|
form.append('Content-Disposition: form-data; name="%s"' % name)
|
|
|
|
form.append('')
|
|
|
|
form.append(value)
|
|
|
|
form.append(sep)
|
|
|
|
form[-1] += "--"
|
|
|
|
body = "\r\n".join(form) + "\r\n"
|
|
|
|
headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
|
|
|
|
}
|
|
|
|
return getPage(url, method="POST", postdata=body,
|
|
|
|
headers=headers, followRedirect=False)
|
|
|
|
|
2007-05-30 00:39:39 +00:00
|
|
|
def do_test(self):
|
2007-07-17 01:08:55 +00:00
|
|
|
#print "CLIENT STARTED"
|
|
|
|
#print "FURL", self.control_furl
|
|
|
|
#print "RREF", self.control_rref
|
|
|
|
#print
|
2007-05-30 00:39:39 +00:00
|
|
|
kB = 1000; MB = 1000*1000
|
|
|
|
files = {}
|
|
|
|
uris = {}
|
|
|
|
control = self.control_rref
|
|
|
|
|
|
|
|
def _print_usage(res=None):
|
|
|
|
d = control.callRemote("get_memory_usage")
|
|
|
|
def _print(stats):
|
|
|
|
print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
|
|
|
|
stats["VmPeak"])
|
2007-07-17 01:08:55 +00:00
|
|
|
return stats
|
2007-05-30 00:39:39 +00:00
|
|
|
d.addCallback(_print)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def _do_upload(res, size):
|
|
|
|
name = '%d' % size
|
2007-07-17 01:08:55 +00:00
|
|
|
print
|
|
|
|
print "uploading %s" % name
|
2007-08-10 01:30:24 +00:00
|
|
|
if self.mode in ("upload", "upload-self"):
|
2007-07-17 03:17:51 +00:00
|
|
|
files[name] = self.create_data(name, size)
|
|
|
|
d = control.callRemote("upload_from_file_to_uri", files[name])
|
|
|
|
def _done(uri):
|
|
|
|
os.remove(files[name])
|
|
|
|
del files[name]
|
|
|
|
return uri
|
|
|
|
d.addCallback(_done)
|
|
|
|
elif self.mode == "upload-POST":
|
|
|
|
data = "a" * size
|
|
|
|
url = "/vdrive/global"
|
|
|
|
d = self.POST(url, t="upload", file=("%d.data" % size, data))
|
|
|
|
else:
|
|
|
|
raise RuntimeError("unknown mode=%s" % self.mode)
|
|
|
|
def _complete(uri):
|
2007-05-30 00:39:39 +00:00
|
|
|
uris[name] = uri
|
|
|
|
print "uploaded %s" % name
|
2007-07-17 03:17:51 +00:00
|
|
|
d.addCallback(_complete)
|
2007-05-30 00:39:39 +00:00
|
|
|
return d
|
|
|
|
|
|
|
|
d = _print_usage()
|
2007-07-17 03:17:51 +00:00
|
|
|
d.addCallback(self.stash_stats, "0B")
|
2007-05-30 00:39:39 +00:00
|
|
|
|
|
|
|
for i in range(10):
|
|
|
|
d.addCallback(_do_upload, size=10*kB+i)
|
|
|
|
d.addCallback(_print_usage)
|
2007-07-17 01:08:55 +00:00
|
|
|
d.addCallback(self.stash_stats, "10kB")
|
2007-05-30 00:39:39 +00:00
|
|
|
|
2007-07-17 01:08:55 +00:00
|
|
|
for i in range(3):
|
2007-05-30 00:39:39 +00:00
|
|
|
d.addCallback(_do_upload, size=10*MB+i)
|
|
|
|
d.addCallback(_print_usage)
|
2007-07-17 01:08:55 +00:00
|
|
|
d.addCallback(self.stash_stats, "10MB")
|
|
|
|
|
2007-08-09 08:32:52 +00:00
|
|
|
for i in range(3):
|
|
|
|
d.addCallback(_do_upload, size=50*MB+i)
|
|
|
|
d.addCallback(_print_usage)
|
|
|
|
d.addCallback(self.stash_stats, "50MB")
|
2007-05-30 00:39:39 +00:00
|
|
|
|
2007-08-15 19:55:11 +00:00
|
|
|
#for i in range(1):
|
|
|
|
# d.addCallback(_do_upload, size=100*MB+i)
|
|
|
|
# d.addCallback(_print_usage)
|
|
|
|
#d.addCallback(self.stash_stats, "100MB")
|
2007-08-09 18:30:33 +00:00
|
|
|
|
2007-05-30 00:39:39 +00:00
|
|
|
#d.addCallback(self.stall)
|
|
|
|
def _done(res):
|
|
|
|
print "FINISHING"
|
|
|
|
d.addCallback(_done)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def stall(self, res):
|
|
|
|
d = defer.Deferred()
|
|
|
|
reactor.callLater(5, d.callback, None)
|
|
|
|
return d
|
2007-03-12 23:28:37 +00:00
|
|
|
|
|
|
|
|
|
|
|
class ClientWatcher(protocol.ProcessProtocol):
|
2007-09-15 03:16:57 +00:00
|
|
|
ended = False
|
2007-03-12 23:28:37 +00:00
|
|
|
def outReceived(self, data):
|
|
|
|
print "OUT:", data
|
|
|
|
def errReceived(self, data):
|
|
|
|
print "ERR:", data
|
2007-05-30 00:39:39 +00:00
|
|
|
def processEnded(self, reason):
|
2007-09-15 03:16:57 +00:00
|
|
|
print "PROCESSENDED", reason
|
|
|
|
self.ended = True
|
2007-05-30 00:39:39 +00:00
|
|
|
self.d.callback(None)
|
2007-03-12 23:28:37 +00:00
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
2007-07-17 01:08:55 +00:00
|
|
|
mode = "upload"
|
|
|
|
if len(sys.argv) > 1:
|
|
|
|
mode = sys.argv[1]
|
|
|
|
sf = SystemFramework("_test_memory", mode)
|
2007-03-12 23:28:37 +00:00
|
|
|
sf.run()
|
|
|
|
|