# tahoe-lafs/src/allmydata/mutable/retrieve.py
#
# 498 lines
# 20 KiB
# Python
# Raw Normal View History

import struct, time
from itertools import count
from zope.interface import implements
from twisted.internet import defer
from twisted.python import failure
from foolscap.eventual import eventually
from allmydata.interfaces import IRetrieveStatus
from allmydata.util import hashutil, idlib, log
from allmydata import hashtree, codec, storage
from allmydata.encode import NotEnoughSharesError
from pycryptopp.cipher.aes import AES
from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
from layout import SIGNED_PREFIX, unpack_share_data
class RetrieveStatus:
    """Status record for a single mutable-file retrieve operation.

    Holds timing data, per-peer problems and progress fields, all reachable
    through the getter/setter methods below. Every instance draws its
    ``counter`` value from the class-wide ``statusid_counter`` iterator.
    """
    implements(IRetrieveStatus)

    # one iterator shared by every instance of the class
    statusid_counter = count(0)

    def __init__(self):
        # "fetch_per_server" maps peerid -> list of elapsed times, filled in
        # by add_fetch_timing()
        self.timings = {
            "fetch_per_server": {},
            "cumulative_verify": 0.0,
        }
        self.problems = {}
        self.storage_index = None
        self.size = None
        self.encoding = ("?", "?")
        self.helper = False
        self.active = True
        self.status = "Not started"
        self.progress = 0.0
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    # --- read-only accessors without a matching setter ---

    def get_started(self):
        return self.started

    def get_counter(self):
        return self.counter

    # --- accessor/mutator pairs ---

    def get_storage_index(self):
        return self.storage_index

    def set_storage_index(self, si):
        self.storage_index = si

    def get_encoding(self):
        return self.encoding

    def set_encoding(self, k, n):
        # stored as a (k, n) tuple
        self.encoding = (k, n)

    def using_helper(self):
        return self.helper

    def set_helper(self, helper):
        self.helper = helper

    def get_size(self):
        return self.size

    def set_size(self, size):
        self.size = size

    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    def get_progress(self):
        return self.progress

    def set_progress(self, value):
        self.progress = value

    def get_active(self):
        return self.active

    def set_active(self, value):
        self.active = value

    # --- timing bookkeeping ---

    def add_fetch_timing(self, peerid, elapsed):
        """Append one elapsed-time sample to the list kept for peerid."""
        per_server = self.timings["fetch_per_server"]
        per_server.setdefault(peerid, []).append(elapsed)
class Marker:
    """Empty token class; each instance is a distinct object.

    Retrieve.get_data() creates one per read request and uses it as the key
    under which the request is recorded in Retrieve._outstanding_queries.
    """
