immutable: refactor downloader to be more reusable for checker/verifier/repairer (and better)

The code for validating the share hash tree and the block hash tree has been rewritten to make sure it handles all cases, to share metadata about the file (such as the share hash tree, block hash trees, and UEB) among different share downloads, and not to require hashes to be stored on the server unnecessarily, such as the roots of the block hash trees (not needed since they are also the leaves of the share hash tree), and the root of the share hash tree (not needed since it is also included in the UEB).  It also passes the latest tests including handling corrupted shares well.
  
ValidatedReadBucketProxy takes a share_hash_tree argument to its constructor, which is a reference to a share hash tree shared by all ValidatedReadBucketProxies for that immutable file download.
  
ValidatedReadBucketProxy requires the block_size and share_size to be provided in its constructor, and it then uses those to compute the offsets and lengths of blocks when it needs them, instead of reading those values out of the share.  The user of ValidatedReadBucketProxy therefore has to have first used a ValidatedExtendedURIProxy to compute those two values from the validated contents of the URI.  This is pleasingly simplifies safety analysis: the client knows which span of bytes corresponds to a given block from the validated URI data, rather than from the unvalidated data stored on the storage server.  It also simplifies unit testing of verifier/repairer, because now it doesn't care about the contents of the "share size" and "block size" fields in the share.  It does not relieve the need for share data v2 layout, because we still need to store and retrieve the offsets of the fields which come after the share data, therefore we still need to use share data v2 with its 8-byte fields if we want to store share data larger than about 2^32.
  
Specify which subset of the block hashes and share hashes you need while downloading a particular share.  In the future this will hopefully be used to fetch only a subset, for network efficiency, but currently all of them are fetched, regardless of which subset you specify.
  
ReadBucketProxy hides the question of whether it has "started" or not (sent a request to the server to get metadata) from its user.

Download is optimized to do as few roundtrips and as few requests as possible, hopefully speeding up download a bit.
This commit is contained in:
Zooko O'Whielacronx 2009-01-05 09:51:45 -07:00
parent 5d5e89d96d
commit 778167c2b1
10 changed files with 264 additions and 210 deletions

View File

