Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git (synced 2025-01-19 03:06:33 +00:00)
download: refactor handling of URI Extension Block and crypttext hash tree, simplify things
Refactor into a class the logic of asking each server in turn until one of them gives an answer that validates. It is called ValidatedThingObtainer. Refactor the downloading and verification of the URI Extension Block into a class named ValidatedExtendedURIProxy.

The new logic of validating UEBs is minimalist: it doesn't require the UEB to contain any unnecessary information, but of course it still accepts such information for backwards compatibility (so that this new download code is able to download files uploaded with old, and for that matter with current, upload code). The new logic of validating UEBs follows the practice of doing all validation up front. This practice advises one to isolate the validation of incoming data into one place, so that all of the rest of the code can assume only valid data.

If any redundant information is present in the UEB+URI, the new code cross-checks and asserts that it is all fully consistent. This closes some issues where the uploader could have uploaded inconsistent redundant data, which would probably have caused the old downloader to simply reject that download after getting a Python exception, but perhaps could have caused greater harm to the old downloader.

I removed the notion of selecting an erasure codec from codec.py based on the string that was passed in the UEB. Currently "crs" is the only such string that works, so "_assert(codec_name == 'crs')" is simpler and more explicit. This is also in keeping with the "validate up front" strategy -- now if someone sets a different string than "crs" in their UEB, the downloader will reject the download in the "validate this UEB" function instead of in a separate "select the codec instance" function.

I removed the code to check plaintext hashes and plaintext Merkle Trees. Uploaders do not produce this information any more (since it potentially exposes confidential information about the file), and the unit tests for it were disabled. The downloader before this patch would check the plaintext hash or plaintext Merkle Tree if they were present, but not complain if they were absent. The new downloader in this patch complains if they are present and doesn't check them. (We might in the future re-introduce such hashes over the plaintext, but encrypt the hashes which are stored in the UEB to preserve confidentiality. This would be a double-check on the correctness of our own source code -- the current Merkle Tree over the ciphertext is already sufficient to guarantee the integrity of the download unless there is a bug in our Merkle Tree or AES implementation.)

This patch increases the lines-of-code count by 8 (from 17,770 to 17,778), and reduces the uncovered-by-tests lines-of-code count by 24 (from 1408 to 1384). Those numbers would be more meaningful if we omitted src/allmydata/util/ from the test-coverage statistics.
This commit is contained in: parent a0b5b5ab2b, commit b315619d6b
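For orientation before the diff: the commit message's central idea is a reusable "ask each server in turn until one of them gives an answer that validates" loop. The sketch below is a simplified illustration of that pattern, not the committed code (the real implementation is the ValidatedThingObtainer class added in the diff); the class name and method names here are hypothetical, and it assumes Twisted-style Deferreds as used throughout this codebase.

    # Simplified sketch of the validate-then-fall-through pattern (illustrative only).
    class ThingObtainer:
        def __init__(self, proxies):
            # each proxy has a start() method returning a Deferred that fires with
            # a validated thing, or errbacks if fetching or validation fails
            self._proxies = proxies

        def _failed(self, failure, proxy):
            if not self._proxies:
                return failure          # ran out of servers: propagate the last error
            return self._try_next()     # otherwise fall through to the next server

        def _try_next(self):
            proxy = self._proxies.pop(0)
            d = proxy.start()
            d.addErrback(self._failed, proxy)
            return d

        def start(self):
            return self._try_next()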
@@ -7,59 +7,6 @@ from allmydata.util.assertutil import precondition
 from allmydata.interfaces import ICodecEncoder, ICodecDecoder
 import zfec
 
-
-class ReplicatingEncoder(object):
-    implements(ICodecEncoder)
-    ENCODER_TYPE = "rep"
-
-    def set_params(self, data_size, required_shares, max_shares):
-        assert data_size % required_shares == 0
-        assert required_shares <= max_shares
-        self.data_size = data_size
-        self.required_shares = required_shares
-        self.max_shares = max_shares
-
-    def get_encoder_type(self):
-        return self.ENCODER_TYPE
-
-    def get_serialized_params(self):
-        return "%d" % self.required_shares
-
-    def get_block_size(self):
-        return self.data_size
-
-    def encode(self, inshares, desired_shareids=None):
-        assert isinstance(inshares, list)
-        for inshare in inshares:
-            assert isinstance(inshare, str)
-            assert self.required_shares * len(inshare) == self.data_size
-        data = "".join(inshares)
-        if desired_shareids is None:
-            desired_shareids = range(self.max_shares)
-        shares = [data for i in desired_shareids]
-        return defer.succeed((shares, desired_shareids))
-
-class ReplicatingDecoder(object):
-    implements(ICodecDecoder)
-
-    def set_serialized_params(self, params):
-        self.required_shares = int(params)
-
-    def get_needed_shares(self):
-        return self.required_shares
-
-    def decode(self, some_shares, their_shareids):
-        assert len(some_shares) == self.required_shares
-        assert len(some_shares) == len(their_shareids)
-        data = some_shares[0]
-
-        chunksize = mathutil.div_ceil(len(data), self.required_shares)
-        numchunks = mathutil.div_ceil(len(data), chunksize)
-        l = [ data[i:i+chunksize] for i in range(0, len(data), chunksize) ]
-
-        return defer.succeed(l)
-
-
 class CRSEncoder(object):
     implements(ICodecEncoder)
     ENCODER_TYPE = "crs"

@@ -76,6 +23,9 @@ class CRSEncoder(object):
     def get_encoder_type(self):
         return self.ENCODER_TYPE
 
+    def get_params(self):
+        return (self.data_size, self.required_shares, self.max_shares)
+
     def get_serialized_params(self):
         return "%d-%d-%d" % (self.data_size, self.required_shares,
                              self.max_shares)

@@ -98,11 +48,10 @@ class CRSEncoder(object):
 class CRSDecoder(object):
     implements(ICodecDecoder)
 
-    def set_serialized_params(self, params):
-        pieces = params.split("-")
-        self.data_size = int(pieces[0])
-        self.required_shares = int(pieces[1])
-        self.max_shares = int(pieces[2])
+    def set_params(self, data_size, required_shares, max_shares):
+        self.data_size = data_size
+        self.required_shares = required_shares
+        self.max_shares = max_shares
 
         self.chunk_size = self.required_shares
         self.num_chunks = mathutil.div_ceil(self.data_size, self.chunk_size)

@@ -121,13 +70,6 @@ class CRSDecoder(object):
                                   [int(s) for s in their_shareids])
         return defer.succeed(data)
 
-all_encoders = {
-    ReplicatingEncoder.ENCODER_TYPE: (ReplicatingEncoder, ReplicatingDecoder),
-    CRSEncoder.ENCODER_TYPE: (CRSEncoder, CRSDecoder),
-}
-
-def get_decoder_by_name(name):
-    decoder_class = all_encoders[name][1]
-    return decoder_class()
+def parse_params(serializedparams):
+    pieces = serializedparams.split("-")
+    return int(pieces[0]), int(pieces[1]), int(pieces[2])
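To make the codec change above concrete, here is a hedged usage sketch of the new parameter round trip. The values 131073/3/10 are made up, and it assumes CRSEncoder.set_params takes (data_size, required_shares, max_shares), which the get_params() accessor added above implies.

    from allmydata import codec

    enc = codec.CRSEncoder()
    enc.set_params(131073, 3, 10)              # data_size, required_shares, max_shares
    serialized = enc.get_serialized_params()   # "131073-3-10"

    # parse_params() replaces the old get_decoder_by_name()/set_serialized_params() dance:
    data_size, k, n = codec.parse_params(serialized)
    dec = codec.CRSDecoder()
    dec.set_params(data_size, k, n)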
@@ -16,7 +16,7 @@ from allmydata.util import hashutil, mathutil, base32, log
 from allmydata.util.hashutil import netstring
 from allmydata.util.limiter import ConcurrencyLimiter
 from allmydata.util.netstring import split_netstring
-from allmydata.uri import NewDirectoryURI
+from allmydata.uri import NewDirectoryURI, LiteralFileURI, from_string
 from pycryptopp.cipher.aes import AES
 
 class Deleter:

@@ -202,6 +202,8 @@ class NewDirectoryNode:
             assert isinstance(metadata, dict)
             rwcap = child.get_uri() # might be RO if the child is not writeable
             rocap = child.get_readonly_uri()
+            assert isinstance(rocap, str), rocap
+            assert isinstance(rwcap, str), rwcap
             entry = "".join([netstring(name.encode("utf-8")),
                              netstring(rocap),
                              netstring(self._encrypt_rwcap(rwcap)),

@@ -579,7 +581,8 @@ class DeepStats:
             self.add("count-files")
             size = node.get_size()
             self.histogram("size-files-histogram", size)
-            if node.get_uri().startswith("URI:LIT:"):
+            theuri = from_string(node.get_uri())
+            if isinstance(theuri, LiteralFileURI):
                 self.add("count-literal-files")
                 self.add("size-literal-files", size)
             else:
@@ -384,7 +384,7 @@ class IncompleteHashTree(CompleteBinaryTreeMixin, list):
         for i in new_hashes:
             if self[i]:
                 if self[i] != new_hashes[i]:
-                    msg = "new hash does not match existing hash at "
+                    msg = "new hash %s does not match existing hash %s at " % (base32.b2a(new_hashes[i]), base32.b2a(self[i]))
                     msg += self._name_hash(i)
                     raise BadHashError(msg)
             else:
@@ -11,7 +11,9 @@ from twisted.python import log
 from allmydata import storage
 from allmydata.checker_results import CheckerResults
 from allmydata.immutable import download
+from allmydata.uri import CHKFileURI
 from allmydata.util import hashutil
+from allmydata.util.assertutil import _assert, precondition
 
 class SimpleCHKFileChecker:
     """Return a list of (needed, total, found, sharemap), where sharemap maps

@@ -68,7 +70,7 @@ class SimpleCHKFileChecker:
         pass
 
     def _done(self, res):
-        r = CheckerResults(self.uri, self.storage_index)
+        r = CheckerResults(self.uri.to_string(), self.storage_index)
         report = []
         healthy = bool(len(self.found_shares) >= self.total_shares)
         r.set_healthy(healthy)

@@ -129,7 +131,7 @@ class VerifyingOutput:
         results.set_recoverable(False)
         results.set_summary("Not Healthy")
 
-    def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree):
+    def got_crypttext_hash_tree(self, crypttext_hashtree):
         self._crypttext_hash_tree = crypttext_hashtree
 
     def write_segment(self, crypttext):

@@ -162,10 +164,11 @@ class SimpleCHKFileVerifier(download.FileDownloader):
     # remaining shareholders, and it cannot verify the plaintext.
     check_plaintext_hash = False
 
-    def __init__(self, client, uri, storage_index, k, N, size, ueb_hash):
+    def __init__(self, client, u, storage_index, k, N, size, ueb_hash):
+        precondition(isinstance(u, CHKFileURI), u)
         self._client = client
 
-        self._uri = uri
+        self._uri = u
         self._storage_index = storage_index
         self._uri_extension_hash = ueb_hash
         self._total_shares = N

@@ -175,7 +178,7 @@ class SimpleCHKFileVerifier(download.FileDownloader):
         self._si_s = storage.si_b2a(self._storage_index)
         self.init_logging()
 
-        self._check_results = r = CheckerResults(self._uri, self._storage_index)
+        self._check_results = r = CheckerResults(self._uri.to_string(), self._storage_index)
         r.set_data({"count-shares-needed": k,
                     "count-shares-expected": N,
                     })

@@ -226,9 +229,7 @@ class SimpleCHKFileVerifier(download.FileDownloader):
         d.addCallback(self._got_all_shareholders)
         # now get the uri_extension block from somebody and validate it
         d.addCallback(self._obtain_uri_extension)
-        d.addCallback(self._got_uri_extension)
-        d.addCallback(self._get_hashtrees)
-        d.addCallback(self._create_validated_buckets)
+        d.addCallback(self._get_crypttext_hash_tree)
         # once we know that, we can download blocks from everybody
         d.addCallback(self._download_all_segments)
         d.addCallback(self._done)
@@ -8,10 +8,10 @@ from foolscap import DeadReferenceError
 from foolscap.eventual import eventually
 
 from allmydata.util import base32, mathutil, hashutil, log, observer
-from allmydata.util.assertutil import _assert
+from allmydata.util.assertutil import _assert, precondition
 from allmydata import codec, hashtree, storage, uri
-from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, \
-     IDownloadStatus, IDownloadResults, NotEnoughSharesError
+from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
+     IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError
 from allmydata.immutable import layout
 from pycryptopp.cipher.aes import AES
 

@@ -26,7 +26,7 @@ class BadURIExtensionHashValue(IntegrityCheckError):
     pass
 class BadURIExtension(IntegrityCheckError):
     pass
-class BadPlaintextHashValue(IntegrityCheckError):
+class UnsupportedErasureCodec(BadURIExtension):
     pass
 class BadCrypttextHashValue(IntegrityCheckError):
     pass

@@ -50,11 +50,9 @@ class Output:
         self.downloadable = downloadable
         self._decryptor = AES(key)
         self._crypttext_hasher = hashutil.crypttext_hasher()
-        self._plaintext_hasher = hashutil.plaintext_hasher()
         self.length = 0
         self.total_length = total_length
         self._segment_number = 0
-        self._plaintext_hash_tree = None
         self._crypttext_hash_tree = None
         self._opened = False
         self._log_parent = log_parent

@@ -68,9 +66,8 @@ class Output:
         kwargs["facility"] = "download.output"
         return log.msg(*args, **kwargs)
 
-    def setup_hashtrees(self, plaintext_hashtree, crypttext_hashtree):
-        self._plaintext_hash_tree = plaintext_hashtree
-        self._crypttext_hash_tree = crypttext_hashtree
+    def got_crypttext_hash_tree(self, crypttext_hash_tree):
+        self._crypttext_hash_tree = crypttext_hash_tree
 
     def write_segment(self, crypttext):
         self.length += len(crypttext)

@@ -94,18 +91,6 @@ class Output:
         del crypttext
 
         # now we're back down to 1*segment_size.
-
-        self._plaintext_hasher.update(plaintext)
-        if self._plaintext_hash_tree:
-            ph = hashutil.plaintext_segment_hasher()
-            ph.update(plaintext)
-            plaintext_leaves = {self._segment_number: ph.digest()}
-            self.log(format="plaintext leaf hash (%(bytes)sB) [%(segnum)d] is %(hash)s",
-                     bytes=len(plaintext),
-                     segnum=self._segment_number, hash=base32.b2a(ph.digest()),
-                     level=log.NOISY)
-            self._plaintext_hash_tree.set_hashes(leaves=plaintext_leaves)
-
         self._segment_number += 1
         # We're still at 1*segment_size. The Downloadable is responsible for
         # any memory usage beyond this.
@@ -127,14 +112,230 @@ class Output:
 
     def close(self):
         self.crypttext_hash = self._crypttext_hasher.digest()
-        self.plaintext_hash = self._plaintext_hasher.digest()
         self.log("download finished, closing IDownloadable", level=log.NOISY)
         self.downloadable.close()
 
     def finish(self):
         return self.downloadable.finish()
 
-class ValidatedBucket:
+class ValidatedThingObtainer:
+    def __init__(self, validatedthingproxies, debugname, log_id):
+        self._validatedthingproxies = validatedthingproxies
+        self._debugname = debugname
+        self._log_id = log_id
+
+    def _bad(self, f, validatedthingproxy):
+        level = log.WEIRD
+        if f.check(DeadReferenceError):
+            level = log.UNUSUAL
+        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
+                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
+                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
+                failure=f, level=level, umid="JGXxBA")
+        if not self._validatedthingproxies:
+            raise NotEnoughSharesError("ran out of peers, last error was %s" % (f,))
+        # try again with a different one
+        return self._try_the_next_one()
+
+    def _try_the_next_one(self):
+        vtp = self._validatedthingproxies.pop(0)
+        d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
+        d.addErrback(self._bad, vtp)
+        return d
+
+    def start(self):
+        return self._try_the_next_one()
+
+class ValidatedCrypttextHashTreeProxy:
+    implements(IValidatedThingProxy)
+    """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
+    its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
+    start() method that fires with self once I get a valid one). """
+    def __init__(self, readbucketproxy, crypttext_hash_tree, fetch_failures=None):
+        # fetch_failures is for debugging -- see test_encode.py
+        self._readbucketproxy = readbucketproxy
+        self._fetch_failures = fetch_failures
+        self._crypttext_hash_tree = crypttext_hash_tree
+
+    def _validate(self, proposal):
+        ct_hashes = dict(list(enumerate(proposal)))
+        try:
+            self._crypttext_hash_tree.set_hashes(ct_hashes)
+        except hashtree.BadHashError:
+            if self._fetch_failures is not None:
+                self._fetch_failures["crypttext_hash_tree"] += 1
+            raise
+        return self
+
+    def start(self):
+        d = self._readbucketproxy.startIfNecessary()
+        d.addCallback(lambda ignored: self._readbucketproxy.get_crypttext_hashes())
+        d.addCallback(self._validate)
+        return d
+
+class ValidatedExtendedURIProxy:
+    implements(IValidatedThingProxy)
+    """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
+    retrieving and validating the elements from the UEB. """
+
+    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
+        # fetch_failures is for debugging -- see test_encode.py
+        self._fetch_failures = fetch_failures
+        self._readbucketproxy = readbucketproxy
+        precondition(IVerifierURI.providedBy(verifycap), verifycap)
+        self._verifycap = verifycap
+
+        # required
+        self.segment_size = None
+        self.crypttext_root_hash = None
+        self.share_root_hash = None
+
+        # computed
+        self.num_segments = None
+        self.tail_segment_size = None
+
+        # optional
+        self.crypttext_hash = None
+
+    def __str__(self):
+        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
+
+    def _check_integrity(self, data):
+        h = hashutil.uri_extension_hash(data)
+        if h != self._verifycap.uri_extension_hash:
+            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
+                   (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
+            if self._fetch_failures is not None:
+                self._fetch_failures["uri_extension"] += 1
+            raise BadURIExtensionHashValue(msg)
+        else:
+            return data
+
+    def _parse_and_validate(self, data):
+        d = uri.unpack_extension(data)
+
+        # There are several kinds of things that can be found in a UEB. First, things that we
+        # really need to learn from the UEB in order to do this download. Next: things which are
+        # optional but not redundant -- if they are present in the UEB they will get used. Next,
+        # things that are optional and redundant. These things are required to be consistent:
+        # they don't have to be in the UEB, but if they are in the UEB then they will be checked
+        # for consistency with the already-known facts, and if they are inconsistent then an
+        # exception will be raised. These things aren't actually used -- they are just tested
+        # for consistency and ignored. Finally: things which are deprecated -- they ought not be
+        # in the UEB at all, and if they are present then a warning will be logged but they are
+        # otherwise ignored.
+
+        # First, things that we really need to learn from the UEB: segment_size,
+        # crypttext_root_hash, and share_root_hash.
+        self.segment_size = d['segment_size']
+
+        self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
+
+        tail_data_size = self._verifycap.size % self.segment_size
+        if not tail_data_size:
+            tail_data_size = self.segment_size
+        # padding for erasure code
+        self.tail_segment_size = mathutil.next_multiple(tail_data_size, self._verifycap.needed_shares)
+
+        # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
+        # matches this read-cap or verify-cap. The integrity check on the shares is not
+        # sufficient to prevent the original encoder from creating some shares of file A and
+        # other shares of file B.
+        self.crypttext_root_hash = d['crypttext_root_hash']
+
+        self.share_root_hash = d['share_root_hash']
+
+        # Next: things that are optional and not redundant: crypttext_hash
+        if d.has_key('crypttext_hash'):
+            self.crypttext_hash = d['crypttext_hash']
+            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
+                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
+
+        # Next: things that are optional, redundant, and required to be consistent: codec_name,
+        # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
+        if d.has_key('codec_name'):
+            if d['codec_name'] != "crs":
+                raise UnsupportedErasureCodec(d['codec_name'])
+
+        if d.has_key('codec_params'):
+            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
+            if ucpss != self.segment_size:
+                raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
+                                      "self.segment_size: %s" % (ucpss, self.segment_size))
+            if ucpns != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
+                                      "self._verifycap.needed_shares: %s" % (ucpns,
+                                                                             self._verifycap.needed_shares))
+            if ucpts != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
+                                      "self._verifycap.total_shares: %s" % (ucpts,
+                                                                            self._verifycap.total_shares))
+
+        if d.has_key('tail_codec_params'):
+            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
+            if utcpss != self.tail_segment_size:
+                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
+                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
+                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
+                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
+                                         self.segment_size, self._verifycap.needed_shares))
+            if utcpns != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
+                                      "self._verifycap.needed_shares: %s" % (utcpns,
+                                                                             self._verifycap.needed_shares))
+            if utcpts != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
+                                      "self._verifycap.total_shares: %s" % (utcpts,
+                                                                            self._verifycap.total_shares))
+
+        if d.has_key('num_segments'):
+            if d['num_segments'] != self.num_segments:
+                raise BadURIExtension("inconsistent num_segments: size: %s, "
+                                      "segment_size: %s, computed_num_segments: %s, "
+                                      "ueb_num_segments: %s" % (self._verifycap.size,
+                                                                self.segment_size,
+                                                                self.num_segments, d['num_segments']))
+
+        if d.has_key('size'):
+            if d['size'] != self._verifycap.size:
+                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
+                                      (self._verifycap.size, d['size']))
+
+        if d.has_key('needed_shares'):
+            if d['needed_shares'] != self._verifycap.needed_shares:
+                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
+                                      "needed shares: %s" % (self._verifycap.total_shares,
+                                                             d['needed_shares']))
+
+        if d.has_key('total_shares'):
+            if d['total_shares'] != self._verifycap.total_shares:
+                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
+                                      "total shares: %s" % (self._verifycap.total_shares,
+                                                            d['total_shares']))
+
+        # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
+        if d.get('plaintext_hash'):
+            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
+                    "and is no longer used. Ignoring. %s" % (self,))
+        if d.get('plaintext_root_hash'):
+            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
+                    "reasons and is no longer used. Ignoring. %s" % (self,))
+
+        return self
+
+    def start(self):
+        """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
+        it. Returns a deferred which is called back with self once the fetch is successful, or
+        is erred back if it fails. """
+        d = self._readbucketproxy.startIfNecessary()
+        d.addCallback(lambda ignored: self._readbucketproxy.get_uri_extension())
+        d.addCallback(self._check_integrity)
+        d.addCallback(self._parse_and_validate)
+        return d
+
+class ValidatedReadBucketProxy:
     """I am a front-end for a remote storage bucket, responsible for
     retrieving and validating data from that bucket.
 
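A worked example of the segment arithmetic performed in _parse_and_validate() above. The file size, segment size, and k are illustrative values, not taken from the commit.

    from allmydata.util import mathutil

    size, segment_size, needed_shares = 1000000, 131072, 3
    num_segments = mathutil.div_ceil(size, segment_size)       # 8 segments
    tail_data_size = size % segment_size                        # 82496 bytes of real data in the last segment
    if not tail_data_size:
        tail_data_size = segment_size
    # pad the tail up to a multiple of k so the erasure code can split it evenly
    tail_segment_size = mathutil.next_multiple(tail_data_size, needed_shares)   # 82497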
@@ -142,13 +343,14 @@ class ValidatedBucket:
     """
 
     def __init__(self, sharenum, bucket,
-                 share_hash_tree, roothash,
+                 share_hash_tree, share_root_hash,
                  num_blocks):
+        """ share_root_hash is the root of the share hash tree; share_root_hash is stored in the UEB """
         self.sharenum = sharenum
         self.bucket = bucket
         self._share_hash = None # None means not validated yet
         self.share_hash_tree = share_hash_tree
-        self._roothash = roothash
+        self._share_root_hash = share_root_hash
         self.block_hash_tree = hashtree.IncompleteHashTree(num_blocks)
         self.started = False
 

@@ -191,7 +393,7 @@ class ValidatedBucket:
         try:
             if not self._share_hash:
                 sh = dict(sharehashes)
-                sh[0] = self._roothash # always use our own root, from the URI
+                sh[0] = self._share_root_hash # always use our own root, from the URI
                 sht = self.share_hash_tree
                 if sht.get_leaf_index(self.sharenum) not in sh:
                     raise hashtree.NotEnoughHashesError

@@ -231,7 +433,7 @@ class ValidatedBucket:
             else:
                 log.msg(" block data start/end: %r .. %r" %
                         (blockdata[:50], blockdata[-50:]))
-            log.msg(" root hash: %s" % base32.b2a(self._roothash))
+            log.msg(" root hash: %s" % base32.b2a(self._share_root_hash))
             log.msg(" share hash tree:\n" + self.share_hash_tree.dump())
             log.msg(" block hash tree:\n" + self.block_hash_tree.dump())
             lines = []

@@ -426,19 +628,16 @@ class DownloadStatus:
 
 class FileDownloader:
     implements(IPushProducer)
-    check_crypttext_hash = True
-    check_plaintext_hash = True
     _status = None
 
     def __init__(self, client, u, downloadable):
+        precondition(isinstance(u, uri.CHKFileURI), u)
         self._client = client
 
-        u = IFileURI(u)
+        self._uri = u
         self._storage_index = u.storage_index
         self._uri_extension_hash = u.uri_extension_hash
-        self._total_shares = u.total_shares
-        self._size = u.size
-        self._num_needed_shares = u.needed_shares
+        self._vup = None # ValidatedExtendedURIProxy
 
         self._si_s = storage.si_b2a(self._storage_index)
         self.init_logging()

@@ -447,13 +646,13 @@ class FileDownloader:
         self._status = s = DownloadStatus()
         s.set_status("Starting")
         s.set_storage_index(self._storage_index)
-        s.set_size(self._size)
+        s.set_size(self._uri.size)
         s.set_helper(False)
         s.set_active(True)
 
         self._results = DownloadResults()
         s.set_results(self._results)
-        self._results.file_size = self._size
+        self._results.file_size = self._uri.size
         self._results.timings["servers_peer_selection"] = {}
         self._results.timings["fetch_per_server"] = {}
         self._results.timings["cumulative_fetch"] = 0.0

@@ -466,22 +665,16 @@ class FileDownloader:
         if IConsumer.providedBy(downloadable):
             downloadable.registerProducer(self, True)
         self._downloadable = downloadable
-        self._output = Output(downloadable, u.key, self._size, self._log_number,
+        self._output = Output(downloadable, u.key, self._uri.size, self._log_number,
                               self._status)
 
         self.active_buckets = {} # k: shnum, v: bucket
         self._share_buckets = [] # list of (sharenum, bucket) tuples
         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
-        self._uri_extension_sources = []
-
-        self._uri_extension_data = None
-
-        self._fetch_failures = {"uri_extension": 0,
-                                "plaintext_hashroot": 0,
-                                "plaintext_hashtree": 0,
-                                "crypttext_hashroot": 0,
-                                "crypttext_hashtree": 0,
-                                }
+        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
+
+        self._crypttext_hash_tree = None
 
     def init_logging(self):
         self._log_prefix = prefix = storage.si_b2a(self._storage_index)[:5]

@@ -523,16 +716,15 @@ class FileDownloader:
         self._status.set_active(False)
 
     def start(self):
+        assert isinstance(self._uri, uri.CHKFileURI), (self._uri, type(self._uri))
         self.log("starting download")
 
         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
         d.addCallback(self._got_all_shareholders)
-        # now get the uri_extension block from somebody and validate it
+        # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
         d.addCallback(self._obtain_uri_extension)
-        d.addCallback(self._got_uri_extension)
-        d.addCallback(self._get_hashtrees)
-        d.addCallback(self._create_validated_buckets)
+        d.addCallback(self._get_crypttext_hash_tree)
         # once we know that, we can download blocks from everybody
         d.addCallback(self._download_all_segments)
         def _finished(res):

@@ -582,7 +774,7 @@ class FileDownloader:
         for sharenum, bucket in buckets.iteritems():
             b = layout.ReadBucketProxy(bucket, peerid, self._si_s)
             self.add_share_bucket(sharenum, b)
-            self._uri_extension_sources.append(b)
             if self._results:
                 if peerid not in self._results.servermap:
                     self._results.servermap[peerid] = set()

@@ -603,7 +795,7 @@ class FileDownloader:
         shnum = vbucket.sharenum
         del self.active_buckets[shnum]
         s = self._share_vbuckets[shnum]
-        # s is a set of ValidatedBucket instances
+        # s is a set of ValidatedReadBucketProxy instances
         s.remove(vbucket)
         # ... which might now be empty
         if not s:

@@ -612,22 +804,21 @@ class FileDownloader:
             del self._share_vbuckets[shnum]
 
     def _got_all_shareholders(self, res):
+        assert isinstance(self._uri, uri.CHKFileURI), (self._uri, type(self._uri))
         if self._results:
             now = time.time()
             self._results.timings["peer_selection"] = now - self._started
 
-        if len(self._share_buckets) < self._num_needed_shares:
+        if len(self._share_buckets) < self._uri.needed_shares:
             raise NotEnoughSharesError
 
         #for s in self._share_vbuckets.values():
         #    for vb in s:
-        #        assert isinstance(vb, ValidatedBucket), \
-        #               "vb is %s but should be a ValidatedBucket" % (vb,)
+        #        assert isinstance(vb, ValidatedReadBucketProxy), \
+        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
 
-    def _unpack_uri_extension_data(self, data):
-        return uri.unpack_extension(data)
-
     def _obtain_uri_extension(self, ignored):
+        assert isinstance(self._uri, uri.CHKFileURI), self._uri
         # all shareholders are supposed to have a copy of uri_extension, and
         # all are supposed to be identical. We compute the hash of the data
         # that comes back, and compare it against the version in our URI. If
@@ -635,171 +826,64 @@ class FileDownloader:
         if self._status:
             self._status.set_status("Obtaining URI Extension")
 
-        self._uri_extension_fetch_started = time.time()
-        def _validate(proposal, bucket):
-            h = hashutil.uri_extension_hash(proposal)
-            if h != self._uri_extension_hash:
-                self._fetch_failures["uri_extension"] += 1
-                msg = ("The copy of uri_extension we received from "
-                       "%s was bad: wanted %s, got %s" %
-                       (bucket,
-                        base32.b2a(self._uri_extension_hash),
-                        base32.b2a(h)))
-                self.log(msg, level=log.SCARY, umid="jnkTtQ")
-                raise BadURIExtensionHashValue(msg)
-            return self._unpack_uri_extension_data(proposal)
-        return self._obtain_validated_thing(None,
-                                            self._uri_extension_sources,
-                                            "uri_extension",
-                                            "get_uri_extension", (), _validate)
-
-    def _obtain_validated_thing(self, ignored, sources, name, methname, args,
-                                validatorfunc):
-        if not sources:
-            raise NotEnoughSharesError("started with zero peers while fetching "
-                                       "%s" % name)
-        bucket = sources[0]
-        sources = sources[1:]
-        #d = bucket.callRemote(methname, *args)
-        d = bucket.startIfNecessary()
-        d.addCallback(lambda res: getattr(bucket, methname)(*args))
-        d.addCallback(validatorfunc, bucket)
-        def _bad(f):
-            level = log.WEIRD
-            if f.check(DeadReferenceError):
-                level = log.UNUSUAL
-            self.log(format="operation %(op)s from vbucket %(vbucket)s failed",
-                     op=name, vbucket=str(bucket),
-                     failure=f, level=level, umid="JGXxBA")
-            if not sources:
-                raise NotEnoughSharesError("ran out of peers, last error was %s"
-                                           % (f,))
-            # try again with a different one
-            return self._obtain_validated_thing(None, sources, name,
-                                                methname, args, validatorfunc)
-        d.addErrback(_bad)
-        return d
-
-    def _got_uri_extension(self, uri_extension_data):
-        if self._results:
-            elapsed = time.time() - self._uri_extension_fetch_started
-            self._results.timings["uri_extension"] = elapsed
-
-        d = self._uri_extension_data = uri_extension_data
-
-        self._codec = codec.get_decoder_by_name(d['codec_name'])
-        self._codec.set_serialized_params(d['codec_params'])
-        self._tail_codec = codec.get_decoder_by_name(d['codec_name'])
-        self._tail_codec.set_serialized_params(d['tail_codec_params'])
-
-        crypttext_hash = d.get('crypttext_hash', None) # optional
-        if crypttext_hash:
-            assert isinstance(crypttext_hash, str)
-            assert len(crypttext_hash) == 32
-            self._crypttext_hash = crypttext_hash
-        self._plaintext_hash = d.get('plaintext_hash', None) # optional
-
-        self._roothash = d['share_root_hash']
-
-        self._segment_size = segment_size = d['segment_size']
-        self._total_segments = mathutil.div_ceil(self._size, segment_size)
-        self._current_segnum = 0
-
-        self._share_hashtree = hashtree.IncompleteHashTree(d['total_shares'])
-        self._share_hashtree.set_hashes({0: self._roothash})
-
-    def _get_hashtrees(self, res):
-        self._get_hashtrees_started = time.time()
-        if self._status:
-            self._status.set_status("Retrieving Hash Trees")
-        d = defer.maybeDeferred(self._get_plaintext_hashtrees)
-        d.addCallback(self._get_crypttext_hashtrees)
-        d.addCallback(self._setup_hashtrees)
-        return d
-
-    def _get_plaintext_hashtrees(self):
-        # plaintext hashes are optional. If the root isn't in the UEB, then
-        # the share will be holding an empty list. We don't even bother
-        # fetching it.
-        if "plaintext_root_hash" not in self._uri_extension_data:
-            self._plaintext_hashtree = None
-            return
-        def _validate_plaintext_hashtree(proposal, bucket):
-            if proposal[0] != self._uri_extension_data['plaintext_root_hash']:
-                self._fetch_failures["plaintext_hashroot"] += 1
-                msg = ("The copy of the plaintext_root_hash we received from"
-                       " %s was bad" % bucket)
-                raise BadPlaintextHashValue(msg)
-            pt_hashtree = hashtree.IncompleteHashTree(self._total_segments)
-            pt_hashes = dict(list(enumerate(proposal)))
-            try:
-                pt_hashtree.set_hashes(pt_hashes)
-            except hashtree.BadHashError:
-                # the hashes they gave us were not self-consistent, even
-                # though the root matched what we saw in the uri_extension
-                # block
-                self._fetch_failures["plaintext_hashtree"] += 1
-                raise
-            self._plaintext_hashtree = pt_hashtree
-        d = self._obtain_validated_thing(None,
-                                         self._uri_extension_sources,
-                                         "plaintext_hashes",
-                                         "get_plaintext_hashes", (),
-                                         _validate_plaintext_hashtree)
-        return d
-
-    def _get_crypttext_hashtrees(self, res):
-        # Ciphertext hash tree root is mandatory, so that there is at
-        # most one ciphertext that matches this read-cap or
-        # verify-cap. The integrity check on the shares is not
-        # sufficient to prevent the original encoder from creating
-        # some shares of file A and other shares of file B.
-        if "crypttext_root_hash" not in self._uri_extension_data:
-            raise BadURIExtension("URI Extension block did not have the ciphertext hash tree root")
-        def _validate_crypttext_hashtree(proposal, bucket):
-            if proposal[0] != self._uri_extension_data['crypttext_root_hash']:
-                self._fetch_failures["crypttext_hashroot"] += 1
-                msg = ("The copy of the crypttext_root_hash we received from"
-                       " %s was bad" % bucket)
-                raise BadCrypttextHashValue(msg)
-            ct_hashtree = hashtree.IncompleteHashTree(self._total_segments)
-            ct_hashes = dict(list(enumerate(proposal)))
-            try:
-                ct_hashtree.set_hashes(ct_hashes)
-            except hashtree.BadHashError:
-                self._fetch_failures["crypttext_hashtree"] += 1
-                raise
-            ct_hashtree.set_hashes(ct_hashes)
-            self._crypttext_hashtree = ct_hashtree
-        d = self._obtain_validated_thing(None,
-                                         self._uri_extension_sources,
-                                         "crypttext_hashes",
-                                         "get_crypttext_hashes", (),
-                                         _validate_crypttext_hashtree)
-        return d
-
-    def _setup_hashtrees(self, res):
-        self._output.setup_hashtrees(self._plaintext_hashtree,
-                                     self._crypttext_hashtree)
-        if self._results:
-            elapsed = time.time() - self._get_hashtrees_started
-            self._results.timings["hashtrees"] = elapsed
-
-    def _create_validated_buckets(self, ignored=None):
-        self._share_vbuckets = {}
-        for sharenum, bucket in self._share_buckets:
-            vbucket = ValidatedBucket(sharenum, bucket,
-                                      self._share_hashtree,
-                                      self._roothash,
-                                      self._total_segments)
-            s = self._share_vbuckets.setdefault(sharenum, set())
-            s.add(vbucket)
+        uri_extension_fetch_started = time.time()
+
+        vups = []
+        for sharenum, bucket in self._share_buckets:
+            vups.append(ValidatedExtendedURIProxy(bucket, self._uri.get_verifier(), self._fetch_failures))
+        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._log_number)
+        d = vto.start()
+
+        def _got_uri_extension(vup):
+            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
+            if self._results:
+                elapsed = time.time() - uri_extension_fetch_started
+                self._results.timings["uri_extension"] = elapsed
+
+            self._vup = vup
+            self._codec = codec.CRSDecoder()
+            self._codec.set_params(self._vup.segment_size, self._uri.needed_shares, self._uri.total_shares)
+            self._tail_codec = codec.CRSDecoder()
+            self._tail_codec.set_params(self._vup.tail_segment_size, self._uri.needed_shares, self._uri.total_shares)
+
+            self._current_segnum = 0
+
+            self._share_hashtree = hashtree.IncompleteHashTree(self._uri.total_shares)
+            self._share_hashtree.set_hashes({0: vup.share_root_hash})
+
+            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
+            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
+        d.addCallback(_got_uri_extension)
+        return d
+
+    def _get_crypttext_hash_tree(self, res):
+        vchtps = []
+        for sharenum, bucket in self._share_buckets:
+            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._fetch_failures)
+            vchtps.append(vchtp)
+
+        _get_crypttext_hash_tree_started = time.time()
+        if self._status:
+            self._status.set_status("Retrieving crypttext hash tree")
+
+        vto = ValidatedThingObtainer(vchtps , debugname="vchtps", log_id=self._log_number)
+        d = vto.start()
+
+        def _got_crypttext_hash_tree(res):
+            self._crypttext_hash_tree = res._crypttext_hash_tree
+            self._output.got_crypttext_hash_tree(self._crypttext_hash_tree)
+            if self._results:
+                elapsed = time.time() - _get_crypttext_hash_tree_started
+                self._results.timings["hashtrees"] = elapsed
+        d.addCallback(_got_crypttext_hash_tree)
+        return d
 
     def _activate_enough_buckets(self):
-        """either return a mapping from shnum to a ValidatedBucket that can
+        """either return a mapping from shnum to a ValidatedReadBucketProxy that can
         provide data for that share, or raise NotEnoughSharesError"""
+        assert isinstance(self._uri, uri.CHKFileURI), self._uri
+
-        while len(self.active_buckets) < self._num_needed_shares:
+        while len(self.active_buckets) < self._uri.needed_shares:
             # need some more
             handled_shnums = set(self.active_buckets.keys())
             available_shnums = set(self._share_vbuckets.keys())
@ -815,23 +899,31 @@ class FileDownloader:
|
|||||||
|
|
||||||
|
|
||||||
def _download_all_segments(self, res):
|
def _download_all_segments(self, res):
|
||||||
# the promise: upon entry to this function, self._share_vbuckets
|
for sharenum, bucket in self._share_buckets:
|
||||||
# contains enough buckets to complete the download, and some extra
|
vbucket = ValidatedReadBucketProxy(sharenum, bucket,
|
||||||
# ones to tolerate some buckets dropping out or having errors.
|
self._share_hashtree,
|
||||||
# self._share_vbuckets is a dictionary that maps from shnum to a set
|
self._vup.share_root_hash,
|
||||||
# of ValidatedBuckets, which themselves are wrappers around
|
self._vup.num_segments)
|
||||||
# RIBucketReader references.
|
s = self._share_vbuckets.setdefault(sharenum, set())
|
||||||
self.active_buckets = {} # k: shnum, v: ValidatedBucket instance
|
s.add(vbucket)
|
||||||
|
|
||||||
|
# after the above code, self._share_vbuckets contains enough
|
||||||
|
# buckets to complete the download, and some extra ones to
|
||||||
|
# tolerate some buckets dropping out or having
|
||||||
|
# errors. self._share_vbuckets is a dictionary that maps from
|
||||||
|
# shnum to a set of ValidatedBuckets, which themselves are
|
||||||
|
# wrappers around RIBucketReader references.
|
||||||
|
self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
|
||||||
|
|
||||||
self._started_fetching = time.time()
|
self._started_fetching = time.time()
|
||||||
|
|
||||||
d = defer.succeed(None)
|
d = defer.succeed(None)
|
||||||
for segnum in range(self._total_segments-1):
|
for segnum in range(self._vup.num_segments-1):
|
||||||
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
-        d.addCallback(self._download_tail_segment, self._total_segments-1)
+        d.addCallback(self._download_tail_segment, self._vup.num_segments-1)
         return d

     def _check_for_pause(self, res):
@@ -844,15 +936,16 @@ class FileDownloader:
         return res

     def _download_segment(self, res, segnum):
+        assert isinstance(self._uri, uri.CHKFileURI), self._uri
         if self._status:
             self._status.set_status("Downloading segment %d of %d" %
-                                    (segnum+1, self._total_segments))
+                                    (segnum+1, self._vup.num_segments))
         self.log("downloading seg#%d of %d (%d%%)"
-                 % (segnum, self._total_segments,
-                    100.0 * segnum / self._total_segments))
+                 % (segnum, self._vup.num_segments,
+                    100.0 * segnum / self._vup.num_segments))
         # memory footprint: when the SegmentDownloader finishes pulling down
         # all shares, we have 1*segment_size of usage.
-        segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares,
+        segmentdler = SegmentDownloader(self, segnum, self._uri.needed_shares,
                                         self._results)
         started = time.time()
         d = segmentdler.start()
@@ -904,10 +997,11 @@ class FileDownloader:
         return d

     def _download_tail_segment(self, res, segnum):
+        assert isinstance(self._uri, uri.CHKFileURI), self._uri
         self.log("downloading seg#%d of %d (%d%%)"
-                 % (segnum, self._total_segments,
-                    100.0 * segnum / self._total_segments))
-        segmentdler = SegmentDownloader(self, segnum, self._num_needed_shares,
+                 % (segnum, self._vup.num_segments,
+                    100.0 * segnum / self._vup.num_segments))
+        segmentdler = SegmentDownloader(self, segnum, self._uri.needed_shares,
                                         self._results)
         started = time.time()
         d = segmentdler.start()
@@ -940,8 +1034,8 @@ class FileDownloader:
         del buffers
         # we never send empty segments. If the data was an exact multiple
         # of the segment size, the last segment will be full.
-        pad_size = mathutil.pad_size(self._size, self._segment_size)
-        tail_size = self._segment_size - pad_size
+        pad_size = mathutil.pad_size(self._uri.size, self._vup.segment_size)
+        tail_size = self._vup.segment_size - pad_size
         segment = segment[:tail_size]
         started_decrypt = time.time()
         self._output.write_segment(segment)
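For reference, the tail-segment truncation above reduces to simple modular arithmetic. A standalone sketch (pad_size here is a locally defined stand-in, and the sizes are illustrative, not taken from this patch):

    def pad_size(n, k):
        # bytes needed to round n up to a multiple of k (0 if already aligned)
        return 0 if n % k == 0 else k - (n % k)

    size, segment_size = 1000, 128         # illustrative values
    pad = pad_size(size, segment_size)     # 128 - (1000 % 128) = 24
    tail_size = segment_size - pad         # 104 real bytes in the last segment
    assert tail_size == size - (size // segment_size) * segment_size
    # if size were an exact multiple of segment_size, pad would be 0 and the
    # tail segment would be full, matching the comment in the hunk above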
@@ -952,24 +1046,20 @@ class FileDownloader:
         return d

     def _done(self, res):
+        assert isinstance(self._uri, uri.CHKFileURI), self._uri
         self.log("download done")
         if self._results:
             now = time.time()
             self._results.timings["total"] = now - self._started
             self._results.timings["segments"] = now - self._started_fetching
         self._output.close()
-        if self.check_crypttext_hash and self._crypttext_hash:
-            _assert(self._crypttext_hash == self._output.crypttext_hash,
+        if self._vup.crypttext_hash:
+            _assert(self._vup.crypttext_hash == self._output.crypttext_hash,
                     "bad crypttext_hash: computed=%s, expected=%s" %
                     (base32.b2a(self._output.crypttext_hash),
-                     base32.b2a(self._crypttext_hash)))
-        if self.check_plaintext_hash and self._plaintext_hash:
-            _assert(self._plaintext_hash == self._output.plaintext_hash,
-                    "bad plaintext_hash: computed=%s, expected=%s" %
-                    (base32.b2a(self._output.plaintext_hash),
-                     base32.b2a(self._plaintext_hash)))
-        _assert(self._output.length == self._size,
-                got=self._output.length, expected=self._size)
+                     base32.b2a(self._vup.crypttext_hash)))
+        _assert(self._output.length == self._uri.size,
+                got=self._output.length, expected=self._uri.size)
         return self._output.finish()

     def get_download_status(self):
@@ -1043,12 +1133,6 @@ class ConsumerAdapter:
     implements(IDownloadTarget, IConsumer)
     def __init__(self, consumer):
         self._consumer = consumer
-        self._when_finished = observer.OneShotObserverList()
-
-    def when_finished(self):
-        # I think this is unused, along with self._when_finished . But I need
-        # to trace the error paths to be sure.
-        return self._when_finished.when_fired()

     def registerProducer(self, producer, streaming):
         self._consumer.registerProducer(producer, streaming)
@@ -1060,10 +1144,10 @@ class ConsumerAdapter:
     def write(self, data):
         self._consumer.write(data)
     def close(self):
-        self._when_finished.fire(None)
+        pass

     def fail(self, why):
-        self._when_finished.fire(why)
+        pass
     def register_canceller(self, cb):
         pass
     def finish(self):
@@ -150,12 +150,12 @@ class Encoder(object):
         # it. If the tail is short, we use a different codec instance. In
         # addition, the tail codec must be fed data which has been padded out
         # to the right size.
-        self.tail_size = self.file_size % self.segment_size
-        if not self.tail_size:
-            self.tail_size = self.segment_size
+        tail_size = self.file_size % self.segment_size
+        if not tail_size:
+            tail_size = self.segment_size

         # the tail codec is responsible for encoding tail_size bytes
-        padded_tail_size = mathutil.next_multiple(self.tail_size,
+        padded_tail_size = mathutil.next_multiple(tail_size,
                                                   self.required_shares)
         self._tail_codec = CRSEncoder()
         self._tail_codec.set_params(padded_tail_size,
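On the upload side the same arithmetic appears in reverse: the short tail is padded up to a multiple of required_shares so the erasure coder can split it into equal-sized pieces. A standalone sketch (next_multiple is a local stand-in and the numbers are illustrative):

    def next_multiple(n, k):
        # smallest multiple of k that is >= n
        return n if n % k == 0 else n + (k - n % k)

    file_size, segment_size, required_shares = 1000, 128, 3
    tail_size = file_size % segment_size or segment_size          # 104
    padded_tail_size = next_multiple(tail_size, required_shares)  # 105, divisible by 3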
@@ -9,6 +9,7 @@ from foolscap.eventual import eventually
 from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
      IDownloadTarget
 from allmydata.util import log, base32
+from allmydata.uri import from_string as uri_from_string
 from allmydata.immutable.checker import SimpleCHKFileChecker, \
      SimpleCHKFileVerifier
 from allmydata.immutable import download
@@ -191,17 +192,18 @@ class FileNode(_ImmutableFileNodeBase):
         # SimpleCHKFileVerifier, have it call monitor.raise_if_cancelled()
         # before sending each request.
         storage_index = self.u.storage_index
+        assert IFileURI.providedBy(self.u), self.u
         k = self.u.needed_shares
         N = self.u.total_shares
         size = self.u.size
         ueb_hash = self.u.uri_extension_hash
         if verify:
             v = self.verifier_class(self._client,
-                                    self.get_uri(), storage_index,
+                                    uri_from_string(self.get_uri()), storage_index,
                                     k, N, size, ueb_hash)
         else:
             v = self.checker_class(self._client,
-                                   self.get_uri(), storage_index,
+                                   uri_from_string(self.get_uri()), storage_index,
                                    k, N)
         return v.start()

@@ -959,6 +959,9 @@ class ICodecEncoder(Interface):
         may be invoked.
         """

+    def get_params():
+        """Return the 3-tuple of data_size, required_shares, max_shares"""
+
     def get_encoder_type():
         """Return a short string that describes the type of this encoder.

@@ -967,23 +970,6 @@ class ICodecEncoder(Interface):
         encoder class, and this encoder is an instance of that class.
         """

-    def get_serialized_params(): # TODO: maybe, maybe not
-        """Return a string that describes the parameters of this encoder.
-
-        This string can be passed to the decoder to prepare it for handling
-        the encoded shares we create. It might contain more information than
-        was presented to set_params(), if there is some flexibility of
-        parameter choice.
-
-        This string is intended to be embedded in the URI, so there are
-        several restrictions on its contents. At the moment I'm thinking that
-        this means it may contain hex digits and hyphens, and nothing else.
-        The idea is that the URI contains something like '%s:%s:%s' %
-        (encoder.get_encoder_name(), encoder.get_serialized_params(),
-        b2a(crypttext_hash)), and this is enough information to construct a
-        compatible decoder.
-        """
-
     def get_block_size():
         """Return the length of the shares that encode() will produce.
         """
@@ -1093,13 +1079,12 @@ class ICodecEncoder(Interface):


 class ICodecDecoder(Interface):
-    def set_serialized_params(params):
-        """Set up the parameters of this encoder, from a string returned by
-        encoder.get_serialized_params()."""
+    def set_params(data_size, required_shares, max_shares):
+        """Set the params. They have to be exactly the same ones that were used for encoding. """

     def get_needed_shares():
         """Return the number of shares needed to reconstruct the data.
-        set_serialized_params() is required to be called before this."""
+        set_params() is required to be called before this."""

     def decode(some_shares, their_shareids):
         """Decode a partial list of shares into data.
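With ICodecDecoder.set_params(), the decoder is configured with the same three numbers the encoder used, instead of a serialized string. A rough round-trip sketch built from the calls this patch exercises in test_codec (the piece sizing and callback plumbing here are illustrative, not part of the patch):

    from allmydata.codec import CRSEncoder, CRSDecoder

    def roundtrip(pieces, size, required_shares, max_shares):
        enc = CRSEncoder()
        enc.set_params(size, required_shares, max_shares)
        params = enc.get_params()      # (size, required_shares, max_shares)
        d = enc.encode(pieces)         # Deferred firing with (shares, shareids)
        def _decode((shares, shareids)):
            dec = CRSDecoder()
            dec.set_params(*params)    # must be exactly the encoding params
            return dec.decode(shares[:required_shares],
                              shareids[:required_shares])
        d.addCallback(_decode)
        return d                       # fires with the recovered pieces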
@@ -2159,6 +2144,10 @@ class RIKeyGenerator(RemoteInterface):
 class FileTooLargeError(Exception):
     pass

+class IValidatedThingProxy(Interface):
+    def start():
+        """ Acquire a thing and validate it. Return a deferred which is eventually fired with
+        self if the thing is valid or errbacked if it can't be acquired or validated. """

 class InsufficientVersionError(Exception):
     def __init__(self, needed, got):
@@ -494,10 +494,9 @@ class Retrieve:
         shares = shares[:k]

         fec = codec.CRSDecoder()
-        params = "%d-%d-%d" % (segsize, k, N)
-        fec.set_serialized_params(params)
+        fec.set_params(segsize, k, N)

-        self.log("params %s, we have %d shares" % (params, len(shares)))
+        self.log("params %s, we have %d shares" % ((segsize, k, N), len(shares)))
         self.log("about to decode, shareids=%s" % (shareids,))
         d = defer.maybeDeferred(fec.decode, shares, shareids)
         def _done(buffers):
@@ -2,17 +2,18 @@
 import os
 from twisted.trial import unittest
 from twisted.python import log
-from allmydata.codec import ReplicatingEncoder, ReplicatingDecoder, CRSEncoder, CRSDecoder
+from allmydata.codec import CRSEncoder, CRSDecoder
 import random
 from allmydata.util import mathutil

-class Tester:
+class T(unittest.TestCase):
     def do_test(self, size, required_shares, max_shares, fewer_shares=None):
         data0s = [os.urandom(mathutil.div_ceil(size, required_shares)) for i in range(required_shares)]
-        enc = self.enc_class()
+        enc = CRSEncoder()
         enc.set_params(size, required_shares, max_shares)
-        serialized_params = enc.get_serialized_params()
-        log.msg("serialized_params: %s" % serialized_params)
+        params = enc.get_params()
+        assert params == (size, required_shares, max_shares)
+        log.msg("params: %s" % (params,))
         d = enc.encode(data0s)
         def _done_encoding_all((shares, shareids)):
             self.failUnlessEqual(len(shares), max_shares)
@@ -28,8 +29,8 @@ class Tester:
         d.addCallback(_check_fewer_shares)

         def _decode((shares, shareids)):
-            dec = self.dec_class()
-            dec.set_serialized_params(serialized_params)
+            dec = CRSDecoder()
+            dec.set_params(*params)
             d1 = dec.decode(shares, shareids)
             return d1

@@ -72,8 +73,8 @@ class Tester:
         sharesl2 = random.sample(zip(self.shares, self.shareids), required_shares)
         shares2 = [ x[0] for x in sharesl2 ]
         shareids2 = [ x[1] for x in sharesl2 ]
-        dec = self.dec_class()
-        dec.set_serialized_params(serialized_params)
+        dec = CRSDecoder()
+        dec.set_params(*params)
         d1 = dec.decode(shares1, shareids1)
         d1.addCallback(_check_data)
         d1.addCallback(lambda res: dec.decode(shares2, shareids2))
@@ -91,12 +92,3 @@ class Tester:

     def test_encode2(self):
         return self.do_test(125, 25, 100, 90)
-
-class Replicating(unittest.TestCase, Tester):
-    enc_class = ReplicatingEncoder
-    dec_class = ReplicatingDecoder
-
-class CRS(unittest.TestCase, Tester):
-    enc_class = CRSEncoder
-    dec_class = CRSDecoder
-
@@ -1,4 +1,3 @@
-
 from zope.interface import implements
 from twisted.trial import unittest
 from twisted.internet import defer, reactor
@@ -7,7 +6,7 @@ from twisted.python.failure import Failure
 from foolscap import eventual
 from allmydata import hashtree, uri
 from allmydata.immutable import encode, upload, download
-from allmydata.util import hashutil
+from allmydata.util import base32, hashutil
 from allmydata.util.assertutil import _assert
 from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, NotEnoughSharesError
 import common_util as testutil
@@ -22,7 +21,7 @@ class FakeClient:
     def log(self, *args, **kwargs):
         pass

-class FakeBucketWriterProxy:
+class FakeBucketReaderWriterProxy:
     implements(IStorageBucketWriter, IStorageBucketReader)
     # these are used for both reading and writing
     def __init__(self, mode="good"):
@@ -111,10 +110,6 @@ class FakeBucketWriterProxy:
     def get_plaintext_hashes(self):
         def _try():
             hashes = self.plaintext_hashes[:]
-            if self.mode == "bad plaintext hashroot":
-                hashes[0] = flip_bit(hashes[0])
-            if self.mode == "bad plaintext hash":
-                hashes[1] = flip_bit(hashes[1])
             return hashes
         return defer.maybeDeferred(_try)

@@ -164,6 +159,102 @@ def make_data(length):
     assert length <= len(data)
     return data[:length]

+class ValidatedExtendedURIProxy(unittest.TestCase):
+    K = 4
+    M = 10
+    SIZE = 200
+    SEGSIZE = 72
+    _TMP = SIZE%SEGSIZE
+    if _TMP == 0:
+        _TMP = SEGSIZE
+    if _TMP % K != 0:
+        _TMP += (K - (_TMP % K))
+    TAIL_SEGSIZE = _TMP
+    _TMP = SIZE / SEGSIZE
+    if SIZE % SEGSIZE != 0:
+        _TMP += 1
+    NUM_SEGMENTS = _TMP
+    mindict = { 'segment_size': SEGSIZE,
+                'crypttext_root_hash': '0'*hashutil.CRYPTO_VAL_SIZE,
+                'share_root_hash': '1'*hashutil.CRYPTO_VAL_SIZE }
+    optional_consistent = { 'crypttext_hash': '2'*hashutil.CRYPTO_VAL_SIZE,
+                            'codec_name': "crs",
+                            'codec_params': "%d-%d-%d" % (SEGSIZE, K, M),
+                            'tail_codec_params': "%d-%d-%d" % (TAIL_SEGSIZE, K, M),
+                            'num_segments': NUM_SEGMENTS,
+                            'size': SIZE,
+                            'needed_shares': K,
+                            'total_shares': M,
+                            'plaintext_hash': "anything",
+                            'plaintext_root_hash': "anything", }
+    # optional_inconsistent = { 'crypttext_hash': ('2'*(hashutil.CRYPTO_VAL_SIZE-1), "", 77),
+    optional_inconsistent = { 'crypttext_hash': (77,),
+                              'codec_name': ("digital fountain", ""),
+                              'codec_params': ("%d-%d-%d" % (SEGSIZE, K-1, M),
+                                               "%d-%d-%d" % (SEGSIZE-1, K, M),
+                                               "%d-%d-%d" % (SEGSIZE, K, M-1)),
+                              'tail_codec_params': ("%d-%d-%d" % (TAIL_SEGSIZE, K-1, M),
+                                                    "%d-%d-%d" % (TAIL_SEGSIZE-1, K, M),
+                                                    "%d-%d-%d" % (TAIL_SEGSIZE, K, M-1)),
+                              'num_segments': (NUM_SEGMENTS-1,),
+                              'size': (SIZE-1,),
+                              'needed_shares': (K-1,),
+                              'total_shares': (M-1,), }
+
+    def _test(self, uebdict):
+        uebstring = uri.pack_extension(uebdict)
+        uebhash = hashutil.uri_extension_hash(uebstring)
+        fb = FakeBucketReaderWriterProxy()
+        fb.put_uri_extension(uebstring)
+        verifycap = uri.CHKFileVerifierURI(storage_index='x'*16, uri_extension_hash=uebhash, needed_shares=self.K, total_shares=self.M, size=self.SIZE)
+        vup = download.ValidatedExtendedURIProxy(fb, verifycap)
+        return vup.start()
+
+    def _test_accept(self, uebdict):
+        return self._test(uebdict)
+
+    def _should_fail(self, res, expected_failures):
+        if isinstance(res, Failure):
+            res.trap(*expected_failures)
+        else:
+            self.fail("was supposed to raise %s, not get '%s'" % (expected_failures, res))
+
+    def _test_reject(self, uebdict):
+        d = self._test(uebdict)
+        d.addBoth(self._should_fail, (KeyError, download.BadURIExtension))
+        return d
+
+    def test_accept_minimal(self):
+        return self._test_accept(self.mindict)
+
+    def test_reject_insufficient(self):
+        dl = []
+        for k in self.mindict.iterkeys():
+            insuffdict = self.mindict.copy()
+            del insuffdict[k]
+            d = self._test_reject(insuffdict)
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def test_accept_optional(self):
+        dl = []
+        for k in self.optional_consistent.iterkeys():
+            mydict = self.mindict.copy()
+            mydict[k] = self.optional_consistent[k]
+            d = self._test_accept(mydict)
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def test_reject_optional(self):
+        dl = []
+        for k in self.optional_inconsistent.iterkeys():
+            for v in self.optional_inconsistent[k]:
+                mydict = self.mindict.copy()
+                mydict[k] = v
+                d = self._test_reject(mydict)
+                dl.append(d)
+        return defer.DeferredList(dl)
+
 class Encode(unittest.TestCase):

     def do_encode(self, max_segment_size, datalen, NUM_SHARES, NUM_SEGMENTS,
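With the constants above, the derived class attributes work out as follows (a quick check of the arithmetic the test class performs at definition time):

    K, SIZE, SEGSIZE = 4, 200, 72
    # tail segment: 200 % 72 = 56 bytes, already a multiple of K=4, so TAIL_SEGSIZE == 56
    # segments: 200 // 72 = 2 full segments plus one partial, so NUM_SEGMENTS == 3
    assert SIZE % SEGSIZE == 56 and 56 % K == 0
    assert SIZE // SEGSIZE + 1 == 3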
@@ -192,7 +283,7 @@ class Encode(unittest.TestCase):

         shareholders = {}
         for shnum in range(NUM_SHARES):
-            peer = FakeBucketWriterProxy()
+            peer = FakeBucketReaderWriterProxy()
             shareholders[shnum] = peer
             all_shareholders.append(peer)
         e.set_shareholders(shareholders)
@@ -357,7 +448,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
         all_peers = []
         for shnum in range(NUM_SHARES):
             mode = bucket_modes.get(shnum, "good")
-            peer = FakeBucketWriterProxy(mode)
+            peer = FakeBucketReaderWriterProxy(mode)
             shareholders[shnum] = peer
         e.set_shareholders(shareholders)
         return e.start()
@@ -385,12 +476,11 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
                           needed_shares=required_shares,
                           total_shares=num_shares,
                           size=file_size)
-        URI = u.to_string()

         client = FakeClient()
         if not target:
             target = download.Data()
-        fd = download.FileDownloader(client, URI, target)
+        fd = download.FileDownloader(client, u, target)

         # we manually cycle the FileDownloader through a number of steps that
         # would normally be sequenced by a Deferred chain in
@@ -405,9 +495,9 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
         # Make it possible to obtain uri_extension from the shareholders.
         # Arrange for shareholders[0] to be the first, so we can selectively
         # corrupt the data it returns.
-        fd._uri_extension_sources = shareholders.values()
-        fd._uri_extension_sources.remove(shareholders[0])
-        fd._uri_extension_sources.insert(0, shareholders[0])
+        uri_extension_sources = shareholders.values()
+        uri_extension_sources.remove(shareholders[0])
+        uri_extension_sources.insert(0, shareholders[0])

         d = defer.succeed(None)

@@ -418,33 +508,22 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
             # replace everybody's crypttext hash trees with a different one
             # (computed over a different file), then modify our uri_extension
            # to reflect the new crypttext hash tree root
-            def _corrupt_crypttext_hashes(uri_extension):
-                assert isinstance(uri_extension, dict)
-                assert 'crypttext_root_hash' in uri_extension
+            def _corrupt_crypttext_hashes(unused):
+                assert isinstance(fd._vup, download.ValidatedExtendedURIProxy), fd._vup
+                assert fd._vup.crypttext_root_hash, fd._vup
                 badhash = hashutil.tagged_hash("bogus", "data")
-                bad_crypttext_hashes = [badhash] * uri_extension['num_segments']
+                bad_crypttext_hashes = [badhash] * fd._vup.num_segments
                 badtree = hashtree.HashTree(bad_crypttext_hashes)
                 for bucket in shareholders.values():
                     bucket.crypttext_hashes = list(badtree)
-                uri_extension['crypttext_root_hash'] = badtree[0]
-                return uri_extension
+                fd._crypttext_hash_tree = hashtree.IncompleteHashTree(fd._vup.num_segments)
+                fd._crypttext_hash_tree.set_hashes({0: badtree[0]})
+                return fd._vup
             d.addCallback(_corrupt_crypttext_hashes)
-        elif "omit_crypttext_root_hash" in recover_mode:
-            # make it as though the crypttext hash tree root had not
-            # been in the UEB
-            def _remove_crypttext_hashes(uri_extension):
-                assert isinstance(uri_extension, dict)
-                assert 'crypttext_root_hash' in uri_extension
-                del uri_extension['crypttext_root_hash']
-                return uri_extension
-            d.addCallback(_remove_crypttext_hashes)
-
-        d.addCallback(fd._got_uri_extension)

         # also have the FileDownloader ask for hash trees
-        d.addCallback(fd._get_hashtrees)
+        d.addCallback(fd._get_crypttext_hash_tree)

-        d.addCallback(fd._create_validated_buckets)
         d.addCallback(fd._download_all_segments)
         d.addCallback(fd._done)
         def _done(newdata):
|
|||||||
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self.failUnless(isinstance(res, Failure))
|
self.failUnless(isinstance(res, Failure))
|
||||||
self.failUnless(res.check(NotEnoughSharesError))
|
self.failUnless(res.check(NotEnoughSharesError), res)
|
||||||
d.addBoth(_done)
|
d.addBoth(_done)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
@ -566,10 +645,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
|
|
||||||
def assertFetchFailureIn(self, fd, where):
|
def assertFetchFailureIn(self, fd, where):
|
||||||
expected = {"uri_extension": 0,
|
expected = {"uri_extension": 0,
|
||||||
"plaintext_hashroot": 0,
|
"crypttext_hash_tree": 0,
|
||||||
"plaintext_hashtree": 0,
|
|
||||||
"crypttext_hashroot": 0,
|
|
||||||
"crypttext_hashtree": 0,
|
|
||||||
}
|
}
|
||||||
if where is not None:
|
if where is not None:
|
||||||
expected[where] += 1
|
expected[where] += 1
|
||||||
@ -592,31 +668,13 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
d.addCallback(self.assertFetchFailureIn, "uri_extension")
|
d.addCallback(self.assertFetchFailureIn, "uri_extension")
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def OFF_test_bad_plaintext_hashroot(self):
|
|
||||||
# the first server has a bad plaintext hashroot, so we will fail over
|
|
||||||
# to a different server.
|
|
||||||
modemap = dict([(i, "bad plaintext hashroot") for i in range(1)] +
|
|
||||||
[(i, "good") for i in range(1, 10)])
|
|
||||||
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
|
||||||
d.addCallback(self.assertFetchFailureIn, "plaintext_hashroot")
|
|
||||||
return d
|
|
||||||
|
|
||||||
def test_bad_crypttext_hashroot(self):
|
def test_bad_crypttext_hashroot(self):
|
||||||
# the first server has a bad crypttext hashroot, so we will fail
|
# the first server has a bad crypttext hashroot, so we will fail
|
||||||
# over to a different server.
|
# over to a different server.
|
||||||
modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
|
modemap = dict([(i, "bad crypttext hashroot") for i in range(1)] +
|
||||||
[(i, "good") for i in range(1, 10)])
|
[(i, "good") for i in range(1, 10)])
|
||||||
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
d.addCallback(self.assertFetchFailureIn, "crypttext_hashroot")
|
d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
|
||||||
return d
|
|
||||||
|
|
||||||
def OFF_test_bad_plaintext_hashes(self):
|
|
||||||
# the first server has a bad plaintext hash block, so we will fail
|
|
||||||
# over to a different server.
|
|
||||||
modemap = dict([(i, "bad plaintext hash") for i in range(1)] +
|
|
||||||
[(i, "good") for i in range(1, 10)])
|
|
||||||
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
|
||||||
d.addCallback(self.assertFetchFailureIn, "plaintext_hashtree")
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_bad_crypttext_hashes(self):
|
def test_bad_crypttext_hashes(self):
|
||||||
@ -625,7 +683,7 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
|
modemap = dict([(i, "bad crypttext hash") for i in range(1)] +
|
||||||
[(i, "good") for i in range(1, 10)])
|
[(i, "good") for i in range(1, 10)])
|
||||||
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
d.addCallback(self.assertFetchFailureIn, "crypttext_hashtree")
|
d.addCallback(self.assertFetchFailureIn, "crypttext_hash_tree")
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_bad_crypttext_hashes_failure(self):
|
def test_bad_crypttext_hashes_failure(self):
|
||||||
@ -646,20 +704,6 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
d.addBoth(_done)
|
d.addBoth(_done)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def test_omitted_crypttext_hashes_failure(self):
|
|
||||||
# Test that the downloader requires a Merkle Tree over the
|
|
||||||
# ciphertext (per http://crisp.cs.du.edu/?q=node/88 -- the
|
|
||||||
# problem that checking the integrity of the shares could let
|
|
||||||
# more than one immutable file match the same read-cap).
|
|
||||||
modemap = dict([(i, "good") for i in range(0, 10)])
|
|
||||||
d = self.send_and_recover((4,8,10), bucket_modes=modemap,
|
|
||||||
recover_mode=("omit_crypttext_root_hash"))
|
|
||||||
def _done(res):
|
|
||||||
self.failUnless(isinstance(res, Failure))
|
|
||||||
self.failUnless(res.check(download.BadURIExtension), res)
|
|
||||||
d.addBoth(_done)
|
|
||||||
return d
|
|
||||||
|
|
||||||
def OFF_test_bad_plaintext(self):
|
def OFF_test_bad_plaintext(self):
|
||||||
# faking a decryption failure is easier: just corrupt the key
|
# faking a decryption failure is easier: just corrupt the key
|
||||||
modemap = dict([(i, "good") for i in range(0, 10)])
|
modemap = dict([(i, "good") for i in range(0, 10)])
|
||||||
@ -747,4 +791,3 @@ class Roundtrip(unittest.TestCase, testutil.ShouldFailMixin):
|
|||||||
self.failUnless(res.check(NotEnoughSharesError))
|
self.failUnless(res.check(NotEnoughSharesError))
|
||||||
d.addBoth(_done)
|
d.addBoth(_done)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -130,9 +130,10 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
|
|||||||
return d1
|
return d1
|
||||||
d.addCallback(_do_upload)
|
d.addCallback(_do_upload)
|
||||||
def _upload_done(results):
|
def _upload_done(results):
|
||||||
uri = results.uri
|
theuri = results.uri
|
||||||
log.msg("upload finished: uri is %s" % (uri,))
|
log.msg("upload finished: uri is %s" % (theuri,))
|
||||||
self.uri = uri
|
self.uri = theuri
|
||||||
|
assert isinstance(self.uri, str), self.uri
|
||||||
dl = self.clients[1].getServiceNamed("downloader")
|
dl = self.clients[1].getServiceNamed("downloader")
|
||||||
self.downloader = dl
|
self.downloader = dl
|
||||||
d.addCallback(_upload_done)
|
d.addCallback(_upload_done)
|
||||||
@ -847,6 +848,7 @@ class SystemTest(SystemTestMixin, unittest.TestCase):
|
|||||||
d1.addCallback(self.log, "publish finished")
|
d1.addCallback(self.log, "publish finished")
|
||||||
def _stash_uri(filenode):
|
def _stash_uri(filenode):
|
||||||
self.uri = filenode.get_uri()
|
self.uri = filenode.get_uri()
|
||||||
|
assert isinstance(self.uri, str), (self.uri, filenode)
|
||||||
d1.addCallback(_stash_uri)
|
d1.addCallback(_stash_uri)
|
||||||
return d1
|
return d1
|
||||||
d.addCallback(_made_subdir1)
|
d.addCallback(_made_subdir1)
|
||||||
@ -2518,8 +2520,8 @@ class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
|
|||||||
self.basedir = self.mktemp()
|
self.basedir = self.mktemp()
|
||||||
d = self.set_up_nodes()
|
d = self.set_up_nodes()
|
||||||
d.addCallback(self.set_up_damaged_tree)
|
d.addCallback(self.set_up_damaged_tree)
|
||||||
d.addCallback(self.do_test_check_bad)
|
d.addCallback(self.do_check)
|
||||||
d.addCallback(self.do_test_deepcheck_bad)
|
d.addCallback(self.do_deepcheck)
|
||||||
d.addCallback(self.do_test_web_bad)
|
d.addCallback(self.do_test_web_bad)
|
||||||
d.addErrback(self.explain_web_error)
|
d.addErrback(self.explain_web_error)
|
||||||
d.addErrback(self.explain_error)
|
d.addErrback(self.explain_error)
|
||||||
@ -2620,8 +2622,8 @@ class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
|
|||||||
|
|
||||||
|
|
||||||
def check_is_healthy(self, cr, where):
|
def check_is_healthy(self, cr, where):
|
||||||
self.failUnless(ICheckerResults.providedBy(cr), where)
|
self.failUnless(ICheckerResults.providedBy(cr), (cr, type(cr), where))
|
||||||
self.failUnless(cr.is_healthy(), where)
|
self.failUnless(cr.is_healthy(), (cr.get_report(), cr.is_healthy(), cr.get_summary(), where))
|
||||||
self.failUnless(cr.is_recoverable(), where)
|
self.failUnless(cr.is_recoverable(), where)
|
||||||
d = cr.get_data()
|
d = cr.get_data()
|
||||||
self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
|
self.failUnlessEqual(d["count-recoverable-versions"], 1, where)
|
||||||
@ -2660,7 +2662,7 @@ class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
|
|||||||
self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
|
self.failUnlessEqual(d["count-unrecoverable-versions"], 1, where)
|
||||||
return cr
|
return cr
|
||||||
|
|
||||||
def do_test_check_bad(self, ignored):
|
def do_check(self, ignored):
|
||||||
d = defer.succeed(None)
|
d = defer.succeed(None)
|
||||||
|
|
||||||
# check the individual items, without verification. This will not
|
# check the individual items, without verification. This will not
|
||||||
@ -2709,7 +2711,7 @@ class DeepCheckWebBad(DeepCheckBase, unittest.TestCase):
|
|||||||
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def do_test_deepcheck_bad(self, ignored):
|
def do_deepcheck(self, ignored):
|
||||||
d = defer.succeed(None)
|
d = defer.succeed(None)
|
||||||
|
|
||||||
# now deep-check the root, with various verify= and repair= options
|
# now deep-check the root, with various verify= and repair= options
|
||||||
|
@ -416,9 +416,7 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
|
|||||||
|
|
||||||
def abbreviated_dirnode(dirnode):
|
def abbreviated_dirnode(dirnode):
|
||||||
u = from_string_dirnode(dirnode.get_uri())
|
u = from_string_dirnode(dirnode.get_uri())
|
||||||
si = u.get_filenode_uri().storage_index
|
return u.abbrev()
|
||||||
si_s = base32.b2a(si)
|
|
||||||
return si_s[:6]
|
|
||||||
|
|
||||||
class DirectoryAsHTML(rend.Page):
|
class DirectoryAsHTML(rend.Page):
|
||||||
# The remainder of this class is to render the directory into
|
# The remainder of this class is to render the directory into
|
||||||
|