mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-20 17:52:50 +00:00
Rewrite immutable downloader (#798). This patch adds the new downloader itself.
This commit is contained in:
parent
88d7ec2d54
commit
22a07e9bbe
File diff suppressed because it is too large
Load Diff
0
src/allmydata/immutable/downloader/__init__.py
Normal file
0
src/allmydata/immutable/downloader/__init__.py
Normal file
13
src/allmydata/immutable/downloader/common.py
Normal file
13
src/allmydata/immutable/downloader/common.py
Normal file
@ -0,0 +1,13 @@
|
||||
|
||||
# Share/block state tokens used throughout the downloader's state machine.
# Each constant is bound to its own name as a string so that log messages
# and reprs stay human-readable.
AVAILABLE = "AVAILABLE"
PENDING = "PENDING"
OVERDUE = "OVERDUE"
COMPLETE = "COMPLETE"
CORRUPT = "CORRUPT"
DEAD = "DEAD"
BADSEGNUM = "BADSEGNUM"


class BadSegmentNumberError(Exception):
    """The caller asked for a segment number beyond the end of the file."""

class WrongSegmentError(Exception):
    """A delivered segment did not match the segment that was requested."""

class BadCiphertextHashError(Exception):
    """Decoded ciphertext failed validation against the ciphertext hash tree."""


class DownloadStopped(Exception):
    """The download was stopped before it could complete."""
|
229
src/allmydata/immutable/downloader/fetcher.py
Normal file
229
src/allmydata/immutable/downloader/fetcher.py
Normal file
@ -0,0 +1,229 @@
|
||||
|
||||
from twisted.python.failure import Failure
|
||||
from foolscap.api import eventually
|
||||
from allmydata.interfaces import NotEnoughSharesError, NoSharesError
|
||||
from allmydata.util import log
|
||||
from allmydata.util.dictutil import DictOfSets
|
||||
from common import AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT, DEAD, \
|
||||
BADSEGNUM, BadSegmentNumberError
|
||||
|
||||
class SegmentFetcher:
    """I am responsible for acquiring blocks for a single segment. I will use
    the Share instances passed to my add_shares() method to locate, retrieve,
    and validate those blocks. I expect my parent node to call my
    no_more_shares() method when there are no more shares available. I will
    call my parent's want_more_shares() method when I want more: I expect to
    see at least one call to add_shares or no_more_shares afterwards.

    When I have enough validated blocks, I will call my parent's
    process_blocks() method with a dictionary that maps shnum to blockdata.
    If I am unable to provide enough blocks, I will call my parent's
    fetch_failed() method with (self, f). After either of these events, I
    will shut down and do no further work. My parent can also call my stop()
    method to have me shut down early."""

    def __init__(self, node, segnum, k):
        # node: the parent _Node; segnum: which segment to fetch;
        # k: how many distinct validated blocks are needed to decode.
        self._node = node # _Node
        self.segnum = segnum
        self._k = k
        self._shares = {} # maps non-dead Share instance to a state, one of
                          # (AVAILABLE, PENDING, OVERDUE, COMPLETE, CORRUPT).
                          # State transition map is:
                          #  AVAILABLE -(send-read)-> PENDING
                          #  PENDING -(timer)-> OVERDUE
                          #  PENDING -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
                          #  OVERDUE -(rx)-> COMPLETE, CORRUPT, DEAD, BADSEGNUM
                          # If a share becomes DEAD, it is removed from the
                          # dict. If it becomes BADSEGNUM, the whole fetch is
                          # terminated.
        self._share_observers = {} # maps Share to EventStreamObserver for
                                   # active ones
        self._shnums = DictOfSets() # maps shnum to the shares that provide it
        self._blocks = {} # maps shnum to validated block data
        self._no_more_shares = False
        self._bad_segnum = False
        self._last_failure = None
        self._running = True

    def stop(self):
        """Shut down early: cancel outstanding block requests, mark myself
        not-running, and drop Share references."""
        log.msg("SegmentFetcher(%s).stop" % self._node._si_prefix,
                level=log.NOISY, umid="LWyqpg")
        self._cancel_all_requests()
        self._running = False
        self._shares.clear() # let GC work # ??? XXX


    # called by our parent _Node

    def add_shares(self, shares):
        # called when ShareFinder locates a new share, and when a non-initial
        # segment fetch is started and we already know about shares from the
        # previous segment
        for s in shares:
            self._shares[s] = AVAILABLE
            self._shnums.add(s._shnum, s)
        eventually(self.loop)

    def no_more_shares(self):
        # ShareFinder tells us it's reached the end of its list
        self._no_more_shares = True
        eventually(self.loop)

    # internal methods

    def _count_shnums(self, *states):
        """shnums for which at least one state is in the following list"""
        shnums = []
        for shnum,shares in self._shnums.iteritems():
            matches = [s for s in shares if self._shares.get(s) in states]
            if matches:
                shnums.append(shnum)
        return len(shnums)

    def loop(self):
        """Run one pass of the state machine, converting any exception into
        a fetch_failed() notification before re-raising it."""
        try:
            # if any exception occurs here, kill the download
            self._do_loop()
        except BaseException:
            self._node.fetch_failed(self, Failure())
            raise

    def _do_loop(self):
        # One pass of the state machine: check for fatal conditions
        # (bad segnum, exhaustion), then for completion, then send new
        # block requests and/or ask for more shares as needed.
        k = self._k
        if not self._running:
            return
        if self._bad_segnum:
            # oops, we were asking for a segment number beyond the end of the
            # file. This is an error.
            self.stop()
            e = BadSegmentNumberError("segnum=%d, numsegs=%d" %
                                      (self.segnum, self._node.num_segments))
            f = Failure(e)
            self._node.fetch_failed(self, f)
            return

        # are we done?
        if self._count_shnums(COMPLETE) >= k:
            # yay!
            self.stop()
            self._node.process_blocks(self.segnum, self._blocks)
            return

        # we may have exhausted everything
        if (self._no_more_shares and
            self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) < k):
            # no more new shares are coming, and the remaining hopeful shares
            # aren't going to be enough. boo!

            log.msg("share states: %r" % (self._shares,),
                    level=log.NOISY, umid="0ThykQ")
            if self._count_shnums(AVAILABLE, PENDING, OVERDUE, COMPLETE) == 0:
                format = ("no shares (need %(k)d)."
                          " Last failure: %(last_failure)s")
                args = { "k": k,
                         "last_failure": self._last_failure }
                error = NoSharesError
            else:
                format = ("ran out of shares: %(complete)d complete,"
                          " %(pending)d pending, %(overdue)d overdue,"
                          " %(unused)d unused, need %(k)d."
                          " Last failure: %(last_failure)s")
                args = {"complete": self._count_shnums(COMPLETE),
                        "pending": self._count_shnums(PENDING),
                        "overdue": self._count_shnums(OVERDUE),
                        # 'unused' should be zero
                        "unused": self._count_shnums(AVAILABLE),
                        "k": k,
                        "last_failure": self._last_failure,
                        }
                error = NotEnoughSharesError
            log.msg(format=format, level=log.UNUSUAL, umid="1DsnTg", **args)
            e = error(format % args)
            f = Failure(e)
            self.stop()
            self._node.fetch_failed(self, f)
            return

        # nope, not done. Are we "block-hungry" (i.e. do we want to send out
        # more read requests, or do we think we have enough in flight
        # already?)
        while self._count_shnums(PENDING, COMPLETE) < k:
            # we're hungry.. are there any unused shares?
            sent = self._send_new_request()
            if not sent:
                break

        # ok, now are we "share-hungry" (i.e. do we have enough known shares
        # to make us happy, or should we ask the ShareFinder to get us more?)
        if self._count_shnums(AVAILABLE, PENDING, COMPLETE) < k:
            # we're hungry for more shares
            self._node.want_more_shares()
            # that will trigger the ShareFinder to keep looking

    def _find_one(self, shares, state):
        """Return some share from 'shares' currently in the given state."""
        # TODO could choose fastest
        for s in shares:
            if self._shares[s] == state:
                return s
        # can never get here, caller has assert in case of code bug

    def _send_new_request(self):
        """Send one block request for an AVAILABLE share, if any shnum still
        needs one. Returns True if a request was sent, False otherwise."""
        for shnum,shares in sorted(self._shnums.iteritems()):
            states = [self._shares[s] for s in shares]
            if COMPLETE in states or PENDING in states:
                # don't send redundant requests
                continue
            if AVAILABLE not in states:
                # no candidates for this shnum, move on
                continue
            # here's a candidate. Send a request.
            s = self._find_one(shares, AVAILABLE)
            assert s
            self._shares[s] = PENDING
            self._share_observers[s] = o = s.get_block(self.segnum)
            o.subscribe(self._block_request_activity, share=s, shnum=shnum)
            # TODO: build up a list of candidates, then walk through the
            # list, sending requests to the most desireable servers,
            # re-checking our block-hunger each time. For non-initial segment
            # fetches, this would let us stick with faster servers.
            return True
        # nothing was sent: don't call us again until you have more shares to
        # work with, or one of the existing shares has been declared OVERDUE
        return False

    def _cancel_all_requests(self):
        # Cancel every outstanding block-request observer, then forget them.
        for o in self._share_observers.values():
            o.cancel()
        self._share_observers = {}

    def _block_request_activity(self, share, shnum, state, block=None, f=None):
        """Record a state transition reported by a Share for one of our
        outstanding block requests, then schedule another loop() pass.

        'block' accompanies COMPLETE; 'f' (a Failure) accompanies DEAD."""
        # called by Shares, in response to our s.send_request() calls.
        if not self._running:
            return
        log.msg("SegmentFetcher(%s)._block_request_activity:"
                " Share(sh%d-on-%s) -> %s" %
                (self._node._si_prefix, shnum, share._peerid_s, state),
                level=log.NOISY, umid="vilNWA")
        # COMPLETE, CORRUPT, DEAD, BADSEGNUM are terminal.
        if state in (COMPLETE, CORRUPT, DEAD, BADSEGNUM):
            self._share_observers.pop(share, None)
        if state is COMPLETE:
            # 'block' is fully validated
            self._shares[share] = COMPLETE
            self._blocks[shnum] = block
        elif state is OVERDUE:
            self._shares[share] = OVERDUE
            # OVERDUE is not terminal: it will eventually transition to
            # COMPLETE, CORRUPT, or DEAD.
        elif state is CORRUPT:
            self._shares[share] = CORRUPT
        elif state is DEAD:
            del self._shares[share]
            self._shnums[shnum].remove(share)
            self._last_failure = f
        elif state is BADSEGNUM:
            self._shares[share] = BADSEGNUM # ???
            self._bad_segnum = True
        eventually(self.loop)
|
||||
|
||||
|
202
src/allmydata/immutable/downloader/finder.py
Normal file
202
src/allmydata/immutable/downloader/finder.py
Normal file
@ -0,0 +1,202 @@
|
||||
|
||||
import time
|
||||
now = time.time
|
||||
from foolscap.api import eventually
|
||||
from allmydata.util import base32, log, idlib
|
||||
|
||||
from share import Share, CommonShare
|
||||
|
||||
def incidentally(res, f, *args, **kwargs):
    """Invoke f(*args, **kwargs) for its side effect, passing 'res' through.

    Intended for use in a Deferred chain:
        d.addBoth(incidentally, func, arg)
    which behaves as if you had added a callback that calls func(arg) and
    then returns the original result unchanged. Useful when you want an
    expression evaluated as the Deferred fires but don't care about its
    return value.
    """
    f(*args, **kwargs)
    return res
|
||||
|
||||
class RequestToken:
    """Opaque marker identifying one outstanding DYHB query to a server."""

    def __init__(self, peerid):
        # peerid: the server this request was sent to; used only for
        # identity and logging.
        self.peerid = peerid