@ -7,7 +7,7 @@ from twisted.application import service
from foolscap import DeadReferenceError
from foolscap.eventual import eventually
from allmydata.util import base32, mathutil, hashutil, log
from allmydata.util import base32, deferredutil, mathutil, hashutil, log
from allmydata.util.assertutil import _assert, precondition
from allmydata.util.rrefutil import ServerFailure
from allmydata import codec, hashtree, uri
@ -134,7 +134,8 @@ class ValidatedThingObtainer:
if not self._validatedthingproxies:
raise NotEnoughSharesError("ran out of peers, last error was %s" % (f,))
# try again with a different one
return self._try_the_next_one()
d = self._try_the_next_one()
return d
def _try_the_next_one(self):
vtp = self._validatedthingproxies.pop(0)
@ -167,8 +168,7 @@ class ValidatedCrypttextHashTreeProxy:
return self
def start(self):
d = self._readbucketproxy.startIfNecessary()
d.addCallback(lambda ignored: self._readbucketproxy.get_crypttext_hashes())
d = self._readbucketproxy.get_crypttext_hashes()
d.addCallback(self._validate)
return d
@ -333,97 +333,104 @@ class ValidatedExtendedURIProxy:
""" Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
it. Returns a deferred which is called back with self once the fetch is successful, or
is erred back if it fails. """
d = self._readbucketproxy.startIfNecessary()
d.addCallback(lambda ignored: self._readbucketproxy.get_uri_extension())
d = self._readbucketproxy.get_uri_extension()
d.addCallback(self._check_integrity)
d.addCallback(self._parse_and_validate)
return d
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
"""I am a front-end for a remote storage bucket, responsible for
retrieving and validating data from that bucket.
"""I am a front-end for a remote storage bucket, responsible for retrieving and validating
data from that bucket.
My get_block() method is used by BlockDownloaders.
"""
def __init__(self, sharenum, bucket,
share_hash_tree, share_root_hash,
num_blocks):
""" share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """
def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
""" share_hash_tree is required to have already been initialized with the root hash
(the number-0 hash), using the share_root_hash from the UEB """
precondition(share_hash_tree[0] is not None, share_hash_tree)
prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
self.sharenum = sharenum
self.bucket = bucket
self._share_hash = None # None means not validated yet
self.share_hash_tree = share_hash_tree
self._share_root_hash = share_root_hash
self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
self.started = False
self.num_blocks = num_blocks
self.block_size = block_size
self.share_size = share_size
self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
def get_block(self, blocknum):
if not self.started:
d = self.bucket.start()
def _started(res):
self.started = True
return self.get_block(blocknum)
d.addCallback(_started)
return d
# the first time we use this bucket, we need to fetch enough elements
# of the share hash tree to validate it from our share hash up to the
# hashroot.
if not self._share_hash:
if self.share_hash_tree.needed_hashes(self.sharenum):
d1 = self.bucket.get_share_hashes()
else:
d1 = defer.succeed([])
# we might need to grab some elements of our block hash tree, to
# validate the requested block up to the share hash
needed = self.block_hash_tree.needed_hashes(blocknum)
if needed:
# TODO: get fewer hashes, use get_block_hashes(needed)
d2 = self.bucket.get_block_hashes()
# We might need to grab some elements of our block hash tree, to
# validate the requested block up to the share hash.
blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
# We don't need the root of the block hash tree, as that comes in the share tree.
blockhashesneeded.discard(0)
d2 = self.bucket.get_block_hashes(blockhashesneeded)
if blocknum < self.num_blocks-1:
thisblocksize = self.block_size
else:
d2 = defer.succeed([])
thisblocksize = self.share_size % self.block_size
if thisblocksize == 0:
thisblocksize = self.block_size
d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
d3 = self.bucket.get_block(blocknum)
dl = deferredutil.gatherResults([d1, d2, d3])
dl.addCallback(self._got_data, blocknum)
return dl
d = defer.gatherResults([d1, d2, d3])
d.addCallback(self._got_data, blocknum)
return d
def _got_data(self, results, blocknum):
precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
sharehashes, blockhashes, blockdata = results
try:
sharehashes = dict(sharehashes)
except ValueError, le:
le.args = tuple(le.args + (sharehashes,))
raise
blockhashes = dict(enumerate(blockhashes))
def _got_data(self, res, blocknum):
sharehashes, blockhashes, blockdata = res
blockhash = None # to make logging it safe
candidate_share_hash = None # in case we log it in the except block below
blockhash = None # in case we log it in the except block below
try:
if not self._share_hash:
sh = dict(sharehashes)
sh[0] = self._share_root_hash # always use our own root, from the URI
sht = self.share_hash_tree
if sht.get_leaf_index(self.sharenum) not in sh:
raise hashtree.NotEnoughHashesError
sht.set_hashes(sh)
self._share_hash = sht.get_leaf(self.sharenum)
if self.share_hash_tree.needed_hashes(self.sharenum):
# This will raise exception if the values being passed do not match the root
# node of self.share_hash_tree.
self.share_hash_tree.set_hashes(sharehashes)
# To validate a block we need the root of the block hash tree, which is also one of
# the leafs of the share hash tree, and is called "the share hash".
if not self.block_hash_tree[0]: # empty -- no root node yet
# Get the share hash from the share hash tree.
share_hash = self.share_hash_tree.get_leaf(self.sharenum)
if not share_hash:
raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
self.block_hash_tree.set_hashes({0: share_hash})
if self.block_hash_tree.needed_hashes(blocknum):
self.block_hash_tree.set_hashes(blockhashes)
blockhash = hashutil.block_hash(blockdata)
self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
#self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
# "%r .. %r: %s" %
# (self.sharenum, blocknum, len(blockdata),
# blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
# we always validate the blockhash
bh = dict(enumerate(blockhashes))
# replace blockhash root with validated value
bh[0] = self._share_hash
self.block_hash_tree.set_hashes(bh, {blocknum: blockhash})
except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
# log.WEIRD: indicates undetected disk/network error, or more
# likely a programming error
self.log("hash failure in block=%d, shnum=%d on %s" %
(blocknum, self.sharenum, self.bucket))
if self._share_hash:
if self.block_hash_tree.needed_hashes(blocknum):
self.log(""" failure occurred when checking the block_hash_tree.
This suggests that either the block data was bad, or that the
block hashes we received along with it were bad.""")
@ -431,7 +438,7 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
self.log(""" the failure probably occurred when checking the
share_hash_tree, which suggests that the share hashes we
received from the remote peer were bad.""")
self.log(" have self._share_hash: %s" % bool(self._share_hash))
self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
self.log(" block length: %d" % len(blockdata))
self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
if len(blockdata) < 100:
@ -439,15 +446,14 @@ class ValidatedReadBucketProxy(log.PrefixingLogMixin):
else:
self.log(" block data start/end: %r .. %r" %
(blockdata[:50], blockdata[-50:]))
self.log(" root hash: %s" % base32.b2a(self._share_root_hash))
self.log(" share hash tree:\n" + self.share_hash_tree.dump())
self.log(" block hash tree:\n" + self.block_hash_tree.dump())
lines = []
for i,h in sorted(sharehashes):
for i,h in sorted(sharehashes.items()):
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
lines = []
for i,h in enumerate(blockhashes):
for i,h in blockhashes.items():
lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
raise BadOrMissingHash(le)
@ -495,7 +501,7 @@ class BlockDownloader(log.PrefixingLogMixin):
self.parent.hold_block(self.blocknum, data)
def _got_block_error(self, f):
failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid)
failtype = f.trap(ServerFailure, IntegrityCheckReject, layout.LayoutInvalid, layout.ShareVersionIncompatible)
if f.check(ServerFailure):
level = log.UNUSUAL
else:
@ -679,6 +685,7 @@ class FileDownloader(log.PrefixingLogMixin):
self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
self._share_hash_tree = None
self._crypttext_hash_tree = None
def pauseProducing(self):
@ -840,8 +847,8 @@ class FileDownloader(log.PrefixingLogMixin):
self._current_segnum = 0
self._share_hashtree = hashtree.IncompleteHashTree(self._uri.total_shares)
self._share_hashtree.set_hashes({0: vup.share_root_hash})
self._share_hash_tree = hashtree.IncompleteHashTree(self._uri.total_shares)
self._share_hash_tree.set_hashes({0: vup.share_root_hash})
self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
@ -897,12 +904,8 @@ class FileDownloader(log.PrefixingLogMixin):
def _download_all_segments(self, res):
for sharenum, bucket in self._share_buckets:
vbucket = ValidatedReadBucketProxy(sharenum, bucket,
self._share_hashtree,
self._vup.share_root_hash,
self._vup.num_segments)
s = self._share_vbuckets.setdefault(sharenum, set())
s.add(vbucket)
vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
# after the above code, self._share_vbuckets contains enough
# buckets to complete the download, and some extra ones to

View File

@ -283,7 +283,7 @@ class Encoder(object):
self.set_status("Starting shareholders")
dl = []
for shareid in self.landlords:
d = self.landlords[shareid].start()
d = self.landlords[shareid].put_header()
d.addErrback(self._remove_shareholder, shareid, "start")
dl.append(d)
return self._gather_responses(dl)

View File

@ -3,7 +3,7 @@ from zope.interface import implements
from twisted.internet import defer
from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
FileTooLargeError, HASH_SIZE
from allmydata.util import mathutil, idlib
from allmydata.util import log, mathutil, idlib, observer
from allmydata.util.assertutil import precondition
from allmydata import storage
@ -29,8 +29,8 @@ big-endian offset values, which indicate where each section starts. Each offset
the beginning of the share data.
0x00: version number (=00 00 00 01)
0x04: segment size
0x08: data size
0x04: block size # See Footnote 1 below.
0x08: share data size # See Footnote 1 below.
0x0c: offset of data (=00 00 00 24)
0x10: offset of plaintext_hash_tree UNUSED
0x14: offset of crypttext_hash_tree
@ -43,7 +43,7 @@ the beginning of the share data.
? : start of block_hashes
? : start of share_hashes
each share_hash is written as a two-byte (big-endian) hashnum
followed by the 32-byte SHA-256 hash. We only store the hashes
followed by the 32-byte SHA-256 hash. We store only the hashes
necessary to validate the share hash root
? : start of uri_extension_length (four-byte big-endian value)
? : start of uri_extension
@ -54,8 +54,8 @@ v2 shares: these use 8-byte offsets to remove two of the three ~12GiB size
limitations described in #346.
0x00: version number (=00 00 00 02)
0x04: segment size
0x0c: data size
0x04: block size # See Footnote 1 below.
0x0c: share data size # See Footnote 1 below.
0x14: offset of data (=00 00 00 00 00 00 00 44)
0x1c: offset of plaintext_hash_tree UNUSED
0x24: offset of crypttext_hash_tree
@ -68,23 +68,26 @@ limitations described in #346.
? : start of uri_extension_length (eight-byte big-endian value)
"""
# Footnote 1: as of Tahoe v1.3.0 these fields are not used when reading, but they are still
# provided when writing so that older versions of Tahoe can read them.
def allocated_size(data_size, num_segments, num_share_hashes,
uri_extension_size):
uri_extension_size_max):
wbp = WriteBucketProxy(None, data_size, 0, num_segments, num_share_hashes,
uri_extension_size, None)
uri_extension_size_max, None)
uri_extension_starts_at = wbp._offsets['uri_extension']
return uri_extension_starts_at + wbp.fieldsize + uri_extension_size
return uri_extension_starts_at + wbp.fieldsize + uri_extension_size_max
class WriteBucketProxy:
implements(IStorageBucketWriter)
fieldsize = 4
fieldstruct = ">L"
def __init__(self, rref, data_size, segment_size, num_segments,
num_share_hashes, uri_extension_size, nodeid):
def __init__(self, rref, data_size, block_size, num_segments,
num_share_hashes, uri_extension_size_max, nodeid):
self._rref = rref
self._data_size = data_size
self._segment_size = segment_size
self._block_size = block_size
self._num_segments = num_segments
self._nodeid = nodeid
@ -92,14 +95,14 @@ class WriteBucketProxy:
self._segment_hash_size = (2*effective_segments - 1) * HASH_SIZE
# how many share hashes are included in each share? This will be
# about ln2(num_shares).
self._share_hash_size = num_share_hashes * (2+HASH_SIZE)
self._share_hashtree_size = num_share_hashes * (2+HASH_SIZE)
# we commit to not sending a uri extension larger than this
self._uri_extension_size = uri_extension_size
self._uri_extension_size_max = uri_extension_size_max
self._create_offsets(segment_size, data_size)
self._create_offsets(block_size, data_size)
def _create_offsets(self, segment_size, data_size):
if segment_size >= 2**32 or data_size >= 2**32:
def _create_offsets(self, block_size, data_size):
if block_size >= 2**32 or data_size >= 2**32:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
offsets = self._offsets = {}
@ -113,7 +116,7 @@ class WriteBucketProxy:
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hash_size
x += self._share_hashtree_size
offsets['uri_extension'] = x
if x >= 2**32:
@ -121,7 +124,7 @@ class WriteBucketProxy:
offset_data = struct.pack(">LLLLLLLLL",
1, # version number
segment_size,
block_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'], # UNUSED
@ -140,21 +143,21 @@ class WriteBucketProxy:
nodeid_s = "[None]"
return "<allmydata.storage.WriteBucketProxy for node %s>" % nodeid_s
def start(self):
def put_header(self):
return self._write(0, self._offset_data)
def put_block(self, segmentnum, data):
offset = self._offsets['data'] + segmentnum * self._segment_size
offset = self._offsets['data'] + segmentnum * self._block_size
assert offset + len(data) <= self._offsets['uri_extension']
assert isinstance(data, str)
if segmentnum < self._num_segments-1:
precondition(len(data) == self._segment_size,
len(data), self._segment_size)
precondition(len(data) == self._block_size,
len(data), self._block_size)
else:
precondition(len(data) == (self._data_size -
(self._segment_size *
(self._block_size *
(self._num_segments - 1))),
len(data), self._segment_size)
len(data), self._block_size)
return self._write(offset, data)
def put_crypttext_hashes(self, hashes):
@ -186,8 +189,8 @@ class WriteBucketProxy:
assert isinstance(sharehashes, list)
data = "".join([struct.pack(">H", hashnum) + hashvalue
for hashnum,hashvalue in sharehashes])
precondition(len(data) == self._share_hash_size,
len(data), self._share_hash_size)
precondition(len(data) == self._share_hashtree_size,
len(data), self._share_hashtree_size)
precondition(offset + len(data) <= self._offsets['uri_extension'],
offset, len(data), offset+len(data),
self._offsets['uri_extension'])
@ -196,8 +199,8 @@ class WriteBucketProxy:
def put_uri_extension(self, data):
offset = self._offsets['uri_extension']
assert isinstance(data, str)
precondition(len(data) <= self._uri_extension_size,
len(data), self._uri_extension_size)
precondition(len(data) <= self._uri_extension_size_max,
len(data), self._uri_extension_size_max)
length = struct.pack(self.fieldstruct, len(data))
return self._write(offset, length+data)
@ -215,8 +218,8 @@ class WriteBucketProxy_v2(WriteBucketProxy):
fieldsize = 8
fieldstruct = ">Q"
def _create_offsets(self, segment_size, data_size):
if segment_size >= 2**64 or data_size >= 2**64:
def _create_offsets(self, block_size, data_size):
if block_size >= 2**64 or data_size >= 2**64:
raise FileTooLargeError("This file is too large to be uploaded (data_size).")
offsets = self._offsets = {}
@ -230,7 +233,7 @@ class WriteBucketProxy_v2(WriteBucketProxy):
offsets['block_hashes'] = x
x += self._segment_hash_size
offsets['share_hashes'] = x
x += self._share_hash_size
x += self._share_hashtree_size
offsets['uri_extension'] = x
if x >= 2**64:
@ -238,7 +241,7 @@ class WriteBucketProxy_v2(WriteBucketProxy):
offset_data = struct.pack(">LQQQQQQQQ",
2, # version number
segment_size,
block_size,
data_size,
offsets['data'],
offsets['plaintext_hash_tree'], # UNUSED
@ -252,13 +255,17 @@ class WriteBucketProxy_v2(WriteBucketProxy):
class ReadBucketProxy:
implements(IStorageBucketReader)
MAX_UEB_SIZE = 2000 # actual size is closer to 419, but varies by a few bytes
def __init__(self, rref, peerid, storage_index):
self._rref = rref
self._peerid = peerid
peer_id_s = idlib.shortnodeid_b2a(peerid)
storage_index_s = storage.si_b2a(storage_index)
self._reprstr = "<ReadBucketProxy to peer [%s] SI %s>" % (peer_id_s, storage_index_s)
self._started = False
self._reprstr = "<ReadBucketProxy %s to peer [%s] SI %s>" % (id(self), peer_id_s, storage_index_s)
self._started = False # sent request to server
self._ready = observer.OneShotObserverList() # got response from server
def get_peerid(self):
return self._peerid
@ -266,22 +273,27 @@ class ReadBucketProxy:
def __repr__(self):
return self._reprstr
def startIfNecessary(self):
if self._started:
return defer.succeed(self)
d = self.start()
d.addCallback(lambda res: self)
def _start_if_needed(self):
""" Returns a deferred that will be fired when I'm ready to return data, or errbacks if
the starting (header reading and parsing) process fails."""
if not self._started:
self._start()
return self._ready.when_fired()
def _start(self):
self._started = True
# TODO: for small shares, read the whole bucket in _start()
d = self._fetch_header()
d.addCallback(self._parse_offsets)
d.addCallback(self._fetch_sharehashtree_and_ueb)
d.addCallback(self._parse_sharehashtree_and_ueb)
def _fail_waiters(f):
self._ready.fire(f)
d.addErrback(_fail_waiters)
return d
def start(self):
# TODO: for small shares, read the whole bucket in start()
d = self._read(0, 0x44)
d.addCallback(self._parse_offsets)
def _started(res):
self._started = True
return res
d.addCallback(_started)
return d
def _fetch_header(self):
return self._read(0, 0x44)
def _parse_offsets(self, data):
precondition(len(data) >= 0x4)
@ -295,15 +307,11 @@ class ReadBucketProxy:
x = 0x0c
fieldsize = 0x4
fieldstruct = ">L"
(self._segment_size,
self._data_size) = struct.unpack(">LL", data[0x4:0xc])
else:
precondition(len(data) >= 0x44)
x = 0x14
fieldsize = 0x8
fieldstruct = ">Q"
(self._segment_size,
self._data_size) = struct.unpack(">QQ", data[0x4:0x14])
self._version = version
self._fieldsize = fieldsize
@ -321,67 +329,86 @@ class ReadBucketProxy:
self._offsets[field] = offset
return self._offsets
def get_block(self, blocknum):
num_segments = mathutil.div_ceil(self._data_size, self._segment_size)
if blocknum < num_segments-1:
size = self._segment_size
else:
size = self._data_size % self._segment_size
if size == 0:
size = self._segment_size
offset = self._offsets['data'] + blocknum * self._segment_size
return self._read(offset, size)
def _fetch_sharehashtree_and_ueb(self, offsets):
sharehashtree_size = offsets['uri_extension'] - offsets['share_hashes']
return self._read(offsets['share_hashes'], self.MAX_UEB_SIZE+sharehashtree_size)
def _parse_sharehashtree_and_ueb(self, data):
sharehashtree_size = self._offsets['uri_extension'] - self._offsets['share_hashes']
if len(data) < sharehashtree_size:
raise LayoutInvalid("share hash tree truncated -- should have at least %d bytes -- not %d" % (sharehashtree_size, len(data)))
if sharehashtree_size % (2+HASH_SIZE) != 0:
raise LayoutInvalid("share hash tree malformed -- should have an even multiple of %d bytes -- not %d" % (2+HASH_SIZE, sharehashtree_size))
self._share_hashes = []
for i in range(0, sharehashtree_size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
self._share_hashes.append( (hashnum, hashvalue) )
i = self._offsets['uri_extension']-self._offsets['share_hashes']
if len(data) < i+self._fieldsize:
raise LayoutInvalid("not enough bytes to encode URI length -- should be at least %d bytes long, not %d " % (i+self._fieldsize, len(data),))
length = struct.unpack(self._fieldstruct, data[i:i+self._fieldsize])[0]
self._ueb_data = data[i+self._fieldsize:i+self._fieldsize+length]
self._ready.fire(self)
def _get_block_data(self, unused, blocknum, blocksize, thisblocksize):
offset = self._offsets['data'] + blocknum * blocksize
return self._read(offset, thisblocksize)
def get_block_data(self, blocknum, blocksize, thisblocksize):
d = self._start_if_needed()
d.addCallback(self._get_block_data, blocknum, blocksize, thisblocksize)
return d
def _str2l(self, s):
""" split string (pulled from storage) into a list of blockids """
return [ s[i:i+HASH_SIZE]
for i in range(0, len(s), HASH_SIZE) ]
def get_crypttext_hashes(self):
def _get_crypttext_hashes(self, unused=None):
offset = self._offsets['crypttext_hash_tree']
size = self._offsets['block_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_block_hashes(self):
def get_crypttext_hashes(self):
d = self._start_if_needed()
d.addCallback(self._get_crypttext_hashes)
return d
def _get_block_hashes(self, unused=None, at_least_these=()):
# TODO: fetch only at_least_these instead of all of them.
offset = self._offsets['block_hashes']
size = self._offsets['share_hashes'] - offset
d = self._read(offset, size)
d.addCallback(self._str2l)
return d
def get_block_hashes(self, at_least_these=()):
if at_least_these:
d = self._start_if_needed()
d.addCallback(self._get_block_hashes, at_least_these)
return d
else:
return defer.succeed([])
def _get_share_hashes(self, unused=None):
return self._share_hashes
def get_share_hashes(self):
offset = self._offsets['share_hashes']
size = self._offsets['uri_extension'] - offset
assert size % (2+HASH_SIZE) == 0
d = self._read(offset, size)
def _unpack_share_hashes(data):
assert len(data) == size
hashes = []
for i in range(0, size, 2+HASH_SIZE):
hashnum = struct.unpack(">H", data[i:i+2])[0]
hashvalue = data[i+2:i+2+HASH_SIZE]
hashes.append( (hashnum, hashvalue) )
return hashes
d.addCallback(_unpack_share_hashes)
d = self._start_if_needed()
d.addCallback(self._get_share_hashes)
return d
def get_uri_extension(self):
offset = self._offsets['uri_extension']
d = self._read(offset, self._fieldsize)
def _got_length(data):
if len(data) != self._fieldsize:
raise LayoutInvalid("not enough bytes to encode URI length -- should be %d bytes long, not %d " % (self._fieldsize, len(data),))
length = struct.unpack(self._fieldstruct, data)[0]
if length >= 2**31:
# URI extension blocks are around 419 bytes long, so this must be corrupted.
# Anyway, the foolscap interface schema for "read" will not allow >= 2**31 bytes
# length.
raise RidiculouslyLargeURIExtensionBlock(length)
def _get_uri_extension(self, unused=None):
return self._ueb_data
return self._read(offset+self._fieldsize, length)
d.addCallback(_got_length)
def get_uri_extension(self):
d = self._start_if_needed()
d.addCallback(self._get_uri_extension)
return d
def _read(self, offset, length):

View File

@ -306,7 +306,7 @@ class IStorageBucketWriter(Interface):
class IStorageBucketReader(Interface):
def get_block(blocknum=int):
def get_block_data(blocknum=int, blocksize=int, size=int):
"""Most blocks will be the same size. The last block might be shorter
than the others.
@ -318,12 +318,12 @@ class IStorageBucketReader(Interface):
@return: ListOf(Hash)
"""
def get_block_hashes():
def get_block_hashes(at_least_these=SetOf(int)):
"""
@return: ListOf(Hash)
"""
def get_share_hashes():
def get_share_hashes(at_least_these=SetOf(int)):
"""
@return: ListOf(TupleOf(int, Hash))
"""

View File

@ -88,8 +88,7 @@ class CHKCheckerAndUEBFetcher:
return
b,peerid = self._readers.pop()
rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
d = rbp.startIfNecessary()
d.addCallback(lambda res: rbp.get_uri_extension())
d = rbp.get_uri_extension()
d.addCallback(self._got_uri_extension)
d.addErrback(self._ueb_error)
return d

View File

@ -105,7 +105,8 @@ def dump_immutable_share(options):
print >>out, "%20s: %s" % ("verify-cap", verify_cap)
sizes = {}
sizes['data'] = bp._data_size
sizes['data'] = (offsets['plaintext_hash_tree'] -
offsets['data'])
sizes['validation'] = (offsets['uri_extension'] -
offsets['plaintext_hash_tree'])
sizes['uri-extension'] = len(UEB_data)
@ -586,6 +587,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, sf):
self.sf = sf
ReadBucketProxy.__init__(self, "", "", "")
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
@ -594,7 +596,6 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
# use a ReadBucketProxy to parse the bucket and find the uri extension
sf = storage.ShareFile(abs_sharefile)
bp = ImmediateReadBucketProxy(sf)
call(bp.start)
expiration_time = min( [lease.expiration_time
for lease in sf.iter_leases()] )

View File

@ -36,14 +36,15 @@ class FakeBucketReaderWriterProxy:
def get_peerid(self):
return "peerid"
def startIfNecessary(self):
return defer.succeed(self)
def start(self):
def _start(self):
if self.mode == "lost-early":
f = Failure(LostPeerError("I went away early"))
return eventual.fireEventually(f)
return defer.succeed(self)
def put_header(self):
return self._start()
def put_block(self, segmentnum, data):
if self.mode == "lost-early":
f = Failure(LostPeerError("I went away early"))
@ -99,41 +100,50 @@ class FakeBucketReaderWriterProxy:
def abort(self):
return defer.succeed(None)
def get_block(self, blocknum):
def _try():
def get_block_data(self, blocknum, blocksize, size):
d = self._start()
def _try(unused=None):
assert isinstance(blocknum, (int, long))
if self.mode == "bad block":
return flip_bit(self.blocks[blocknum])
return self.blocks[blocknum]
return defer.maybeDeferred(_try)
d.addCallback(_try)
return d
def get_plaintext_hashes(self):
def _try():
d = self._start()
def _try(unused=None):
hashes = self.plaintext_hashes[:]
return hashes
return defer.maybeDeferred(_try)
d.addCallback(_try)
return d
def get_crypttext_hashes(self):
def _try():
d = self._start()
def _try(unused=None):
hashes = self.crypttext_hashes[:]
if self.mode == "bad crypttext hashroot":
hashes[0] = flip_bit(hashes[0])
if self.mode == "bad crypttext hash":
hashes[1] = flip_bit(hashes[1])
return hashes
return defer.maybeDeferred(_try)
d.addCallback(_try)
return d
def get_block_hashes(self):
def _try():
def get_block_hashes(self, at_least_these=()):
d = self._start()
def _try(unused=None):
if self.mode == "bad blockhash":
hashes = self.block_hashes[:]
hashes[1] = flip_bit(hashes[1])
return hashes
return self.block_hashes
return defer.maybeDeferred(_try)
d.addCallback(_try)
return d
def get_share_hashes(self):
def _try():
def get_share_hashes(self, at_least_these=()):
d = self._start()
def _try(unused=None):
if self.mode == "bad sharehash":
hashes = self.share_hashes[:]
hashes[1] = (hashes[1][0], flip_bit(hashes[1][1]))
@ -144,14 +154,17 @@ class FakeBucketReaderWriterProxy:
# download.py is supposed to guard against this case.
return []
return self.share_hashes
return defer.maybeDeferred(_try)
d.addCallback(_try)
return d
def get_uri_extension(self):
def _try():
d = self._start()
def _try(unused=None):
if self.mode == "bad uri_extension":
return flip_bit(self.uri_extension)
return self.uri_extension
return defer.maybeDeferred(_try)
d.addCallback(_try)
return d
def make_data(length):
@ -719,9 +732,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
# the first 7 servers have bad block hashes, so the sharehash tree
# will not validate, and the download will fail
modemap = dict([(i, "bad sharehash")
for i in range(7)]
+ [(i, "good")
for i in range(7, 10)])
for i in range(10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure))
@ -739,12 +750,10 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
return self.send_and_recover((4,8,10), bucket_modes=modemap)
def test_missing_sharehashes_failure(self):
# the first 7 servers are missing their sharehashes, so the
# sharehash tree will not validate, and the download will fail
# all servers are missing their sharehashes, so the sharehash tree will not validate,
# and the download will fail
modemap = dict([(i, "missing sharehash")
for i in range(7)]
+ [(i, "good")
for i in range(7, 10)])
for i in range(10)])
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
def _done(res):
self.failUnless(isinstance(res, Failure), res)

