From 17cff7a17674439dacb0f26657eb316170887439 Mon Sep 17 00:00:00 2001 From: Mark Berger Date: Mon, 17 Jun 2013 13:38:49 -0400 Subject: [PATCH] Implements 'Servers of Happiness' algorithm for upload This is Mark Berger's original commits, from ticket #1382 --- docs/specifications/servers-of-happiness.rst | 66 ++++ src/allmydata/immutable/downloader/finder.py | 1 + src/allmydata/immutable/happiness_upload.py | 314 ++++++++++++++++++ src/allmydata/immutable/upload.py | 327 +++++++++++-------- src/allmydata/interfaces.py | 74 +++++ src/allmydata/test/test_checker.py | 6 +- src/allmydata/test/test_download.py | 64 ++-- src/allmydata/test/test_hung_server.py | 2 +- src/allmydata/test/test_repairer.py | 2 +- src/allmydata/test/test_upload.py | 87 ++--- src/allmydata/util/happinessutil.py | 36 +- 11 files changed, 713 insertions(+), 266 deletions(-) create mode 100644 src/allmydata/immutable/happiness_upload.py diff --git a/docs/specifications/servers-of-happiness.rst b/docs/specifications/servers-of-happiness.rst index 91377e749..7d36848d9 100644 --- a/docs/specifications/servers-of-happiness.rst +++ b/docs/specifications/servers-of-happiness.rst @@ -90,3 +90,69 @@ issues. We don't use servers-of-happiness for mutable files yet; this fix will likely come in Tahoe-LAFS version 1.13. + + +============================ +Upload Strategy of Happiness +============================ + +As mentioned above, the uploader is good at detecting instances which +do not pass the servers-of-happiness test, but the share distribution algorithm +is not always successful in instances where happiness can be achieved. A new +placement algorithm designed to pass the servers-of-happiness test, titled +'Upload Strategy of Happiness', is meant to fix these instances where the uploader +is unable to achieve happiness. + +Calculating Share Placements +============================ + +We calculate share placement like so: + +1. Query 2n servers for existing shares. + +2. Construct a bipartite graph of readonly servers to shares, where an edge +exists between an arbitrary readonly server s and an arbitrary share n if and only if s +holds n. + +3. Calculate the maximum matching graph of the bipartite graph. The maximum matching +is the matching which contains the largest possible number of edges. + +4. Construct a bipartite graph of servers to shares, removing any servers and +shares used in the maximum matching graph from step 3. Let an edge exist between +server s and share n if and only if s holds n. + +5. Calculate the maximum matching graph of the new graph. + +6. Construct a bipartite graph of servers to shares, removing any servers and +shares used in the maximum matching graphs from steps 3 and 5. Let an edge exist +between server s and share n if and only if s can hold n. + +7. Calculate the maximum matching graph of the new graph. + +8. Renew the shares on their respective servers from steps 3 +and 5. + +9. Place share n on server s if an edge exists between s and n in the +maximum matching graph from step 7. + +10. If any placements from step 7 fail, remove the server from the set of possible +servers and regenerate the matchings. + + +Properties of Upload Strategy of Happiness +========================================== + +The size of the maximum bipartite matching is bounded by the size of the smaller +set of vertices. Therefore in a situation where the set of servers is smaller +than the set of shares, placement is not generated for a subset of shares.
In +this case the remaining shares are distributed as evenly as possible across the +set of writable servers. + +If the servers-of-happiness criteria can be met, the upload strategy of +happiness guarantees that H shares will be placed on the network. During file +repair, if the set of servers is larger than N, the algorithm will only attempt +to spread shares over N distinct servers. For both initial file upload and file +repair, N should be viewed as the maximum number of distinct servers shares +can be placed on, and H as the minimum amount. The uploader will fail if +the number of distinct servers is less than H, and it will never attempt to +exceed N. diff --git a/src/allmydata/immutable/downloader/finder.py b/src/allmydata/immutable/downloader/finder.py index 8bcdca76f..2aa4f857d 100644 --- a/src/allmydata/immutable/downloader/finder.py +++ b/src/allmydata/immutable/downloader/finder.py @@ -63,6 +63,7 @@ class ShareFinder: if not self._started: si = self.verifycap.storage_index servers = self._storage_broker.get_servers_for_psi(si) + servers.sort(key=lambda s: s.get_serverid()) self._servers = iter(servers) self._started = True diff --git a/src/allmydata/immutable/happiness_upload.py b/src/allmydata/immutable/happiness_upload.py new file mode 100644 index 000000000..d48d57276 --- /dev/null +++ b/src/allmydata/immutable/happiness_upload.py @@ -0,0 +1,314 @@ +from Queue import PriorityQueue +from allmydata.util.happinessutil import augmenting_path_for, residual_network + +class Happiness_Upload: + """ + I handle the calculations involved with generating the maximum + spanning graph for a file when given a set of peers, a set of shares, + and a servermap of 'peer' -> [shares]. + + For more information on the algorithm this class implements, refer to + docs/specifications/servers-of-happiness.rst + """ + + def __init__(self, peers, readonly_peers, shares, servermap={}): + self._happiness = 0 + self.homeless_shares = set() + self.peers = peers + self.readonly_peers = readonly_peers + self.shares = shares + self.servermap = servermap + + def happiness(self): + return self._happiness + + + def generate_mappings(self): + """ + Generates the allocations the upload should based on the given + information. We construct a dictionary of 'share_num' -> set(server_ids) + and return it to the caller. Each share should be placed on each server + in the corresponding set. Existing allocations appear as placements + because attempting to place an existing allocation will renew the share. + """ + + # First calculate share placement for the readonly servers. 
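+        # (Placement happens in three passes: shares already held by read-only
+        # servers are matched first, since those servers can only renew what
+        # they already hold; then shares held by writable servers are matched;
+        # finally the remaining shares are matched against the remaining peers.)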
+ readonly_peers = self.readonly_peers + readonly_shares = set() + readonly_map = {} + for peer in self.servermap: + if peer in self.readonly_peers: + readonly_map.setdefault(peer, self.servermap[peer]) + for share in self.servermap[peer]: + readonly_shares.add(share) + + readonly_mappings = self._calculate_mappings(readonly_peers, readonly_shares, readonly_map) + used_peers, used_shares = self._extract_ids(readonly_mappings) + + # Calculate share placement for the remaining existing allocations + peers = set(self.servermap.keys()) - used_peers + # Squash a list of sets into one set + shares = set(item for subset in self.servermap.values() for item in subset) + shares -= used_shares + servermap = self.servermap.copy() + for peer in self.servermap: + if peer in used_peers: + servermap.pop(peer, None) + else: + servermap[peer] = servermap[peer] - used_shares + if servermap[peer] == set(): + servermap.pop(peer, None) + peers.remove(peer) + + existing_mappings = self._calculate_mappings(peers, shares, servermap) + existing_peers, existing_shares = self._extract_ids(existing_mappings) + + # Calculate share placement for the remaining peers and shares which + # won't be preserved by existing allocations. + peers = self.peers - existing_peers - used_peers + shares = self.shares - existing_shares - used_shares + new_mappings = self._calculate_mappings(peers, shares) + + mappings = dict(readonly_mappings.items() + existing_mappings.items() + new_mappings.items()) + self._calculate_happiness(mappings) + if len(self.homeless_shares) != 0: + all_shares = set(item for subset in self.servermap.values() for item in subset) + self._distribute_homeless_shares(mappings, all_shares) + + return mappings + + + def _calculate_mappings(self, peers, shares, servermap=None): + """ + Given a set of peers, a set of shares, and a dictionary of server -> + set(shares), determine how the uploader should allocate shares. If a + servermap is supplied, determine which existing allocations should be + preserved. If servermap is None, calculate the maximum matching of the + bipartite graph (U, V, E) such that: + + U = peers + V = shares + E = peers x shares + + Returns a dictionary {share -> set(peer)}, indicating that the share + should be placed on each peer in the set. If a share's corresponding + value is None, the share can be placed on any server. Note that the set + of peers should only be one peer when returned, but it is possible to + duplicate shares by adding additional servers to the set. + """ + peer_to_index, index_to_peer = self._reindex(peers, 1) + share_to_index, index_to_share = self._reindex(shares, len(peers) + 1) + shareIndices = [share_to_index[s] for s in shares] + if servermap: + graph = self._servermap_flow_graph(peers, shares, servermap) + else: + peerIndices = [peer_to_index[peer] for peer in peers] + graph = self._flow_network(peerIndices, shareIndices) + max_graph = self._compute_maximum_graph(graph, shareIndices) + return self._convert_mappings(index_to_peer, index_to_share, max_graph) + + + def _compute_maximum_graph(self, graph, shareIndices): + """ + This is an implementation of the Ford-Fulkerson method for finding + a maximum flow in a flow network applied to a bipartite graph. + Specifically, it is the Edmonds-Karp algorithm, since it uses a + BFS to find the shortest augmenting path at each iteration, if one + exists. + + The implementation here is an adapation of an algorithm described in + "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. 
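+
+        The graph is given as an adjacency list whose index 0 entry is the
+        source and whose final entry is the sink. The return value maps each
+        share index to the peer index it was matched with, or to None if the
+        share could not be matched.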
+ """ + + if graph == []: + return {} + + dim = len(graph) + flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] + residual_graph, residual_function = residual_network(graph, flow_function) + + while augmenting_path_for(residual_graph): + path = augmenting_path_for(residual_graph) + # Delta is the largest amount that we can increase flow across + # all of the edges in path. Because of the way that the residual + # function is constructed, f[u][v] for a particular edge (u, v) + # is the amount of unused capacity on that edge. Taking the + # minimum of a list of those values for each edge in the + # augmenting path gives us our delta. + delta = min(map(lambda (u, v), rf=residual_function: rf[u][v], + path)) + for (u, v) in path: + flow_function[u][v] += delta + flow_function[v][u] -= delta + residual_graph, residual_function = residual_network(graph,flow_function) + + new_mappings = {} + for shareIndex in shareIndices: + peer = residual_graph[shareIndex] + if peer == [dim - 1]: + new_mappings.setdefault(shareIndex, None) + else: + new_mappings.setdefault(shareIndex, peer[0]) + + return new_mappings + + + def _extract_ids(self, mappings): + shares = set() + peers = set() + for share in mappings: + if mappings[share] == None: + pass + else: + shares.add(share) + for item in mappings[share]: + peers.add(item) + return (peers, shares) + + + def _calculate_happiness(self, mappings): + """ + I calculate the happiness of the generated mappings and + create the set self.homeless_shares. + """ + self._happiness = 0 + self.homeless_shares = set() + for share in mappings: + if mappings[share] is not None: + self._happiness += 1 + else: + self.homeless_shares.add(share) + + + def _distribute_homeless_shares(self, mappings, shares): + """ + Shares which are not mapped to a peer in the maximum spanning graph + still need to be placed on a server. This function attempts to + distribute those homeless shares as evenly as possible over the + available peers. If possible a share will be placed on the server it was + originally on, signifying the lease should be renewed instead. + """ + + # First check to see if the leases can be renewed. + to_distribute = set() + + for share in self.homeless_shares: + if share in shares: + for peer in self.servermap: + if share in self.servermap[peer]: + mappings[share] = set([peer]) + break + else: + to_distribute.add(share) + + # This builds a priority queue of peers with the number of shares + # each peer holds as the priority. + + priority = {} + pQueue = PriorityQueue() + for peer in self.peers: + priority.setdefault(peer, 0) + for share in mappings: + if mappings[share] is not None: + for peer in mappings[share]: + if peer in self.peers: + priority[peer] += 1 + + if priority == {}: + return + + for peer in priority: + pQueue.put((priority[peer], peer)) + + # Distribute the shares to peers with the lowest priority. + for share in to_distribute: + peer = pQueue.get() + mappings[share] = set([peer[1]]) + pQueue.put((peer[0]+1, peer[1])) + + + def _convert_mappings(self, index_to_peer, index_to_share, maximum_graph): + """ + Now that a maximum spanning graph has been found, convert the indexes + back to their original ids so that the client can pass them to the + uploader. 
+ """ + + converted_mappings = {} + for share in maximum_graph: + peer = maximum_graph[share] + if peer == None: + converted_mappings.setdefault(index_to_share[share], None) + else: + converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]])) + return converted_mappings + + + def _servermap_flow_graph(self, peers, shares, servermap): + """ + Generates a flow network of peerIndices to shareIndices from a server map + of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a + directed graph where each edge has a capacity and each edge receives a flow. + The amount of flow on an edge cannot exceed the capacity of the edge." This + is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm + converts the problem into a maximum flow problem. + """ + if servermap == {}: + return [] + + peer_to_index, index_to_peer = self._reindex(peers, 1) + share_to_index, index_to_share = self._reindex(shares, len(peers) + 1) + graph = [] + sink_num = len(peers) + len(shares) + 1 + graph.append([peer_to_index[peer] for peer in peers]) + for peer in peers: + indexedShares = [share_to_index[s] for s in servermap[peer]] + graph.insert(peer_to_index[peer], indexedShares) + for share in shares: + graph.insert(share_to_index[share], [sink_num]) + graph.append([]) + return graph + + + def _reindex(self, items, base): + """ + I take an iteratble of items and give each item an index to be used in + the construction of a flow network. Indices for these items start at base + and continue to base + len(items) - 1. + + I return two dictionaries: ({item: index}, {index: item}) + """ + item_to_index = {} + index_to_item = {} + for item in items: + item_to_index.setdefault(item, base) + index_to_item.setdefault(base, item) + base += 1 + return (item_to_index, index_to_item) + + + def _flow_network(self, peerIndices, shareIndices): + """ + Given set of peerIndices and a set of shareIndices, I create a flow network + to be used by _compute_maximum_graph. The return value is a two + dimensional list in the form of a flow network, where each index represents + a node, and the corresponding list represents all of the nodes it is connected + to. + + This function is similar to allmydata.util.happinessutil.flow_network_for, but + we connect every peer with all shares instead of reflecting a supplied servermap. + """ + graph = [] + # The first entry in our flow network is the source. + # Connect the source to every server. + graph.append(peerIndices) + sink_num = len(peerIndices + shareIndices) + 1 + # Connect every server with every share it can possibly store. + for peerIndex in peerIndices: + graph.insert(peerIndex, shareIndices) + # Connect every share with the sink. + for shareIndex in shareIndices: + graph.insert(shareIndex, [sink_num]) + # Add an empty entry for the sink. 
+ graph.append([]) + return graph diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py index 6f5224942..af422f173 100644 --- a/src/allmydata/immutable/upload.py +++ b/src/allmydata/immutable/upload.py @@ -21,11 +21,12 @@ from allmydata.util.rrefutil import add_version_to_remote_reference from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \ IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \ NoServersError, InsufficientVersionError, UploadUnhappinessError, \ - DEFAULT_MAX_SEGMENT_SIZE, IProgress + DEFAULT_MAX_SEGMENT_SIZE, IProgress, IPeerSelector from allmydata.immutable import layout from pycryptopp.cipher.aes import AES from cStringIO import StringIO +from happiness_upload import Happiness_Upload # this wants to live in storage, not here @@ -201,8 +202,68 @@ class ServerTracker: def str_shareloc(shnum, bucketwriter): return "%s: %s" % (shnum, bucketwriter.get_servername(),) +class PeerSelector(): + implements(IPeerSelector) + + def __init__(self, num_segments, total_shares, needed_shares, servers_of_happiness): + self.num_segments = num_segments + self.total_shares = total_shares + self.needed_shares = needed_shares + self.min_happiness = servers_of_happiness + + self.existing_shares = {} + self.confirmed_allocations = {} + self.peers = set() + self.full_peers = set() + self.bad_peers = set() + + def add_peer_with_share(self, peerid, shnum): + if peerid in self.existing_shares.keys(): + self.existing_shares[peerid].add(shnum) + else: + self.existing_shares[peerid] = set([shnum]) + + def confirm_share_allocation(self, shnum, peer): + self.confirmed_allocations.setdefault(shnum, set()).add(peer) + + def get_allocations(self): + return self.confirmed_allocations + + def add_peer(self, peerid): + self.peers.add(peerid) + + def mark_full_peer(self, peerid): + self.full_peers.add(peerid) + self.peers.remove(peerid) + + def mark_bad_peer(self, peerid): + if peerid in self.peers: + self.peers.remove(peerid) + self.bad_peers.add(peerid) + elif peerid in self.full_peers: + self.full_peers.remove(peerid) + self.bad_peers.add(peerid) + + def get_sharemap_of_preexisting_shares(self): + preexisting = dictutil.DictOfSets() + for server, shares in self.existing_shares.iteritems(): + for share in shares: + preexisting.add(share, server) + return preexisting + + def get_tasks(self): + shares = set(range(self.total_shares)) + self.h = Happiness_Upload(self.peers, self.full_peers, shares, self.existing_shares) + return self.h.generate_mappings() + + def is_healthy(self): + return self.min_happiness <= self.h.happiness() + + class Tahoe2ServerSelector(log.PrefixingLogMixin): + peer_selector_class = PeerSelector + def __init__(self, upload_id, logparent=None, upload_status=None): self.upload_id = upload_id self.query_count, self.good_query_count, self.bad_query_count = 0,0,0 @@ -215,6 +276,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id) self.log("starting", level=log.OPERATIONAL) + def __repr__(self): return "" % self.upload_id @@ -234,6 +296,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): if self._status: self._status.set_status("Contacting Servers..") + self.peer_selector = self.peer_selector_class(num_segments, total_shares, + needed_shares, servers_of_happiness) + self.total_shares = total_shares self.servers_of_happiness = servers_of_happiness self.needed_shares = needed_shares @@ -271,9 +336,15 @@ class 
Tahoe2ServerSelector(log.PrefixingLogMixin): v0 = server.get_rref().version v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"] return v1["maximum-immutable-share-size"] - writeable_servers = [server for server in all_servers + + candidate_servers = all_servers[:2*total_shares] + for server in candidate_servers: + self.peer_selector.add_peer(server.get_serverid()) + writeable_servers = [server for server in candidate_servers if _get_maxsize(server) >= allocated_size] - readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers) + readonly_servers = set(candidate_servers) - set(writeable_servers) + for server in readonly_servers: + self.peer_selector.mark_full_peer(server.get_serverid()) # decide upon the renewal/cancel secrets, to include them in the # allocate_buckets query. @@ -308,10 +379,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): # second-pass list and repeat the "second" pass (really the third, # fourth, etc pass), until all shares are assigned, or we've run out # of potential servers. - self.first_pass_trackers = _make_trackers(writeable_servers) - self.second_pass_trackers = [] # servers worth asking again - self.next_pass_trackers = [] # servers that we have asked again - self._started_second_pass = False + write_trackers = _make_trackers(writeable_servers) # We don't try to allocate shares to these servers, since they've # said that they're incapable of storing shares of the size that we'd @@ -337,11 +405,28 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.query_count += 1 self.log("asking server %s for any existing shares" % (tracker.get_name(),), level=log.NOISY) + + for tracker in write_trackers: + assert isinstance(tracker, ServerTracker) + d = tracker.query(set()) + d.addBoth(self._handle_existing_write_response, tracker, set()) + ds.append(d) + self.num_servers_contacted += 1 + self.query_count += 1 + self.log("asking server %s for any existing shares" % + (tracker.get_name(),), level=log.NOISY) + + self.trackers = write_trackers + readonly_trackers + dl = defer.DeferredList(ds) - dl.addCallback(lambda ign: self._loop()) + dl.addCallback(lambda ign: self._calculate_tasks()) + dl.addCallback(lambda ign: self._request_another_allocation()) return dl + def _calculate_tasks(self): + self.tasks = self.peer_selector.get_tasks() + def _handle_existing_response(self, res, tracker): """ I handle responses to the queries sent by @@ -351,6 +436,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): if isinstance(res, failure.Failure): self.log("%s got error during existing shares check: %s" % (tracker.get_name(), res), level=log.UNUSUAL) + self.peer_selector.mark_bad_peer(serverid) self.error_count += 1 self.bad_query_count += 1 else: @@ -361,10 +447,27 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): % (tracker.get_name(), tuple(sorted(buckets))), level=log.NOISY) for bucket in buckets: + self.peer_selector.add_peer_with_share(serverid, bucket) self.preexisting_shares.setdefault(bucket, set()).add(serverid) self.homeless_shares.discard(bucket) - self.full_count += 1 - self.bad_query_count += 1 + + def _handle_existing_write_response(self, res, tracker, shares_to_ask): + """ + Function handles the response from the write servers + when inquiring about what shares each server already has. 
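+
+        On success, the shares a server reports holding are registered with
+        the peer selector; on failure, the server is marked bad and the shares
+        we asked about are returned to the homeless set.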
+ """ + if isinstance(res, failure.Failure): + self.peer_selector.mark_bad_peer(tracker.get_serverid()) + self.log("%s got error during server selection: %s" % (tracker, res), + level=log.UNUSUAL) + self.homeless_shares |= shares_to_ask + + msg = ("last failure (from %s) was: %s" % (tracker, res)) + self.last_failure_msg = msg + else: + (alreadygot, allocated) = res + for share in alreadygot: + self.peer_selector.add_peer_with_share(tracker.get_serverid(), share) def _get_progress_message(self): @@ -386,12 +489,69 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.good_query_count, self.bad_query_count, self.full_count, self.error_count)) + def _get_next_allocation(self): + """ + Return the next share allocation that we need to make. - def _loop(self): - if not self.homeless_shares: - merged = merge_servers(self.preexisting_shares, self.use_trackers) - effective_happiness = servers_of_happiness(merged) - if self.servers_of_happiness <= effective_happiness: + Specifically, I return a tuple (tracker, shares_to_ask), where + tracker is a ServerTracker instance and shares_to_ask is a set of + shares that we should store on that server. If there are no more + allocations to make, I return None. + """ + + if len(self.trackers) == 0: + return None + + tracker = self.trackers.pop(0) + # TODO: don't pre-convert all serverids to ServerTrackers + assert isinstance(tracker, ServerTracker) + + shares_to_ask = set() + servermap = self.tasks + for shnum, tracker_id in servermap.items(): + if tracker_id == None: + continue + if tracker.get_serverid() in tracker_id: + shares_to_ask.add(shnum) + if shnum in self.homeless_shares: + self.homeless_shares.remove(shnum) + + if self._status: + self._status.set_status("Contacting Servers [%s] (first query)," + " %d shares left.." + % (tracker.get_name(), + len(self.homeless_shares))) + return (tracker, shares_to_ask) + + + def _request_another_allocation(self): + allocation = self._get_next_allocation() + if allocation is not None: + tracker, shares_to_ask = allocation + d = tracker.query(shares_to_ask) + d.addBoth(self._got_response, tracker, shares_to_ask) + return d + + else: + # no more servers. If we haven't placed enough shares, we fail. + merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers) + effective_happiness = servers_of_happiness(self.peer_selector.get_allocations()) + if effective_happiness < self.servers_of_happiness: + msg = failure_message(len(self.serverids_with_shares), + self.needed_shares, + self.servers_of_happiness, + effective_happiness) + msg = ("server selection failed for %s: %s (%s), merged=%s" % + (self, msg, self._get_progress_message(), + pretty_print_shnum_to_servers(merged))) + if self.last_failure_msg: + msg += " (%s)" % (self.last_failure_msg,) + self.log(msg, level=log.UNUSUAL) + return self._failed(msg) + else: + # we placed enough to be happy, so we're done + if self._status: + self._status.set_status("Placed all shares") msg = ("server selection successful for %s: %s: pretty_print_merged: %s, " "self.use_trackers: %s, self.preexisting_shares: %s") \ % (self, self._get_progress_message(), @@ -401,129 +561,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): for st in self.use_trackers], pretty_print_shnum_to_servers(self.preexisting_shares)) self.log(msg, level=log.OPERATIONAL) - return (self.use_trackers, self.preexisting_shares) - else: - # We're not okay right now, but maybe we can fix it by - # redistributing some shares. 
In cases where one or two - # servers has, before the upload, all or most of the - # shares for a given SI, this can work by allowing _loop - # a chance to spread those out over the other servers, - delta = self.servers_of_happiness - effective_happiness - shares = shares_by_server(self.preexisting_shares) - # Each server in shares maps to a set of shares stored on it. - # Since we want to keep at least one share on each server - # that has one (otherwise we'd only be making - # the situation worse by removing distinct servers), - # each server has len(its shares) - 1 to spread around. - shares_to_spread = sum([len(list(sharelist)) - 1 - for (server, sharelist) - in shares.items()]) - if delta <= len(self.first_pass_trackers) and \ - shares_to_spread >= delta: - items = shares.items() - while len(self.homeless_shares) < delta: - # Loop through the allocated shares, removing - # one from each server that has more than one - # and putting it back into self.homeless_shares - # until we've done this delta times. - server, sharelist = items.pop() - if len(sharelist) > 1: - share = sharelist.pop() - self.homeless_shares.add(share) - self.preexisting_shares[share].remove(server) - if not self.preexisting_shares[share]: - del self.preexisting_shares[share] - items.append((server, sharelist)) - for writer in self.use_trackers: - writer.abort_some_buckets(self.homeless_shares) - return self._loop() - else: - # Redistribution won't help us; fail. - server_count = len(self.serverids_with_shares) - failmsg = failure_message(server_count, - self.needed_shares, - self.servers_of_happiness, - effective_happiness) - servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s" - servmsg = servmsgtempl % ( - self, - failmsg, - self._get_progress_message(), - pretty_print_shnum_to_servers(merged) - ) - self.log(servmsg, level=log.INFREQUENT) - return self._failed("%s (%s)" % (failmsg, self._get_progress_message())) + return (self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares()) - if self.first_pass_trackers: - tracker = self.first_pass_trackers.pop(0) - # TODO: don't pre-convert all serverids to ServerTrackers - assert isinstance(tracker, ServerTracker) - shares_to_ask = set(sorted(self.homeless_shares)[:1]) - self.homeless_shares -= shares_to_ask - self.query_count += 1 - self.num_servers_contacted += 1 - if self._status: - self._status.set_status("Contacting Servers [%s] (first query)," - " %d shares left.." - % (tracker.get_name(), - len(self.homeless_shares))) - d = tracker.query(shares_to_ask) - d.addBoth(self._got_response, tracker, shares_to_ask, - self.second_pass_trackers) - return d - elif self.second_pass_trackers: - # ask a server that we've already asked. - if not self._started_second_pass: - self.log("starting second pass", - level=log.NOISY) - self._started_second_pass = True - num_shares = mathutil.div_ceil(len(self.homeless_shares), - len(self.second_pass_trackers)) - tracker = self.second_pass_trackers.pop(0) - shares_to_ask = set(sorted(self.homeless_shares)[:num_shares]) - self.homeless_shares -= shares_to_ask - self.query_count += 1 - if self._status: - self._status.set_status("Contacting Servers [%s] (second query)," - " %d shares left.." - % (tracker.get_name(), - len(self.homeless_shares))) - d = tracker.query(shares_to_ask) - d.addBoth(self._got_response, tracker, shares_to_ask, - self.next_pass_trackers) - return d - elif self.next_pass_trackers: - # we've finished the second-or-later pass. 
Move all the remaining - # servers back into self.second_pass_trackers for the next pass. - self.second_pass_trackers.extend(self.next_pass_trackers) - self.next_pass_trackers[:] = [] - return self._loop() - else: - # no more servers. If we haven't placed enough shares, we fail. - merged = merge_servers(self.preexisting_shares, self.use_trackers) - effective_happiness = servers_of_happiness(merged) - if effective_happiness < self.servers_of_happiness: - msg = failure_message(len(self.serverids_with_shares), - self.needed_shares, - self.servers_of_happiness, - effective_happiness) - msg = ("server selection failed for %s: %s (%s)" % - (self, msg, self._get_progress_message())) - if self.last_failure_msg: - msg += " (%s)" % (self.last_failure_msg,) - self.log(msg, level=log.UNUSUAL) - return self._failed(msg) - else: - # we placed enough to be happy, so we're done - if self._status: - self._status.set_status("Placed all shares") - msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, - self._get_progress_message(), pretty_print_shnum_to_servers(merged))) - self.log(msg, level=log.OPERATIONAL) - return (self.use_trackers, self.preexisting_shares) - - def _got_response(self, res, tracker, shares_to_ask, put_tracker_here): + def _got_response(self, res, tracker, shares_to_ask): if isinstance(res, failure.Failure): # This is unusual, and probably indicates a bug or a network # problem. @@ -532,9 +573,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.error_count += 1 self.bad_query_count += 1 self.homeless_shares |= shares_to_ask - if (self.first_pass_trackers - or self.second_pass_trackers - or self.next_pass_trackers): + if (self.trackers): # there is still hope, so just loop pass else: @@ -553,6 +592,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): level=log.NOISY) progress = False for s in alreadygot: + self.peer_selector.confirm_share_allocation(s, tracker.get_serverid()) self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid()) if s in self.homeless_shares: self.homeless_shares.remove(s) @@ -565,6 +605,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): if allocated: self.use_trackers.add(tracker) progress = True + for s in allocated: + self.peer_selector.confirm_share_allocation(s, tracker.get_serverid()) if allocated or alreadygot: self.serverids_with_shares.add(tracker.get_serverid()) @@ -595,13 +637,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin): self.homeless_shares |= still_homeless # Since they were unable to accept all of our requests, so it # is safe to assume that asking them again won't help. - else: - # if they *were* able to accept everything, they might be - # willing to accept even more. - put_tracker_here.append(tracker) + # now loop - return self._loop() + return self._request_another_allocation() def _failed(self, msg): diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index b73247eb5..171f63bcd 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -730,6 +730,80 @@ class IReadable(Interface): download-to-memory consumer. """ +class IPeerSelector(Interface): + """ + I select peers for an upload, maximizing some measure of health. + + I keep track of the state of a grid relative to a file. This means + that I know about all of the peers that parts of that file could be + placed on, and about shares that have been placed on those peers. 
+ Given this, I assign shares to peers in a way that maximizes the + file's health according to whichever definition of health I am + programmed with. I tell the uploader whether or not my assignment is + healthy. I keep track of failures during the process and update my + conclusions appropriately. + """ + def add_peer_with_share(peerid, shnum): + """ + Update my internal state to reflect the fact that peer peerid + holds share shnum. Called for shares that are detected before + peer selection begins. + """ + + def confirm_share_allocation(peerid, shnum): + """ + Confirm that an allocated peer=>share pairing has been + successfully established. + """ + + def add_peers(peerids=set): + """ + Update my internal state to include the peers in peerids as + potential candidates for storing a file. + """ + + def mark_full_peer(peerid): + """ + Mark the peer peerid as full. This means that any + peer-with-share relationships I know about for peerid remain + valid, but that peerid will not be assigned any new shares. + """ + + def mark_bad_peer(peerid): + """ + Mark the peer peerid as bad. This is typically called when an + error is encountered when communicating with a peer. I will + disregard any existing peer => share relationships associated + with peerid, and will not attempt to assign it any more shares. + """ + + def get_tasks(): + """ + Return a tuple of tasks to our caller. + + Specifically, return (queries, placements), where queries and + allocations are both lists of things to do. Each query is a + request for our caller to ask a server about the shares it holds + for this upload; the results will be fed back into the + allocator. Each allocation is a request for some share or shares + to be placed on a server. Result may be None, in which case the + selector thinks that the share placement is as reliably or + correctly placed as it can be. + """ + + def is_healthy(): + """ + I return whether the share assignments I'm currently using + reflect a healthy file, based on my internal definitions. + """ + + def needs_recomputation(): + """ + I return True if the share assignments I last returned may have + become stale. This is a hint to the caller that they should call + get_share_assignments again. 
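+        (In this interface the share assignments are the tasks returned by
+        get_tasks.)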
+ """ + class IWriteable(Interface): """ diff --git a/src/allmydata/test/test_checker.py b/src/allmydata/test/test_checker.py index 8447e9c81..79b2fa406 100644 --- a/src/allmydata/test/test_checker.py +++ b/src/allmydata/test/test_checker.py @@ -401,14 +401,14 @@ class BalancingAct(GridTestMixin, unittest.TestCase): 0:[A] 1:[A] 2:[A] 3:[A,B,C,D,E] 4 good shares, but 5 good hosts After deleting all instances of share #3 and repairing: - 0:[A,B], 1:[A,C], 2:[A,D], 3:[E] - Still 4 good shares and 5 good hosts + 0:[A], 1:[A,B], 2:[C,A], 3:[E] + Still 4 good shares but now 4 good hosts """ d.addCallback(_check_and_repair) d.addCallback(_check_counts, 4, 5) d.addCallback(lambda _: self.delete_shares_numbered(self.uri, [3])) d.addCallback(_check_and_repair) - d.addCallback(_check_counts, 4, 5) + d.addCallback(_check_counts, 4, 4) d.addCallback(lambda _: [self.g.break_server(sid) for sid in self.g.get_all_serverids()]) d.addCallback(_check_and_repair) diff --git a/src/allmydata/test/test_download.py b/src/allmydata/test/test_download.py index 710d98ed1..03a85b1b8 100644 --- a/src/allmydata/test/test_download.py +++ b/src/allmydata/test/test_download.py @@ -295,7 +295,7 @@ class DownloadTest(_Base, unittest.TestCase): # find the shares that were used and delete them shares = self.n._cnode._node._shares shnums = sorted([s._shnum for s in shares]) - self.failUnlessEqual(shnums, [0,1,2,3]) + self.failUnlessEqual(shnums, [2,4,6,7]) # break the RIBucketReader references # (we don't break the RIStorageServer references, because that @@ -312,7 +312,7 @@ class DownloadTest(_Base, unittest.TestCase): self.failUnlessEqual("".join(c.chunks), plaintext) shares = self.n._cnode._node._shares shnums = sorted([s._shnum for s in shares]) - self.failIfEqual(shnums, [0,1,2,3]) + self.failIfEqual(shnums, [2,4,6,7]) d.addCallback(_check_failover) return d @@ -934,13 +934,13 @@ class Corruption(_Base, unittest.TestCase): log.msg("corrupt %d" % which) def _corruptor(s, debug=False): return s[:which] + chr(ord(s[which])^0x01) + s[which+1:] - self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + self.corrupt_shares_numbered(imm_uri, [2], _corruptor) def _corrupt_set(self, ign, imm_uri, which, newvalue): log.msg("corrupt %d" % which) def _corruptor(s, debug=False): return s[:which] + chr(newvalue) + s[which+1:] - self.corrupt_shares_numbered(imm_uri, [0], _corruptor) + self.corrupt_shares_numbered(imm_uri, [2], _corruptor) def test_each_byte(self): # Setting catalog_detection=True performs an exhaustive test of the @@ -976,25 +976,25 @@ class Corruption(_Base, unittest.TestCase): def _got_data(data): self.failUnlessEqual(data, plaintext) shnums = sorted([s._shnum for s in n._cnode._node._shares]) - no_sh0 = bool(0 not in shnums) - sh0 = [s for s in n._cnode._node._shares if s._shnum == 0] - sh0_had_corruption = False - if sh0 and sh0[0].had_corruption: - sh0_had_corruption = True + no_sh2 = bool(2 not in shnums) + sh2 = [s for s in n._cnode._node._shares if s._shnum == 2] + sh2_had_corruption = False + if sh2 and sh2[0].had_corruption: + sh2_had_corruption = True num_needed = len(n._cnode._node._shares) if self.catalog_detection: - detected = no_sh0 or sh0_had_corruption or (num_needed!=3) + detected = no_sh2 or sh2_had_corruption or (num_needed!=3) if not detected: undetected.add(which, 1) - if expected == "no-sh0": - self.failIfIn(0, shnums) - elif expected == "0bad-need-3": - self.failIf(no_sh0) - self.failUnless(sh0[0].had_corruption) + if expected == "no-sh2": + self.failIfIn(2, shnums) + elif expected == 
"2bad-need-3": + self.failIf(no_sh2) + self.failUnless(sh2[0].had_corruption) self.failUnlessEqual(num_needed, 3) elif expected == "need-4th": - self.failIf(no_sh0) - self.failUnless(sh0[0].had_corruption) + self.failIf(no_sh2) + self.failUnless(sh2[0].had_corruption) self.failIfEqual(num_needed, 3) d.addCallback(_got_data) return d @@ -1012,23 +1012,20 @@ class Corruption(_Base, unittest.TestCase): # data-block-offset, and offset=48 is the first byte of the first # data-block). Each one also specifies what sort of corruption # we're expecting to see. - no_sh0_victims = [0,1,2,3] # container version + no_sh2_victims = [0,1,2,3] # container version need3_victims = [ ] # none currently in this category # when the offsets are corrupted, the Share will be unable to # retrieve the data it wants (because it thinks that data lives # off in the weeds somewhere), and Share treats DataUnavailable # as abandon-this-share, so in general we'll be forced to look # for a 4th share. - need_4th_victims = [12,13,14,15, # share version - 24,25,26,27, # offset[data] - 32,33,34,35, # offset[crypttext_hash_tree] - 36,37,38,39, # offset[block_hashes] - 44,45,46,47, # offset[UEB] + need_4th_victims = [12,13,14,15, # offset[data] + 24,25,26,27, # offset[block_hashes] ] - need_4th_victims.append(48) # block data + need_4th_victims.append(36) # block data # when corrupting hash trees, we must corrupt a value that isn't # directly set from somewhere else. Since we download data from - # seg0, corrupt something on its hash chain, like [2] (the + # seg2, corrupt something on its hash chain, like [2] (the # right-hand child of the root) need_4th_victims.append(600+2*32) # block_hashes[2] # Share.loop is pretty conservative: it abandons the share at the @@ -1039,15 +1036,15 @@ class Corruption(_Base, unittest.TestCase): # the following fields (which are present in multiple shares) # should fall into the "need3_victims" case instead of the # "need_4th_victims" case. - need_4th_victims.append(376+2*32) # crypttext_hash_tree[2] need_4th_victims.append(824) # share_hashes - need_4th_victims.append(994) # UEB length - need_4th_victims.append(998) # UEB - corrupt_me = ([(i,"no-sh0") for i in no_sh0_victims] + - [(i, "0bad-need-3") for i in need3_victims] + + corrupt_me = ([(i,"no-sh2") for i in no_sh2_victims] + + [(i, "2bad-need-3") for i in need3_victims] + [(i, "need-4th") for i in need_4th_victims]) if self.catalog_detection: - corrupt_me = [(i, "") for i in range(len(self.sh0_orig))] + share_len = len(self.shares.values()[0]) + corrupt_me = [(i, "") for i in range(share_len)] + # This is a work around for ticket #2024. + corrupt_me = corrupt_me[0:8]+corrupt_me[12:] for i,expected in corrupt_me: # All these tests result in a successful download. What we're # measuring is how many shares the downloader had to use. 
@@ -1055,7 +1052,7 @@ class Corruption(_Base, unittest.TestCase): d.addCallback(_download, imm_uri, i, expected) d.addCallback(lambda ign: self.restore_all_shares(self.shares)) d.addCallback(fireEventually) - corrupt_values = [(3, 2, "no-sh0"), + corrupt_values = [(3, 2, "no-sh2"), (15, 2, "need-4th"), # share looks v2 ] for i,newvalue,expected in corrupt_values: @@ -1066,9 +1063,10 @@ class Corruption(_Base, unittest.TestCase): return d d.addCallback(_uploaded) def _show_results(ign): + share_len = len(self.shares.values()[0]) print print ("of [0:%d], corruption ignored in %s" % - (len(self.sh0_orig), undetected.dump())) + (share_len, undetected.dump())) if self.catalog_detection: d.addCallback(_show_results) # of [0:2070], corruption ignored in len=1133: diff --git a/src/allmydata/test/test_hung_server.py b/src/allmydata/test/test_hung_server.py index e2d6f6a1d..1dbfee574 100644 --- a/src/allmydata/test/test_hung_server.py +++ b/src/allmydata/test/test_hung_server.py @@ -233,7 +233,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin, done = [] d = self._set_up(False, "test_5_overdue_immutable") def _reduce_max_outstanding_requests_and_download(ign): - self._hang_shares(range(5)) + self._hang_shares([2, 4, 6, 7, 3]) n = self.c0.create_node_from_uri(self.uri) n._cnode._maybe_create_download_node() self._sf = n._cnode._node._sharefinder diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py index ca7e0df50..c38ca3ebf 100644 --- a/src/allmydata/test/test_repairer.py +++ b/src/allmydata/test/test_repairer.py @@ -707,7 +707,7 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin, def _then(ign): ss = self.g.servers_by_number[0] self.g.break_server(ss.my_nodeid, count=1) - self.delete_shares_numbered(self.uri, [9]) + self.delete_shares_numbered(self.uri, [8]) return self.c0_filenode.check_and_repair(Monitor()) d.addCallback(_then) def _check(rr): diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py index fadc66d4c..28a3e4fe4 100644 --- a/src/allmydata/test/test_upload.py +++ b/src/allmydata/test/test_upload.py @@ -11,7 +11,7 @@ import allmydata # for __full_version__ from allmydata import uri, monitor, client from allmydata.immutable import upload, encode from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError -from allmydata.util import log, base32 +from allmydata.util import log, base32, fileutil from allmydata.util.assertutil import precondition from allmydata.util.deferredutil import DeferredListShouldSucceed from allmydata.test.no_network import GridTestMixin @@ -425,33 +425,13 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin): d.addCallback(_check) return d - def test_second_error(self): - # we want to make sure we make it to a third pass. This means that - # the first pass was insufficient to place all shares, and at least - # one of second pass servers (other than the last one) accepted a - # share (so we'll believe that a third pass will be useful). (if - # everyone but the last server throws an error, then we'll send all - # the remaining shares to the last server at the end of the second - # pass, and if that succeeds, we won't make it to a third pass). - # - # we can achieve this 97.5% of the time by using 40 servers, having - # 39 of them fail on the second request, leaving only one to succeed - # on the second request. (we need to keep the number of servers low - # enough to ensure a second pass with 100 shares). 
- mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)]) - self.make_node(mode, 40) - d = upload_data(self.u, DATA) - d.addCallback(extract_uri) - d.addCallback(self._check_large, SIZE_LARGE) - return d - def test_second_error_all(self): self.make_node("second-fail") d = self.shouldFail(UploadUnhappinessError, "second_error_all", "server selection failed", upload_data, self.u, DATA) def _check((f,)): - self.failUnlessIn("placed 10 shares out of 100 total", str(f.value)) + self.failUnlessIn("placed 0 shares out of 100 total", str(f.value)) # there should also be a 'last failure was' message self.failUnlessIn("ServerError", str(f.value)) d.addCallback(_check) @@ -515,7 +495,7 @@ class ServerSelection(unittest.TestCase): for s in self.node.last_servers: allocated = s.allocated self.failUnlessEqual(len(allocated), 1) - self.failUnlessEqual(s.queries, 1) + self.failUnlessEqual(s.queries, 2) d.addCallback(_check) return d @@ -555,7 +535,7 @@ class ServerSelection(unittest.TestCase): allocated = s.allocated self.failUnless(len(allocated) in (1,2), len(allocated)) if len(allocated) == 1: - self.failUnlessEqual(s.queries, 1) + self.failUnlessEqual(s.queries, 2) got_one.append(s) else: self.failUnlessEqual(s.queries, 2) @@ -634,6 +614,21 @@ class ServerSelection(unittest.TestCase): d.addCallback(_check) return d + def test_number_of_servers_contacted(self): + # This tests ensures that Tahoe only contacts 2n servers + # during peer selection + self.make_client(40) + self.set_encoding_parameters(3, 7, 10) + data = self.get_data(SIZE_LARGE) + d = upload_data(self.u, data) + def _check(res): + servers_contacted = [] + for s in self.node.last_servers: + if(s.queries != 0): + servers_contacted.append(s) + self.failUnless(len(servers_contacted), 20) + d.addCallback(_check) + return d class StorageIndex(unittest.TestCase): def test_params_must_matter(self): @@ -1202,7 +1197,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d - test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release." def test_happiness_with_some_readonly_servers(self): # Try the following layout @@ -1597,7 +1591,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(_setup) d.addCallback(lambda c: self.shouldFail(UploadUnhappinessError, "test_query_counting", - "10 queries placed some shares", + "0 queries placed some shares", c.upload, upload.Data("data" * 10000, convergence=""))) # Now try with some readonly servers. We want to make sure that @@ -1650,7 +1644,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(_next) d.addCallback(lambda c: self.shouldFail(UploadUnhappinessError, "test_query_counting", - "1 queries placed some shares", + "0 queries placed some shares", c.upload, upload.Data("data" * 10000, convergence=""))) return d @@ -1867,44 +1861,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d - test_problem_layout_comment_187.todo = "this isn't fixed yet" - - def test_problem_layout_ticket_1118(self): - # #1118 includes a report from a user who hit an assertion in - # the upload code with this layout. 
- self.basedir = self.mktemp() - d = self._setup_and_upload(k=2, n=4) - - # server 0: no shares - # server 1: shares 0, 3 - # server 3: share 1 - # server 2: share 2 - # The order that they get queries is 0, 1, 3, 2 - def _setup(ign): - self._add_server(server_number=0) - self._add_server_with_share(server_number=1, share_number=0) - self._add_server_with_share(server_number=2, share_number=2) - self._add_server_with_share(server_number=3, share_number=1) - # Copy shares - self._copy_share_to_server(3, 1) - self.delete_all_shares(self.get_serverdir(0)) - client = self.g.clients[0] - client.encoding_params['happy'] = 4 - return client - - d.addCallback(_setup) - # Note: actually it should succeed! See - # test_problem_layout_ticket_1128. But ticket 1118 is just to - # make it realize that it has failed, so if it raises - # UploadUnhappinessError then we'll give it the green light - # for now. - d.addCallback(lambda ignored: - self.shouldFail(UploadUnhappinessError, - "test_problem_layout_ticket_1118", - "", - self.g.clients[0].upload, upload.Data("data" * 10000, - convergence=""))) - return d def test_problem_layout_ticket_1128(self): # #1118 includes a report from a user who hit an assertion in @@ -1936,7 +1892,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin, d.addCallback(lambda ign: self.failUnless(self._has_happy_share_distribution())) return d - test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case." def test_upload_succeeds_with_some_homeless_shares(self): # If the upload is forced to stop trying to place shares before diff --git a/src/allmydata/util/happinessutil.py b/src/allmydata/util/happinessutil.py index b8e8b5421..253f9c8fd 100644 --- a/src/allmydata/util/happinessutil.py +++ b/src/allmydata/util/happinessutil.py @@ -126,8 +126,8 @@ def servers_of_happiness(sharemap): """ if sharemap == {}: return 0 - sharemap = shares_by_server(sharemap) - graph = flow_network_for(sharemap) + servermap = shares_by_server(sharemap) + graph = flow_network_for(servermap) # This is an implementation of the Ford-Fulkerson method for finding # a maximum flow in a flow network applied to a bipartite graph. # Specifically, it is the Edmonds-Karp algorithm, since it uses a @@ -154,7 +154,7 @@ def servers_of_happiness(sharemap): flow_function[v][u] -= delta residual_graph, residual_function = residual_network(graph, flow_function) - num_servers = len(sharemap) + num_servers = len(servermap) # The value of a flow is the total flow out of the source vertex # (vertex 0, in our graph). We could just as well sum across all of # f[0], but we know that vertex 0 only has edges to the servers in @@ -163,14 +163,14 @@ def servers_of_happiness(sharemap): # matching on the bipartite graph described above. return sum([flow_function[0][v] for v in xrange(1, num_servers+1)]) -def flow_network_for(sharemap): +def flow_network_for(servermap): """ I take my argument, a dict of peerid -> set(shareid) mappings, and turn it into a flow network suitable for use with Edmonds-Karp. I then return the adjacency list representation of that network. 
Specifically, I build G = (V, E), where: - V = { peerid in sharemap } U { shareid in sharemap } U {s, t} + V = { peerid in servermap } U { shareid in servermap } U {s, t} E = {(s, peerid) for each peerid} U {(peerid, shareid) if peerid is to store shareid } U {(shareid, t) for each shareid} @@ -185,16 +185,16 @@ def flow_network_for(sharemap): # we re-index so that all of our vertices have integral indices, and # that there aren't any holes. We start indexing at 1, so that we # can add a source node at index 0. - sharemap, num_shares = reindex(sharemap, base_index=1) - num_servers = len(sharemap) + servermap, num_shares = reindex(servermap, base_index=1) + num_servers = len(servermap) graph = [] # index -> [index], an adjacency list # Add an entry at the top (index 0) that has an edge to every server - # in sharemap - graph.append(sharemap.keys()) + # in servermap + graph.append(servermap.keys()) # For each server, add an entry that has an edge to every share that it # contains (or will contain). - for k in sharemap: - graph.append(sharemap[k]) + for k in servermap: + graph.append(servermap[k]) # For each share, add an entry that has an edge to the sink. sink_num = num_servers + num_shares + 1 for i in xrange(num_shares): @@ -203,20 +203,20 @@ def flow_network_for(sharemap): graph.append([]) return graph -def reindex(sharemap, base_index): +def reindex(servermap, base_index): """ - Given sharemap, I map peerids and shareids to integers that don't + Given servermap, I map peerids and shareids to integers that don't conflict with each other, so they're useful as indices in a graph. I - return a sharemap that is reindexed appropriately, and also the - number of distinct shares in the resulting sharemap as a convenience + return a servermap that is reindexed appropriately, and also the + number of distinct shares in the resulting servermap as a convenience for my caller. base_index tells me where to start indexing. """ shares = {} # shareid -> vertex index num = base_index - ret = {} # peerid -> [shareid], a reindexed sharemap. + ret = {} # peerid -> [shareid], a reindexed servermap. # Number the servers first - for k in sharemap: - ret[num] = sharemap[k] + for k in servermap: + ret[num] = servermap[k] num += 1 # Number the shares for k in ret:
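
For readers of the specification above, here is a minimal, self-contained sketch of the maximum bipartite matching step that the 'Calculating Share Placements' procedure (steps 2-7) relies on. It is illustrative only: the helper name maximum_matching and the dict-of-sets input format are assumptions made for this sketch and are not part of the patch; the patch's actual implementation is the Happiness_Upload class above, together with the Edmonds-Karp flow code in happinessutil.py.

def maximum_matching(graph):
    """
    Illustrative sketch, not part of the patch. 'graph' maps each server to
    the set of shares it may hold. Returns a dict {share: server} forming a
    maximum bipartite matching, found with the classic augmenting-path method.
    """
    match = {}  # share -> server currently assigned to it

    def try_assign(server, seen):
        # Try to give 'server' a share, re-routing earlier assignments along
        # an augmenting path when necessary.
        for share in graph.get(server, ()):
            if share in seen:
                continue
            seen.add(share)
            if share not in match or try_assign(match[share], seen):
                match[share] = server
                return True
        return False

    for server in graph:
        try_assign(server, set())
    return match

if __name__ == "__main__":
    # Three writable servers and four shares: the matching is bounded by the
    # smaller vertex set, so one share is left homeless, as described in
    # "Properties of Upload Strategy of Happiness" above.
    can_hold = {"A": set([0, 1]), "B": set([1, 2]), "C": set([2, 3])}
    print(maximum_matching(can_hold))  # e.g. {0: 'A', 1: 'B', 2: 'C'}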