2007-01-04 18:06:20 -07:00
|
|
|
# -*- test-case-name: allmydata.test.test_encode -*-
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-03-28 11:24:53 -07:00
|
|
|
from zope.interface import implements
|
2006-12-13 20:32:35 -07:00
|
|
|
from twisted.internet import defer
|
2007-03-30 18:01:37 -07:00
|
|
|
from twisted.python import log
|
2007-08-09 18:26:17 -07:00
|
|
|
from foolscap import eventual
|
2007-06-11 18:25:18 -07:00
|
|
|
from allmydata import uri
|
2007-06-07 21:47:21 -07:00
|
|
|
from allmydata.hashtree import HashTree
|
2007-08-09 18:26:56 -07:00
|
|
|
from allmydata.util import mathutil, hashutil, idlib
|
2007-03-30 16:50:50 -07:00
|
|
|
from allmydata.util.assertutil import _assert
|
2007-02-01 16:07:00 -07:00
|
|
|
from allmydata.codec import CRSEncoder
|
2007-07-23 19:31:53 -07:00
|
|
|
from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
|
|
|
|
IEncryptedUploadable
|
2006-12-13 20:32:35 -07:00
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
The goal of the encoder is to turn the original file into a series of
|
|
|
|
'shares'. Each share is going to a 'shareholder' (nominally each shareholder
|
2007-04-30 13:06:09 -07:00
|
|
|
is a different host, but for small grids there may be overlap). The number
|
2006-12-13 20:32:35 -07:00
|
|
|
of shares is chosen to hit our reliability goals (more shares on more
|
|
|
|
machines means more reliability), and is limited by overhead (proportional to
|
|
|
|
numshares or log(numshares)) and the encoding technology in use (Reed-Solomon
|
|
|
|
only permits 256 shares total). It is also constrained by the amount of data
|
|
|
|
we want to send to each host. For estimating purposes, think of 100 shares
|
|
|
|
out of which we need 25 to reconstruct the file.
|
|
|
|
|
|
|
|
The encoder starts by cutting the original file into segments. All segments
|
|
|
|
except the last are of equal size. The segment size is chosen to constrain
|
|
|
|
the memory footprint (which will probably vary between 1x and 4x segment
|
|
|
|
size) and to constrain the overhead (which will be proportional to either the
|
|
|
|
number of segments or log(number of segments)).
|
|
|
|
|
|
|
|
|
|
|
|
Each segment (A,B,C) is read into memory, encrypted, and encoded into
|
2007-03-29 20:19:52 -07:00
|
|
|
blocks. The 'share' (say, share #1) that makes it out to a host is a
|
|
|
|
collection of these blocks (block A1, B1, C1), plus some hash-tree
|
2006-12-13 20:32:35 -07:00
|
|
|
information necessary to validate the data upon retrieval. Only one segment
|
2007-03-29 20:19:52 -07:00
|
|
|
is handled at a time: all blocks for segment A are delivered before any
|
2006-12-13 20:32:35 -07:00
|
|
|
work is begun on segment B.
|
|
|
|
|
2007-03-29 20:19:52 -07:00
|
|
|
As blocks are created, we retain the hash of each one. The list of
|
|
|
|
block hashes for a single share (say, hash(A1), hash(B1), hash(C1)) is
|
2006-12-13 20:32:35 -07:00
|
|
|
used to form the base of a Merkle hash tree for that share (hashtrees[1]).
|
2007-03-29 20:19:52 -07:00
|
|
|
This hash tree has one terminal leaf per block. The complete block hash
|
2006-12-13 20:32:35 -07:00
|
|
|
tree is sent to the shareholder after all the data has been sent. At
|
|
|
|
retrieval time, the decoder will ask for specific pieces of this tree before
|
2007-03-29 20:19:52 -07:00
|
|
|
asking for blocks, whichever it needs to validate those blocks.
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-03-29 20:19:52 -07:00
|
|
|
(Note: we don't really need to generate this whole block hash tree
|
2006-12-13 20:32:35 -07:00
|
|
|
ourselves. It would be sufficient to have the shareholder generate it and
|
|
|
|
just tell us the root. This gives us an extra level of validation on the
|
2006-12-29 12:40:10 -07:00
|
|
|
transfer, though, and it is relatively cheap to compute.)
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-03-29 20:19:52 -07:00
|
|
|
Each of these block hash trees has a root hash. The collection of these
|
2006-12-13 20:32:35 -07:00
|
|
|
root hashes for all shares are collected into the 'share hash tree', which
|
2007-03-29 20:19:52 -07:00
|
|
|
has one terminal leaf per share. After sending the blocks and the complete
|
|
|
|
block hash tree to each shareholder, we send them the portion of the share
|
2006-12-13 20:32:35 -07:00
|
|
|
hash tree that is necessary to validate their share. The root of the share
|
|
|
|
hash tree is put into the URI.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
2007-06-06 10:32:40 -07:00
|
|
|
class NotEnoughPeersError(Exception):
|
|
|
|
pass
|
|
|
|
|
2007-01-04 23:51:35 -07:00
|
|
|
KiB=1024
|
|
|
|
MiB=1024*KiB
|
|
|
|
GiB=1024*MiB
|
|
|
|
TiB=1024*GiB
|
|
|
|
PiB=1024*TiB
|
|
|
|
|
2006-12-13 20:32:35 -07:00
|
|
|
class Encoder(object):
|
2007-03-28 11:24:53 -07:00
|
|
|
implements(IEncoder)
|
2007-03-30 11:53:03 -07:00
|
|
|
NEEDED_SHARES = 25
|
2007-06-06 10:32:40 -07:00
|
|
|
SHARES_OF_HAPPINESS = 75
|
2007-03-30 11:53:03 -07:00
|
|
|
TOTAL_SHARES = 100
|
2007-07-16 13:48:34 -07:00
|
|
|
MAX_SEGMENT_SIZE = 1*MiB
|
2007-04-16 19:29:57 -07:00
|
|
|
|
|
|
|
def __init__(self, options={}):
|
|
|
|
object.__init__(self)
|
|
|
|
self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
|
|
|
|
self.MAX_SEGMENT_SIZE)
|
2007-06-06 10:32:40 -07:00
|
|
|
k,happy,n = options.get("needed_and_happy_and_total_shares",
|
|
|
|
(self.NEEDED_SHARES,
|
|
|
|
self.SHARES_OF_HAPPINESS,
|
|
|
|
self.TOTAL_SHARES))
|
2007-04-19 10:56:15 -07:00
|
|
|
self.NEEDED_SHARES = k
|
2007-06-06 10:32:40 -07:00
|
|
|
self.SHARES_OF_HAPPINESS = happy
|
2007-04-19 10:56:15 -07:00
|
|
|
self.TOTAL_SHARES = n
|
2007-06-08 15:59:16 -07:00
|
|
|
self.uri_extension_data = {}
|
2007-07-23 19:31:53 -07:00
|
|
|
self._codec = None
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-08-09 18:26:56 -07:00
|
|
|
def __repr__(self):
|
|
|
|
if hasattr(self, "_storage_index"):
|
|
|
|
return "<Encoder for %s>" % idlib.b2a(self._storage_index)[:6]
|
|
|
|
return "<Encoder for unknown storage index>"
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
def set_size(self, size):
|
2007-07-23 19:31:53 -07:00
|
|
|
assert not self._codec
|
2007-07-19 18:21:44 -07:00
|
|
|
self.file_size = size
|
|
|
|
|
2007-07-12 15:33:30 -07:00
|
|
|
def set_params(self, encoding_parameters):
|
2007-07-23 19:31:53 -07:00
|
|
|
assert not self._codec
|
2007-07-12 15:33:30 -07:00
|
|
|
k,d,n = encoding_parameters
|
|
|
|
self.NEEDED_SHARES = k
|
|
|
|
self.SHARES_OF_HAPPINESS = d
|
|
|
|
self.TOTAL_SHARES = n
|
|
|
|
|
2007-07-23 19:31:53 -07:00
|
|
|
def _setup_codec(self):
|
2007-03-30 11:53:03 -07:00
|
|
|
self.num_shares = self.TOTAL_SHARES
|
|
|
|
self.required_shares = self.NEEDED_SHARES
|
2007-06-06 10:32:40 -07:00
|
|
|
self.shares_of_happiness = self.SHARES_OF_HAPPINESS
|
2006-12-29 13:50:53 -07:00
|
|
|
|
2007-04-16 19:29:57 -07:00
|
|
|
self.segment_size = min(self.MAX_SEGMENT_SIZE, self.file_size)
|
2007-03-30 16:50:50 -07:00
|
|
|
# this must be a multiple of self.required_shares
|
|
|
|
self.segment_size = mathutil.next_multiple(self.segment_size,
|
|
|
|
self.required_shares)
|
2006-12-29 13:50:53 -07:00
|
|
|
|
2007-07-23 19:31:53 -07:00
|
|
|
# now set up the codec
|
|
|
|
|
2007-03-30 16:50:50 -07:00
|
|
|
assert self.segment_size % self.required_shares == 0
|
2007-07-19 18:21:44 -07:00
|
|
|
self.num_segments = mathutil.div_ceil(self.file_size,
|
|
|
|
self.segment_size)
|
|
|
|
|
2007-03-30 11:53:03 -07:00
|
|
|
self._codec = CRSEncoder()
|
2007-03-30 16:50:50 -07:00
|
|
|
self._codec.set_params(self.segment_size,
|
|
|
|
self.required_shares, self.num_shares)
|
|
|
|
|
2007-06-08 15:59:16 -07:00
|
|
|
data = self.uri_extension_data
|
2007-06-01 18:48:01 -07:00
|
|
|
data['codec_name'] = self._codec.get_encoder_type()
|
|
|
|
data['codec_params'] = self._codec.get_serialized_params()
|
|
|
|
|
|
|
|
data['size'] = self.file_size
|
|
|
|
data['segment_size'] = self.segment_size
|
2007-07-19 18:21:44 -07:00
|
|
|
self.share_size = mathutil.div_ceil(self.file_size,
|
|
|
|
self.required_shares)
|
|
|
|
data['num_segments'] = self.num_segments
|
2007-06-01 18:48:01 -07:00
|
|
|
data['needed_shares'] = self.required_shares
|
|
|
|
data['total_shares'] = self.num_shares
|
|
|
|
|
2007-03-30 16:50:50 -07:00
|
|
|
# the "tail" is the last segment. This segment may or may not be
|
|
|
|
# shorter than all other segments. We use the "tail codec" to handle
|
|
|
|
# it. If the tail is short, we use a different codec instance. In
|
|
|
|
# addition, the tail codec must be fed data which has been padded out
|
|
|
|
# to the right size.
|
|
|
|
self.tail_size = self.file_size % self.segment_size
|
|
|
|
if not self.tail_size:
|
|
|
|
self.tail_size = self.segment_size
|
|
|
|
|
|
|
|
# the tail codec is responsible for encoding tail_size bytes
|
|
|
|
padded_tail_size = mathutil.next_multiple(self.tail_size,
|
|
|
|
self.required_shares)
|
|
|
|
self._tail_codec = CRSEncoder()
|
|
|
|
self._tail_codec.set_params(padded_tail_size,
|
|
|
|
self.required_shares, self.num_shares)
|
2007-06-01 18:48:01 -07:00
|
|
|
data['tail_codec_params'] = self._tail_codec.get_serialized_params()
|
|
|
|
|
2007-07-23 19:31:53 -07:00
|
|
|
def _get_share_size(self):
|
2007-03-30 11:32:57 -07:00
|
|
|
share_size = mathutil.div_ceil(self.file_size, self.required_shares)
|
2007-07-23 19:31:53 -07:00
|
|
|
overhead = self._compute_overhead()
|
2007-03-30 11:32:57 -07:00
|
|
|
return share_size + overhead
|
2007-07-23 19:31:53 -07:00
|
|
|
|
|
|
|
def _compute_overhead(self):
|
2007-03-30 11:32:57 -07:00
|
|
|
return 0
|
2007-07-23 19:31:53 -07:00
|
|
|
|
|
|
|
def set_encrypted_uploadable(self, uploadable):
|
|
|
|
u = self._uploadable = IEncryptedUploadable(uploadable)
|
|
|
|
d = u.get_size()
|
|
|
|
d.addCallback(self.set_size)
|
|
|
|
d.addCallback(lambda res: self.get_param("serialized_params"))
|
|
|
|
d.addCallback(u.set_serialized_encoding_parameters)
|
|
|
|
d.addCallback(lambda res: u.get_storage_index())
|
|
|
|
def _done(storage_index):
|
|
|
|
self._storage_index = storage_index
|
|
|
|
return self
|
|
|
|
d.addCallback(_done)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def get_param(self, name):
|
|
|
|
if not self._codec:
|
|
|
|
self._setup_codec()
|
|
|
|
|
|
|
|
if name == "storage_index":
|
|
|
|
return self._storage_index
|
|
|
|
elif name == "share_counts":
|
|
|
|
return (self.required_shares, self.shares_of_happiness,
|
|
|
|
self.num_shares)
|
|
|
|
elif name == "num_segments":
|
|
|
|
return self.num_segments
|
|
|
|
elif name == "segment_size":
|
|
|
|
return self.segment_size
|
|
|
|
elif name == "block_size":
|
|
|
|
return self._codec.get_block_size()
|
|
|
|
elif name == "share_size":
|
|
|
|
return self._get_share_size()
|
|
|
|
elif name == "serialized_params":
|
|
|
|
return self._codec.get_serialized_params()
|
|
|
|
else:
|
|
|
|
raise KeyError("unknown parameter name '%s'" % name)
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-03-28 13:30:17 -07:00
|
|
|
def set_shareholders(self, landlords):
|
2007-03-30 16:50:50 -07:00
|
|
|
assert isinstance(landlords, dict)
|
|
|
|
for k in landlords:
|
2007-07-13 15:09:01 -07:00
|
|
|
assert IStorageBucketWriter.providedBy(landlords[k])
|
2007-03-28 13:30:17 -07:00
|
|
|
self.landlords = landlords.copy()
|
2006-12-13 20:32:35 -07:00
|
|
|
|
|
|
|
def start(self):
|
2007-03-30 11:53:03 -07:00
|
|
|
#paddedsize = self._size + mathutil.pad_size(self._size, self.needed_shares)
|
2007-07-23 19:31:53 -07:00
|
|
|
if not self._codec:
|
|
|
|
self._setup_codec()
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
self._crypttext_hasher = hashutil.crypttext_hasher()
|
2007-06-06 19:40:20 -07:00
|
|
|
self._crypttext_hashes = []
|
2007-07-23 19:31:53 -07:00
|
|
|
self.segment_num = 0
|
|
|
|
self.subshare_hashes = [[] for x in range(self.num_shares)]
|
|
|
|
# subshare_hashes[i] is a list that will be accumulated and then send
|
|
|
|
# to landlord[i]. This list contains a hash of each segment_share
|
|
|
|
# that we sent to that landlord.
|
|
|
|
self.share_root_hashes = [None] * self.num_shares
|
|
|
|
|
2007-08-09 18:26:17 -07:00
|
|
|
d = eventual.fireEventually()
|
|
|
|
d.addCallback(lambda res:
|
|
|
|
self._uploadable.set_segment_size(self.segment_size))
|
2007-03-30 16:50:50 -07:00
|
|
|
|
2007-07-13 14:04:49 -07:00
|
|
|
for l in self.landlords.values():
|
|
|
|
d.addCallback(lambda res, l=l: l.start())
|
|
|
|
|
2007-03-30 16:50:50 -07:00
|
|
|
for i in range(self.num_segments-1):
|
2007-04-17 20:29:08 -07:00
|
|
|
# note to self: this form doesn't work, because lambda only
|
|
|
|
# captures the slot, not the value
|
|
|
|
#d.addCallback(lambda res: self.do_segment(i))
|
|
|
|
# use this form instead:
|
2007-07-19 18:21:44 -07:00
|
|
|
d.addCallback(lambda res, i=i: self._encode_segment(i))
|
|
|
|
d.addCallback(self._send_segment, i)
|
2007-08-09 18:26:17 -07:00
|
|
|
d.addCallback(self._turn_barrier)
|
2007-07-19 18:21:44 -07:00
|
|
|
last_segnum = self.num_segments - 1
|
|
|
|
d.addCallback(lambda res: self._encode_tail_segment(last_segnum))
|
|
|
|
d.addCallback(self._send_segment, last_segnum)
|
2007-08-09 18:26:17 -07:00
|
|
|
d.addCallback(self._turn_barrier)
|
2007-07-19 18:21:44 -07:00
|
|
|
|
2007-07-23 19:31:53 -07:00
|
|
|
d.addCallback(lambda res: self.finish_hashing())
|
2007-03-30 16:50:50 -07:00
|
|
|
|
2007-06-06 19:40:20 -07:00
|
|
|
d.addCallback(lambda res:
|
|
|
|
self.send_plaintext_hash_tree_to_all_shareholders())
|
|
|
|
d.addCallback(lambda res:
|
|
|
|
self.send_crypttext_hash_tree_to_all_shareholders())
|
2006-12-13 20:32:35 -07:00
|
|
|
d.addCallback(lambda res: self.send_all_subshare_hash_trees())
|
|
|
|
d.addCallback(lambda res: self.send_all_share_hash_trees())
|
2007-06-08 15:59:16 -07:00
|
|
|
d.addCallback(lambda res: self.send_uri_extension_to_all_shareholders())
|
2007-07-19 18:21:44 -07:00
|
|
|
|
2006-12-13 20:32:35 -07:00
|
|
|
d.addCallback(lambda res: self.close_all_shareholders())
|
2007-04-06 18:04:38 -07:00
|
|
|
d.addCallbacks(lambda res: self.done(), self.err)
|
2006-12-13 20:32:35 -07:00
|
|
|
return d
|
|
|
|
|
2007-08-09 18:26:17 -07:00
|
|
|
def _turn_barrier(self, res):
|
|
|
|
# putting this method in a Deferred chain imposes a guaranteed
|
|
|
|
# reactor turn between the pre- and post- portions of that chain.
|
|
|
|
# This can be useful to limit memory consumption: since Deferreds do
|
|
|
|
# not do tail recursion, code which uses defer.succeed(result) for
|
|
|
|
# consistency will cause objects to live for longer than you might
|
|
|
|
# normally expect.
|
|
|
|
|
|
|
|
return eventual.fireEventually(res)
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
def _encode_segment(self, segnum):
|
2007-03-30 16:50:50 -07:00
|
|
|
codec = self._codec
|
2007-07-19 18:21:44 -07:00
|
|
|
|
2007-03-28 11:06:19 -07:00
|
|
|
# the ICodecEncoder API wants to receive a total of self.segment_size
|
|
|
|
# bytes on each encode() call, broken up into a number of
|
|
|
|
# identically-sized pieces. Due to the way the codec algorithm works,
|
|
|
|
# these pieces need to be the same size as the share which the codec
|
|
|
|
# will generate. Therefore we must feed it with input_piece_size that
|
|
|
|
# equals the output share size.
|
2007-03-30 16:50:50 -07:00
|
|
|
input_piece_size = codec.get_block_size()
|
2007-03-28 11:06:19 -07:00
|
|
|
|
|
|
|
# as a result, the number of input pieces per encode() call will be
|
|
|
|
# equal to the number of required shares with which the codec was
|
|
|
|
# constructed. You can think of the codec as chopping up a
|
|
|
|
# 'segment_size' of data into 'required_shares' shares (not doing any
|
|
|
|
# fancy math at all, just doing a split), then creating some number
|
|
|
|
# of additional shares which can be substituted if the primary ones
|
|
|
|
# are unavailable
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
|
2007-06-07 00:15:41 -07:00
|
|
|
|
2007-06-07 13:15:58 -07:00
|
|
|
# memory footprint: we only hold a tiny piece of the plaintext at any
|
|
|
|
# given time. We build up a segment's worth of cryptttext, then hand
|
|
|
|
# it to the encoder. Assuming 25-of-100 encoding (4x expansion) and
|
|
|
|
# 2MiB max_segment_size, we get a peak memory footprint of 5*2MiB =
|
|
|
|
# 10MiB. Lowering max_segment_size to, say, 100KiB would drop the
|
|
|
|
# footprint to 500KiB at the expense of more hash-tree overhead.
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
d = self._gather_data(self.required_shares, input_piece_size,
|
|
|
|
crypttext_segment_hasher)
|
|
|
|
def _done(chunks):
|
|
|
|
for c in chunks:
|
|
|
|
assert len(c) == input_piece_size
|
|
|
|
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
|
|
|
|
# during this call, we hit 5*segsize memory
|
|
|
|
return codec.encode(chunks)
|
|
|
|
d.addCallback(_done)
|
2007-03-30 16:50:50 -07:00
|
|
|
return d
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
def _encode_tail_segment(self, segnum):
|
|
|
|
|
2007-03-30 16:50:50 -07:00
|
|
|
codec = self._tail_codec
|
|
|
|
input_piece_size = codec.get_block_size()
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
|
|
|
|
|
|
|
|
d = self._gather_data(self.required_shares, input_piece_size,
|
|
|
|
crypttext_segment_hasher,
|
|
|
|
allow_short=True)
|
|
|
|
def _done(chunks):
|
|
|
|
for c in chunks:
|
|
|
|
# a short trailing chunk will have been padded by
|
|
|
|
# _gather_data
|
|
|
|
assert len(c) == input_piece_size
|
|
|
|
self._crypttext_hashes.append(crypttext_segment_hasher.digest())
|
|
|
|
return codec.encode(chunks)
|
|
|
|
d.addCallback(_done)
|
|
|
|
return d
|
2007-06-07 13:14:14 -07:00
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
def _gather_data(self, num_chunks, input_chunk_size,
|
2007-07-23 19:31:53 -07:00
|
|
|
crypttext_segment_hasher,
|
2007-07-19 18:21:44 -07:00
|
|
|
allow_short=False,
|
|
|
|
previous_chunks=[]):
|
|
|
|
"""Return a Deferred that will fire when the required number of
|
|
|
|
chunks have been read (and hashed and encrypted). The Deferred fires
|
|
|
|
with the combination of any 'previous_chunks' and the new chunks
|
|
|
|
which were gathered."""
|
|
|
|
|
|
|
|
if not num_chunks:
|
|
|
|
return defer.succeed(previous_chunks)
|
|
|
|
|
2007-07-23 19:31:53 -07:00
|
|
|
d = self._uploadable.read_encrypted(input_chunk_size)
|
2007-07-19 18:21:44 -07:00
|
|
|
def _got(data):
|
|
|
|
encrypted_pieces = []
|
|
|
|
length = 0
|
|
|
|
while data:
|
2007-07-23 19:31:53 -07:00
|
|
|
encrypted_piece = data.pop(0)
|
|
|
|
length += len(encrypted_piece)
|
2007-07-19 18:21:44 -07:00
|
|
|
crypttext_segment_hasher.update(encrypted_piece)
|
|
|
|
self._crypttext_hasher.update(encrypted_piece)
|
|
|
|
encrypted_pieces.append(encrypted_piece)
|
|
|
|
|
|
|
|
if allow_short:
|
|
|
|
if length < input_chunk_size:
|
|
|
|
# padding
|
|
|
|
pad_size = input_chunk_size - length
|
|
|
|
encrypted_pieces.append('\x00' * pad_size)
|
|
|
|
else:
|
|
|
|
# non-tail segments should be the full segment size
|
|
|
|
assert length == input_chunk_size
|
|
|
|
|
|
|
|
encrypted_piece = "".join(encrypted_pieces)
|
|
|
|
return previous_chunks + [encrypted_piece]
|
|
|
|
|
|
|
|
d.addCallback(_got)
|
|
|
|
d.addCallback(lambda chunks:
|
|
|
|
self._gather_data(num_chunks-1, input_chunk_size,
|
|
|
|
crypttext_segment_hasher,
|
|
|
|
allow_short, chunks))
|
2007-01-04 23:51:35 -07:00
|
|
|
return d
|
|
|
|
|
2007-07-19 18:21:44 -07:00
|
|
|
def _send_segment(self, (shares, shareids), segnum):
|
2007-04-18 18:29:10 -07:00
|
|
|
# To generate the URI, we must generate the roothash, so we must
|
|
|
|
# generate all shares, even if we aren't actually giving them to
|
2007-07-13 17:00:06 -07:00
|
|
|
# anybody. This means that the set of shares we create will be equal
|
2007-04-18 18:29:10 -07:00
|
|
|
# to or larger than the set of landlords. If we have any landlord who
|
|
|
|
# *doesn't* have a share, that's an error.
|
|
|
|
_assert(set(self.landlords.keys()).issubset(set(shareids)),
|
2007-03-30 16:50:50 -07:00
|
|
|
shareids=shareids, landlords=self.landlords)
|
2006-12-13 20:32:35 -07:00
|
|
|
dl = []
|
2007-03-27 20:14:45 -07:00
|
|
|
for i in range(len(shares)):
|
|
|
|
subshare = shares[i]
|
|
|
|
shareid = shareids[i]
|
2007-03-30 16:50:50 -07:00
|
|
|
d = self.send_subshare(shareid, segnum, subshare)
|
2006-12-13 20:32:35 -07:00
|
|
|
dl.append(d)
|
2007-06-07 21:47:21 -07:00
|
|
|
subshare_hash = hashutil.block_hash(subshare)
|
2007-07-13 16:38:25 -07:00
|
|
|
#from allmydata.util import idlib
|
|
|
|
#log.msg("creating block (shareid=%d, blocknum=%d) "
|
|
|
|
# "len=%d %r .. %r: %s" %
|
|
|
|
# (shareid, segnum, len(subshare),
|
|
|
|
# subshare[:50], subshare[-50:], idlib.b2a(subshare_hash)))
|
2007-03-30 11:32:13 -07:00
|
|
|
self.subshare_hashes[shareid].append(subshare_hash)
|
2007-08-09 18:28:45 -07:00
|
|
|
|
2007-06-06 12:40:16 -07:00
|
|
|
dl = self._gather_responses(dl)
|
2007-04-02 10:23:24 -07:00
|
|
|
def _logit(res):
|
2007-08-09 18:28:45 -07:00
|
|
|
log.msg("%s uploaded %s / %s bytes (%d%%) of your file." %
|
|
|
|
(self,
|
|
|
|
self.segment_size*(segnum+1),
|
|
|
|
self.segment_size*self.num_segments,
|
|
|
|
100 * (segnum+1) / self.num_segments,
|
|
|
|
))
|
2007-04-02 10:23:24 -07:00
|
|
|
return res
|
|
|
|
dl.addCallback(_logit)
|
2007-03-30 18:01:37 -07:00
|
|
|
return dl
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-02-01 16:07:00 -07:00
|
|
|
def send_subshare(self, shareid, segment_num, subshare):
|
2007-04-18 18:29:10 -07:00
|
|
|
if shareid not in self.landlords:
|
|
|
|
return defer.succeed(None)
|
2007-04-05 22:36:18 -07:00
|
|
|
sh = self.landlords[shareid]
|
2007-07-08 23:27:46 -07:00
|
|
|
d = sh.put_block(segment_num, subshare)
|
2007-06-06 10:32:40 -07:00
|
|
|
d.addErrback(self._remove_shareholder, shareid,
|
|
|
|
"segnum=%d" % segment_num)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def _remove_shareholder(self, why, shareid, where):
|
|
|
|
log.msg("error while sending %s to shareholder=%d: %s" %
|
|
|
|
(where, shareid, why)) # UNUSUAL
|
2007-06-06 12:40:16 -07:00
|
|
|
if shareid in self.landlords:
|
|
|
|
del self.landlords[shareid]
|
|
|
|
else:
|
|
|
|
# even more UNUSUAL
|
|
|
|
log.msg(" weird, they weren't in our list of landlords")
|
2007-06-06 10:32:40 -07:00
|
|
|
if len(self.landlords) < self.shares_of_happiness:
|
2007-06-07 00:15:41 -07:00
|
|
|
msg = "lost too many shareholders during upload: %s" % why
|
2007-06-06 10:32:40 -07:00
|
|
|
raise NotEnoughPeersError(msg)
|
2007-06-06 12:40:16 -07:00
|
|
|
log.msg("but we can still continue with %s shares, we'll be happy "
|
|
|
|
"with at least %s" % (len(self.landlords),
|
|
|
|
self.shares_of_happiness))
|
|
|
|
|
|
|
|
def _gather_responses(self, dl):
|
|
|
|
d = defer.DeferredList(dl, fireOnOneErrback=True)
|
|
|
|
def _eatNotEnoughPeersError(f):
|
|
|
|
# all exceptions that occur while talking to a peer are handled
|
|
|
|
# in _remove_shareholder. That might raise NotEnoughPeersError,
|
|
|
|
# which will cause the DeferredList to errback but which should
|
|
|
|
# otherwise be consumed. Allow non-NotEnoughPeersError exceptions
|
|
|
|
# to pass through as an unhandled errback. We use this in lieu of
|
|
|
|
# consumeErrors=True to allow coding errors to be logged.
|
|
|
|
f.trap(NotEnoughPeersError)
|
|
|
|
return None
|
|
|
|
for d0 in dl:
|
|
|
|
d0.addErrback(_eatNotEnoughPeersError)
|
|
|
|
return d
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-07-23 19:31:53 -07:00
|
|
|
def finish_hashing(self):
|
2007-07-19 18:21:44 -07:00
|
|
|
crypttext_hash = self._crypttext_hasher.digest()
|
|
|
|
self.uri_extension_data["crypttext_hash"] = crypttext_hash
|
2007-07-23 19:31:53 -07:00
|
|
|
u = self._uploadable
|
|
|
|
d = u.get_plaintext_hash()
|
|
|
|
def _got(plaintext_hash):
|
|
|
|
self.uri_extension_data["plaintext_hash"] = plaintext_hash
|
|
|
|
return u.get_plaintext_segment_hashtree_nodes(self.num_segments)
|
|
|
|
d.addCallback(_got)
|
|
|
|
def _got_hashtree_nodes(t):
|
|
|
|
self.uri_extension_data["plaintext_root_hash"] = t[0]
|
|
|
|
self._plaintext_hashtree_nodes = t
|
|
|
|
d.addCallback(_got_hashtree_nodes)
|
|
|
|
return d
|
2007-07-19 18:21:44 -07:00
|
|
|
|
2007-06-06 19:40:20 -07:00
|
|
|
def send_plaintext_hash_tree_to_all_shareholders(self):
|
|
|
|
log.msg("%s sending plaintext hash tree" % self)
|
|
|
|
dl = []
|
|
|
|
for shareid in self.landlords.keys():
|
2007-07-23 19:31:53 -07:00
|
|
|
d = self.send_plaintext_hash_tree(shareid,
|
|
|
|
self._plaintext_hashtree_nodes)
|
|
|
|
dl.append(d)
|
2007-06-06 19:40:20 -07:00
|
|
|
return self._gather_responses(dl)
|
|
|
|
|
|
|
|
def send_plaintext_hash_tree(self, shareid, all_hashes):
|
|
|
|
if shareid not in self.landlords:
|
|
|
|
return defer.succeed(None)
|
|
|
|
sh = self.landlords[shareid]
|
2007-07-08 23:27:46 -07:00
|
|
|
d = sh.put_plaintext_hashes(all_hashes)
|
2007-06-06 19:40:20 -07:00
|
|
|
d.addErrback(self._remove_shareholder, shareid, "put_plaintext_hashes")
|
|
|
|
return d
|
|
|
|
|
|
|
|
def send_crypttext_hash_tree_to_all_shareholders(self):
|
|
|
|
log.msg("%s sending crypttext hash tree" % self)
|
|
|
|
t = HashTree(self._crypttext_hashes)
|
|
|
|
all_hashes = list(t)
|
2007-06-08 15:59:16 -07:00
|
|
|
self.uri_extension_data["crypttext_root_hash"] = t[0]
|
2007-06-06 19:40:20 -07:00
|
|
|
dl = []
|
|
|
|
for shareid in self.landlords.keys():
|
|
|
|
dl.append(self.send_crypttext_hash_tree(shareid, all_hashes))
|
|
|
|
return self._gather_responses(dl)
|
|
|
|
|
|
|
|
def send_crypttext_hash_tree(self, shareid, all_hashes):
|
|
|
|
if shareid not in self.landlords:
|
|
|
|
return defer.succeed(None)
|
|
|
|
sh = self.landlords[shareid]
|
2007-07-08 23:27:46 -07:00
|
|
|
d = sh.put_crypttext_hashes(all_hashes)
|
2007-06-06 19:40:20 -07:00
|
|
|
d.addErrback(self._remove_shareholder, shareid, "put_crypttext_hashes")
|
|
|
|
return d
|
|
|
|
|
2006-12-13 20:32:35 -07:00
|
|
|
def send_all_subshare_hash_trees(self):
|
2007-04-06 18:04:38 -07:00
|
|
|
log.msg("%s sending subshare hash trees" % self)
|
2006-12-13 20:32:35 -07:00
|
|
|
dl = []
|
2007-02-01 16:07:00 -07:00
|
|
|
for shareid,hashes in enumerate(self.subshare_hashes):
|
2006-12-13 20:32:35 -07:00
|
|
|
# hashes is a list of the hashes of all subshares that were sent
|
2007-02-01 16:07:00 -07:00
|
|
|
# to shareholder[shareid].
|
|
|
|
dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
|
2007-06-06 12:40:16 -07:00
|
|
|
return self._gather_responses(dl)
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-02-01 16:07:00 -07:00
|
|
|
def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
|
2006-12-13 20:32:35 -07:00
|
|
|
t = HashTree(subshare_hashes)
|
|
|
|
all_hashes = list(t)
|
|
|
|
# all_hashes[0] is the root hash, == hash(ah[1]+ah[2])
|
|
|
|
# all_hashes[1] is the left child, == hash(ah[3]+ah[4])
|
|
|
|
# all_hashes[n] == hash(all_hashes[2*n+1] + all_hashes[2*n+2])
|
2007-02-01 16:07:00 -07:00
|
|
|
self.share_root_hashes[shareid] = t[0]
|
2007-04-18 18:29:10 -07:00
|
|
|
if shareid not in self.landlords:
|
|
|
|
return defer.succeed(None)
|
2007-04-05 22:36:18 -07:00
|
|
|
sh = self.landlords[shareid]
|
2007-07-08 23:27:46 -07:00
|
|
|
d = sh.put_block_hashes(all_hashes)
|
2007-06-06 10:32:40 -07:00
|
|
|
d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
|
|
|
|
return d
|
2006-12-13 20:32:35 -07:00
|
|
|
|
|
|
|
def send_all_share_hash_trees(self):
|
2007-04-12 19:41:48 -07:00
|
|
|
# each bucket gets a set of share hash tree nodes that are needed to
|
|
|
|
# validate their share. This includes the share hash itself, but does
|
|
|
|
# not include the top-level hash root (which is stored securely in
|
|
|
|
# the URI instead).
|
2007-04-06 18:04:38 -07:00
|
|
|
log.msg("%s sending all share hash trees" % self)
|
2006-12-13 20:32:35 -07:00
|
|
|
dl = []
|
|
|
|
for h in self.share_root_hashes:
|
|
|
|
assert h
|
|
|
|
# create the share hash tree
|
|
|
|
t = HashTree(self.share_root_hashes)
|
|
|
|
# the root of this hash tree goes into our URI
|
2007-06-08 15:59:16 -07:00
|
|
|
self.uri_extension_data['share_root_hash'] = t[0]
|
2006-12-13 20:32:35 -07:00
|
|
|
# now send just the necessary pieces out to each shareholder
|
|
|
|
for i in range(self.num_shares):
|
2006-12-14 04:31:17 -07:00
|
|
|
# the HashTree is given a list of leaves: 0,1,2,3..n .
|
|
|
|
# These become nodes A+0,A+1,A+2.. of the tree, where A=n-1
|
2007-04-12 19:41:48 -07:00
|
|
|
needed_hash_indices = t.needed_hashes(i, include_leaf=True)
|
2006-12-14 04:31:17 -07:00
|
|
|
hashes = [(hi, t[hi]) for hi in needed_hash_indices]
|
|
|
|
dl.append(self.send_one_share_hash_tree(i, hashes))
|
2007-06-06 12:40:16 -07:00
|
|
|
return self._gather_responses(dl)
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-02-01 16:07:00 -07:00
|
|
|
def send_one_share_hash_tree(self, shareid, needed_hashes):
|
2007-04-18 18:29:10 -07:00
|
|
|
if shareid not in self.landlords:
|
|
|
|
return defer.succeed(None)
|
2007-04-05 22:36:18 -07:00
|
|
|
sh = self.landlords[shareid]
|
2007-07-08 23:27:46 -07:00
|
|
|
d = sh.put_share_hashes(needed_hashes)
|
2007-06-06 10:32:40 -07:00
|
|
|
d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
|
|
|
|
return d
|
2006-12-13 20:32:35 -07:00
|
|
|
|
2007-06-08 15:59:16 -07:00
|
|
|
def send_uri_extension_to_all_shareholders(self):
|
|
|
|
log.msg("%s: sending uri_extension" % self)
|
2007-07-19 18:21:44 -07:00
|
|
|
for k in ('crypttext_root_hash', 'crypttext_hash',
|
|
|
|
'plaintext_root_hash', 'plaintext_hash',
|
|
|
|
):
|
|
|
|
assert k in self.uri_extension_data
|
2007-06-11 18:25:18 -07:00
|
|
|
uri_extension = uri.pack_extension(self.uri_extension_data)
|
2007-06-08 15:59:16 -07:00
|
|
|
self.uri_extension_hash = hashutil.uri_extension_hash(uri_extension)
|
2007-06-01 18:48:01 -07:00
|
|
|
dl = []
|
2007-06-06 10:32:40 -07:00
|
|
|
for shareid in self.landlords.keys():
|
2007-06-08 15:59:16 -07:00
|
|
|
dl.append(self.send_uri_extension(shareid, uri_extension))
|
2007-06-06 12:40:16 -07:00
|
|
|
return self._gather_responses(dl)
|
2007-06-01 18:48:01 -07:00
|
|
|
|
2007-06-08 15:59:16 -07:00
|
|
|
def send_uri_extension(self, shareid, uri_extension):
|
2007-06-06 10:32:40 -07:00
|
|
|
sh = self.landlords[shareid]
|
2007-07-08 23:27:46 -07:00
|
|
|
d = sh.put_uri_extension(uri_extension)
|
2007-06-08 15:59:16 -07:00
|
|
|
d.addErrback(self._remove_shareholder, shareid, "put_uri_extension")
|
2007-06-06 10:32:40 -07:00
|
|
|
return d
|
2007-06-01 18:48:01 -07:00
|
|
|
|
2006-12-13 20:32:35 -07:00
|
|
|
def close_all_shareholders(self):
|
2007-04-06 18:04:38 -07:00
|
|
|
log.msg("%s: closing shareholders" % self)
|
2006-12-13 20:32:35 -07:00
|
|
|
dl = []
|
2007-04-18 18:29:10 -07:00
|
|
|
for shareid in self.landlords:
|
2007-07-08 23:27:46 -07:00
|
|
|
d = self.landlords[shareid].close()
|
2007-06-06 10:32:40 -07:00
|
|
|
d.addErrback(self._remove_shareholder, shareid, "close")
|
|
|
|
dl.append(d)
|
2007-06-06 12:40:16 -07:00
|
|
|
return self._gather_responses(dl)
|
2006-12-13 20:32:35 -07:00
|
|
|
|
|
|
|
def done(self):
|
2007-04-06 15:45:45 -07:00
|
|
|
log.msg("%s: upload done" % self)
|
2007-07-23 19:31:53 -07:00
|
|
|
return (self.uri_extension_hash, self.required_shares,
|
|
|
|
self.num_shares, self.file_size)
|
2007-04-06 18:04:38 -07:00
|
|
|
|
|
|
|
def err(self, f):
|
2007-06-06 10:32:40 -07:00
|
|
|
log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
|
|
|
|
if f.check(defer.FirstError):
|
|
|
|
return f.value.subFailure
|
2007-04-06 18:04:38 -07:00
|
|
|
return f
|