mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-20 17:52:50 +00:00

mutable: test roundtrip, make it work

This commit is contained in:
parent ba43c033fa
commit be94960680
@@ -146,7 +146,7 @@ class RIStorageServer(RemoteInterface):
         """Read a vector from the numbered shares associated with the given
         storage index. An empty shares list means to return data from all
         known shares. Returns a dictionary with one key per share."""
-        return DictOf(int, DataVector) # shnum -> results
+        return DictOf(int, ReadData) # shnum -> results
 
     def slot_testv_and_readv_and_writev(storage_index=StorageIndex,
                                         secrets=TupleOf(Hash, Hash, Hash),
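
For reference, ReadData in this interface appears to be a list of bytestrings, one per entry in the read vector, so slot_readv() returns a dict shaped like {shnum: [data, ...]}. A minimal sketch of consuming such a result (the helper name is illustrative, and the ReadData shape is an assumption, not confirmed by this diff):

    # assumes ReadData is a list of bytestrings, one per read-vector entry
    def first_chunk_per_share(datavs):
        # datavs: {shnum: [data0, data1, ...]}, as returned by slot_readv()
        return dict((shnum, datav[0]) for shnum, datav in datavs.items())

    example = {0: [b"\x00prefix..."], 3: [b"\x00prefix..."]}
    assert sorted(first_chunk_per_share(example)) == [0, 3]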
@@ -166,9 +166,11 @@ class Retrieve:
         self._pubkey = filenode.get_pubkey()
         self._storage_index = filenode.get_storage_index()
         self._readkey = filenode.get_readkey()
+        self._last_failure = None
 
     def log(self, msg):
-        self._node._client.log(msg)
+        #self._node._client.log(msg)
+        pass
 
     def retrieve(self):
         """Retrieve the filenode's current contents. Returns a Deferred that
@@ -218,12 +220,37 @@ class Retrieve:
         self._segsize = None
         self._datalength = None
 
+        # self._valid_versions is a dictionary in which the keys are
+        # 'verinfo' tuples (seqnum, root_hash, IV). Every time we hear about
+        # a new potential version of the file, we check its signature, and
+        # the valid ones are added to this dictionary. The values of the
+        # dictionary are (prefix, sharemap) tuples, where 'prefix' is just
+        # the first part of the share (containing the serialized verinfo),
+        # for easier comparison. 'sharemap' is a DictOfSets, in which the
+        # keys are sharenumbers, and the values are sets of (peerid, data)
+        # tuples. There is a (peerid, data) tuple for every instance of a
+        # given share that we've seen. The 'data' in this tuple is a full
+        # copy of the SDMF share, starting with the \x00 version byte and
+        # continuing through the last byte of sharedata.
+        self._valid_versions = {}
+
+        # self._valid_shares is a set of (peerid, data) tuples. Each time we
+        # examine the hash chains inside a share and validate them against a
+        # signed root_hash, we add the share to self._valid_shares. We use
+        # this to avoid re-checking the hashes over and over again.
+        self._valid_shares = set()
+
+        self._done_deferred = defer.Deferred()
+
         d = defer.succeed(initial_query_count)
         d.addCallback(self._choose_initial_peers)
         d.addCallback(self._send_initial_requests)
-        d.addCallback(lambda res: self._contents)
+        d.addCallback(self._wait_for_finish)
         return d
 
+    def _wait_for_finish(self, res):
+        return self._done_deferred
+
     def _choose_initial_peers(self, numqueries):
        n = self._node
        full_peerlist = n._client.get_permuted_peers(self._storage_index,
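
A minimal sketch of the bookkeeping described in the comments above, using a plain dict of sets in place of tahoe's DictOfSets helper (function names here are illustrative, not from the codebase):

    valid_versions = {}  # verinfo -> (prefix, sharemap)

    def add_share(verinfo, prefix, shnum, peerid, data):
        # record one (peerid, data) entry per instance of a share we see
        if verinfo not in valid_versions:
            valid_versions[verinfo] = (prefix, {})
        sharemap = valid_versions[verinfo][1]
        sharemap.setdefault(shnum, set()).add((peerid, data))

    def looks_retrievable(verinfo, required_shares):
        # len(sharemap) counts distinct share numbers, not total copies
        _prefix, sharemap = valid_versions[verinfo]
        return len(sharemap) >= required_shares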
@@ -246,10 +273,11 @@ class Retrieve:
         self._bad_peerids = set()
         self._running = True
         self._queries_outstanding = set()
         self._used_peers = set()
         self._sharemap = DictOfSets() # shnum -> [(peerid, seqnum, R)..]
+        self._peer_storage_servers = {}
         dl = []
-        for (permutedid, peerid, conn) in peerlist:
+        for (peerid, conn) in peerlist:
             self._queries_outstanding.add(peerid)
             self._do_query(conn, peerid, self._storage_index, self._read_size,
                            self._peer_storage_servers)
@@ -257,9 +285,7 @@ class Retrieve:
         # control flow beyond this point: state machine. Receiving responses
         # from queries is the input. We might send out more queries, or we
         # might produce a result.
-
-        d = self._done_deferred = defer.Deferred()
-        return d
+        return None
 
     def _do_query(self, conn, peerid, storage_index, readsize,
                   peer_storage_servers):
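
This change makes retrieve() create self._done_deferred up front and lets _send_initial_requests return None: the result Deferred is fired later by the response-driven state machine rather than by the initial-query chain. A minimal standalone sketch of that pattern (class and method names are illustrative):

    from twisted.internet import defer

    class Fetcher:
        def __init__(self, needed=3):
            self._needed = needed
            self._responses = []
            self._done_deferred = defer.Deferred()

        def fetch(self):
            # queries would be sent here; the caller only gets the
            # Deferred, which the state machine fires exactly once
            return self._done_deferred

        def response_received(self, data):
            # each response is an input to the state machine: we might
            # send more queries, or we might produce the result
            self._responses.append(data)
            if len(self._responses) >= self._needed:
                if not self._done_deferred.called:
                    self._done_deferred.callback(self._responses)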
@@ -281,13 +307,10 @@ class Retrieve:
 
     def _deserialize_pubkey(self, pubkey_s):
         # TODO
-        from allmydata.test.test_mutable import FakePubKey
-        return FakePubKey(0)
+        return None
 
     def _validate_share(self, root_hash, shnum, data):
-        if False:
-            raise CorruptShareError("explanation")
         pass
 
     def _got_results(self, datavs, peerid, readsize):
         self._queries_outstanding.discard(peerid)
         self._used_peers.add(peerid)
@@ -350,8 +373,9 @@ class Retrieve:
                 self._do_query(conn, peerid, storage_index, self._read_size,
                                peer_storage_servers)
                 return
+        self._last_failure = f
         self._bad_peerids.add(peerid)
-        short_sid = idlib.a2b(self.storage_index)[:6]
+        short_sid = idlib.b2a(self._storage_index)[:6]
         if f.check(CorruptShareError):
             self.log("WEIRD: bad share for %s: %s" % (short_sid, f))
         else:
@@ -362,13 +386,17 @@ class Retrieve:
         share_prefixes = {}
         versionmap = DictOfSets()
         for verinfo, (prefix, sharemap) in self._valid_versions.items():
+            # sharemap is a dict that maps shnums to sets of (peerid,data).
+            # len(sharemap) is the number of distinct shares that appear to
+            # be available.
             if len(sharemap) >= self._required_shares:
+                # this one looks retrievable
-                d = defer.maybeDeferred(self._extract_data, verinfo, sharemap)
+                d = defer.maybeDeferred(self._attempt_decode, verinfo, sharemap)
                 def _problem(f):
                     self._last_failure = f
                     if f.check(CorruptShareError):
                         # log(WEIRD)
-                        # _extract_data is responsible for removing the bad
+                        # _attempt_decode is responsible for removing the bad
                         # share, so we can just try again
                         eventually(self._check_for_done)
                         return
@@ -416,41 +444,45 @@ class Retrieve:
             return
 
         # we've used up all the peers we're allowed to search. Failure.
-        return self._done(failure.Failure(NotEnoughPeersError()))
+        e = NotEnoughPeersError("last failure: %s" % self._last_failure)
+        return self._done(failure.Failure(e))
 
-    def _extract_data(self, verinfo, sharemap):
+    def _attempt_decode(self, verinfo, sharemap):
         # sharemap is a dict which maps shnum to [(peerid,data)..] sets.
         (seqnum, root_hash, IV) = verinfo
 
         # first, validate each share that we haven't validated yet. We use
         # self._valid_shares to remember which ones we've already checked.
 
-        self._valid_shares = set() # set of (peerid,data) sets
         shares = {}
-        for shnum, shareinfo in sharemap.items():
-            if shareinfo not in self._valid_shares:
-                (peerid,data) = shareinfo
-                try:
-                    # The (seqnum+root_hash+IV) tuple for this share was
-                    # already verified: specifically, all shares in the
-                    # sharemap have a (seqnum+root_hash+IV) pair that was
-                    # present in a validly signed prefix. The remainder of
-                    # the prefix for this particular share has *not* been
-                    # validated, but we don't care since we don't use it.
-                    # self._validate_share() is required to check the hashes
-                    # on the share data (and hash chains) to make sure they
-                    # match root_hash, but is not required (and is in fact
-                    # prohibited, because we don't validate the prefix on all
-                    # shares) from using anything else in the share.
-                    sharedata = self._validate_share(root_hash, shnum, data)
-                except CorruptShareError, e:
-                    self.log("WEIRD: share was corrupt: %s" % e)
-                    sharemap[shnum].discard(shareinfo)
-                    # If there are enough remaining shares, _check_for_done()
-                    # will try again
-                    raise
-                self._valid_shares.add(shareinfo)
-                shares[shnum] = sharedata
+        for shnum, shareinfos in sharemap.items():
+            for shareinfo in shareinfos:
+                if shareinfo not in self._valid_shares:
+                    (peerid,data) = shareinfo
+                    try:
+                        # The (seqnum+root_hash+IV) tuple for this share was
+                        # already verified: specifically, all shares in the
+                        # sharemap have a (seqnum+root_hash+IV) pair that was
+                        # present in a validly signed prefix. The remainder
+                        # of the prefix for this particular share has *not*
+                        # been validated, but we don't care since we don't
+                        # use it. self._validate_share() is required to check
+                        # the hashes on the share data (and hash chains) to
+                        # make sure they match root_hash, but is not required
+                        # (and is in fact prohibited, because we don't
+                        # validate the prefix on all shares) from using
+                        # anything else in the share.
+                        validator = self._validate_share_and_extract_data
+                        sharedata = validator(root_hash, shnum, data)
+                        assert isinstance(sharedata, str)
+                    except CorruptShareError, e:
+                        self.log("WEIRD: share was corrupt: %s" % e)
+                        sharemap[shnum].discard(shareinfo)
+                        # If there are enough remaining shares,
+                        # _check_for_done() will try again
+                        raise
+                    self._valid_shares.add(shareinfo)
+                    shares[shnum] = sharedata
         # at this point, all shares in the sharemap are valid, and they're
         # all for the same seqnum+root_hash version, so it's now down to
         # doing FEC and decrypt.
@@ -458,7 +490,36 @@ class Retrieve:
         d.addCallback(self._decrypt, IV)
         return d
 
+    def _validate_share_and_extract_data(self, root_hash, shnum, data):
+        # 'data' is the whole SDMF share
+        self.log("_validate_share_and_extract_data[%d]" % shnum)
+        assert data[0] == "\x00"
+        pieces = unpack_share(data)
+        (seqnum, root_hash, IV, k, N, segsize, datalen,
+         pubkey, signature, share_hash_chain, block_hash_tree,
+         share_data, enc_privkey) = pieces
+
+        assert isinstance(share_data, str)
+        # build the block hash tree. SDMF has only one leaf.
+        leaves = [hashutil.block_hash(share_data)]
+        t = hashtree.HashTree(leaves)
+        if list(t) != block_hash_tree:
+            raise CorruptShareError("block hash tree failure")
+        share_hash_leaf = t[0]
+        # t2 = hashtree.IncompleteHashTree()
+        # TODO: use shnum, share_hash_leaf, share_hash_chain to compare against
+        # root_hash
+        #if False:
+        #    raise CorruptShareError("explanation")
+        self.log(" data valid! len=%d" % len(share_data))
+        return share_data
+
     def _decode(self, shares_dict):
+        # we ought to know these values by now
+        assert self._segsize is not None
+        assert self._required_shares is not None
+        assert self._total_shares is not None
+
         # shares_dict is a dict mapping shnum to share data, but the codec
         # wants two lists.
         shareids = []; shares = []
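
Since SDMF files have exactly one segment, the block hash tree built in _validate_share_and_extract_data has a single leaf, so the serialized tree must equal [block_hash(share_data)]. A standalone sketch of that degenerate check, with tahoe's tagged block_hash simplified to plain SHA-256 (an assumption for illustration only):

    import hashlib

    def block_hash(data):
        # stand-in for allmydata.util.hashutil.block_hash (a tagged hash);
        # plain SHA-256 keeps the sketch self-contained
        return hashlib.sha256(data).digest()

    def validate_single_leaf(share_data, stored_block_hash_tree):
        # one segment means the whole "tree" is just the one leaf hash
        leaves = [block_hash(share_data)]
        if leaves != stored_block_hash_tree:
            raise ValueError("block hash tree failure")
        return share_data

    share_data = b"FEC-encoded share bytes"
    assert validate_single_leaf(share_data,
                                [block_hash(share_data)]) == share_data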
@@ -466,21 +527,29 @@ class Retrieve:
             shareids.append(shareid)
             shares.append(share)
 
+        # zfec really doesn't want extra shares
+        shareids = shareids[:self._required_shares]
+        shares = shares[:self._required_shares]
+
         fec = codec.CRSDecoder()
-        # we ought to know these values by now
-        assert self._segsize is not None
-        assert self._required_shares is not None
-        assert self._total_shares is not None
         params = "%d-%d-%d" % (self._segsize,
                                self._required_shares, self._total_shares)
         fec.set_serialized_params(params)
 
-        d = fec.decode(shares, shareids)
+        self.log("params %s, we have %d shares" % (params, len(shares)))
+        self.log("about to decode, shareids=%s" % (shareids,))
+        d = defer.maybeDeferred(fec.decode, shares, shareids)
         def _done(buffers):
+            self.log(" decode done, %d buffers" % len(buffers))
             segment = "".join(buffers)
             segment = segment[:self._datalength]
+            self.log(" segment len=%d" % len(segment))
             return segment
+        def _err(f):
+            self.log(" decode failed: %s" % f)
+            return f
         d.addCallback(_done)
+        d.addErrback(_err)
         return d
 
     def _decrypt(self, crypttext, IV):
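
codec.CRSDecoder is tahoe's thin wrapper around the zfec package, and the slicing above reflects that zfec wants exactly k shares, no extras. A standalone round-trip sketch using zfec's raw Encoder/Decoder API directly (the padding scheme here is illustrative, not tahoe's):

    import zfec

    k, m = 3, 10                            # any 3 of 10 shares suffice
    segment = b"initial contents go here."  # 25 bytes, and 25 % 3 != 0

    # zfec wants k equal-sized input blocks, so pad the segment first
    blocksize = (len(segment) + k - 1) // k
    padded = segment.ljust(k * blocksize, b"\x00")
    blocks = [padded[i * blocksize:(i + 1) * blocksize] for i in range(k)]

    shares = zfec.Encoder(k, m).encode(blocks, list(range(m)))
    # decode from any k distinct shares; pass their share numbers along
    recovered = zfec.Decoder(k, m).decode([shares[1], shares[5], shares[9]],
                                          [1, 5, 9])
    assert b"".join(recovered)[:len(segment)] == segment  # trim the padding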
@@ -490,6 +559,7 @@ class Retrieve:
         return plaintext
 
     def _done(self, contents):
+        self.log("DONE, contents: %r" % contents)
         self._running = False
         eventually(self._done_deferred.callback, contents)
 
@@ -508,6 +578,10 @@ class Publish:
     def __init__(self, filenode):
         self._node = filenode
 
+    def log(self, msg):
+        prefix = idlib.b2a(self._node.get_storage_index())[:6]
+        #self._node._client.log("%s: %s" % (prefix, msg))
+
     def publish(self, newdata):
         """Publish the filenode's current contents. Returns a Deferred that
         fires (with None) when the publish has done as much work as it's ever
@@ -523,6 +597,8 @@ class Publish:
         # 4a: may need to run recovery algorithm
         # 5: when enough responses are back, we're done
 
+        self.log("starting publish")
+
         old_roothash = self._node._current_roothash
         old_seqnum = self._node._current_seqnum
 
@@ -549,6 +625,8 @@ class Publish:
 
     def _encrypt_and_encode(self, newdata, readkey, IV,
                             required_shares, total_shares):
+        self.log("_encrypt_and_encode")
+
         key = hashutil.ssk_readkey_data_hash(IV, readkey)
         enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
         crypttext = enc.encrypt(newdata)
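
The AES.new(..., mode=AES.MODE_CTR, counterstart="\x00"*16) call above is the old bundled-PyCrypto-style API. In CTR mode, encryption and decryption are the same keystream XOR, which is why Retrieve._decrypt can mirror this code exactly. A sketch of that symmetry with the modern cryptography package, using a random key as a stand-in for hashutil.ssk_readkey_data_hash(IV, readkey):

    import os
    from cryptography.hazmat.primitives.ciphers import (
        Cipher, algorithms, modes)

    key = os.urandom(16)     # stand-in for ssk_readkey_data_hash(IV, readkey)
    counter0 = b"\x00" * 16  # counter starts at zero, as with counterstart

    def aes_ctr(key, data):
        # the same keystream XOR both encrypts and decrypts
        ctx = Cipher(algorithms.AES(key), modes.CTR(counter0)).encryptor()
        return ctx.update(data) + ctx.finalize()

    plaintext = b"initial contents go here."
    assert aes_ctr(key, aes_ctr(key, plaintext)) == plaintext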
@@ -583,6 +661,7 @@ class Publish:
                            required_shares, total_shares,
                            segment_size, data_length, IV),
                           seqnum, privkey, encprivkey, pubkey):
+        self.log("_generate_shares")
 
         (shares, share_ids) = shares_and_shareids
 
@@ -655,6 +734,8 @@ class Publish:
 
 
     def _query_peers(self, (seqnum, root_hash, final_shares), total_shares):
+        self.log("_query_peers")
+
         self._new_seqnum = seqnum
         self._new_root_hash = root_hash
         self._new_shares = final_shares
@@ -700,6 +781,8 @@ class Publish:
 
     def _got_query_results(self, datavs, peerid, permutedid,
                            reachable_peers, current_share_peers):
+        self.log("_got_query_results")
+
         assert isinstance(datavs, dict)
         reachable_peers[peerid] = permutedid
         for shnum, datav in datavs.items():
@@ -712,6 +795,7 @@ class Publish:
     def _got_all_query_results(self, res,
                                total_shares, reachable_peers, new_seqnum,
                                current_share_peers, peer_storage_servers):
+        self.log("_got_all_query_results")
         # now that we know everything about the shares currently out there,
         # decide where to place the new shares.
 
@@ -758,6 +842,7 @@ class Publish:
         return (target_map, peer_storage_servers)
 
     def _send_shares(self, (target_map, peer_storage_servers), IV ):
+        self.log("_send_shares")
         # we're finally ready to send out our shares. If we encounter any
         # surprises here, it's because somebody else is writing at the same
         # time. (Note: in the future, when we remove the _query_peers() step
@@ -821,6 +906,7 @@ class Publish:
     def _got_write_answer(self, answer, tw_vectors, my_checkstring,
                           peerid, expected_old_shares,
                           dispatch_map):
+        self.log("_got_write_answer: %r" % (answer,))
         wrote, read_data = answer
         surprised = False
 
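
For context, the answer unpacked above comes back from slot_testv_and_readv_and_writev(): a (wrote, read_data) pair, where wrote=False means the test vector failed because another writer changed the slot first. A minimal sketch of that interpretation (the helper name is illustrative):

    def interpret_write_answer(answer):
        wrote, read_data = answer
        # wrote is False when the test vector did not match: somebody
        # else wrote since we last read, so we are "surprised" and may
        # need to run the recovery algorithm
        surprised = not wrote
        return surprised, read_data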
@@ -851,6 +937,7 @@ class Publish:
         self._surprised = True
 
     def _maybe_recover(self, (surprised, dispatch_map)):
+        self.log("_maybe_recover")
         if not surprised:
             return
         print "RECOVERY NOT YET IMPLEMENTED"
@@ -886,6 +973,7 @@ class MutableFileNode:
         self._writekey = self._uri.writekey
         self._readkey = self._uri.readkey
         self._storage_index = self._uri.storage_index
+        self._fingerprint = self._uri.fingerprint
         return self
 
     def create(self, initial_contents):
@@ -996,9 +1084,8 @@ class MutableFileNode:
         raise NotImplementedError
 
     def download_to_data(self):
-        #downloader = self._client.getServiceNamed("downloader")
-        #return downloader.download_to_data(self.uri)
-        return defer.succeed("this isn't going to fool you, is it")
+        r = Retrieve(self)
+        return r.retrieve()
 
     def replace(self, newdata):
         return defer.succeed(None)
@@ -241,7 +241,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
 
     def test_mutable(self):
         self.basedir = "system/SystemTest/test_mutable"
-        DATA = "Some data to upload\n" * 200
+        DATA = "initial contents go here." # 25 bytes % 3 != 0
         d = self.set_up_nodes()
 
         def _create_mutable(res):
@@ -249,10 +249,12 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
             #print "CREATING MUTABLE FILENODE"
             c = self.clients[0]
             n = MutableFileNode(c)
-            d1 = n.create("initial contents go here.") # 25 bytes % 3 != 0
+            d1 = n.create(DATA)
             def _done(res):
                 log.msg("DONE: %s" % (res,))
                 #print "DONE", res
                 self._mutable_node_1 = res
+                uri = res.get_uri()
+                #print "DONE", uri
             d1.addBoth(_done)
             return d1
         d.addCallback(_create_mutable)
@@ -314,6 +316,41 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
                 raise
         d.addCallback(_test_debug)
 
+        # test retrieval
+
+        # first, let's see if we can use the existing node to retrieve the
+        # contents. This allows it to use the cached pubkey and maybe the
+        # latest-known sharemap.
+
+        d.addCallback(lambda res: self._mutable_node_1.download_to_data())
+        def _check_download_1(res):
+            #print "_check_download_1"
+            self.failUnlessEqual(res, DATA)
+            # now we see if we can retrieve the data from a new node,
+            # constructed using the URI of the original one. We do this test
+            # on the same client that uploaded the data.
+            #print "download1 good, starting download2"
+            uri = self._mutable_node_1.get_uri()
+            newnode = self.clients[0].create_mutable_file_from_uri(uri)
+            return newnode.download_to_data()
+        d.addCallback(_check_download_1)
+
+        def _check_download_2(res):
+            #print "_check_download_2"
+            self.failUnlessEqual(res, DATA)
+            # same thing, but with a different client
+            #print "starting download 3"
+            uri = self._mutable_node_1.get_uri()
+            newnode = self.clients[1].create_mutable_file_from_uri(uri)
+            return newnode.download_to_data()
+        d.addCallback(_check_download_2)
+
+        def _check_download_3(res):
+            #print "_check_download_3"
+            self.failUnlessEqual(res, DATA)
+        d.addCallback(_check_download_3)
+
         return d
 
     def flip_bit(self, good):