From d8b71b85f8f83bdb7bf57eaa760e81208d1b4687 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 6 Apr 2007 22:51:19 -0700 Subject: [PATCH] download: retrieve share hashes when downloading. We don't really do much validation with them yet, though. --- src/allmydata/download.py | 68 ++++++++++++++++++++++++++++++---- src/allmydata/storageserver.py | 9 +++-- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/src/allmydata/download.py b/src/allmydata/download.py index 12cc03de2..458e7f2a6 100644 --- a/src/allmydata/download.py +++ b/src/allmydata/download.py @@ -7,7 +7,7 @@ from twisted.application import service from allmydata.util import idlib, mathutil from allmydata.util.assertutil import _assert -from allmydata import codec +from allmydata import codec, chunk from allmydata.Crypto.Cipher import AES from allmydata.uri import unpack_uri from allmydata.interfaces import IDownloadTarget, IDownloader @@ -105,6 +105,55 @@ class SegmentDownloader: # Now we have enough buckets, in self.parent.active_buckets. + # before we get any blocks of a given share, we need to be able to + # validate that block and that share. Check to see if we have enough + # hashes. If we don't, grab them before continuing. + d = self._grab_needed_hashes() + d.addCallback(self._download_some_blocks) + return d + + def _grab_needed_hashes(self): + # each bucket is holding the hashes necessary to validate their + # share. So it suffices to ask everybody for all the hashes they know + # about. Eventually we'll have all that we need, so we can stop + # asking. + + # for each active share, see what hashes we need + ht = self.parent.get_share_hashtree() + needed_hashes = set() + for shnum in self.parent.active_buckets: + needed_hashes.update(ht.needed_hashes(shnum)) + if not needed_hashes: + return defer.succeed(None) + + # for now, just ask everybody for everything + # TODO: send fewer queries + dl = [] + for shnum, bucket in self.parent.active_buckets.iteritems(): + d = bucket.callRemote("get_share_hashes") + d.addCallback(self._got_share_hashes, shnum, bucket) + dl.append(d) + d.addCallback(self._validate_root) + return defer.DeferredList(dl) + + def _got_share_hashes(self, share_hashes, shnum, bucket): + ht = self.parent.get_share_hashtree() + for hashnum, sharehash in share_hashes: + # TODO: we're accumulating these hashes blindly, since we only + # validate the leaves. This makes it possible for someone to + # frame another server by giving us bad internal hashes. We pass + # 'shnum' and 'bucket' in so that if we detected problems with + # intermediate nodes, we could associate the error with the + # bucket and stop using them. + ht.set_hash(hashnum, sharehash) + + def _validate_root(self, res): + # TODO: I dunno, check that the hash tree looks good so far and that + # it adds up to the root. The idea is to reject any bad buckets + # early. + pass + + def _download_some_blocks(self, res): # in test cases, bd.start might mutate active_buckets right away, so # we need to put off calling start() until we've iterated all the way # through it @@ -153,13 +202,18 @@ class FileDownloader: self._output = Output(downloadable, key) # future: - # self._share_hash_tree = ?? - # self._subshare_hash_trees = {} # k:shnum, v: hashtree # each time we start using a new shnum, we must acquire a share hash - # from one of the buckets that provides that shnum, then validate it against - # the rest of the share hash tree that they provide. Then, each time we - # get a block in that share, we must validate the block against the rest - # of the subshare hash tree that that bucket will provide. + # from one of the buckets that provides that shnum, then validate it + # against the rest of the share hash tree that they provide. Then, + # each time we get a block in that share, we must validate the block + # against the rest of the subshare hash tree that that bucket will + # provide. + + self._share_hashtree = chunk.IncompleteHashTree(total_shares) + #self._block_hashtrees = {} # k: shnum, v: hashtree + + def get_share_hashtree(self): + return self._share_hashtree def start(self): log.msg("starting download [%s]" % (idlib.b2a(self._verifierid),)) diff --git a/src/allmydata/storageserver.py b/src/allmydata/storageserver.py index df9b1a615..5bab5e26b 100644 --- a/src/allmydata/storageserver.py +++ b/src/allmydata/storageserver.py @@ -51,7 +51,7 @@ class BucketWriter(Referenceable): def remote_put_share_hashes(self, sharehashes): precondition(not self.closed) - self._write_file('sharehashree', bencode.bencode(sharehashes)) + self._write_file('sharehashes', bencode.bencode(sharehashes)) def remote_close(self): precondition(not self.closed) @@ -89,8 +89,11 @@ class BucketReader(Referenceable): return str2l(self._read_file('blockhashes')) def remote_get_share_hashes(self): - return bencode.bdecode(self._read_file('sharehashes')) - + hashes = bencode.bdecode(self._read_file('sharehashes')) + # tuples come through bdecode(bencode()) as lists, which violates the + # schema + return [tuple(i) for i in hashes] + class StorageServer(service.MultiService, Referenceable): implements(RIStorageServer) name = 'storageserver'