mutable.Publish: create a dispatch_map for the benefit of recovery code, and pull pack/unpack methods out into functions

Brian Warner 2007-11-05 22:14:59 -07:00
parent 59632c6812
commit 281afe7cfc
2 changed files with 149 additions and 127 deletions

src/allmydata/mutable.py

@@ -10,8 +10,6 @@ from allmydata import hashtree, codec
from allmydata.encode import NotEnoughPeersError
HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
class NeedMoreDataError(Exception):
def __init__(self, needed_bytes):
Exception.__init__(self)
@@ -20,6 +18,105 @@ class NeedMoreDataError(Exception):
class UncoordinatedWriteError(Exception):
pass
HEADER_LENGTH = struct.calcsize(">BQ32s BBQQ LLLLLQQ")
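# (illustrative, not part of the commit: the header arithmetic)
#   >BQ32s  = 1+8+32    = 41 bytes  (version, seqnum, root_hash)
#   BBQQ    = 1+1+8+8   = 18 bytes  (k, N, segment_size, data_length)
#   LLLLLQQ = 5*4 + 2*8 = 36 bytes  (the seven offsets)
assert HEADER_LENGTH == 41 + 18 + 36  # 95 bytes total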
def unpack_share(data):
assert len(data) >= HEADER_LENGTH
o = {}
(version,
seqnum,
root_hash,
k, N, segsize, datalen,
o['signature'],
o['share_hash_chain'],
o['block_hash_tree'],
o['IV'],
o['share_data'],
o['enc_privkey'],
o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ",
data[:HEADER_LENGTH])
assert version == 0
if len(data) < o['EOF']:
raise NeedMoreDataError(o['EOF'])
pubkey = data[HEADER_LENGTH:o['signature']]
signature = data[o['signature']:o['share_hash_chain']]
share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
share_hash_format = ">H32s"
hsize = struct.calcsize(share_hash_format)
assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
share_hash_chain = []
for i in range(0, len(share_hash_chain_s), hsize):
chunk = share_hash_chain_s[i:i+hsize]
(hid, h) = struct.unpack(share_hash_format, chunk)
share_hash_chain.append( (hid, h) )
block_hash_tree_s = data[o['block_hash_tree']:o['IV']]
assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
block_hash_tree = []
for i in range(0, len(block_hash_tree_s), 32):
block_hash_tree.append(block_hash_tree_s[i:i+32])
IV = data[o['IV']:o['share_data']]
share_data = data[o['share_data']:o['enc_privkey']]
enc_privkey = data[o['enc_privkey']:o['EOF']]
return (seqnum, root_hash, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
IV, share_data, enc_privkey)
def pack_checkstring(seqnum, root_hash):
return struct.pack(">BQ32s",
0, # version,
seqnum,
root_hash)
def unpack_checkstring(checkstring):
cs_len = struct.calcsize(">BQ32s")
version, seqnum, root_hash = struct.unpack(">BQ32s",
checkstring[:cs_len])
assert version == 0 # TODO: just ignore the share
return (seqnum, root_hash)
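# (illustrative, not part of the commit) checkstrings round-trip:
assert unpack_checkstring(pack_checkstring(5, "R" * 32)) == (5, "R" * 32)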
def pack_prefix(seqnum, root_hash,
required_shares, total_shares,
segment_size, data_length):
prefix = struct.pack(">BQ32s" + "BBQQ",
0, # version,
seqnum,
root_hash,
required_shares,
total_shares,
segment_size,
data_length,
)
return prefix
def pack_offsets(verification_key_length, signature_length,
share_hash_chain_length, block_hash_tree_length,
IV_length, share_data_length, encprivkey_length):
post_offset = HEADER_LENGTH
offsets = {}
o1 = offsets['signature'] = post_offset + verification_key_length
o2 = offsets['share_hash_chain'] = o1 + signature_length
o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
assert IV_length == 16
o4 = offsets['IV'] = o3 + block_hash_tree_length
o5 = offsets['share_data'] = o4 + IV_length
o6 = offsets['enc_privkey'] = o5 + share_data_length
o7 = offsets['EOF'] = o6 + encprivkey_length
return struct.pack(">LLLLLQQ",
offsets['signature'],
offsets['share_hash_chain'],
offsets['block_hash_tree'],
offsets['IV'],
offsets['share_data'],
offsets['enc_privkey'],
offsets['EOF'])
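# (illustrative sketch, not part of the commit) round-trip the new
# module-level packers through unpack_share() with toy values; the field
# contents are arbitrary, except that the IV must be 16 bytes and the
# root hash 32.
def _demo_roundtrip():
    pubkey, signature = "K" * 6, "S" * 8
    share_hash_chain_s = struct.pack(">H32s", 1, "h" * 32)  # one (index, hash)
    block_hash_tree_s = "b" * 32                            # one 32-byte hash
    IV, share_data, enc_privkey = "i" * 16, "d" * 10, "p" * 20
    prefix = pack_prefix(1, "R" * 32,  # seqnum=1, root_hash
                         3, 10,        # required_shares, total_shares
                         10, 10)       # segment_size, data_length
    offsets = pack_offsets(len(pubkey), len(signature),
                           len(share_hash_chain_s), len(block_hash_tree_s),
                           len(IV), len(share_data), len(enc_privkey))
    share = "".join([prefix, offsets, pubkey, signature, share_hash_chain_s,
                     block_hash_tree_s, IV, share_data, enc_privkey])
    (seqnum, root_hash, k, N, segsize, datalen,
     pubkey2, signature2, share_hash_chain, block_hash_tree,
     IV2, share_data2, enc_privkey2) = unpack_share(share)
    assert (seqnum, k, N) == (1, 3, 10)
    assert share_data2 == share_data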
# use client.create_mutable_file() to make one of these
class MutableFileNode:
@@ -109,54 +206,8 @@ class MutableFileNode:
def replace(self, newdata):
return defer.succeed(None)
class ShareFormattingMixin:
def _unpack_share(self, data):
assert len(data) >= HEADER_LENGTH
o = {}
(version,
seqnum,
root_hash,
k, N, segsize, datalen,
o['signature'],
o['share_hash_chain'],
o['block_hash_tree'],
o['IV'],
o['share_data'],
o['enc_privkey'],
o['EOF']) = struct.unpack(">BQ32s" + "BBQQ" + "LLLLLQQ",
data[:HEADER_LENGTH])
assert version == 0
if len(data) < o['EOF']:
raise NeedMoreDataError(o['EOF'])
pubkey = data[HEADER_LENGTH:o['signature']]
signature = data[o['signature']:o['share_hash_chain']]
share_hash_chain_s = data[o['share_hash_chain']:o['block_hash_tree']]
share_hash_format = ">H32s"
hsize = struct.calcsize(share_hash_format)
assert len(share_hash_chain_s) % hsize == 0, len(share_hash_chain_s)
share_hash_chain = []
for i in range(0, len(share_hash_chain_s), hsize):
chunk = share_hash_chain_s[i:i+hsize]
(hid, h) = struct.unpack(share_hash_format, chunk)
share_hash_chain.append( (hid, h) )
block_hash_tree_s = data[o['block_hash_tree']:o['IV']]
assert len(block_hash_tree_s) % 32 == 0, len(block_hash_tree_s)
block_hash_tree = []
for i in range(0, len(block_hash_tree_s), 32):
block_hash_tree.append(block_hash_tree_s[i:i+32])
IV = data[o['IV']:o['share_data']]
share_data = data[o['share_data']:o['enc_privkey']]
enc_privkey = data[o['enc_privkey']:o['EOF']]
return (seqnum, root_hash, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
IV, share_data, enc_privkey)
class Retrieve(ShareFormattingMixin):
class Retrieve:
def __init__(self, filenode):
self._node = filenode
@@ -167,8 +218,7 @@ class DictOfSets(dict):
else:
self[key] = set([value])
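# (context, reconstructed from the surrounding code; the first lines of
# the method fall outside this hunk. The whole helper reads:
#     class DictOfSets(dict):
#         def add(self, key, value):
#             if key in self:
#                 self[key].add(value)
#             else:
#                 self[key] = set([value])
# The new dispatch_map below is one of these, keyed by share number.)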
class Publish(ShareFormattingMixin):
class Publish:
"""I represent a single act of publishing the mutable file to the grid."""
def __init__(self, filenode):
@@ -184,13 +234,10 @@ class Publish(ShareFormattingMixin):
# 2: perform peer selection, get candidate servers
# 2a: send queries to n+epsilon servers, to determine current shares
# 2b: based upon responses, create target map
# 3: pre-allocate some shares to some servers, based upon any existing
# self._node._sharemap
# 4: send allocate/testv_and_writev messages
# 5: as responses return, update share-dispatch table
# 5a: may need to run recovery algorithm
# 6: when enough responses are back, we're done
# 3: send slot_testv_and_readv_and_writev messages
# 4: as responses return, update share-dispatch table
# 4a: may need to run recovery algorithm
# 5: when enough responses are back, we're done
old_roothash = self._node._current_roothash
old_seqnum = self._node._current_seqnum
@@ -209,7 +256,7 @@ class Publish(ShareFormattingMixin):
d.addCallback(self._query_peers, total_shares)
d.addCallback(self._send_shares)
d.addCallback(self._wait_for_responses)
d.addCallback(self._maybe_recover)
d.addCallback(lambda res: None)
return d
@@ -279,9 +326,9 @@
root_hash = share_hash_tree[0]
assert len(root_hash) == 32
prefix = self._pack_prefix(seqnum, root_hash,
required_shares, total_shares,
segment_size, data_length)
prefix = pack_prefix(seqnum, root_hash,
required_shares, total_shares,
segment_size, data_length)
# now pack the beginning of the share. All shares are the same up
# to the signature, then they have divergent share hash chains,
@@ -303,13 +350,13 @@
assert len(h) == 32
block_hash_tree_s = "".join(bht)
share_data = all_shares[shnum]
offsets = self._pack_offsets(len(verification_key),
len(signature),
len(share_hash_chain_s),
len(block_hash_tree_s),
len(IV),
len(share_data),
len(encprivkey))
offsets = pack_offsets(len(verification_key),
len(signature),
len(share_hash_chain_s),
len(block_hash_tree_s),
len(IV),
len(share_data),
len(encprivkey))
final_shares[shnum] = "".join([prefix,
offsets,
@@ -323,50 +370,6 @@
return (seqnum, root_hash, final_shares)
def _pack_checkstring(self, seqnum, root_hash):
return struct.pack(">BQ32s",
0, # version,
seqnum,
root_hash)
def _pack_prefix(self, seqnum, root_hash,
required_shares, total_shares,
segment_size, data_length):
prefix = struct.pack(">BQ32s" + "BBQQ",
0, # version,
seqnum,
root_hash,
required_shares,
total_shares,
segment_size,
data_length,
)
return prefix
def _pack_offsets(self, verification_key_length, signature_length,
share_hash_chain_length, block_hash_tree_length,
IV_length, share_data_length, encprivkey_length):
post_offset = HEADER_LENGTH
offsets = {}
o1 = offsets['signature'] = post_offset + verification_key_length
o2 = offsets['share_hash_chain'] = o1 + signature_length
o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
assert IV_length == 16
o4 = offsets['IV'] = o3 + block_hash_tree_length
o5 = offsets['share_data'] = o4 + IV_length
o6 = offsets['enc_privkey'] = o5 + share_data_length
o7 = offsets['EOF'] = o6 + encprivkey_length
return struct.pack(">LLLLLQQ",
offsets['signature'],
offsets['share_hash_chain'],
offsets['block_hash_tree'],
offsets['IV'],
offsets['share_data'],
offsets['enc_privkey'],
offsets['EOF'])
def _query_peers(self, (seqnum, root_hash, final_shares), total_shares):
self._new_seqnum = seqnum
self._new_root_hash = root_hash
@@ -416,7 +419,7 @@
for shnum, datav in datavs.items():
assert len(datav) == 1
data = datav[0]
r = self._unpack_share(data)
r = unpack_share(data)
share = (shnum, r[0], r[1]) # shnum,seqnum,R
current_share_peers[shnum].add( (peerid, r[0], r[1]) )
@@ -476,8 +479,8 @@
# surprises here are *not* indications of UncoordinatedWriteError,
# and we'll need to respond to them more gracefully.
my_checkstring = self._pack_checkstring(self._new_seqnum,
self._new_root_hash)
my_checkstring = pack_checkstring(self._new_seqnum,
self._new_root_hash)
peer_messages = {}
expected_old_shares = {}
@@ -498,6 +501,7 @@
dl = []
# ok, send the messages!
self._surprised = False
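# dispatch_map: shnum -> set of (peerid, seqnum, root_hash); filled in
# by _got_write_answer below, so recovery code can see where each share
# (and which version of it) ended up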
dispatch_map = DictOfSets()
for peerid, tw_vectors in peer_messages.items():
@@ -508,12 +512,12 @@
d = self._do_testreadwrite(peerid, peer_storage_servers, secrets,
tw_vectors, read_vector)
d.addCallback(self._got_write_answer,
peerid, expected_old_shares[peerid])
d.addCallback(self._got_write_answer, tw_vectors, my_checkstring,
peerid, expected_old_shares[peerid], dispatch_map)
dl.append(d)
d = defer.DeferredList(dl)
d.addCallback(lambda res: self._surprised)
d.addCallback(lambda res: (self._surprised, dispatch_map))
return d
def _do_testreadwrite(self, peerid, peer_storage_servers, secrets,
@@ -528,24 +532,42 @@
read_vector)
return d
def _got_write_answer(self, answer, peerid, expected_old_shares):
def _got_write_answer(self, answer, tw_vectors, my_checkstring,
peerid, expected_old_shares,
dispatch_map):
wrote, read_data = answer
surprised = False
if not wrote:
# surprise! our testv failed, so the write did not happen
surprised = True
for shnum, (old_checkstring,) in read_data.items():
for shnum, (old_cs,) in read_data.items():
old_seqnum, old_root_hash = unpack_checkstring(old_cs)
if wrote and shnum in tw_vectors:
current_cs = my_checkstring
else:
current_cs = old_cs
current_seqnum, current_root_hash = unpack_checkstring(current_cs)
dispatch_map.add(shnum, (peerid, current_seqnum, current_root_hash))
if shnum not in expected_old_shares:
# surprise! there was a share we didn't know about
surprised = True
else:
seqnum, root_hash = expected_old_shares[shnum]
if seqnum is not None:
expected_checkstring = self._pack_checkstring(seqnum,
root_hash)
if old_checkstring != expected_checkstring:
# surprise! somebody modified the share
if seqnum != old_seqnum or root_hash != old_root_hash:
# surprise! somebody modified the share on us
surprised = True
if surprised:
self._surprised = True
def _maybe_recover(self, (surprised, dispatch_map)):
if not surprised:
return
print "RECOVERY NOT YET IMPLEMENTED"
# but dispatch_map will help us do it
raise UncoordinatedWriteError("I was surprised!")
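Recovery is stubbed out above, but the dispatch_map already carries what a recovery pass would need: for each share number, the set of (peerid, seqnum, root_hash) places that share now lives. A purely illustrative sketch of how later recovery code might summarize it (summarize_versions is a hypothetical name, not in this commit):

def summarize_versions(dispatch_map):
    # invert the map: (seqnum, root_hash) -> set of (peerid, shnum),
    # i.e. which versions are now out on the grid, and where
    versions = DictOfSets()
    for shnum, places in dispatch_map.items():
        for (peerid, seqnum, root_hash) in places:
            versions.add((seqnum, root_hash), (peerid, shnum))
    return versions

Recovery would then presumably pick the version with the highest seqnum that still has at least required_shares distinct share numbers, and re-push it everywhere else.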

src/allmydata/test/test_mutable.py

@@ -213,7 +213,7 @@ class Publish(unittest.TestCase):
self.failUnless(isinstance(sh, str))
self.failUnlessEqual(len(sh), 367)
# feed the share through the unpacker as a sanity-check
pieces = r._unpack_share(sh)
pieces = mutable.unpack_share(sh)
(u_seqnum, u_root_hash, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
IV, share_data, enc_privkey) = pieces
@@ -355,7 +355,7 @@ class Publish(unittest.TestCase):
d, p = self.setup_for_write(20, total_shares)
d.addCallback(p._query_peers, total_shares)
d.addCallback(p._send_shares)
def _done(surprised):
def _done((surprised, dispatch_map)):
self.failIf(surprised, "surprised!")
d.addCallback(_done)
return d
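A natural follow-on assertion (hypothetical, not part of this commit) would sanity-check the returned dispatch_map as well, e.g. that every published share number now appears somewhere:

def _done((surprised, dispatch_map)):
    self.failIf(surprised, "surprised!")
    # hypothetical extra check: all shares were dispatched somewhere
    self.failUnlessEqual(sorted(dispatch_map.keys()),
                         range(total_shares))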