|
||||
|
||||
class ShareFinder:
    """I locate shares for one immutable file by sending DYHB ("do you have
    buckets?") queries to storage servers, wrap the responses in Share
    objects, and deliver them -- one per hungry() call -- to my consumer
    node via its got_shares() method. When I run out of both servers and
    in-flight requests, I call the consumer's no_more_shares()."""

    def __init__(self, storage_broker, verifycap, node, download_status,
                 logparent=None, max_outstanding_requests=10):
        self.running = True # stopped by Share.stop, from Terminator
        self.verifycap = verifycap
        self._started = False
        self._storage_broker = storage_broker
        self.share_consumer = self.node = node
        self.max_outstanding_requests = max_outstanding_requests

        self._hungry = False

        self._commonshares = {} # shnum to CommonShare instance
        self.undelivered_shares = []
        self.pending_requests = set()

        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a_l(self._storage_index[:8], 60)
        self._node_logparent = logparent
        self._download_status = download_status
        self._lp = log.msg(format="ShareFinder[si=%(si)s] starting",
                           si=self._si_prefix,
                           level=log.NOISY, parent=logparent, umid="2xjj2A")

    def start_finding_servers(self):
        # don't get servers until somebody uses us: creating the
        # ImmutableFileNode should not cause work to happen yet. Test case is
        # test_dirnode, which creates us with storage_broker=None
        if not self._started:
            si = self.verifycap.storage_index
            s = self._storage_broker.get_servers_for_index(si)
            self._servers = iter(s)
            self._started = True

    def log(self, *args, **kwargs):
        # Log against our own log parent unless the caller supplied one.
        if "parent" not in kwargs:
            kwargs["parent"] = self._lp
        return log.msg(*args, **kwargs)

    def stop(self):
        # No cleanup beyond the flag: loop() checks self.running on entry.
        self.running = False

    # called by our parent CiphertextDownloader
    def hungry(self):
        """The consumer wants a share: deliver a queued one, or start asking
        servers for more."""
        self.log(format="ShareFinder[si=%(si)s] hungry",
                 si=self._si_prefix, level=log.NOISY, umid="NywYaQ")
        self.start_finding_servers()
        self._hungry = True
        eventually(self.loop)

    # internal methods
    def loop(self):
        """One pass of the finder state machine: deliver a queued share,
        else query the next server (bounded by max_outstanding_requests),
        else declare no_more_shares when fully exhausted."""
        undelivered_s = ",".join(["sh%d@%s" %
                                  (s._shnum, idlib.shortnodeid_b2a(s._peerid))
                                  for s in self.undelivered_shares])
        pending_s = ",".join([idlib.shortnodeid_b2a(rt.peerid)
                              for rt in self.pending_requests]) # sort?
        self.log(format="ShareFinder loop: running=%(running)s"
                 " hungry=%(hungry)s, undelivered=%(undelivered)s,"
                 " pending=%(pending)s",
                 running=self.running, hungry=self._hungry,
                 undelivered=undelivered_s, pending=pending_s,
                 level=log.NOISY, umid="kRtS4Q")
        if not self.running:
            return
        if not self._hungry:
            return
        if self.undelivered_shares:
            sh = self.undelivered_shares.pop(0)
            # they will call hungry() again if they want more
            self._hungry = False
            self.log(format="delivering Share(shnum=%(shnum)d, server=%(peerid)s)",
                     shnum=sh._shnum, peerid=sh._peerid_s,
                     level=log.NOISY, umid="2n1qQw")
            eventually(self.share_consumer.got_shares, [sh])
            return

        if len(self.pending_requests) >= self.max_outstanding_requests:
            # cannot send more requests, must wait for some to retire
            return

        server = None
        try:
            # self._servers becomes None once the iterator is exhausted
            if self._servers:
                server = self._servers.next()
        except StopIteration:
            self._servers = None

        if server:
            self.send_request(server)
            # we loop again to get parallel queries. The check above will
            # prevent us from looping forever.
            eventually(self.loop)
            return

        if self.pending_requests:
            # no server, but there are still requests in flight: maybe one of
            # them will make progress
            return

        self.log(format="ShareFinder.loop: no_more_shares, ever",
                 level=log.UNUSUAL, umid="XjQlzg")
        # we've run out of servers (so we can't send any more requests), and
        # we have nothing in flight. No further progress can be made. They
        # are destined to remain hungry.
        self.share_consumer.no_more_shares()

    def send_request(self, server):
        """Send one DYHB query to 'server' (a (peerid, rref) pair), tracking
        it in pending_requests until the response or error arrives."""
        peerid, rref = server
        req = RequestToken(peerid)
        self.pending_requests.add(req)
        lp = self.log(format="sending DYHB to [%(peerid)s]",
                      peerid=idlib.shortnodeid_b2a(peerid),
                      level=log.NOISY, umid="Io7pyg")
        d_ev = self._download_status.add_dyhb_sent(peerid, now())
        d = rref.callRemote("get_buckets", self._storage_index)
        # retire the request token whether the call succeeds or fails
        d.addBoth(incidentally, self.pending_requests.discard, req)
        d.addCallbacks(self._got_response, self._got_error,
                       callbackArgs=(rref.version, peerid, req, d_ev, lp),
                       errbackArgs=(peerid, req, d_ev, lp))
        d.addErrback(log.err, format="error in send_request",
                     level=log.WEIRD, parent=lp, umid="rpdV0w")
        d.addCallback(incidentally, eventually, self.loop)

    def _got_response(self, buckets, server_version, peerid, req, d_ev, lp):
        # Successful DYHB response: turn each returned bucket into a Share
        # and queue it for delivery.
        shnums = sorted([shnum for shnum in buckets])
        d_ev.finished(shnums, now())
        if buckets:
            shnums_s = ",".join([str(shnum) for shnum in shnums])
            self.log(format="got shnums [%(shnums)s] from [%(peerid)s]",
                     shnums=shnums_s, peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="0fcEZw")
        else:
            self.log(format="no shares from [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, parent=lp, umid="U7d4JA")
        if self.node.num_segments is None:
            best_numsegs = self.node.guessed_num_segments
        else:
            best_numsegs = self.node.num_segments
        for shnum, bucket in buckets.iteritems():
            self._create_share(best_numsegs, shnum, bucket, server_version,
                               peerid)

    def _create_share(self, best_numsegs, shnum, bucket, server_version,
                      peerid):
        # Build (or reuse) the per-shnum CommonShare, wrap the bucket in a
        # Share, and queue it on undelivered_shares for loop() to hand out.
        if shnum in self._commonshares:
            cs = self._commonshares[shnum]
        else:
            cs = CommonShare(best_numsegs, self._si_prefix, shnum,
                             self._node_logparent)
            # Share._get_satisfaction is responsible for updating
            # CommonShare.set_numsegs after we know the UEB. Alternatives:
            #  1: d = self.node.get_num_segments()
            #     d.addCallback(cs.got_numsegs)
            #   the problem is that the OneShotObserverList I was using
            #   inserts an eventual-send between _get_satisfaction's
            #   _satisfy_UEB and _satisfy_block_hash_tree, and the
            #   CommonShare didn't get the num_segs message before
            #   being asked to set block hash values. To resolve this
            #   would require an immediate ObserverList instead of
            #   an eventual-send -based one
            #  2: break _get_satisfaction into Deferred-attached pieces.
            #     Yuck.
            self._commonshares[shnum] = cs
        s = Share(bucket, server_version, self.verifycap, cs, self.node,
                  self._download_status, peerid, shnum,
                  self._node_logparent)
        self.undelivered_shares.append(s)

    def _got_error(self, f, peerid, req, d_ev, lp):
        # DYHB query failed: record it for status display and logs; the
        # request token was already discarded by the addBoth above.
        d_ev.finished("error", now())
        self.log(format="got error from [%(peerid)s]",
                 peerid=idlib.shortnodeid_b2a(peerid), failure=f,
                 level=log.UNUSUAL, parent=lp, umid="zUKdCw")
|
||||
|
||||
|
471
src/allmydata/immutable/downloader/node.py
Normal file
471
src/allmydata/immutable/downloader/node.py
Normal file
@ -0,0 +1,471 @@
|
||||
|
||||
import time
|
||||
now = time.time
|
||||
from twisted.python.failure import Failure
|
||||
from twisted.internet import defer
|
||||
from foolscap.api import eventually
|
||||
from allmydata import uri
|
||||
from allmydata.codec import CRSDecoder
|
||||
from allmydata.util import base32, log, hashutil, mathutil, observer
|
||||
from allmydata.interfaces import DEFAULT_MAX_SEGMENT_SIZE
|
||||
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
|
||||
NotEnoughHashesError
|
||||
|
||||
# local imports
|
||||
from finder import ShareFinder
|
||||
from fetcher import SegmentFetcher
|
||||
from segmentation import Segmentation
|
||||
from common import BadCiphertextHashError
|
||||
|
||||
class Cancel:
    """One-shot cancellation handle: invokes the supplied callback (with
    this instance as its argument) the first time cancel() is called, and
    never again."""

    def __init__(self, f):
        # f: callable fired at most once, on the first cancel()
        self._f = f
        self.cancelled = False

    def cancel(self):
        """Fire the callback unless cancel() has already been called."""
        if self.cancelled:
            return
        self.cancelled = True
        self._f(self)
|
||||
|
||||
class DownloadNode:
|
||||
"""Internal class which manages downloads and holds state. External
|
||||
callers use CiphertextFileNode instead."""
|
||||
|
||||
# Share._node points to me
|
||||
def __init__(self, verifycap, storage_broker, secret_holder,
|
||||
terminator, history, download_status):
|
||||
assert isinstance(verifycap, uri.CHKFileVerifierURI)
|
||||
self._verifycap = verifycap
|
||||
self._storage_broker = storage_broker
|
||||
self._si_prefix = base32.b2a_l(verifycap.storage_index[:8], 60)
|
||||
self.running = True
|
||||
if terminator:
|
||||
terminator.register(self) # calls self.stop() at stopService()
|
||||
# the rules are:
|
||||
# 1: Only send network requests if you're active (self.running is True)
|
||||
# 2: Use TimerService, not reactor.callLater
|
||||
# 3: You can do eventual-sends any time.
|
||||
# These rules should mean that once
|
||||
# stopService()+flushEventualQueue() fires, everything will be done.
|
||||
self._secret_holder = secret_holder
|
||||
self._history = history
|
||||
self._download_status = download_status
|
||||
|
||||
k, N = self._verifycap.needed_shares, self._verifycap.total_shares
|
||||
self.share_hash_tree = IncompleteHashTree(N)
|
||||
|
||||
# we guess the segment size, so Segmentation can pull non-initial
|
||||
# segments in a single roundtrip. This populates
|
||||
# .guessed_segment_size, .guessed_num_segments, and
|
||||
# .ciphertext_hash_tree (with a dummy, to let us guess which hashes
|
||||
# we'll need)
|
||||
self._build_guessed_tables(DEFAULT_MAX_SEGMENT_SIZE)
|
||||
|
||||
# filled in when we parse a valid UEB
|
||||
self.have_UEB = False
|
||||
self.segment_size = None
|
||||
self.tail_segment_size = None
|
||||
self.tail_segment_padded = None
|
||||
self.num_segments = None
|
||||
self.block_size = None
|
||||
self.tail_block_size = None
|
||||
|
||||
# things to track callers that want data
|
||||
|
||||
# _segment_requests can have duplicates
|
||||
self._segment_requests = [] # (segnum, d, cancel_handle)
|
||||
self._active_segment = None # a SegmentFetcher, with .segnum
|
||||
|
||||
self._segsize_observers = observer.OneShotObserverList()
|
||||
|
||||
# we create one top-level logparent for this _Node, and another one
|
||||
# for each read() call. Segmentation and get_segment() messages are
|
||||
# associated with the read() call, everything else is tied to the
|
||||
# _Node's log entry.
|
||||
lp = log.msg(format="Immutable _Node(%(si)s) created: size=%(size)d,"
|
||||
" guessed_segsize=%(guessed_segsize)d,"
|
||||
" guessed_numsegs=%(guessed_numsegs)d",
|
||||
si=self._si_prefix, size=verifycap.size,
|
||||
guessed_segsize=self.guessed_segment_size,
|
||||
guessed_numsegs=self.guessed_num_segments,
|
||||
level=log.OPERATIONAL, umid="uJ0zAQ")
|
||||
self._lp = lp
|
||||
|
||||
self._sharefinder = ShareFinder(storage_broker, verifycap, self,
|
||||
self._download_status, lp)
|
||||
self._shares = set()
|
||||
|
||||
def _build_guessed_tables(self, max_segment_size):
|
||||
size = min(self._verifycap.size, max_segment_size)
|
||||
s = mathutil.next_multiple(size, self._verifycap.needed_shares)
|
||||
self.guessed_segment_size = s
|
||||
r = self._calculate_sizes(self.guessed_segment_size)
|
||||
self.guessed_num_segments = r["num_segments"]
|
||||
# as with CommonShare, our ciphertext_hash_tree is a stub until we
|
||||
# get the real num_segments
|
||||
self.ciphertext_hash_tree = IncompleteHashTree(self.guessed_num_segments)
|
||||
|
||||
def __repr__(self):
|
||||
return "Imm_Node(%s)" % (self._si_prefix,)
|
||||
|
||||
def stop(self):
|
||||
# called by the Terminator at shutdown, mostly for tests
|
||||
if self._active_segment:
|
||||
self._active_segment.stop()
|
||||
self._active_segment = None
|
||||
self._sharefinder.stop()
|
||||
|
||||
# things called by outside callers, via CiphertextFileNode. get_segment()
|
||||
# may also be called by Segmentation.
|
||||
|
||||
def read(self, consumer, offset=0, size=None, read_ev=None):
|
||||
"""I am the main entry point, from which FileNode.read() can get
|
||||
data. I feed the consumer with the desired range of ciphertext. I
|
||||
return a Deferred that fires (with the consumer) when the read is
|
||||
finished.
|
||||
|
||||
Note that there is no notion of a 'file pointer': each call to read()
|
||||
uses an independent offset= value."""
|
||||
# for concurrent operations: each gets its own Segmentation manager
|
||||
if size is None:
|
||||
size = self._verifycap.size
|
||||
# clip size so offset+size does not go past EOF
|
||||
size = min(size, self._verifycap.size-offset)
|
||||
if read_ev is None:
|
||||
read_ev = self._download_status.add_read_event(offset, size, now())
|
||||
|
||||
lp = log.msg(format="imm Node(%(si)s).read(%(offset)d, %(size)d)",
|
||||
si=base32.b2a(self._verifycap.storage_index)[:8],
|
||||
offset=offset, size=size,
|
||||
level=log.OPERATIONAL, parent=self._lp, umid="l3j3Ww")
|
||||
if self._history:
|
||||
sp = self._history.stats_provider
|
||||
sp.count("downloader.files_downloaded", 1) # really read() calls
|
||||
sp.count("downloader.bytes_downloaded", size)
|
||||
s = Segmentation(self, offset, size, consumer, read_ev, lp)
|
||||
# this raises an interesting question: what segments to fetch? if
|
||||
# offset=0, always fetch the first segment, and then allow
|
||||
# Segmentation to be responsible for pulling the subsequent ones if
|
||||
# the first wasn't large enough. If offset>0, we're going to need an
|
||||
# extra roundtrip to get the UEB (and therefore the segment size)
|
||||
# before we can figure out which segment to get. TODO: allow the
|
||||
# offset-table-guessing code (which starts by guessing the segsize)
|
||||
# to assist the offset>0 process.
|
||||
d = s.start()
|
||||
def _done(res):
|
||||
read_ev.finished(now())
|
||||
return res
|
||||
d.addBoth(_done)
|
||||
return d
|
||||
|
||||
def get_segment(self, segnum, logparent=None):
|
||||
"""Begin downloading a segment. I return a tuple (d, c): 'd' is a
|
||||
Deferred that fires with (offset,data) when the desired segment is
|
||||
available, and c is an object on which c.cancel() can be called to
|
||||
disavow interest in the segment (after which 'd' will never fire).
|
||||
|
||||
You probably need to know the segment size before calling this,
|
||||
unless you want the first few bytes of the file. If you ask for a
|
||||
segment number which turns out to be too large, the Deferred will
|
||||
errback with BadSegmentNumberError.
|
||||
|
||||
The Deferred fires with the offset of the first byte of the data
|
||||
segment, so that you can call get_segment() before knowing the
|
||||
segment size, and still know which data you received.
|
||||
|
||||
The Deferred can also errback with other fatal problems, such as
|
||||
NotEnoughSharesError, NoSharesError, or BadCiphertextHashError.
|
||||
"""
|
||||
log.msg(format="imm Node(%(si)s).get_segment(%(segnum)d)",
|
||||
si=base32.b2a(self._verifycap.storage_index)[:8],
|
||||
segnum=segnum,
|
||||
level=log.OPERATIONAL, parent=logparent, umid="UKFjDQ")
|
||||
self._download_status.add_segment_request(segnum, now())
|
||||
d = defer.Deferred()
|
||||
c = Cancel(self._cancel_request)
|
||||
self._segment_requests.append( (segnum, d, c) )
|
||||
self._start_new_segment()
|
||||
return (d, c)
|
||||
|
||||
def get_segsize(self):
|
||||
"""Return a Deferred that fires when we know the real segment size."""
|
||||
if self.segment_size:
|
||||
return defer.succeed(self.segment_size)
|
||||
# TODO: this downloads (and discards) the first segment of the file.
|
||||
# We could make this more efficient by writing
|
||||
# fetcher.SegmentSizeFetcher, with the job of finding a single valid
|
||||
# share and extracting the UEB. We'd add Share.get_UEB() to request
|
||||
# just the UEB.
|
||||
(d,c) = self.get_segment(0)
|
||||
# this ensures that an error during get_segment() will errback the
|
||||
# caller, so Repair won't wait forever on completely missing files
|
||||
d.addCallback(lambda ign: self._segsize_observers.when_fired())
|
||||
return d
|
||||
|
||||
# things called by the Segmentation object used to transform
|
||||
# arbitrary-sized read() calls into quantized segment fetches
|
||||
|
||||
def _start_new_segment(self):
|
||||
if self._active_segment is None and self._segment_requests:
|
||||
segnum = self._segment_requests[0][0]
|
||||
k = self._verifycap.needed_shares
|
||||
log.msg(format="%(node)s._start_new_segment: segnum=%(segnum)d",
|
||||
node=repr(self), segnum=segnum,
|
||||
level=log.NOISY, umid="wAlnHQ")
|
||||
self._active_segment = fetcher = SegmentFetcher(self, segnum, k)
|
||||
active_shares = [s for s in self._shares if s.is_alive()]
|
||||
fetcher.add_shares(active_shares) # this triggers the loop
|
||||
|
||||
|
||||
# called by our child ShareFinder
|
||||
    def got_shares(self, shares):
        # ShareFinder located new shares: remember them for future segment
        # fetches, and hand them to the currently-active SegmentFetcher.
        self._shares.update(shares)
        if self._active_segment:
            self._active_segment.add_shares(shares)
|
||||
    def no_more_shares(self):
        # ShareFinder has exhausted its server list; remember that, and
        # relay it to the active SegmentFetcher so it can fail promptly
        # instead of waiting forever for shares that will never arrive.
        self._no_more_shares = True
        if self._active_segment:
            self._active_segment.no_more_shares()
|
||||
|
||||
# things called by our Share instances
|
||||
|
||||
def validate_and_store_UEB(self, UEB_s):
|
||||
log.msg("validate_and_store_UEB",
|
||||
level=log.OPERATIONAL, parent=self._lp, umid="7sTrPw")
|
||||
h = hashutil.uri_extension_hash(UEB_s)
|
||||
if h != self._verifycap.uri_extension_hash:
|
||||
raise BadHashError
|
||||
UEB_dict = uri.unpack_extension(UEB_s)
|
||||
self._parse_and_store_UEB(UEB_dict) # sets self._stuff
|
||||
# TODO: a malformed (but authentic) UEB could throw an assertion in
|
||||
# _parse_and_store_UEB, and we should abandon the download.
|
||||
self.have_UEB = True
|
||||
|
||||
def _parse_and_store_UEB(self, d):
|
||||
# Note: the UEB contains needed_shares and total_shares. These are
|
||||
# redundant and inferior (the filecap contains the authoritative
|
||||
# values). However, because it is possible to encode the same file in
|
||||
# multiple ways, and the encoders might choose (poorly) to use the
|
||||
# same key for both (therefore getting the same SI), we might
|
||||
# encounter shares for both types. The UEB hashes will be different,
|
||||
# however, and we'll disregard the "other" encoding's shares as
|
||||
# corrupted.
|
||||
|
||||
# therefore, we ignore d['total_shares'] and d['needed_shares'].
|
||||
|
||||
log.msg(format="UEB=%(ueb)s, vcap=%(vcap)s",
|
||||
ueb=repr(d), vcap=self._verifycap.to_string(),
|
||||
level=log.NOISY, parent=self._lp, umid="cVqZnA")
|
||||
|
||||
k, N = self._verifycap.needed_shares, self._verifycap.total_shares
|
||||
|
||||
self.segment_size = d['segment_size']
|
||||
self._segsize_observers.fire(self.segment_size)
|
||||
|
||||
r = self._calculate_sizes(self.segment_size)
|
||||
self.tail_segment_size = r["tail_segment_size"]
|
||||
self.tail_segment_padded = r["tail_segment_padded"]
|
||||
self.num_segments = r["num_segments"]
|
||||
self.block_size = r["block_size"]
|
||||
self.tail_block_size = r["tail_block_size"]
|
||||
log.msg("actual sizes: %s" % (r,),
|
||||
level=log.NOISY, parent=self._lp, umid="PY6P5Q")
|
||||
if (self.segment_size == self.guessed_segment_size
|
||||
and self.num_segments == self.guessed_num_segments):
|
||||
log.msg("my guess was right!",
|
||||
level=log.NOISY, parent=self._lp, umid="x340Ow")
|
||||
else:
|
||||
log.msg("my guess was wrong! Extra round trips for me.",
|
||||
level=log.NOISY, parent=self._lp, umid="tb7RJw")
|
||||
|
||||
# zfec.Decode() instantiation is fast, but still, let's use the same
|
||||
# codec instance for all but the last segment. 3-of-10 takes 15us on
|
||||
# my laptop, 25-of-100 is 900us, 3-of-255 is 97us, 25-of-255 is
|
||||
# 2.5ms, worst-case 254-of-255 is 9.3ms
|
||||
self._codec = CRSDecoder()
|
||||
self._codec.set_params(self.segment_size, k, N)
|
||||
|
||||
|
||||
# Ciphertext hash tree root is mandatory, so that there is at most
|
||||
# one ciphertext that matches this read-cap or verify-cap. The
|
||||
# integrity check on the shares is not sufficient to prevent the
|
||||
# original encoder from creating some shares of file A and other
|
||||
# shares of file B. self.ciphertext_hash_tree was a guess before:
|
||||
# this is where we create it for real.
|
||||
self.ciphertext_hash_tree = IncompleteHashTree(self.num_segments)
|
||||
self.ciphertext_hash_tree.set_hashes({0: d['crypttext_root_hash']})
|
||||
|
||||
self.share_hash_tree.set_hashes({0: d['share_root_hash']})
|
||||
|
||||
# Our job is a fast download, not verification, so we ignore any
|
||||
# redundant fields. The Verifier uses a different code path which
|
||||
# does not ignore them.
|
||||
|
||||
def _calculate_sizes(self, segment_size):
|
||||
# segments of ciphertext
|
||||
size = self._verifycap.size
|
||||
k = self._verifycap.needed_shares
|
||||
|
||||
# this assert matches the one in encode.py:127 inside
|
||||
# Encoded._got_all_encoding_parameters, where the UEB is constructed
|
||||
assert segment_size % k == 0
|
||||
|
||||
# the last segment is usually short. We don't store a whole segsize,
|
||||
# but we do pad the segment up to a multiple of k, because the
|
||||
# encoder requires that.
|
||||
tail_segment_size = size % segment_size
|
||||
if tail_segment_size == 0:
|
||||
tail_segment_size = segment_size
|
||||
padded = mathutil.next_multiple(tail_segment_size, k)
|
||||
tail_segment_padded = padded
|
||||
|
||||
num_segments = mathutil.div_ceil(size, segment_size)
|
||||
|
||||
# each segment is turned into N blocks. All but the last are of size
|
||||
# block_size, and the last is of size tail_block_size
|
||||
block_size = segment_size / k
|
||||
tail_block_size = tail_segment_padded / k
|
||||
|
||||
return { "tail_segment_size": tail_segment_size,
|
||||
"tail_segment_padded": tail_segment_padded,
|
||||
"num_segments": num_segments,
|
||||
"block_size": block_size,
|
||||
"tail_block_size": tail_block_size,
|
||||
}
|
||||
|
||||
|
||||
def process_share_hashes(self, share_hashes):
|
||||
for hashnum in share_hashes:
|
||||
if hashnum >= len(self.share_hash_tree):
|
||||
# "BadHashError" is normally for e.g. a corrupt block. We
|
||||
# sort of abuse it here to mean a badly numbered hash (which
|
||||
# indicates corruption in the number bytes, rather than in
|
||||
# the data bytes).
|
||||
raise BadHashError("hashnum %d doesn't fit in hashtree(%d)"
|
||||
% (hashnum, len(self.share_hash_tree)))
|
||||
self.share_hash_tree.set_hashes(share_hashes)
|
||||
|
||||
def get_needed_ciphertext_hashes(self, segnum):
|
||||
cht = self.ciphertext_hash_tree
|
||||
return cht.needed_hashes(segnum, include_leaf=True)
|
||||
def process_ciphertext_hashes(self, hashes):
|
||||
assert self.num_segments is not None
|
||||
# this may raise BadHashError or NotEnoughHashesError
|
||||
self.ciphertext_hash_tree.set_hashes(hashes)
|
||||
|
||||
|
||||
# called by our child SegmentFetcher
|
||||
|
||||
def want_more_shares(self):
|
||||
self._sharefinder.hungry()
|
||||
|
||||
def fetch_failed(self, sf, f):
|
||||
assert sf is self._active_segment
|
||||
self._active_segment = None
|
||||
# deliver error upwards
|
||||
for (d,c) in self._extract_requests(sf.segnum):
|
||||
eventually(self._deliver, d, c, f)
|
||||
|
||||
    def process_blocks(self, segnum, blocks):
        """Decode the given blocks into one ciphertext segment, verify its
        hash, and deliver the result (or a Failure) to every pending request
        for `segnum`, then move on to the next segment."""
        d = defer.maybeDeferred(self._decode_blocks, segnum, blocks)
        d.addCallback(self._check_ciphertext_hash, segnum)
        def _deliver(result):
            # attached with addBoth, so `result` is either the
            # (offset, segment, decodetime) tuple or a Failure
            ds = self._download_status
            if isinstance(result, Failure):
                ds.add_segment_error(segnum, now())
            else:
                (offset, segment, decodetime) = result
                ds.add_segment_delivery(segnum, now(),
                                        offset, len(segment), decodetime)
            log.msg(format="delivering segment(%(segnum)d)",
                    segnum=segnum,
                    level=log.OPERATIONAL, parent=self._lp,
                    umid="j60Ojg")
            # NOTE: this `d` shadows the outer Deferred; here it is the
            # per-request Deferred from each (d, c) waiter pair
            for (d,c) in self._extract_requests(segnum):
                eventually(self._deliver, d, c, result)
            self._active_segment = None
            self._start_new_segment()
        d.addBoth(_deliver)
        # _deliver should not raise; if it does, log rather than drop it
        d.addErrback(lambda f:
                     log.err("unhandled error during process_blocks",
                             failure=f, level=log.WEIRD,
                             parent=self._lp, umid="MkEsCg"))
|
||||
|
||||
    def _decode_blocks(self, segnum, blocks):
        """FEC-decode a dict of {shareid: blockdata} into one ciphertext
        segment.

        Returns a Deferred firing with (segment, decodetime). The tail
        segment is decoded at its padded size and then trimmed back to
        tail_segment_size.
        """
        tail = (segnum == self.num_segments-1)
        codec = self._codec
        block_size = self.block_size
        decoded_size = self.segment_size
        if tail:
            # account for the padding in the last segment: it needs its own
            # codec instance with the padded size
            codec = CRSDecoder()
            k, N = self._verifycap.needed_shares, self._verifycap.total_shares
            codec.set_params(self.tail_segment_padded, k, N)
            block_size = self.tail_block_size
            decoded_size = self.tail_segment_padded

        shares = []
        shareids = []
        for (shareid, share) in blocks.iteritems():
            assert len(share) == block_size
            shareids.append(shareid)
            shares.append(share)
        # drop references early so the block data can be freed promptly
        del blocks

        start = now()
        d = codec.decode(shares, shareids)   # segment
        del shares
        def _process(buffers):
            decodetime = now() - start
            segment = "".join(buffers)
            assert len(segment) == decoded_size
            del buffers
            if tail:
                # strip the zfec padding off the final segment
                segment = segment[:self.tail_segment_size]
            return (segment, decodetime)
        d.addCallback(_process)
        return d
|
||||
|
||||
    def _check_ciphertext_hash(self, (segment, decodetime), segnum):
        """Validate a freshly decoded segment against the ciphertext hash
        tree.

        Returns (offset, segment, decodetime) on success; raises
        BadCiphertextHashError when the hash does not verify.
        """
        assert self._active_segment.segnum == segnum
        assert self.segment_size is not None
        offset = segnum * self.segment_size

        h = hashutil.crypttext_segment_hash(segment)
        try:
            self.ciphertext_hash_tree.set_hashes(leaves={segnum: h})
            return (offset, segment, decodetime)
        except (BadHashError, NotEnoughHashesError):
            format = ("hash failure in ciphertext_hash_tree:"
                      " segnum=%(segnum)d, SI=%(si)s")
            log.msg(format=format, segnum=segnum, si=self._si_prefix,
                    failure=Failure(),
                    level=log.WEIRD, parent=self._lp, umid="MTwNnw")
            # this is especially weird, because we made it past the share
            # hash tree. It implies that we're using the wrong encoding, or
            # that the uploader deliberately constructed a bad UEB.
            msg = format % {"segnum": segnum, "si": self._si_prefix}
            raise BadCiphertextHashError(msg)
|
||||
|
||||
def _deliver(self, d, c, result):
|
||||
# this method exists to handle cancel() that occurs between
|
||||
# _got_segment and _deliver
|
||||
if not c.cancelled:
|
||||
d.callback(result) # might actually be an errback
|
||||
|
||||
def _extract_requests(self, segnum):
|
||||
"""Remove matching requests and return their (d,c) tuples so that the
|
||||
caller can retire them."""
|
||||
retire = [(d,c) for (segnum0, d, c) in self._segment_requests
|
||||
if segnum0 == segnum]
|
||||
self._segment_requests = [t for t in self._segment_requests
|
||||
if t[0] != segnum]
|
||||
return retire
|
||||
|
||||
def _cancel_request(self, c):
|
||||
self._segment_requests = [t for t in self._segment_requests
|
||||
if t[2] != c]
|
||||
segnums = [segnum for (segnum,d,c) in self._segment_requests]
|
||||
if self._active_segment.segnum not in segnums:
|
||||
self._active_segment.stop()
|
||||
self._active_segment = None
|
||||
self._start_new_segment()
|
160
src/allmydata/immutable/downloader/segmentation.py
Normal file
160
src/allmydata/immutable/downloader/segmentation.py
Normal file
@ -0,0 +1,160 @@
|
||||
|
||||
import time
|
||||
now = time.time
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from twisted.internet.interfaces import IPushProducer
|
||||
from foolscap.api import eventually
|
||||
from allmydata.util import log
|
||||
from allmydata.util.spans import overlap
|
||||
|
||||
from common import BadSegmentNumberError, WrongSegmentError, DownloadStopped
|
||||
|
||||
class Segmentation:
    """I am responsible for a single offset+size read of the file. I handle
    segmentation: I figure out which segments are necessary, request them
    (from my CiphertextDownloader) in order, and trim the segments down to
    match the offset+size span. I use the Producer/Consumer interface to only
    request one segment at a time.
    """
    implements(IPushProducer)
    def __init__(self, node, offset, size, consumer, read_ev, logparent=None):
        self._node = node
        # _hungry is cleared by pauseProducing() and set by resumeProducing()
        self._hungry = True
        self._active_segnum = None
        self._cancel_segment_request = None
        # these are updated as we deliver data. At any given time, we still
        # want to download file[offset:offset+size]
        self._offset = offset
        self._size = size
        assert offset+size <= node._verifycap.size
        self._consumer = consumer
        # _read_ev accumulates (bytes delivered, ?, seconds paused) stats
        self._read_ev = read_ev
        self._start_pause = None
        self._lp = logparent

    def start(self):
        """Register as producer and begin fetching; returns a Deferred that
        fires (with the consumer) when the whole span has been delivered."""
        self._alive = True
        self._deferred = defer.Deferred()
        self._consumer.registerProducer(self, True)
        self._maybe_fetch_next()
        return self._deferred

    def _maybe_fetch_next(self):
        # only fetch when running, wanted, and not already mid-fetch
        if not self._alive or not self._hungry:
            return
        if self._active_segnum is not None:
            return
        self._fetch_next()

    def _fetch_next(self):
        """Request the segment containing self._offset, or finish if the
        whole span has been delivered."""
        if self._size == 0:
            # done!
            self._alive = False
            self._hungry = False
            self._consumer.unregisterProducer()
            self._deferred.callback(self._consumer)
            return
        n = self._node
        have_actual_segment_size = n.segment_size is not None
        guess_s = ""
        if not have_actual_segment_size:
            guess_s = "probably "
        segment_size = n.segment_size or n.guessed_segment_size
        if self._offset == 0:
            # great! we want segment0 for sure
            wanted_segnum = 0
        else:
            # this might be a guess
            wanted_segnum = self._offset // segment_size
        log.msg(format="_fetch_next(offset=%(offset)d) %(guess)swants segnum=%(segnum)d",
                offset=self._offset, guess=guess_s, segnum=wanted_segnum,
                level=log.NOISY, parent=self._lp, umid="5WfN0w")
        self._active_segnum = wanted_segnum
        d,c = n.get_segment(wanted_segnum, self._lp)
        self._cancel_segment_request = c
        d.addBoth(self._request_retired)
        d.addCallback(self._got_segment, wanted_segnum)
        if not have_actual_segment_size:
            # we can retry once: by the time the errback fires, the node
            # will know the real segment size
            d.addErrback(self._retry_bad_segment)
        d.addErrback(self._error)

    def _request_retired(self, res):
        # addBoth hook: clear in-flight state, pass the result through
        self._active_segnum = None
        self._cancel_segment_request = None
        return res

    def _got_segment(self, (segment_start,segment,decodetime), wanted_segnum):
        """Trim the delivered segment to our span and write it to the
        consumer; raise WrongSegmentError if it does not start at our
        current offset."""
        self._cancel_segment_request = None
        # we got file[segment_start:segment_start+len(segment)]
        # we want file[self._offset:self._offset+self._size]
        log.msg(format="Segmentation got data:"
                " want [%(wantstart)d-%(wantend)d),"
                " given [%(segstart)d-%(segend)d), for segnum=%(segnum)d",
                wantstart=self._offset, wantend=self._offset+self._size,
                segstart=segment_start, segend=segment_start+len(segment),
                segnum=wanted_segnum,
                level=log.OPERATIONAL, parent=self._lp, umid="32dHcg")

        o = overlap(segment_start, len(segment), self._offset, self._size)
        # the overlap is file[o[0]:o[0]+o[1]]
        if not o or o[0] != self._offset:
            # we didn't get the first byte, so we can't use this segment
            log.msg("Segmentation handed wrong data:"
                    " want [%d-%d), given [%d-%d), for segnum=%d,"
                    " for si=%s"
                    % (self._offset, self._offset+self._size,
                       segment_start, segment_start+len(segment),
                       wanted_segnum, self._node._si_prefix),
                    level=log.UNUSUAL, parent=self._lp, umid="STlIiA")
            # we may retry if the segnum we asked was based on a guess
            raise WrongSegmentError("I was given the wrong data.")
        offset_in_segment = self._offset - segment_start
        desired_data = segment[offset_in_segment:offset_in_segment+o[1]]

        self._offset += len(desired_data)
        self._size -= len(desired_data)
        self._consumer.write(desired_data)
        # the consumer might call our .pauseProducing() inside that write()
        # call, setting self._hungry=False
        self._read_ev.update(len(desired_data), 0, 0)
        self._maybe_fetch_next()

    def _retry_bad_segment(self, f):
        f.trap(WrongSegmentError, BadSegmentNumberError)
        # we guessed the segnum wrong: either one that doesn't overlap with
        # the start of our desired region, or one that's beyond the end of
        # the world. Now that we have the right information, we're allowed to
        # retry once.
        assert self._node.segment_size is not None
        return self._maybe_fetch_next()

    def _error(self, f):
        # fatal: tear down the producer relationship and errback our caller
        log.msg("Error in Segmentation", failure=f,
                level=log.WEIRD, parent=self._lp, umid="EYlXBg")
        self._alive = False
        self._hungry = False
        self._consumer.unregisterProducer()
        self._deferred.errback(f)

    def stopProducing(self):
        # IPushProducer: the consumer wants no more data, ever
        self._hungry = False
        self._alive = False
        # cancel any outstanding segment request
        if self._cancel_segment_request:
            self._cancel_segment_request.cancel()
            self._cancel_segment_request = None
        e = DownloadStopped("our Consumer called stopProducing()")
        self._deferred.errback(e)

    def pauseProducing(self):
        # IPushProducer: remember when the pause started, for stats
        self._hungry = False
        self._start_pause = now()
    def resumeProducing(self):
        # IPushProducer: resume via eventual-send to avoid reentrancy with
        # the consumer's write() call
        self._hungry = True
        eventually(self._maybe_fetch_next)
        if self._start_pause is not None:
            paused = now() - self._start_pause
            self._read_ev.update(0, 0, paused)
            self._start_pause = None
|
848
src/allmydata/immutable/downloader/share.py
Normal file
848
src/allmydata/immutable/downloader/share.py
Normal file
@ -0,0 +1,848 @@
|
||||
|
||||
import struct
|
||||
import time
|
||||
now = time.time
|
||||
|
||||
from twisted.python.failure import Failure
|
||||
from foolscap.api import eventually
|
||||
from allmydata.util import base32, log, hashutil, mathutil
|
||||
from allmydata.util.spans import Spans, DataSpans
|
||||
from allmydata.interfaces import HASH_SIZE
|
||||
from allmydata.hashtree import IncompleteHashTree, BadHashError, \
|
||||
NotEnoughHashesError
|
||||
|
||||
from allmydata.immutable.layout import make_write_bucket_proxy
|
||||
from allmydata.util.observer import EventStreamObserver
|
||||
from common import COMPLETE, CORRUPT, DEAD, BADSEGNUM
|
||||
|
||||
|
||||
class LayoutInvalid(Exception):
    """The share's on-disk layout is malformed (bad version number or
    inconsistent offset table)."""
    pass
|
||||
class DataUnavailable(Exception):
    """The server has NAKed a read for bytes we still need, so this share
    can never be completed."""
    pass
|
||||
|
||||
class Share:
|
||||
"""I represent a single instance of a single share (e.g. I reference the
|
||||
shnum2 for share SI=abcde on server xy12t, not the one on server ab45q).
|
||||
I am associated with a CommonShare that remembers data that is held in
|
||||
common among e.g. SI=abcde/shnum2 across all servers. I am also
|
||||
associated with a CiphertextFileNode for e.g. SI=abcde (all shares, all
|
||||
servers).
|
||||
"""
|
||||
# this is a specific implementation of IShare for tahoe's native storage
|
||||
# servers. A different backend would use a different class.
|
||||
|
||||
    def __init__(self, rref, server_version, verifycap, commonshare, node,
                 download_status, peerid, shnum, logparent):
        """Bind this Share to one remote bucket (rref) holding `shnum` for
        the file named by `verifycap`, and set up byte-range bookkeeping."""
        self._rref = rref
        self._server_version = server_version
        self._node = node # holds share_hash_tree and UEB
        self.actual_segment_size = node.segment_size # might still be None
        # XXX change node.guessed_segment_size to
        # node.best_guess_segment_size(), which should give us the real ones
        # if known, else its guess.
        self._guess_offsets(verifycap, node.guessed_segment_size)
        self.actual_offsets = None
        self._UEB_length = None
        self._commonshare = commonshare # holds block_hash_tree
        self._download_status = download_status
        self._peerid = peerid
        self._peerid_s = base32.b2a(peerid)[:5]
        self._storage_index = verifycap.storage_index
        self._si_prefix = base32.b2a(verifycap.storage_index)[:8]
        self._shnum = shnum
        # self._alive becomes False upon fatal corruption or server error
        self._alive = True
        self._lp = log.msg(format="%(share)s created", share=repr(self),
                           level=log.NOISY, parent=logparent, umid="P7hv2w")

        self._pending = Spans() # request sent but no response received yet
        self._received = DataSpans() # ACK response received, with data
        self._unavailable = Spans() # NAK response received, no data

        # any given byte of the share can be in one of four states:
        #  in: _wanted, _requested, _received
        #      FALSE    FALSE       FALSE : don't care about it at all
        #      TRUE     FALSE       FALSE : want it, haven't yet asked for it
        #      TRUE     TRUE        FALSE : request is in-flight
        #                                   or didn't get it
        #      FALSE    TRUE        TRUE  : got it, haven't used it yet
        #      FALSE    TRUE        FALSE : got it and used it
        #      FALSE    FALSE       FALSE : block consumed, ready to ask again
        #
        # when we request data and get a NAK, we leave it in _requested
        # to remind ourself to not ask for it again. We don't explicitly
        # remove it from anything (maybe this should change).
        #
        # We retain the hashtrees in the Node, so we leave those spans in
        # _requested (and never ask for them again, as long as the Node is
        # alive). But we don't retain data blocks (too big), so when we
        # consume a data block, we remove it from _requested, so a later
        # download can re-fetch it.

        self._requested_blocks = [] # (segnum, set(observer2..))
        ver = server_version["http://allmydata.org/tahoe/protocols/storage/v1"]
        self._overrun_ok = ver["tolerates-immutable-read-overrun"]
        # If _overrun_ok and we guess the offsets correctly, we can get
        # everything in one RTT. If _overrun_ok and we guess wrong, we might
        # need two RTT (but we could get lucky and do it in one). If overrun
        # is *not* ok (tahoe-1.3.0 or earlier), we need four RTT: 1=version,
        # 2=offset table, 3=UEB_length and everything else (hashes, block),
        # 4=UEB.

        self.had_corruption = False # for unit tests
|
||||
|
||||
def __repr__(self):
|
||||
return "Share(sh%d-on-%s)" % (self._shnum, self._peerid_s)
|
||||
|
||||
def is_alive(self):
|
||||
# XXX: reconsider. If the share sees a single error, should it remain
|
||||
# dead for all time? Or should the next segment try again? This DEAD
|
||||
# state is stored elsewhere too (SegmentFetcher per-share states?)
|
||||
# and needs to be consistent. We clear _alive in self._fail(), which
|
||||
# is called upon a network error, or layout failure, or hash failure
|
||||
# in the UEB or a hash tree. We do not _fail() for a hash failure in
|
||||
# a block, but of course we still tell our callers about
|
||||
# state=CORRUPT so they'll find a different share.
|
||||
return self._alive
|
||||
|
||||
    def _guess_offsets(self, verifycap, guessed_segment_size):
        """Precompute a guessed offset table (and field sizes) so that the
        first read can speculatively fetch hashes/blocks before the real
        offset table has arrived."""
        self.guessed_segment_size = guessed_segment_size
        size = verifycap.size
        k = verifycap.needed_shares
        N = verifycap.total_shares
        r = self._node._calculate_sizes(guessed_segment_size)
        # num_segments, block_size/tail_block_size
        # guessed_segment_size/tail_segment_size/tail_segment_padded
        share_size = mathutil.div_ceil(size, k)
        # share_size is the amount of block data that will be put into each
        # share, summed over all segments. It does not include hashes, the
        # UEB, or other overhead.

        # use the upload-side code to get this as accurate as possible
        ht = IncompleteHashTree(N)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
        wbp = make_write_bucket_proxy(None, share_size, r["block_size"],
                                      r["num_segments"], num_share_hashes, 0,
                                      None)
        self._fieldsize = wbp.fieldsize
        self._fieldstruct = wbp.fieldstruct
        self.guessed_offsets = wbp._offsets
|
||||
|
||||
# called by our client, the SegmentFetcher
|
||||
    # called by our client, the SegmentFetcher
    def get_block(self, segnum):
        """Add a block number to the list of requests. This will eventually
        result in a fetch of the data necessary to validate the block, then
        the block itself. The fetch order is generally
        first-come-first-served, but requests may be answered out-of-order if
        data becomes available sooner.

        I return an EventStreamObserver, which has two uses. The first is to
        call o.subscribe(), which gives me a place to send state changes and
        eventually the data block. The second is o.cancel(), which removes
        the request (if it is still active).

        I will distribute the following events through my EventStreamObserver:
         - state=OVERDUE: ?? I believe I should have had an answer by now.
                          You may want to ask another share instead.
         - state=BADSEGNUM: the segnum you asked for is too large. I must
                            fetch a valid UEB before I can determine this,
                            so the notification is asynchronous
         - state=COMPLETE, block=data: here is a valid block
         - state=CORRUPT: this share contains corrupted data
         - state=DEAD, f=Failure: the server reported an error, this share
                                  is unusable
        """
        log.msg("%s.get_block(%d)" % (repr(self), segnum),
                level=log.NOISY, parent=self._lp, umid="RTo9MQ")
        assert segnum >= 0
        o = EventStreamObserver()
        o.set_canceler(self, "_cancel_block_request")
        for i,(segnum0,observers) in enumerate(self._requested_blocks):
            if segnum0 == segnum:
                observers.add(o)
                break
        else:
            # for/else: no existing request matched, so register a new one
            self._requested_blocks.append( (segnum, set([o])) )
        eventually(self.loop)
        return o
|
||||
|
||||
def _cancel_block_request(self, o):
|
||||
new_requests = []
|
||||
for e in self._requested_blocks:
|
||||
(segnum0, observers) = e
|
||||
observers.discard(o)
|
||||
if observers:
|
||||
new_requests.append(e)
|
||||
self._requested_blocks = new_requests
|
||||
|
||||
# internal methods
|
||||
def _active_segnum_and_observers(self):
|
||||
if self._requested_blocks:
|
||||
# we only retrieve information for one segment at a time, to
|
||||
# minimize alacrity (first come, first served)
|
||||
return self._requested_blocks[0]
|
||||
return None, []
|
||||
|
||||
    def loop(self):
        """Run one pass of the share state machine; any corruption or
        permanent unavailability kills the share via _fail()."""
        try:
            # if any exceptions occur here, kill the download
            log.msg("%s.loop, reqs=[%s], pending=%s, received=%s,"
                    " unavailable=%s" %
                    (repr(self),
                     ",".join([str(req[0]) for req in self._requested_blocks]),
                     self._pending.dump(), self._received.dump(),
                     self._unavailable.dump() ),
                    level=log.NOISY, parent=self._lp, umid="BaL1zw")
            self._do_loop()
            # all exception cases call self._fail(), which clears self._alive
        except (BadHashError, NotEnoughHashesError, LayoutInvalid), e:
            # Abandon this share. We do this if we see corruption in the
            # offset table, the UEB, or a hash tree. We don't abandon the
            # whole share if we see corruption in a data block (we abandon
            # just the one block, and still try to get data from other blocks
            # on the same server). In theory, we could get good data from a
            # share with a corrupt UEB (by first getting the UEB from some
            # other share), or corrupt hash trees, but the logic to decide
            # when this is safe is non-trivial. So for now, give up at the
            # first sign of corruption.
            #
            # _satisfy_*() code which detects corruption should first call
            # self._signal_corruption(), and then raise the exception.
            log.msg(format="corruption detected in %(share)s",
                    share=repr(self),
                    level=log.UNUSUAL, parent=self._lp, umid="gWspVw")
            self._fail(Failure(e), log.UNUSUAL)
        except DataUnavailable, e:
            # Abandon this share.
            log.msg(format="need data that will never be available"
                    " from %s: pending=%s, received=%s, unavailable=%s" %
                    (repr(self),
                     self._pending.dump(), self._received.dump(),
                     self._unavailable.dump() ),
                    level=log.UNUSUAL, parent=self._lp, umid="F7yJnQ")
            self._fail(Failure(e), log.UNUSUAL)
        except BaseException:
            # anything else is a programming error: record it, then re-raise
            self._fail(Failure())
            raise
        log.msg("%s.loop done, reqs=[%s], pending=%s, received=%s,"
                " unavailable=%s" %
                (repr(self),
                 ",".join([str(req[0]) for req in self._requested_blocks]),
                 self._pending.dump(), self._received.dump(),
                 self._unavailable.dump() ),
                level=log.NOISY, parent=self._lp, umid="9lRaRA")
|
||||
|
||||
    def _do_loop(self):
        """Consume received data, then issue new read requests; raises
        DataUnavailable when a needed span has been NAKed."""
        # we are (eventually) called after all state transitions:
        #  new segments added to self._requested_blocks
        #  new data received from servers (responses to our read() calls)
        #  impatience timer fires (server appears slow)
        if not self._alive:
            return

        # First, consume all of the information that we currently have, for
        # all the segments people currently want.
        while self._get_satisfaction():
            pass

        # When we get no satisfaction (from the data we've received so far),
        # we determine what data we desire (to satisfy more requests). The
        # number of segments is finite, so I can't get no satisfaction
        # forever.
        wanted, needed = self._desire()

        # Finally, send out requests for whatever we need (desire minus
        # have). You can't always get what you want, but if you try
        # sometimes, you just might find, you get what you need.
        self._send_requests(wanted + needed)

        # and sometimes you can't even get what you need
        disappointment = needed & self._unavailable
        if len(disappointment):
            self.had_corruption = True
            raise DataUnavailable("need %s but will never get it" %
                                  disappointment.dump())
||||
|
||||
    def _get_satisfaction(self):
        """Try to retire one data block from already-received bytes.

        Works through the validation prerequisites in dependency order:
        offset table, UEB, share hash chain, block/ciphertext hash trees,
        and finally the data block itself.
        """
        # return True if we retired a data block, and should therefore be
        # called again. Return False if we don't retire a data block (even if
        # we do retire some other data, like hash chains).

        if self.actual_offsets is None:
            if not self._satisfy_offsets():
                # can't even look at anything without the offset table
                return False

        if not self._node.have_UEB:
            if not self._satisfy_UEB():
                # can't check any hashes without the UEB
                return False
        self.actual_segment_size = self._node.segment_size # might be updated
        assert self.actual_segment_size is not None

        # knowing the UEB means knowing num_segments. Despite the redundancy,
        # this is the best place to set this. CommonShare.set_numsegs will
        # ignore duplicate calls.
        assert self._node.num_segments is not None
        cs = self._commonshare
        cs.set_numsegs(self._node.num_segments)

        segnum, observers = self._active_segnum_and_observers()
        # if segnum is None, we don't really need to do anything (we have no
        # outstanding readers right now), but we'll fill in the bits that
        # aren't tied to any particular segment.

        if segnum is not None and segnum >= self._node.num_segments:
            for o in observers:
                o.notify(state=BADSEGNUM)
            self._requested_blocks.pop(0)
            return True

        if self._node.share_hash_tree.needed_hashes(self._shnum):
            if not self._satisfy_share_hash_tree():
                # can't check block_hash_tree without a root
                return False

        if cs.need_block_hash_root():
            block_hash_root = self._node.share_hash_tree.get_leaf(self._shnum)
            cs.set_block_hash_root(block_hash_root)

        if segnum is None:
            return False # we don't want any particular segment right now

        # block_hash_tree
        needed_hashes = self._commonshare.get_needed_block_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_block_hash_tree(needed_hashes):
                # can't check block without block_hash_tree
                return False

        # ciphertext_hash_tree
        needed_hashes = self._node.get_needed_ciphertext_hashes(segnum)
        if needed_hashes:
            if not self._satisfy_ciphertext_hash_tree(needed_hashes):
                # can't check decoded blocks without ciphertext_hash_tree
                return False

        # data blocks
        return self._satisfy_data_block(segnum, observers)
|
||||
|
||||
    def _satisfy_offsets(self):
        """Parse the share's version number and offset table out of the
        received bytes.

        Returns False when the bytes have not arrived yet; raises
        LayoutInvalid (after setting had_corruption) on a bad version or a
        malformed table.
        """
        version_s = self._received.get(0, 4)
        if version_s is None:
            return False
        (version,) = struct.unpack(">L", version_s)
        if version == 1:
            table_start = 0x0c
            self._fieldsize = 0x4
            self._fieldstruct = "L"
        elif version == 2:
            # v2 uses 8-byte offsets, for shares larger than 4GiB
            table_start = 0x14
            self._fieldsize = 0x8
            self._fieldstruct = "Q"
        else:
            self.had_corruption = True
            raise LayoutInvalid("unknown version %d (I understand 1 and 2)"
                                % version)
        offset_table_size = 6 * self._fieldsize
        table_s = self._received.pop(table_start, offset_table_size)
        if table_s is None:
            return False
        fields = struct.unpack(">"+6*self._fieldstruct, table_s)
        offsets = {}
        for i,field in enumerate(['data',
                                  'plaintext_hash_tree', # UNUSED
                                  'crypttext_hash_tree',
                                  'block_hashes',
                                  'share_hashes',
                                  'uri_extension',
                                  ] ):
            offsets[field] = fields[i]
        self.actual_offsets = offsets
        log.msg("actual offsets: data=%d, plaintext_hash_tree=%d, crypttext_hash_tree=%d, block_hashes=%d, share_hashes=%d, uri_extension=%d" % tuple(fields))
        self._received.remove(0, 4) # don't need this anymore

        # validate the offsets a bit
        share_hashes_size = offsets["uri_extension"] - offsets["share_hashes"]
        if share_hashes_size < 0 or share_hashes_size % (2+HASH_SIZE) != 0:
            # the share hash chain is stored as (hashnum,hash) pairs
            self.had_corruption = True
            raise LayoutInvalid("share hashes malformed -- should be a"
                                " multiple of %d bytes -- not %d" %
                                (2+HASH_SIZE, share_hashes_size))
        block_hashes_size = offsets["share_hashes"] - offsets["block_hashes"]
        if block_hashes_size < 0 or block_hashes_size % (HASH_SIZE) != 0:
            # the block hash tree is stored as a list of hashes
            self.had_corruption = True
            raise LayoutInvalid("block hashes malformed -- should be a"
                                " multiple of %d bytes -- not %d" %
                                (HASH_SIZE, block_hashes_size))
        # we only look at 'crypttext_hash_tree' if the UEB says we're
        # actually using it. Same with 'plaintext_hash_tree'. This gives us
        # some wiggle room: a place to stash data for later extensions.

        return True
|
||||
|
||||
    def _satisfy_UEB(self):
        """Extract and validate the URI Extension Block from received bytes.

        Returns False when the length field or the UEB body has not arrived
        yet; re-raises validation failures after reporting the corruption.
        """
        o = self.actual_offsets
        fsize = self._fieldsize
        UEB_length_s = self._received.get(o["uri_extension"], fsize)
        if not UEB_length_s:
            return False
        (UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
        UEB_s = self._received.pop(o["uri_extension"]+fsize, UEB_length)
        if not UEB_s:
            return False
        self._received.remove(o["uri_extension"], fsize)
        try:
            self._node.validate_and_store_UEB(UEB_s)
            return True
        except (LayoutInvalid, BadHashError), e:
            # TODO: if this UEB was bad, we'll keep trying to validate it
            # over and over again. Only log.err on the first one, or better
            # yet skip all but the first
            f = Failure(e)
            self._signal_corruption(f, o["uri_extension"], fsize+UEB_length)
            self.had_corruption = True
            raise
|
||||
|
||||
    def _satisfy_share_hash_tree(self):
        """Parse the share-hash chain and feed it to the node's hash tree.

        Returns True once the chain bytes have been parsed, submitted, and
        consumed; False while bytes are still missing. Raises BadHashError
        or NotEnoughHashesError (after advising the server) on corruption.
        """
        # the share hash chain is stored as (hashnum,hash) tuples, so you
        # can't fetch just the pieces you need, because you don't know
        # exactly where they are. So fetch everything, and parse the results
        # later.
        o = self.actual_offsets
        hashlen = o["uri_extension"] - o["share_hashes"]
        # layout was validated earlier: the chain must be whole pairs
        assert hashlen % (2+HASH_SIZE) == 0
        hashdata = self._received.get(o["share_hashes"], hashlen)
        if not hashdata:
            return False
        share_hashes = {}
        for i in range(0, hashlen, 2+HASH_SIZE):
            (hashnum,) = struct.unpack(">H", hashdata[i:i+2])
            hashvalue = hashdata[i+2:i+2+HASH_SIZE]
            share_hashes[hashnum] = hashvalue
        # TODO: if they give us an empty set of hashes,
        # process_share_hashes() won't fail. We must ensure that this
        # situation doesn't allow unverified shares through. Manual testing
        # shows that set_block_hash_root() throws an assert because an
        # internal node is None instead of an actual hash, but we want
        # something better. It's probably best to add a method to
        # IncompleteHashTree which takes a leaf number and raises an
        # exception unless that leaf is present and fully validated.
        try:
            self._node.process_share_hashes(share_hashes)
            # adds to self._node.share_hash_tree
        except (BadHashError, NotEnoughHashesError), e:
            f = Failure(e)
            self._signal_corruption(f, o["share_hashes"], hashlen)
            self.had_corruption = True
            raise
        # only consume the bytes after successful validation
        self._received.remove(o["share_hashes"], hashlen)
        return True
def _signal_corruption(self, f, start, offset):
|
||||
# there was corruption somewhere in the given range
|
||||
reason = "corruption in share[%d-%d): %s" % (start, start+offset,
|
||||
str(f.value))
|
||||
self._rref.callRemoteOnly("advise_corrupt_share", reason)
|
||||
|
||||
    def _satisfy_block_hash_tree(self, needed_hashes):
        """Collect the requested block-hash-tree nodes and submit them.

        Returns False until every hash in `needed_hashes` has arrived;
        True once they have all been validated and consumed. Raises
        BadHashError/NotEnoughHashesError (after advising the server) if
        validation fails.
        """
        o_bh = self.actual_offsets["block_hashes"]
        block_hashes = {}
        for hashnum in needed_hashes:
            hashdata = self._received.get(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
            if hashdata:
                block_hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # note that we don't submit any hashes to the block_hash_tree until
        # we've gotten them all, because the hash tree will throw an
        # exception if we only give it a partial set (which it therefore
        # cannot validate)
        try:
            self._commonshare.process_block_hashes(block_hashes)
        except (BadHashError, NotEnoughHashesError), e:
            f = Failure(e)
            hashnums = ",".join([str(n) for n in sorted(block_hashes.keys())])
            log.msg(format="hash failure in block_hashes=(%(hashnums)s),"
                    " from %(share)s",
                    hashnums=hashnums, shnum=self._shnum, share=repr(self),
                    failure=f, level=log.WEIRD, parent=self._lp, umid="yNyFdA")
            # NOTE(review): this reports [o_bh, o_bh+max(needed)*HASH_SIZE)
            # as corrupt, which stops short of the highest-numbered hash
            # itself; (max+1)*HASH_SIZE may have been intended -- confirm.
            hsize = max(0, max(needed_hashes)) * HASH_SIZE
            self._signal_corruption(f, o_bh, hsize)
            self.had_corruption = True
            raise
        # only consume the bytes after successful validation
        for hashnum in needed_hashes:
            self._received.remove(o_bh+hashnum*HASH_SIZE, HASH_SIZE)
        return True
    def _satisfy_ciphertext_hash_tree(self, needed_hashes):
        """Collect the requested ciphertext-hash-tree nodes and submit them.

        Mirrors _satisfy_block_hash_tree, but feeds the node-wide
        ciphertext hash tree instead of this share's block hash tree.
        Returns False until all hashes arrive, True once validated and
        consumed; raises (after advising the server) on corruption.
        """
        start = self.actual_offsets["crypttext_hash_tree"]
        hashes = {}
        for hashnum in needed_hashes:
            hashdata = self._received.get(start+hashnum*HASH_SIZE, HASH_SIZE)
            if hashdata:
                hashes[hashnum] = hashdata
            else:
                return False # missing some hashes
        # we don't submit any hashes to the ciphertext_hash_tree until we've
        # gotten them all
        try:
            self._node.process_ciphertext_hashes(hashes)
        except (BadHashError, NotEnoughHashesError), e:
            f = Failure(e)
            hashnums = ",".join([str(n) for n in sorted(hashes.keys())])
            log.msg(format="hash failure in ciphertext_hashes=(%(hashnums)s),"
                    " from %(share)s",
                    hashnums=hashnums, share=repr(self), failure=f,
                    level=log.WEIRD, parent=self._lp, umid="iZI0TA")
            # NOTE(review): same possible off-by-one as in
            # _satisfy_block_hash_tree's hsize computation -- confirm.
            hsize = max(0, max(needed_hashes))*HASH_SIZE
            self._signal_corruption(f, start, hsize)
            self.had_corruption = True
            raise
        # only consume the bytes after successful validation
        for hashnum in needed_hashes:
            self._received.remove(start+hashnum*HASH_SIZE, HASH_SIZE)
        return True
    def _satisfy_data_block(self, segnum, observers):
        """Extract, validate, and deliver the data block for `segnum`.

        Returns False if the block's bytes have not all arrived yet.
        Otherwise the pending request is retired: each observer is
        notified with state=COMPLETE and the block on success, or
        state=CORRUPT on hash failure (the server is also advised), and
        True is returned.
        """
        tail = (segnum == self._node.num_segments-1)
        datastart = self.actual_offsets["data"]
        blockstart = datastart + segnum * self._node.block_size
        blocklen = self._node.block_size
        if tail:
            # the final block may be shorter than the others
            blocklen = self._node.tail_block_size

        # pop() consumes the block bytes from the receive buffer
        block = self._received.pop(blockstart, blocklen)
        if not block:
            log.msg("no data for block %s (want [%d:+%d])" % (repr(self),
                    blockstart, blocklen))
            return False
        log.msg(format="%(share)s._satisfy_data_block [%(start)d:+%(length)d]",
                share=repr(self), start=blockstart, length=blocklen,
                level=log.NOISY, parent=self._lp, umid="uTDNZg")
        # this block is being retired, either as COMPLETE or CORRUPT, since
        # no further data reads will help
        assert self._requested_blocks[0][0] == segnum
        try:
            self._commonshare.check_block(segnum, block)
            # hurrah, we have a valid block. Deliver it.
            for o in observers:
                # goes to SegmentFetcher._block_request_activity
                o.notify(state=COMPLETE, block=block)
        except (BadHashError, NotEnoughHashesError), e:
            # rats, we have a corrupt block. Notify our clients that they
            # need to look elsewhere, and advise the server. Unlike
            # corruption in other parts of the share, this doesn't cause us
            # to abandon the whole share.
            f = Failure(e)
            log.msg(format="hash failure in block %(segnum)d, from %(share)s",
                    segnum=segnum, share=repr(self), failure=f,
                    level=log.WEIRD, parent=self._lp, umid="mZjkqA")
            for o in observers:
                o.notify(state=CORRUPT)
            self._signal_corruption(f, blockstart, blocklen)
            self.had_corruption = True
        # in either case, we've retired this block
        self._requested_blocks.pop(0)
        # popping the request keeps us from turning around and wanting the
        # block again right away
        return True # got satisfaction
    def _desire(self):
        """Compute which byte ranges of the share we should request next.

        Returns a pair of Spans (wanted, needed): 'wanted' data is
        speculative (failure to obtain it is tolerable), 'needed' data is
        required for progress. Which bucket each range lands in depends on
        whether we have the real offset table yet.
        """
        segnum, observers = self._active_segnum_and_observers() # maybe None

        # 'want_it' is for data we merely want: we know that we don't really
        # need it. This includes speculative reads, like the first 1KB of the
        # share (for the offset table) and the first 2KB of the UEB.
        #
        # 'need_it' is for data that, if we have the real offset table, we'll
        # need. If we are only guessing at the offset table, it's merely
        # wanted. (The share is abandoned if we can't get data that we really
        # need).
        #
        # 'gotta_gotta_have_it' is for data that we absolutely need,
        # independent of whether we're still guessing about the offset table:
        # the version number and the offset table itself.
        #
        # Mr. Popeil, I'm in trouble, need your assistance on the double. Aww..

        desire = Spans(), Spans(), Spans()
        (want_it, need_it, gotta_gotta_have_it) = desire

        self.actual_segment_size = self._node.segment_size # might be updated
        # fall back to guesses until the real offsets/segsize are known
        o = self.actual_offsets or self.guessed_offsets
        segsize = self.actual_segment_size or self.guessed_segment_size
        r = self._node._calculate_sizes(segsize)

        if not self.actual_offsets:
            # all _desire functions add bits to the three desire[] spans
            self._desire_offsets(desire)

        # we can use guessed offsets as long as this server tolerates
        # overrun. Otherwise, we must wait for the offsets to arrive before
        # we try to read anything else.
        if self.actual_offsets or self._overrun_ok:
            if not self._node.have_UEB:
                self._desire_UEB(desire, o)
            # They might ask for a segment that doesn't look right.
            # _satisfy() will catch+reject bad segnums once we know the UEB
            # (and therefore segsize and numsegs), so we'll only fail this
            # test if we're still guessing. We want to avoid asking the
            # hashtrees for needed_hashes() for bad segnums. So don't enter
            # _desire_hashes or _desire_data unless the segnum looks
            # reasonable.
            if segnum < r["num_segments"]:
                # XXX somehow we're getting here for sh5. we don't yet know
                # the actual_segment_size, we're still working off the guess.
                # the ciphertext_hash_tree has been corrected, but the
                # commonshare._block_hash_tree is still in the guessed state.
                self._desire_share_hashes(desire, o)
                if segnum is not None:
                    self._desire_block_hashes(desire, o, segnum)
                    self._desire_data(desire, o, r, segnum, segsize)
            else:
                log.msg("_desire: segnum(%d) looks wrong (numsegs=%d)"
                        % (segnum, r["num_segments"]),
                        level=log.UNUSUAL, parent=self._lp, umid="tuYRQQ")

        log.msg("end _desire: want_it=%s need_it=%s gotta=%s"
                % (want_it.dump(), need_it.dump(), gotta_gotta_have_it.dump()))
        # with real offsets, 'needed' ranges are truly required; while
        # guessing, only the version/offset-table bytes are non-negotiable
        if self.actual_offsets:
            return (want_it, need_it+gotta_gotta_have_it)
        else:
            return (want_it+need_it, gotta_gotta_have_it)
def _desire_offsets(self, desire):
|
||||
(want_it, need_it, gotta_gotta_have_it) = desire
|
||||
if self._overrun_ok:
|
||||
# easy! this includes version number, sizes, and offsets
|
||||
want_it.add(0, 1024)
|
||||
return
|
||||
|
||||
# v1 has an offset table that lives [0x0,0x24). v2 lives [0x0,0x44).
|
||||
# To be conservative, only request the data that we know lives there,
|
||||
# even if that means more roundtrips.
|
||||
|
||||
gotta_gotta_have_it.add(0, 4) # version number, always safe
|
||||
version_s = self._received.get(0, 4)
|
||||
if not version_s:
|
||||
return
|
||||
(version,) = struct.unpack(">L", version_s)
|
||||
# The code in _satisfy_offsets will have checked this version
|
||||
# already. There is no code path to get this far with version>2.
|
||||
assert 1 <= version <= 2, "can't get here, version=%d" % version
|
||||
if version == 1:
|
||||
table_start = 0x0c
|
||||
fieldsize = 0x4
|
||||
elif version == 2:
|
||||
table_start = 0x14
|
||||
fieldsize = 0x8
|
||||
offset_table_size = 6 * fieldsize
|
||||
gotta_gotta_have_it.add(table_start, offset_table_size)
|
||||
|
||||
def _desire_UEB(self, desire, o):
|
||||
(want_it, need_it, gotta_gotta_have_it) = desire
|
||||
|
||||
# UEB data is stored as (length,data).
|
||||
if self._overrun_ok:
|
||||
# We can pre-fetch 2kb, which should probably cover it. If it
|
||||
# turns out to be larger, we'll come back here later with a known
|
||||
# length and fetch the rest.
|
||||
want_it.add(o["uri_extension"], 2048)
|
||||
# now, while that is probably enough to fetch the whole UEB, it
|
||||
# might not be, so we need to do the next few steps as well. In
|
||||
# most cases, the following steps will not actually add anything
|
||||
# to need_it
|
||||
|
||||
need_it.add(o["uri_extension"], self._fieldsize)
|
||||
# only use a length if we're sure it's correct, otherwise we'll
|
||||
# probably fetch a huge number
|
||||
if not self.actual_offsets:
|
||||
return
|
||||
UEB_length_s = self._received.get(o["uri_extension"], self._fieldsize)
|
||||
if UEB_length_s:
|
||||
(UEB_length,) = struct.unpack(">"+self._fieldstruct, UEB_length_s)
|
||||
# we know the length, so make sure we grab everything
|
||||
need_it.add(o["uri_extension"]+self._fieldsize, UEB_length)
|
||||
|
||||
def _desire_share_hashes(self, desire, o):
|
||||
(want_it, need_it, gotta_gotta_have_it) = desire
|
||||
|
||||
if self._node.share_hash_tree.needed_hashes(self._shnum):
|
||||
hashlen = o["uri_extension"] - o["share_hashes"]
|
||||
need_it.add(o["share_hashes"], hashlen)
|
||||
|
||||
def _desire_block_hashes(self, desire, o, segnum):
|
||||
(want_it, need_it, gotta_gotta_have_it) = desire
|
||||
|
||||
# block hash chain
|
||||
for hashnum in self._commonshare.get_needed_block_hashes(segnum):
|
||||
need_it.add(o["block_hashes"]+hashnum*HASH_SIZE, HASH_SIZE)
|
||||
|
||||
# ciphertext hash chain
|
||||
for hashnum in self._node.get_needed_ciphertext_hashes(segnum):
|
||||
need_it.add(o["crypttext_hash_tree"]+hashnum*HASH_SIZE, HASH_SIZE)
|
||||
|
||||
def _desire_data(self, desire, o, r, segnum, segsize):
|
||||
(want_it, need_it, gotta_gotta_have_it) = desire
|
||||
tail = (segnum == r["num_segments"]-1)
|
||||
datastart = o["data"]
|
||||
blockstart = datastart + segnum * r["block_size"]
|
||||
blocklen = r["block_size"]
|
||||
if tail:
|
||||
blocklen = r["tail_block_size"]
|
||||
need_it.add(blockstart, blocklen)
|
||||
|
||||
    def _send_requests(self, desired):
        """Issue remote reads for every desired span not already pending
        or received. Each read's Deferred feeds _got_data/_got_error and
        then re-triggers the main loop."""
        # only ask for bytes we don't already have in flight or in hand
        ask = desired - self._pending - self._received.get_spans()
        log.msg("%s._send_requests, desired=%s, pending=%s, ask=%s" %
                (repr(self), desired.dump(), self._pending.dump(), ask.dump()),
                level=log.NOISY, parent=self._lp, umid="E94CVA")
        # XXX At one time, this code distinguished between data blocks and
        # hashes, and made sure to send (small) requests for hashes before
        # sending (big) requests for blocks. The idea was to make sure that
        # all hashes arrive before the blocks, so the blocks can be consumed
        # and released in a single turn. I removed this for simplicity.
        # Reconsider the removal: maybe bring it back.
        ds = self._download_status

        for (start, length) in ask:
            # TODO: quantize to reasonably-large blocks
            self._pending.add(start, length)
            lp = log.msg(format="%(share)s._send_request"
                         " [%(start)d:+%(length)d]",
                         share=repr(self),
                         start=start, length=length,
                         level=log.NOISY, parent=self._lp, umid="sgVAyA")
            req_ev = ds.add_request_sent(self._peerid, self._shnum,
                                         start, length, now())
            d = self._send_request(start, length)
            d.addCallback(self._got_data, start, length, req_ev, lp)
            d.addErrback(self._got_error, start, length, req_ev, lp)
            d.addCallback(self._trigger_loop)
            # final backstop: never let an error escape the Deferred chain
            d.addErrback(lambda f:
                         log.err(format="unhandled error during send_request",
                                 failure=f, parent=self._lp,
                                 level=log.WEIRD, umid="qZu0wg"))
def _send_request(self, start, length):
|
||||
return self._rref.callRemote("read", start, length)
|
||||
|
||||
    def _got_data(self, data, start, length, req_ev, lp):
        """Callback for a remote read: record the returned bytes and note
        any shortfall as unavailable."""
        req_ev.finished(len(data), now())
        if not self._alive:
            # the share was abandoned while this read was in flight
            return
        log.msg(format="%(share)s._got_data [%(start)d:+%(length)d] -> %(datalen)d",
                share=repr(self), start=start, length=length, datalen=len(data),
                level=log.NOISY, parent=lp, umid="5Qn6VQ")
        self._pending.remove(start, length)
        self._received.add(start, data)

        # if we ask for [a:c], and we get back [a:b] (b<c), that means we're
        # never going to get [b:c]. If we really need that data, this block
        # will never complete. The easiest way to get into this situation is
        # to hit a share with a corrupted offset table, or one that's somehow
        # been truncated. On the other hand, when overrun_ok is true, we ask
        # for data beyond the end of the share all the time (it saves some
        # RTT when we don't know the length of the share ahead of time). So
        # not every asked-for-but-not-received byte is fatal.
        if len(data) < length:
            self._unavailable.add(start+len(data), length-len(data))

        # XXX if table corruption causes our sections to overlap, then one
        # consumer (i.e. block hash tree) will pop/remove the data that
        # another consumer (i.e. block data) mistakenly thinks it needs. It
        # won't ask for that data again, because the span is in
        # self._requested. But that span won't be in self._unavailable
        # because we got it back from the server. TODO: handle this properly
        # (raise DataUnavailable). Then add sanity-checking
        # no-overlaps-allowed tests to the offset-table unpacking code to
        # catch this earlier. XXX

        # accumulate a wanted/needed span (not as self._x, but passed into
        # desire* functions). manage a pending/in-flight list. when the
        # requests are sent out, empty/discard the wanted/needed span and
        # populate/augment the pending list. when the responses come back,
        # augment either received+data or unavailable.

        # if a corrupt offset table results in double-usage, we'll send
        # double requests.

        # the wanted/needed span is only "wanted" for the first pass. Once
        # the offset table arrives, it's all "needed".
    def _got_error(self, f, start, length, req_ev, lp):
        """Errback for a remote read: log the failure and abandon this
        share (further progress is assumed impossible)."""
        req_ev.finished("error", now())
        log.msg(format="error requesting %(start)d+%(length)d"
                " from %(server)s for si %(si)s",
                start=start, length=length,
                server=self._peerid_s, si=self._si_prefix,
                failure=f, parent=lp, level=log.UNUSUAL, umid="BZgAJw")
        # retire our observers, assuming we won't be able to make any
        # further progress
        self._fail(f, log.UNUSUAL)
def _trigger_loop(self, res):
|
||||
if self._alive:
|
||||
eventually(self.loop)
|
||||
return res
|
||||
|
||||
def _fail(self, f, level=log.WEIRD):
|
||||
log.msg(format="abandoning %(share)s",
|
||||
share=repr(self), failure=f,
|
||||
level=level, parent=self._lp, umid="JKM2Og")
|
||||
self._alive = False
|
||||
for (segnum, observers) in self._requested_blocks:
|
||||
for o in observers:
|
||||
o.notify(state=DEAD, f=f)
|
||||
|
||||
|
||||
class CommonShare:
    """I hold data that is common across all instances of a single share,
    like sh2 on both servers A and B. This is just the block hash tree.
    """
    def __init__(self, guessed_numsegs, si_prefix, shnum, logparent):
        self.si_prefix = si_prefix
        self.shnum = shnum
        # in the beginning, before we have the real UEB, we can only guess at
        # the number of segments. But we want to ask for block hashes early.
        # So if we're asked for which block hashes are needed before we know
        # numsegs for sure, we return a guess.
        self._block_hash_tree = IncompleteHashTree(guessed_numsegs)
        self._know_numsegs = False
        self._logparent = logparent

    def set_numsegs(self, numsegs):
        """Replace the guessed hash tree with one of the real size. Only
        the first call has any effect."""
        if not self._know_numsegs:
            self._block_hash_tree = IncompleteHashTree(numsegs)
            self._know_numsegs = True

    def need_block_hash_root(self):
        """Return True while the root of the block hash tree is unset."""
        return not bool(self._block_hash_tree[0])

    def set_block_hash_root(self, roothash):
        """Install the validated root (node 0) of the block hash tree."""
        assert self._know_numsegs
        self._block_hash_tree.set_hashes({0: roothash})

    def get_needed_block_hashes(self, segnum):
        """Return the hash numbers still needed to validate `segnum`,
        including the leaf itself."""
        # XXX: include_leaf=True needs thought: how did the old downloader do
        # it? I think it grabbed *all* block hashes and set them all at once.
        # Since we want to fetch less data, we either need to fetch the leaf
        # too, or wait to set the block hashes until we've also received the
        # block itself, so we can hash it too, and set the chain+leaf all at
        # the same time.
        return self._block_hash_tree.needed_hashes(segnum, include_leaf=True)

    def process_block_hashes(self, block_hashes):
        """Feed fetched hashes into the tree. May raise BadHashError or
        NotEnoughHashesError."""
        assert self._know_numsegs
        self._block_hash_tree.set_hashes(block_hashes)

    def check_block(self, segnum, block):
        """Hash `block` and validate it as leaf `segnum`. May raise
        BadHashError or NotEnoughHashesError."""
        assert self._know_numsegs
        leaf = hashutil.block_hash(block)
        self._block_hash_tree.set_hashes(leaves={segnum: leaf})
170
src/allmydata/immutable/downloader/status.py
Normal file
170
src/allmydata/immutable/downloader/status.py
Normal file
@ -0,0 +1,170 @@
|
||||
|
||||
import itertools
|
||||
from zope.interface import implements
|
||||
from allmydata.interfaces import IDownloadStatus
|
||||
|
||||
class RequestEvent:
    """Ticket for one outstanding share-data request; reports its
    completion back to the owning DownloadStatus."""
    def __init__(self, download_status, tag):
        self._download_status = download_status
        self._tag = tag

    def finished(self, received, when):
        """Record completion at time `when`; `received` is the byte
        count, or "error"."""
        ds = self._download_status
        ds.add_request_finished(self._tag, received, when)
||||
class DYHBEvent:
    """Ticket for one outstanding "do you have block" query; reports the
    response back to the owning DownloadStatus."""
    def __init__(self, download_status, tag):
        self._download_status = download_status
        self._tag = tag

    def finished(self, shnums, when):
        """Record the response (`shnums`, or "error") arriving at time
        `when`."""
        ds = self._download_status
        ds.add_dyhb_finished(self._tag, shnums, when)
||||
class ReadEvent:
    """Ticket for one read() call; forwards progress updates and the
    completion time to the owning DownloadStatus."""
    def __init__(self, download_status, tag):
        self._download_status = download_status
        self._tag = tag

    def update(self, bytes, decrypttime, pausetime):
        """Accumulate bytes delivered plus time spent decrypting and
        paused."""
        ds = self._download_status
        ds.update_read_event(self._tag, bytes, decrypttime, pausetime)

    def finished(self, finishtime):
        """Record when the read() completed."""
        self._download_status.finish_read_event(self._tag, finishtime)
||||
class DownloadStatus:
    # There is one DownloadStatus for each CiphertextFileNode. The status
    # object will keep track of all activity for that node.
    implements(IDownloadStatus)
    # shared across instances: gives each status object a unique id
    statusid_counter = itertools.count(0)

    def __init__(self, storage_index, size):
        self.storage_index = storage_index
        self.size = size
        self.counter = self.statusid_counter.next()
        self.helper = False
        # set by the first segment request or read event
        self.started = None
        # self.dyhb_requests tracks "do you have a share" requests and
        # responses. It maps serverid to a tuple of:
        #  send time
        #  tuple of response shnums (None if response hasn't arrived, "error")
        #  response time (None if response hasn't arrived yet)
        self.dyhb_requests = {}

        # self.requests tracks share-data requests and responses. It maps
        # serverid to a tuple of:
        #  shnum,
        #  start,length, (of data requested)
        #  send time
        #  response length (None if reponse hasn't arrived yet, or "error")
        #  response time (None if response hasn't arrived)
        self.requests = {}

        # self.segment_events tracks segment requests and delivery. It is a
        # list of:
        #  type ("request", "delivery", "error")
        #  segment number
        #  event time
        #  segment start (file offset of first byte, None except in "delivery")
        #  segment length (only in "delivery")
        #  time spent in decode (only in "delivery")
        self.segment_events = []

        # self.read_events tracks read() requests. It is a list of:
        #  start,length (of data requested)
        #  request time
        #  finish time (None until finished)
        #  bytes returned (starts at 0, grows as segments are delivered)
        #  time spent in decrypt (None for ciphertext-only reads)
        #  time spent paused
        self.read_events = []

        self.known_shares = [] # (serverid, shnum)
        self.problems = []


    def add_dyhb_sent(self, serverid, when):
        """Record an outgoing DYHB query; returns a DYHBEvent whose
        finished() records the response."""
        r = (when, None, None)
        if serverid not in self.dyhb_requests:
            self.dyhb_requests[serverid] = []
        self.dyhb_requests[serverid].append(r)
        # tag identifies this entry so finished() can find it later
        tag = (serverid, len(self.dyhb_requests[serverid])-1)
        return DYHBEvent(self, tag)

    def add_dyhb_finished(self, tag, shnums, when):
        """Fill in the response half of a previously-sent DYHB record."""
        # received="error" on error, else tuple(shnums)
        (serverid, index) = tag
        r = self.dyhb_requests[serverid][index]
        (sent, _, _) = r
        r = (sent, shnums, when)
        self.dyhb_requests[serverid][index] = r

    def add_request_sent(self, serverid, shnum, start, length, when):
        """Record an outgoing share-data read; returns a RequestEvent
        whose finished() records the response."""
        r = (shnum, start, length, when, None, None)
        if serverid not in self.requests:
            self.requests[serverid] = []
        self.requests[serverid].append(r)
        tag = (serverid, len(self.requests[serverid])-1)
        return RequestEvent(self, tag)

    def add_request_finished(self, tag, received, when):
        """Fill in the response half of a previously-sent read record."""
        # received="error" on error, else len(data)
        (serverid, index) = tag
        r = self.requests[serverid][index]
        (shnum, start, length, sent, _, _) = r
        r = (shnum, start, length, sent, received, when)
        self.requests[serverid][index] = r

    def add_segment_request(self, segnum, when):
        """Record that a segment was requested (also starts the clock)."""
        if self.started is None:
            self.started = when
        r = ("request", segnum, when, None, None, None)
        self.segment_events.append(r)
    def add_segment_delivery(self, segnum, when, start, length, decodetime):
        """Record a successful segment delivery."""
        r = ("delivery", segnum, when, start, length, decodetime)
        self.segment_events.append(r)
    def add_segment_error(self, segnum, when):
        """Record a failed segment fetch."""
        r = ("error", segnum, when, None, None, None)
        self.segment_events.append(r)

    def add_read_event(self, start, length, when):
        """Record the start of a read() call (also starts the clock);
        returns a ReadEvent for progress/completion updates."""
        if self.started is None:
            self.started = when
        r = (start, length, when, None, 0, 0, 0)
        self.read_events.append(r)
        tag = len(self.read_events)-1
        return ReadEvent(self, tag)
    def update_read_event(self, tag, bytes_d, decrypt_d, paused_d):
        """Accumulate deltas onto a read record's bytes/decrypt/paused
        counters."""
        r = self.read_events[tag]
        (start, length, requesttime, finishtime, bytes, decrypt, paused) = r
        bytes += bytes_d
        decrypt += decrypt_d
        paused += paused_d
        r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
        self.read_events[tag] = r
    def finish_read_event(self, tag, finishtime):
        """Stamp a read record with its completion time."""
        r = self.read_events[tag]
        (start, length, requesttime, _, bytes, decrypt, paused) = r
        r = (start, length, requesttime, finishtime, bytes, decrypt, paused)
        self.read_events[tag] = r

    def add_known_share(self, serverid, shnum):
        """Note that `serverid` claims to hold share `shnum`."""
        self.known_shares.append( (serverid, shnum) )

    def add_problem(self, p):
        """Record a problem encountered during the download."""
        self.problems.append(p)

    # IDownloadStatus methods
    def get_counter(self):
        return self.counter
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def get_status(self):
        return "not impl yet" # TODO
    def get_progress(self):
        return 0.1 # TODO
    def using_helper(self):
        return False
    def get_active(self):
        return False # TODO
    def get_started(self):
        return self.started
    def get_results(self):
        return None # TODO
Loading…
x
Reference in New Issue
Block a user