download: retrieve share hashes when downloading. We don't really do much validation with them yet, though.

This commit is contained in:
Brian Warner 2007-04-06 22:51:19 -07:00
parent 9a64a9f76e
commit d8b71b85f8
2 changed files with 67 additions and 10 deletions

View File

@ -7,7 +7,7 @@ from twisted.application import service
from allmydata.util import idlib, mathutil from allmydata.util import idlib, mathutil
from allmydata.util.assertutil import _assert from allmydata.util.assertutil import _assert
from allmydata import codec from allmydata import codec, chunk
from allmydata.Crypto.Cipher import AES from allmydata.Crypto.Cipher import AES
from allmydata.uri import unpack_uri from allmydata.uri import unpack_uri
from allmydata.interfaces import IDownloadTarget, IDownloader from allmydata.interfaces import IDownloadTarget, IDownloader
@ -105,6 +105,55 @@ class SegmentDownloader:
# Now we have enough buckets, in self.parent.active_buckets. # Now we have enough buckets, in self.parent.active_buckets.
# before we get any blocks of a given share, we need to be able to
# validate that block and that share. Check to see if we have enough
# hashes. If we don't, grab them before continuing.
d = self._grab_needed_hashes()
d.addCallback(self._download_some_blocks)
return d
def _grab_needed_hashes(self):
# each bucket is holding the hashes necessary to validate their
# share. So it suffices to ask everybody for all the hashes they know
# about. Eventually we'll have all that we need, so we can stop
# asking.
# for each active share, see what hashes we need
ht = self.parent.get_share_hashtree()
needed_hashes = set()
for shnum in self.parent.active_buckets:
needed_hashes.update(ht.needed_hashes(shnum))
if not needed_hashes:
return defer.succeed(None)
# for now, just ask everybody for everything
# TODO: send fewer queries
dl = []
for shnum, bucket in self.parent.active_buckets.iteritems():
d = bucket.callRemote("get_share_hashes")
d.addCallback(self._got_share_hashes, shnum, bucket)
dl.append(d)
d.addCallback(self._validate_root)
return defer.DeferredList(dl)
def _got_share_hashes(self, share_hashes, shnum, bucket):
ht = self.parent.get_share_hashtree()
for hashnum, sharehash in share_hashes:
# TODO: we're accumulating these hashes blindly, since we only
# validate the leaves. This makes it possible for someone to
# frame another server by giving us bad internal hashes. We pass
# 'shnum' and 'bucket' in so that if we detected problems with
# intermediate nodes, we could associate the error with the
# bucket and stop using them.
ht.set_hash(hashnum, sharehash)
def _validate_root(self, res):
# TODO: I dunno, check that the hash tree looks good so far and that
# it adds up to the root. The idea is to reject any bad buckets
# early.
pass
def _download_some_blocks(self, res):
# in test cases, bd.start might mutate active_buckets right away, so # in test cases, bd.start might mutate active_buckets right away, so
# we need to put off calling start() until we've iterated all the way # we need to put off calling start() until we've iterated all the way
# through it # through it
@ -153,13 +202,18 @@ class FileDownloader:
self._output = Output(downloadable, key) self._output = Output(downloadable, key)
# future: # future:
# self._share_hash_tree = ??
# self._subshare_hash_trees = {} # k:shnum, v: hashtree
# each time we start using a new shnum, we must acquire a share hash # each time we start using a new shnum, we must acquire a share hash
# from one of the buckets that provides that shnum, then validate it against # from one of the buckets that provides that shnum, then validate it
# the rest of the share hash tree that they provide. Then, each time we # against the rest of the share hash tree that they provide. Then,
# get a block in that share, we must validate the block against the rest # each time we get a block in that share, we must validate the block
# of the subshare hash tree that that bucket will provide. # against the rest of the subshare hash tree that that bucket will
# provide.
self._share_hashtree = chunk.IncompleteHashTree(total_shares)
#self._block_hashtrees = {} # k: shnum, v: hashtree
def get_share_hashtree(self):
return self._share_hashtree
def start(self): def start(self):
log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),)) log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),))

View File

@ -51,7 +51,7 @@ class BucketWriter(Referenceable):
def remote_put_share_hashes(self, sharehashes): def remote_put_share_hashes(self, sharehashes):
precondition(not self.closed) precondition(not self.closed)
self._write_file('sharehashree', bencode.bencode(sharehashes)) self._write_file('sharehashes', bencode.bencode(sharehashes))
def remote_close(self): def remote_close(self):
precondition(not self.closed) precondition(not self.closed)
@ -89,7 +89,10 @@ class BucketReader(Referenceable):
return str2l(self._read_file('blockhashes')) return str2l(self._read_file('blockhashes'))
def remote_get_share_hashes(self): def remote_get_share_hashes(self):
return bencode.bdecode(self._read_file('sharehashes')) hashes = bencode.bdecode(self._read_file('sharehashes'))
# tuples come through bdecode(bencode()) as lists, which violates the
# schema
return [tuple(i) for i in hashes]
class StorageServer(service.MultiService, Referenceable): class StorageServer(service.MultiService, Referenceable):
implements(RIStorageServer) implements(RIStorageServer)