mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-10 22:43:52 +00:00
Retrieve: rewrite flow-control: use a top-level loop() to catch all errors
This ought to close the potential for dropped errors and hanging downloads. Verify needs to be examined, I may have broken it, although all tests pass.
This commit is contained in:
parent
53bbc1d06a
commit
df07060f93
@ -228,8 +228,6 @@ class Retrieve:
|
||||
self._setup_encoding_parameters()
|
||||
self.log("starting download")
|
||||
self._started_fetching = time.time()
|
||||
d = self._add_active_peers()
|
||||
# ...
|
||||
# The download process beyond this is a state machine.
|
||||
# _add_active_peers will select the peers that we want to use
|
||||
# for the download, and then attempt to start downloading. After
|
||||
@ -238,8 +236,17 @@ class Retrieve:
|
||||
# peers before downloading all of the segments, _done_deferred
|
||||
# will errback. Otherwise, it will eventually callback with the
|
||||
# contents of the mutable file.
|
||||
self.loop()
|
||||
return self._done_deferred
|
||||
|
||||
def loop(self):
|
||||
d = fireEventually(None) # avoid #237 recursion limit problem
|
||||
d.addCallback(lambda ign: self._activate_enough_peers())
|
||||
d.addCallback(lambda ign: self._download_current_segment())
|
||||
# when we're done, _download_current_segment will call _done. If we
|
||||
# aren't, it will call loop() again.
|
||||
d.addErrback(self._error)
|
||||
|
||||
def _setup_download(self):
|
||||
self._started = time.time()
|
||||
self._status.set_status("Retrieving Shares")
|
||||
@ -421,7 +428,7 @@ class Retrieve:
|
||||
|
||||
self._current_segment = self._start_segment
|
||||
|
||||
def _add_active_peers(self):
|
||||
def _activate_enough_peers(self):
|
||||
"""
|
||||
I populate self._active_readers with enough active readers to
|
||||
retrieve the contents of this mutable file. I am called before
|
||||
@ -449,35 +456,36 @@ class Retrieve:
|
||||
# of scope for MDMF, though.)
|
||||
|
||||
# We need at least self._required_shares readers to download a
|
||||
# segment.
|
||||
# segment. If we're verifying, we need all shares.
|
||||
if self._verify:
|
||||
needed = self._total_shares
|
||||
else:
|
||||
needed = self._required_shares - len(self._active_readers)
|
||||
needed = self._required_shares
|
||||
# XXX: Why don't format= log messages work here?
|
||||
self.log("adding %d peers to the active peers list" % needed)
|
||||
|
||||
if len(self._active_readers) >= needed:
|
||||
# enough shares are active
|
||||
return
|
||||
|
||||
more = needed - len(self._active_readers)
|
||||
known_shnums = set(self.remaining_sharemap.keys())
|
||||
used_shnums = set([r.shnum for r in self._active_readers])
|
||||
unused_shnums = known_shnums - used_shnums
|
||||
# We favor lower numbered shares, since FEC is faster with
|
||||
# primary shares than with other shares, and lower-numbered
|
||||
# shares are more likely to be primary than higher numbered
|
||||
# shares.
|
||||
active_shnums = set(sorted(self.remaining_sharemap.keys()))
|
||||
# We shouldn't consider adding shares that we already have; this
|
||||
# will cause problems later.
|
||||
active_shnums -= set([reader.shnum for reader in self._active_readers])
|
||||
active_shnums = list(active_shnums)[:needed]
|
||||
if len(active_shnums) < needed and not self._verify:
|
||||
new_shnums = sorted(unused_shnums)[:more]
|
||||
if len(new_shnums) < more and not self._verify:
|
||||
# We don't have enough readers to retrieve the file; fail.
|
||||
return self._failed()
|
||||
self._raise_notenoughshareserror()
|
||||
|
||||
for shnum in active_shnums:
|
||||
self._active_readers.append(self.readers[shnum])
|
||||
self.log("added reader for share %d" % shnum)
|
||||
assert len(self._active_readers) >= self._required_shares
|
||||
new_readers = set(self._active_readers) - self._validated_readers
|
||||
|
||||
for reader in new_readers:
|
||||
for shnum in new_shnums:
|
||||
reader = self.readers[shnum]
|
||||
self._active_readers.append(reader)
|
||||
self._validated_readers.add(reader)
|
||||
self.log("added reader for share %d" % shnum)
|
||||
# Each time we validate a reader, we check to see if we need the
|
||||
# private key. If we do, we politely ask for it and then continue
|
||||
# computing. If we find that we haven't gotten it at the end of
|
||||
@ -487,8 +495,7 @@ class Retrieve:
|
||||
d.addCallback(self._try_to_validate_privkey, reader)
|
||||
# XXX: don't just drop the Deferred. We need error-reporting
|
||||
# but not flow-control here.
|
||||
return self._download_current_segment()
|
||||
|
||||
assert len(self._active_readers) >= self._required_shares
|
||||
|
||||
def _try_to_validate_prefix(self, prefix, reader):
|
||||
"""
|
||||
@ -588,23 +595,16 @@ class Retrieve:
|
||||
that this Retrieve is currently responsible for downloading.
|
||||
"""
|
||||
assert len(self._active_readers) >= self._required_shares
|
||||
if self._current_segment <= self._last_segment:
|
||||
d = self._process_segment(self._current_segment)
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
d.addBoth(self._turn_barrier)
|
||||
d.addCallback(self._check_for_done)
|
||||
if self._current_segment > self._last_segment:
|
||||
# No more segments to download, we're done.
|
||||
self.log("got plaintext, done")
|
||||
return self._done()
|
||||
self.log("on segment %d of %d" %
|
||||
(self._current_segment + 1, self._num_segments))
|
||||
d = self._process_segment(self._current_segment)
|
||||
d.addCallback(lambda ign: self.loop())
|
||||
return d
|
||||
|
||||
|
||||
def _turn_barrier(self, result):
|
||||
"""
|
||||
I help the download process avoid the recursion limit issues
|
||||
discussed in #237.
|
||||
"""
|
||||
return fireEventually(result)
|
||||
|
||||
|
||||
def _process_segment(self, segnum):
|
||||
"""
|
||||
I download, validate, decode, and decrypt one segment of the
|
||||
@ -958,50 +958,11 @@ class Retrieve:
|
||||
self._need_privkey = False
|
||||
|
||||
|
||||
def _check_for_done(self, res):
|
||||
"""
|
||||
I check to see if this Retrieve object has successfully finished
|
||||
its work.
|
||||
|
||||
I can exit in the following ways:
|
||||
- If there are no more segments to download, then I exit by
|
||||
causing self._done_deferred to fire with the plaintext
|
||||
content requested by the caller.
|
||||
- If there are still segments to be downloaded, and there
|
||||
are enough active readers (readers which have not broken
|
||||
and have not given us corrupt data) to continue
|
||||
downloading, I send control back to
|
||||
_download_current_segment.
|
||||
- If there are still segments to be downloaded but there are
|
||||
not enough active peers to download them, I ask
|
||||
_add_active_peers to add more peers. If it is successful,
|
||||
it will call _download_current_segment. If there are not
|
||||
enough peers to retrieve the file, then that will cause
|
||||
_done_deferred to errback.
|
||||
"""
|
||||
self.log("checking for doneness")
|
||||
if self._current_segment > self._last_segment:
|
||||
# No more segments to download, we're done.
|
||||
self.log("got plaintext, done")
|
||||
return self._done()
|
||||
|
||||
if len(self._active_readers) >= self._required_shares:
|
||||
# More segments to download, but we have enough good peers
|
||||
# in self._active_readers that we can do that without issue,
|
||||
# so go nab the next segment.
|
||||
self.log("not done yet: on segment %d of %d" % \
|
||||
(self._current_segment + 1, self._num_segments))
|
||||
return self._download_current_segment()
|
||||
|
||||
self.log("not done yet: on segment %d of %d, need to add peers" % \
|
||||
(self._current_segment + 1, self._num_segments))
|
||||
return self._add_active_peers()
|
||||
|
||||
|
||||
def _done(self):
|
||||
"""
|
||||
I am called by _check_for_done when the download process has
|
||||
finished successfully. After making some useful logging
|
||||
I am called by _download_current_segment when the download process
|
||||
has finished successfully. After making some useful logging
|
||||
statements, I return the decrypted contents to the owner of this
|
||||
Retrieve object through self._done_deferred.
|
||||
"""
|
||||
@ -1029,36 +990,34 @@ class Retrieve:
|
||||
eventually(self._done_deferred.callback, ret)
|
||||
|
||||
|
||||
def _failed(self):
|
||||
def _raise_notenoughshareserror(self):
|
||||
"""
|
||||
I am called by _add_active_peers when there are not enough
|
||||
I am called by _activate_enough_peers when there are not enough
|
||||
active peers left to complete the download. After making some
|
||||
useful logging statements, I return an exception to that effect
|
||||
useful logging statements, I throw an exception to that effect
|
||||
to the caller of this Retrieve object through
|
||||
self._done_deferred.
|
||||
"""
|
||||
|
||||
format = ("ran out of peers: "
|
||||
"have %(have)d of %(total)d segments "
|
||||
"found %(bad)d bad shares "
|
||||
"encoding %(k)d-of-%(n)d")
|
||||
args = {"have": self._current_segment,
|
||||
"total": self._num_segments,
|
||||
"need": self._last_segment,
|
||||
"k": self._required_shares,
|
||||
"n": self._total_shares,
|
||||
"bad": len(self._bad_shares)}
|
||||
raise NotEnoughSharesError("%s, last failure: %s" %
|
||||
(format % args, str(self._last_failure)))
|
||||
|
||||
def _error(self, f):
|
||||
# all errors, including NotEnoughSharesError, land here
|
||||
self._running = False
|
||||
self._status.set_active(False)
|
||||
now = time.time()
|
||||
self._status.timings['total'] = now - self._started
|
||||
self._status.timings['fetch'] = now - self._started_fetching
|
||||
self._status.set_status("Failed")
|
||||
|
||||
if self._verify:
|
||||
ret = list(self._bad_shares)
|
||||
else:
|
||||
format = ("ran out of peers: "
|
||||
"have %(have)d of %(total)d segments "
|
||||
"found %(bad)d bad shares "
|
||||
"encoding %(k)d-of-%(n)d")
|
||||
args = {"have": self._current_segment,
|
||||
"total": self._num_segments,
|
||||
"need": self._last_segment,
|
||||
"k": self._required_shares,
|
||||
"n": self._total_shares,
|
||||
"bad": len(self._bad_shares)}
|
||||
e = NotEnoughSharesError("%s, last failure: %s" % \
|
||||
(format % args, str(self._last_failure)))
|
||||
f = failure.Failure(e)
|
||||
ret = f
|
||||
eventually(self._done_deferred.callback, ret)
|
||||
eventually(self._done_deferred.errback, f)
|
||||
|
Loading…
x
Reference in New Issue
Block a user