View File

@ -62,6 +62,24 @@ class Incomplete(unittest.TestCase):
self.failUnlessRaises(IndexError, ht.get_leaf, 8)
self.failUnlessEqual(ht.get_leaf_index(0), 7)
def test_needed_hashes(self):
ht = hashtree.IncompleteHashTree(8)
self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2]))
self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2]))
self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2]))
self.failUnlessEqual(ht.needed_hashes(7), set([13, 5, 1]))
self.failUnlessEqual(ht.needed_hashes(7, False), set([13, 5, 1]))
self.failUnlessEqual(ht.needed_hashes(7, True), set([14, 13, 5, 1]))
ht = hashtree.IncompleteHashTree(1)
self.failUnlessEqual(ht.needed_hashes(0), set([]))
ht = hashtree.IncompleteHashTree(6)
self.failUnlessEqual(ht.needed_hashes(0), set([8, 4, 2]))
self.failUnlessEqual(ht.needed_hashes(0, True), set([7, 8, 4, 2]))
self.failUnlessEqual(ht.needed_hashes(1), set([7, 4, 2]))
self.failUnlessEqual(ht.needed_hashes(5), set([11, 6, 1]))
self.failUnlessEqual(ht.needed_hashes(5, False), set([11, 6, 1]))
self.failUnlessEqual(ht.needed_hashes(5, True), set([12, 11, 6, 1]))
def test_check(self):
# first create a complete hash tree
ht = make_tree(6)
@ -160,4 +178,3 @@ class Incomplete(unittest.TestCase):
iht.set_hashes(chain, leaves={4: tagged_hash("tag", "4")})
except hashtree.BadHashError, e:
self.fail("bad hash: %s" % e)

