upload.py: more tracker-vs-server cleanup

This commit is contained in:
Brian Warner 2011-02-26 19:11:07 -07:00
parent 0cf9e3b150
commit 43488b025c

View File

@ -171,11 +171,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
num_segments, total_shares, needed_shares, num_segments, total_shares, needed_shares,
servers_of_happiness): servers_of_happiness):
""" """
@return: (upload_servers, already_servers), where upload_servers is @return: (upload_trackers, already_servers), where upload_trackers is
a set of ServerTracker instances that have agreed to hold a set of ServerTracker instances that have agreed to hold
some shares for us (the shareids are stashed inside the some shares for us (the shareids are stashed inside the
ServerTracker), and already_servers is a dict mapping shnum ServerTracker), and already_servers is a dict mapping shnum
to a set of servers which claim to already have the share. to a set of serverids which claim to already have the share.
""" """
if self._status: if self._status:
@ -195,7 +195,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# These servers have shares -- any shares -- for our SI. We keep # These servers have shares -- any shares -- for our SI. We keep
# track of these to write an error message with them later. # track of these to write an error message with them later.
self.servers_with_shares = set() self.serverids_with_shares = set()
# this needed_hashes computation should mirror # this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
@ -277,26 +277,26 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
return dl return dl
def _handle_existing_response(self, res, server): def _handle_existing_response(self, res, serverid):
""" """
I handle responses to the queries sent by I handle responses to the queries sent by
Tahoe2ServerSelector._existing_shares. Tahoe2ServerSelector._existing_shares.
""" """
if isinstance(res, failure.Failure): if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s" self.log("%s got error during existing shares check: %s"
% (idlib.shortnodeid_b2a(server), res), % (idlib.shortnodeid_b2a(serverid), res),
level=log.UNUSUAL) level=log.UNUSUAL)
self.error_count += 1 self.error_count += 1
self.bad_query_count += 1 self.bad_query_count += 1
else: else:
buckets = res buckets = res
if buckets: if buckets:
self.servers_with_shares.add(server) self.serverids_with_shares.add(serverid)
self.log("response to get_buckets() from server %s: alreadygot=%s" self.log("response to get_buckets() from server %s: alreadygot=%s"
% (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))), % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))),
level=log.NOISY) level=log.NOISY)
for bucket in buckets: for bucket in buckets:
self.preexisting_shares.setdefault(bucket, set()).add(server) self.preexisting_shares.setdefault(bucket, set()).add(serverid)
self.homeless_shares.discard(bucket) self.homeless_shares.discard(bucket)
self.full_count += 1 self.full_count += 1
self.bad_query_count += 1 self.bad_query_count += 1
@ -374,7 +374,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
return self._loop() return self._loop()
else: else:
# Redistribution won't help us; fail. # Redistribution won't help us; fail.
server_count = len(self.servers_with_shares) server_count = len(self.serverids_with_shares)
failmsg = failure_message(server_count, failmsg = failure_message(server_count,
self.needed_shares, self.needed_shares,
self.servers_of_happiness, self.servers_of_happiness,
@ -439,7 +439,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
merged = merge_peers(self.preexisting_shares, self.use_trackers) merged = merge_peers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged) effective_happiness = servers_of_happiness(merged)
if effective_happiness < self.servers_of_happiness: if effective_happiness < self.servers_of_happiness:
msg = failure_message(len(self.servers_with_shares), msg = failure_message(len(self.serverids_with_shares),
self.needed_shares, self.needed_shares,
self.servers_of_happiness, self.servers_of_happiness,
effective_happiness) effective_happiness)
@ -502,7 +502,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
progress = True progress = True
if allocated or alreadygot: if allocated or alreadygot:
self.servers_with_shares.add(tracker.serverid) self.serverids_with_shares.add(tracker.serverid)
not_yet_present = set(shares_to_ask) - set(alreadygot) not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated) still_homeless = not_yet_present - set(allocated)
@ -920,38 +920,39 @@ class CHKUploader:
d.addCallback(_done) d.addCallback(_done)
return d return d
def set_shareholders(self, (upload_servers, already_servers), encoder): def set_shareholders(self, (upload_trackers, already_servers), encoder):
""" """
@param upload_servers: a sequence of ServerTracker objects that @param upload_trackers: a sequence of ServerTracker objects that
have agreed to hold some shares for us (the have agreed to hold some shares for us (the
shareids are stashed inside the ServerTracker) shareids are stashed inside the ServerTracker)
@param already_servers: a dict mapping sharenum to a set of serverids @param already_servers: a dict mapping sharenum to a set of serverids
that claim to already have this share that claim to already have this share
""" """
msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s" msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()]) values = ([', '.join([str_shareloc(k,v)
for s in upload_servers], already_servers) for k,v in st.buckets.iteritems()])
for st in upload_trackers], already_servers)
self.log(msgtempl % values, level=log.OPERATIONAL) self.log(msgtempl % values, level=log.OPERATIONAL)
# record already-present shares in self._results # record already-present shares in self._results
self._results.preexisting_shares = len(already_servers) self._results.preexisting_shares = len(already_servers)
self._server_trackers = {} # k: shnum, v: instance of ServerTracker self._server_trackers = {} # k: shnum, v: instance of ServerTracker
for server in upload_servers: for tracker in upload_trackers:
assert isinstance(server, ServerTracker) assert isinstance(tracker, ServerTracker)
buckets = {} buckets = {}
servermap = already_servers.copy() servermap = already_servers.copy()
for server in upload_servers: for tracker in upload_trackers:
buckets.update(server.buckets) buckets.update(tracker.buckets)
for shnum in server.buckets: for shnum in tracker.buckets:
self._server_trackers[shnum] = server self._server_trackers[shnum] = tracker
servermap.setdefault(shnum, set()).add(server.serverid) servermap.setdefault(shnum, set()).add(tracker.serverid)
assert len(buckets) == sum([len(server.buckets) assert len(buckets) == sum([len(tracker.buckets)
for server in upload_servers]), \ for tracker in upload_trackers]), \
"%s (%s) != %s (%s)" % ( "%s (%s) != %s (%s)" % (
len(buckets), len(buckets),
buckets, buckets,
sum([len(server.buckets) for server in upload_servers]), sum([len(tracker.buckets) for tracker in upload_trackers]),
[(s.buckets, s.serverid) for s in upload_servers] [(t.buckets, t.serverid) for t in upload_trackers]
) )
encoder.set_shareholders(buckets, servermap) encoder.set_shareholders(buckets, servermap)