test_immutable.Test: rewrite to use NoNetworkGrid, now takes 2.7s not 97s

remove now-unused ShareManglingMixin
refs #1363
This commit is contained in:
Brian Warner 2011-08-01 10:44:44 -07:00
parent b07af5e1a2
commit 0605c77f08
2 changed files with 120 additions and 245 deletions

View File

@ -14,11 +14,9 @@ from allmydata.check_results import CheckResults, CheckAndRepairResults, \
DeepCheckResults, DeepCheckAndRepairResults
from allmydata.mutable.common import CorruptShareError
from allmydata.mutable.layout import unpack_header
from allmydata.storage.server import storage_index_to_dir
from allmydata.storage.mutable import MutableShareFile
from allmydata.util import hashutil, log, fileutil, pollmixin
from allmydata.util.assertutil import precondition
from allmydata.util.consumer import download_to_data
from allmydata.stats import StatsGathererService
from allmydata.key_generator import KeyGeneratorService
import allmydata.test.common_util as testutil
@ -917,152 +915,6 @@ N8L+bvLd4BU9g6hRS8b59lQ6GNjryx2bUnCVtLcey4Jd
TEST_DATA="\x02"*(immutable.upload.Uploader.URI_LIT_SIZE_THRESHOLD+1)
class ShareManglingMixin(SystemTestMixin):
def setUp(self):
# Set self.basedir to a temp dir which has the name of the current
# test method in its name.
self.basedir = self.mktemp()
d = defer.maybeDeferred(SystemTestMixin.setUp, self)
d.addCallback(lambda x: self.set_up_nodes())
def _upload_a_file(ignored):
cl0 = self.clients[0]
# We need multiple segments to test crypttext hash trees that are
# non-trivial (i.e. they have more than just one hash in them).
cl0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
# Tests that need to test servers of happiness using this should
# set their own value for happy -- the default (7) breaks stuff.
cl0.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
d2 = cl0.upload(immutable.upload.Data(TEST_DATA, convergence=""))
def _after_upload(u):
filecap = u.uri
self.n = self.clients[1].create_node_from_uri(filecap)
self.uri = uri.CHKFileURI.init_from_string(filecap)
return cl0.create_node_from_uri(filecap)
d2.addCallback(_after_upload)
return d2
d.addCallback(_upload_a_file)
def _stash_it(filenode):
self.filenode = filenode
d.addCallback(_stash_it)
return d
def find_all_shares(self, unused=None):
"""Locate shares on disk. Returns a dict that maps
(clientnum,sharenum) to a string that contains the share container
(copied directly from the disk, containing leases etc). You can
modify this dict and then call replace_shares() to modify the shares.
"""
shares = {} # k: (i, sharenum), v: data
for i, c in enumerate(self.clients):
sharedir = c.getServiceNamed("storage").sharedir
for (dirp, dirns, fns) in os.walk(sharedir):
for fn in fns:
try:
sharenum = int(fn)
except TypeError:
# Whoops, I guess that's not a share file then.
pass
else:
data = open(os.path.join(sharedir, dirp, fn), "rb").read()
shares[(i, sharenum)] = data
return shares
def replace_shares(self, newshares, storage_index):
"""Replace shares on disk. Takes a dictionary in the same form
as find_all_shares() returns."""
for i, c in enumerate(self.clients):
sharedir = c.getServiceNamed("storage").sharedir
for (dirp, dirns, fns) in os.walk(sharedir):
for fn in fns:
try:
sharenum = int(fn)
except TypeError:
# Whoops, I guess that's not a share file then.
pass
else:
pathtosharefile = os.path.join(sharedir, dirp, fn)
os.unlink(pathtosharefile)
for ((clientnum, sharenum), newdata) in newshares.iteritems():
if clientnum == i:
fullsharedirp=os.path.join(sharedir, storage_index_to_dir(storage_index))
fileutil.make_dirs(fullsharedirp)
wf = open(os.path.join(fullsharedirp, str(sharenum)), "wb")
wf.write(newdata)
wf.close()
def _delete_a_share(self, unused=None, sharenum=None):
""" Delete one share. """
shares = self.find_all_shares()
ks = shares.keys()
if sharenum is not None:
k = [ key for key in shares.keys() if key[1] == sharenum ][0]
else:
k = random.choice(ks)
del shares[k]
self.replace_shares(shares, storage_index=self.uri.get_storage_index())
return unused
def _corrupt_a_share(self, unused, corruptor_func, sharenum):
shares = self.find_all_shares()
ks = [ key for key in shares.keys() if key[1] == sharenum ]
assert ks, (shares.keys(), sharenum)
k = ks[0]
shares[k] = corruptor_func(shares[k])
self.replace_shares(shares, storage_index=self.uri.get_storage_index())
return corruptor_func
def _corrupt_all_shares(self, unused, corruptor_func):
""" All shares on disk will be corrupted by corruptor_func. """
shares = self.find_all_shares()
for k in shares.keys():
self._corrupt_a_share(unused, corruptor_func, k[1])
return corruptor_func
def _corrupt_a_random_share(self, unused, corruptor_func):
""" Exactly one share on disk will be corrupted by corruptor_func. """
shares = self.find_all_shares()
ks = shares.keys()
k = random.choice(ks)
self._corrupt_a_share(unused, corruptor_func, k[1])
return k[1]
def _count_reads(self):
sum_of_read_counts = 0
for thisclient in self.clients:
counters = thisclient.stats_provider.get_stats()['counters']
sum_of_read_counts += counters.get('storage_server.read', 0)
return sum_of_read_counts
def _count_allocates(self):
sum_of_allocate_counts = 0
for thisclient in self.clients:
counters = thisclient.stats_provider.get_stats()['counters']
sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
return sum_of_allocate_counts
def _count_writes(self):
sum_of_write_counts = 0
for thisclient in self.clients:
counters = thisclient.stats_provider.get_stats()['counters']
sum_of_write_counts += counters.get('storage_server.write', 0)
return sum_of_write_counts
def _download_and_check_plaintext(self, unused=None):
d = download_to_data(self.n)
def _after_download(result):
self.failUnlessEqual(result, TEST_DATA)
d.addCallback(_after_download)
return d
class ShouldFailMixin:
def shouldFail(self, expected_failure, which, substring,
callable, *args, **kwargs):

