import struct
import time
from itertools import count

from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.interfaces import IRetrieveStatus
from allmydata.util import hashutil, idlib, log
from allmydata import hashtree, codec, storage
from allmydata.encode import NotEnoughSharesError
from pycryptopp.cipher.aes import AES

from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
from layout import SIGNED_PREFIX, unpack_share_data


class RetrieveStatus:
    """Mutable status record for a single retrieve operation.

    Declares IRetrieveStatus. Each instance takes a unique 'counter' value
    from the class-wide statusid_counter and records its creation time.
    """
    implements(IRetrieveStatus)
    statusid_counter = count(0)

    def __init__(self):
        self.timings = {}
        self.timings["fetch_per_server"] = {}
        self.timings["cumulative_verify"] = 0.0
        self.sharemap = {}
        self.problems = {}
        self.active = True
        self.storage_index = None
        self.helper = False
        self.encoding = ("?", "?")  # (k, N) once set_encoding() is called
        self.search_distance = None
        self.size = None
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # read accessors

    def get_started(self):
        return self.started

    def get_storage_index(self):
        return self.storage_index

    def get_encoding(self):
        return self.encoding

    def get_search_distance(self):
        return self.search_distance

    def using_helper(self):
        return self.helper

    def get_size(self):
        return self.size

    def get_status(self):
        return self.status

    def get_progress(self):
        return self.progress

    def get_active(self):
        return self.active

    def get_counter(self):
        return self.counter

    # write accessors

    def set_storage_index(self, si):
        self.storage_index = si

    def set_helper(self, helper):
        self.helper = helper

    def set_encoding(self, k, n):
        self.encoding = (k, n)

    def set_search_distance(self, value):
        self.search_distance = value

    def set_size(self, size):
        self.size = size

    def set_status(self, status):
        self.status = status

    def set_progress(self, value):
        self.progress = value

    def set_active(self, value):
        self.active = value


class Marker:
    """Opaque token; instances are the keys of
    Retrieve._outstanding_queries."""
    pass


