test_repairer: change Repairer to use much-faster no_network.GridTestMixin. As a side-effect, fix what I think was a bug: some of the assert-minimal-effort-expended checks were mixing write counts and allocate counts

Brian Warner 2009-02-23 17:42:27 -07:00
parent cc3c1ae8df
commit 2be729b1e4
2 changed files with 194 additions and 154 deletions
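A minimal sketch (not part of the commit) of the effort accounting this change puts in place, assuming the per-server `storage_server.read` / `storage_server.allocate` / `storage_server.write` counters that the new helpers in the diff read out of each server's stats_provider; the helper names `_count` and `_assert_minimal_effort` are invented here for illustration. The point is that each kind of effort is tallied and bounded separately, so a write count is never checked against an allocate limit:

    # sketch only -- the real helpers appear in the test_repairer.py diff below
    def _count(self, countername):
        # sum one 'storage_server.*' counter across all servers in the grid
        total = 0
        for (i, ss, storedir) in self.iterate_servers():
            counters = ss.stats_provider.get_stats()['counters']
            total += counters.get(countername, 0)
        return total

    def _assert_minimal_effort(self, before_reads, before_allocates):
        # compare each kind of effort against its own budget: reads against
        # MAX_DELTA_READS, allocates against DELTA_WRITES_PER_SHARE -- never
        # a write count against an allocate limit
        delta_reads = self._count('storage_server.read') - before_reads
        delta_allocates = self._count('storage_server.allocate') - before_allocates
        self.failIf(delta_reads > MAX_DELTA_READS, (delta_reads, MAX_DELTA_READS))
        self.failIf(delta_allocates > DELTA_WRITES_PER_SHARE,
                    (delta_allocates, DELTA_WRITES_PER_SHARE))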


@@ -264,3 +264,15 @@ class GridTestMixin:
pass
return sorted(shares)
def delete_share(self, (shnum, serverid, sharefile)):
os.unlink(sharefile)
def delete_shares_numbered(self, uri, shnums):
for (i_shnum, i_serverid, i_sharefile) in self.find_shares(uri):
if i_shnum in shnums:
os.unlink(i_sharefile)
def corrupt_share(self, (shnum, serverid, sharefile), corruptor_function):
sharedata = open(sharefile, "rb").read()
corruptdata = corruptor_function(sharedata)
open(sharefile, "wb").write(corruptdata)
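A usage sketch for these new GridTestMixin helpers (hypothetical test body: `self.uri` is assumed to come from an earlier upload, and the byte-flipping corruptor is invented here for illustration; the real tests pass corruptors such as common._corrupt_uri_extension):

    def _flip_last_byte(sharedata):
        # hypothetical corruptor: return the share data with its final byte inverted
        return sharedata[:-1] + chr(ord(sharedata[-1]) ^ 0xff)

    def test_example(self):
        # remove every copy of shares 0 and 1, wherever they live on disk
        self.delete_shares_numbered(self.uri, [0, 1])
        # corrupt whatever shares remain
        for sh in self.find_shares(self.uri):
            self.corrupt_share(sh, _flip_last_byte)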


@@ -2,15 +2,16 @@ from allmydata.test import common
from allmydata.monitor import Monitor
from allmydata import check_results
from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable import repairer
from allmydata.immutable import repairer, upload
from twisted.internet import defer
from twisted.trial import unittest
import random
from no_network import GridTestMixin
# We'll allow you to pass this test even if you trigger eighteen times as
# many disk reads and block fetches as would be optimal.
READ_LEEWAY = 18
DELTA_READS = 10 * READ_LEEWAY # N = 10
MAX_DELTA_READS = 10 * READ_LEEWAY # N = 10
class Verifier(common.ShareManglingMixin, unittest.TestCase):
def test_check_without_verify(self):
@@ -66,7 +67,7 @@ class Verifier(common.ShareManglingMixin, unittest.TestCase):
d2 = self.filenode.check(Monitor(), verify=True)
def _after_check(checkresults):
after_check_reads = self._count_reads()
self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads))
self.failIf(after_check_reads - before_check_reads > MAX_DELTA_READS, (after_check_reads, before_check_reads))
try:
return judgement_func(checkresults)
except Exception, le:
@@ -431,184 +432,207 @@ class DownUpConnector(unittest.TestCase):
d.addCallback(_callb)
return d
class Repairer(common.ShareManglingMixin, unittest.TestCase):
class Repairer(GridTestMixin, unittest.TestCase, common.ShouldFailMixin):
def upload_and_stash(self):
c0 = self.g.clients[0]
c1 = self.g.clients[1]
c0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
d = c0.upload(upload.Data(common.TEST_DATA, convergence=""))
def _stash_uri(ur):
self.uri = ur.uri
self.c0_filenode = c0.create_node_from_uri(ur.uri)
self.c1_filenode = c1.create_node_from_uri(ur.uri)
d.addCallback(_stash_uri)
return d
def test_test_code(self):
# The following process of stashing the shares, running
# replace_shares, and asserting that the new set of shares equals the
# old is more to test this test code than to test the Tahoe code...
d = defer.succeed(None)
d.addCallback(self.find_shares)
stash = [None]
def _stash_it(res):
stash[0] = res
return res
d.addCallback(_stash_it)
d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
# This test is actually to make sure our test harness works, rather
# than testing anything about Tahoe code itself.
def _compare(res):
oldshares = stash[0]
self.failUnless(isinstance(oldshares, dict), oldshares)
self.failUnlessEqual(oldshares, res)
self.basedir = "repairer/Repairer/test_code"
self.set_up_grid(num_clients=2)
d = self.upload_and_stash()
d.addCallback(self.find_shares)
d.addCallback(lambda ignored: self.find_shares(self.uri))
def _stash_shares(oldshares):
self.oldshares = oldshares
d.addCallback(_stash_shares)
d.addCallback(lambda ignored: self.find_shares(self.uri))
def _compare(newshares):
self.failUnlessEqual(newshares, self.oldshares)
d.addCallback(_compare)
d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
d.addCallback(self.find_shares)
d.addCallback(lambda x: self.failUnlessEqual(x, {}))
def _delete_8(ignored):
shnum = self.oldshares[0][0]
self.delete_shares_numbered(self.uri, [shnum])
for sh in self.oldshares[1:8]:
self.delete_share(sh)
d.addCallback(_delete_8)
d.addCallback(lambda ignored: self.find_shares(self.uri))
d.addCallback(lambda shares: self.failUnlessEqual(len(shares), 2))
# The following process of deleting 8 of the shares and asserting
# that you can't download it is more to test this test code than to
# test the Tahoe code...
def _then_delete_8(unused=None):
self.replace_shares(stash[0], storage_index=self.uri.storage_index)
for i in range(8):
self._delete_a_share()
d.addCallback(_then_delete_8)
d.addCallback(lambda ignored:
self.shouldFail(NotEnoughSharesError, "then_download",
None,
self.c1_filenode.download_to_data))
def _then_download(unused=None):
self.downloader = self.clients[1].getServiceNamed("downloader")
d = self.downloader.download_to_data(self.uri)
repair_monitor = Monitor()
d.addCallback(lambda ignored:
self.shouldFail(NotEnoughSharesError, "then_repair",
None,
self.c1_filenode.check_and_repair,
repair_monitor, verify=False))
def _after_download_callb(result):
self.fail() # should have gotten an errback instead
return result
def _after_download_errb(failure):
failure.trap(NotEnoughSharesError)
return None # success!
d.addCallbacks(_after_download_callb, _after_download_errb)
d.addCallback(_then_download)
# test share corruption
def _test_corrupt(ignored):
olddata = {}
shares = self.find_shares(self.uri)
for (shnum, serverid, sharefile) in shares:
olddata[ (shnum, serverid) ] = open(sharefile, "rb").read()
for sh in shares:
self.corrupt_share(sh, common._corrupt_uri_extension)
for (shnum, serverid, sharefile) in shares:
newdata = open(sharefile, "rb").read()
self.failIfEqual(olddata[ (shnum, serverid) ], newdata)
d.addCallback(_test_corrupt)
# The following process of deleting 8 of the shares and asserting
# that you can't repair it is more to test this test code than to
# test the Tahoe code...
d.addCallback(_then_delete_8)
def _then_repair(unused=None):
d2 = self.filenode.check_and_repair(Monitor(), verify=False)
def _after_repair_callb(result):
self.fail() # should have gotten an errback instead
return result
def _after_repair_errb(f):
f.trap(NotEnoughSharesError)
return None # success!
d2.addCallbacks(_after_repair_callb, _after_repair_errb)
return d2
d.addCallback(_then_repair)
def _remove_all(shares):
for sh in self.find_shares(self.uri):
self.delete_share(sh)
d.addCallback(_remove_all)
d.addCallback(lambda ignored: self.find_shares(self.uri))
d.addCallback(lambda shares: self.failUnlessEqual(shares, []))
return d
def failUnlessIsInstance(self, x, xtype):
self.failUnless(isinstance(x, xtype), x)
def _count_reads(self):
sum_of_read_counts = 0
for (i, ss, storedir) in self.iterate_servers():
counters = ss.stats_provider.get_stats()['counters']
sum_of_read_counts += counters.get('storage_server.read', 0)
return sum_of_read_counts
def _count_allocates(self):
sum_of_allocate_counts = 0
for (i, ss, storedir) in self.iterate_servers():
counters = ss.stats_provider.get_stats()['counters']
sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
return sum_of_allocate_counts
def _count_writes(self):
sum_of_write_counts = 0
for (i, ss, storedir) in self.iterate_servers():
counters = ss.stats_provider.get_stats()['counters']
sum_of_write_counts += counters.get('storage_server.write', 0)
return sum_of_write_counts
def _stash_counts(self):
self.before_repair_reads = self._count_reads()
self.before_repair_allocates = self._count_allocates()
self.before_repair_writes = self._count_writes()
def _get_delta_counts(self):
delta_reads = self._count_reads() - self.before_repair_reads
delta_allocates = self._count_allocates() - self.before_repair_allocates
delta_writes = self._count_writes() - self.before_repair_writes
return (delta_reads, delta_allocates, delta_writes)
def failIfBigger(self, x, y):
self.failIf(x > y, "%s > %s" % (x, y))
def test_repair_from_deletion_of_1(self):
""" Repair replaces a share that got deleted. """
d = defer.succeed(None)
d.addCallback(self._delete_a_share, sharenum=2)
self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
self.set_up_grid(num_clients=2)
d = self.upload_and_stash()
def _repair_from_deletion_of_1(unused):
before_repair_reads = self._count_reads()
before_repair_allocates = self._count_writes()
d.addCallback(lambda ignored:
self.delete_shares_numbered(self.uri, [2]))
d.addCallback(lambda ignored: self._stash_counts())
d.addCallback(lambda ignored:
self.c0_filenode.check_and_repair(Monitor(),
verify=False))
def _check_results(crr):
self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
pre = crr.get_pre_repair_results()
self.failUnlessIsInstance(pre, check_results.CheckResults)
post = crr.get_post_repair_results()
self.failUnlessIsInstance(post, check_results.CheckResults)
delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
self.failIfBigger(delta_reads, MAX_DELTA_READS)
self.failIfBigger(delta_allocates, DELTA_WRITES_PER_SHARE)
self.failIf(pre.is_healthy())
self.failUnless(post.is_healthy())
d2 = self.filenode.check_and_repair(Monitor(), verify=False)
def _after_repair(checkandrepairresults):
assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
prerepairres = checkandrepairresults.get_pre_repair_results()
assert isinstance(prerepairres, check_results.CheckResults), prerepairres
postrepairres = checkandrepairresults.get_post_repair_results()
assert isinstance(postrepairres, check_results.CheckResults), postrepairres
after_repair_reads = self._count_reads()
after_repair_allocates = self._count_writes()
# Now we inspect the filesystem to make sure that it has 10
# shares.
shares = self.find_shares(self.uri)
self.failIf(len(shares) < 10)
d.addCallback(_check_results)
# print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
self.failIf(after_repair_allocates - before_repair_allocates > DELTA_WRITES_PER_SHARE, (after_repair_allocates, before_repair_allocates))
self.failIf(prerepairres.is_healthy())
self.failUnless(postrepairres.is_healthy())
d.addCallback(lambda ignored:
self.c0_filenode.check(Monitor(), verify=True))
d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
# Now we inspect the filesystem to make sure that it has 10
# shares.
shares = self.find_shares()
self.failIf(len(shares) < 10)
# Now we delete seven of the other shares, then try to download the
# file and assert that it succeeds at downloading and has the right
# contents. This can't work unless it has already repaired the
# previously-deleted share #2.
# Now we assert that the verifier reports the file as healthy.
d3 = self.filenode.check(Monitor(), verify=True)
def _after_verify(verifyresults):
self.failUnless(verifyresults.is_healthy())
d3.addCallback(_after_verify)
# Now we delete seven of the other shares, then try to
# download the file and assert that it succeeds at
# downloading and has the right contents. This can't work
# unless it has already repaired the previously-deleted share
# #2.
def _then_delete_7_and_try_a_download(unused=None):
for sharenum in range(3, 10):
self._delete_a_share(sharenum=sharenum)
return self._download_and_check_plaintext()
d3.addCallback(_then_delete_7_and_try_a_download)
return d3
d2.addCallback(_after_repair)
return d2
d.addCallback(_repair_from_deletion_of_1)
d.addCallback(lambda ignored:
self.delete_shares_numbered(self.uri, range(3, 10+1)))
d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
d.addCallback(lambda newdata:
self.failUnlessEqual(newdata, common.TEST_DATA))
return d
def test_repair_from_deletion_of_7(self):
""" Repair replaces seven shares that got deleted. """
shares = self.find_shares()
self.failIf(len(shares) != 10)
d = defer.succeed(None)
self.basedir = "repairer/Repairer/repair_from_deletion_of_1"
self.set_up_grid(num_clients=2)
d = self.upload_and_stash()
d.addCallback(lambda ignored:
self.delete_shares_numbered(self.uri, range(7)))
d.addCallback(lambda ignored: self._stash_counts())
d.addCallback(lambda ignored:
self.c0_filenode.check_and_repair(Monitor(),
verify=False))
def _check_results(crr):
self.failUnlessIsInstance(crr, check_results.CheckAndRepairResults)
pre = crr.get_pre_repair_results()
self.failUnlessIsInstance(pre, check_results.CheckResults)
post = crr.get_post_repair_results()
self.failUnlessIsInstance(post, check_results.CheckResults)
delta_reads, delta_allocates, delta_writes = self._get_delta_counts()
def _delete_7(unused=None):
shnums = range(10)
random.shuffle(shnums)
for sharenum in shnums[:7]:
self._delete_a_share(sharenum=sharenum)
d.addCallback(_delete_7)
self.failIfBigger(delta_reads, MAX_DELTA_READS)
self.failIfBigger(delta_allocates, (DELTA_WRITES_PER_SHARE * 7))
self.failIf(pre.is_healthy())
self.failUnless(post.is_healthy(), post.data)
def _repair_from_deletion_of_7(unused):
before_repair_reads = self._count_reads()
before_repair_allocates = self._count_writes()
# Make sure we really have 10 shares.
shares = self.find_shares(self.uri)
self.failIf(len(shares) < 10)
d.addCallback(_check_results)
d2 = self.filenode.check_and_repair(Monitor(), verify=False)
def _after_repair(checkandrepairresults):
assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
prerepairres = checkandrepairresults.get_pre_repair_results()
assert isinstance(prerepairres, check_results.CheckResults), prerepairres
postrepairres = checkandrepairresults.get_post_repair_results()
assert isinstance(postrepairres, check_results.CheckResults), postrepairres
after_repair_reads = self._count_reads()
after_repair_allocates = self._count_writes()
d.addCallback(lambda ignored:
self.c0_filenode.check(Monitor(), verify=True))
d.addCallback(lambda vr: self.failUnless(vr.is_healthy()))
# print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 7), (after_repair_allocates, before_repair_allocates))
self.failIf(prerepairres.is_healthy())
self.failUnless(postrepairres.is_healthy(), postrepairres.data)
# Now we delete seven of the other shares, then try to download the
# file and assert that it succeeds at downloading and has the right
# contents. This can't work unless it has already repaired the
# previously-deleted share #2.
# Now we inspect the filesystem to make sure that it has 10
# shares.
shares = self.find_shares()
self.failIf(len(shares) < 10)
# Now we assert that the verifier reports the file as healthy.
d3 = self.filenode.check(Monitor(), verify=True)
def _after_verify(verifyresults):
self.failUnless(verifyresults.is_healthy())
d3.addCallback(_after_verify)
# Now we delete seven random shares, then try to download the
# file and assert that it succeeds at downloading and has the
# right contents.
def _then_delete_7_and_try_a_download(unused=None):
for i in range(7):
self._delete_a_share()
return self._download_and_check_plaintext()
d3.addCallback(_then_delete_7_and_try_a_download)
return d3
d2.addCallback(_after_repair)
return d2
d.addCallback(_repair_from_deletion_of_7)
d.addCallback(lambda ignored:
self.delete_shares_numbered(self.uri, range(3, 10+1)))
d.addCallback(lambda ignored: self.c1_filenode.download_to_data())
d.addCallback(lambda newdata:
self.failUnlessEqual(newdata, common.TEST_DATA))
return d
# why is test_repair_from_corruption_of_1 disabled? Read on:
@@ -638,6 +662,10 @@ class Repairer(common.ShareManglingMixin, unittest.TestCase):
# and will probably cause subsequent unrelated tests to fail too (due to
# "unclean reactor" problems).
#
# In addition, I (warner) have recently refactored the rest of this class
# to use the much-faster no_network.GridTestMixin, so this test needs to
# be updated before it will be able to run again.
#
# So we're turning this test off until we've done one or more of the
# following:
# * remove some of these limitations
@@ -672,7 +700,7 @@ class Repairer(common.ShareManglingMixin, unittest.TestCase):
# The "* 2" in reads is because you might read a whole share
# before figuring out that it is corrupted. It might be
# possible to make this delta reads number a little tighter.
self.failIf(after_repair_reads - before_repair_reads > (DELTA_READS * 2), (after_repair_reads, before_repair_reads))
self.failIf(after_repair_reads - before_repair_reads > (MAX_DELTA_READS * 2), (after_repair_reads, before_repair_reads))
# The "* 2" in writes is because each server has two shares,
# and it is reasonable for repairer to conclude that there
# are two shares that it should upload, if the server fails