check_memory: getting closer, now we have memusage numbers for uploads of 10kB and 10MB files

This commit is contained in:
Brian Warner 2007-05-29 17:39:39 -07:00
parent 04b649f971
commit ea78b4b605
6 changed files with 174 additions and 43 deletions

View File

@ -62,7 +62,7 @@ class Client(node.Node, Referenceable):
hotline_file = os.path.join(self.basedir,
self.SUICIDE_PREVENTION_HOTLINE_FILE)
if os.path.exists(hotline_file):
hotline = TimerService(5.0, self._check_hotline, hotline_file)
hotline = TimerService(1.0, self._check_hotline, hotline_file)
hotline.setServiceParent(self)
def _check_hotline(self, hotline_file):

View File

@ -3,11 +3,20 @@ from zope.interface import implements
from twisted.application import service
from foolscap import Referenceable
from allmydata.interfaces import RIControlClient
from allmydata.util import testutil
class ControlServer(Referenceable, service.Service):
class ControlServer(Referenceable, service.Service, testutil.PollMixin):
implements(RIControlClient)
def remote_wait_for_client_connections(self, num_clients):
def _check():
current_clients = list(self.parent.get_all_peerids())
return len(current_clients) >= num_clients
d = self.poll(_check, 0.5)
d.addCallback(lambda res: None)
return d
def remote_upload_from_file_to_uri(self, filename):
uploader = self.parent.getServiceNamed("uploader")
d = uploader.upload_filename(filename)

View File

