SegmentFetcher: use new diversity-seeking share-selection algorithm, and

deliver all shares at once instead of feeding them out one-at-a-time.

Also fix distribution of real-number-of-segments information: now all
CommonShares (not just the ones used for the first segment) get a
correctly-sized hashtree. Previously, the late ones might not, which would
make them crash and get dropped (causing the download to fail if the initial
set were insufficient, perhaps because one of their servers went away).

Update tests, add some TODO notes, improve variable names and comments.
Improve logging: add logparents, set more appropriate levels.
This commit is contained in:
Brian Warner 2010-08-31 18:37:02 -07:00
parent c89a464510
commit 00e9e4e676
8 changed files with 689 additions and 245 deletions

View File

@ -4,8 +4,8 @@ from foolscap.api import eventually
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
from allmydata.util import log
from allmydata.util.dictutil import DictOfSets
from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
BADSEGNUM, BadSegmentNumberError
from common import OVERDUE, COMPLETE, CORRUPT, DEAD, BADSEGNUM, \
BadSegmentNumberError
class SegmentFetcher:
"""I am responsible for acquiring blocks for a single segment. I will use
@ -22,35 +22,42 @@ class SegmentFetcher:
will shut down and do no further work. My parent can also call my stop()
method to have me shut down early."""
def __init__(self, node, segnum, k):
def __init__(self, node, segnum, k, logparent):
self._node = node # _Node
self.segnum = segnum
self._k = k
self._shares = {} # maps non-dead Share instance to a state, one of
# (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
# State transition map is:
# AVAILABLE -(send-read)-> PENDING
# PENDING -(timer)-> OVERDUE
# PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
# OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
# If a share becomes DEAD, it is removed from the
# dict. If it becomes BADSEGNUM, the whole fetch is
# terminated.
self._shares = [] # unused Share instances, sorted by "goodness"
# (RTT), then shnum. This is populated when DYHB
# responses arrive, or (for later segments) at
# startup. We remove shares from it when we call
# sh.get_block() on them.
self._shares_from_server = DictOfSets() # maps serverid to set of
# Shares on that server for
# which we have outstanding
# get_block() calls.
self._max_shares_per_server = 1 # how many Shares we're allowed to
# pull from each server. This starts
# at 1 and grows if we don't have
# sufficient diversity.
self._active_share_map = {} # maps shnum to outstanding (and not
# OVERDUE) Share that provides it.
self._overdue_share_map = DictOfSets() # shares in the OVERDUE state
self._lp = logparent
self._share_observers = {} # maps Share to EventStreamObserver for
# active ones
self._shnums = DictOfSets() # maps shnum to the shares that provide it
self._blocks = {} # maps shnum to validated block data
self._no_more_shares = False
self._bad_segnum = False
self._last_failure = None
self._running = True
def stop(self):
log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
level=log.NOISY, umid="LWyqpg")
level=log.NOISY, parent=self._lp, umid="LWyqpg")
self._cancel_all_requests()
self._running = False
self._shares.clear() # let GC work # ??? XXX
# help GC ??? XXX
del self._shares, self._shares_from_server, self._active_share_map
del self._share_observers
# called by our parent _Node
@ -59,9 +66,8 @@ class SegmentFetcher:
# called when ShareFinder locates a new share, and when a non-initial
# segment fetch is started and we already know about shares from the
# previous segment
for s in shares:
self._shares[s] = AVAILABLE
self._shnums.add(s._shnum, s)
self._shares.extend(shares)
self._shares.sort(key=lambda s: (s._dyhb_rtt, s._shnum) )
eventually(self.loop)
def no_more_shares(self):
@ -71,15 +77,6 @@ class SegmentFetcher:
# internal methods
def _count_shnums(self, *states):
"""shnums for which at least one state is in the following list"""
shnums = []
for shnum,shares in self._shnums.iteritems():
matches = [s for s in shares if self._shares.get(s) in states]
if matches:
shnums.append(shnum)
return len(shnums)
def loop(self):
try:
# if any exception occurs here, kill the download
@ -92,7 +89,8 @@ class SegmentFetcher:
k = self._k
if not self._running:
return
if self._bad_segnum:
numsegs, authoritative = self._node.get_num_segments()
if authoritative and self.segnum >= numsegs:
# oops, we were asking for a segment number beyond the end of the
# file. This is an error.
self.stop()
@ -102,98 +100,125 @@ class SegmentFetcher:
self._node.fetch_failed(self, f)
return
#print "LOOP", self._blocks.keys(), "active:", self._active_share_map, "overdue:", self._overdue_share_map, "unused:", self._shares
# Should we sent out more requests?
while len(set(self._blocks.keys())
| set(self._active_share_map.keys())
) < k:
# we don't have data or active requests for enough shares. Are
# there any unused shares we can start using?
(sent_something, want_more_diversity) = self._find_and_use_share()
if sent_something:
# great. loop back around in case we need to send more.
continue
if want_more_diversity:
# we could have sent something if we'd been allowed to pull
# more shares per server. Increase the limit and try again.
self._max_shares_per_server += 1
log.msg("SegmentFetcher(%s) increasing diversity limit to %d"
% (self._node._si_prefix, self._max_shares_per_server),
level=log.NOISY, umid="xY2pBA")
# Also ask for more shares, in the hopes of achieving better
# diversity for the next segment.
self._ask_for_more_shares()
continue
# we need more shares than the ones in self._shares to make
# progress
self._ask_for_more_shares()
if self._no_more_shares:
# But there are no more shares to be had. If we're going to
# succeed, it will be with the shares we've already seen.
# Will they be enough?
if len(set(self._blocks.keys())
| set(self._active_share_map.keys())
| set(self._overdue_share_map.keys())
) < k:
# nope. bail.
self._no_shares_error() # this calls self.stop()
return
# our outstanding or overdue requests may yet work.
# more shares may be coming. Wait until then.
return
# are we done?
if self._count_shnums(COMPLETE) >= k:
if len(set(self._blocks.keys())) >= k:
# yay!
self.stop()
self._node.process_blocks(self.segnum, self._blocks)
return
# we may have exhausted everything
if (self._no_more_shares and
self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
# no more new shares are coming, and the remaining hopeful shares
# aren't going to be enough. boo!
def _no_shares_error(self):
if not (self._shares or self._active_share_map or
self._overdue_share_map or self._blocks):
format = ("no shares (need %(k)d)."
" Last failure: %(last_failure)s")
args = { "k": self._k,
"last_failure": self._last_failure }
error = NoSharesError
else:
format = ("ran out of shares: complete=%(complete)s"
" pending=%(pending)s overdue=%(overdue)s"
" unused=%(unused)s need %(k)d."
" Last failure: %(last_failure)s")
def join(shnums): return ",".join(["sh%d" % shnum
for shnum in sorted(shnums)])
pending_s = ",".join([str(sh)
for sh in self._active_share_map.values()])
overdue = set()
for shares in self._overdue_share_map.values():
overdue |= shares
overdue_s = ",".join([str(sh) for sh in overdue])
args = {"complete": join(self._blocks.keys()),
"pending": pending_s,
"overdue": overdue_s,
# 'unused' should be zero
"unused": ",".join([str(sh) for sh in self._shares]),
"k": self._k,
"last_failure": self._last_failure,
}
error = NotEnoughSharesError
log.msg(format=format,
level=log.UNUSUAL, parent=self._lp, umid="1DsnTg",
**args)
e = error(format % args)
f = Failure(e)
self.stop()
self._node.fetch_failed(self, f)
log.msg("share states: %r" % (self._shares,),
level=log.NOISY, umid="0ThykQ")
if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
format = ("no shares (need %(k)d)."
" Last failure: %(last_failure)s")
args = { "k": k,
"last_failure": self._last_failure }
error = NoSharesError
else:
format = ("ran out of shares: %(complete)d complete,"
" %(pending)d pending, %(overdue)d overdue,"
" %(unused)d unused, need %(k)d."
" Last failure: %(last_failure)s")
args = {"complete": self._count_shnums(COMPLETE),
"pending": self._count_shnums(PENDING),
"overdue": self._count_shnums(OVERDUE),
# 'unused' should be zero
"unused": self._count_shnums(AVAILABLE),
"k": k,
"last_failure": self._last_failure,
}
error = NotEnoughSharesError
log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
e = error(format % args)
f = Failure(e)
self.stop()
self._node.fetch_failed(self, f)
return
def _find_and_use_share(self):
sent_something = False
want_more_diversity = False
for sh in self._shares: # find one good share to fetch
shnum = sh._shnum ; serverid = sh._peerid
if shnum in self._blocks:
continue # don't request data we already have
if shnum in self._active_share_map:
# note: OVERDUE shares are removed from _active_share_map
# and added to _overdue_share_map instead.
continue # don't send redundant requests
sfs = self._shares_from_server
if len(sfs.get(serverid,set())) >= self._max_shares_per_server:
# don't pull too much from a single server
want_more_diversity = True
continue
# ok, we can use this share
self._shares.remove(sh)
self._active_share_map[shnum] = sh
self._shares_from_server.add(serverid, sh)
self._start_share(sh, shnum)
sent_something = True
break
return (sent_something, want_more_diversity)
# nope, not done. Are we "block-hungry" (i.e. do we want to send out
# more read requests, or do we think we have enough in flight
# already?)
while self._count_shnums(PENDING, COMPLETE) < k:
# we're hungry.. are there any unused shares?
sent = self._send_new_request()
if not sent:
break
def _start_share(self, share, shnum):
self._share_observers[share] = o = share.get_block(self.segnum)
o.subscribe(self._block_request_activity, share=share, shnum=shnum)
# ok, now are we "share-hungry" (i.e. do we have enough known shares
# to make us happy, or should we ask the ShareFinder to get us more?)
if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
# we're hungry for more shares
def _ask_for_more_shares(self):
if not self._no_more_shares:
self._node.want_more_shares()
# that will trigger the ShareFinder to keep looking
def _find_one(self, shares, state):
# TODO could choose fastest, or avoid servers already in use
for s in shares:
if self._shares[s] == state:
return s
# can never get here, caller has assert in case of code bug
def _send_new_request(self):
# TODO: this is probably O(k^2), and we're called from a range(k)
# loop, so O(k^3)
# this first loop prefers sh0, then sh1, sh2, etc
for shnum,shares in sorted(self._shnums.iteritems()):
states = [self._shares[s] for s in shares]
if COMPLETE in states or PENDING in states:
# don't send redundant requests
continue
if AVAILABLE not in states:
# no candidates for this shnum, move on
continue
# here's a candidate. Send a request.
s = self._find_one(shares, AVAILABLE)
assert s
self._shares[s] = PENDING
self._share_observers[s] = o = s.get_block(self.segnum)
o.subscribe(self._block_request_activity, share=s, shnum=shnum)
# TODO: build up a list of candidates, then walk through the
# list, sending requests to the most desireable servers,
# re-checking our block-hunger each time. For non-initial segment
# fetches, this would let us stick with faster servers.
return True
# nothing was sent: don't call us again until you have more shares to
# work with, or one of the existing shares has been declared OVERDUE
return False
# that will trigger the ShareFinder to keep looking, and call our
# add_shares() or no_more_shares() later.
def _cancel_all_requests(self):
for o in self._share_observers.values():
@ -207,27 +232,33 @@ class SegmentFetcher:
log.msg("SegmentFetcher(%s)._block_request_activity:"
" Share(sh%d-on-%s) -> %s" %
(self._node._si_prefix, shnum, share._peerid_s, state),
level=log.NOISY, umid="vilNWA")
# COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
level=log.NOISY, parent=self._lp, umid="vilNWA")
# COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal. Remove the share
# from all our tracking lists.
if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
self._share_observers.pop(share, None)
self._shares_from_server.discard(shnum, share)
if self._active_share_map.get(shnum) is share:
del self._active_share_map[shnum]
self._overdue_share_map.discard(shnum, share)
if state is COMPLETE:
# 'block' is fully validated
self._shares[share] = COMPLETE
# 'block' is fully validated and complete
self._blocks[shnum] = block
elif state is OVERDUE:
self._shares[share] = OVERDUE
if state is OVERDUE:
# no longer active, but still might complete
del self._active_share_map[shnum]
self._overdue_share_map.add(shnum, share)
# OVERDUE is not terminal: it will eventually transition to
# COMPLETE, CORRUPT, or DEAD.
elif state is CORRUPT:
self._shares[share] = CORRUPT
elif state is DEAD:
del self._shares[share]
self._shnums[shnum].remove(share)
if state is DEAD:
self._last_failure = f
elif state is BADSEGNUM:
self._shares[share] = BADSEGNUM # ???
self._bad_segnum = True
if state is BADSEGNUM:
# our main loop will ask the DownloadNode each time for the
# number of segments, so we'll deal with this in the top of
# _do_loop
pass
eventually(self.loop)

