mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-13 22:03:04 +00:00
upload.py: apply David-Sarah's advice rename (un)contacted(2) trackers to first_pass/second_pass/next_pass
This patch was written by Brian but was re-recorded by Zooko (with David-Sarah looking on) to use darcs replace instead of editing to rename the three variables to their new names. refs #1363
This commit is contained in:
parent
c9def76977
commit
880758340f
@ -173,11 +173,12 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
num_segments, total_shares, needed_shares,
|
||||
servers_of_happiness):
|
||||
"""
|
||||
@return: (upload_trackers, already_servers), where upload_trackers is
|
||||
a set of ServerTracker instances that have agreed to hold
|
||||
@return: (upload_trackers, already_serverids), where upload_trackers
|
||||
is a set of ServerTracker instances that have agreed to hold
|
||||
some shares for us (the shareids are stashed inside the
|
||||
ServerTracker), and already_servers is a dict mapping shnum
|
||||
to a set of serverids which claim to already have the share.
|
||||
ServerTracker), and already_serverids is a dict mapping
|
||||
shnum to a set of serverids for servers which claim to
|
||||
already have the share.
|
||||
"""
|
||||
|
||||
if self._status:
|
||||
@ -188,9 +189,6 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
self.needed_shares = needed_shares
|
||||
|
||||
self.homeless_shares = set(range(total_shares))
|
||||
self.contacted_trackers = [] # servers worth asking again
|
||||
self.contacted_trackers2 = [] # servers that we have asked again
|
||||
self._started_second_pass = False
|
||||
self.use_trackers = set() # ServerTrackers that have shares assigned
|
||||
# to them
|
||||
self.preexisting_shares = {} # shareid => set(serverids) holding shareid
|
||||
@ -249,7 +247,21 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
renew, cancel)
|
||||
trackers.append(st)
|
||||
return trackers
|
||||
self.uncontacted_trackers = _make_trackers(writable_servers)
|
||||
|
||||
# We assign each servers/trackers into one three lists. They all
|
||||
# start in the "first pass" list. During the first pass, as we ask
|
||||
# each one to hold a share, we move their tracker to the "second
|
||||
# pass" list, until the first-pass list is empty. Then during the
|
||||
# second pass, as we ask each to hold more shares, we move their
|
||||
# tracker to the "next pass" list, until the second-pass list is
|
||||
# empty. Then we move everybody from the next-pass list back to the
|
||||
# second-pass list and repeat the "second" pass (really the third,
|
||||
# fourth, etc pass), until all shares are assigned, or we've run out
|
||||
# of potential servers.
|
||||
self.first_pass_trackers = _make_trackers(writable_servers)
|
||||
self.second_pass_trackers = [] # servers worth asking again
|
||||
self.next_pass_trackers = [] # servers that we have asked again
|
||||
self._started_second_pass = False
|
||||
|
||||
# We don't try to allocate shares to these servers, since they've
|
||||
# said that they're incapable of storing shares of the size that we'd
|
||||
@ -356,7 +368,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
shares_to_spread = sum([len(list(sharelist)) - 1
|
||||
for (server, sharelist)
|
||||
in shares.items()])
|
||||
if delta <= len(self.uncontacted_trackers) and \
|
||||
if delta <= len(self.first_pass_trackers) and \
|
||||
shares_to_spread >= delta:
|
||||
items = shares.items()
|
||||
while len(self.homeless_shares) < delta:
|
||||
@ -392,8 +404,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
self.log(servmsg, level=log.INFREQUENT)
|
||||
return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
|
||||
|
||||
if self.uncontacted_trackers:
|
||||
tracker = self.uncontacted_trackers.pop(0)
|
||||
if self.first_pass_trackers:
|
||||
tracker = self.first_pass_trackers.pop(0)
|
||||
# TODO: don't pre-convert all serverids to ServerTrackers
|
||||
assert isinstance(tracker, ServerTracker)
|
||||
|
||||
@ -408,17 +420,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
len(self.homeless_shares)))
|
||||
d = tracker.query(shares_to_ask)
|
||||
d.addBoth(self._got_response, tracker, shares_to_ask,
|
||||
self.contacted_trackers)
|
||||
self.second_pass_trackers)
|
||||
return d
|
||||
elif self.contacted_trackers:
|
||||
elif self.second_pass_trackers:
|
||||
# ask a server that we've already asked.
|
||||
if not self._started_second_pass:
|
||||
self.log("starting second pass",
|
||||
level=log.NOISY)
|
||||
self._started_second_pass = True
|
||||
num_shares = mathutil.div_ceil(len(self.homeless_shares),
|
||||
len(self.contacted_trackers))
|
||||
tracker = self.contacted_trackers.pop(0)
|
||||
len(self.second_pass_trackers))
|
||||
tracker = self.second_pass_trackers.pop(0)
|
||||
shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
|
||||
self.homeless_shares -= shares_to_ask
|
||||
self.query_count += 1
|
||||
@ -429,13 +441,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
len(self.homeless_shares)))
|
||||
d = tracker.query(shares_to_ask)
|
||||
d.addBoth(self._got_response, tracker, shares_to_ask,
|
||||
self.contacted_trackers2)
|
||||
self.next_pass_trackers)
|
||||
return d
|
||||
elif self.contacted_trackers2:
|
||||
elif self.next_pass_trackers:
|
||||
# we've finished the second-or-later pass. Move all the remaining
|
||||
# servers back into self.contacted_trackers for the next pass.
|
||||
self.contacted_trackers.extend(self.contacted_trackers2)
|
||||
self.contacted_trackers2[:] = []
|
||||
# servers back into self.second_pass_trackers for the next pass.
|
||||
self.second_pass_trackers.extend(self.next_pass_trackers)
|
||||
self.next_pass_trackers[:] = []
|
||||
return self._loop()
|
||||
else:
|
||||
# no more servers. If we haven't placed enough shares, we fail.
|
||||
@ -470,9 +482,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
|
||||
self.error_count += 1
|
||||
self.bad_query_count += 1
|
||||
self.homeless_shares |= shares_to_ask
|
||||
if (self.uncontacted_trackers
|
||||
or self.contacted_trackers
|
||||
or self.contacted_trackers2):
|
||||
if (self.first_pass_trackers
|
||||
or self.second_pass_trackers
|
||||
or self.next_pass_trackers):
|
||||
# there is still hope, so just loop
|
||||
pass
|
||||
else:
|
||||
@ -923,27 +935,29 @@ class CHKUploader:
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
def set_shareholders(self, (upload_trackers, already_servers), encoder):
|
||||
def set_shareholders(self, (upload_trackers, already_serverids), encoder):
|
||||
"""
|
||||
@param upload_trackers: a sequence of ServerTracker objects that
|
||||
have agreed to hold some shares for us (the
|
||||
shareids are stashed inside the ServerTracker)
|
||||
@paran already_servers: a dict mapping sharenum to a set of serverids
|
||||
that claim to already have this share
|
||||
|
||||
@paran already_serverids: a dict mapping sharenum to a set of
|
||||
serverids for servers that claim to already
|
||||
have this share
|
||||
"""
|
||||
msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
|
||||
msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
|
||||
values = ([', '.join([str_shareloc(k,v)
|
||||
for k,v in st.buckets.iteritems()])
|
||||
for st in upload_trackers], already_servers)
|
||||
for st in upload_trackers], already_serverids)
|
||||
self.log(msgtempl % values, level=log.OPERATIONAL)
|
||||
# record already-present shares in self._results
|
||||
self._results.preexisting_shares = len(already_servers)
|
||||
self._results.preexisting_shares = len(already_serverids)
|
||||
|
||||
self._server_trackers = {} # k: shnum, v: instance of ServerTracker
|
||||
for tracker in upload_trackers:
|
||||
assert isinstance(tracker, ServerTracker)
|
||||
buckets = {}
|
||||
servermap = already_servers.copy()
|
||||
servermap = already_serverids.copy()
|
||||
for tracker in upload_trackers:
|
||||
buckets.update(tracker.buckets)
|
||||
for shnum in tracker.buckets:
|
||||
|
Loading…
x
Reference in New Issue
Block a user