diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index cb7b40280..b36a435ce 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -171,11 +171,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): num_segments, total_shares, needed_shares, servers_of_happiness): """ - @return: (upload_servers, already_servers), where upload_servers is + @return: (upload_trackers, already_servers), where upload_trackers is a set of ServerTracker instances that have agreed to hold some shares for us (the shareids are stashed inside the ServerTracker), and already_servers is a dict mapping shnum - to a set of servers which claim to already have the share. + to a set of serverids which claim to already have the share. """ if self._status: @@ -195,7 +195,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # These servers have shares -- any shares -- for our SI. We keep # track of these to write an error message with them later. - self.servers_with_shares = set() + self.serverids_with_shares = set() # this needed_hashes computation should mirror # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree @@ -277,26 +277,26 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): return dl - def _handle_existing_response(self, res, server): + def _handle_existing_response(self, res, serverid): """ I handle responses to the queries sent by Tahoe2ServerSelector._existing_shares. """ if isinstance(res, failure.Failure): self.log("%s got error during existing shares check: %s" - % (idlib.shortnodeid_b2a(server), res), + % (idlib.shortnodeid_b2a(serverid), res), level=log.UNUSUAL) self.error_count += 1 self.bad_query_count += 1 else: buckets = res if buckets: - self.servers_with_shares.add(server) + self.serverids_with_shares.add(serverid) self.log("response to get_buckets() from server %s: alreadygot=%s" - % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))), + % (idlib.shortnodeid_b2a(serverid), tuple(sorted(buckets))), level=log.NOISY) for bucket in buckets: - self.preexisting_shares.setdefault(bucket, set()).add(server) + self.preexisting_shares.setdefault(bucket, set()).add(serverid) self.homeless_shares.discard(bucket) self.full_count += 1 self.bad_query_count += 1 @@ -374,7 +374,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): return self._loop() else: # Redistribution won't help us; fail. - server_count = len(self.servers_with_shares) + server_count = len(self.serverids_with_shares) failmsg = failure_message(server_count, self.needed_shares, self.servers_of_happiness, @@ -439,7 +439,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): merged = merge_peers(self.preexisting_shares, self.use_trackers) effective_happiness = servers_of_happiness(merged) if effective_happiness < self.servers_of_happiness: - msg = failure_message(len(self.servers_with_shares), + msg = failure_message(len(self.serverids_with_shares), self.needed_shares, self.servers_of_happiness, effective_happiness) @@ -502,7 +502,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): progress = True if allocated or alreadygot: - self.servers_with_shares.add(tracker.serverid) + self.serverids_with_shares.add(tracker.serverid) not_yet_present = set(shares_to_ask) - set(alreadygot) still_homeless = not_yet_present - set(allocated) @@ -920,38 +920,39 @@ class CHKUploader: d.addCallback(_done) return d - def set_shareholders(self, (upload_servers, already_servers), encoder): + def set_shareholders(self, (upload_trackers, already_servers), encoder): """ - @param upload_servers: a sequence of ServerTracker objects that - have agreed to hold some shares for us (the - shareids are stashed inside the ServerTracker) + @param upload_trackers: a sequence of ServerTracker objects that + have agreed to hold some shares for us (the + shareids are stashed inside the ServerTracker) @paran already_servers: a dict mapping sharenum to a set of serverids that claim to already have this share """ - msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s" - values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()]) - for s in upload_servers], already_servers) + msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s" + values = ([', '.join([str_shareloc(k,v) + for k,v in st.buckets.iteritems()]) + for st in upload_trackers], already_servers) self.log(msgtempl % values, level=log.OPERATIONAL) # record already-present shares in self._results self._results.preexisting_shares = len(already_servers) self._server_trackers = {} # k: shnum, v: instance of ServerTracker - for server in upload_servers: - assert isinstance(server, ServerTracker) + for tracker in upload_trackers: + assert isinstance(tracker, ServerTracker) buckets = {} servermap = already_servers.copy() - for server in upload_servers: - buckets.update(server.buckets) - for shnum in server.buckets: - self._server_trackers[shnum] = server - servermap.setdefault(shnum, set()).add(server.serverid) - assert len(buckets) == sum([len(server.buckets) - for server in upload_servers]), \ + for tracker in upload_trackers: + buckets.update(tracker.buckets) + for shnum in tracker.buckets: + self._server_trackers[shnum] = tracker + servermap.setdefault(shnum, set()).add(tracker.serverid) + assert len(buckets) == sum([len(tracker.buckets) + for tracker in upload_trackers]), \ "%s (%s) != %s (%s)" % ( len(buckets), buckets, - sum([len(server.buckets) for server in upload_servers]), - [(s.buckets, s.serverid) for s in upload_servers] + sum([len(tracker.buckets) for tracker in upload_trackers]), + [(t.buckets, t.serverid) for t in upload_trackers] ) encoder.set_shareholders(buckets, servermap)