class Retrieve:
    """Fetch, validate, decode and decrypt one version of a mutable file.

    Control flow is a callback-driven state machine: download() sends one
    read query per share, and every response (or failure) is funnelled into
    _check_for_done(), which either keeps waiting, sends more queries,
    starts decoding, or fails the download.
    """
    # this class is currently single-use. Eventually (in MDMF) we will make
    # it multi-use, in which case you can call download(range) multiple
    # times, and each will have a separate response chain. However the
    # Retrieve object will remain tied to a specific version of the file, and
    # will use a single ServerMap instance.

    def __init__(self, filenode, servermap, verinfo):
        """Prepare to retrieve the file version described by verinfo.

        filenode must already have _pubkey and _readkey set (asserted
        below). servermap supplies make_versionmap(), mark_bad_share() and
        the per-peer 'connections' mapping. verinfo is the 9-tuple
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
        offsets_tuple).
        """
        self._node = filenode
        assert self._node._pubkey
        self._storage_index = filenode.get_storage_index()
        assert self._node._readkey
        self._last_failure = None
        si_prefix = storage.si_b2a(self._storage_index)[:5]
        self._log_number = log.msg("Retrieve(%s): starting" % si_prefix)
        # maps Marker instance to (peerid, shnum, start_time)
        self._outstanding_queries = {}
        self._running = True
        self._decoding = False
        # set of (peerid, shnum) pairs that failed validation
        self._bad_shares = set()

        self.servermap = servermap
        self.verinfo = verinfo

        self._status = RetrieveStatus()
        self._status.set_storage_index(self._storage_index)
        self._status.set_helper(False)
        self._status.set_progress(0.0)
        self._status.set_active(True)
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        self._status.set_size(datalength)
        self._status.set_encoding(k, N)

    def get_status(self):
        """Return the RetrieveStatus object tracking this operation."""
        return self._status

    def log(self, *args, **kwargs):
        """Call log.msg(), defaulting 'parent' to this operation's log entry."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def download(self):
        """Start the retrieve and return the Deferred that reports its result.

        The returned Deferred is fired from _done() with either the
        plaintext or a Failure.
        """
        self._done_deferred = defer.Deferred()
        self._started = time.time()
        self._status.set_status("Retrieving Shares")

        # first, which servers can we use?
        versionmap = self.servermap.make_versionmap()
        shares = versionmap[self.verinfo]
        # this sharemap is consumed as we decide to send requests
        self.remaining_sharemap = DictOfSets()
        for (shnum, peerid, timestamp) in shares:
            self.remaining_sharemap.add(shnum, peerid)

        self.shares = {} # maps shnum to validated blocks

        # how many shares do we need?
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(self.remaining_sharemap) >= k
        # we start with the lowest shnums we have available, since FEC is
        # faster if we're using "primary shares"
        self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
        for shnum in self.active_shnums:
            # we use an arbitrary peer who has the share. If shares are
            # doubled up (more than one share per peer), we could make this
            # run faster by spreading the load among multiple peers. But the
            # algorithm to do that is more complicated than I want to write
            # right now, and a well-provisioned grid shouldn't have multiple
            # shares per peer.
            peerid = list(self.remaining_sharemap[shnum])[0]
            self.get_data(shnum, peerid)

        # control flow beyond this point: state machine. Receiving responses
        # from queries is the input. We might send out more queries, or we
        # might produce a result.

        return self._done_deferred

    def get_data(self, shnum, peerid):
        """Send a read request for share shnum to peerid.

        Records the request in self._outstanding_queries, removes the
        (shnum, peerid) pair from self.remaining_sharemap, and wires up the
        callback chain that ends in _check_for_done().
        """
        self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
                 shnum=shnum,
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        ss = self.servermap.connections[peerid]
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        offsets = dict(offsets_tuple)
        # we read the checkstring, to make sure that the data we grab is from
        # the right version. We also read the data, and the hashes necessary
        # to validate them (share_hash_chain, block_hash_tree, share_data).
        # We don't read the signature or the pubkey, since that was handled
        # during the servermap phase, and we'll be comparing the share hash
        # chain against the roothash that was validated back then.
        readv = [ (0, struct.calcsize(SIGNED_PREFIX)),
                  (offsets['share_hash_chain'],
                   offsets['enc_privkey'] - offsets['share_hash_chain']),
                  ]

        m = Marker()
        self._outstanding_queries[m] = (peerid, shnum, started)

        # ask the cache first. NOTE: the cache lookup below is commented
        # out, so datav stays empty and the 'else' branch is always taken.
        got_from_cache = False
        datav = []
        #for (offset, length) in readv:
        #    (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
        #                                               offset, length)
        #    if data is not None:
        #        datav.append(data)
        if len(datav) == len(readv):
            self.log("got data from cache")
            got_from_cache = True
            d = defer.succeed(datav)
        else:
            # go through DictOfSets.discard() (as remove_peer() does) rather
            # than mutating the inner set directly, so that an emptied entry
            # is pruned the way _maybe_send_more_queries() expects.
            self.remaining_sharemap.discard(shnum, peerid)
            d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
            d.addCallback(self._fill_cache, readv)

        d.addCallback(self._got_results, m, peerid, started, got_from_cache)
        d.addErrback(self._query_failed, m, peerid)
        # errors that aren't handled by _query_failed (and errors caused by
        # _query_failed) get logged, but we still want to check for doneness.
        def _oops(f):
            self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
                     shnum=shnum,
                     peerid=idlib.shortnodeid_b2a(peerid),
                     failure=f,
                     level=log.WEIRD)
        d.addErrback(_oops)
        d.addBoth(self._check_for_done)
        # any error during _check_for_done means the download fails. If the
        # download is successful, _check_for_done will fire _done by itself.
        d.addErrback(self._done)
        d.addErrback(log.err)
        return d # purely for testing convenience

    def _fill_cache(self, datavs, readv):
        """Store every fetched span in the node's cache; return datavs as-is.

        datavs maps shnum to a sequence of data strings, one per entry of
        readv (a list of (offset, length) pairs).
        """
        timestamp = time.time()
        for shnum, datav in datavs.items():
            for i, (offset, length) in enumerate(readv):
                data = datav[i]
                self._node._cache.add(self.verinfo, shnum, offset, data,
                                      timestamp)
        return datavs

    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        # isolate the callRemote to a separate method, so tests can subclass
        # Retrieve and override it
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d

    def remove_peer(self, peerid):
        """Stop considering peerid as a source for any share."""
        for shnum in list(self.remaining_sharemap.keys()):
            self.remaining_sharemap.discard(shnum, peerid)

    def _got_results(self, datavs, marker, peerid, started, got_from_cache):
        """Handle a successful read response: validate each returned share.

        A share that raises CorruptShareError is recorded as bad and its
        peer dropped; other exceptions propagate to the errback chain set
        up in get_data().
        """
        now = time.time()
        elapsed = now - started
        if not got_from_cache:
            self._status.add_fetch_timing(peerid, elapsed)
        self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
                 shares=len(datavs),
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return

        # note that we only ask for a single share per query, so we only
        # expect a single share back. On the other hand, we use the extra
        # shares if we get them.. seems better than an assert().

        for shnum, datav in datavs.items():
            (prefix, hash_and_data) = datav
            try:
                self._got_results_one_share(shnum, peerid,
                                            prefix, hash_and_data)
            except CorruptShareError:
                # log it and give the other shares a chance to be processed
                f = failure.Failure()
                self.log("bad share: %s %s" % (f, f.value), level=log.WEIRD)
                self.remove_peer(peerid)
                self.servermap.mark_bad_share(peerid, shnum)
                self._bad_shares.add( (peerid, shnum) )
                self._status.problems[peerid] = f
                self._last_failure = f
        # all done!

    def _got_results_one_share(self, shnum, peerid,
                               got_prefix, got_hash_and_data):
        """Validate one share and, if it checks out, store it in self.shares.

        Raises UncoordinatedWriteError if the signed prefix differs from the
        one in self.verinfo, and CorruptShareError if the block hash tree or
        the share hash chain does not validate.
        """
        self.log("_got_results: got shnum #%d from peerid %s"
                 % (shnum, idlib.shortnodeid_b2a(peerid)))
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo
        assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
        if got_prefix != prefix:
            msg = "someone wrote to the data since we read the servermap: prefix changed"
            raise UncoordinatedWriteError(msg)
        (share_hash_chain, block_hash_tree,
         share_data) = unpack_share_data(self.verinfo, got_hash_and_data)

        assert isinstance(share_data, str)
        # build the block hash tree. SDMF has only one leaf.
        leaves = [hashutil.block_hash(share_data)]
        t = hashtree.HashTree(leaves)
        if list(t) != block_hash_tree:
            raise CorruptShareError(peerid, shnum, "block hash tree failure")
        share_hash_leaf = t[0]
        t2 = hashtree.IncompleteHashTree(N)
        # root_hash was checked by the signature
        t2.set_hashes({0: root_hash})
        try:
            t2.set_hashes(hashes=share_hash_chain,
                          leaves={shnum: share_hash_leaf})
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
                IndexError) as e:
            msg = "corrupt hashes: %s" % (e,)
            raise CorruptShareError(peerid, shnum, msg)
        self.log(" data valid! len=%d" % len(share_data))
        # each query comes down to this: placing validated share data into
        # self.shares
        self.shares[shnum] = share_data

    def _query_failed(self, f, marker, peerid):
        """Errback for a read request: record the failure and drop the peer."""
        self.log(format="query to [%(peerid)s] failed",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 level=log.NOISY)
        self._status.problems[peerid] = f
        self._outstanding_queries.pop(marker, None)
        if not self._running:
            return
        self._last_failure = f
        self.remove_peer(peerid)
        self.log("error during query: %s %s" % (f, f.value), level=log.WEIRD)

    def _check_for_done(self, res):
        # exit paths:
        #  return : keep waiting, no new queries
        #  return self._maybe_send_more_queries(k) : send some more queries
        #  fire self._done(plaintext) : download successful
        #  raise exception : download fails

        self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
                 running=self._running, decoding=self._decoding,
                 level=log.NOISY)
        if not self._running:
            return
        if self._decoding:
            return
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        if len(self.shares) < k:
            # we don't have enough shares yet
            return self._maybe_send_more_queries(k)

        # we have enough to finish. All the shares have had their hashes
        # checked, so if something fails at this point, we don't know how
        # to fix it, so the download will fail.

        self._decoding = True # avoid reentrancy
        self._status.set_status("decoding")
        now = time.time()
        elapsed = now - self._started
        self._status.timings["fetch"] = elapsed

        d = defer.maybeDeferred(self._decode)
        d.addCallback(self._decrypt, IV, self._node._readkey)
        d.addBoth(self._done)
        return d # purely for test convenience

    @staticmethod
    def _count_needed_queries(k, have, outstanding):
        """Return how many additional queries are required, never negative.

        k is the number of shares needed to decode, have the number already
        validated, outstanding the number of queries still in flight. The
        result is clamped at zero: a response may carry more than one share,
        in which case have + outstanding can exceed k.
        """
        return max(0, k - have - outstanding)

    def _maybe_send_more_queries(self, k):
        # we don't have enough shares yet. Should we send out more queries?
        # There are some number of queries outstanding, each for a single
        # share. If we can generate 'needed_shares' additional queries, we do
        # so. If we can't, then we know this file is a goner, and we raise
        # NotEnoughSharesError.
        self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
                         "outstanding=%(outstanding)d"),
                 have=len(self.shares), k=k,
                 outstanding=len(self._outstanding_queries),
                 level=log.NOISY)

        needed = self._count_needed_queries(k, len(self.shares),
                                            len(self._outstanding_queries))
        if not needed:
            # we have enough queries in flight already

            # TODO: but if they've been in flight for a long time, and we
            # have reason to believe that new queries might respond faster
            # (i.e. we've seen other queries come back faster, then consider
            # sending out new queries. This could help with peers which have
            # silently gone away since the servermap was updated, for which
            # we're still waiting for the 15-minute TCP disconnect to happen.
            self.log("enough queries are in flight, no more are needed",
                     level=log.NOISY)
            return

        outstanding_shnums = set([shnum
                                  for (peerid, shnum, started)
                                  in self._outstanding_queries.values()])
        # prefer low-numbered shares, they are more likely to be primary
        available_shnums = sorted(self.remaining_sharemap.keys())
        for shnum in available_shnums:
            if shnum in outstanding_shnums:
                # skip ones that are already in transit
                continue
            if shnum in self.shares:
                # already validated: fetching it again from another peer
                # cannot increase len(self.shares)
                continue
            if shnum not in self.remaining_sharemap:
                # no servers for that shnum. note that DictOfSets removes
                # empty sets from the dict for us.
                continue
            peerid = list(self.remaining_sharemap[shnum])[0]
            # get_data will remove that peerid from the sharemap, and add the
            # query to self._outstanding_queries
            self._status.set_status("Retrieving More Shares")
            self.get_data(shnum, peerid)
            needed -= 1
            if not needed:
                break

        # at this point, we have as many outstanding queries as we can. If
        # needed!=0 then we might not have enough to recover the file.
        if needed:
            fmt = ("ran out of peers: "
                   "have %(have)d shares (k=%(k)d), "
                   "%(outstanding)d queries in flight, "
                   "need %(need)d more, "
                   "found %(bad)d bad shares")
            args = {"have": len(self.shares),
                    "k": k,
                    "outstanding": len(self._outstanding_queries),
                    "need": needed,
                    "bad": len(self._bad_shares),
                    }
            self.log(format=fmt,
                     level=log.WEIRD, **args)
            err = NotEnoughSharesError("%s, last failure: %s" %
                                       (fmt % args, self._last_failure))
            if self._bad_shares:
                self.log("We found some bad shares this pass. You should "
                         "update the servermap and try again to check "
                         "more peers",
                         level=log.WEIRD)
                err.servermap = self.servermap
            raise err

        return

    def _decode(self):
        """FEC-decode k of the validated shares.

        Returns a Deferred that fires with the joined segment, truncated to
        datalength.
        """
        started = time.time()
        (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
         offsets_tuple) = self.verinfo

        # shares_dict is a dict mapping shnum to share data, but the codec
        # wants two lists.
        shareids = []; shares = []
        for shareid, share in self.shares.items():
            shareids.append(shareid)
            shares.append(share)

        assert len(shareids) >= k, len(shareids)
        # zfec really doesn't want extra shares
        shareids = shareids[:k]
        shares = shares[:k]

        fec = codec.CRSDecoder()
        params = "%d-%d-%d" % (segsize, k, N)
        fec.set_serialized_params(params)

        self.log("params %s, we have %d shares" % (params, len(shares)))
        self.log("about to decode, shareids=%s" % (shareids,))
        d = defer.maybeDeferred(fec.decode, shares, shareids)
        def _done(buffers):
            self._status.timings["decode"] = time.time() - started
            self.log(" decode done, %d buffers" % len(buffers))
            segment = "".join(buffers)
            self.log(" joined length %d, datalength %d" %
                     (len(segment), datalength))
            segment = segment[:datalength]
            self.log(" segment len=%d" % len(segment))
            return segment
        def _err(f):
            self.log(" decode failed: %s" % f)
            return f
        d.addCallback(_done)
        d.addErrback(_err)
        return d

    def _decrypt(self, crypttext, IV, readkey):
        """Decrypt crypttext with the AES key derived from IV and readkey."""
        self._status.set_status("decrypting")
        started = time.time()
        key = hashutil.ssk_readkey_data_hash(IV, readkey)
        decryptor = AES(key)
        plaintext = decryptor.process(crypttext)
        self._status.timings["decrypt"] = time.time() - started
        return plaintext

    def _done(self, res):
        """Finish the operation exactly once and fire the download Deferred.

        res is either the retrieved contents or a Failure. Calls made after
        the first one are ignored.
        """
        if not self._running:
            return
        self._running = False
        self._status.set_active(False)
        self._status.timings["total"] = time.time() - self._started
        # res is either the new contents, or a Failure
        if isinstance(res, failure.Failure):
            self.log("Retrieve done, with failure", failure=res)
            self._status.set_status("Failed")
        else:
            self.log("Retrieve done, success!")
            self._status.set_status("Done")
            self._status.set_progress(1.0)
            # remember the encoding parameters, use them again next time
            (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
             offsets_tuple) = self.verinfo
            self._node._populate_required_shares(k)
            self._node._populate_total_shares(N)
        eventually(self._done_deferred.callback, res)