Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
upload.py: fix var names to avoid confusion between 'trackers' and 'servers'
commit 0cf9e3b150
parent ebfcb649f9
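
The rename is mechanical but meaningful: the collections touched here hold ServerTracker instances (a server plus per-upload allocation state), not bare servers, so calling them `*_servers` invited confusion. A minimal sketch of the distinction the new names encode, using simplified stand-in classes rather than the real upload.py definitions:

class ServerTrackerSketch:
    """Hypothetical stand-in for upload.py's ServerTracker."""
    def __init__(self, serverid, connection):
        self.serverid = serverid      # identifies the storage server
        self.connection = connection  # how we reach it
        self.buckets = {}             # shnum -> writer, filled as shares are allocated

def make_trackers(servers):
    # mirrors the diff's _make_trackers(): (serverid, conn) pairs in, trackers out
    return [ServerTrackerSketch(serverid, conn)
            for (serverid, conn) in servers]

trackers = make_trackers([(b"srv1", object()), (b"srv2", object())])
assert [t.serverid for t in trackers] == [b"srv1", b"srv2"]

The annotated hunks below are the full diff.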
@@ -186,19 +186,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         self.needed_shares = needed_shares
 
         self.homeless_shares = set(range(total_shares))
-        self.contacted_servers = [] # servers worth asking again
-        self.contacted_servers2 = [] # servers that we have asked again
+        self.contacted_trackers = [] # servers worth asking again
+        self.contacted_trackers2 = [] # servers that we have asked again
         self._started_second_pass = False
-        self.use_servers = set() # ServerTrackers that have shares assigned
+        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
-        # We don't try to allocate shares to these servers, since they've said
-        # that they're incapable of storing shares of the size that we'd want
-        # to store. We keep them around because they may have existing shares
-        # for this storage index, which we want to know about for accurate
-        # servers_of_happiness accounting
-        # (this is eventually a list, but it is initialized later)
-        self.readonly_servers = None
+
         # These servers have shares -- any shares -- for our SI. We keep
         # track of these to write an error message with them later.
         self.servers_with_shares = set()
@@ -251,25 +245,32 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                              bucket_cancel_secret_hash(file_cancel_secret,
                                                        serverid))
                     for (serverid, conn) in servers]
-        self.uncontacted_servers = _make_trackers(writable_servers)
-        self.readonly_servers = _make_trackers(readonly_servers)
+        self.uncontacted_trackers = _make_trackers(writable_servers)
+
+        # We don't try to allocate shares to these servers, since they've
+        # said that they're incapable of storing shares of the size that we'd
+        # want to store. We ask them about existing shares for this storage
+        # index, which we want to know about for accurate
+        # servers_of_happiness accounting, then we forget about them.
+        readonly_trackers = _make_trackers(readonly_servers)
+
         # We now ask servers that can't hold any new shares about existing
         # shares that they might have for our SI. Once this is done, we
         # start placing the shares that we haven't already accounted
         # for.
         ds = []
-        if self._status and self.readonly_servers:
+        if self._status and readonly_trackers:
             self._status.set_status("Contacting readonly servers to find "
                                     "any existing shares")
-        for server in self.readonly_servers:
-            assert isinstance(server, ServerTracker)
-            d = server.ask_about_existing_shares()
-            d.addBoth(self._handle_existing_response, server.serverid)
+        for tracker in readonly_trackers:
+            assert isinstance(tracker, ServerTracker)
+            d = tracker.ask_about_existing_shares()
+            d.addBoth(self._handle_existing_response, tracker.serverid)
             ds.append(d)
             self.num_servers_contacted += 1
             self.query_count += 1
             self.log("asking server %s for any existing shares" %
-                     (idlib.shortnodeid_b2a(server.serverid),),
+                     (idlib.shortnodeid_b2a(tracker.serverid),),
                     level=log.NOISY)
         dl = defer.DeferredList(ds)
         dl.addCallback(lambda ign: self._loop())
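
This hunk fans out one ask_about_existing_shares() query per read-only tracker and gates further work on a DeferredList. A self-contained sketch of that Twisted pattern, with an invented FakeTracker standing in for the real remote-calling ServerTracker:

from twisted.internet import defer

class FakeTracker:
    """Invented stand-in: answers immediately instead of making a remote call."""
    def __init__(self, serverid, shares):
        self.serverid = serverid
        self._shares = shares
    def ask_about_existing_shares(self):
        return defer.succeed(self._shares)

def handle_existing_response(res, serverid, found):
    # record which shares each read-only server reported
    found[serverid] = res
    return res

found = {}
readonly_trackers = [FakeTracker(b"A", {0, 3}), FakeTracker(b"B", {1})]
ds = []
for tracker in readonly_trackers:
    d = tracker.ask_about_existing_shares()
    d.addBoth(handle_existing_response, tracker.serverid, found)
    ds.append(d)
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: print("existing shares:", found))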
@@ -323,19 +324,19 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
     def _loop(self):
         if not self.homeless_shares:
-            merged = merge_peers(self.preexisting_shares, self.use_servers)
+            merged = merge_peers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if self.servers_of_happiness <= effective_happiness:
                 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
-                       "self.use_servers: %s, self.preexisting_shares: %s") \
+                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                        % (self, self._get_progress_message(),
                           pretty_print_shnum_to_servers(merged),
                           [', '.join([str_shareloc(k,v)
-                                      for k,v in s.buckets.iteritems()])
-                           for s in self.use_servers],
+                                      for k,v in st.buckets.iteritems()])
+                           for st in self.use_trackers],
                           pretty_print_shnum_to_servers(self.preexisting_shares))
                 self.log(msg, level=log.OPERATIONAL)
-                return (self.use_servers, self.preexisting_shares)
+                return (self.use_trackers, self.preexisting_shares)
             else:
                 # We're not okay right now, but maybe we can fix it by
                 # redistributing some shares. In cases where one or two
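
The success check here compares servers_of_happiness(merged) against the configured threshold. In Tahoe-LAFS that metric is the size of a maximum matching between shares and the servers that hold them; the following standalone sketch (invented share map, classic augmenting-path matching, not the real Tahoe-LAFS implementation) shows the kind of value being compared:

def happiness(sharemap):
    """sharemap: shnum -> set of serverids holding (or assigned) that share."""
    match = {}  # serverid -> shnum currently matched to it

    def try_place(shnum, seen):
        for server in sharemap[shnum]:
            if server in seen:
                continue
            seen.add(server)
            # take a free server, or evict the share using it if that
            # share can be re-placed on some other server
            if server not in match or try_place(match[server], seen):
                match[server] = shnum
                return True
        return False

    return sum(1 for shnum in sharemap if try_place(shnum, set()))

merged = {0: {b"A"}, 1: {b"A", b"B"}, 2: {b"B", b"C"}}
assert happiness(merged) == 3   # shares 0,1,2 can land on A,B,C distinctly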
@@ -352,7 +353,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 shares_to_spread = sum([len(list(sharelist)) - 1
                                         for (server, sharelist)
                                         in shares.items()])
-                if delta <= len(self.uncontacted_servers) and \
+                if delta <= len(self.uncontacted_trackers) and \
                     shares_to_spread >= delta:
                     items = shares.items()
                     while len(self.homeless_shares) < delta:
@@ -368,7 +369,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                             if not self.preexisting_shares[share]:
                                 del self.preexisting_shares[share]
                             items.append((server, sharelist))
-                    for writer in self.use_servers:
+                    for writer in self.use_trackers:
                         writer.abort_some_buckets(self.homeless_shares)
                     return self._loop()
                 else:
@@ -388,10 +389,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 self.log(servmsg, level=log.INFREQUENT)
                 return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
 
-        if self.uncontacted_servers:
-            server = self.uncontacted_servers.pop(0)
+        if self.uncontacted_trackers:
+            tracker = self.uncontacted_trackers.pop(0)
             # TODO: don't pre-convert all serverids to ServerTrackers
-            assert isinstance(server, ServerTracker)
+            assert isinstance(tracker, ServerTracker)
 
             shares_to_ask = set(sorted(self.homeless_shares)[:1])
             self.homeless_shares -= shares_to_ask
@@ -400,42 +401,42 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (first query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(server.serverid),
+                                        % (idlib.shortnodeid_b2a(tracker.serverid),
                                            len(self.homeless_shares)))
-            d = server.query(shares_to_ask)
-            d.addBoth(self._got_response, server, shares_to_ask,
-                      self.contacted_servers)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.contacted_trackers)
             return d
-        elif self.contacted_servers:
+        elif self.contacted_trackers:
             # ask a server that we've already asked.
             if not self._started_second_pass:
                 self.log("starting second pass",
                         level=log.NOISY)
                 self._started_second_pass = True
             num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                           len(self.contacted_servers))
-            server = self.contacted_servers.pop(0)
+                                           len(self.contacted_trackers))
+            tracker = self.contacted_trackers.pop(0)
             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
             self.homeless_shares -= shares_to_ask
             self.query_count += 1
             if self._status:
                 self._status.set_status("Contacting Servers [%s] (second query),"
                                         " %d shares left.."
-                                        % (idlib.shortnodeid_b2a(server.serverid),
+                                        % (idlib.shortnodeid_b2a(tracker.serverid),
                                            len(self.homeless_shares)))
-            d = server.query(shares_to_ask)
-            d.addBoth(self._got_response, server, shares_to_ask,
-                      self.contacted_servers2)
+            d = tracker.query(shares_to_ask)
+            d.addBoth(self._got_response, tracker, shares_to_ask,
+                      self.contacted_trackers2)
             return d
-        elif self.contacted_servers2:
+        elif self.contacted_trackers2:
             # we've finished the second-or-later pass. Move all the remaining
-            # servers back into self.contacted_servers for the next pass.
-            self.contacted_servers.extend(self.contacted_servers2)
-            self.contacted_servers2[:] = []
+            # servers back into self.contacted_trackers for the next pass.
+            self.contacted_trackers.extend(self.contacted_trackers2)
+            self.contacted_trackers2[:] = []
             return self._loop()
         else:
             # no more servers. If we haven't placed enough shares, we fail.
-            merged = merge_peers(self.preexisting_shares, self.use_servers)
+            merged = merge_peers(self.preexisting_shares, self.use_trackers)
             effective_happiness = servers_of_happiness(merged)
             if effective_happiness < self.servers_of_happiness:
                 msg = failure_message(len(self.servers_with_shares),
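
Taken together, the branches above implement a multi-pass strategy: fresh trackers are offered one share each, leftover shares are then batched across trackers that previously accepted, and contacted_trackers2 is recycled back for further passes. A synchronous toy model of that pass structure (invented capacities, no Deferreds, and a coarser no-progress guard than the real code's accounting):

import math

def place(total_shares, capacities):
    """Toy placement: returns {tracker_name: set(shnums)}."""
    homeless = set(range(total_shares))
    uncontacted = list(capacities)        # first-pass trackers
    contacted, contacted2 = [], []        # worth re-asking / already re-asked
    placements = {name: set() for name in capacities}
    progress_this_pass = True

    while homeless:
        if uncontacted:
            tracker, ask = uncontacted.pop(0), 1      # first query: one share
        elif contacted:
            ask = math.ceil(len(homeless) / len(contacted))  # later queries: batch
            tracker = contacted.pop(0)
        elif contacted2 and progress_this_pass:
            contacted, contacted2 = contacted2, []    # start another pass
            progress_this_pass = False
            continue
        else:
            raise RuntimeError("not enough capacity: %d shares homeless" % len(homeless))
        shares = set(sorted(homeless)[:ask])
        homeless -= shares
        room = capacities[tracker] - len(placements[tracker])
        accepted = set(sorted(shares)[:room])         # server takes what it can
        placements[tracker] |= accepted
        homeless |= shares - accepted                 # rejected shares go back
        if accepted:
            progress_this_pass = True
        # full acceptance means the tracker might take even more next time
        (contacted if accepted == shares else contacted2).append(tracker)
    return placements

print(place(6, {"A": 3, "B": 2, "C": 1}))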
@@ -455,20 +456,20 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                             self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                 self.log(msg, level=log.OPERATIONAL)
-                return (self.use_servers, self.preexisting_shares)
+                return (self.use_trackers, self.preexisting_shares)
 
-    def _got_response(self, res, server, shares_to_ask, put_server_here):
+    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
-            self.log("%s got error during server selection: %s" % (server, res),
+            self.log("%s got error during server selection: %s" % (tracker, res),
                     level=log.UNUSUAL)
             self.error_count += 1
             self.bad_query_count += 1
             self.homeless_shares |= shares_to_ask
-            if (self.uncontacted_servers
-                or self.contacted_servers
-                or self.contacted_servers2):
+            if (self.uncontacted_trackers
+                or self.contacted_trackers
+                or self.contacted_trackers2):
                 # there is still hope, so just loop
                 pass
             else:
@@ -477,17 +478,17 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
                 # failure we got: if a coding error causes all servers to fail
                 # in the same way, this allows the common failure to be seen
                 # by the uploader and should help with debugging
-                msg = ("last failure (from %s) was: %s" % (server, res))
+                msg = ("last failure (from %s) was: %s" % (tracker, res))
                 self.last_failure_msg = msg
         else:
             (alreadygot, allocated) = res
             self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
-                    % (idlib.shortnodeid_b2a(server.serverid),
+                    % (idlib.shortnodeid_b2a(tracker.serverid),
                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                     level=log.NOISY)
             progress = False
             for s in alreadygot:
-                self.preexisting_shares.setdefault(s, set()).add(server.serverid)
+                self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
                     progress = True
@@ -497,11 +498,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             # the ServerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.
             if allocated:
-                self.use_servers.add(server)
+                self.use_trackers.add(tracker)
                 progress = True
 
             if allocated or alreadygot:
-                self.servers_with_shares.add(server.serverid)
+                self.servers_with_shares.add(tracker.serverid)
 
             not_yet_present = set(shares_to_ask) - set(alreadygot)
             still_homeless = not_yet_present - set(allocated)
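
The two set subtractions at the end of this hunk decide which shares return to the homeless pool. A worked example with invented share numbers:

shares_to_ask = {2, 5, 7}
alreadygot    = {5}        # server reported it already holds share 5
allocated     = {2}        # server accepted a new bucket for share 2

not_yet_present = set(shares_to_ask) - set(alreadygot)   # {2, 7}
still_homeless  = not_yet_present - set(allocated)       # {7}
assert still_homeless == {7}   # share 7 must find another server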
@@ -532,7 +533,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             else:
                 # if they *were* able to accept everything, they might be
                 # willing to accept even more.
-                put_server_here.append(server)
+                put_tracker_here.append(tracker)
 
         # now loop
         return self._loop()
@@ -545,11 +546,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
         place shares for this file. I then raise an
         UploadUnhappinessError with my msg argument.
         """
-        for server in self.use_servers:
-            assert isinstance(server, ServerTracker)
-
-            server.abort()
+        for tracker in self.use_trackers:
+            assert isinstance(tracker, ServerTracker)
+            tracker.abort()
 
         raise UploadUnhappinessError(msg)
 
-
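
This last hunk shows the cleanup contract the rename preserves: every tracker in use_trackers aborts its allocated buckets before UploadUnhappinessError propagates, so the storage servers can release the allocations. A sketch of that contract, with illustrative stand-in classes rather than the real upload.py definitions:

class UploadUnhappinessError(Exception):
    pass

class DemoTracker:
    """Invented stand-in whose abort() just reports itself."""
    def __init__(self, serverid):
        self.serverid = serverid
    def abort(self):
        print("aborted buckets on", self.serverid)

def fail_upload(use_trackers, msg):
    # release every bucket before letting the error propagate
    for tracker in use_trackers:
        tracker.abort()
    raise UploadUnhappinessError(msg)

try:
    fail_upload([DemoTracker(b"A"), DemoTracker(b"B")],
                "placed 2 shares, wanted 7")
except UploadUnhappinessError as e:
    print("upload failed:", e)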
|
Loading…
x
Reference in New Issue
Block a user