View File

@ -1,17 +1,20 @@
from allmydata.test import common
from allmydata.interfaces import NotEnoughSharesError
from allmydata.util.consumer import download_to_data
from allmydata import uri
from twisted.internet import defer
from twisted.trial import unittest
import random
from foolscap.api import eventually
from allmydata.util import log
from allmydata.immutable.downloader import finder
from twisted.trial import unittest
from twisted.internet import defer
import mock
from foolscap.api import eventually
from allmydata.test import common
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common import TEST_DATA
from allmydata import uri
from allmydata.util import log
from allmydata.util.consumer import download_to_data
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable.upload import Data
from allmydata.immutable.downloader import finder
class MockNode(object):
def __init__(self, check_reneging, check_fetch_failed):
@ -129,41 +132,76 @@ class TestShareFinder(unittest.TestCase):
return mocknode.when_finished()
class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase):
class Test(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
def startup(self, basedir):
self.basedir = basedir
self.set_up_grid(num_clients=2, num_servers=5)
c1 = self.g.clients[1]
# We need multiple segments to test crypttext hash trees that are
# non-trivial (i.e. they have more than just one hash in them).
c1.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
# Tests that need to test servers of happiness using this should
# set their own value for happy -- the default (7) breaks stuff.
c1.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
d = c1.upload(Data(TEST_DATA, convergence=""))
def _after_upload(ur):
self.uri = ur.uri
self.filenode = self.g.clients[0].create_node_from_uri(ur.uri)
return self.uri
d.addCallback(_after_upload)
return d
def _stash_shares(self, shares):
self.shares = shares
def _download_and_check_plaintext(self, ign=None):
num_reads = self._count_reads()
d = download_to_data(self.filenode)
def _after_download(result):
self.failUnlessEqual(result, TEST_DATA)
return self._count_reads() - num_reads
d.addCallback(_after_download)
return d
def _shuffled(self, num_shnums):
shnums = range(10)
random.shuffle(shnums)
return shnums[:num_shnums]
def _count_reads(self):
return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.read', 0)
for s in self.g.servers_by_number.values()])
def _count_allocates(self):
return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.allocate', 0)
for s in self.g.servers_by_number.values()])
def _count_writes(self):
return sum([s.stats_provider.get_stats() ['counters'].get('storage_server.write', 0)
for s in self.g.servers_by_number.values()])
def test_test_code(self):
# The following process of stashing the shares, running
# replace_shares, and asserting that the new set of shares equals the
# old is more to test this test code than to test the Tahoe code...
d = defer.succeed(None)
d.addCallback(self.find_all_shares)
stash = [None]
def _stash_it(res):
stash[0] = res
return res
d.addCallback(_stash_it)
d = self.startup("immutable/Test/code")
d.addCallback(self.copy_shares)
d.addCallback(self._stash_shares)
d.addCallback(self._download_and_check_plaintext)
# The following process of deleting 8 of the shares and asserting
# that you can't download it is more to test this test code than to
# test the Tahoe code...
def _then_delete_8(unused=None):
self.replace_shares(stash[0], storage_index=self.uri.get_storage_index())
for i in range(8):
self._delete_a_share()
def _then_delete_8(ign):
self.restore_all_shares(self.shares)
self.delete_shares_numbered(self.uri, range(8))
d.addCallback(_then_delete_8)
def _then_download(unused=None):
d2 = download_to_data(self.n)
def _after_download_callb(result):
self.fail() # should have gotten an errback instead
return result
def _after_download_errb(failure):
failure.trap(NotEnoughSharesError)
return None # success!
d2.addCallbacks(_after_download_callb, _after_download_errb)
return d2
d.addCallback(_then_download)
d.addCallback(lambda ign:
self.shouldFail(NotEnoughSharesError, "download-2",
"ran out of shares",
download_to_data, self.filenode))
return d
def test_download(self):
@ -171,55 +209,49 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase)
tested by test code in other modules, but this module is also going
to test some more specific things about immutable download.)
"""
d = defer.succeed(None)
before_download_reads = self._count_reads()
def _after_download(unused=None):
after_download_reads = self._count_reads()
#print before_download_reads, after_download_reads
self.failIf(after_download_reads-before_download_reads > 41,
(after_download_reads, before_download_reads))
d = self.startup("immutable/Test/download")
d.addCallback(self._download_and_check_plaintext)
def _after_download(ign):
num_reads = self._count_reads()
#print num_reads
self.failIf(num_reads > 41, num_reads)
d.addCallback(_after_download)
return d
def test_download_from_only_3_remaining_shares(self):
""" Test download after 7 random shares (of the 10) have been
removed."""
d = defer.succeed(None)
def _then_delete_7(unused=None):
for i in range(7):
self._delete_a_share()
before_download_reads = self._count_reads()
d.addCallback(_then_delete_7)
def _after_download(unused=None):
after_download_reads = self._count_reads()
#print before_download_reads, after_download_reads
self.failIf(after_download_reads-before_download_reads > 41, (after_download_reads, before_download_reads))
d = self.startup("immutable/Test/download_from_only_3_remaining_shares")
d.addCallback(lambda ign:
self.delete_shares_numbered(self.uri, range(7)))
d.addCallback(self._download_and_check_plaintext)
def _after_download(num_reads):
#print num_reads
self.failIf(num_reads > 41, num_reads)
d.addCallback(_after_download)
return d
def test_download_from_only_3_shares_with_good_crypttext_hash(self):
""" Test download after 7 random shares (of the 10) have had their
crypttext hash tree corrupted."""
d = defer.succeed(None)
def _then_corrupt_7(unused=None):
shnums = range(10)
random.shuffle(shnums)
for i in shnums[:7]:
self._corrupt_a_share(None, common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes, i)
#before_download_reads = self._count_reads()
d.addCallback(_then_corrupt_7)
d = self.startup("download_from_only_3_shares_with_good_crypttext_hash")
def _corrupt_7(ign):
c = common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes
self.corrupt_shares_numbered(self.uri, self._shuffled(7), c)
d.addCallback(_corrupt_7)
d.addCallback(self._download_and_check_plaintext)
return d
def test_download_abort_if_too_many_missing_shares(self):
""" Test that download gives up quickly when it realizes there aren't
enough shares out there."""
for i in range(8):
self._delete_a_share()
d = self.shouldFail(NotEnoughSharesError, "delete 8", None,
download_to_data, self.n)
d = self.startup("download_abort_if_too_many_missing_shares")
d.addCallback(lambda ign:
self.delete_shares_numbered(self.uri, range(8)))
d.addCallback(lambda ign:
self.shouldFail(NotEnoughSharesError, "delete 8",
"Last failure: None",
download_to_data, self.filenode))
# the new downloader pipelines a bunch of read requests in parallel,
# so don't bother asserting anything about the number of reads
return d
@ -229,39 +261,30 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase)
enough uncorrupted shares out there. It should be able to tell
because the corruption occurs in the sharedata version number, which
it checks first."""
d = defer.succeed(None)
def _then_corrupt_8(unused=None):
shnums = range(10)
random.shuffle(shnums)
for shnum in shnums[:8]:
self._corrupt_a_share(None, common._corrupt_sharedata_version_number, shnum)
d.addCallback(_then_corrupt_8)
d = self.startup("download_abort_if_too_many_corrupted_shares")
def _corrupt_8(ign):
c = common._corrupt_sharedata_version_number
self.corrupt_shares_numbered(self.uri, self._shuffled(8), c)
d.addCallback(_corrupt_8)
def _try_download(ign):
start_reads = self._count_reads()
d2 = self.shouldFail(NotEnoughSharesError, "corrupt 8",
"LayoutInvalid",
download_to_data, self.filenode)
def _check_numreads(ign):
num_reads = self._count_reads() - start_reads
#print num_reads
before_download_reads = self._count_reads()
def _attempt_to_download(unused=None):
d2 = download_to_data(self.n)
def _callb(res):
self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
def _errb(f):
self.failUnless(f.check(NotEnoughSharesError))
d2.addCallbacks(_callb, _errb)
# To pass this test, you are required to give up before
# reading all of the share data. Actually, we could give up
# sooner than 45 reads, but currently our download code does
# 45 reads. This test then serves as a "performance
# regression detector" -- if you change download code so that
# it takes *more* reads, then this test will fail.
self.failIf(num_reads > 45, num_reads)
d2.addCallback(_check_numreads)
return d2
d.addCallback(_attempt_to_download)
def _after_attempt(unused=None):
after_download_reads = self._count_reads()
#print before_download_reads, after_download_reads
# To pass this test, you are required to give up before reading
# all of the share data. Actually, we could give up sooner than
# 45 reads, but currently our download code does 45 reads. This
# test then serves as a "performance regression detector" -- if
# you change download code so that it takes *more* reads, then
# this test will fail.
self.failIf(after_download_reads-before_download_reads > 45,
(after_download_reads, before_download_reads))
d.addCallback(_after_attempt)
d.addCallback(_try_download)
return d