class Retrieve:
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.

    def __init__(self, filenode, servermap, verinfo):
        """Prepare to retrieve the version 'verinfo' of 'filenode'.

        'verinfo' is the 9-tuple (seqnum, root_hash, IV, segsize,
        datalength, k, N, prefix, offsets_tuple). 'servermap' must provide
        make_versionmap(), a 'connections' mapping keyed by peerid, and
        mark_bad_share(). The filenode must already have its pubkey and
        readkey (asserted below).
        """
        self._node = filenode
        assert self._node._pubkey
        self._storage_index = filenode.get_storage_index()
        assert self._node._readkey
        self._last_failure = None
        prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % prefix)
        # maps Marker instance to (peerid, shnum, start_time)
        self._outstanding_queries = {}
        self._running = True
        self._decoding = False
        self._bad_shares = set()  # holds (peerid, shnum) tuples

        self.servermap = servermap
        self.verinfo = verinfo

    def log(self, *args, **kwargs):
        """Call log.msg(), parented to this Retrieve's initial log entry
        unless the caller supplies its own 'parent'."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def download(self):
        """Start the retrieve and return self._done_deferred.

        _done() later fires that Deferred (via eventually()) with either
        the plaintext or a Failure.
        """
        self._done_deferred = defer.Deferred()

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)

        self.shares = {}  # maps shnum to validated blocks

        # how many shares do we need?
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(self.remaining_sharemap) >= k
        # we start with the lowest shnums we have available, since FEC is
        # faster if we're using "primary shares"
        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
        for shnum in self.active_shnums:
            # we use an arbitrary peer who has the share. If shares are
            # doubled up (more than one share per peer), we could make this
            # run faster by spreading the load among multiple peers. But the
            # algorithm to do that is more complicated than I want to write
            # right now, and a well-provisioned grid shouldn't have multiple
            # shares per peer.
            peerid = list(self.remaining_sharemap[shnum])[0]
            self.get_data(shnum, peerid)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

        return self._done_deferred

    def get_data(self, shnum, peerid):
        """Send one read query for share 'shnum' to 'peerid'.

        Registers the query in self._outstanding_queries, removes the peer
        from the remaining sharemap entry for this share, and wires the
        response into _got_results / _query_failed / _check_for_done.
        Returns the query's Deferred (for tests).
        """
        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        ss = self.servermap.connections[peerid]
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        offsets = dict(offsets_tuple)
        # we read the checkstring, to make sure that the data we grab is from
        # the right version. We also read the data, and the hashes necessary
        # to validate them (share_hash_chain, block_hash_tree, share_data).
        # We don't read the signature or the pubkey, since that was handled
        # during the servermap phase, and we'll be comparing the share hash
        # chain against the roothash that was validated back then.
        readv = [
            (0, struct.calcsize(SIGNED_PREFIX)),
            (offsets['share_hash_chain'],
             offsets['enc_privkey'] - offsets['share_hash_chain']),
        ]

        m = Marker()
        self._outstanding_queries[m] = (peerid, shnum, started)

        # ask the cache first
        # NOTE: the cache lookup below is commented out, so 'datav' is
        # always empty here and the remote-read branch is always taken.
        datav = []
        #for (offset, length) in readv:
        #    (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
        #                                               offset, length)
        #    if data is not None:
        #        datav.append(data)
        if len(datav) == len(readv):
            self.log("got data from cache")
            d = defer.succeed(datav)
        else:
            self.remaining_sharemap[shnum].remove(peerid)
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
            d.addCallback(self._fill_cache, readv)

        d.addCallback(self._got_results, m, peerid, started)
        d.addErrback(self._query_failed, m, peerid)

        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        def _oops(f):
            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
                     shnum=shnum,
                     peerid=idlib.shortnodeid_b2a(peerid),
                     failure=f,
                     level=log.WEIRD)
        d.addErrback(_oops)
        d.addBoth(self._check_for_done)
        # any error during _check_for_done means the download fails. If the
        # download is successful, _check_for_done will fire _done by itself.
        d.addErrback(self._done)
        d.addErrback(log.err)
        return d  # purely for testing convenience

    def _fill_cache(self, datavs, readv):
        """Add every block of a read response to the node's cache.

        'datavs' maps shnum to a list of data strings, one per entry of
        'readv'. Returns 'datavs' unchanged so it flows on to the next
        callback.
        """
        timestamp = time.time()
        for shnum, datav in datavs.items():
            for i, (offset, length) in enumerate(readv):
                data = datav[i]
                self._node._cache.add(self.verinfo, shnum, offset, data,
                                      timestamp)
        return datavs

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        # isolate the callRemote to a separate method, so tests can subclass
        # Retrieve and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def remove_peer(self, peerid):
        """Stop considering 'peerid' as a source for any share."""
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, peerid)

    def _got_results(self, datavs, marker, peerid, started):
        """Handle a successful read response: validate and store shares.

        A share that fails validation with CorruptShareError is recorded
        as bad and its peer is dropped; other exceptions propagate to the
        errback chain set up in get_data().
        """
        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                 shares=len(datavs),
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return

        # note that we only ask for a single share per query, so we only
        # expect a single share back. On the other hand, we use the extra
        # shares if we get them.. seems better than an assert().

        for shnum, datav in datavs.items():
            (prefix, hash_and_data) = datav
            try:
                self._got_results_one_share(shnum, peerid,
                                            prefix, hash_and_data)
            except CorruptShareError:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
                self.remove_peer(peerid)
                self.servermap.mark_bad_share(peerid, shnum)
                self._bad_shares.add((peerid, shnum))
                self._last_failure = f
        # all done!

    def _got_results_one_share(self, shnum, peerid,
                               got_prefix, got_hash_and_data):
        """Validate one share and, if it is good, store its data in
        self.shares[shnum].

        Raises UncoordinatedWriteError if the share's signed prefix differs
        from the one in self.verinfo, and CorruptShareError if the block
        hash tree or share hash chain does not check out.
        """
        self.log("_got_results: got shnum #%d from peerid %s"
                 % (shnum, idlib.shortnodeid_b2a(peerid)))
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
        if got_prefix != prefix:
            msg = "someone wrote to the data since we read the servermap: prefix changed"
            raise UncoordinatedWriteError(msg)
        (share_hash_chain, block_hash_tree,
         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)

        assert isinstance(share_data, str)
        # build the block hash tree. SDMF has only one leaf.
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError):
            # Failure() with no arguments captures the exception currently
            # being handled (same idiom as in _got_results)
            msg = "corrupt hashes: %s" % (failure.Failure().value,)
            raise CorruptShareError(peerid, shnum, msg)
        self.log(" data valid! len=%d" % len(share_data))
        # each query comes down to this: placing validated share data into
        # self.shares
        self.shares[shnum] = share_data

    def _query_failed(self, f, marker, peerid):
        """Errback for a query: forget the query, remember the failure and
        stop using the peer."""
        self.log(format="query to [%(peerid)s] failed",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return
        self._last_failure = f
        self.remove_peer(peerid)
        self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)

    def _check_for_done(self, res):
        # exit paths:
        #  return : keep waiting, no new queries
        #  return self._maybe_send_more_queries(k) : send some more queries
        #  fire self._done(plaintext) : download successful
        #  raise exception : download fails

        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
                 running=self._running, decoding=self._decoding,
                 level=log.NOISY)
        if not self._running:
            return
        if self._decoding:
            return
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        if len(self.shares) < k:
            # we don't have enough shares yet
            return self._maybe_send_more_queries(k)

        # we have enough to finish. All the shares have had their hashes
        # checked, so if something fails at this point, we don't know how
        # to fix it, so the download will fail.

        self._decoding = True  # avoid reentrancy

        d = defer.maybeDeferred(self._decode)
        d.addCallback(self._decrypt, IV, self._node._readkey)
        d.addBoth(self._done)
        return d  # purely for test convenience

    def _maybe_send_more_queries(self, k):
        # we don't have enough shares yet. Should we send out more queries?
        # There are some number of queries outstanding, each for a single
        # share. If we can generate 'needed_shares' additional queries, we do
        # so. If we can't, then we know this file is a goner, and we raise
        # NotEnoughSharesError.
        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
                         "outstanding=%(outstanding)d"),
                 have=len(self.shares), k=k,
                 outstanding=len(self._outstanding_queries),
                 level=log.NOISY)

        remaining_shares = k - len(self.shares)
        needed = remaining_shares - len(self._outstanding_queries)
        # 'needed' can go negative if more queries are in flight than shares
        # are still missing; that also means "nothing more to send"
        if needed <= 0:
            # we have enough queries in flight already

            # TODO: but if they've been in flight for a long time, and we
            # have reason to believe that new queries might respond faster
            # (i.e. we've seen other queries come back faster, then consider
            # sending out new queries. This could help with peers which have
            # silently gone away since the servermap was updated, for which
            # we're still waiting for the 15-minute TCP disconnect to happen.
            self.log("enough queries are in flight, no more are needed",
                     level=log.NOISY)
            return

        outstanding_shnums = set([shnum
                                  for (peerid, shnum, started)
                                  in self._outstanding_queries.values()])
        # prefer low-numbered shares, they are more likely to be primary
        available_shnums = sorted(self.remaining_sharemap.keys())
        for shnum in available_shnums:
            if shnum in outstanding_shnums:
                # skip ones that are already in transit
                continue
            if shnum not in self.remaining_sharemap:
                # no servers for that shnum. note that DictOfSets removes
                # empty sets from the dict for us.
                continue
            peers = self.remaining_sharemap[shnum]
            if not peers:
                # get_data() removes the peer from the set directly, which
                # may leave an empty set behind: nobody left to ask
                continue
            peerid = list(peers)[0]
            # get_data will remove that peerid from the sharemap, and add the
            # query to self._outstanding_queries
            self.get_data(shnum, peerid)
            needed -= 1
            if not needed:
                break

        # at this point, we have as many outstanding queries as we can. If
        # needed!=0 then we might not have enough to recover the file.
        if needed > 0:
            fmt = ("ran out of peers: "
                   "have %(have)d shares (k=%(k)d), "
                   "%(outstanding)d queries in flight, "
                   "need %(need)d more, "
                   "found %(bad)d bad shares")
            args = {"have": len(self.shares),
                    "k": k,
                    "outstanding": len(self._outstanding_queries),
                    "need": needed,
                    "bad": len(self._bad_shares),
                    }
            self.log(format=fmt,
                     level=log.WEIRD, **args)
            err = NotEnoughSharesError("%s, last failure: %s" %
                                      (fmt % args, self._last_failure))
            if self._bad_shares:
                self.log("We found some bad shares this pass. You should "
                         "update the servermap and try again to check "
                         "more peers",
                         level=log.WEIRD)
                err.servermap = self.servermap
            raise err

        return

    def _decode(self):
        """Decode k of the validated shares into the ciphertext segment.

        Returns a Deferred that fires with the joined decoder output,
        truncated to 'datalength' bytes.
        """
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        # self.shares is a dict mapping shnum to share data, but the codec
        # wants two lists.
        shareids = []
        shares = []
        for shareid, share in self.shares.items():
            shareids.append(shareid)
            shares.append(share)

        assert len(shareids) >= k, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:k]
        shares = shares[:k]

        fec = codec.CRSDecoder()
        params = "%d-%d-%d" % (segsize, k, N)
        fec.set_serialized_params(params)

        self.log("params %s, we have %d shares" % (params, len(shares)))
        self.log("about to decode, shareids=%s" % (shareids,))
        d = defer.maybeDeferred(fec.decode, shares, shareids)

        def _decoded(buffers):
            self.log(" decode done, %d buffers" % len(buffers))
            segment = "".join(buffers)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), datalength))
            segment = segment[:datalength]
            self.log(" segment len=%d" % len(segment))
            return segment

        def _err(f):
            self.log(" decode failed: %s" % f)
            return f
        d.addCallback(_decoded)
        d.addErrback(_err)
        return d

    def _decrypt(self, crypttext, IV, readkey):
        """Return 'crypttext' run through AES, keyed by
        hashutil.ssk_readkey_data_hash(IV, readkey)."""
        key = hashutil.ssk_readkey_data_hash(IV, readkey)
        decryptor = AES(key)
        plaintext = decryptor.process(crypttext)
        return plaintext

    def _done(self, res):
        """Finish the retrieve exactly once and fire self._done_deferred.

        'res' is either the new contents, or a Failure.
        """
        if not self._running:
            return
        self._running = False
        if isinstance(res, failure.Failure):
            self.log("Retrieve done, with failure", failure=res)
        else:
            self.log("Retrieve done, success!")
            # remember the encoding parameters, use them again next time
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = self.verinfo
            self._node._populate_required_shares(k)
            self._node._populate_total_shares(N)
        eventually(self._done_deferred.callback, res)