2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
import struct, time
|
|
|
|
from itertools import count
|
|
|
|
from zope.interface import implements
|
|
|
|
from twisted.internet import defer
|
|
|
|
from twisted.python import failure
|
2008-08-26 00:51:55 +00:00
|
|
|
from foolscap import DeadReferenceError
|
2008-04-22 00:27:50 +00:00
|
|
|
from foolscap.eventual import eventually, fireEventually
|
2008-10-27 20:34:49 +00:00
|
|
|
from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError
|
2008-04-11 21:31:16 +00:00
|
|
|
from allmydata.util import hashutil, idlib, log
|
|
|
|
from allmydata import hashtree, codec, storage
|
|
|
|
from pycryptopp.cipher.aes import AES
|
2008-08-26 23:34:54 +00:00
|
|
|
from pycryptopp.publickey import rsa
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
from common import DictOfSets, CorruptShareError, UncoordinatedWriteError
|
|
|
|
from layout import SIGNED_PREFIX, unpack_share_data
|
|
|
|
|
|
|
|
class RetrieveStatus:
|
|
|
|
implements(IRetrieveStatus)
|
|
|
|
statusid_counter = count(0)
|
|
|
|
def __init__(self):
|
|
|
|
self.timings = {}
|
|
|
|
self.timings["fetch_per_server"] = {}
|
|
|
|
self.timings["cumulative_verify"] = 0.0
|
|
|
|
self.problems = {}
|
|
|
|
self.active = True
|
|
|
|
self.storage_index = None
|
|
|
|
self.helper = False
|
|
|
|
self.encoding = ("?","?")
|
|
|
|
self.size = None
|
|
|
|
self.status = "Not started"
|
|
|
|
self.progress = 0.0
|
|
|
|
self.counter = self.statusid_counter.next()
|
|
|
|
self.started = time.time()
|
|
|
|
|
|
|
|
def get_started(self):
|
|
|
|
return self.started
|
|
|
|
def get_storage_index(self):
|
|
|
|
return self.storage_index
|
|
|
|
def get_encoding(self):
|
|
|
|
return self.encoding
|
|
|
|
def using_helper(self):
|
|
|
|
return self.helper
|
|
|
|
def get_size(self):
|
|
|
|
return self.size
|
|
|
|
def get_status(self):
|
|
|
|
return self.status
|
|
|
|
def get_progress(self):
|
|
|
|
return self.progress
|
|
|
|
def get_active(self):
|
|
|
|
return self.active
|
|
|
|
def get_counter(self):
|
|
|
|
return self.counter
|
|
|
|
|
2008-04-17 00:49:06 +00:00
|
|
|
def add_fetch_timing(self, peerid, elapsed):
|
|
|
|
if peerid not in self.timings["fetch_per_server"]:
|
|
|
|
self.timings["fetch_per_server"][peerid] = []
|
|
|
|
self.timings["fetch_per_server"][peerid].append(elapsed)
|
2008-04-11 21:31:16 +00:00
|
|
|
def set_storage_index(self, si):
|
|
|
|
self.storage_index = si
|
|
|
|
def set_helper(self, helper):
|
|
|
|
self.helper = helper
|
|
|
|
def set_encoding(self, k, n):
|
|
|
|
self.encoding = (k, n)
|
|
|
|
def set_size(self, size):
|
|
|
|
self.size = size
|
|
|
|
def set_status(self, status):
|
|
|
|
self.status = status
|
|
|
|
def set_progress(self, value):
|
|
|
|
self.progress = value
|
|
|
|
def set_active(self, value):
|
|
|
|
self.active = value
|
|
|
|
|
|
|
|
class Marker:
|
|
|
|
pass
|
|
|
|
|
|
|
|
class Retrieve:
|
|
|
|
# this class is currently single-use. Eventually (in MDMF) we will make
|
|
|
|
# it multi-use, in which case you can call download(range) multiple
|
|
|
|
# times, and each will have a separate response chain. However the
|
|
|
|
# Retrieve object will remain tied to a specific version of the file, and
|
|
|
|
# will use a single ServerMap instance.
|
|
|
|
|
2008-08-26 23:34:54 +00:00
|
|
|
def __init__(self, filenode, servermap, verinfo, fetch_privkey=False):
|
2008-04-11 21:31:16 +00:00
|
|
|
self._node = filenode
|
|
|
|
assert self._node._pubkey
|
|
|
|
self._storage_index = filenode.get_storage_index()
|
|
|
|
assert self._node._readkey
|
|
|
|
self._last_failure = None
|
|
|
|
prefix = storage.si_b2a(self._storage_index)[:5]
|
|
|
|
self._log_number = log.msg("Retrieve(%s): starting" % prefix)
|
|
|
|
self._outstanding_queries = {} # maps (peerid,shnum) to start_time
|
|
|
|
self._running = True
|
|
|
|
self._decoding = False
|
2008-04-15 22:58:02 +00:00
|
|
|
self._bad_shares = set()
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
self.servermap = servermap
|
|
|
|
assert self._node._pubkey
|
|
|
|
self.verinfo = verinfo
|
2008-08-26 23:34:54 +00:00
|
|
|
# during repair, we may be called upon to grab the private key, since
|
|
|
|
# it wasn't picked up during a verify=False checker run, and we'll
|
|
|
|
# need it for repair to generate the a new version.
|
|
|
|
self._need_privkey = fetch_privkey
|
|
|
|
if self._node._privkey:
|
|
|
|
self._need_privkey = False
|
2008-04-11 21:31:16 +00:00
|
|
|
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status = RetrieveStatus()
|
|
|
|
self._status.set_storage_index(self._storage_index)
|
|
|
|
self._status.set_helper(False)
|
|
|
|
self._status.set_progress(0.0)
|
|
|
|
self._status.set_active(True)
|
|
|
|
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
|
|
|
offsets_tuple) = self.verinfo
|
|
|
|
self._status.set_size(datalength)
|
|
|
|
self._status.set_encoding(k, N)
|
|
|
|
|
|
|
|
def get_status(self):
|
|
|
|
return self._status
|
|
|
|
|
2008-04-11 21:31:16 +00:00
|
|
|
def log(self, *args, **kwargs):
|
|
|
|
if "parent" not in kwargs:
|
|
|
|
kwargs["parent"] = self._log_number
|
2008-08-26 00:51:55 +00:00
|
|
|
if "facility" not in kwargs:
|
|
|
|
kwargs["facility"] = "tahoe.mutable.retrieve"
|
2008-04-11 21:31:16 +00:00
|
|
|
return log.msg(*args, **kwargs)
|
|
|
|
|
|
|
|
def download(self):
|
|
|
|
self._done_deferred = defer.Deferred()
|
2008-04-17 00:49:06 +00:00
|
|
|
self._started = time.time()
|
|
|
|
self._status.set_status("Retrieving Shares")
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
# first, which servers can we use?
|
|
|
|
versionmap = self.servermap.make_versionmap()
|
|
|
|
shares = versionmap[self.verinfo]
|
|
|
|
# this sharemap is consumed as we decide to send requests
|
|
|
|
self.remaining_sharemap = DictOfSets()
|
|
|
|
for (shnum, peerid, timestamp) in shares:
|
|
|
|
self.remaining_sharemap.add(shnum, peerid)
|
|
|
|
|
|
|
|
self.shares = {} # maps shnum to validated blocks
|
|
|
|
|
|
|
|
# how many shares do we need?
|
|
|
|
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
|
|
|
offsets_tuple) = self.verinfo
|
|
|
|
assert len(self.remaining_sharemap) >= k
|
|
|
|
# we start with the lowest shnums we have available, since FEC is
|
|
|
|
# faster if we're using "primary shares"
|
|
|
|
self.active_shnums = set(sorted(self.remaining_sharemap.keys())[:k])
|
|
|
|
for shnum in self.active_shnums:
|
|
|
|
# we use an arbitrary peer who has the share. If shares are
|
|
|
|
# doubled up (more than one share per peer), we could make this
|
|
|
|
# run faster by spreading the load among multiple peers. But the
|
|
|
|
# algorithm to do that is more complicated than I want to write
|
|
|
|
# right now, and a well-provisioned grid shouldn't have multiple
|
|
|
|
# shares per peer.
|
|
|
|
peerid = list(self.remaining_sharemap[shnum])[0]
|
|
|
|
self.get_data(shnum, peerid)
|
|
|
|
|
|
|
|
# control flow beyond this point: state machine. Receiving responses
|
|
|
|
# from queries is the input. We might send out more queries, or we
|
|
|
|
# might produce a result.
|
|
|
|
|
|
|
|
return self._done_deferred
|
|
|
|
|
|
|
|
def get_data(self, shnum, peerid):
|
|
|
|
self.log(format="sending sh#%(shnum)d request to [%(peerid)s]",
|
|
|
|
shnum=shnum,
|
|
|
|
peerid=idlib.shortnodeid_b2a(peerid),
|
|
|
|
level=log.NOISY)
|
|
|
|
ss = self.servermap.connections[peerid]
|
|
|
|
started = time.time()
|
|
|
|
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
|
|
|
offsets_tuple) = self.verinfo
|
|
|
|
offsets = dict(offsets_tuple)
|
2008-08-26 23:34:54 +00:00
|
|
|
|
2008-04-11 21:31:16 +00:00
|
|
|
# we read the checkstring, to make sure that the data we grab is from
|
2008-08-26 23:34:54 +00:00
|
|
|
# the right version.
|
|
|
|
readv = [ (0, struct.calcsize(SIGNED_PREFIX)) ]
|
|
|
|
|
|
|
|
# We also read the data, and the hashes necessary to validate them
|
|
|
|
# (share_hash_chain, block_hash_tree, share_data). We don't read the
|
|
|
|
# signature or the pubkey, since that was handled during the
|
|
|
|
# servermap phase, and we'll be comparing the share hash chain
|
|
|
|
# against the roothash that was validated back then.
|
|
|
|
|
|
|
|
readv.append( (offsets['share_hash_chain'],
|
|
|
|
offsets['enc_privkey'] - offsets['share_hash_chain'] ) )
|
|
|
|
|
|
|
|
# if we need the private key (for repair), we also fetch that
|
|
|
|
if self._need_privkey:
|
|
|
|
readv.append( (offsets['enc_privkey'],
|
|
|
|
offsets['EOF'] - offsets['enc_privkey']) )
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
m = Marker()
|
|
|
|
self._outstanding_queries[m] = (peerid, shnum, started)
|
|
|
|
|
|
|
|
# ask the cache first
|
2008-04-17 00:49:06 +00:00
|
|
|
got_from_cache = False
|
2008-04-22 00:27:50 +00:00
|
|
|
datavs = []
|
|
|
|
for (offset, length) in readv:
|
|
|
|
(data, timestamp) = self._node._cache.read(self.verinfo, shnum,
|
|
|
|
offset, length)
|
|
|
|
if data is not None:
|
|
|
|
datavs.append(data)
|
|
|
|
if len(datavs) == len(readv):
|
2008-04-11 21:31:16 +00:00
|
|
|
self.log("got data from cache")
|
2008-04-17 00:49:06 +00:00
|
|
|
got_from_cache = True
|
2008-04-22 00:27:50 +00:00
|
|
|
d = fireEventually({shnum: datavs})
|
|
|
|
# datavs is a dict mapping shnum to a pair of strings
|
2008-04-11 21:31:16 +00:00
|
|
|
else:
|
|
|
|
d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
|
2008-04-22 00:27:50 +00:00
|
|
|
self.remaining_sharemap.discard(shnum, peerid)
|
2008-04-11 21:31:16 +00:00
|
|
|
|
2008-04-17 00:49:06 +00:00
|
|
|
d.addCallback(self._got_results, m, peerid, started, got_from_cache)
|
2008-04-11 21:31:16 +00:00
|
|
|
d.addErrback(self._query_failed, m, peerid)
|
|
|
|
# errors that aren't handled by _query_failed (and errors caused by
|
|
|
|
# _query_failed) get logged, but we still want to check for doneness.
|
|
|
|
def _oops(f):
|
|
|
|
self.log(format="problem in _query_failed for sh#%(shnum)d to %(peerid)s",
|
|
|
|
shnum=shnum,
|
|
|
|
peerid=idlib.shortnodeid_b2a(peerid),
|
|
|
|
failure=f,
|
2008-08-26 01:57:59 +00:00
|
|
|
level=log.WEIRD, umid="W0xnQA")
|
2008-04-11 21:31:16 +00:00
|
|
|
d.addErrback(_oops)
|
|
|
|
d.addBoth(self._check_for_done)
|
|
|
|
# any error during _check_for_done means the download fails. If the
|
|
|
|
# download is successful, _check_for_done will fire _done by itself.
|
|
|
|
d.addErrback(self._done)
|
|
|
|
d.addErrback(log.err)
|
|
|
|
return d # purely for testing convenience
|
|
|
|
|
|
|
|
def _do_read(self, ss, peerid, storage_index, shnums, readv):
|
|
|
|
# isolate the callRemote to a separate method, so tests can subclass
|
|
|
|
# Publish and override it
|
|
|
|
d = ss.callRemote("slot_readv", storage_index, shnums, readv)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def remove_peer(self, peerid):
|
|
|
|
for shnum in list(self.remaining_sharemap.keys()):
|
|
|
|
self.remaining_sharemap.discard(shnum, peerid)
|
|
|
|
|
2008-04-17 00:49:06 +00:00
|
|
|
def _got_results(self, datavs, marker, peerid, started, got_from_cache):
|
|
|
|
now = time.time()
|
|
|
|
elapsed = now - started
|
|
|
|
if not got_from_cache:
|
|
|
|
self._status.add_fetch_timing(peerid, elapsed)
|
2008-04-11 21:31:16 +00:00
|
|
|
self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
|
|
|
|
shares=len(datavs),
|
|
|
|
peerid=idlib.shortnodeid_b2a(peerid),
|
|
|
|
level=log.NOISY)
|
|
|
|
self._outstanding_queries.pop(marker, None)
|
|
|
|
if not self._running:
|
|
|
|
return
|
|
|
|
|
|
|
|
# note that we only ask for a single share per query, so we only
|
|
|
|
# expect a single share back. On the other hand, we use the extra
|
|
|
|
# shares if we get them.. seems better than an assert().
|
|
|
|
|
|
|
|
for shnum,datav in datavs.items():
|
2008-08-26 23:34:54 +00:00
|
|
|
(prefix, hash_and_data) = datav[:2]
|
2008-04-11 21:31:16 +00:00
|
|
|
try:
|
|
|
|
self._got_results_one_share(shnum, peerid,
|
|
|
|
prefix, hash_and_data)
|
|
|
|
except CorruptShareError, e:
|
|
|
|
# log it and give the other shares a chance to be processed
|
|
|
|
f = failure.Failure()
|
2008-08-26 00:51:55 +00:00
|
|
|
self.log(format="bad share: %(f_value)s",
|
2008-08-26 01:57:59 +00:00
|
|
|
f_value=str(f.value), failure=f,
|
|
|
|
level=log.WEIRD, umid="7fzWZw")
|
2008-10-24 20:21:28 +00:00
|
|
|
self.notify_server_corruption(peerid, shnum, str(e))
|
2008-04-11 21:31:16 +00:00
|
|
|
self.remove_peer(peerid)
|
2008-07-18 04:09:23 +00:00
|
|
|
self.servermap.mark_bad_share(peerid, shnum, prefix)
|
2008-04-15 22:58:02 +00:00
|
|
|
self._bad_shares.add( (peerid, shnum) )
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.problems[peerid] = f
|
2008-04-11 21:31:16 +00:00
|
|
|
self._last_failure = f
|
|
|
|
pass
|
2008-08-26 23:34:54 +00:00
|
|
|
if self._need_privkey and len(datav) > 2:
|
|
|
|
lp = None
|
|
|
|
self._try_to_validate_privkey(datav[2], peerid, shnum, lp)
|
2008-04-11 21:31:16 +00:00
|
|
|
# all done!
|
|
|
|
|
2008-10-24 20:21:28 +00:00
|
|
|
def notify_server_corruption(self, peerid, shnum, reason):
|
|
|
|
ss = self.servermap.connections[peerid]
|
|
|
|
ss.callRemoteOnly("advise_corrupt_share",
|
|
|
|
"mutable", self._storage_index, shnum, reason)
|
|
|
|
|
2008-04-11 21:31:16 +00:00
|
|
|
def _got_results_one_share(self, shnum, peerid,
|
|
|
|
got_prefix, got_hash_and_data):
|
|
|
|
self.log("_got_results: got shnum #%d from peerid %s"
|
|
|
|
% (shnum, idlib.shortnodeid_b2a(peerid)))
|
|
|
|
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
|
|
|
offsets_tuple) = self.verinfo
|
|
|
|
assert len(got_prefix) == len(prefix), (len(got_prefix), len(prefix))
|
|
|
|
if got_prefix != prefix:
|
|
|
|
msg = "someone wrote to the data since we read the servermap: prefix changed"
|
|
|
|
raise UncoordinatedWriteError(msg)
|
|
|
|
(share_hash_chain, block_hash_tree,
|
|
|
|
share_data) = unpack_share_data(self.verinfo, got_hash_and_data)
|
|
|
|
|
|
|
|
assert isinstance(share_data, str)
|
|
|
|
# build the block hash tree. SDMF has only one leaf.
|
|
|
|
leaves = [hashutil.block_hash(share_data)]
|
|
|
|
t = hashtree.HashTree(leaves)
|
|
|
|
if list(t) != block_hash_tree:
|
|
|
|
raise CorruptShareError(peerid, shnum, "block hash tree failure")
|
|
|
|
share_hash_leaf = t[0]
|
|
|
|
t2 = hashtree.IncompleteHashTree(N)
|
|
|
|
# root_hash was checked by the signature
|
|
|
|
t2.set_hashes({0: root_hash})
|
|
|
|
try:
|
|
|
|
t2.set_hashes(hashes=share_hash_chain,
|
|
|
|
leaves={shnum: share_hash_leaf})
|
|
|
|
except (hashtree.BadHashError, hashtree.NotEnoughHashesError,
|
|
|
|
IndexError), e:
|
|
|
|
msg = "corrupt hashes: %s" % (e,)
|
|
|
|
raise CorruptShareError(peerid, shnum, msg)
|
|
|
|
self.log(" data valid! len=%d" % len(share_data))
|
|
|
|
# each query comes down to this: placing validated share data into
|
|
|
|
# self.shares
|
|
|
|
self.shares[shnum] = share_data
|
|
|
|
|
2008-08-26 23:34:54 +00:00
|
|
|
def _try_to_validate_privkey(self, enc_privkey, peerid, shnum, lp):
|
|
|
|
|
|
|
|
alleged_privkey_s = self._node._decrypt_privkey(enc_privkey)
|
|
|
|
alleged_writekey = hashutil.ssk_writekey_hash(alleged_privkey_s)
|
|
|
|
if alleged_writekey != self._node.get_writekey():
|
|
|
|
self.log("invalid privkey from %s shnum %d" %
|
|
|
|
(idlib.nodeid_b2a(peerid)[:8], shnum),
|
|
|
|
parent=lp, level=log.WEIRD, umid="YIw4tA")
|
|
|
|
return
|
|
|
|
|
|
|
|
# it's good
|
|
|
|
self.log("got valid privkey from shnum %d on peerid %s" %
|
|
|
|
(shnum, idlib.shortnodeid_b2a(peerid)),
|
|
|
|
parent=lp)
|
|
|
|
privkey = rsa.create_signing_key_from_string(alleged_privkey_s)
|
|
|
|
self._node._populate_encprivkey(enc_privkey)
|
|
|
|
self._node._populate_privkey(privkey)
|
|
|
|
self._need_privkey = False
|
|
|
|
|
2008-04-11 21:31:16 +00:00
|
|
|
def _query_failed(self, f, marker, peerid):
|
|
|
|
self.log(format="query to [%(peerid)s] failed",
|
|
|
|
peerid=idlib.shortnodeid_b2a(peerid),
|
|
|
|
level=log.NOISY)
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.problems[peerid] = f
|
2008-04-11 21:31:16 +00:00
|
|
|
self._outstanding_queries.pop(marker, None)
|
|
|
|
if not self._running:
|
|
|
|
return
|
|
|
|
self._last_failure = f
|
|
|
|
self.remove_peer(peerid)
|
2008-08-26 00:51:55 +00:00
|
|
|
level = log.WEIRD
|
|
|
|
if f.check(DeadReferenceError):
|
|
|
|
level = log.UNUSUAL
|
|
|
|
self.log(format="error during query: %(f_value)s",
|
2008-08-26 01:57:59 +00:00
|
|
|
f_value=str(f.value), failure=f, level=level, umid="gOJB5g")
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
def _check_for_done(self, res):
|
|
|
|
# exit paths:
|
|
|
|
# return : keep waiting, no new queries
|
|
|
|
# return self._send_more_queries(outstanding) : send some more queries
|
|
|
|
# fire self._done(plaintext) : download successful
|
|
|
|
# raise exception : download fails
|
|
|
|
|
|
|
|
self.log(format="_check_for_done: running=%(running)s, decoding=%(decoding)s",
|
|
|
|
running=self._running, decoding=self._decoding,
|
|
|
|
level=log.NOISY)
|
|
|
|
if not self._running:
|
|
|
|
return
|
|
|
|
if self._decoding:
|
|
|
|
return
|
|
|
|
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
|
|
|
offsets_tuple) = self.verinfo
|
|
|
|
|
|
|
|
if len(self.shares) < k:
|
|
|
|
# we don't have enough shares yet
|
|
|
|
return self._maybe_send_more_queries(k)
|
2008-08-26 23:34:54 +00:00
|
|
|
if self._need_privkey:
|
|
|
|
# we got k shares, but none of them had a valid privkey. TODO:
|
|
|
|
# look further. Adding code to do this is a bit complicated, and
|
|
|
|
# I want to avoid that complication, and this should be pretty
|
|
|
|
# rare (k shares with bitflips in the enc_privkey but not in the
|
|
|
|
# data blocks). If we actually do get here, the subsequent repair
|
|
|
|
# will fail for lack of a privkey.
|
|
|
|
self.log("got k shares but still need_privkey, bummer",
|
|
|
|
level=log.WEIRD, umid="MdRHPA")
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
# we have enough to finish. All the shares have had their hashes
|
|
|
|
# checked, so if something fails at this point, we don't know how
|
|
|
|
# to fix it, so the download will fail.
|
|
|
|
|
|
|
|
self._decoding = True # avoid reentrancy
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.set_status("decoding")
|
|
|
|
now = time.time()
|
|
|
|
elapsed = now - self._started
|
|
|
|
self._status.timings["fetch"] = elapsed
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
d = defer.maybeDeferred(self._decode)
|
|
|
|
d.addCallback(self._decrypt, IV, self._node._readkey)
|
|
|
|
d.addBoth(self._done)
|
|
|
|
return d # purely for test convenience
|
|
|
|
|
|
|
|
def _maybe_send_more_queries(self, k):
|
|
|
|
# we don't have enough shares yet. Should we send out more queries?
|
|
|
|
# There are some number of queries outstanding, each for a single
|
|
|
|
# share. If we can generate 'needed_shares' additional queries, we do
|
|
|
|
# so. If we can't, then we know this file is a goner, and we raise
|
2008-04-15 23:08:32 +00:00
|
|
|
# NotEnoughSharesError.
|
2008-04-11 21:31:16 +00:00
|
|
|
self.log(format=("_maybe_send_more_queries, have=%(have)d, k=%(k)d, "
|
|
|
|
"outstanding=%(outstanding)d"),
|
|
|
|
have=len(self.shares), k=k,
|
|
|
|
outstanding=len(self._outstanding_queries),
|
|
|
|
level=log.NOISY)
|
|
|
|
|
|
|
|
remaining_shares = k - len(self.shares)
|
|
|
|
needed = remaining_shares - len(self._outstanding_queries)
|
|
|
|
if not needed:
|
|
|
|
# we have enough queries in flight already
|
|
|
|
|
|
|
|
# TODO: but if they've been in flight for a long time, and we
|
|
|
|
# have reason to believe that new queries might respond faster
|
|
|
|
# (i.e. we've seen other queries come back faster, then consider
|
|
|
|
# sending out new queries. This could help with peers which have
|
|
|
|
# silently gone away since the servermap was updated, for which
|
|
|
|
# we're still waiting for the 15-minute TCP disconnect to happen.
|
|
|
|
self.log("enough queries are in flight, no more are needed",
|
|
|
|
level=log.NOISY)
|
|
|
|
return
|
|
|
|
|
|
|
|
outstanding_shnums = set([shnum
|
|
|
|
for (peerid, shnum, started)
|
|
|
|
in self._outstanding_queries.values()])
|
|
|
|
# prefer low-numbered shares, they are more likely to be primary
|
|
|
|
available_shnums = sorted(self.remaining_sharemap.keys())
|
|
|
|
for shnum in available_shnums:
|
|
|
|
if shnum in outstanding_shnums:
|
|
|
|
# skip ones that are already in transit
|
|
|
|
continue
|
|
|
|
if shnum not in self.remaining_sharemap:
|
|
|
|
# no servers for that shnum. note that DictOfSets removes
|
|
|
|
# empty sets from the dict for us.
|
|
|
|
continue
|
|
|
|
peerid = list(self.remaining_sharemap[shnum])[0]
|
|
|
|
# get_data will remove that peerid from the sharemap, and add the
|
|
|
|
# query to self._outstanding_queries
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.set_status("Retrieving More Shares")
|
2008-04-11 21:31:16 +00:00
|
|
|
self.get_data(shnum, peerid)
|
|
|
|
needed -= 1
|
|
|
|
if not needed:
|
|
|
|
break
|
|
|
|
|
|
|
|
# at this point, we have as many outstanding queries as we can. If
|
|
|
|
# needed!=0 then we might not have enough to recover the file.
|
|
|
|
if needed:
|
|
|
|
format = ("ran out of peers: "
|
|
|
|
"have %(have)d shares (k=%(k)d), "
|
|
|
|
"%(outstanding)d queries in flight, "
|
2008-04-15 22:58:02 +00:00
|
|
|
"need %(need)d more, "
|
|
|
|
"found %(bad)d bad shares")
|
|
|
|
args = {"have": len(self.shares),
|
|
|
|
"k": k,
|
|
|
|
"outstanding": len(self._outstanding_queries),
|
|
|
|
"need": needed,
|
|
|
|
"bad": len(self._bad_shares),
|
|
|
|
}
|
2008-04-11 21:31:16 +00:00
|
|
|
self.log(format=format,
|
2008-08-26 01:57:59 +00:00
|
|
|
level=log.WEIRD, umid="ezTfjw", **args)
|
2008-04-15 23:08:32 +00:00
|
|
|
err = NotEnoughSharesError("%s, last failure: %s" %
|
2008-04-15 22:58:02 +00:00
|
|
|
(format % args, self._last_failure))
|
|
|
|
if self._bad_shares:
|
|
|
|
self.log("We found some bad shares this pass. You should "
|
|
|
|
"update the servermap and try again to check "
|
|
|
|
"more peers",
|
2008-08-26 01:57:59 +00:00
|
|
|
level=log.WEIRD, umid="EFkOlA")
|
2008-04-15 22:58:02 +00:00
|
|
|
err.servermap = self.servermap
|
|
|
|
raise err
|
2008-04-11 21:31:16 +00:00
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
def _decode(self):
|
2008-04-17 00:49:06 +00:00
|
|
|
started = time.time()
|
2008-04-11 21:31:16 +00:00
|
|
|
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
|
|
|
offsets_tuple) = self.verinfo
|
|
|
|
|
|
|
|
# shares_dict is a dict mapping shnum to share data, but the codec
|
|
|
|
# wants two lists.
|
|
|
|
shareids = []; shares = []
|
|
|
|
for shareid, share in self.shares.items():
|
|
|
|
shareids.append(shareid)
|
|
|
|
shares.append(share)
|
|
|
|
|
|
|
|
assert len(shareids) >= k, len(shareids)
|
|
|
|
# zfec really doesn't want extra shares
|
|
|
|
shareids = shareids[:k]
|
|
|
|
shares = shares[:k]
|
|
|
|
|
|
|
|
fec = codec.CRSDecoder()
|
download: refactor handling of URI Extension Block and crypttext hash tree, simplify things
Refactor into a class the logic of asking each server in turn until one of them gives an answer
that validates. It is called ValidatedThingObtainer.
Refactor the downloading and verification of the URI Extension Block into a class named
ValidatedExtendedURIProxy.
The new logic of validating UEBs is minimalist: it doesn't require the UEB to contain any
unncessary information, but of course it still accepts such information for backwards
compatibility (so that this new download code is able to download files uploaded with old, and
for that matter with current, upload code).
The new logic of validating UEBs follows the practice of doing all validation up front. This
practice advises one to isolate the validation of incoming data into one place, so that all of
the rest of the code can assume only valid data.
If any redundant information is present in the UEB+URI, the new code cross-checks and asserts
that it is all fully consistent. This closes some issues where the uploader could have
uploaded inconsistent redundant data, which would probably have caused the old downloader to
simply reject that download after getting a Python exception, but perhaps could have caused
greater harm to the old downloader.
I removed the notion of selecting an erasure codec from codec.py based on the string that was
passed in the UEB. Currently "crs" is the only such string that works, so
"_assert(codec_name == 'crs')" is simpler and more explicit. This is also in keeping with the
"validate up front" strategy -- now if someone sets a different string than "crs" in their UEB,
the downloader will reject the download in the "validate this UEB" function instead of in a
separate "select the codec instance" function.
I removed the code to check plaintext hashes and plaintext Merkle Trees. Uploaders do not
produce this information any more (since it potentially exposes confidential information about
the file), and the unit tests for it were disabled. The downloader before this patch would
check that plaintext hash or plaintext merkle tree if they were present, but not complain if
they were absent. The new downloader in this patch complains if they are present and doesn't
check them. (We might in the future re-introduce such hashes over the plaintext, but encrypt
the hashes which are stored in the UEB to preserve confidentiality. This would be a double-
check on the correctness of our own source code -- the current Merkle Tree over the ciphertext
is already sufficient to guarantee the integrity of the download unless there is a bug in our
Merkle Tree or AES implementation.)
This patch increases the lines-of-code count by 8 (from 17,770 to 17,778), and reduces the
uncovered-by-tests lines-of-code count by 24 (from 1408 to 1384). Those numbers would be more
meaningful if we omitted src/allmydata/util/ from the test-coverage statistics.
2008-12-05 15:17:54 +00:00
|
|
|
fec.set_params(segsize, k, N)
|
2008-04-11 21:31:16 +00:00
|
|
|
|
download: refactor handling of URI Extension Block and crypttext hash tree, simplify things
Refactor into a class the logic of asking each server in turn until one of them gives an answer
that validates. It is called ValidatedThingObtainer.
Refactor the downloading and verification of the URI Extension Block into a class named
ValidatedExtendedURIProxy.
The new logic of validating UEBs is minimalist: it doesn't require the UEB to contain any
unncessary information, but of course it still accepts such information for backwards
compatibility (so that this new download code is able to download files uploaded with old, and
for that matter with current, upload code).
The new logic of validating UEBs follows the practice of doing all validation up front. This
practice advises one to isolate the validation of incoming data into one place, so that all of
the rest of the code can assume only valid data.
If any redundant information is present in the UEB+URI, the new code cross-checks and asserts
that it is all fully consistent. This closes some issues where the uploader could have
uploaded inconsistent redundant data, which would probably have caused the old downloader to
simply reject that download after getting a Python exception, but perhaps could have caused
greater harm to the old downloader.
I removed the notion of selecting an erasure codec from codec.py based on the string that was
passed in the UEB. Currently "crs" is the only such string that works, so
"_assert(codec_name == 'crs')" is simpler and more explicit. This is also in keeping with the
"validate up front" strategy -- now if someone sets a different string than "crs" in their UEB,
the downloader will reject the download in the "validate this UEB" function instead of in a
separate "select the codec instance" function.
I removed the code to check plaintext hashes and plaintext Merkle Trees. Uploaders do not
produce this information any more (since it potentially exposes confidential information about
the file), and the unit tests for it were disabled. The downloader before this patch would
check that plaintext hash or plaintext merkle tree if they were present, but not complain if
they were absent. The new downloader in this patch complains if they are present and doesn't
check them. (We might in the future re-introduce such hashes over the plaintext, but encrypt
the hashes which are stored in the UEB to preserve confidentiality. This would be a double-
check on the correctness of our own source code -- the current Merkle Tree over the ciphertext
is already sufficient to guarantee the integrity of the download unless there is a bug in our
Merkle Tree or AES implementation.)
This patch increases the lines-of-code count by 8 (from 17,770 to 17,778), and reduces the
uncovered-by-tests lines-of-code count by 24 (from 1408 to 1384). Those numbers would be more
meaningful if we omitted src/allmydata/util/ from the test-coverage statistics.
2008-12-05 15:17:54 +00:00
|
|
|
self.log("params %s, we have %d shares" % ((segsize, k, N), len(shares)))
|
2008-04-11 21:31:16 +00:00
|
|
|
self.log("about to decode, shareids=%s" % (shareids,))
|
|
|
|
d = defer.maybeDeferred(fec.decode, shares, shareids)
|
|
|
|
def _done(buffers):
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.timings["decode"] = time.time() - started
|
2008-04-11 21:31:16 +00:00
|
|
|
self.log(" decode done, %d buffers" % len(buffers))
|
|
|
|
segment = "".join(buffers)
|
|
|
|
self.log(" joined length %d, datalength %d" %
|
|
|
|
(len(segment), datalength))
|
|
|
|
segment = segment[:datalength]
|
|
|
|
self.log(" segment len=%d" % len(segment))
|
|
|
|
return segment
|
|
|
|
def _err(f):
|
|
|
|
self.log(" decode failed: %s" % f)
|
|
|
|
return f
|
|
|
|
d.addCallback(_done)
|
|
|
|
d.addErrback(_err)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def _decrypt(self, crypttext, IV, readkey):
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.set_status("decrypting")
|
2008-04-11 21:31:16 +00:00
|
|
|
started = time.time()
|
|
|
|
key = hashutil.ssk_readkey_data_hash(IV, readkey)
|
|
|
|
decryptor = AES(key)
|
|
|
|
plaintext = decryptor.process(crypttext)
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.timings["decrypt"] = time.time() - started
|
2008-04-11 21:31:16 +00:00
|
|
|
return plaintext
|
|
|
|
|
|
|
|
def _done(self, res):
|
|
|
|
if not self._running:
|
|
|
|
return
|
|
|
|
self._running = False
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.set_active(False)
|
|
|
|
self._status.timings["total"] = time.time() - self._started
|
2008-04-11 21:31:16 +00:00
|
|
|
# res is either the new contents, or a Failure
|
|
|
|
if isinstance(res, failure.Failure):
|
2008-04-22 00:27:50 +00:00
|
|
|
self.log("Retrieve done, with failure", failure=res,
|
|
|
|
level=log.UNUSUAL)
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.set_status("Failed")
|
2008-04-11 21:31:16 +00:00
|
|
|
else:
|
2008-04-16 22:22:30 +00:00
|
|
|
self.log("Retrieve done, success!")
|
2008-04-17 00:49:06 +00:00
|
|
|
self._status.set_status("Done")
|
|
|
|
self._status.set_progress(1.0)
|
2008-04-11 21:31:16 +00:00
|
|
|
# remember the encoding parameters, use them again next time
|
|
|
|
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
|
|
|
|
offsets_tuple) = self.verinfo
|
|
|
|
self._node._populate_required_shares(k)
|
|
|
|
self._node._populate_total_shares(N)
|
|
|
|
eventually(self._done_deferred.callback, res)
|
|
|
|
|