mirror of
synced 2025-03-28 14:28:34 +00:00
490 lines
17 KiB
490 lines
17 KiB
#! /usr/bin/env python
Unit and system tests for tahoe-fuse.
# Note: It's always a SetupFailure, not a TestFailure if a webapi
# operation fails, because this does not indicate a fuse interface
# failure.
# TODO: Test mismatches between tahoe and fuse/posix. What about nodes
# with crazy names ('\0', unicode, '/', '..')? Huuuuge files?
# Huuuuge directories... As tahoe approaches production quality, it'd
# be nice if the fuse interface did so also by hardening against such cases.
# FIXME: This framework might be replaceable with twisted.trial,
# especially the "layer" design, which is a bit cumbersome when
# using recursion to manage multiple clients.
# FIXME: Identify all race conditions (hint: starting clients, versus
# using the grid fs).
import sys, os, shutil, unittest, subprocess
import tempfile, re, time, signal, random, httplib
import traceback
import tahoe_fuse
### Main flow control:
def main(args = sys.argv[1:]):
target = 'all'
if args:
if len(args) != 1:
raise SystemExit(Usage)
target = args[0]
if target not in ('all', 'unit', 'system'):
raise SystemExit(Usage)
if target in ('all', 'unit'):
if target in ('all', 'system'):
def run_unit_tests():
print 'Running Unit Tests.'
except SystemExit, se:
print 'Unit Tests complete.\n'
def run_system_test():
### System Testing:
class SystemTest (object):
def __init__(self):
# These members represent configuration:
self.fullcleanup = False # FIXME: Make this a commandline option.
# These members represent test state:
self.cliexec = None
self.testroot = None
# This test state is specific to the first client:
self.port = None
self.clientbase = None
## Top-level flow control:
# These "*_layer" methods call eachother in a linear fashion, using
# exception unwinding to do cleanup properly. Each "layer" invokes
# a deeper layer, and each layer does its own cleanup upon exit.
def run(self, fullcleanup = False):
If full_cleanup, delete all temporary state.
Else: If there is an error do not delete basedirs.
Set to False if you wish to analyze a failure.
self.fullcleanup = fullcleanup
print '\n*** Setting up system tests.'
failures, total = self.init_cli_layer()
print '\n*** System Tests complete: %d failed out of %d.' % (failures, total)
except self.SetupFailure, sfail:
print sfail
print '\n*** System Tests were not successfully completed.'
def init_cli_layer(self):
'''This layer finds the appropriate tahoe executable.'''
runtestpath = os.path.abspath(sys.argv[0])
path = runtestpath
for expectedname in ('runtests.py', 'fuse', 'contrib'):
path, name = os.path.split(path)
if name != expectedname:
reason = 'Unexpected test script path: %r\n'
reason += 'The system test script must be run from the source directory.'
raise self.SetupFailure(reason, runtestpath)
self.cliexec = os.path.join(path, 'bin', 'tahoe')
version = self.run_tahoe('--version')
print 'Using %r with version:\n%s' % (self.cliexec, version.rstrip())
return self.create_testroot_layer()
def create_testroot_layer(self):
print 'Creating test base directory.'
self.testroot = tempfile.mkdtemp(prefix='tahoe_fuse_test_')
return self.launch_introducer_layer()
if self.fullcleanup:
print 'Cleaning up test root directory.'
except Exception, e:
print 'Exception removing test root directory: %r' % (self.testroot, )
print 'Ignoring cleanup exception: %r' % (e,)
print 'Leaving test root directory: %r' % (self.testroot, )
def launch_introducer_layer(self):
print 'Launching introducer.'
introbase = os.path.join(self.testroot, 'introducer')
# NOTE: We assume if tahoe exits with non-zero status, no separate
# tahoe child process is still running.
createoutput = self.run_tahoe('create-introducer', '--basedir', introbase)
self.check_tahoe_output(createoutput, ExpectedCreationOutput, introbase)
startoutput = self.run_tahoe('start', '--basedir', introbase)
self.check_tahoe_output(startoutput, ExpectedStartOutput, introbase)
return self.launch_clients_layer(introbase)
print 'Stopping introducer node.'
TotalClientsNeeded = 3
def launch_clients_layer(self, introbase, clientnum = 1):
if clientnum > self.TotalClientsNeeded:
return self.create_test_dirnode_layer()
tmpl = 'Launching client %d of %d.'
print tmpl % (clientnum,
base = os.path.join(self.testroot, 'client_%d' % (clientnum,))
output = self.run_tahoe('create-client', '--basedir', base)
self.check_tahoe_output(output, ExpectedCreationOutput, base)
if clientnum == 1:
# The first client is special:
self.clientbase = base
self.port = random.randrange(1024, 2**15)
f = open(os.path.join(base, 'webport'), 'w')
f.write('tcp:%d:interface=\n' % self.port)
introfurl = os.path.join(introbase, 'introducer.furl')
self.polling_operation(lambda : os.path.isfile(introfurl))
shutil.copy(introfurl, base)
# NOTE: We assume if tahoe exist with non-zero status, no separate
# tahoe child process is still running.
startoutput = self.run_tahoe('start', '--basedir', base)
self.check_tahoe_output(startoutput, ExpectedStartOutput, base)
return self.launch_clients_layer(introbase, clientnum+1)
print 'Stopping client node %d.' % (clientnum,)
def create_test_dirnode_layer(self):
print 'Creating test dirnode.'
cap = self.create_dirnode()
f = open(os.path.join(self.clientbase, 'private', 'root_dir.cap'), 'w')
return self.mount_fuse_layer(cap)
def mount_fuse_layer(self, fusebasecap, fusepause=2.0):
print 'Mounting fuse interface.'
mp = os.path.join(self.testroot, 'mountpoint')
thispath = os.path.abspath(sys.argv[0])
thisdir = os.path.dirname(thispath)
fusescript = os.path.join(thisdir, 'tahoe_fuse.py')
proc = subprocess.Popen([fusescript,
'--basedir', self.clientbase])
# The mount is verified by the test_layer, but we sleep to
# avoid race conditions against the first few tests.
return self.run_test_layer(fusebasecap, mp)
print '\n*** Cleaning up system test'
if proc.poll() is None:
print 'Killing fuse interface.'
os.kill(proc.pid, signal.SIGTERM)
print 'Waiting for the fuse interface to exit.'
def run_test_layer(self, fbcap, mountpoint):
total = failures = 0
for name in sorted(dir(self)):
if name.startswith('test_'):
total += 1
print '\n*** Running test #%d: %s' % (total, name)
testcap = self.create_dirnode()
self.attach_node(fbcap, testcap, name)
method = getattr(self, name)
method(testcap, testdir = os.path.join(mountpoint, name))
print 'Test succeeded.'
except self.TestFailure, f:
print f
failures += 1
print 'Error in test code... Cleaning up.'
return (failures, total)
# Tests:
def test_directory_existence(self, testcap, testdir):
if not os.path.isdir(testdir):
raise self.TestFailure('Attached test directory not found: %r', testdir)
def test_empty_directory_listing(self, testcap, testdir):
listing = os.listdir(testdir)
if listing:
raise self.TestFailure('Expected empty directory, found: %r', listing)
def test_directory_listing(self, testcap, testdir):
names = []
filesizes = {}
for i in range(3):
fname = 'file_%d' % (i,)
body = 'Hello World #%d!' % (i,)
filesizes[fname] = len(body)
cap = self.webapi_call('PUT', '/uri', body)
self.attach_node(testcap, cap, fname)
dname = 'dir_%d' % (i,)
cap = self.create_dirnode()
self.attach_node(testcap, cap, dname)
listing = os.listdir(testdir)
if listing != names:
tmpl = 'Expected directory list containing %r but fuse gave %r'
raise self.TestFailure(tmpl, names, listing)
for file, size in filesizes.items():
st = os.stat(os.path.join(testdir, file))
if st.st_size != size:
tmpl = 'Expected %r size of %r but fuse returned %r'
raise self.TestFailure(tmpl, file, size, st.st_size)
def test_file_contents(self, testcap, testdir):
name = 'hw.txt'
body = 'Hello World!'
cap = self.webapi_call('PUT', '/uri', body)
self.attach_node(testcap, cap, name)
path = os.path.join(testdir, name)
found = open(path, 'r').read()
except Exception, err:
tmpl = 'Could not read file contents of %r: %r'
raise self.TestFailure(tmpl, path, err)
if found != body:
tmpl = 'Expected file contents %r but found %r'
raise self.TestFailure(tmpl, body, found)
# Utilities:
def run_tahoe(self, *args):
realargs = ('tahoe',) + args
status, output = gather_output(realargs, executable=self.cliexec)
if status != 0:
tmpl = 'The tahoe cli exited with nonzero status.\n'
tmpl += 'Executable: %r\n'
tmpl += 'Command arguments: %r\n'
tmpl += 'Exit status: %r\n'
tmpl += 'Output:\n%s\n[End of tahoe output.]\n'
raise self.SetupFailure(tmpl,
return output
def check_tahoe_output(self, output, expected, expdir):
m = re.match(expected, output, re.M)
if m is None:
tmpl = 'The output of tahoe did not match the expectation:\n'
tmpl += 'Expected regex: %s\n'
tmpl += 'Actual output: %r\n'
self.warn(tmpl, expected, output)
elif expdir != m.group('path'):
tmpl = 'The output of tahoe refers to an unexpected directory:\n'
tmpl += 'Expected directory: %r\n'
tmpl += 'Actual directory: %r\n'
self.warn(tmpl, expdir, m.group(1))
def stop_node(self, basedir):
self.run_tahoe('stop', '--basedir', basedir)
except Exception, e:
print 'Failed to stop tahoe node.'
print 'Ignoring cleanup exception:'
# Indent the exception description:
desc = str(e).rstrip()
print ' ' + desc.replace('\n', '\n ')
def webapi_call(self, method, path, body=None, **options):
if options:
path = path + '?' + ('&'.join(['%s=%s' % kv for kv in options.items()]))
conn = httplib.HTTPConnection('', self.port)
conn.request(method, path, body = body)
resp = conn.getresponse()
if resp.status != 200:
tmpl = 'A webapi operation failed.\n'
tmpl += 'Request: %r %r\n'
tmpl += 'Body:\n%s\n'
tmpl += 'Response:\nStatus %r\nBody:\n%s'
raise self.SetupFailure(tmpl,
method, path,
body or '',
resp.status, body)
return resp.read()
def create_dirnode(self):
return self.webapi_call('PUT', '/uri', t='mkdir').strip()
def attach_node(self, dircap, childcap, childname):
body = self.webapi_call('PUT',
'/uri/%s/%s' % (dircap, childname),
body = childcap,
t = 'uri',
replace = 'false')
assert body.strip() == childcap, `status, dircap, childcap, childname`
def polling_operation(self, operation, timeout = 10.0, pollinterval = 0.2):
totaltime = timeout # Fudging for edge-case SetupFailure description...
totalattempts = int(timeout / pollinterval)
starttime = time.time()
for attempt in range(totalattempts):
opstart = time.time()
result = operation()
except KeyboardInterrupt, e:
except Exception, e:
result = False
totaltime = time.time() - starttime
if result is not False:
#tmpl = '(Polling took over %.2f seconds.)'
#print tmpl % (totaltime,)
return result
elif totaltime > timeout:
opdelay = time.time() - opstart
realinterval = max(0., pollinterval - opdelay)
#tmpl = '(Poll attempt %d failed after %.2f seconds, sleeping %.2f seconds.)'
#print tmpl % (attempt+1, opdelay, realinterval)
tmpl = 'Timeout after waiting for creation of introducer.furl.\n'
tmpl += 'Waited %.2f seconds (%d polls).'
raise self.SetupFailure(tmpl, totaltime, attempt+1)
def warn(self, tmpl, *args):
print ('Test Warning: ' + tmpl) % args
# SystemTest Exceptions:
class Failure (Exception):
def __init__(self, tmpl, *args):
msg = self.Prefix + (tmpl % args)
Exception.__init__(self, msg)
class SetupFailure (Failure):
Prefix = 'Setup Failure - The test framework encountered an error:\n'
class TestFailure (Failure):
Prefix = 'TestFailure: '
### Unit Tests:
class TestUtilFunctions (unittest.TestCase):
'''Tests small stand-alone functions.'''
def test_canonicalize_cap(self):
iopairs = [('',
for input, output in iopairs:
result = tahoe_fuse.canonicalize_cap(input)
self.failUnlessEqual(output, result, 'input == %r' % (input,))
### Misc:
def gather_output(*args, **kwargs):
This expects the child does not require input and that it closes
stdout/err eventually.
p = subprocess.Popen(stdout = subprocess.PIPE,
stderr = subprocess.STDOUT,
output = p.stdout.read()
exitcode = p.wait()
return (exitcode, output)
ExpectedCreationOutput = r'(introducer|client) created in (?P<path>.*?)\n'
ExpectedStartOutput = r'STARTING (?P<path>.*?)\n(introducer|client) node probably started'
Usage = '''
Usage: %s [target]
Run tests for the given target.
target is one of: unit, system, or all
''' % (sys.argv[0],)
if __name__ == '__main__':