View File

@ -35,11 +35,9 @@ class ShareFinder:
self._storage_broker = storage_broker
self.share_consumer = self.node = node
self.max_outstanding_requests = max_outstanding_requests
self._hungry = False
self._commonshares = {} # shnum to CommonShare instance
self.undelivered_shares = []
self.pending_requests = set()
self.overdue_requests = set() # subset of pending_requests
self.overdue_timers = {}
@ -52,6 +50,12 @@ class ShareFinder:
si=self._si_prefix,
level=log.NOISY, parent=logparent, umid="2xjj2A")
def update_num_segments(self):
(numsegs, authoritative) = self.node.get_num_segments()
assert authoritative
for cs in self._commonshares.values():
cs.set_authoritative_num_segments(numsegs)
def start_finding_servers(self):
# don't get servers until somebody uses us: creating the
# ImmutableFileNode should not cause work to happen yet. Test case is
@ -83,30 +87,16 @@ class ShareFinder:
# internal methods
def loop(self):
undelivered_s = ",".join(["sh%d@%s" %
(s._shnum, idlib.shortnodeid_b2a(s._peerid))
for s in self.undelivered_shares])
pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
for rt in self.pending_requests]) # sort?
self.log(format="ShareFinder loop: running=%(running)s"
" hungry=%(hungry)s, undelivered=%(undelivered)s,"
" pending=%(pending)s",
running=self.running, hungry=self._hungry,
undelivered=undelivered_s, pending=pending_s,
" hungry=%(hungry)s, pending=%(pending)s",
running=self.running, hungry=self._hungry, pending=pending_s,
level=log.NOISY, umid="kRtS4Q")
if not self.running:
return
if not self._hungry:
return
if self.undelivered_shares:
sh = self.undelivered_shares.pop(0)
# they will call hungry() again if they want more
self._hungry = False
self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
shnum=sh._shnum, peerid=sh._peerid_s,
level=log.NOISY, umid="2n1qQw")
eventually(self.share_consumer.got_shares, [sh])
return
non_overdue = self.pending_requests - self.overdue_requests
if len(non_overdue) >= self.max_outstanding_requests:
@ -146,14 +136,16 @@ class ShareFinder:
lp = self.log(format="sending DYHB to [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, umid="Io7pyg")
d_ev = self._download_status.add_dyhb_sent(peerid, now())
time_sent = now()
d_ev = self._download_status.add_dyhb_sent(peerid, time_sent)
# TODO: get the timer from a Server object, it knows best
self.overdue_timers[req] = reactor.callLater(self.OVERDUE_TIMEOUT,
self.overdue, req)
d = rref.callRemote("get_buckets", self._storage_index)
d.addBoth(incidentally, self._request_retired, req)
d.addCallbacks(self._got_response, self._got_error,
callbackArgs=(rref.version, peerid, req, d_ev, lp),
callbackArgs=(rref.version, peerid, req, d_ev,
time_sent, lp),
errbackArgs=(peerid, req, d_ev, lp))
d.addErrback(log.err, format="error in send_request",
level=log.WEIRD, parent=lp, umid="rpdV0w")
@ -172,33 +164,37 @@ class ShareFinder:
self.overdue_requests.add(req)
eventually(self.loop)
def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
def _got_response(self, buckets, server_version, peerid, req, d_ev,
time_sent, lp):
shnums = sorted([shnum for shnum in buckets])
d_ev.finished(shnums, now())
if buckets:
shnums_s = ",".join([str(shnum) for shnum in shnums])
self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, parent=lp, umid="0fcEZw")
else:
time_received = now()
d_ev.finished(shnums, time_received)
dyhb_rtt = time_received - time_sent
if not buckets:
self.log(format="no shares from [%(peerid)s]",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, parent=lp, umid="U7d4JA")
if self.node.num_segments is None:
best_numsegs = self.node.guessed_num_segments
else:
best_numsegs = self.node.num_segments
return
shnums_s = ",".join([str(shnum) for shnum in shnums])
self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY, parent=lp, umid="0fcEZw")
shares = []
for shnum, bucket in buckets.iteritems():
self._create_share(best_numsegs, shnum, bucket, server_version,
peerid)
s = self._create_share(shnum, bucket, server_version, peerid,
dyhb_rtt)
shares.append(s)
self._deliver_shares(shares)
def _create_share(self, best_numsegs, shnum, bucket, server_version,
peerid):
def _create_share(self, shnum, bucket, server_version, peerid, dyhb_rtt):
if shnum in self._commonshares:
cs = self._commonshares[shnum]
else:
cs = CommonShare(best_numsegs, self._si_prefix, shnum,
numsegs, authoritative = self.node.get_num_segments()
cs = CommonShare(numsegs, self._si_prefix, shnum,
self._node_logparent)
if authoritative:
cs.set_authoritative_num_segments(numsegs)
# Share._get_satisfaction is responsible for updating
# CommonShare.set_numsegs after we know the UEB. Alternatives:
# 1: d = self.node.get_num_segments()
@ -214,9 +210,17 @@ class ShareFinder:
# Yuck.
self._commonshares[shnum] = cs
s = Share(bucket, server_version, self.verifycap, cs, self.node,
self._download_status, peerid, shnum,
self._download_status, peerid, shnum, dyhb_rtt,
self._node_logparent)
self.undelivered_shares.append(s)
return s
def _deliver_shares(self, shares):
# they will call hungry() again if they want more
self._hungry = False
shares_s = ",".join([str(sh) for sh in shares])
self.log(format="delivering shares: %s" % shares_s,
level=log.NOISY, umid="2n1qQw")
eventually(self.share_consumer.got_shares, shares)
def _got_error(self, f, peerid, req, d_ev, lp):
d_ev.finished("error", now())

View File

@ -72,7 +72,7 @@ class DownloadNode:
# things to track callers that want data
# _segment_requests can have duplicates
self._segment_requests = [] # (segnum, d, cancel_handle)
self._segment_requests = [] # (segnum, d, cancel_handle, logparent)
self._active_segment = None # a SegmentFetcher, with .segnum
self._segsize_observers = observer.OneShotObserverList()
@ -81,7 +81,8 @@ class DownloadNode:
# for each read() call. Segmentation and get_segment() messages are
# associated with the read() call, everything else is tied to the
# _Node's log entry.
lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d,"
lp = log.msg(format="Immutable.DownloadNode(%(si)s) created:"
" size=%(size)d,"
" guessed_segsize=%(guessed_segsize)d,"
" guessed_numsegs=%(guessed_numsegs)d",
si=self._si_prefix, size=verifycap.size,
@ -103,9 +104,10 @@ class DownloadNode:
# as with CommonShare, our ciphertext_hash_tree is a stub until we
# get the real num_segments
self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
self.ciphertext_hash_tree_leaves = self.guessed_num_segments
def __repr__(self):
return "Imm_Node(%s)" % (self._si_prefix,)
return "ImmutableDownloadNode(%s)" % (self._si_prefix,)
def stop(self):
# called by the Terminator at shutdown, mostly for tests
@ -175,14 +177,14 @@ class DownloadNode:
The Deferred can also errback with other fatal problems, such as
NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
"""
log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
si=base32.b2a(self._verifycap.storage_index)[:8],
segnum=segnum,
level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
lp = log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
si=base32.b2a(self._verifycap.storage_index)[:8],
segnum=segnum,
level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
self._download_status.add_segment_request(segnum, now())
d = defer.Deferred()
c = Cancel(self._cancel_request)
self._segment_requests.append( (segnum, d, c) )
self._segment_requests.append( (segnum, d, c, lp) )
self._start_new_segment()
return (d, c)
@ -208,10 +210,11 @@ class DownloadNode:
if self._active_segment is None and self._segment_requests:
segnum = self._segment_requests[0][0]
k = self._verifycap.needed_shares
lp = self._segment_requests[0][3]
log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
node=repr(self), segnum=segnum,
level=log.NOISY, umid="wAlnHQ")
self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
level=log.NOISY, parent=lp, umid="wAlnHQ")
self._active_segment = fetcher = SegmentFetcher(self, segnum, k, lp)
active_shares = [s for s in self._shares if s.is_alive()]
fetcher.add_shares(active_shares) # this triggers the loop
@ -234,13 +237,17 @@ class DownloadNode:
h = hashutil.uri_extension_hash(UEB_s)
if h != self._verifycap.uri_extension_hash:
raise BadHashError
UEB_dict = uri.unpack_extension(UEB_s)
self._parse_and_store_UEB(UEB_dict) # sets self._stuff
self._parse_and_store_UEB(UEB_s) # sets self._stuff
# TODO: a malformed (but authentic) UEB could throw an assertion in
# _parse_and_store_UEB, and we should abandon the download.
self.have_UEB = True
def _parse_and_store_UEB(self, d):
# inform the ShareFinder about our correct number of segments. This
# will update the block-hash-trees in all existing CommonShare
# instances, and will populate new ones with the correct value.
self._sharefinder.update_num_segments()
def _parse_and_store_UEB(self, UEB_s):
# Note: the UEB contains needed_shares and total_shares. These are
# redundant and inferior (the filecap contains the authoritative
# values). However, because it is possible to encode the same file in
@ -252,8 +259,11 @@ class DownloadNode:
# therefore, we ignore d['total_shares'] and d['needed_shares'].
d = uri.unpack_extension(UEB_s)
log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
ueb=repr(d), vcap=self._verifycap.to_string(),
ueb=repr(uri.unpack_extension_readable(UEB_s)),
vcap=self._verifycap.to_string(),
level=log.NOISY, parent=self._lp, umid="cVqZnA")
k, N = self._verifycap.needed_shares, self._verifycap.total_shares
@ -292,6 +302,7 @@ class DownloadNode:
# shares of file B. self.ciphertext_hash_tree was a guess before:
# this is where we create it for real.
self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
self.ciphertext_hash_tree_leaves = self.num_segments
self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
self.share_hash_tree.set_hashes({0: d['share_root_hash']})
@ -344,9 +355,15 @@ class DownloadNode:
% (hashnum, len(self.share_hash_tree)))
self.share_hash_tree.set_hashes(share_hashes)
def get_desired_ciphertext_hashes(self, segnum):
if segnum < self.ciphertext_hash_tree_leaves:
return self.ciphertext_hash_tree.needed_hashes(segnum,
include_leaf=True)
return []
def get_needed_ciphertext_hashes(self, segnum):
cht = self.ciphertext_hash_tree
return cht.needed_hashes(segnum, include_leaf=True)
def process_ciphertext_hashes(self, hashes):
assert self.num_segments is not None
# this may raise BadHashError or NotEnoughHashesError
@ -457,7 +474,7 @@ class DownloadNode:
def _extract_requests(self, segnum):
"""Remove matching requests and return their (d,c) tuples so that the
caller can retire them."""
retire = [(d,c) for (segnum0, d, c) in self._segment_requests
retire = [(d,c) for (segnum0, d, c, lp) in self._segment_requests
if segnum0 == segnum]
self._segment_requests = [t for t in self._segment_requests
if t[0] != segnum]
@ -466,10 +483,18 @@ class DownloadNode:
def _cancel_request(self, c):
self._segment_requests = [t for t in self._segment_requests
if t[2] != c]
segnums = [segnum for (segnum,d,c) in self._segment_requests]
segnums = [segnum for (segnum,d,c,lp) in self._segment_requests]
# self._active_segment might be None in rare circumstances, so make
# sure we tolerate it
if self._active_segment and self._active_segment.segnum not in segnums:
self._active_segment.stop()
self._active_segment = None
self._start_new_segment()
# called by ShareFinder to choose hashtree sizes in CommonShares, and by
# SegmentFetcher to tell if it is still fetching a valid segnum.
def get_num_segments(self):
# returns (best_num_segments, authoritative)
if self.num_segments is None:
return (self.guessed_num_segments, False)
return (self.num_segments, True)

View File

@ -33,7 +33,7 @@ class Share:
# servers. A different backend would use a different class.
def __init__(self, rref, server_version, verifycap, commonshare, node,
download_status, peerid, shnum, logparent):
download_status, peerid, shnum, dyhb_rtt, logparent):
self._rref = rref
self._server_version = server_version
self._node = node # holds share_hash_tree and UEB
@ -51,6 +51,7 @@ class Share:
self._storage_index = verifycap.storage_index
self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
self._shnum = shnum
self._dyhb_rtt = dyhb_rtt
# self._alive becomes False upon fatal corruption or server error
self._alive = True
self._lp = log.msg(format="%(share)s created", share=repr(self),
@ -278,15 +279,16 @@ class Share:
if not self._satisfy_UEB():
# can't check any hashes without the UEB
return False
# the call to _satisfy_UEB() will immediately set the
# authoritative num_segments in all our CommonShares. If we
# guessed wrong, we might stil be working on a bogus segnum
# (beyond the real range). We catch this and signal BADSEGNUM
# before invoking any further code that touches hashtrees.
self.actual_segment_size = self._node.segment_size # might be updated
assert self.actual_segment_size is not None
# knowing the UEB means knowing num_segments. Despite the redundancy,
# this is the best place to set this. CommonShare.set_numsegs will
# ignore duplicate calls.
# knowing the UEB means knowing num_segments
assert self._node.num_segments is not None
cs = self._commonshare
cs.set_numsegs(self._node.num_segments)
segnum, observers = self._active_segnum_and_observers()
# if segnum is None, we don't really need to do anything (we have no
@ -304,9 +306,9 @@ class Share:
# can't check block_hash_tree without a root
return False
if cs.need_block_hash_root():
if self._commonshare.need_block_hash_root():
block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
cs.set_block_hash_root(block_hash_root)
self._commonshare.set_block_hash_root(block_hash_root)
if segnum is None:
return False # we don't want any particular segment right now
@ -360,7 +362,8 @@ class Share:
] ):
offsets[field] = fields[i]
self.actual_offsets = offsets
log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields),
level=log.NOISY, parent=self._lp, umid="jedQcw")
self._received.remove(0, 4) # don't need this anymore
# validate the offsets a bit
@ -517,7 +520,8 @@ class Share:
block = self._received.pop(blockstart, blocklen)
if not block:
log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
blockstart, blocklen))
blockstart, blocklen),
level=log.NOISY, parent=self._lp, umid="aK0RFw")
return False
log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
share=repr(self), start=blockstart, length=blocklen,
@ -589,29 +593,17 @@ class Share:
if self.actual_offsets or self._overrun_ok:
if not self._node.have_UEB:
self._desire_UEB(desire, o)
# They might ask for a segment that doesn't look right.
# _satisfy() will catch+reject bad segnums once we know the UEB
# (and therefore segsize and numsegs), so we'll only fail this
# test if we're still guessing. We want to avoid asking the
# hashtrees for needed_hashes() for bad segnums. So don't enter
# _desire_hashes or _desire_data unless the segnum looks
# reasonable.
if segnum < r["num_segments"]:
# XXX somehow we're getting here for sh5. we don't yet know
# the actual_segment_size, we're still working off the guess.
# the ciphertext_hash_tree has been corrected, but the
# commonshare._block_hash_tree is still in the guessed state.
self._desire_share_hashes(desire, o)
if segnum is not None:
self._desire_block_hashes(desire, o, segnum)
self._desire_data(desire, o, r, segnum, segsize)
else:
log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
% (segnum, r["num_segments"]),
level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
self._desire_share_hashes(desire, o)
if segnum is not None:
# They might be asking for a segment number that is beyond
# what we guess the file contains, but _desire_block_hashes
# and _desire_data will tolerate that.
self._desire_block_hashes(desire, o, segnum)
self._desire_data(desire, o, r, segnum, segsize)
log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
% (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
% (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()),
level=log.NOISY, parent=self._lp, umid="IG7CgA")
if self.actual_offsets:
return (want_it, need_it+gotta_gotta_have_it)
else:
@ -681,14 +673,30 @@ class Share:
(want_it, need_it, gotta_gotta_have_it) = desire
# block hash chain
for hashnum in self._commonshare.get_needed_block_hashes(segnum):
for hashnum in self._commonshare.get_desired_block_hashes(segnum):
need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
# ciphertext hash chain
for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
for hashnum in self._node.get_desired_ciphertext_hashes(segnum):
need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
def _desire_data(self, desire, o, r, segnum, segsize):
if segnum > r["num_segments"]:
# they're asking for a segment that's beyond what we think is the
# end of the file. We won't get here if we've already learned the
# real UEB: _get_satisfaction() will notice the out-of-bounds and
# terminate the loop. So we must still be guessing, which means
# that they might be correct in asking for such a large segnum.
# But if they're right, then our segsize/segnum guess is
# certainly wrong, which means we don't know what data blocks to
# ask for yet. So don't bother adding anything. When the UEB
# comes back and we learn the correct segsize/segnums, we'll
# either reject the request or have enough information to proceed
# normally. This costs one roundtrip.
log.msg("_desire_data: segnum(%d) looks wrong (numsegs=%d)"
% (segnum, r["num_segments"]),
level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")
return
(want_it, need_it, gotta_gotta_have_it) = desire
tail = (segnum == r["num_segments"]-1)
datastart = o["data"]
@ -803,34 +811,62 @@ class Share:
class CommonShare:
# TODO: defer creation of the hashtree until somebody uses us. There will
# be a lot of unused shares, and we shouldn't spend the memory on a large
# hashtree unless necessary.
"""I hold data that is common across all instances of a single share,
like sh2 on both servers A and B. This is just the block hash tree.
"""
def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
def __init__(self, best_numsegs, si_prefix, shnum, logparent):
self.si_prefix = si_prefix
self.shnum = shnum
# in the beginning, before we have the real UEB, we can only guess at
# the number of segments. But we want to ask for block hashes early.
# So if we're asked for which block hashes are needed before we know
# numsegs for sure, we return a guess.
self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
self._know_numsegs = False
self._block_hash_tree = IncompleteHashTree(best_numsegs)
self._block_hash_tree_is_authoritative = False
self._block_hash_tree_leaves = best_numsegs
self._logparent = logparent
def set_numsegs(self, numsegs):
if self._know_numsegs:
return
self._block_hash_tree = IncompleteHashTree(numsegs)
self._know_numsegs = True
def __repr__(self):
return "CommonShare(%s-sh%d)" % (self.si_prefix, self.shnum)
def set_authoritative_num_segments(self, numsegs):
if self._block_hash_tree_leaves != numsegs:
self._block_hash_tree = IncompleteHashTree(numsegs)
self._block_hash_tree_leaves = numsegs
self._block_hash_tree_is_authoritative = True
def need_block_hash_root(self):
return bool(not self._block_hash_tree[0])
def set_block_hash_root(self, roothash):
assert self._know_numsegs
assert self._block_hash_tree_is_authoritative
self._block_hash_tree.set_hashes({0: roothash})
def get_desired_block_hashes(self, segnum):
if segnum < self._block_hash_tree_leaves:
return self._block_hash_tree.needed_hashes(segnum,
include_leaf=True)
# the segnum might be out-of-bounds. Originally it was due to a race
# between the receipt of the UEB on one share (from which we learn
# the correct number of segments, update all hash trees to the right
# size, and queue a BADSEGNUM to the SegmentFetcher) and the delivery
# of a new Share to the SegmentFetcher while that BADSEGNUM was
# queued (which sends out requests to the stale segnum, now larger
# than the hash tree). I fixed that (by making SegmentFetcher.loop
# check for a bad segnum at the start of each pass, instead of using
# the queued BADSEGNUM or a flag it sets), but just in case this
# still happens, I'm leaving the < in place. If it gets hit, there's
# a potential lost-progress problem, but I'm pretty sure that it will
# get cleared up on the following turn.
return []
def get_needed_block_hashes(self, segnum):
assert self._block_hash_tree_is_authoritative
# XXX: include_leaf=True needs thought: how did the old downloader do
# it? I think it grabbed *all* block hashes and set them all at once.
# Since we want to fetch less data, we either need to fetch the leaf
@ -840,12 +876,25 @@ class CommonShare:
return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)
def process_block_hashes(self, block_hashes):
assert self._know_numsegs
assert self._block_hash_tree_is_authoritative
# this may raise BadHashError or NotEnoughHashesError
self._block_hash_tree.set_hashes(block_hashes)
def check_block(self, segnum, block):
assert self._know_numsegs
assert self._block_hash_tree_is_authoritative
h = hashutil.block_hash(block)
# this may raise BadHashError or NotEnoughHashesError
self._block_hash_tree.set_hashes(leaves={segnum: h})
# TODO: maybe stop using EventStreamObserver: instead, use a Deferred and an
# auxilliary OVERDUE callback. Just make sure to get all the messages in the
# right order and on the right turns.
# TODO: we're asking for too much data. We probably don't need
# include_leaf=True in the block hash tree or ciphertext hash tree.
# TODO: we ask for ciphertext hash tree nodes from all shares (whenever
# _desire is called while we're missing those nodes), but we only consume it
# from the first response, leaving the rest of the data sitting in _received.
# This was ameliorated by clearing self._received after each block is
# complete.

View File

@ -2303,8 +2303,8 @@ class Errors(GridTestMixin, CLITestMixin, unittest.TestCase):
# the download is abandoned as soon as it's clear that we won't get
# enough shares. The one remaining share might be in either the
# COMPLETE or the PENDING state.
in_complete_msg = "ran out of shares: 1 complete, 0 pending, 0 overdue, 0 unused, need 3"
in_pending_msg = "ran out of shares: 0 complete, 1 pending, 0 overdue, 0 unused, need 3"
in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7v) overdue= unused= need 3"
d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
def _check1((rc, out, err)):

View File

@ -15,8 +15,9 @@ from allmydata.test.no_network import GridTestMixin
from allmydata.test.common import ShouldFailMixin
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
from allmydata.immutable.downloader.common import BadSegmentNumberError, \
BadCiphertextHashError, DownloadStopped
BadCiphertextHashError, DownloadStopped, COMPLETE, OVERDUE, DEAD
from allmydata.immutable.downloader.status import DownloadStatus
from allmydata.immutable.downloader.fetcher import SegmentFetcher
from allmydata.codec import CRSDecoder
from foolscap.eventual import fireEventually, flushEventualQueue
@ -295,7 +296,7 @@ class DownloadTest(_Base, unittest.TestCase):
# shares
servers = []
shares = sorted([s._shnum for s in self.n._cnode._node._shares])
self.failUnlessEqual(shares, [0,1,2])
self.failUnlessEqual(shares, [0,1,2,3])
# break the RIBucketReader references
for s in self.n._cnode._node._shares:
s._rref.broken = True
@ -318,7 +319,7 @@ class DownloadTest(_Base, unittest.TestCase):
self.failUnlessEqual("".join(c.chunks), plaintext)
shares = sorted([s._shnum for s in self.n._cnode._node._shares])
# we should now be using more shares than we were before
self.failIfEqual(shares, [0,1,2])
self.failIfEqual(shares, [0,1,2,3])
d.addCallback(_check_failover)
return d
@ -539,7 +540,7 @@ class DownloadTest(_Base, unittest.TestCase):
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
self.failUnless(f.check(NotEnoughSharesError))
self.failUnless(f.check(NoSharesError))
con2.producer.stopProducing()
return d2
d.addCallbacks(_con1_should_not_succeed, _con1_failed)
@ -583,7 +584,7 @@ class DownloadTest(_Base, unittest.TestCase):
def _con1_should_not_succeed(res):
self.fail("the first read should not have succeeded")
def _con1_failed(f):
self.failUnless(f.check(NotEnoughSharesError))
self.failUnless(f.check(NoSharesError))
# we *don't* cancel the second one here: this exercises a
# lost-progress bug from #1154. We just wait for it to
# succeed.
@ -1121,7 +1122,7 @@ class Corruption(_Base, unittest.TestCase):
# All these tests result in a failed download.
d.addCallback(self._corrupt_flip_all, imm_uri, i)
d.addCallback(lambda ign:
self.shouldFail(NotEnoughSharesError, which,
self.shouldFail(NoSharesError, which,
substring,
_download, imm_uri))
d.addCallback(lambda ign: self.restore_all_shares(self.shares))
@ -1257,3 +1258,332 @@ class Status(unittest.TestCase):
e2.update(1000, 2.0, 2.0)
e2.finished(now+5)
self.failUnlessEqual(ds.get_progress(), 1.0)
class MyShare:
def __init__(self, shnum, peerid, rtt):
self._shnum = shnum
self._peerid = peerid
self._peerid_s = peerid
self._dyhb_rtt = rtt
def __repr__(self):
return "sh%d-on-%s" % (self._shnum, self._peerid)
class MySegmentFetcher(SegmentFetcher):
def __init__(self, *args, **kwargs):
SegmentFetcher.__init__(self, *args, **kwargs)
self._test_start_shares = []
def _start_share(self, share, shnum):
self._test_start_shares.append(share)
class FakeNode:
def __init__(self):
self.want_more = 0
self.failed = None
self.processed = None
self._si_prefix = "si_prefix"
def want_more_shares(self):
self.want_more += 1
def fetch_failed(self, fetcher, f):
self.failed = f
def process_blocks(self, segnum, blocks):
self.processed = (segnum, blocks)
def get_num_segments(self):
return 1, True
class Selection(unittest.TestCase):
def test_no_shares(self):
node = FakeNode()
sf = SegmentFetcher(node, 0, 3, None)
sf.add_shares([])
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 1)
self.failUnlessEqual(node.failed, None)
sf.no_more_shares()
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failUnless(node.failed)
self.failUnless(node.failed.check(NoSharesError))
d.addCallback(_check2)
return d
def test_only_one_share(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
shares = [MyShare(0, "peer-A", 0.0)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 1)
self.failUnlessEqual(node.failed, None)
sf.no_more_shares()
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failUnless(node.failed)
self.failUnless(node.failed.check(NotEnoughSharesError))
self.failUnlessIn("complete= pending=sh0-on-peer-A overdue= unused=",
str(node.failed))
d.addCallback(_check2)
return d
def test_good_diversity_early(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 0)
self.failUnlessEqual(sf._test_start_shares, shares[:3])
for sh in sf._test_start_shares:
sf._block_request_activity(sh, sh._shnum, COMPLETE,
"block-%d" % sh._shnum)
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failIfEqual(node.processed, None)
self.failUnlessEqual(node.processed, (0, {0: "block-0",
1: "block-1",
2: "block-2"}) )
d.addCallback(_check2)
return d
def test_good_diversity_late(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
sf.add_shares([])
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 1)
sf.add_shares(shares)
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failUnlessEqual(sf._test_start_shares, shares[:3])
for sh in sf._test_start_shares:
sf._block_request_activity(sh, sh._shnum, COMPLETE,
"block-%d" % sh._shnum)
return flushEventualQueue()
d.addCallback(_check2)
def _check3(ign):
self.failIfEqual(node.processed, None)
self.failUnlessEqual(node.processed, (0, {0: "block-0",
1: "block-1",
2: "block-2"}) )
d.addCallback(_check3)
return d
def test_avoid_bad_diversity_late(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
# we could satisfy the read entirely from the first server, but we'd
# prefer not to. Instead, we expect to only pull one share from the
# first server
shares = [MyShare(0, "peer-A", 0.0),
MyShare(1, "peer-A", 0.0),
MyShare(2, "peer-A", 0.0),
MyShare(3, "peer-B", 1.0),
MyShare(4, "peer-C", 2.0),
]
sf.add_shares([])
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 1)
sf.add_shares(shares)
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failUnlessEqual(sf._test_start_shares,
[shares[0], shares[3], shares[4]])
for sh in sf._test_start_shares:
sf._block_request_activity(sh, sh._shnum, COMPLETE,
"block-%d" % sh._shnum)
return flushEventualQueue()
d.addCallback(_check2)
def _check3(ign):
self.failIfEqual(node.processed, None)
self.failUnlessEqual(node.processed, (0, {0: "block-0",
3: "block-3",
4: "block-4"}) )
d.addCallback(_check3)
return d
def test_suffer_bad_diversity_late(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
# we satisfy the read entirely from the first server because we don't
# have any other choice.
shares = [MyShare(0, "peer-A", 0.0),
MyShare(1, "peer-A", 0.0),
MyShare(2, "peer-A", 0.0),
MyShare(3, "peer-A", 0.0),
MyShare(4, "peer-A", 0.0),
]
sf.add_shares([])
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 1)
sf.add_shares(shares)
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failUnlessEqual(node.want_more, 3)
self.failUnlessEqual(sf._test_start_shares,
[shares[0], shares[1], shares[2]])
for sh in sf._test_start_shares:
sf._block_request_activity(sh, sh._shnum, COMPLETE,
"block-%d" % sh._shnum)
return flushEventualQueue()
d.addCallback(_check2)
def _check3(ign):
self.failIfEqual(node.processed, None)
self.failUnlessEqual(node.processed, (0, {0: "block-0",
1: "block-1",
2: "block-2"}) )
d.addCallback(_check3)
return d
def test_suffer_bad_diversity_early(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
# we satisfy the read entirely from the first server because we don't
# have any other choice.
shares = [MyShare(0, "peer-A", 0.0),
MyShare(1, "peer-A", 0.0),
MyShare(2, "peer-A", 0.0),
MyShare(3, "peer-A", 0.0),
MyShare(4, "peer-A", 0.0),
]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 2)
self.failUnlessEqual(sf._test_start_shares,
[shares[0], shares[1], shares[2]])
for sh in sf._test_start_shares:
sf._block_request_activity(sh, sh._shnum, COMPLETE,
"block-%d" % sh._shnum)
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failIfEqual(node.processed, None)
self.failUnlessEqual(node.processed, (0, {0: "block-0",
1: "block-1",
2: "block-2"}) )
d.addCallback(_check2)
return d
def test_overdue(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
shares = [MyShare(i, "peer-%d" % i, i) for i in range(10)]
sf.add_shares(shares)
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 0)
self.failUnlessEqual(sf._test_start_shares, shares[:3])
for sh in sf._test_start_shares:
sf._block_request_activity(sh, sh._shnum, OVERDUE)
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failUnlessEqual(sf._test_start_shares, shares[:6])
for sh in sf._test_start_shares[3:]:
sf._block_request_activity(sh, sh._shnum, COMPLETE,
"block-%d" % sh._shnum)
return flushEventualQueue()
d.addCallback(_check2)
def _check3(ign):
self.failIfEqual(node.processed, None)
self.failUnlessEqual(node.processed, (0, {3: "block-3",
4: "block-4",
5: "block-5"}) )
d.addCallback(_check3)
return d
def test_overdue_fails(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
shares = [MyShare(i, "peer-%d" % i, i) for i in range(6)]
sf.add_shares(shares)
sf.no_more_shares()
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 0)
self.failUnlessEqual(sf._test_start_shares, shares[:3])
for sh in sf._test_start_shares:
sf._block_request_activity(sh, sh._shnum, OVERDUE)
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
self.failUnlessEqual(sf._test_start_shares, shares[:6])
for sh in sf._test_start_shares[3:]:
sf._block_request_activity(sh, sh._shnum, DEAD)
return flushEventualQueue()
d.addCallback(_check2)
def _check3(ign):
# we're still waiting
self.failUnlessEqual(node.processed, None)
self.failUnlessEqual(node.failed, None)
# now complete one of the overdue ones, and kill one of the other
# ones, leaving one hanging. This should trigger a failure, since
# we cannot succeed.
live = sf._test_start_shares[0]
die = sf._test_start_shares[1]
sf._block_request_activity(live, live._shnum, COMPLETE, "block")
sf._block_request_activity(die, die._shnum, DEAD)
return flushEventualQueue()
d.addCallback(_check3)
def _check4(ign):
self.failUnless(node.failed)
self.failUnless(node.failed.check(NotEnoughSharesError))
self.failUnlessIn("complete=sh0 pending= overdue=sh2-on-peer-2 unused=",
str(node.failed))
d.addCallback(_check4)
return d
def test_avoid_redundancy(self):
node = FakeNode()
sf = MySegmentFetcher(node, 0, 3, None)
# we could satisfy the read entirely from the first server, but we'd
# prefer not to. Instead, we expect to only pull one share from the
# first server
shares = [MyShare(0, "peer-A", 0.0),
MyShare(1, "peer-B", 1.0),
MyShare(0, "peer-C", 2.0), # this will be skipped
MyShare(1, "peer-D", 3.0),
MyShare(2, "peer-E", 4.0),
]
sf.add_shares(shares[:3])
d = flushEventualQueue()
def _check1(ign):
self.failUnlessEqual(node.want_more, 1)
self.failUnlessEqual(sf._test_start_shares,
[shares[0], shares[1]])
# allow sh1 to retire
sf._block_request_activity(shares[1], 1, COMPLETE, "block-1")
return flushEventualQueue()
d.addCallback(_check1)
def _check2(ign):
# and then feed in the remaining shares
sf.add_shares(shares[3:])
sf.no_more_shares()
return flushEventualQueue()
d.addCallback(_check2)
def _check3(ign):
self.failUnlessEqual(sf._test_start_shares,
[shares[0], shares[1], shares[4]])
sf._block_request_activity(shares[0], 0, COMPLETE, "block-0")
sf._block_request_activity(shares[4], 2, COMPLETE, "block-2")
return flushEventualQueue()
d.addCallback(_check3)
def _check4(ign):
self.failIfEqual(node.processed, None)
self.failUnlessEqual(node.processed, (0, {0: "block-0",
1: "block-1",
2: "block-2"}) )
d.addCallback(_check4)
return d

View File

@ -52,7 +52,7 @@ class Test(common.ShareManglingMixin, common.ShouldFailMixin, unittest.TestCase)
def _after_download(unused=None):
after_download_reads = self._count_reads()
#print before_download_reads, after_download_reads
self.failIf(after_download_reads-before_download_reads > 36,
self.failIf(after_download_reads-before_download_reads > 41,
(after_download_reads, before_download_reads))
d.addCallback(self._download_and_check_plaintext)
d.addCallback(_after_download)

View File

@ -4259,15 +4259,20 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi
def _check_one_share(body):
self.failIf("<html>" in body, body)
body = " ".join(body.strip().split())
msg = ("NotEnoughSharesError: This indicates that some "
"servers were unavailable, or that shares have been "
"lost to server departure, hard drive failure, or disk "
"corruption. You should perform a filecheck on "
"this object to learn more. The full error message is:"
" ran out of shares: %d complete, %d pending, 0 overdue,"
" 0 unused, need 3. Last failure: None")
msg1 = msg % (1, 0)
msg2 = msg % (0, 1)
msgbase = ("NotEnoughSharesError: This indicates that some "
"servers were unavailable, or that shares have been "
"lost to server departure, hard drive failure, or disk "
"corruption. You should perform a filecheck on "
"this object to learn more. The full error message is:"
)
msg1 = msgbase + (" ran out of shares:"
" complete=sh0"
" pending="
" overdue= unused= need 3. Last failure: None")
msg2 = msgbase + (" ran out of shares:"
" complete="
" pending=Share(sh0-on-xgru5)"
" overdue= unused= need 3. Last failure: None")
self.failUnless(body == msg1 or body == msg2, body)
d.addCallback(_check_one_share)