mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-18 18:56:28 +00:00
immutable: download from the first servers which provide at least K buckets instead of waiting for all servers to reply
This should put an end to the phenomenon I've been seeing that a single hung server can cause all downloads on a grid to hang. Also it should speed up all downloads by (a) not-waiting for responses to queries that it doesn't need, and (b) downloading shares from the servers which answered the initial query the fastest. Also, do not count how many buckets you've gotten when deciding whether the download has enough shares or not -- instead count how many buckets to *unique* shares that you've gotten. This appears to improve a slightly weird behavior in the current download code in which receiving >= K different buckets all to the same sharenumber would make it think it had enough to download the file when in fact it hadn't. This patch needs tests before it is actually ready for trunk.
This commit is contained in:
parent
14280b009c
commit
2bd9dfa5bd
@ -788,7 +788,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
self._opened = False
|
||||
|
||||
self.active_buckets = {} # k: shnum, v: bucket
|
||||
self._share_buckets = [] # list of (sharenum, bucket) tuples
|
||||
self._share_buckets = {} # k: sharenum, v: list of buckets
|
||||
self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
|
||||
|
||||
self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
|
||||
@ -869,7 +869,18 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
return d
|
||||
|
||||
def _get_all_shareholders(self):
|
||||
dl = []
|
||||
""" Once the number of buckets that I know about is >= K then I
|
||||
callback the Deferred that I return.
|
||||
|
||||
If all of the get_buckets deferreds have fired (whether callback or
|
||||
errback) and I still don't have enough buckets then I'll callback the
|
||||
Deferred that I return.
|
||||
"""
|
||||
self._wait_for_enough_buckets_d = defer.Deferred()
|
||||
|
||||
self._queries_sent = 0
|
||||
self._responses_received = 0
|
||||
self._queries_failed = 0
|
||||
sb = self._storage_broker
|
||||
servers = sb.get_servers_for_index(self._storage_index)
|
||||
if not servers:
|
||||
@ -878,17 +889,15 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
self.log(format="sending DYHB to [%(peerid)s]",
|
||||
peerid=idlib.shortnodeid_b2a(peerid),
|
||||
level=log.NOISY, umid="rT03hg")
|
||||
self._queries_sent += 1
|
||||
d = ss.callRemote("get_buckets", self._storage_index)
|
||||
d.addCallbacks(self._got_response, self._got_error,
|
||||
callbackArgs=(peerid,))
|
||||
dl.append(d)
|
||||
self._responses_received = 0
|
||||
self._queries_sent = len(dl)
|
||||
if self._status:
|
||||
self._status.set_status("Locating Shares (%d/%d)" %
|
||||
(self._responses_received,
|
||||
self._queries_sent))
|
||||
return defer.DeferredList(dl)
|
||||
return self._wait_for_enough_buckets_d
|
||||
|
||||
def _got_response(self, buckets, peerid):
|
||||
self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
|
||||
@ -906,6 +915,19 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
for sharenum, bucket in buckets.iteritems():
|
||||
b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
|
||||
self.add_share_bucket(sharenum, b)
|
||||
# If we just got enough buckets for the first time, then fire the
|
||||
# deferred. Then remove it from self so that we don't fire it
|
||||
# again.
|
||||
if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
|
||||
self._wait_for_enough_buckets_d.callback(True)
|
||||
self._wait_for_enough_buckets_d = None
|
||||
|
||||
# Else, if we ran out of outstanding requests then fire it and
|
||||
# remove it from self.
|
||||
assert (self._responses_received+self._queries_failed) <= self._queries_sent
|
||||
if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._queries_sent:
|
||||
self._wait_for_enough_buckets_d.callback(False)
|
||||
self._wait_for_enough_buckets_d = None
|
||||
|
||||
if self._results:
|
||||
if peerid not in self._results.servermap:
|
||||
@ -914,7 +936,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
|
||||
def add_share_bucket(self, sharenum, bucket):
|
||||
# this is split out for the benefit of test_encode.py
|
||||
self._share_buckets.append( (sharenum, bucket) )
|
||||
self._share_buckets.setdefault(sharenum, []).append(bucket)
|
||||
|
||||
def _got_error(self, f):
|
||||
level = log.WEIRD
|
||||
@ -922,6 +944,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
level = log.UNUSUAL
|
||||
self.log("Error during get_buckets", failure=f, level=level,
|
||||
umid="3uuBUQ")
|
||||
# If we ran out of outstanding requests then errback it and remove it
|
||||
# from self.
|
||||
self._queries_failed += 1
|
||||
assert (self._responses_received+self._queries_failed) <= self._queries_sent
|
||||
if self._wait_for_enough_buckets_d and self._responses_received == self._queries_sent:
|
||||
self._wait_for_enough_buckets_d.errback()
|
||||
self._wait_for_enough_buckets_d = None
|
||||
|
||||
def bucket_failed(self, vbucket):
|
||||
shnum = vbucket.sharenum
|
||||
@ -964,8 +993,9 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
uri_extension_fetch_started = time.time()
|
||||
|
||||
vups = []
|
||||
for sharenum, bucket in self._share_buckets:
|
||||
vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
|
||||
for sharenum, buckets in self._share_buckets.iteritems():
|
||||
for bucket in buckets:
|
||||
vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
|
||||
vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
|
||||
d = vto.start()
|
||||
|
||||
@ -1001,9 +1031,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
|
||||
def _get_crypttext_hash_tree(self, res):
|
||||
vchtps = []
|
||||
for sharenum, bucket in self._share_buckets:
|
||||
vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
|
||||
vchtps.append(vchtp)
|
||||
for sharenum, buckets in self._share_buckets.iteritems():
|
||||
for bucket in buckets:
|
||||
vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
|
||||
vchtps.append(vchtp)
|
||||
|
||||
_get_crypttext_hash_tree_started = time.time()
|
||||
if self._status:
|
||||
@ -1054,9 +1085,10 @@ class CiphertextDownloader(log.PrefixingLogMixin):
|
||||
|
||||
|
||||
def _download_all_segments(self, res):
|
||||
for sharenum, bucket in self._share_buckets:
|
||||
vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
|
||||
self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
|
||||
for sharenum, buckets in self._share_buckets.iteritems():
|
||||
for bucket in buckets:
|
||||
vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
|
||||
self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
|
||||
|
||||
# after the above code, self._share_vbuckets contains enough
|
||||
# buckets to complete the download, and some extra ones to
|
||||
|
Loading…
Reference in New Issue
Block a user