move validation data to thingA, URI has storage_index plus thingA hash

This (compatibility-breaking) change moves much of the validation data and
encoding parameters out of the URI and into the so-called "thingA" block
(which will get a better name as soon as we find one we're comfortable with).
The URI retains the "storage_index" (a generalized term for the role the
verifierid currently plays: the unique per-file index that storage servers
use to decide which shares to return), the
decryption key, the needed_shares/total_shares counts (since they affect
peer selection), and the hash of the thingA block.
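For orientation: after this change the packed URI carries exactly six
colon-separated fields. A minimal sketch of the new shape, mirroring the
pack_uri rewrite in uri.py at the bottom of this diff (pack_uri_sketch is a
hypothetical name; idlib.b2a is the project's binary-to-ASCII helper):

    from allmydata.util import idlib

    def pack_uri_sketch(storage_index, key, thingA_hash,
                        needed_shares, total_shares, size):
        # codec name/params, verifierid, fileid, segment_size, and the
        # share root hash all moved out of the URI and into thingA
        return "URI:%s:%s:%s:%d:%d:%d" % (idlib.b2a(storage_index),
                                          idlib.b2a(key),
                                          idlib.b2a(thingA_hash),
                                          needed_shares, total_shares, size)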

This shortens the URI and lets us add more kinds of validation data without
growing the URI (like plaintext merkle trees, to enable strong incremental
plaintext validation), at the cost of maybe 150 bytes of alacrity: the
downloader must now fetch and verify the thingA block before it can deliver
any plaintext. Each storage server holds an identical copy of the thingA
block.
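To make the block's contents concrete, here is roughly what the bencoded
thingA dict holds after this change, assembled from the encode.py and
upload.py hunks below; the literal values are illustrative placeholders,
not real output:

    from allmydata.util import bencode
    from allmydata import hashtree

    thingA_data = {
        'codec_name': 'crs',          # filled in by the Encoder
        'codec_params': '...',        # serialized codec params (illustrative)
        'tail_codec_params': '...',   # params for the final, possibly short, segment
        'size': 1000000,              # file size in bytes
        'segment_size': 1048576,
        'needed_shares': 3,           # k
        'total_shares': 10,           # N
        'verifierid': 'V' * 20,       # supplied by FileUploader
        'fileid': 'F' * 20,           # supplied by FileUploader
        'share_root_hash': 'R' * 32,  # root of the share hash tree
    }
    thingA = bencode.bencode(thingA_data)
    uri_hash = hashtree.thingA_hash(thingA) # this 32-byte digest goes in the URI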

This is an incompatible change: new messages have been added to the storage
server interface, and the URI format has changed drastically.
Brian Warner, 2007-06-01 18:48:01 -07:00
commit 3dfd26970b (parent d1d7cdd859)
10 changed files with 233 additions and 121 deletions

=== src/allmydata/download.py ===

@@ -5,7 +5,7 @@ from twisted.python import log
 from twisted.internet import defer
 from twisted.application import service
-from allmydata.util import idlib, mathutil
+from allmydata.util import idlib, mathutil, bencode
 from allmydata.util.assertutil import _assert
 from allmydata import codec, hashtree
 from allmydata.Crypto.Cipher import AES
@@ -19,6 +19,8 @@ class HaveAllPeersError(Exception):
     # we use this to jump out of the loop
     pass

+class BadThingAHashValue(Exception):
+    pass
+
 class Output:
     def __init__(self, downloadable, key):
@@ -223,55 +225,44 @@ class FileDownloader:
     def __init__(self, client, uri, downloadable):
         self._client = client
+        self._downloadable = downloadable

         d = unpack_uri(uri)
-        verifierid = d['verifierid']
-        size = d['size']
-        segment_size = d['segment_size']
-        assert isinstance(verifierid, str)
-        assert len(verifierid) == 20
-        self._verifierid = verifierid
-        self._fileid = d['fileid']
-        self._roothash = d['roothash']
-
-        self._codec = codec.get_decoder_by_name(d['codec_name'])
-        self._codec.set_serialized_params(d['codec_params'])
-        self._tail_codec = codec.get_decoder_by_name(d['codec_name'])
-        self._tail_codec.set_serialized_params(d['tail_codec_params'])
-
-        self._total_segments = mathutil.div_ceil(size, segment_size)
-        self._current_segnum = 0
-        self._segment_size = segment_size
-        self._size = size
-        self._num_needed_shares = self._codec.get_needed_shares()
+        self._storage_index = d['storage_index']
+        self._thingA_hash = d['thingA_hash']
+        self._total_shares = d['total_shares']
+        self._size = d['size']
+        self._num_needed_shares = d['needed_shares']

         self._output = Output(downloadable, d['key'])

         self.active_buckets = {} # k: shnum, v: bucket
-        self._share_buckets = {} # k: shnum, v: set of buckets
+        self._share_buckets = [] # list of (sharenum, bucket) tuples
+        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
+        self._thingA_sources = []
+        self._thingA_data = None

     def start(self):
-        log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))
+        log.msg("starting download [%s]" % idlib.b2a(self._storage_index))

         # first step: who should we download from?
         d = defer.maybeDeferred(self._get_all_shareholders)
         d.addCallback(self._got_all_shareholders)
-        # once we know that, we can download blocks from them
+        # now get the thingA block from somebody and validate it
+        d.addCallback(self._obtain_thingA)
+        d.addCallback(self._got_thingA)
+        d.addCallback(self._create_validated_buckets)
+        # once we know that, we can download blocks from everybody
         d.addCallback(self._download_all_segments)
         d.addCallback(self._done)
         return d

     def _get_all_shareholders(self):
         dl = []
-        for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._verifierid):
+        for (permutedpeerid, peerid, connection) in self._client.get_permuted_peers(self._storage_index):
             d = connection.callRemote("get_service", "storageserver")
             d.addCallback(lambda ss: ss.callRemote("get_buckets",
-                                                   self._verifierid))
+                                                   self._storage_index))
             d.addCallbacks(self._got_response, self._got_error,
                            callbackArgs=(connection,))
             dl.append(d)
@@ -281,13 +272,11 @@ class FileDownloader:
         _assert(isinstance(buckets, dict), buckets) # soon foolscap will check this for us with its DictOf schema constraint
         for sharenum, bucket in buckets.iteritems():
             self.add_share_bucket(sharenum, bucket)
+            self._thingA_sources.append(bucket)

     def add_share_bucket(self, sharenum, bucket):
-        vbucket = ValidatedBucket(sharenum, bucket,
-                                  self._share_hashtree,
-                                  self._roothash,
-                                  self._total_segments)
-        self._share_buckets.setdefault(sharenum, set()).add(vbucket)
+        # this is split out for the benefit of test_encode.py
+        self._share_buckets.append( (sharenum, bucket) )

     def _got_error(self, f):
         self._client.log("Somebody failed. -- %s" % (f,))
@@ -295,23 +284,78 @@ class FileDownloader:
     def bucket_failed(self, vbucket):
         shnum = vbucket.sharenum
         del self.active_buckets[shnum]
-        s = self._share_buckets[shnum]
+        s = self._share_vbuckets[shnum]
         # s is a set of ValidatedBucket instances
         s.remove(vbucket)
         # ... which might now be empty
         if not s:
             # there are no more buckets which can provide this share, so
             # remove the key. This may prompt us to use a different share.
-            del self._share_buckets[shnum]
+            del self._share_vbuckets[shnum]

     def _got_all_shareholders(self, res):
         if len(self._share_buckets) < self._num_needed_shares:
             raise NotEnoughPeersError
-        for s in self._share_buckets.values():
-            for vb in s:
-                assert isinstance(vb, ValidatedBucket), \
-                       "vb is %s but should be a ValidatedBucket" % (vb,)
+        #for s in self._share_vbuckets.values():
+        #    for vb in s:
+        #        assert isinstance(vb, ValidatedBucket), \
+        #               "vb is %s but should be a ValidatedBucket" % (vb,)
+
+    def _obtain_thingA(self, ignored=None):
+        # all shareholders are supposed to have a copy of thingA, and all are
+        # supposed to be identical. We compute the hash of the data that
+        # comes back, and compare it against the version in our URI. If they
+        # don't match, ignore their data and try someone else.
+        if not self._thingA_sources:
+            raise NotEnoughPeersError("ran out of peers while fetching thingA")
+        bucket = self._thingA_sources.pop()
+        d = bucket.callRemote("get_thingA")
+        def _got(thingA):
+            h = hashtree.thingA_hash(thingA)
+            if h != self._thingA_hash:
+                msg = ("The copy of thingA we received from %s was bad" %
+                       bucket)
+                raise BadThingAHashValue(msg)
+            return bencode.bdecode(thingA)
+        d.addCallback(_got)
+        def _bad(f):
+            log.msg("thingA from vbucket %s failed: %s" % (bucket, f)) # WEIRD
+            # try again with a different one
+            return self._obtain_thingA()
+        d.addErrback(_bad)
+        return d
+
+    def _got_thingA(self, thingA_data):
+        d = self._thingA_data = thingA_data
+        self._codec = codec.get_decoder_by_name(d['codec_name'])
+        self._codec.set_serialized_params(d['codec_params'])
+        self._tail_codec = codec.get_decoder_by_name(d['codec_name'])
+        self._tail_codec.set_serialized_params(d['tail_codec_params'])
+
+        verifierid = d['verifierid']
+        assert isinstance(verifierid, str)
+        assert len(verifierid) == 20
+        self._verifierid = verifierid
+        self._fileid = d['fileid']
+        self._roothash = d['share_root_hash']
+
+        self._segment_size = segment_size = d['segment_size']
+        self._total_segments = mathutil.div_ceil(self._size, segment_size)
+        self._current_segnum = 0
+
+        self._share_hashtree = hashtree.IncompleteHashTree(d['total_shares'])
+        self._share_hashtree.set_hashes({0: self._roothash})
+
+    def _create_validated_buckets(self, ignored=None):
+        self._share_vbuckets = {}
+        for sharenum, bucket in self._share_buckets:
+            vbucket = ValidatedBucket(sharenum, bucket,
+                                      self._share_hashtree,
+                                      self._roothash,
+                                      self._total_segments)
+            s = self._share_vbuckets.setdefault(sharenum, set())
+            s.add(vbucket)

     def _activate_enough_buckets(self):
         """either return a mapping from shnum to a ValidatedBucket that can
@@ -320,23 +364,23 @@ class FileDownloader:
         while len(self.active_buckets) < self._num_needed_shares:
             # need some more
             handled_shnums = set(self.active_buckets.keys())
-            available_shnums = set(self._share_buckets.keys())
+            available_shnums = set(self._share_vbuckets.keys())
             potential_shnums = list(available_shnums - handled_shnums)
             if not potential_shnums:
                 raise NotEnoughPeersError
             # choose a random share
             shnum = random.choice(potential_shnums)
             # and a random bucket that will provide it
-            validated_bucket = random.choice(list(self._share_buckets[shnum]))
+            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
             self.active_buckets[shnum] = validated_bucket
         return self.active_buckets

     def _download_all_segments(self, res):
-        # the promise: upon entry to this function, self._share_buckets
+        # the promise: upon entry to this function, self._share_vbuckets
         # contains enough buckets to complete the download, and some extra
         # ones to tolerate some buckets dropping out or having errors.
-        # self._share_buckets is a dictionary that maps from shnum to a set
+        # self._share_vbuckets is a dictionary that maps from shnum to a set
         # of ValidatedBuckets, which themselves are wrappers around
         # RIBucketReader references.
         self.active_buckets = {} # k: shnum, v: ValidatedBucket instance

=== src/allmydata/encode.py ===

@@ -3,9 +3,9 @@
 from zope.interface import implements
 from twisted.internet import defer
 from twisted.python import log
-from allmydata.hashtree import HashTree, block_hash
+from allmydata.hashtree import HashTree, block_hash, thingA_hash
 from allmydata.Crypto.Cipher import AES
-from allmydata.util import mathutil
+from allmydata.util import mathutil, bencode
 from allmydata.util.assertutil import _assert
 from allmydata.codec import CRSEncoder
 from allmydata.interfaces import IEncoder
@@ -78,6 +78,7 @@ class Encoder(object):
                 (self.NEEDED_SHARES, self.TOTAL_SHARES))
         self.NEEDED_SHARES = k
         self.TOTAL_SHARES = n
+        self.thingA_data = {}

     def setup(self, infile, encryption_key):
         self.infile = infile
@@ -103,6 +104,15 @@ class Encoder(object):
         self._codec.set_params(self.segment_size,
                                self.required_shares, self.num_shares)

+        data = self.thingA_data
+        data['codec_name'] = self._codec.get_encoder_type()
+        data['codec_params'] = self._codec.get_serialized_params()
+
+        data['size'] = self.file_size
+        data['segment_size'] = self.segment_size
+        data['needed_shares'] = self.required_shares
+        data['total_shares'] = self.num_shares
+
         # the "tail" is the last segment. This segment may or may not be
         # shorter than all other segments. We use the "tail codec" to handle
         # it. If the tail is short, we use a different codec instance. In
@@ -118,6 +128,10 @@ class Encoder(object):
         self._tail_codec = CRSEncoder()
         self._tail_codec.set_params(padded_tail_size,
                                     self.required_shares, self.num_shares)
+        data['tail_codec_params'] = self._tail_codec.get_serialized_params()
+
+    def set_thingA_data(self, thingA_data):
+        self.thingA_data.update(thingA_data)

     def get_share_size(self):
         share_size = mathutil.div_ceil(self.file_size, self.required_shares)
@@ -156,6 +170,7 @@ class Encoder(object):
         d.addCallback(lambda res: self.send_all_subshare_hash_trees())
         d.addCallback(lambda res: self.send_all_share_hash_trees())
+        d.addCallback(lambda res: self.send_thingA_to_all_shareholders())
         d.addCallback(lambda res: self.close_all_shareholders())
         d.addCallbacks(lambda res: self.done(), self.err)
         return d
@@ -277,7 +292,7 @@ class Encoder(object):
         # create the share hash tree
         t = HashTree(self.share_root_hashes)
         # the root of this hash tree goes into our URI
-        self.root_hash = t[0]
+        self.thingA_data['share_root_hash'] = t[0]
         # now send just the necessary pieces out to each shareholder
         for i in range(self.num_shares):
             # the HashTree is given a list of leaves: 0,1,2,3..n .
@@ -293,6 +308,18 @@ class Encoder(object):
         sh = self.landlords[shareid]
         return sh.callRemote("put_share_hashes", needed_hashes)

+    def send_thingA_to_all_shareholders(self):
+        log.msg("%s: sending thingA" % self)
+        thingA = bencode.bencode(self.thingA_data)
+        self.thingA_hash = thingA_hash(thingA)
+        dl = []
+        for sh in self.landlords.values():
+            dl.append(self.send_thingA(sh, thingA))
+        return defer.DeferredList(dl)
+
+    def send_thingA(self, sh, thingA):
+        return sh.callRemote("put_thingA", thingA)
+
     def close_all_shareholders(self):
         log.msg("%s: closing shareholders" % self)
         dl = []
@@ -302,7 +329,7 @@ class Encoder(object):
     def done(self):
         log.msg("%s: upload done" % self)
-        return self.root_hash
+        return self.thingA_hash

     def err(self, f):
         log.msg("%s: upload failed: %s" % (self, f))

=== src/allmydata/hashtree.py ===

@@ -439,3 +439,6 @@ class IncompleteHashTree(CompleteBinaryTreeMixin, list):

 def block_hash(data):
     return tagged_hash("encoded subshare", data)
+
+def thingA_hash(data):
+    return tagged_hash("thingA", data)

=== src/allmydata/interfaces.py ===

@@ -12,9 +12,11 @@ Nodeid = StringConstraint(maxLength=20,
                           minLength=20) # binary format 20-byte SHA1 hash
 FURL = StringConstraint(1000)
 Verifierid = StringConstraint(20)
+StorageIndex = StringConstraint(32)
 URI = StringConstraint(300) # kind of arbitrary
 MAX_BUCKETS = 200 # per peer
-ShareData = StringConstraint(100000)
+ShareData = StringConstraint(100000) # 2MB segment / k=25
+ThingAData = StringConstraint(1000)

 class RIIntroducerClient(RemoteInterface):
     def new_peers(furls=SetOf(FURL)):
@@ -56,6 +58,16 @@ class RIBucketWriter(RemoteInterface):
     def put_share_hashes(sharehashes=ListOf(TupleOf(int, Hash), maxLength=2**20)):
         return None

+    def put_thingA(data=ThingAData):
+        """This as-yet-unnamed block of data contains integrity-checking
+        information (hashes of plaintext, crypttext, and shares), as well as
+        encoding parameters that are necessary to recover the data. This is a
+        bencoded dict mapping strings to other strings. The hash of this data
+        is kept in the URI and verified before any of the data is used. All
+        buckets for a given file contain identical copies of this data.
+        """
+        return None
+
     def close():
         """
         If the data that has been written is incomplete or inconsistent then
@@ -74,9 +86,12 @@ class RIBucketReader(RemoteInterface):
         return ListOf(Hash, maxLength=2**20)
     def get_share_hashes():
         return ListOf(TupleOf(int, Hash), maxLength=2**20)
+    def get_thingA():
+        return ThingAData

 class RIStorageServer(RemoteInterface):
-    def allocate_buckets(verifierid=Verifierid,
+    def allocate_buckets(storage_index=StorageIndex,
                          sharenums=SetOf(int, maxLength=MAX_BUCKETS),
                          sharesize=int, blocksize=int, canary=Referenceable):
         """
@@ -86,7 +101,7 @@ class RIStorageServer(RemoteInterface):
         """
         return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
                        DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
-    def get_buckets(verifierid=Verifierid):
+    def get_buckets(storage_index=StorageIndex):
         return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)

 # hm, we need a solution for forward references in schemas
@@ -377,7 +392,7 @@ class IEncoder(Interface):
         input file, encrypting it, encoding the pieces, uploading the shares
         to the shareholders, then sending the hash trees.

-        I return a Deferred that fires with the root hash.
+        I return a Deferred that fires with the hash of the thingA data block.
         """

 class IDecoder(Interface):

=== src/allmydata/storageserver.py ===

@@ -11,13 +11,13 @@ from allmydata.util import bencode, fileutil, idlib
 from allmydata.util.assertutil import precondition

 # store/
-# store/incoming # temp dirs named $VERIFIERID/$SHARENUM which will be moved to store/$VERIFIERID/$SHARENUM on success
-# store/$VERIFIERID
-# store/$VERIFIERID/$SHARENUM
-# store/$VERIFIERID/$SHARENUM/blocksize
-# store/$VERIFIERID/$SHARENUM/data
-# store/$VERIFIERID/$SHARENUM/blockhashes
-# store/$VERIFIERID/$SHARENUM/sharehashtree
+# store/incoming # temp dirs named $STORAGEINDEX/$SHARENUM which will be moved to store/$STORAGEINDEX/$SHARENUM on success
+# store/$STORAGEINDEX
+# store/$STORAGEINDEX/$SHARENUM
+# store/$STORAGEINDEX/$SHARENUM/blocksize
+# store/$STORAGEINDEX/$SHARENUM/data
+# store/$STORAGEINDEX/$SHARENUM/blockhashes
+# store/$STORAGEINDEX/$SHARENUM/sharehashtree

 # $SHARENUM matches this regex:
 NUM_RE=re.compile("[0-9]*")
@@ -59,6 +59,10 @@ class BucketWriter(Referenceable):
         precondition(not self.closed)
         self._write_file('sharehashes', bencode.bencode(sharehashes))

+    def remote_put_thingA(self, data):
+        precondition(not self.closed)
+        self._write_file('thingA', data)
+
     def remote_close(self):
         precondition(not self.closed)
         # TODO assert or check the completeness and consistency of the data that has been written
@@ -100,6 +104,9 @@ class BucketReader(Referenceable):
         # schema
         return [tuple(i) for i in hashes]

+    def remote_get_thingA(self):
+        return self._read_file('thingA')
+
 class StorageServer(service.MultiService, Referenceable):
     implements(RIStorageServer)
     name = 'storageserver'
@@ -116,13 +123,13 @@ class StorageServer(service.MultiService, Referenceable):
     def _clean_incomplete(self):
         fileutil.rm_dir(self.incomingdir)

-    def remote_allocate_buckets(self, verifierid, sharenums, sharesize,
+    def remote_allocate_buckets(self, storage_index, sharenums, sharesize,
                                 blocksize, canary):
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
         for shnum in sharenums:
-            incominghome = os.path.join(self.incomingdir, idlib.b2a(verifierid), "%d"%shnum)
-            finalhome = os.path.join(self.storedir, idlib.b2a(verifierid), "%d"%shnum)
+            incominghome = os.path.join(self.incomingdir, idlib.b2a(storage_index), "%d"%shnum)
+            finalhome = os.path.join(self.storedir, idlib.b2a(storage_index), "%d"%shnum)
             if os.path.exists(incominghome) or os.path.exists(finalhome):
                 alreadygot.add(shnum)
             else:
@@ -130,13 +137,13 @@ class StorageServer(service.MultiService, Referenceable):
         return alreadygot, bucketwriters

-    def remote_get_buckets(self, verifierid):
+    def remote_get_buckets(self, storage_index):
         bucketreaders = {} # k: sharenum, v: BucketReader
-        verifierdir = os.path.join(self.storedir, idlib.b2a(verifierid))
+        storagedir = os.path.join(self.storedir, idlib.b2a(storage_index))
         try:
-            for f in os.listdir(verifierdir):
+            for f in os.listdir(storagedir):
                 if NUM_RE.match(f):
-                    bucketreaders[int(f)] = BucketReader(os.path.join(verifierdir, f))
+                    bucketreaders[int(f)] = BucketReader(os.path.join(storagedir, f))
         except OSError:
             # Commonly caused by there being no buckets at all.
             pass

=== src/allmydata/test/test_encode.py ===

@@ -5,6 +5,7 @@ from twisted.internet import defer
 from twisted.python.failure import Failure
 from foolscap import eventual
 from allmydata import encode, download
+from allmydata.util import bencode
 from allmydata.uri import pack_uri
 from cStringIO import StringIO
@@ -70,6 +71,10 @@ class FakeBucketWriter:
         assert self.share_hashes is None
         self.share_hashes = sharehashes

+    def put_thingA(self, thingA):
+        assert not self.closed
+        self.thingA = thingA
+
     def close(self):
         assert not self.closed
         self.closed = True
@@ -78,7 +83,7 @@ class FakeBucketWriter:
         return good[:-1] + chr(ord(good[-1]) ^ 0x01)

     def get_block(self, blocknum):
-        assert isinstance(blocknum, int)
+        assert isinstance(blocknum, (int, long))
         if self.mode == "bad block":
             return self.flip_bit(self.blocks[blocknum])
         return self.blocks[blocknum]
@@ -238,28 +243,41 @@ class Roundtrip(unittest.TestCase):
             shareholders[shnum] = peer
             all_shareholders.append(peer)
         e.set_shareholders(shareholders)
+        e.set_thingA_data({'verifierid': "V" * 20,
+                           'fileid': "F" * 20,
+                           })
         d = e.start()
-        def _uploaded(roothash):
-            URI = pack_uri(codec_name=e._codec.get_encoder_type(),
-                           codec_params=e._codec.get_serialized_params(),
-                           tail_codec_params=e._tail_codec.get_serialized_params(),
-                           verifierid="V" * 20,
-                           fileid="F" * 20,
+        def _uploaded(thingA_hash):
+            URI = pack_uri(storage_index="S" * 20,
                            key=nonkey,
-                           roothash=roothash,
+                           thingA_hash=thingA_hash,
                            needed_shares=e.required_shares,
                            total_shares=e.num_shares,
-                           size=e.file_size,
-                           segment_size=e.segment_size)
+                           size=e.file_size)
             client = None
             target = download.Data()
             fd = download.FileDownloader(client, URI, target)
             fd.check_verifierid = False
             fd.check_fileid = False
+            # grab a copy of thingA from one of the shareholders
+            thingA = shareholders[0].thingA
+            thingA_data = bencode.bdecode(thingA)
+            NOTthingA = {'codec_name': e._codec.get_encoder_type(),
+                         'codec_params': e._codec.get_serialized_params(),
+                         'tail_codec_params': e._tail_codec.get_serialized_params(),
+                         'verifierid': "V" * 20,
+                         'fileid': "F" * 20,
+                         #'share_root_hash': roothash,
+                         'segment_size': e.segment_size,
+                         'needed_shares': e.required_shares,
+                         'total_shares': e.num_shares,
+                         }
+            fd._got_thingA(thingA_data)
             for shnum in range(AVAILABLE_SHARES):
                 bucket = all_shareholders[shnum]
                 fd.add_share_bucket(shnum, bucket)
             fd._got_all_shareholders(None)
+            fd._create_validated_buckets(None)
             d2 = fd._download_all_segments(None)
             d2.addCallback(fd._done)
             return d2

=== src/allmydata/test/test_system.py ===

@@ -213,17 +213,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         return good[:-1] + chr(ord(good[-1]) ^ 0x01)

     def mangle_uri(self, gooduri):
-        # change the verifierid, which means we'll be asking about the wrong
-        # file, so nobody will have any shares
+        # change the storage index, which means we'll be asking about the
+        # wrong file, so nobody will have any shares
         d = uri.unpack_uri(gooduri)
-        assert len(d['verifierid']) == 20
-        d['verifierid'] = self.flip_bit(d['verifierid'])
+        assert len(d['storage_index']) == 20
+        d['storage_index'] = self.flip_bit(d['storage_index'])
         return uri.pack_uri(**d)

-    # TODO: add a test which mangles the fileid instead, and should fail in
-    # the post-download phase when the file's integrity check fails. Do the
-    # same thing for the key, which should cause the download to fail the
-    # post-download verifierid check.
+    # TODO: add a test which mangles the thingA_hash instead, and should fail
+    # due to not being able to get a valid thingA block. Also a test which
+    # sneakily mangles the thingA block to change some of the validation
+    # data, so it will fail in the post-download phase when the file's
+    # crypttext integrity check fails. Do the same thing for the key, which
+    # should cause the download to fail the post-download plaintext
+    # verifierid check.

     def test_vdrive(self):
         self.basedir = "test_system/SystemTest/test_vdrive"

=== src/allmydata/test/test_upload.py ===

@@ -25,13 +25,10 @@ class GoodServer(unittest.TestCase):
         self.failUnless(isinstance(uri, str))
         self.failUnless(uri.startswith("URI:"))
         d = unpack_uri(uri)
-        self.failUnless(isinstance(d['verifierid'], str))
-        self.failUnlessEqual(len(d['verifierid']), 20)
-        self.failUnless(isinstance(d['fileid'], str))
-        self.failUnlessEqual(len(d['fileid']), 20)
+        self.failUnless(isinstance(d['storage_index'], str))
+        self.failUnlessEqual(len(d['storage_index']), 20)
         self.failUnless(isinstance(d['key'], str))
         self.failUnlessEqual(len(d['key']), 16)
-        self.failUnless(isinstance(d['codec_params'], str))

     def testData(self):
         data = "This is some data to upload"

=== src/allmydata/upload.py ===

@@ -237,23 +237,21 @@ class FileUploader:
             buckets.update(peer.buckets)
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
         self._encoder.set_shareholders(buckets)
+        thingA_data = {}
+        thingA_data['verifierid'] = self._verifierid
+        thingA_data['fileid'] = self._fileid
+        self._encoder.set_thingA_data(thingA_data)
         return self._encoder.start()

-    def _compute_uri(self, roothash):
-        codec_type = self._encoder._codec.get_encoder_type()
-        codec_params = self._encoder._codec.get_serialized_params()
-        tail_codec_params = self._encoder._tail_codec.get_serialized_params()
-        return pack_uri(codec_name=codec_type,
-                        codec_params=codec_params,
-                        tail_codec_params=tail_codec_params,
-                        verifierid=self._verifierid,
-                        fileid=self._fileid,
+    def _compute_uri(self, thingA_hash):
+        return pack_uri(storage_index=self._verifierid,
                         key=self._encryption_key,
-                        roothash=roothash,
+                        thingA_hash=thingA_hash,
                         needed_shares=self.needed_shares,
                         total_shares=self.total_shares,
                         size=self._size,
-                        segment_size=self._encoder.segment_size)
+                        )

 def netstring(s):

=== src/allmydata/uri.py ===

@@ -5,39 +5,39 @@ from allmydata.util import idlib
 # enough information to retrieve and validate the contents. It shall be
 # expressed in a limited character set (namely [TODO]).

-def pack_uri(codec_name, codec_params, tail_codec_params,
-             verifierid, fileid, key,
-             roothash, needed_shares, total_shares, size, segment_size):
+def pack_uri(storage_index, key, thingA_hash,
+             needed_shares, total_shares, size):
     # applications should pass keyword parameters into this
-    assert isinstance(codec_name, str)
-    assert len(codec_name) < 10
-    assert ":" not in codec_name
-    assert isinstance(codec_params, str)
-    assert ":" not in codec_params
-    assert isinstance(tail_codec_params, str)
-    assert ":" not in tail_codec_params
-    assert isinstance(verifierid, str)
-    assert len(verifierid) == 20 # sha1 hash
-    assert isinstance(fileid, str)
-    assert len(fileid) == 20 # sha1 hash
+    assert isinstance(storage_index, str)
+    assert len(storage_index) == 20 # sha1 hash. TODO: sha256
+    assert isinstance(thingA_hash, str)
+    assert len(thingA_hash) == 32 # sha256 hash
     assert isinstance(key, str)
     assert len(key) == 16 # AES-128
-    return "URI:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s:%s" % (codec_name, codec_params, tail_codec_params, idlib.b2a(verifierid), idlib.b2a(fileid), idlib.b2a(key), idlib.b2a(roothash), needed_shares, total_shares, size, segment_size)
+    assert isinstance(needed_shares, int)
+    assert isinstance(total_shares, int)
+    assert isinstance(size, (int,long))
+    return "URI:%s:%s:%s:%d:%d:%d" % (idlib.b2a(storage_index), idlib.b2a(key),
+                                      idlib.b2a(thingA_hash),
+                                      needed_shares, total_shares, size)

 def unpack_uri(uri):
     assert uri.startswith("URI:")
     d = {}
-    header, d['codec_name'], d['codec_params'], d['tail_codec_params'], verifierid_s, fileid_s, key_s, roothash_s, needed_shares_s, total_shares_s, size_s, segment_size_s = uri.split(":")
+    (header,
+     storage_index_s, key_s, thingA_hash_s,
+     needed_shares_s, total_shares_s, size_s) = uri.split(":")
     assert header == "URI"
-    d['verifierid'] = idlib.a2b(verifierid_s)
-    d['fileid'] = idlib.a2b(fileid_s)
+    d['storage_index'] = idlib.a2b(storage_index_s)
     d['key'] = idlib.a2b(key_s)
-    d['roothash'] = idlib.a2b(roothash_s)
+    d['thingA_hash'] = idlib.a2b(thingA_hash_s)
     d['needed_shares'] = int(needed_shares_s)
     d['total_shares'] = int(total_shares_s)
     d['size'] = int(size_s)
-    d['segment_size'] = int(segment_size_s)
     return d
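
For illustration, a round trip through the new pack/unpack pair; the field
values below are made up to satisfy the asserts above:

    from allmydata.uri import pack_uri, unpack_uri

    uri = pack_uri(storage_index="S" * 20, # 20-byte index
                   key="K" * 16,           # AES-128 key
                   thingA_hash="H" * 32,   # sha256 of the bencoded thingA block
                   needed_shares=3, total_shares=10, size=12345)
    d = unpack_uri(uri)
    assert d['storage_index'] == "S" * 20
    assert d['key'] == "K" * 16
    assert d['thingA_hash'] == "H" * 32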