test_system.py: factor SystemTestMixin out of SystemTest

This commit is contained in:
Brian Warner 2008-07-25 15:33:49 -07:00
parent 10bbc740e9
commit 48c64b7a29
2 changed files with 136 additions and 220 deletions

View File

@ -4,7 +4,8 @@ from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from twisted.application import service
from foolscap.eventual import flushEventualQueue
from foolscap import Tub
from foolscap.eventual import flushEventualQueue, fireEventually
from allmydata import uri, dirnode, client
from allmydata.introducer.server import IntroducerNode
from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \
@ -12,6 +13,9 @@ from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \
from allmydata.immutable import checker
from allmydata.immutable.encode import NotEnoughSharesError
from allmydata.util import log, testutil, fileutil
from allmydata.stats import PickleStatsGatherer
from allmydata.key_generator import KeyGeneratorService
def flush_but_dont_ignore(res):
d = flushEventualQueue()
@ -196,6 +200,12 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin,
def setUp(self):
self.sparent = service.MultiService()
self.sparent.startService()
self.stats_gatherer = None
self.stats_gatherer_furl = None
self.key_generator_svc = None
self.key_generator_furl = None
def tearDown(self):
log.msg("shutting down SystemTest services")
d = self.sparent.stopService()
@ -209,7 +219,8 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin,
s.setServiceParent(self.sparent)
return s
def set_up_nodes(self, NUMCLIENTS=5):
def set_up_nodes(self, NUMCLIENTS=5,
use_stats_gatherer=False, use_key_generator=False):
self.numclients = NUMCLIENTS
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
@ -220,7 +231,51 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin,
iv = IntroducerNode(basedir=iv_dir)
self.introducer = self.add_service(iv)
d = self.introducer.when_tub_ready()
d.addCallback(self._get_introducer_web)
if use_stats_gatherer:
d.addCallback(self._set_up_stats_gatherer)
if use_key_generator:
d.addCallback(self._set_up_key_generator)
d.addCallback(self._set_up_nodes_2)
if use_stats_gatherer:
d.addCallback(self._grab_stats)
return d
def _get_introducer_web(self, res):
f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
self.introweb_url = f.read().strip()
f.close()
def _set_up_stats_gatherer(self, res):
statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir)
t = Tub()
self.add_service(t)
l = t.listenOn("tcp:0")
p = l.getPortnum()
t.setLocation("localhost:%d" % p)
self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
self.add_service(self.stats_gatherer)
self.stats_gatherer_furl = self.stats_gatherer.get_furl()
def _set_up_key_generator(self, res):
kgsdir = self.getdir("key_generator")
fileutil.make_dirs(kgsdir)
self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
self.key_generator_svc.key_generator.pool_size = 4
self.key_generator_svc.key_generator.pool_refresh_delay = 60
self.add_service(self.key_generator_svc)
d = fireEventually()
def check_for_furl():
return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
def get_furl(junk):
kgf = os.path.join(kgsdir, 'key_generator.furl')
self.key_generator_furl = file(kgf, 'rb').read().strip()
d.addCallback(get_furl)
return d
def _set_up_nodes_2(self, res):
@ -235,8 +290,17 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin,
if i == 0:
# client[0] runs a webserver and a helper, no key_generator
open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n")
if i == 3:
# client[3] runs a webserver and uses a helper, uses key_generator
open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
if self.key_generator_furl:
kgf = "%s\n" % (self.key_generator_furl,)
open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
if self.stats_gatherer_furl:
open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
# start client[0], wait for it's tub to be ready (at which point it
# will have registered the helper furl).
@ -244,6 +308,14 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin,
self.clients.append(c)
d = c.when_tub_ready()
def _ready(res):
f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
helper_furl = f.read()
f.close()
self.helper_furl = helper_furl
f = open(os.path.join(basedirs[3],"helper.furl"), "w")
f.write(helper_furl)
f.close()
# this starts the rest of the clients
for i in range(1, self.numclients):
c = self.add_service(client.Client(basedir=basedirs[i]))
@ -257,9 +329,66 @@ class SystemTestMixin(testutil.SignalMixin, testutil.PollMixin,
l = self.clients[0].getServiceNamed("webish").listener
port = l._port.getHost().port
self.webish_url = "http://localhost:%d/" % port
# and the helper-using webport
l = self.clients[3].getServiceNamed("webish").listener
port = l._port.getHost().port
self.helper_webish_url = "http://localhost:%d/" % port
d.addCallback(_connected)
return d
def _grab_stats(self, res):
d = self.stats_gatherer.poll()
return d
def bounce_client(self, num):
c = self.clients[num]
d = c.disownServiceParent()
# I think windows requires a moment to let the connection really stop
# and the port number made available for re-use. TODO: examine the
# behavior, see if this is really the problem, see if we can do
# better than blindly waiting for a second.
d.addCallback(self.stall, 1.0)
def _stopped(res):
new_c = client.Client(basedir=self.getdir("client%d" % num))
self.clients[num] = new_c
self.add_service(new_c)
return new_c.when_tub_ready()
d.addCallback(_stopped)
d.addCallback(lambda res: self.wait_for_connections())
def _maybe_get_webport(res):
if num == 0:
# now find out where the web port was
l = self.clients[0].getServiceNamed("webish").listener
port = l._port.getHost().port
self.webish_url = "http://localhost:%d/" % port
d.addCallback(_maybe_get_webport)
return d
def add_extra_node(self, client_num, helper_furl=None,
add_to_sparent=False):
# usually this node is *not* parented to our self.sparent, so we can
# shut it down separately from the rest, to exercise the
# connection-lost code
basedir = self.getdir("client%d" % client_num)
if not os.path.isdir(basedir):
fileutil.make_dirs(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
if helper_furl:
f = open(os.path.join(basedir, "helper.furl") ,"w")
f.write(helper_furl+"\n")
f.close()
c = client.Client(basedir=basedir)
self.clients.append(c)
self.numclients += 1
if add_to_sparent:
c.setServiceParent(self.sparent)
else:
c.startService()
d = self.wait_for_connections()
d.addCallback(lambda res: c)
return d
def _check_connections(self):
for c in self.clients:
ic = c.introducer_client

View File

@ -6,26 +6,21 @@ from twisted.trial import unittest
from twisted.internet import defer
from twisted.internet import threads # CLI tests use deferToThread
from twisted.internet.error import ConnectionDone, ConnectionLost
from twisted.application import service
import allmydata
from allmydata import client, uri, storage, offloaded
from allmydata import uri, storage, offloaded
from allmydata.immutable import download, upload, filenode
from allmydata.introducer.server import IntroducerNode
from allmydata.util import fileutil, idlib, mathutil, testutil
from allmydata.util import idlib, mathutil
from allmydata.util import log, base32
from allmydata.scripts import runner, cli
from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
from allmydata.mutable.common import NotMutableError
from allmydata.mutable import layout as mutable_layout
from allmydata.stats import PickleStatsGatherer
from allmydata.key_generator import KeyGeneratorService
from foolscap.eventual import fireEventually
from foolscap import DeadReferenceError, Tub
from foolscap import DeadReferenceError
from twisted.python.failure import Failure
from twisted.web.client import getPage
from twisted.web.error import Error
from allmydata.test.common import SystemTestMixin, flush_but_dont_ignore
from allmydata.test.common import SystemTestMixin
LARGE_DATA = """
This is some data to publish to the virtual drive, which needs to be large
@ -46,215 +41,7 @@ class CountingDataUploadable(upload.Data):
return upload.Data.read(self, length)
class SystemTest(testutil.SignalMixin, testutil.PollMixin, testutil.StallMixin,
unittest.TestCase):
def setUp(self):
self.sparent = service.MultiService()
self.sparent.startService()
self.stats_gatherer = None
self.stats_gatherer_furl = None
self.key_generator_svc = None
self.key_generator_furl = None
def tearDown(self):
log.msg("shutting down SystemTest services")
d = self.sparent.stopService()
d.addBoth(flush_but_dont_ignore)
return d
def getdir(self, subdir):
return os.path.join(self.basedir, subdir)
def add_service(self, s):
s.setServiceParent(self.sparent)
return s
def set_up_nodes(self, NUMCLIENTS=5,
use_stats_gatherer=False, use_key_generator=False):
self.numclients = NUMCLIENTS
iv_dir = self.getdir("introducer")
if not os.path.isdir(iv_dir):
fileutil.make_dirs(iv_dir)
f = open(os.path.join(iv_dir, "webport"), "w")
f.write("tcp:0:interface=127.0.0.1\n")
f.close()
iv = IntroducerNode(basedir=iv_dir)
self.introducer = self.add_service(iv)
d = self.introducer.when_tub_ready()
d.addCallback(self._get_introducer_web)
if use_stats_gatherer:
d.addCallback(self._set_up_stats_gatherer)
if use_key_generator:
d.addCallback(self._set_up_key_generator)
d.addCallback(self._set_up_nodes_2)
if use_stats_gatherer:
d.addCallback(self._grab_stats)
return d
def _get_introducer_web(self, res):
f = open(os.path.join(self.getdir("introducer"), "node.url"), "r")
self.introweb_url = f.read().strip()
f.close()
def _set_up_stats_gatherer(self, res):
statsdir = self.getdir("stats_gatherer")
fileutil.make_dirs(statsdir)
t = Tub()
self.add_service(t)
l = t.listenOn("tcp:0")
p = l.getPortnum()
t.setLocation("localhost:%d" % p)
self.stats_gatherer = PickleStatsGatherer(t, statsdir, False)
self.add_service(self.stats_gatherer)
self.stats_gatherer_furl = self.stats_gatherer.get_furl()
def _set_up_key_generator(self, res):
kgsdir = self.getdir("key_generator")
fileutil.make_dirs(kgsdir)
self.key_generator_svc = KeyGeneratorService(kgsdir, display_furl=False)
self.key_generator_svc.key_generator.pool_size = 4
self.key_generator_svc.key_generator.pool_refresh_delay = 60
self.add_service(self.key_generator_svc)
d = fireEventually()
def check_for_furl():
return os.path.exists(os.path.join(kgsdir, 'key_generator.furl'))
d.addCallback(lambda junk: self.poll(check_for_furl, timeout=30))
def get_furl(junk):
kgf = os.path.join(kgsdir, 'key_generator.furl')
self.key_generator_furl = file(kgf, 'rb').read().strip()
d.addCallback(get_furl)
return d
def _set_up_nodes_2(self, res):
q = self.introducer
self.introducer_furl = q.introducer_url
self.clients = []
basedirs = []
for i in range(self.numclients):
basedir = self.getdir("client%d" % i)
basedirs.append(basedir)
fileutil.make_dirs(basedir)
if i == 0:
# client[0] runs a webserver and a helper, no key_generator
open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
open(os.path.join(basedir, "run_helper"), "w").write("yes\n")
open(os.path.join(basedir, "sizelimit"), "w").write("10GB\n")
if i == 3:
# client[3] runs a webserver and uses a helper, uses key_generator
open(os.path.join(basedir, "webport"), "w").write("tcp:0:interface=127.0.0.1")
if self.key_generator_furl:
kgf = "%s\n" % (self.key_generator_furl,)
open(os.path.join(basedir, "key_generator.furl"), "w").write(kgf)
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
if self.stats_gatherer_furl:
open(os.path.join(basedir, "stats_gatherer.furl"), "w").write(self.stats_gatherer_furl)
# start client[0], wait for it's tub to be ready (at which point it
# will have registered the helper furl).
c = self.add_service(client.Client(basedir=basedirs[0]))
self.clients.append(c)
d = c.when_tub_ready()
def _ready(res):
f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
helper_furl = f.read()
f.close()
self.helper_furl = helper_furl
f = open(os.path.join(basedirs[3],"helper.furl"), "w")
f.write(helper_furl)
f.close()
# this starts the rest of the clients
for i in range(1, self.numclients):
c = self.add_service(client.Client(basedir=basedirs[i]))
self.clients.append(c)
log.msg("STARTING")
return self.wait_for_connections()
d.addCallback(_ready)
def _connected(res):
log.msg("CONNECTED")
# now find out where the web port was
l = self.clients[0].getServiceNamed("webish").listener
port = l._port.getHost().port
self.webish_url = "http://localhost:%d/" % port
# and the helper-using webport
l = self.clients[3].getServiceNamed("webish").listener
port = l._port.getHost().port
self.helper_webish_url = "http://localhost:%d/" % port
d.addCallback(_connected)
return d
def _grab_stats(self, res):
d = self.stats_gatherer.poll()
return d
def bounce_client(self, num):
c = self.clients[num]
d = c.disownServiceParent()
# I think windows requires a moment to let the connection really stop
# and the port number made available for re-use. TODO: examine the
# behavior, see if this is really the problem, see if we can do
# better than blindly waiting for a second.
d.addCallback(self.stall, 1.0)
def _stopped(res):
new_c = client.Client(basedir=self.getdir("client%d" % num))
self.clients[num] = new_c
self.add_service(new_c)
return new_c.when_tub_ready()
d.addCallback(_stopped)
d.addCallback(lambda res: self.wait_for_connections())
def _maybe_get_webport(res):
if num == 0:
# now find out where the web port was
l = self.clients[0].getServiceNamed("webish").listener
port = l._port.getHost().port
self.webish_url = "http://localhost:%d/" % port
d.addCallback(_maybe_get_webport)
return d
def add_extra_node(self, client_num, helper_furl=None,
add_to_sparent=False):
# usually this node is *not* parented to our self.sparent, so we can
# shut it down separately from the rest, to exercise the
# connection-lost code
basedir = self.getdir("client%d" % client_num)
if not os.path.isdir(basedir):
fileutil.make_dirs(basedir)
open(os.path.join(basedir, "introducer.furl"), "w").write(self.introducer_furl)
if helper_furl:
f = open(os.path.join(basedir, "helper.furl") ,"w")
f.write(helper_furl+"\n")
f.close()
c = client.Client(basedir=basedir)
self.clients.append(c)
self.numclients += 1
if add_to_sparent:
c.setServiceParent(self.sparent)
else:
c.startService()
d = self.wait_for_connections()
d.addCallback(lambda res: c)
return d
def _check_connections(self):
for c in self.clients:
ic = c.introducer_client
if not ic.connected_to_introducer():
return False
if len(ic.get_all_peerids()) != self.numclients:
return False
return True
def wait_for_connections(self, ignored=None):
# TODO: replace this with something that takes a list of peerids and
# fires when they've all been heard from, instead of using a count
# and a threshold
return self.poll(self._check_connections, timeout=200)
class SystemTest(SystemTestMixin, unittest.TestCase):
def test_connections(self):
self.basedir = "system/SystemTest/test_connections"