mutable: implement filenode share-packing, still pretty rough
This commit is contained in: parent 6510510ea7, commit 78c45c82d1
src/allmydata/mutable.py:

@@ -1,10 +1,12 @@
import struct
import os, struct
from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IMutableFileNode, IMutableFileURI
from allmydata.util import hashutil
from allmydata.util import hashutil, mathutil
from allmydata.uri import WriteableSSKFileURI
from allmydata.Crypto.Cipher import AES
from allmydata import hashtree, codec

class MutableFileNode:
    implements(IMutableFileNode)
@@ -15,7 +17,15 @@ class MutableFileNode:
        self._privkey = None # filled in if we're mutable
        self._sharemap = {} # known shares, shnum-to-nodeid

        self._current_data = None # SDMF: we're allowed to cache the contents
        self._current_roothash = None # ditto
        self._current_seqnum = None # ditto

    def init_from_uri(self, myuri):
        # we have the URI, but we have not yet retrieved the public
        # verification key, nor things like 'k' or 'N'. If and when someone
        # wants to get our contents, we'll pull from shares and fill those
        # in.
        self._uri = IMutableFileURI(myuri)
        return self
@@ -89,7 +99,198 @@ class MutableFileNode:
        share_data = data[offsets['share_data']:offsets['share_data']+datalen]
        enc_privkey = data[offsets['enc_privkey']:]

    def pack_data(self):

# use client.create_mutable_file() to make one of these

class Publish:
    """I represent a single act of publishing the mutable file to the grid."""

    def __init__(self, filenode):
        self._node = filenode

    def publish(self, newdata):
        """Publish the filenode's current contents. Returns a Deferred that
        fires (with None) when the publish has done as much work as it's ever
        going to do, or errbacks with ConsistencyError if it detects a
        simultaneous write."""

        # 1: generate shares (SDMF: files are small, so we can do it in RAM)
        # 2: perform peer selection, get candidate servers
        # 3: pre-allocate some shares to some servers, based upon any existing
        #    self._node._sharemap
        # 4: send allocate/testv_and_writev messages
        # 5: as responses return, update share-dispatch table
        # 5a: may need to run recovery algorithm
        # 6: when enough responses are back, we're done

        old_roothash = self._node._current_roothash
        old_seqnum = self._node._current_seqnum

        readkey = self._node.readkey
        required_shares = self._node.required_shares
        total_shares = self._node.total_shares
        privkey = self._node.privkey
        pubkey = self._node.pubkey

        d = defer.succeed(newdata)
        d.addCallback(self._encrypt_and_encode, readkey,
                      required_shares, total_shares)
        d.addCallback(self._generate_shares, old_seqnum+1,
                      privkey, self._encprivkey, pubkey)

        d.addCallback(self._get_peers)
        d.addCallback(self._map_shares)
        d.addCallback(self._send_shares)
        d.addCallback(self._wait_for_responses)
        d.addCallback(lambda res: None)
        return d
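
For orientation, a minimal usage sketch of the pipeline above. It assumes `filenode` came from client.create_mutable_file() (per the comment above Publish) and that the later pipeline steps (_get_peers onward), which this commit leaves as stubs, are implemented; the helper name is illustrative only.

# Hypothetical glue, not part of this commit: publish new contents and
# hand the filenode back once the Deferred chain above has finished.
def replace_contents(filenode, newdata):
    p = Publish(filenode)
    d = p.publish(newdata)             # fires with None on success
    d.addCallback(lambda ignored: filenode)
    return d
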

    def _encrypt_and_encode(self, newdata, readkey,
                            required_shares, total_shares):
        IV = os.urandom(16)
        key = hashutil.ssk_readkey_data_hash(IV, readkey)
        enc = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
        crypttext = enc.encrypt(newdata)

        # now apply FEC
        self.MAX_SEGMENT_SIZE = 1024*1024
        segment_size = min(self.MAX_SEGMENT_SIZE, len(crypttext))
        # this must be a multiple of required_shares
        segment_size = mathutil.next_multiple(segment_size,
                                              required_shares)
        self.num_segments = mathutil.div_ceil(len(crypttext), segment_size)
        assert self.num_segments == 1 # SDMF restrictions
        fec = codec.CRSEncoder()
        fec.set_params(segment_size, required_shares, total_shares)
        piece_size = fec.get_block_size()
        crypttext_pieces = []
        for offset in range(0, len(crypttext), piece_size):
            piece = crypttext[offset:offset+piece_size]
            if len(piece) < piece_size:
                pad_size = piece_size - len(piece)
                piece = piece + "\x00"*pad_size
            crypttext_pieces.append(piece)
            assert len(piece) == piece_size

        d = fec.encode(crypttext_pieces)
        d.addCallback(lambda shares:
                      (shares, required_shares, total_shares,
                       segment_size, len(crypttext), IV) )
        return d
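
The segmenting arithmetic is easiest to see with the 21-byte payload that test_encrypt (below) feeds through this method. A self-contained sketch: div_ceil and next_multiple are local stand-ins matching the allmydata.util.mathutil semantics, and the even block split is an assumption about CRSEncoder that agrees with the 7-byte blocks the test expects.

# Standalone sketch of the segmenting/padding arithmetic above.
def div_ceil(n, d):
    return (n + d - 1) // d

def next_multiple(n, k):
    return div_ceil(n, k) * k

MAX_SEGMENT_SIZE = 1024*1024
crypttext_len = len("some initial contents")         # 21, as in test_encrypt
required_shares = 3                                  # k
segment_size = min(MAX_SEGMENT_SIZE, crypttext_len)  # 21
segment_size = next_multiple(segment_size, required_shares)
assert segment_size == 21                            # already a multiple of 3
num_segments = div_ceil(crypttext_len, segment_size)
assert num_segments == 1                             # SDMF: a single segment
# assumption: CRS splits each segment evenly into k blocks
block_size = segment_size // required_shares
assert block_size == 7                               # matches test_encrypt
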

    def _generate_shares(self, (shares_and_shareids,
                                required_shares, total_shares,
                                segment_size, data_length, IV),
                         seqnum, privkey, encprivkey, pubkey):

        (shares, share_ids) = shares_and_shareids

        assert len(shares) == len(share_ids)
        assert len(shares) == total_shares
        all_shares = {}
        block_hash_trees = {}
        share_hash_leaves = [None] * len(shares)
        for i in range(len(shares)):
            share_data = shares[i]
            shnum = share_ids[i]
            all_shares[shnum] = share_data

            # build the block hash tree. SDMF has only one leaf.
            leaves = [hashutil.block_hash(share_data)]
            t = hashtree.HashTree(leaves)
            block_hash_trees[shnum] = list(t)
            share_hash_leaves[shnum] = t[0]
        for leaf in share_hash_leaves:
            assert leaf is not None
        share_hash_tree = hashtree.HashTree(share_hash_leaves)
        share_hash_chain = {}
        for shnum in range(total_shares):
            needed_hashes = share_hash_tree.needed_hashes(shnum)
            share_hash_chain[shnum] = dict( [ (i, share_hash_tree[i])
                                              for i in needed_hashes ] )
        root_hash = share_hash_tree[0]
        assert len(root_hash) == 32

        prefix = self._pack_prefix(seqnum, root_hash,
                                   required_shares, total_shares,
                                   segment_size, data_length)

        # now pack the beginning of the share. All shares are the same up
        # to the signature, then they have divergent share hash chains,
        # then completely different block hash trees + IV + share data,
        # then they all share the same encprivkey at the end. The sizes
        # of everything are the same for all shares.

        signature = privkey.sign(prefix)

        verification_key = pubkey.serialize()

        final_shares = {}
        for shnum in range(total_shares):
            shc = share_hash_chain[shnum]
            share_hash_chain_s = "".join([struct.pack(">H32s", i, shc[i])
                                          for i in sorted(shc.keys())])
            bht = block_hash_trees[shnum]
            for h in bht:
                assert len(h) == 32
            block_hash_tree_s = "".join(bht)
            share_data = all_shares[shnum]
            # offsets must be computed from the serialized byte strings,
            # not the dict/list containers
            offsets = self._pack_offsets(len(verification_key),
                                         len(signature),
                                         len(share_hash_chain_s),
                                         len(block_hash_tree_s),
                                         len(IV),
                                         len(share_data))

            final_shares[shnum] = "".join([prefix,
                                           offsets,
                                           verification_key,
                                           signature,
                                           share_hash_chain_s,
                                           block_hash_tree_s,
                                           IV,
                                           share_data,
                                           encprivkey])
        return (seqnum, root_hash, final_shares)

    def _pack_prefix(self, seqnum, root_hash,
                     required_shares, total_shares,
                     segment_size, data_length):
        prefix = struct.pack(">BQ32s" + "BBQQ",
                             0, # version,
                             seqnum,
                             root_hash,

                             required_shares,
                             total_shares,
                             segment_size,
                             data_length,
                             )
        return prefix

    def _pack_offsets(self, verification_key_length, signature_length,
                      share_hash_chain_length, block_hash_tree_length,
                      IV_length, share_data_length):
        post_offset = struct.calcsize(">BQ32s" + "BBQQ" + "LLLLLQ")
        offsets = {}
        o1 = offsets['signature'] = post_offset + verification_key_length
        o2 = offsets['share_hash_chain'] = o1 + signature_length
        o3 = offsets['block_hash_tree'] = o2 + share_hash_chain_length
        assert IV_length == 16
        o4 = offsets['IV'] = o3 + block_hash_tree_length
        o5 = offsets['share_data'] = o4 + IV_length
        o6 = offsets['enc_privkey'] = o5 + share_data_length

        return struct.pack(">LLLLLQ",
                           offsets['signature'],
                           offsets['share_hash_chain'],
                           offsets['block_hash_tree'],
                           offsets['IV'],
                           offsets['share_data'],
                           offsets['enc_privkey'])
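
A hedged sketch of the inverse operation, recovering the offsets table from a packed share; the field order and the 59-byte prefix size follow directly from the two struct format strings above. unpack_offsets itself is not part of this commit.

import struct

def unpack_offsets(share):
    # the offsets table sits immediately after the 59-byte prefix
    prefix_size = struct.calcsize(">BQ32s" + "BBQQ")   # 59
    table_size = struct.calcsize(">LLLLLQ")            # 28
    table = share[prefix_size:prefix_size+table_size]
    (o_sig, o_shc, o_bht, o_iv, o_sd, o_ep) = struct.unpack(">LLLLLQ", table)
    return {'signature': o_sig, 'share_hash_chain': o_shc,
            'block_hash_tree': o_bht, 'IV': o_iv,
            'share_data': o_sd, 'enc_privkey': o_ep}
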

    def OFF_pack_data(self):
        # dummy values to satisfy pyflakes until we wire this all up
        seqnum, root_hash, k, N, segsize, datalen = 0,0,0,0,0,0
        (verification_key, signature, share_hash_chain, block_hash_tree,

@@ -126,5 +327,3 @@ class MutableFileNode:
                        enc_privkey])
        return "".join(newbuf)


# use client.create_mutable_file() to make one of these
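
Taken together, the packing helpers imply the following SDMF share layout; offsets are computed from the struct formats, and everything after the fixed header is variable-length:

offset  size  field
0       1     version byte (0)
1       8     seqnum
9       32    root hash (of the share hash tree)
41      1     required_shares (k)
42      1     total_shares (N)
43      8     segment_size
51      8     data_length
59      28    offsets table (">LLLLLQ")
87      var   verification key, signature, share hash chain,
              block hash tree, IV (16 bytes), share data, enc_privkey
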

src/allmydata/test/test_mutable.py:

@@ -117,6 +117,71 @@ class Filenode(unittest.TestCase):
        d.addCallback(_created)
        return d

class Publish(unittest.TestCase):
    def test_encrypt(self):
        c = MyClient()
        fn = FakeFilenode(c)
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        CONTENTS = "some initial contents"
        fn.create(CONTENTS)
        p = mutable.Publish(fn)
        d = defer.maybeDeferred(p._encrypt_and_encode,
                                CONTENTS, "READKEY", 3, 10)
        def _done( ((shares, share_ids),
                    required_shares, total_shares,
                    segsize, data_length, IV) ):
            self.failUnlessEqual(len(shares), 10)
            for sh in shares:
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 7)
            self.failUnlessEqual(len(share_ids), 10)
            self.failUnlessEqual(required_shares, 3)
            self.failUnlessEqual(total_shares, 10)
            self.failUnlessEqual(segsize, 21)
            self.failUnlessEqual(data_length, len(CONTENTS))
            self.failUnlessEqual(len(IV), 16)
        d.addCallback(_done)
        return d

    def test_generate(self):
        c = MyClient()
        fn = FakeFilenode(c)
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        CONTENTS = "some initial contents"
        fn.create(CONTENTS)
        p = mutable.Publish(fn)
        # make some fake shares
        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
        d = defer.maybeDeferred(p._generate_shares,
                                (shares_and_ids,
                                 3, 10,
                                 21, # segsize
                                 len(CONTENTS),
                                 "IV"*8),
                                3, # seqnum
                                FakePrivKey(), "encprivkey", FakePubKey(),
                                )
        def _done( (seqnum, root_hash, final_shares) ):
            self.failUnlessEqual(seqnum, 3)
            self.failUnlessEqual(len(root_hash), 32)
            self.failUnless(isinstance(final_shares, dict))
            self.failUnlessEqual(len(final_shares), 10)
            self.failUnlessEqual(sorted(final_shares.keys()), range(10))
            for i,sh in final_shares.items():
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 359)
        d.addCallback(_done)
        return d
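
The 359-byte expectation can be reproduced by hand from the fake inputs above. The only non-obvious term is the share hash chain, which is assumed to carry four uncle hashes per share (10 leaves padded to a 16-leaf tree, ">H32s" per entry):

# Hand-check of the 359-byte share size, using the fake test values.
prefix_len = 1 + 8 + 32 + 1 + 1 + 8 + 8          # ">BQ32s" + "BBQQ" = 59
offsets_len = 4*5 + 8                            # ">LLLLLQ" = 28
pubkey_len = len("PUBKEY")                       # 6
sig_len = len("SIGN(") + prefix_len + len(")")   # FakePrivKey echoes the prefix: 65
chain_len = 4 * (2 + 32)    # assumed 4 uncle hashes per leaf = 136
bht_len = 32                # one-leaf block hash tree
iv_len = 16
share_data_len = len("%07d" % 0)                 # 7
encprivkey_len = len("encprivkey")               # 10
total = (prefix_len + offsets_len + pubkey_len + sig_len +
         chain_len + bht_len + iv_len + share_data_len + encprivkey_len)
assert total == 359
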

class FakePubKey:
    def serialize(self):
        return "PUBKEY"

class FakePrivKey:
    def sign(self, data):
        return "SIGN(%s)" % data


class Dirnode(unittest.TestCase):
    def setUp(self):
        self.client = MyClient()
src/allmydata/util/hashutil.py:

@@ -125,5 +125,7 @@ def ssk_pubkey_fingerprint_hash(pubkey):

def ssk_readkey_hash(writekey):
    return tagged_hash("allmydata_mutable_readkey_v1", writekey)
def ssk_readkey_data_hash(IV, readkey):
    return tagged_pair_hash("allmydata_mutable_readkey_data_v1", IV, readkey)
def ssk_storage_index_hash(readkey):
    return tagged_hash("allmydata_mutable_storage_index_v1", readkey)