2007-01-05 01:06:20 +00:00
|
|
|
# -*- test-case-name: allmydata.test.test_encode -*-
|
2006-12-14 03:32:35 +00:00
|
|
|
|
2007-03-28 18:24:53 +00:00
|
|
|
from zope.interface import implements
|
2006-12-14 03:32:35 +00:00
|
|
|
from twisted.internet import defer
|
2007-03-31 01:01:37 +00:00
|
|
|
from twisted.python import log
|
2006-12-14 11:31:17 +00:00
|
|
|
from allmydata.chunk import HashTree, roundup_pow2
|
2007-01-06 04:12:26 +00:00
|
|
|
from allmydata.Crypto.Cipher import AES
|
2007-03-30 18:32:13 +00:00
|
|
|
from allmydata.util import mathutil, hashutil
|
2007-03-30 23:50:50 +00:00
|
|
|
from allmydata.util.assertutil import _assert
|
2007-02-01 23:07:00 +00:00
|
|
|
from allmydata.codec import CRSEncoder
|
2007-03-28 18:24:53 +00:00
|
|
|
from allmydata.interfaces import IEncoder
|
2006-12-14 03:32:35 +00:00
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
The goal of the encoder is to turn the original file into a series of
|
|
|
|
'shares'. Each share is going to a 'shareholder' (nominally each shareholder
|
|
|
|
is a different host, but for small meshes there may be overlap). The number
|
|
|
|
of shares is chosen to hit our reliability goals (more shares on more
|
|
|
|
machines means more reliability), and is limited by overhead (proportional to
|
|
|
|
numshares or log(numshares)) and the encoding technology in use (Reed-Solomon
|
|
|
|
only permits 256 shares total). It is also constrained by the amount of data
|
|
|
|
we want to send to each host. For estimating purposes, think of 100 shares
|
|
|
|
out of which we need 25 to reconstruct the file.
|
|
|
|
|
|
|
|
The encoder starts by cutting the original file into segments. All segments
|
|
|
|
except the last are of equal size. The segment size is chosen to constrain
|
|
|
|
the memory footprint (which will probably vary between 1x and 4x segment
|
|
|
|
size) and to constrain the overhead (which will be proportional to either the
|
|
|
|
number of segments or log(number of segments)).
|
|
|
|
|
|
|
|
|
|
|
|
Each segment (A,B,C) is read into memory, encrypted, and encoded into
|
2007-03-30 03:19:52 +00:00
|
|
|
blocks. The 'share' (say, share #1) that makes it out to a host is a
|
|
|
|
collection of these blocks (block A1, B1, C1), plus some hash-tree
|
2006-12-14 03:32:35 +00:00
|
|
|
information necessary to validate the data upon retrieval. Only one segment
|
2007-03-30 03:19:52 +00:00
|
|
|
is handled at a time: all blocks for segment A are delivered before any
|
2006-12-14 03:32:35 +00:00
|
|
|
work is begun on segment B.
|
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
As blocks are created, we retain the hash of each one. The list of
|
|
|
|
block hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
|
2006-12-14 03:32:35 +00:00
|
|
|
used to form the base of a Merkle hash tree for that share (hashtrees[1]).
|
2007-03-30 03:19:52 +00:00
|
|
|
This hash tree has one terminal leaf per block. The complete block hash
|
2006-12-14 03:32:35 +00:00
|
|
|
tree is sent to the shareholder after all the data has been sent. At
|
|
|
|
retrieval time, the decoder will ask for specific pieces of this tree before
|
2007-03-30 03:19:52 +00:00
|
|
|
asking for blocks, whichever it needs to validate those blocks.
|
2006-12-14 03:32:35 +00:00
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
(Note: we don't really need to generate this whole block hash tree
|
2006-12-14 03:32:35 +00:00
|
|
|
ourselves. It would be sufficient to have the shareholder generate it and
|
|
|
|
just tell us the root. This gives us an extra level of validation on the
|
2006-12-29 19:40:10 +00:00
|
|
|
transfer, though, and it is relatively cheap to compute.)
|
2006-12-14 03:32:35 +00:00
|
|
|
|
2007-03-30 03:19:52 +00:00
|
|
|
Each of these block hash trees has a root hash. The collection of these
|
2006-12-14 03:32:35 +00:00
|
|
|
root hashes for all shares forms the leaves of the 'share hash tree', which
|
2007-03-30 03:19:52 +00:00
|
|
|
has one terminal leaf per share. After sending the blocks and the complete
|
|
|
|
block hash tree to each shareholder, we send them the portion of the share
|
2006-12-14 03:32:35 +00:00
|
|
|
hash tree that is necessary to validate their share. The root of the share
|
|
|
|
hash tree is put into the URI.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
2007-01-05 00:58:14 +00:00
|
|
|
def pad(s, l, c='\x00'):
    """
    Return string s with enough copies of c appended to it to make its
    length a multiple of l bytes.

    The number of pad chars is whatever mathutil.pad_size(len(s), l)
    reports.

    @param s: the original string
    @param l: the block length in bytes; the padded result is meant to be a
              multiple of this long
    @param c: the pad char
    """
    num_pad_chars = mathutil.pad_size(len(s), l)
    return s + (c * num_pad_chars)
|
2006-12-14 03:32:35 +00:00
|
|
|
|
2007-01-05 06:51:35 +00:00
|
|
|
# Binary (power-of-two) size units, in bytes.
KiB = 2 ** 10
MiB = 2 ** 20
GiB = 2 ** 30
TiB = 2 ** 40
PiB = 2 ** 50
|
|
|
|
|
2006-12-14 03:32:35 +00:00
|
|
|
class Encoder(object):
    """
    Cut a file into segments, encrypt and erasure-code each segment, and
    push the resulting blocks (plus the hash-tree data needed to validate
    them) to a set of shareholders.

    setup() and set_shareholders() must both have been called before
    start(); start() returns a Deferred that fires with the root of the
    share hash tree.
    """
    implements(IEncoder)

    # default encoding parameters: NEEDED_SHARES is handed to the codec as
    # required_shares, TOTAL_SHARES as the number of shares to produce
    NEEDED_SHARES = 25
    TOTAL_SHARES = 100

    def setup(self, infile):
        """
        Remember the input file and derive the encoding parameters from its
        size.

        @param infile: a seekable file-like object (seek(), tell() and
                       read() are used); it is left positioned at offset 0
        """
        self.infile = infile
        # measure the file by seeking to its end, then rewind for reading
        infile.seek(0, 2)
        self.file_size = infile.tell()
        infile.seek(0, 0)

        self.num_shares = self.TOTAL_SHARES
        self.required_shares = self.NEEDED_SHARES

        # segments are at most 2MiB; a smaller file becomes one segment. The
        # segment size must be a multiple of self.required_shares (see the
        # assertion in setup_codec), so round it up.
        # NOTE(review): a zero-length file presumably leaves segment_size at
        # 0, which setup_codec() then uses as a modulus -- confirm that
        # empty files are handled before reaching this point.
        self.segment_size = mathutil.next_multiple(
            min(2*MiB, self.file_size), self.required_shares)
        self.setup_codec()

    def setup_codec(self):
        """
        Create the codec used for full-size segments (self._codec) and the
        one used for the final segment (self._tail_codec).

        Also sets self.tail_size, the number of file bytes in the final
        segment.
        """
        assert self.segment_size % self.required_shares == 0
        self._codec = CRSEncoder()
        self._codec.set_params(self.segment_size,
                               self.required_shares, self.num_shares)

        # the "tail" is the last segment. This segment may or may not be
        # shorter than all other segments. We use the "tail codec" to handle
        # it. If the tail is short, we use a different codec instance. In
        # addition, the tail codec must be fed data which has been padded out
        # to the right size.
        self.tail_size = self.file_size % self.segment_size
        if not self.tail_size:
            # the file is an exact multiple of the segment size, so the last
            # segment is a full one
            self.tail_size = self.segment_size

        # the tail codec is responsible for encoding tail_size bytes
        padded_tail_size = mathutil.next_multiple(self.tail_size,
                                                  self.required_shares)
        self._tail_codec = CRSEncoder()
        self._tail_codec.set_params(padded_tail_size,
                                    self.required_shares, self.num_shares)

    def get_share_size(self):
        """
        Return the size of one share: file_size divided by required_shares
        (rounded up) plus compute_overhead().
        """
        share_size = mathutil.div_ceil(self.file_size, self.required_shares)
        overhead = self.compute_overhead()
        return share_size + overhead

    def compute_overhead(self):
        """Return the per-share overhead in bytes; currently always 0."""
        return 0

    def get_block_size(self):
        """Return the block size reported by the full-size segment codec."""
        return self._codec.get_block_size()

    def set_shareholders(self, landlords):
        """
        Record where each share will be sent.

        @param landlords: a dict, indexed by share id, of objects which are
                          later driven through callRemote(); a shallow copy
                          is kept, so later changes to the caller's dict are
                          not seen
        """
        assert isinstance(landlords, dict)
        # TODO: it would be nice to assert
        # RIBucketWriter.providedBy(landlords[k]) for each key k
        self.landlords = landlords.copy()

    def start(self):
        """
        Encode and upload the whole file.

        Each full-size segment is processed in order with do_segment(), then
        the last segment with do_tail_segment(); after that the block hash
        trees and share hash trees are sent and every shareholder is closed.

        @return: a Deferred that fires with the value of done(), i.e. the
                 root hash of the share hash tree
        """
        self.num_segments = mathutil.div_ceil(self.file_size,
                                              self.segment_size)
        self.share_size = mathutil.div_ceil(self.file_size,
                                            self.required_shares)
        self.setup_encryption()
        self.setup_codec()
        d = defer.succeed(None)

        for i in range(self.num_segments-1):
            # bind the current value of i as a default argument. A plain
            # 'lambda res: self.do_segment(i)' only captures the variable,
            # not its value: any callback that runs after this loop has
            # finished (which happens as soon as one segment's Deferred does
            # not fire synchronously) would see the final value of i and
            # label its blocks with the wrong segment number.
            d.addCallback(lambda res, i=i: self.do_segment(i))
        d.addCallback(lambda res: self.do_tail_segment(self.num_segments-1))

        d.addCallback(lambda res: self.send_all_subshare_hash_trees())
        d.addCallback(lambda res: self.send_all_share_hash_trees())
        d.addCallback(lambda res: self.close_all_shareholders())
        d.addCallback(lambda res: self.done())
        return d

    def setup_encryption(self):
        """
        Create the AES-CTR cryptor and reset the per-upload hash
        accumulators.
        """
        # NOTE(review): the key and the counter start are hard-coded to 16
        # zero bytes -- presumably a placeholder; confirm before relying on
        # this for confidentiality.
        self.key = "\x00"*16
        self.cryptor = AES.new(key=self.key, mode=AES.MODE_CTR,
                               counterstart="\x00"*16)
        self.segment_num = 0
        self.subshare_hashes = [[] for x in range(self.num_shares)]
        # subshare_hashes[i] is a list that will be accumulated and then sent
        # to landlord[i]. This list contains a hash of each segment_share
        # that we sent to that landlord.
        self.share_root_hashes = [None] * self.num_shares

    def do_segment(self, segnum):
        """
        Read, encrypt and encode one full-size (non-tail) segment from the
        current file position, then send out the resulting blocks.

        @param segnum: the index of this segment
        @return: the Deferred returned by _encoded_segment()
        """
        chunks = []
        codec = self._codec
        # the ICodecEncoder API wants to receive a total of self.segment_size
        # bytes on each encode() call, broken up into a number of
        # identically-sized pieces. Due to the way the codec algorithm works,
        # these pieces need to be the same size as the share which the codec
        # will generate. Therefore we must feed it with input_piece_size that
        # equals the output share size.
        input_piece_size = codec.get_block_size()

        # as a result, the number of input pieces per encode() call will be
        # equal to the number of required shares with which the codec was
        # constructed. You can think of the codec as chopping up a
        # 'segment_size' of data into 'required_shares' shares (not doing any
        # fancy math at all, just doing a split), then creating some number
        # of additional shares which can be substituted if the primary ones
        # are unavailable

        for i in range(self.required_shares):
            input_piece = self.infile.read(input_piece_size)
            # non-tail segments should be the full segment size
            assert len(input_piece) == input_piece_size
            encrypted_piece = self.cryptor.encrypt(input_piece)
            chunks.append(encrypted_piece)
        d = codec.encode(chunks)
        d.addCallback(self._encoded_segment, segnum)
        return d

    def do_tail_segment(self, segnum):
        """
        Read, encrypt and encode the final segment, then send out the
        resulting blocks.

        Unlike do_segment(), short reads are allowed: each input piece is
        padded with zero bytes up to the tail codec's block size before it
        is encrypted.

        @param segnum: the index of this (last) segment
        @return: the Deferred returned by _encoded_segment()
        """
        chunks = []
        codec = self._tail_codec
        input_piece_size = codec.get_block_size()

        for i in range(self.required_shares):
            input_piece = self.infile.read(input_piece_size)
            if len(input_piece) < input_piece_size:
                # padding
                input_piece += ('\x00' * (input_piece_size - len(input_piece)))
            encrypted_piece = self.cryptor.encrypt(input_piece)
            chunks.append(encrypted_piece)
        d = codec.encode(chunks)
        d.addCallback(self._encoded_segment, segnum)
        return d

    def _encoded_segment(self, shares_and_shareids, segnum):
        """
        Deliver the blocks of one encoded segment and remember their hashes.

        @param shares_and_shareids: the (shares, shareids) pair produced by
                                    the codec's encode(): two parallel
                                    sequences, where shares[i] is the block
                                    destined for share number shareids[i]
        @param segnum: the index of the segment these blocks came from
        @return: a DeferredList covering all of the put_block calls
        """
        # unpacked here rather than in the signature so that the definition
        # is also valid on interpreters without tuple parameters
        (shares, shareids) = shares_and_shareids
        _assert(set(shareids) == set(self.landlords.keys()),
                shareids=shareids, landlords=self.landlords)
        # zip() would silently drop blocks on a length mismatch
        assert len(shares) == len(shareids)
        dl = []
        for (shareid, subshare) in zip(shareids, shares):
            dl.append(self.send_subshare(shareid, segnum, subshare))
            subshare_hash = hashutil.tagged_hash("encoded subshare", subshare)
            self.subshare_hashes[shareid].append(subshare_hash)
        d = defer.DeferredList(dl)
        def _logit(res):
            log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments))
            return res
        d.addCallback(_logit)
        return d

    def send_subshare(self, shareid, segment_num, subshare):
        """
        Send one block to the shareholder for share 'shareid' via its remote
        "put_block" method, and return the resulting Deferred.
        """
        sh = self.landlords[shareid]
        return sh.callRemote("put_block", segment_num, subshare)

    def send_all_subshare_hash_trees(self):
        """
        Send every shareholder the block hash tree for its share.

        @return: a DeferredList covering all of the sends
        """
        dl = []
        for shareid,hashes in enumerate(self.subshare_hashes):
            # hashes is a list of the hashes of all subshares that were sent
            # to shareholder[shareid].
            dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
        return defer.DeferredList(dl)

    def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
        """
        Build the hash tree over one share's block hashes, record its root
        in self.share_root_hashes, and send the whole tree to the
        shareholder via its remote "put_block_hashes" method.

        @param shareid: the share number
        @param subshare_hashes: the list of block hashes for that share
        @return: the Deferred from the remote call
        """
        t = HashTree(subshare_hashes)
        all_hashes = list(t)
        # all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
        # all_hashes[1] is the left child, == hash(ah[3]+ah[4])
        # all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
        self.share_root_hashes[shareid] = t[0]
        sh = self.landlords[shareid]
        return sh.callRemote("put_block_hashes", all_hashes)

    def send_all_share_hash_trees(self):
        """
        Build the share hash tree from the per-share root hashes, store its
        root in self.root_hash, and send each shareholder the hashes that
        HashTree.needed_for() reports for its leaf.

        @return: a DeferredList covering all of the sends
        """
        dl = []
        # every share must already have had its block hash tree computed
        for h in self.share_root_hashes:
            assert h
        # create the share hash tree
        t = HashTree(self.share_root_hashes)
        # the root of this hash tree goes into our URI
        self.root_hash = t[0]
        # the HashTree is given a list of leaves: 0,1,2,3..n .
        # Leaf i is looked up at node index (tree_width - 1) + i. The width
        # does not depend on i, so compute it once.
        tree_width = roundup_pow2(self.num_shares)
        # now send just the necessary pieces out to each shareholder
        for i in range(self.num_shares):
            base_index = i + tree_width - 1
            needed_hash_indices = t.needed_for(base_index)
            hashes = [(hi, t[hi]) for hi in needed_hash_indices]
            dl.append(self.send_one_share_hash_tree(i, hashes))
        return defer.DeferredList(dl)

    def send_one_share_hash_tree(self, shareid, needed_hashes):
        """
        Send a list of (index, hash) pairs from the share hash tree to the
        shareholder for 'shareid' via its remote "put_share_hashes" method,
        and return the resulting Deferred.
        """
        sh = self.landlords[shareid]
        return sh.callRemote("put_share_hashes", needed_hashes)

    def close_all_shareholders(self):
        """
        Call the remote "close" method on every shareholder.

        @return: a DeferredList covering all of the close calls
        """
        dl = []
        for shareid in range(self.num_shares):
            dl.append(self.landlords[shareid].callRemote("close"))
        return defer.DeferredList(dl)

    def done(self):
        """Return the root hash of the share hash tree."""
        return self.root_hash
|