@ -617,6 +617,12 @@ class NotCapableError(Exception):
"""You have tried to write to a read-only node."""
class RIControlClient(RemoteInterface):
def wait_for_client_connections(num_clients=int):
"""Do not return until we have connections to at least NUM_CLIENTS
storage servers.
"""
def upload_from_file_to_uri(filename=str):
"""Upload a file to the grid. This accepts a filename (which must be
absolute) that points to a file on the node's local disk. The node

View File

@ -6,10 +6,12 @@ from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
from allmydata import client, introducer_and_vdrive
from allmydata.scripts import runner
from foolscap.eventual import eventually, flushEventualQueue
from allmydata.util import testutil
import foolscap
from foolscap import eventual
from twisted.python import log
class SystemFramework:
class SystemFramework(testutil.PollMixin):
numnodes = 5
def __init__(self, basedir):
@ -21,35 +23,59 @@ class SystemFramework:
os.mkdir(basedir)
self.sparent = service.MultiService()
self.sparent.startService()
self.proc = None
self.tub = foolscap.Tub()
self.tub.setServiceParent(self.sparent)
def run(self):
log.startLogging(open(os.path.join(self.basedir, "log"), "w"))
d = defer.Deferred()
eventually(d.callback, None)
d.addCallback(lambda res: self.start())
d.addErrback(log.err)
log.startLogging(open(os.path.join(self.basedir, "log"), "w"),
setStdout=False)
#logfile = open(os.path.join(self.basedir, "log"), "w")
#flo = log.FileLogObserver(logfile)
#log.startLoggingWithObserver(flo.emit, setStdout=False)
d = eventual.fireEventually()
d.addCallback(lambda res: self.setUp())
d.addCallback(lambda res: self.do_test())
d.addBoth(self.tearDown)
def _err(err):
log.err(err)
print err
d.addErrback(_err)
d.addBoth(lambda res: reactor.stop())
reactor.run()
def start(self):
def setUp(self):
print "STARTING"
d = self.make_introducer_and_vdrive()
def _more(res):
self.make_nodes()
self.start_client()
return self.start_client()
d.addCallback(_more)
def _record_control_furl(control_furl):
self.control_furl = control_furl
print "OBTAINING '%s'" % (control_furl,)
return self.tub.getReference(self.control_furl)
d.addCallback(_record_control_furl)
def _record_control(control_rref):
self.control_rref = control_rref
return control_rref.callRemote("wait_for_client_connections",
self.numnodes+1)
d.addCallback(_record_control)
def _ready(res):
print "CLIENT READY"
d.addCallback(_ready)
return d
def tearDown(self):
os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
def tearDown(self, passthrough):
# the client node will shut down in a few seconds
#os.remove(os.path.join(self.clientdir, "suicide_prevention_hotline"))
log.msg("shutting down SystemTest services")
d = self.sparent.stopService()
d.addCallback(lambda res: flushEventualQueue())
def _done(res):
d1 = defer.Deferred()
reactor.callLater(self.DISCONNECT_DELAY, d1.callback, None)
return d1
d.addCallback(_done)
d = defer.succeed(None)
if self.proc:
d.addCallback(lambda res: self.kill_client())
d.addCallback(lambda res: self.sparent.stopService())
d.addCallback(lambda res: eventual.flushEventualQueue())
d.addCallback(lambda res: passthrough)
return d
def add_service(self, s):
@ -95,9 +121,10 @@ this file are ignored.
f.close()
def start_client(self):
# this returns a Deferred that fires with the client's control.furl
log.msg("MAKING CLIENT")
clientdir = self.clientdir = os.path.join(self.basedir, "client")
config = {'basedir': clientdir, 'quiet': False}
config = {'basedir': clientdir, 'quiet': True}
runner.create_client(config)
log.msg("DONE MAKING CLIENT")
f = open(os.path.join(clientdir, "introducer.furl"), "w")
@ -114,16 +141,104 @@ this file are ignored.
ts.setServiceParent(self.sparent)
pp = ClientWatcher()
self.proc_done = pp.d = defer.Deferred()
cmd = ["twistd", "-y", "client.tac"]
env = os.environ.copy()
self.proc = reactor.spawnProcess(pp, cmd[0], cmd, env, path=clientdir)
log.msg("CLIENT STARTED")
# now we wait for the client to get started. we're looking for the
# control.furl file to appear.
furl_file = os.path.join(clientdir, "control.furl")
def _check():
return os.path.exists(furl_file)
d = self.poll(_check, 0.1)
# once it exists, wait a moment before we read from it, just in case
# it hasn't finished writing the whole thing. Ideally control.furl
# would be created in some atomic fashion, or made non-readable until
# it's ready, but I can't think of an easy way to do that, and I
# think the chances that we'll observe a half-write are pretty low.
def _stall(res):
d2 = defer.Deferred()
reactor.callLater(0.1, d2.callback, None)
return d2
d.addCallback(_stall)
def _read(res):
f = open(furl_file, "r")
furl = f.read()
return furl.strip()
d.addCallback(_read)
return d
def kill_client(self):
# returns a Deferred that fires when the process exits. This may only
# be called once.
try:
self.proc.signalProcess("KILL")
except error.ProcessExitedAlready:
pass
return self.proc_done
def create_data(self, name, size):
filename = os.path.join(self.basedir, name + ".data")
f = open(filename, "wb")
block = "a" * 8192
while size > 0:
l = min(size, 8192)
f.write(block[:l])
size -= l
return filename
def do_test(self):
print "CLIENT STARTED"
print "FURL", self.control_furl
print "RREF", self.control_rref
print
kB = 1000; MB = 1000*1000
files = {}
uris = {}
control = self.control_rref
def _print_usage(res=None):
d = control.callRemote("get_memory_usage")
def _print(stats):
print "VmSize: %9d VmPeak: %9d" % (stats["VmSize"],
stats["VmPeak"])
d.addCallback(_print)
return d
def _do_upload(res, size):
name = '%d' % size
files[name] = self.create_data(name, size)
d = control.callRemote("upload_from_file_to_uri", files[name])
def _done(uri):
uris[name] = uri
print "uploaded %s" % name
d.addCallback(_done)
return d
d = _print_usage()
for i in range(10):
d.addCallback(_do_upload, size=10*kB+i)
d.addCallback(_print_usage)
for i in range(10):
d.addCallback(_do_upload, size=10*MB+i)
d.addCallback(_print_usage)
#d.addCallback(self.stall)
def _done(res):
print "FINISHING"
d.addCallback(_done)
return d
def stall(self, res):
d = defer.Deferred()
reactor.callLater(5, d.callback, None)
return d
class ClientWatcher(protocol.ProcessProtocol):
@ -131,12 +246,11 @@ class ClientWatcher(protocol.ProcessProtocol):
print "OUT:", data
def errReceived(self, data):
print "ERR:", data
def processEnded(self, reason):
self.d.callback(None)
if __name__ == '__main__':
sf = SystemFramework("_test_memory")
sf.run()
# add a config option that looks for a keepalive file, and if it disappears,
# shut down the node.

View File

@ -7,7 +7,7 @@ from foolscap import Tub, Referenceable
from foolscap.eventual import flushEventualQueue
from twisted.application import service
from allmydata.introducer import IntroducerClient, Introducer
from allmydata.util import idlib
from allmydata.util import idlib, testutil
class MyNode(Referenceable):
pass
@ -16,7 +16,7 @@ class LoggingMultiService(service.MultiService):
def log(self, msg):
pass
class TestIntroducer(unittest.TestCase):
class TestIntroducer(unittest.TestCase, testutil.PollMixin):
def setUp(self):
self.parent = LoggingMultiService()
self.parent.startService()
@ -28,22 +28,6 @@ class TestIntroducer(unittest.TestCase):
return d
def poll(self, check_f, pollinterval=0.01):
# Return a Deferred, then call check_f periodically until it returns
# True, at which point the Deferred will fire.. If check_f raises an
# exception, the Deferred will errback.
d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
return d
def _poll(self, res, check_f, pollinterval):
if check_f():
return True
d = defer.Deferred()
d.addCallback(self._poll, check_f, pollinterval)
reactor.callLater(pollinterval, d.callback, None)
return d
def test_create(self):
ic = IntroducerClient(None, "introducer", "myfurl")
def _ignore(nodeid, rref):

View File

@ -1,6 +1,6 @@
import os, signal, time
from twisted.internet import reactor
from twisted.internet import reactor, defer
class SignalMixin:
# This class is necessary for any code which wants to use Processes
@ -20,6 +20,24 @@ class SignalMixin:
if self.sigchldHandler:
signal.signal(signal.SIGCHLD, self.sigchldHandler)
class PollMixin:
def poll(self, check_f, pollinterval=0.01):
# Return a Deferred, then call check_f periodically until it returns
# True, at which point the Deferred will fire.. If check_f raises an
# exception, the Deferred will errback.
d = defer.maybeDeferred(self._poll, None, check_f, pollinterval)
return d
def _poll(self, res, check_f, pollinterval):
if check_f():
return True
d = defer.Deferred()
d.addCallback(self._poll, check_f, pollinterval)
reactor.callLater(pollinterval, d.callback, None)
return d
class TestMixin(SignalMixin):
def setUp(self, repeatable=False):
"""