View File

@ -380,9 +380,10 @@ class Test(ShareManglingMixin, unittest.TestCase):
before_download_reads = self._count_reads()
def _after_download(unused=None):
after_download_reads = self._count_reads()
# To pass this test, you have to download the file using only 10 reads to get the
# UEB (in parallel from all shares), plus one read for each of the 3 shares.
self.failIf(after_download_reads-before_download_reads > 13, (after_download_reads, before_download_reads))
# To pass this test, you have to download the file using only 10 reads total: 3 to
# get the headers from each share, 3 to get the share hash trees and uebs from each
# share, 1 to get the crypttext hashes, and 3 to get the block data from each share.
self.failIf(after_download_reads-before_download_reads > 10, (after_download_reads, before_download_reads))
d.addCallback(self._download_and_check_plaintext)
d.addCallback(_after_download)
return d
@ -403,7 +404,6 @@ class Test(ShareManglingMixin, unittest.TestCase):
d.addCallback(self._download_and_check_plaintext)
d.addCallback(_after_download)
return d
test_download_from_only_3_remaining_shares.todo = "I think this test is failing due to the downloader code not knowing how to handle URI corruption and keeping going. I'm going to commit new downloader code soon, and then see if this test starts passing."
def test_download_abort_if_too_many_missing_shares(self):
""" Test that download gives up quickly when it realizes there aren't enough shares out

View File

@ -125,10 +125,10 @@ class BucketProxy(unittest.TestCase):
bw, rb, sharefname = self.make_bucket("test_create", 500)
bp = WriteBucketProxy(rb,
data_size=300,
segment_size=10,
block_size=10,
num_segments=5,
num_share_hashes=3,
uri_extension_size=500, nodeid=None)
uri_extension_size_max=500, nodeid=None)
self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
@ -156,13 +156,13 @@ class BucketProxy(unittest.TestCase):
bw, rb, sharefname = self.make_bucket(name, sharesize)
bp = wbp_class(rb,
data_size=95,
segment_size=25,
block_size=25,
num_segments=4,
num_share_hashes=3,
uri_extension_size=len(uri_extension),
uri_extension_size_max=len(uri_extension),
nodeid=None)
d = bp.start()
d = bp.put_header()
d.addCallback(lambda res: bp.put_block(0, "a"*25))
d.addCallback(lambda res: bp.put_block(1, "b"*25))
d.addCallback(lambda res: bp.put_block(2, "c"*25))
@ -182,21 +182,19 @@ class BucketProxy(unittest.TestCase):
self.failUnless("to peer" in repr(rbp))
self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
d1 = rbp.startIfNecessary()
d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
d1.addCallback(lambda res: rbp.get_block(0))
d1 = rbp.get_block_data(0, 25, 25)
d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
d1.addCallback(lambda res: rbp.get_block(1))
d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
d1.addCallback(lambda res: rbp.get_block(2))
d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
d1.addCallback(lambda res: rbp.get_block(3))
d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
d1.addCallback(lambda res: rbp.get_crypttext_hashes())
d1.addCallback(lambda res:
self.failUnlessEqual(res, crypttext_hashes))
d1.addCallback(lambda res: rbp.get_block_hashes())
d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
d1.addCallback(lambda res: rbp.get_share_hashes())
d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))