mutable: improve test coverage, fix bug in privkey fetching, add .finished to stats, remove dead code
commit 09dcfeae22
parent 9613997ddb
@@ -61,10 +61,6 @@ class MutableFileNode:
         self._sharemap = {} # known shares, shnum-to-[nodeids]
         self._cache = ResponseCache()
 
-        self._current_data = None # SDMF: we're allowed to cache the contents
-        self._current_roothash = None # ditto
-        self._current_seqnum = None # ditto
-
         # all users of this MutableFileNode go through the serializer. This
         # takes advantage of the fact that Deferreds discard the callbacks
         # that they're done with, so we can keep using the same Deferred
@@ -118,10 +114,6 @@ class MutableFileNode:
             self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
             self._readkey = self._uri.readkey
             self._storage_index = self._uri.storage_index
-            # TODO: seqnum/roothash: really we mean "doesn't matter since
-            # nobody knows about us yet"
-            self._current_seqnum = 0
-            self._current_roothash = "\x00"*32
             return self._upload(initial_contents, None)
         d.addCallback(_generated)
         return d
@@ -157,10 +149,6 @@ class MutableFileNode:
         self._required_shares = required_shares
     def _populate_total_shares(self, total_shares):
         self._total_shares = total_shares
-    def _populate_seqnum(self, seqnum):
-        self._current_seqnum = seqnum
-    def _populate_root_hash(self, root_hash):
-        self._current_roothash = root_hash
 
     def _populate_privkey(self, privkey):
         self._privkey = privkey
@@ -30,15 +30,18 @@ class UpdateStatus:
         self.progress = 0.0
         self.counter = self.statusid_counter.next()
         self.started = time.time()
+        self.finished = None
 
-    def add_per_server_time(self, peerid, op, elapsed):
-        assert op in ("query", "privkey")
+    def add_per_server_time(self, peerid, op, sent, elapsed):
+        assert op in ("query", "late", "privkey")
         if peerid not in self.timings["per_server"]:
             self.timings["per_server"][peerid] = []
-        self.timings["per_server"][peerid].append((op,elapsed))
+        self.timings["per_server"][peerid].append((op,sent,elapsed))
 
     def get_started(self):
         return self.started
+    def get_finished(self):
+        return self.finished
     def get_storage_index(self):
         return self.storage_index
     def get_mode(self):
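
Note: every call site of add_per_server_time gains a `sent` argument, so each per-server timing record becomes a three-tuple. A minimal standalone sketch of the new record shape (the peer id and numbers below are made up, not tahoe data):

    import time

    timings = {"per_server": {}}

    def add_per_server_time(peerid, op, sent, elapsed):
        # mirrors the updated method: records (op, sent, elapsed)
        # rather than the old (op, elapsed)
        assert op in ("query", "late", "privkey")
        if peerid not in timings["per_server"]:
            timings["per_server"][peerid] = []
        timings["per_server"][peerid].append((op, sent, elapsed))

    sent = time.time() - 0.25            # hypothetical send timestamp
    add_per_server_time("peer-1", "query", sent, 0.25)
    add_per_server_time("peer-1", "privkey", sent, 0.40)
    assert timings["per_server"]["peer-1"][0] == ("query", sent, 0.25)
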
@@ -72,6 +75,8 @@ class UpdateStatus:
         self.progress = value
     def set_active(self, value):
         self.active = value
+    def set_finished(self, when):
+        self.finished = when
 
 class ServerMap:
     """I record the placement of mutable shares.
@@ -140,6 +145,10 @@ class ServerMap:
                    (idlib.shortnodeid_b2a(peerid), shnum,
                     seqnum, base32.b2a(root_hash)[:4], k, N,
                     datalength))
+        if self.problems:
+            print >>out, "%d PROBLEMS" % len(self.problems)
+            for f in self.problems:
+                print >>out, str(f)
         return out
 
     def all_peers(self):
@@ -174,7 +183,6 @@ class ServerMap:
             (verinfo, timestamp) = self.servermap[key]
             return verinfo
-            return None
         return None
 
     def shares_available(self):
         """Return a dict that maps verinfo to tuples of
@@ -254,14 +262,38 @@
         # Return a dict of versionid -> health, for versions that are
         # unrecoverable and have later seqnums than any recoverable versions.
         # These indicate that a write will lose data.
-        pass
+        versionmap = self.make_versionmap()
+        healths = {} # maps verinfo to (found,k)
+        unrecoverable = set()
+        highest_recoverable_seqnum = -1
+        for (verinfo, shares) in versionmap.items():
+            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+             offsets_tuple) = verinfo
+            shnums = set([shnum for (shnum, peerid, timestamp) in shares])
+            healths[verinfo] = (len(shnums),k)
+            if len(shnums) < k:
+                unrecoverable.add(verinfo)
+            else:
+                highest_recoverable_seqnum = max(seqnum,
+                                                 highest_recoverable_seqnum)
+
+        newversions = {}
+        for verinfo in unrecoverable:
+            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+             offsets_tuple) = verinfo
+            if seqnum > highest_recoverable_seqnum:
+                newversions[verinfo] = healths[verinfo]
+
+        return newversions
+
 
     def needs_merge(self):
         # return True if there are multiple recoverable versions with the
         # same seqnum, meaning that MutableFileNode.read_best_version is not
         # giving you the whole story, and that using its data to do a
         # subsequent publish will lose information.
-        pass
+        return bool(len(self.recoverable_versions()) > 1)
+
 
 class ServermapUpdater:
     def __init__(self, filenode, servermap, mode=MODE_READ):
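
The filled-in unrecoverable_newer_versions() is the logic the new MultipleVersions tests exercise: a version is unrecoverable when fewer than k distinct shares were seen, and only unrecoverable versions newer than every recoverable one are reported, since those are the ones a write would clobber. A toy restatement of the same computation (verinfo tuples trimmed from nine fields down to (seqnum, k), with fabricated share sets):

    def unrecoverable_newer_versions(versionmap):
        healths = {}
        unrecoverable = set()
        highest_recoverable_seqnum = -1
        for (seqnum, k), shnums in versionmap.items():
            healths[(seqnum, k)] = (len(shnums), k)
            if len(shnums) < k:
                unrecoverable.add((seqnum, k))
            else:
                highest_recoverable_seqnum = max(seqnum,
                                                 highest_recoverable_seqnum)
        return dict([(v, healths[v]) for v in unrecoverable
                     if v[0] > highest_recoverable_seqnum])

    # seqnum 2 is recoverable (5 shares >= k=3); seqnum 4 has only one
    # share, so a write based on version 2 would lose version 4's data:
    vmap = {(2, 3): set([0, 2, 4, 6, 8]), (4, 3): set([0])}
    assert unrecoverable_newer_versions(vmap) == {(4, 3): (1, 3)}
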
@@ -457,13 +489,14 @@ class ServermapUpdater:
                  level=log.NOISY)
         now = time.time()
         elapsed = now - started
-        self._status.add_per_server_time(peerid, "query", elapsed)
         self._queries_outstanding.discard(peerid)
         self._must_query.discard(peerid)
         self._queries_completed += 1
         if not self._running:
             self.log("but we're not running, so we'll ignore it", parent=lp)
+            self._status.add_per_server_time(peerid, "late", started, elapsed)
             return
+        self._status.add_per_server_time(peerid, "query", started, elapsed)
 
         if datavs:
             self._good_peers.add(peerid)
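
With this reordering, a response that arrives after the updater has stopped is still recorded, but under the new "late" op instead of being counted as a normal query. A compressed sketch of the classification (hypothetical helper, not tahoe code):

    import time

    def classify_response(running, started):
        # late responses keep their timing data but are filed separately
        elapsed = time.time() - started
        op = "query" if running else "late"
        return (op, started, elapsed)

    op, sent, elapsed = classify_response(False, time.time() - 1.0)
    assert op == "late"
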
@@ -602,7 +635,7 @@ class ServermapUpdater:
          pubkey, signature, share_hash_chain, block_hash_tree,
          share_data, enc_privkey) = r
 
-        return self._try_to_validate_privkey(self, enc_privkey, peerid, shnum)
+        return self._try_to_validate_privkey(enc_privkey, peerid, shnum)
 
     def _try_to_validate_privkey(self, enc_privkey, peerid, shnum):
 
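
This one-word change is the "fix bug in privkey fetching" from the commit message: the old call passed `self` explicitly to a bound method, which shifts every argument one slot to the right and makes the call raise at runtime. A minimal demonstration of that failure mode (toy class, not tahoe code):

    class Demo:
        def check(self, enc_privkey, peerid, shnum):
            return (enc_privkey, peerid, shnum)

    d = Demo()
    # correct: the bound method supplies `self` automatically
    assert d.check("KEY", "peer", 0) == ("KEY", "peer", 0)
    try:
        # the old, buggy form passes `self` a second time, so Python
        # sees one positional argument too many and raises TypeError
        d.check(d, "KEY", "peer", 0)
    except TypeError:
        pass
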
@@ -638,7 +671,7 @@ class ServermapUpdater:
     def _got_privkey_results(self, datavs, peerid, shnum, started):
         now = time.time()
         elapsed = now - started
-        self._status.add_per_server_time(peerid, "privkey", elapsed)
+        self._status.add_per_server_time(peerid, "privkey", started, elapsed)
         self._queries_outstanding.discard(peerid)
         if not self._need_privkey:
             return
@@ -853,7 +886,9 @@ class ServermapUpdater:
         if not self._running:
             return
         self._running = False
-        elapsed = time.time() - self._started
+        now = time.time()
+        elapsed = now - self._started
+        self._status.set_finished(now)
         self._status.timings["total"] = elapsed
         self._status.set_progress(1.0)
         self._status.set_status("Done")
@@ -575,11 +575,14 @@ class Servermap(unittest.TestCase):
         d = self._client.create_mutable_file("New contents go here")
         def _created(node):
             self._fn = node
+            self._fn2 = self._client.create_node_from_uri(node.get_uri())
         d.addCallback(_created)
         return d
 
-    def make_servermap(self, mode=MODE_CHECK):
-        smu = ServermapUpdater(self._fn, ServerMap(), mode)
+    def make_servermap(self, mode=MODE_CHECK, fn=None):
+        if fn is None:
+            fn = self._fn
+        smu = ServermapUpdater(fn, ServerMap(), mode)
         d = smu.update()
         return d
 
@@ -621,6 +624,7 @@ class Servermap(unittest.TestCase):
         d.addCallback(lambda sm: us(sm, mode=MODE_READ))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
         d.addCallback(lambda sm: us(sm, mode=MODE_WRITE))
         d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
         d.addCallback(lambda sm: us(sm, mode=MODE_CHECK))
+        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
         d.addCallback(lambda sm: us(sm, mode=MODE_ANYTHING))
@@ -628,6 +632,25 @@ class Servermap(unittest.TestCase):
 
         return d
 
+    def test_fetch_privkey(self):
+        d = defer.succeed(None)
+        # use the sibling filenode (which hasn't been used yet), and make
+        # sure it can fetch the privkey. The file is small, so the privkey
+        # will be fetched on the first (query) pass.
+        d.addCallback(lambda res: self.make_servermap(MODE_WRITE, self._fn2))
+        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
+
+        # create a new file, which is large enough to knock the privkey out
+        # of the early part of the file
+        LARGE = "These are Larger contents" * 200 # about 5KB
+        d.addCallback(lambda res: self._client.create_mutable_file(LARGE))
+        def _created(large_fn):
+            large_fn2 = self._client.create_node_from_uri(large_fn.get_uri())
+            return self.make_servermap(MODE_WRITE, large_fn2)
+        d.addCallback(_created)
+        d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
+        return d
+
     def test_mark_bad(self):
         d = defer.succeed(None)
         ms = self.make_servermap
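
The size arithmetic behind the new test: each repetition of the string is 25 bytes, so LARGE is exactly 5000 bytes, which (per the test's own comment) is enough to push the encrypted privkey past the portion of the share fetched by the first query, forcing the follow-up privkey fetch that the commit fixes. A quick check:

    LARGE = "These are Larger contents" * 200
    assert len("These are Larger contents") == 25
    assert len(LARGE) == 5000    # 25 bytes * 200 = ~5KB
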
@@ -696,6 +719,7 @@ class Servermap(unittest.TestCase):
         self.failUnlessEqual(best, None)
         self.failUnlessEqual(len(sm.shares_available()), 1)
         self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )
+        return sm
 
     def test_not_quite_enough_shares(self):
         s = self._client._storage
@@ -713,6 +737,8 @@ class Servermap(unittest.TestCase):
 
         d.addCallback(lambda res: ms(mode=MODE_CHECK))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
+        d.addCallback(lambda sm:
+                      self.failUnlessEqual(len(sm.make_sharemap()), 2))
         d.addCallback(lambda res: ms(mode=MODE_ANYTHING))
         d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
         d.addCallback(lambda res: ms(mode=MODE_WRITE))
@@ -817,24 +843,32 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
             if substring:
                 allproblems = [str(f) for f in servermap.problems]
                 self.failUnless(substring in "".join(allproblems))
-                return
+                return servermap
             if should_succeed:
                 d1 = self._fn.download_best_version()
                 d1.addCallback(lambda new_contents:
                                self.failUnlessEqual(new_contents, self.CONTENTS))
-                return d1
             else:
-                return self.shouldFail(NotEnoughSharesError,
-                                       "_corrupt_all(offset=%s)" % (offset,),
-                                       substring,
-                                       self._fn.download_best_version)
+                d1 = self.shouldFail(NotEnoughSharesError,
+                                     "_corrupt_all(offset=%s)" % (offset,),
+                                     substring,
+                                     self._fn.download_best_version)
+            d1.addCallback(lambda res: servermap)
+            return d1
         d.addCallback(_do_retrieve)
         return d
 
     def test_corrupt_all_verbyte(self):
         # when the version byte is not 0, we hit an assertion error in
         # unpack_share().
-        return self._test_corrupt_all(0, "AssertionError")
+        d = self._test_corrupt_all(0, "AssertionError")
+        def _check_servermap(servermap):
+            # and the dump should mention the problems
+            s = StringIO()
+            dump = servermap.dump(s).getvalue()
+            self.failUnless("10 PROBLEMS" in dump, dump)
+        d.addCallback(_check_servermap)
+        return d
 
     def test_corrupt_all_seqnum(self):
         # a corrupt sequence number will trigger a bad signature
@@ -976,8 +1010,6 @@ class MultipleEncodings(unittest.TestCase):
         fn2._pubkey = fn._pubkey
         fn2._privkey = fn._privkey
         fn2._encprivkey = fn._encprivkey
-        fn2._current_seqnum = 0
-        fn2._current_roothash = "\x00" * 32
         # and set the encoding parameters to something completely different
         fn2._required_shares = k
         fn2._total_shares = n
@@ -1095,6 +1127,108 @@ class MultipleEncodings(unittest.TestCase):
         d.addCallback(_retrieved)
         return d
 
+class MultipleVersions(unittest.TestCase):
+    def setUp(self):
+        self.CONTENTS = ["Contents 0",
+                         "Contents 1",
+                         "Contents 2",
+                         "Contents 3a",
+                         "Contents 3b"]
+        self._copied_shares = {}
+        num_peers = 20
+        self._client = FakeClient(num_peers)
+        self._storage = self._client._storage
+        d = self._client.create_mutable_file(self.CONTENTS[0]) # seqnum=1
+        def _created(node):
+            self._fn = node
+            # now create multiple versions of the same file, and accumulate
+            # their shares, so we can mix and match them later.
+            d = defer.succeed(None)
+            d.addCallback(self._copy_shares, 0)
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[1])) #s2
+            d.addCallback(self._copy_shares, 1)
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[2])) #s3
+            d.addCallback(self._copy_shares, 2)
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[3])) #s4a
+            d.addCallback(self._copy_shares, 3)
+            # now we replace all the shares with version s3, and upload a new
+            # version to get s4b.
+            rollback = dict([(i,2) for i in range(10)])
+            d.addCallback(lambda res: self._set_versions(rollback))
+            d.addCallback(lambda res: node.overwrite(self.CONTENTS[4])) #s4b
+            d.addCallback(self._copy_shares, 4)
+            # we leave the storage in state 4
+            return d
+        d.addCallback(_created)
+        return d
+
+    def _copy_shares(self, ignored, index):
+        shares = self._client._storage._peers
+        # we need a deep copy
+        new_shares = {}
+        for peerid in shares:
+            new_shares[peerid] = {}
+            for shnum in shares[peerid]:
+                new_shares[peerid][shnum] = shares[peerid][shnum]
+        self._copied_shares[index] = new_shares
+
+    def _set_versions(self, versionmap):
+        # versionmap maps shnums to which version (0,1,2,3,4) we want the
+        # share to be at. Any shnum which is left out of the map will stay at
+        # its current version.
+        shares = self._client._storage._peers
+        oldshares = self._copied_shares
+        for peerid in shares:
+            for shnum in shares[peerid]:
+                if shnum in versionmap:
+                    index = versionmap[shnum]
+                    shares[peerid][shnum] = oldshares[index][peerid][shnum]
+
+    def test_multiple_versions(self):
+        # if we see a mix of versions in the grid, download_best_version
+        # should get the latest one
+        self._set_versions(dict([(i,2) for i in (0,2,4,6,8)]))
+        d = self._fn.download_best_version()
+        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[4]))
+        # but if everything is at version 2, that's what we should download
+        d.addCallback(lambda res:
+                      self._set_versions(dict([(i,2) for i in range(10)])))
+        d.addCallback(lambda res: self._fn.download_best_version())
+        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
+        # if exactly one share is at version 3, we should still get v2
+        d.addCallback(lambda res:
+                      self._set_versions({0:3}))
+        d.addCallback(lambda res: self._fn.download_best_version())
+        d.addCallback(lambda res: self.failUnlessEqual(res, self.CONTENTS[2]))
+        # but the servermap should see the unrecoverable version. This
+        # depends upon the single newer share being queried early.
+        d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
+        def _check_smap(smap):
+            self.failUnlessEqual(len(smap.unrecoverable_versions()), 1)
+            newer = smap.unrecoverable_newer_versions()
+            self.failUnlessEqual(len(newer), 1)
+            verinfo, health = newer.items()[0]
+            self.failUnlessEqual(verinfo[0], 4)
+            self.failUnlessEqual(health, (1,3))
+            self.failIf(smap.needs_merge())
+        d.addCallback(_check_smap)
+        # if we have a mix of two parallel versions (s4a and s4b), we could
+        # recover either
+        d.addCallback(lambda res:
+                      self._set_versions({0:3,2:3,4:3,6:3,8:3,
+                                          1:4,3:4,5:4,7:4,9:4}))
+        d.addCallback(lambda res: self._fn.get_servermap(MODE_READ))
+        def _check_smap_mixed(smap):
+            self.failUnlessEqual(len(smap.unrecoverable_versions()), 0)
+            newer = smap.unrecoverable_newer_versions()
+            self.failUnlessEqual(len(newer), 0)
+            self.failUnless(smap.needs_merge())
+        d.addCallback(_check_smap_mixed)
+        d.addCallback(lambda res: self._fn.download_best_version())
+        d.addCallback(lambda res: self.failUnless(res == self.CONTENTS[3] or
+                                                  res == self.CONTENTS[4]))
+        return d
+
 
 class Utils(unittest.TestCase):
     def test_dict_of_sets(self):
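
A note on _copy_shares above: since the stored share strings are immutable, copying just the two dict levels is enough to insulate a snapshot from later overwrites; copy.deepcopy would produce an equivalent result. A sketch of that equivalence with toy data:

    import copy

    shares = {"peer-a": {0: "share-bytes-0", 1: "share-bytes-1"}}
    manual = dict([(p, dict(shares[p])) for p in shares])  # two-level copy
    deep = copy.deepcopy(shares)
    assert manual == deep == shares
    # both copies are insulated from later mutation of the original maps:
    shares["peer-a"][0] = "replaced"
    assert manual["peer-a"][0] == "share-bytes-0"
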
@@ -12,6 +12,7 @@
 
 <ul>
   <li>Started: <span n:render="started"/></li>
+  <li>Finished: <span n:render="finished"/></li>
   <li>Storage Index: <span n:render="si"/></li>
   <li>Helper?: <span n:render="helper"/></li>
   <li>Progress: <span n:render="progress"/></li>
@@ -624,6 +624,15 @@ class MapupdateStatusPage(rend.Page, RateAndTimeMixin):
                                  time.localtime(data.get_started()))
         return started_s
 
+    def render_finished(self, ctx, data):
+        when = data.get_finished()
+        if not when:
+            return "not yet"
+        TIME_FORMAT = "%H:%M:%S %d-%b-%Y"
+        started_s = time.strftime(TIME_FORMAT,
+                                  time.localtime(data.get_finished()))
+        return started_s
+
     def render_si(self, ctx, data):
         si_s = base32.b2a_or_none(data.get_storage_index())
         if si_s is None:
@@ -677,11 +686,13 @@ class MapupdateStatusPage(rend.Page, RateAndTimeMixin):
         for peerid in sorted(per_server.keys()):
             peerid_s = idlib.shortnodeid_b2a(peerid)
             times = []
-            for op,t in per_server[peerid]:
+            for op,started,t in per_server[peerid]:
                 if op == "query":
                     times.append( self.render_time(None, t) )
+                elif op == "late":
+                    times.append( "late(" + self.render_time(None, t) + ")" )
                 else:
-                    times.append( "(" + self.render_time(None, t) + ")" )
+                    times.append( "privkey(" + self.render_time(None, t) + ")" )
             times_s = ", ".join(times)
             l[T.li["[%s]: %s" % (peerid_s, times_s)]]
         return T.li["Per-Server Response Times: ", l]
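
For reference, a sketch of the string this loop now builds for one peer (made-up numbers; the real render_time comes from RateAndTimeMixin and formats adaptively, approximated here with fixed formatting):

    def render_time(t):
        return "%.2fs" % t

    entries = [("query", 0.0, 1.23), ("late", 0.0, 0.45),
               ("privkey", 0.0, 0.67)]
    times = []
    for op, started, t in entries:
        if op == "query":
            times.append(render_time(t))
        elif op == "late":
            times.append("late(" + render_time(t) + ")")
        else:
            times.append("privkey(" + render_time(t) + ")")
    assert ", ".join(times) == "1.23s, late(0.45s), privkey(0.67s)"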