From a611673934c4c35e75a9a3c97dd68aa3b8554873 Mon Sep 17 00:00:00 2001 From: David Stainton Date: Wed, 1 Feb 2017 18:55:37 +0000 Subject: [PATCH] Make a correction to a hypothesis test comment Comment out all debug print statements Add hypothesis tests for the old servers of happiness implementation Attempt to speed up meejah's servers of happiness WIP Fix test_calc_happy WIP --- integration/test_hypothesis_happiness.py | 7 +- integration/test_hypothesis_old_happiness.py | 56 ++ src/allmydata/immutable/happiness_upload.py | 883 +++++-------------- src/allmydata/test/test_happiness.py | 98 +- 4 files changed, 311 insertions(+), 733 deletions(-) create mode 100644 integration/test_hypothesis_old_happiness.py diff --git a/integration/test_hypothesis_happiness.py b/integration/test_hypothesis_happiness.py index 0d79ea36b..87dded9e5 100644 --- a/integration/test_hypothesis_happiness.py +++ b/integration/test_hypothesis_happiness.py @@ -12,7 +12,8 @@ from allmydata.immutable import happiness_upload ) def test_hypothesis_unhappy(peers, shares): """ - similar to test_unhappy we test that the resulting happiness is always 4 since the size of peers is 4. + similar to test_unhappy we test that the resulting happiness is + always 4 since the size of peers is 4. """ # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] @@ -31,7 +32,9 @@ def test_hypothesis_unhappy(peers, shares): ) def test_more_hypothesis(peers, shares): """ - similar to test_unhappy we test that the resulting happiness is always 4 since the size of peers is 4. + similar to test_unhappy we test that the resulting happiness is + always either the number of peers or the number of shares + whichever is smaller. """ # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] diff --git a/integration/test_hypothesis_old_happiness.py b/integration/test_hypothesis_old_happiness.py new file mode 100644 index 000000000..729526e27 --- /dev/null +++ b/integration/test_hypothesis_old_happiness.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- + +from hypothesis import given +from hypothesis.strategies import text, sets +from allmydata.immutable import happiness_upload + + +@given( + sets(elements=text(min_size=1), min_size=4, max_size=4), + sets(elements=text(min_size=1), min_size=4), +) +def test_hypothesis_old_unhappy(peers, shares): + """ + similar to test_unhappy we test that the resulting happiness is + always 4 since the size of peers is 4. + """ + # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets + # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] + readonly_peers = set() + peers_to_shares = {} + h = happiness_upload.HappinessUpload(peers, readonly_peers, shares, peers_to_shares) + places = h.generate_mappings() + assert set(places.keys()) == shares + assert h.happiness() == 4 + + +@given( + sets(elements=text(min_size=1), min_size=1, max_size=10), + # can we make a readonly_peers that's a subset of ^ + sets(elements=text(min_size=1), min_size=1, max_size=20), +) +def test_hypothesis_old_more_happiness(peers, shares): + """ + similar to test_unhappy we test that the resulting happiness is + always either the number of peers or the number of shares + whichever is smaller. + """ + # https://hypothesis.readthedocs.io/en/latest/data.html#hypothesis.strategies.sets + # hypothesis.strategies.sets(elements=None, min_size=None, average_size=None, max_size=None)[source] + # XXX would be nice to paramaterize these by hypothesis too + readonly_peers = set() + peers_to_shares = {} + h = happiness_upload.HappinessUpload(peers, readonly_peers, shares, peers_to_shares) + places = h.generate_mappings() + happiness = h.happiness() + + # every share should get placed + assert set(places.keys()) == shares + + # we should only use peers that exist + assert set(map(lambda x: list(x)[0], places.values())).issubset(peers) # XXX correct? + + # if we have more shares than peers, happiness is at most # of + # peers; if we have fewer shares than peers happiness is capped at + # # of peers. + assert happiness == min(len(peers), len(shares)) diff --git a/src/allmydata/immutable/happiness_upload.py b/src/allmydata/immutable/happiness_upload.py index 33deba19a..91447b89b 100644 --- a/src/allmydata/immutable/happiness_upload.py +++ b/src/allmydata/immutable/happiness_upload.py @@ -1,4 +1,7 @@ +from Queue import PriorityQueue + + def augmenting_path_for(graph): """ I return an augmenting path, if there is one, from the source node @@ -73,6 +76,149 @@ def residual_network(graph, f): cf[v][i] = -1 return (new_graph, cf) +def calculate_happiness(mappings): + """ + I return the happiness of the mappings + """ + happy = 0 + for share in mappings: + if mappings[share] is not None: + happy += 1 + return happy + +def _calculate_mappings(peers, shares, servermap=None): + """ + Given a set of peers, a set of shares, and a dictionary of server -> + set(shares), determine how the uploader should allocate shares. If a + servermap is supplied, determine which existing allocations should be + preserved. If servermap is None, calculate the maximum matching of the + bipartite graph (U, V, E) such that: + + U = peers + V = shares + E = peers x shares + + Returns a dictionary {share -> set(peer)}, indicating that the share + should be placed on each peer in the set. If a share's corresponding + value is None, the share can be placed on any server. Note that the set + of peers should only be one peer when returned, but it is possible to + duplicate shares by adding additional servers to the set. + """ + peer_to_index, index_to_peer = _reindex(peers, 1) + share_to_index, index_to_share = _reindex(shares, len(peers) + 1) + shareIndices = [share_to_index[s] for s in shares] + if servermap: + graph = _servermap_flow_graph(peers, shares, servermap) + else: + peerIndices = [peer_to_index[peer] for peer in peers] + graph = _flow_network(peerIndices, shareIndices) + max_graph = _compute_maximum_graph(graph, shareIndices) + return _convert_mappings(index_to_peer, index_to_share, max_graph) + + +def _compute_maximum_graph(graph, shareIndices): + """ + This is an implementation of the Ford-Fulkerson method for finding + a maximum flow in a flow network applied to a bipartite graph. + Specifically, it is the Edmonds-Karp algorithm, since it uses a + BFS to find the shortest augmenting path at each iteration, if one + exists. + + The implementation here is an adapation of an algorithm described in + "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. + """ + + if graph == []: + return {} + + dim = len(graph) + flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] + residual_graph, residual_function = residual_network(graph, flow_function) + + while augmenting_path_for(residual_graph): + path = augmenting_path_for(residual_graph) + # Delta is the largest amount that we can increase flow across + # all of the edges in path. Because of the way that the residual + # function is constructed, f[u][v] for a particular edge (u, v) + # is the amount of unused capacity on that edge. Taking the + # minimum of a list of those values for each edge in the + # augmenting path gives us our delta. + delta = min(map(lambda (u, v), rf=residual_function: rf[u][v], + path)) + for (u, v) in path: + flow_function[u][v] += delta + flow_function[v][u] -= delta + residual_graph, residual_function = residual_network(graph,flow_function) + + new_mappings = {} + for shareIndex in shareIndices: + peer = residual_graph[shareIndex] + if peer == [dim - 1]: + new_mappings.setdefault(shareIndex, None) + else: + new_mappings.setdefault(shareIndex, peer[0]) + + return new_mappings + + +def _extract_ids(mappings): + shares = set() + peers = set() + for share in mappings: + if mappings[share] == None: + pass + else: + shares.add(share) + for item in mappings[share]: + peers.add(item) + return (peers, shares) + +def _distribute_homeless_shares(mappings, homeless_shares, peers_to_shares): + """ + Shares which are not mapped to a peer in the maximum spanning graph + still need to be placed on a server. This function attempts to + distribute those homeless shares as evenly as possible over the + available peers. If possible a share will be placed on the server it was + originally on, signifying the lease should be renewed instead. + """ + #print "mappings, homeless_shares, peers_to_shares %s %s %s" % (mappings, homeless_shares, peers_to_shares) + servermap_peerids = set([key for key in peers_to_shares]) + servermap_shareids = set() + for key in peers_to_shares: + for share in peers_to_shares[key]: + servermap_shareids.add(share) + + # First check to see if the leases can be renewed. + to_distribute = set() + for share in homeless_shares: + if share in servermap_shareids: + for peerid in peers_to_shares: + if share in peers_to_shares[peerid]: + mappings[share] = set([peerid]) + break + else: + to_distribute.add(share) + # This builds a priority queue of peers with the number of shares + # each peer holds as the priority. + priority = {} + pQueue = PriorityQueue() + for peerid in servermap_peerids: + priority.setdefault(peerid, 0) + for share in mappings: + if mappings[share] is not None: + for peer in mappings[share]: + if peer in servermap_peerids: + priority[peer] += 1 + if priority == {}: + return + for peerid in priority: + pQueue.put((priority[peerid], peerid)) + # Distribute the shares to peers with the lowest priority. + for share in to_distribute: + peer = pQueue.get() + mappings[share] = set([peer[1]]) + pQueue.put((peer[0]+1, peer[1])) + def _convert_mappings(index_to_peer, index_to_share, maximum_graph): """ Now that a maximum spanning graph has been found, convert the indexes @@ -89,51 +235,56 @@ def _convert_mappings(index_to_peer, index_to_share, maximum_graph): converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]])) return converted_mappings -def _compute_maximum_graph(graph, shareIndices): + +def _servermap_flow_graph(peers, shares, servermap): """ - This is an implementation of the Ford-Fulkerson method for finding - a maximum flow in a flow network applied to a bipartite graph. - Specifically, it is the Edmonds-Karp algorithm, since it uses a - breadth-first search to find the shortest augmenting path at each - iteration, if one exists. - - The implementation here is an adapation of an algorithm described in - "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. + Generates a flow network of peerIndices to shareIndices from a server map + of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a + directed graph where each edge has a capacity and each edge receives a flow. + The amount of flow on an edge cannot exceed the capacity of the edge." This + is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm + converts the problem into a maximum flow problem. """ + if servermap == {}: + return [] - if graph == []: - return {} + peer_to_index, index_to_peer = _reindex(peers, 1) + share_to_index, index_to_share = _reindex(shares, len(peers) + 1) + graph = [] + indexedShares = [] + sink_num = len(peers) + len(shares) + 1 + graph.append([peer_to_index[peer] for peer in peers]) + #print "share_to_index %s" % share_to_index + #print "servermap %s" % servermap + for peer in peers: + print "peer %s" % peer + if servermap.has_key(peer): + for s in servermap[peer]: + if share_to_index.has_key(s): + indexedShares.append(share_to_index[s]) + graph.insert(peer_to_index[peer], indexedShares) + for share in shares: + graph.insert(share_to_index[share], [sink_num]) + graph.append([]) + return graph - dim = len(graph) - flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] - residual_graph, residual_function = residual_network(graph, flow_function) - path = augmenting_path_for(residual_graph) - while path: - # Delta is the largest amount that we can increase flow across - # all of the edges in path. Because of the way that the residual - # function is constructed, f[u][v] for a particular edge (u, v) - # is the amount of unused capacity on that edge. Taking the - # minimum of a list of those values for each edge in the - # augmenting path gives us our delta. - delta = min(map(lambda (u, v), rf=residual_function: rf[u][v], - path)) - for (u, v) in path: - flow_function[u][v] += delta - flow_function[v][u] -= delta - residual_graph, residual_function = residual_network(graph,flow_function) - path = augmenting_path_for(residual_graph) - print('loop', len(residual_graph)) +def _reindex(items, base): + """ + I take an iteratble of items and give each item an index to be used in + the construction of a flow network. Indices for these items start at base + and continue to base + len(items) - 1. - new_mappings = {} - for shareIndex in shareIndices: - peer = residual_graph[shareIndex] - if peer == [dim - 1]: - new_mappings.setdefault(shareIndex, None) - else: - new_mappings.setdefault(shareIndex, peer[0]) + I return two dictionaries: ({item: index}, {index: item}) + """ + item_to_index = {} + index_to_item = {} + for item in items: + item_to_index.setdefault(item, base) + index_to_item.setdefault(base, item) + base += 1 + return (item_to_index, index_to_item) - return new_mappings def _flow_network(peerIndices, shareIndices): """ @@ -161,619 +312,65 @@ def _flow_network(peerIndices, shareIndices): graph.append([]) return graph -def _servermap_flow_graph(peers, shares, servermap): +def share_placement(peers, readonly_peers, shares, peers_to_shares): """ - Generates a flow network of peerIndices to shareIndices from a server map - of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a - directed graph where each edge has a capacity and each edge receives a flow. - The amount of flow on an edge cannot exceed the capacity of the edge." This - is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm - converts the problem into a maximum flow problem. + Generates the allocations the upload should based on the given + information. We construct a dictionary of 'share_num' -> set(server_ids) + and return it to the caller. Each share should be placed on each server + in the corresponding set. Existing allocations appear as placements + because attempting to place an existing allocation will renew the share. + + For more information on the algorithm this class implements, refer to + docs/specifications/servers-of-happiness.rst """ - if servermap == {}: - return [] + homeless_shares = set() - peer_to_index, index_to_peer = _reindex(peers, 1) - share_to_index, index_to_share = _reindex(shares, len(peers) + 1) - graph = [] - sink_num = len(peers) + len(shares) + 1 - graph.append([peer_to_index[peer] for peer in peers]) - for peer in peers: - indexedShares = [share_to_index[s] for s in servermap[peer]] - graph.insert(peer_to_index[peer], indexedShares) - for share in shares: - graph.insert(share_to_index[share], [sink_num]) - graph.append([]) - return graph + # First calculate share placement for the readonly servers. + readonly_peers = readonly_peers + readonly_shares = set() + readonly_map = {} + for peer in peers_to_shares: + if peer in readonly_peers: + readonly_map.setdefault(peer, peers_to_shares[peer]) + for share in peers_to_shares[peer]: + readonly_shares.add(share) -def _reindex(items, base): - """ - I take an iteratble of items and give each item an index to be used in - the construction of a flow network. Indices for these items start at base - and continue to base + len(items) - 1. + readonly_mappings = _calculate_mappings(readonly_peers, readonly_shares, readonly_map) + used_peers, used_shares = _extract_ids(readonly_mappings) - I return two dictionaries: ({item: index}, {index: item}) - """ - item_to_index = {} - index_to_item = {} - for item in items: - item_to_index.setdefault(item, base) - index_to_item.setdefault(base, item) - base += 1 - return (item_to_index, index_to_item) + # Calculate share placement for the remaining existing allocations + new_peers = set(peers) - used_peers + # Squash a list of sets into one set + new_shares = shares - used_shares -def _maximum_matching_graph(graph, servermap): - """ - :param graph: an iterable of (server, share) 2-tuples - - Calculate the maximum matching of the bipartite graph (U, V, E) - such that: - - U = peers - V = shares - E = peers x shares - - Returns a dictionary {share -> set(peer)}, indicating that the share - should be placed on each peer in the set. If a share's corresponding - value is None, the share can be placed on any server. Note that the set - of peers should only be one peer when returned. - """ - peers = [x[0] for x in graph] - shares = [x[1] for x in graph] - peer_to_index, index_to_peer = _reindex(peers, 1) - share_to_index, index_to_share = _reindex(shares, len(peers) + 1) - shareIndices = [share_to_index[s] for s in shares] - if servermap: - graph = _servermap_flow_graph(peers, shares, servermap) - else: - peerIndices = [peer_to_index[peer] for peer in peers] - graph = _flow_network(peerIndices, shareIndices) - max_graph = _compute_maximum_graph(graph, shareIndices) - return _convert_mappings(index_to_peer, index_to_share, max_graph) - - -def _filter_g3(g3, m1, m2): - """ - This implements the last part of 'step 6' in the spec, "Then - remove (from G3) any servers and shares used in M1 or M2 (note - that we retain servers/shares that were in G1/G2 but *not* in the - M1/M2 subsets)" - """ - sequence = m1.values() + m2.values() - sequence = filter(lambda x: x is not None, sequence) - if len(sequence) == 0: - return g3 - m12_servers = reduce(lambda a, b: a.union(b), sequence) - # m1 and m2 may contain edges like "peer -> None" but those - # shouldn't be considered "actual mappings" by this removal - # algorithm (i.e. an edge "peer0 -> None" means there's nothing - # placed on peer0) - m12_shares = set( - [k for k, v in m1.items() if v] + - [k for k, v in m2.items() if v] - ) - new_g3 = set() - for edge in g3: - if edge[0] not in m12_servers and edge[1] not in m12_shares: - new_g3.add(edge) - return new_g3 - - -def _merge_dicts(result, inc): - """ - given two dicts mapping key -> set(), merge the *values* of the - 'inc' dict into the value of the 'result' dict if the value is not - None. - - Note that this *mutates* 'result' - """ - for k, v in inc.items(): - existing = result.get(k, None) - if existing is None: - result[k] = v - elif v is not None: - result[k] = existing.union(v) - - -def calculate_happiness(mappings): - """ - I calculate the happiness of the generated mappings - """ - unique_peers = {v for k, v in mappings.items()} - return len(unique_peers) - - -def share_placement(peers, readonly_peers, shares, peers_to_shares={}): - """ - :param servers: ordered list of servers, "Maybe *2N* of them." - """ - if False: - print("peers:", peers) - print("readonly:", readonly_peers) - print("shares:", shares) - print("peers_to_shares:", peers_to_shares) - # "2. Construct a bipartite graph G1 of *readonly* servers to pre-existing - # shares, where an edge exists between an arbitrary readonly server S and an - # arbitrary share T if and only if S currently holds T." - g1 = set() - for share in shares: - for server in peers: - if server in readonly_peers and share in peers_to_shares.get(server, set()): - g1.add((server, share)) - - # 3. Calculate a maximum matching graph of G1 (a set of S->T edges that has or - # is-tied-for the highest "happiness score"). There is a clever efficient - # algorithm for this, named "Ford-Fulkerson". There may be more than one - # maximum matching for this graph; we choose one of them arbitrarily, but - # prefer earlier servers. Call this particular placement M1. The placement - # maps shares to servers, where each share appears at most once, and each - # server appears at most once. - m1 = _maximum_matching_graph(g1, peers_to_shares) - if False: - print("G1:") - for k, v in g1: - print(" {}: {}".format(k, v)) - print("M1:") - for k, v in m1.items(): - print(" {}: {}".format(k, v)) - - # 4. Construct a bipartite graph G2 of readwrite servers to pre-existing - # shares. Then remove any edge (from G2) that uses a server or a share found - # in M1. Let an edge exist between server S and share T if and only if S - # already holds T. - g2 = set() - for g2_server, g2_shares in peers_to_shares.items(): - for share in g2_shares: - g2.add((g2_server, share)) - - for server, share in m1.items(): - for g2server, g2share in g2: - if g2server == server or g2share == share: - g2.remove((g2server, g2share)) - - # 5. Calculate a maximum matching graph of G2, call this M2, again preferring - # earlier servers. - - m2 = _maximum_matching_graph(g2, peers_to_shares) - - if False: - print("G2:") - for k, v in g2: - print(" {}: {}".format(k, v)) - print("M2:") - for k, v in m2.items(): - print(" {}: {}".format(k, v)) - - # 6. Construct a bipartite graph G3 of (only readwrite) servers to - # shares (some shares may already exist on a server). Then remove - # (from G3) any servers and shares used in M1 or M2 (note that we - # retain servers/shares that were in G1/G2 but *not* in the M1/M2 - # subsets) - - # meejah: does that last sentence mean remove *any* edge with any - # server in M1?? or just "remove any edge found in M1/M2"? (Wait, - # is that last sentence backwards? G1 a subset of M1?) - readwrite = set(peers).difference(set(readonly_peers)) - g3 = [ - (server, share) for server in readwrite for share in shares - ] - g3 = _filter_g3(g3, m1, m2) - if False: - print("G3:") - for srv, shr in g3: - print(" {}->{}".format(srv, shr)) - - # 7. Calculate a maximum matching graph of G3, call this M3, preferring earlier - # servers. The final placement table is the union of M1+M2+M3. - - m3 = _maximum_matching_graph(g3, {})#, peers_to_shares) - - answer = { - k: None for k in shares - } - if False: - print("m1", m1) - print("m2", m2) - print("m3", m3) - _merge_dicts(answer, m1) - _merge_dicts(answer, m2) - _merge_dicts(answer, m3) - - # anything left over that has "None" instead of a 1-set of peers - # should be part of the "evenly distribute amongst readwrite - # servers" thing. - - # See "Properties of Upload Strategy of Happiness" in the spec: - # "The size of the maximum bipartite matching is bounded by the size of the smaller - # set of vertices. Therefore in a situation where the set of servers is smaller - # than the set of shares, placement is not generated for a subset of shares. In - # this case the remaining shares are distributed as evenly as possible across the - # set of writable servers." - - # if we have any readwrite servers at all, we can place any shares - # that didn't get placed -- otherwise, we can't. - if readwrite: - def peer_generator(): - while True: - for peer in readwrite: - yield peer - round_robin_peers = peer_generator() - for k, v in answer.items(): - if v is None: - answer[k] = {next(round_robin_peers)} - - new_answer = dict() - for k, v in answer.items(): - new_answer[k] = list(v)[0] if v else None - return new_answer - - - -# putting mark-berger code back in to see if it's slow too -from Queue import PriorityQueue -from allmydata.util.happinessutil import augmenting_path_for, residual_network - -class Happiness_Upload: - """ - I handle the calculations involved with generating the maximum - spanning graph for a file when given a set of peerids, shareids, and - a servermap of 'peerid' -> [shareids]. Mappings are returned in a - dictionary of 'shareid' -> 'peerid' - """ - - def __init__(self, peerids, readonly_peers, shareids, servermap={}): - self.happy = 0 - self.homeless_shares = set() - self.peerids = peerids - self.readonly_peers = readonly_peers - self.shareids = shareids - self.servermap = servermap - self.servermap_peerids = set([key for key in servermap]) - self.servermap_shareids = set() - for key in servermap: - for share in servermap[key]: - self.servermap_shareids.add(share) - - - def happiness(self): - return self.happy - - - def generate_mappings(self): - """ - Generate a flow network of peerids to existing shareids and find - its maximum spanning graph. The leases of these shares should be renewed - by the client. - """ - - # 2. Construct a bipartite graph G1 of *readonly* servers to pre-existing - # shares, where an edge exists between an arbitrary readonly server S and an - # arbitrary share T if and only if S currently holds T. - - # First find the maximum spanning of the readonly servers. - readonly_peers = self.readonly_peers - readonly_shares = set() - readonly_map = {} - for peer in self.servermap: - if peer in self.readonly_peers: - readonly_map.setdefault(peer, self.servermap[peer]) - for share in self.servermap[peer]: - readonly_shares.add(share) - - peer_to_index = self._index_peers(readonly_peers, 1) - share_to_index, index_to_share = self._reindex_shares(readonly_shares, - len(readonly_peers) + 1) - # "graph" is G1 - graph = self._servermap_flow_graph(readonly_peers, readonly_shares, readonly_map) - shareids = [share_to_index[s] for s in readonly_shares] - max_graph = self._compute_maximum_graph(graph, shareids) - - # 3. Calculate a maximum matching graph of G1 (a set of S->T edges that has or - # is-tied-for the highest "happiness score"). There is a clever efficient - # algorithm for this, named "Ford-Fulkerson". There may be more than one - # maximum matching for this graph; we choose one of them arbitrarily, but - # prefer earlier servers. Call this particular placement M1. The placement - # maps shares to servers, where each share appears at most once, and each - # server appears at most once. - - # "max_graph" is M1 and is a dict which maps shares -> peer - # (but "one" of the many arbitrary mappings that give us "max - # happiness" of the existing placed shares) - readonly_mappings = self._convert_mappings(peer_to_index, - index_to_share, max_graph) - - used_peers, used_shares = self._extract_ids(readonly_mappings) - - print("readonly mappings") - for k, v in readonly_mappings.items(): - print(" {} -> {}".format(k, v)) - - # 4. Construct a bipartite graph G2 of readwrite servers to pre-existing - # shares. Then remove any edge (from G2) that uses a server or a share found - # in M1. Let an edge exist between server S and share T if and only if S - # already holds T. - - # Now find the maximum matching for the rest of the existing allocations. - # Remove any peers and shares used in readonly_mappings. - peers = self.servermap_peerids - used_peers - shares = self.servermap_shareids - used_shares - servermap = self.servermap.copy() - for peer in self.servermap: - if peer in used_peers: + servermap = peers_to_shares.copy() + for peer in peers_to_shares: + if peer in used_peers: + servermap.pop(peer, None) + else: + servermap[peer] = set(servermap[peer]) - used_shares + if servermap[peer] == set(): servermap.pop(peer, None) - else: - servermap[peer] = servermap[peer] - used_shares - if servermap[peer] == set(): - servermap.pop(peer, None) - peers.remove(peer) + new_peers.remove(peer) - # 5. Calculate a maximum matching graph of G2, call this M2, again preferring - # earlier servers. + existing_mappings = _calculate_mappings(new_peers, new_shares, servermap) + existing_peers, existing_shares = _extract_ids(existing_mappings) - # Reindex and find the maximum matching of the graph. - peer_to_index = self._index_peers(peers, 1) - share_to_index, index_to_share = self._reindex_shares(shares, len(peers) + 1) - graph = self._servermap_flow_graph(peers, shares, servermap) - shareids = [share_to_index[s] for s in shares] - max_server_graph = self._compute_maximum_graph(graph, shareids) - existing_mappings = self._convert_mappings(peer_to_index, - index_to_share, max_server_graph) - # "max_server_graph" is M2 - - print("existing mappings") - for k, v in existing_mappings.items(): - print(" {} -> {}".format(k, v)) - - # 6. Construct a bipartite graph G3 of (only readwrite) servers to - # shares (some shares may already exist on a server). Then remove - # (from G3) any servers and shares used in M1 or M2 (note that we - # retain servers/shares that were in G1/G2 but *not* in the M1/M2 - # subsets) - - existing_peers, existing_shares = self._extract_ids(existing_mappings) - peers = self.peerids - existing_peers - used_peers - shares = self.shareids - existing_shares - used_shares - - # Generate a flow network of peerids to shareids for all peers - # and shares which cannot be reused from previous file allocations. - # These mappings represent new allocations the uploader must make. - peer_to_index = self._index_peers(peers, 1) - share_to_index, index_to_share = self._reindex_shares(shares, len(peers) + 1) - peerids = [peer_to_index[peer] for peer in peers] - shareids = [share_to_index[share] for share in shares] - graph = self._flow_network(peerids, shareids) - - # XXX I think the above is equivalent to step 6, except - # instead of "construct, then remove" the above is just - # "remove all used peers, shares and then construct graph" - - # 7. Calculate a maximum matching graph of G3, call this M3, preferring earlier - # servers. The final placement table is the union of M1+M2+M3. - - max_graph = self._compute_maximum_graph(graph, shareids) - new_mappings = self._convert_mappings(peer_to_index, index_to_share, - max_graph) - - print("new mappings") - for k, v in new_mappings.items(): - print(" {} -> {}".format(k, v)) - - # "the final placement table" - mappings = dict(readonly_mappings.items() + existing_mappings.items() - + new_mappings.items()) - self._calculate_happiness(mappings) - if len(self.homeless_shares) != 0: - self._distribute_homeless_shares(mappings) - - return mappings + # Calculate share placement for the remaining peers and shares which + # won't be preserved by existing allocations. + new_peers = new_peers - existing_peers - used_peers - def _compute_maximum_graph(self, graph, shareids): - """ - This is an implementation of the Ford-Fulkerson method for finding - a maximum flow in a flow network applied to a bipartite graph. - Specifically, it is the Edmonds-Karp algorithm, since it uses a - BFS to find the shortest augmenting path at each iteration, if one - exists. - - The implementation here is an adapation of an algorithm described in - "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. - """ - - if graph == []: - return {} - - dim = len(graph) - flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] - residual_graph, residual_function = residual_network(graph, flow_function) - - while augmenting_path_for(residual_graph): - path = augmenting_path_for(residual_graph) - # Delta is the largest amount that we can increase flow across - # all of the edges in path. Because of the way that the residual - # function is constructed, f[u][v] for a particular edge (u, v) - # is the amount of unused capacity on that edge. Taking the - # minimum of a list of those values for each edge in the - # augmenting path gives us our delta. - delta = min(map(lambda (u, v), rf=residual_function: rf[u][v], - path)) - for (u, v) in path: - flow_function[u][v] += delta - flow_function[v][u] -= delta - residual_graph, residual_function = residual_network(graph,flow_function) - - new_mappings = {} - for share in shareids: - peer = residual_graph[share] - if peer == [dim - 1]: - new_mappings.setdefault(share, None) - else: - new_mappings.setdefault(share, peer[0]) - - return new_mappings - - - def _extract_ids(self, mappings): - shares = set() - peers = set() - for share in mappings: - if mappings[share] == None: - pass - else: - shares.add(share) - for item in mappings[share]: - peers.add(item) - return (peers, shares) - - - def _calculate_happiness(self, mappings): - """ - I calculate the happiness of the generated mappings and - create the set self.homeless_shares. - """ - self.happy = 0 - self.homeless_shares = set() - for share in mappings: - if mappings[share] is not None: - self.happy += 1 - else: - self.homeless_shares.add(share) - - - def _distribute_homeless_shares(self, mappings): - """ - Shares which are not mapped to a peer in the maximum spanning graph - still need to be placed on a server. This function attempts to - distribute those homeless shares as evenly as possible over the - available peers. If possible a share will be placed on the server it was - originally on, signifying the lease should be renewed instead. - """ - - # First check to see if the leases can be renewed. - to_distribute = set() - - for share in self.homeless_shares: - if share in self.servermap_shareids: - for peerid in self.servermap: - if share in self.servermap[peerid]: - mappings[share] = set([peerid]) - break - else: - to_distribute.add(share) - - # This builds a priority queue of peers with the number of shares - # each peer holds as the priority. - - priority = {} - pQueue = PriorityQueue() - for peerid in self.peerids: - priority.setdefault(peerid, 0) - for share in mappings: - if mappings[share] is not None: - for peer in mappings[share]: - if peer in self.peerids: - priority[peer] += 1 - - if priority == {}: - return - - for peerid in priority: - pQueue.put((priority[peerid], peerid)) - - # Distribute the shares to peers with the lowest priority. - for share in to_distribute: - peer = pQueue.get() - mappings[share] = set([peer[1]]) - pQueue.put((peer[0]+1, peer[1])) - - - def _convert_mappings(self, peer_to_index, share_to_index, maximum_graph): - """ - Now that a maximum spanning graph has been found, convert the indexes - back to their original ids so that the client can pass them to the - uploader. - """ - - converted_mappings = {} - for share in maximum_graph: - peer = maximum_graph[share] - if peer == None: - converted_mappings.setdefault(share_to_index[share], None) - else: - converted_mappings.setdefault(share_to_index[share], - set([peer_to_index[peer]])) - return converted_mappings - - - def _servermap_flow_graph(self, peers, shares, servermap): - """ - Generates a flow network of peerids to shareids from a server map - of 'peerids' -> ['shareids']. According to Wikipedia, "a flow network is a - directed graph where each edge has a capacity and each edge receives a flow. - The amount of flow on an edge cannot exceed the capacity of the edge." This - is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm - converts the problem into a maximum flow problem. - """ - if servermap == {}: - return [] - - peerids = peers - shareids = shares - peer_to_index = self._index_peers(peerids, 1) - share_to_index, index_to_share = self._reindex_shares(shareids, len(peerids) + 1) - graph = [] - sink_num = len(peerids) + len(shareids) + 1 - graph.append([peer_to_index[peer] for peer in peerids]) - for peerid in peerids: - shares = [share_to_index[s] for s in servermap[peerid]] - graph.insert(peer_to_index[peerid], shares) - for shareid in shareids: - graph.insert(share_to_index[shareid], [sink_num]) - graph.append([]) - return graph - - - def _index_peers(self, ids, base): - """ - I create a bidirectional dictionary of indexes to ids with - indexes from base to base + |ids| - 1 inclusively. I am used - in order to create a flow network with vertices 0 through n. - """ - reindex_to_name = {} - for item in ids: - reindex_to_name.setdefault(item, base) - reindex_to_name.setdefault(base, item) - base += 1 - return reindex_to_name - - - def _reindex_shares(self, shares, base): - """ - I create a dictionary of sharenum -> index (where 'index' is as defined - in _index_peers) and a dictionary of index -> sharenum. Since share - numbers use the same name space as the indexes, two dictionaries need - to be created instead of one like in _reindex_peers. - """ - share_to_index = {} - index_to_share = {} - for share in shares: - share_to_index.setdefault(share, base) - index_to_share.setdefault(base, share) - base += 1 - return (share_to_index, index_to_share) - - - def _flow_network(self, peerids, shareids): - """ - Given set of peerids and shareids, I create a flow network - to be used by _compute_maximum_graph. - """ - graph = [] - graph.append(peerids) - sink_num = len(peerids + shareids) + 1 - for peerid in peerids: - graph.insert(peerid, shareids) - for shareid in shareids: - graph.insert(shareid, [sink_num]) - graph.append([]) - return graph + new_shares = new_shares - existing_shares - used_shares + new_mappings = _calculate_mappings(new_peers, new_shares) + #print "new_peers %s" % new_peers + #print "new_mappings %s" % new_mappings + mappings = dict(readonly_mappings.items() + existing_mappings.items() + new_mappings.items()) + homeless_shares = set() + for share in mappings: + if mappings[share] is None: + homeless_shares.add(share) + if len(homeless_shares) != 0: + _distribute_homeless_shares(mappings, homeless_shares, peers_to_shares) + #print "mappings %s" % mappings + return mappings diff --git a/src/allmydata/test/test_happiness.py b/src/allmydata/test/test_happiness.py index 3a24ff92c..994ef5bb6 100644 --- a/src/allmydata/test/test_happiness.py +++ b/src/allmydata/test/test_happiness.py @@ -53,7 +53,6 @@ class Happiness(unittest.TestCase): } ) - def test_placement_1(self): shares = { @@ -88,7 +87,7 @@ class Happiness(unittest.TestCase): # i.e. this says that share0 should be on peer0, share1 should # be on peer1, etc. expected = { - 'share{}'.format(i): 'peer{}'.format(i) + 'share{}'.format(i): 'set([peer{}])'.format(i) for i in range(10) } self.assertEqual(expected, places) @@ -112,6 +111,10 @@ class Happiness(unittest.TestCase): readonly_peers = set() peers_to_shares = dict() + #h = happiness_upload.HappinessUpload(peers, readonly_peers, shares, peers_to_shares) + #places = h.generate_mappings() + #happiness = h.happiness() + places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares) happiness = happiness_upload.calculate_happiness(places) @@ -119,7 +122,7 @@ class Happiness(unittest.TestCase): # process just gets killed with anything like 200 (see # test_upload.py) - def test_50(self): + def no_test_50(self): peers = set(['peer{}'.format(x) for x in range(50)]) shares = set(['share{}'.format(x) for x in range(50)]) readonly_peers = set() @@ -130,21 +133,6 @@ class Happiness(unittest.TestCase): self.assertEqual(50, happiness) - def test_50_orig_code(self): - peers = set(['peer{}'.format(x) for x in range(50)]) - shares = set(['share{}'.format(x) for x in range(50)]) - readonly_peers = set() - peers_to_shares = dict() - - h = happiness_upload.Happiness_Upload(peers, readonly_peers, shares, peers_to_shares) - places = h.generate_mappings() - - self.assertEqual(50, h.happy) - self.assertEqual(50, len(places)) - for share in shares: - self.assertTrue(share in places) - self.assertTrue(places[share].pop() in peers) - def test_redistribute(self): """ with existing shares 0, 3 on a single servers we can achieve @@ -161,29 +149,13 @@ class Happiness(unittest.TestCase): # we can achieve more happiness by moving "2" or "3" to server "d" places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares) + #print "places %s" % places + #places = happiness_upload.slow_share_placement(peers, readonly_peers, shares, peers_to_shares) + #print "places %s" % places + happiness = happiness_upload.calculate_happiness(places) self.assertEqual(4, happiness) - def test_redistribute2(self): - """ - with existing shares 0, 3 on a single servers we can achieve - higher happiness by moving one of those shares to a new server - """ - peers = {'a', 'b', 'c', 'd'} - shares = {'0', '1', '2', '3'} - readonly_peers = set() - peers_to_shares = { - 'a': set(['0']), - 'b': set(['1']), - 'c': set(['2', '3']), - } - # we can achieve more happiness by moving "2" or "3" to server "d" - - h = happiness_upload.Happiness_Upload(peers, readonly_peers, shares, peers_to_shares) - places = h.generate_mappings() - self.assertEqual(4, h.happy) - print(places) - def test_calc_happy(self): # share -> server share_placements = { @@ -200,53 +172,3 @@ class Happiness(unittest.TestCase): } happy = happiness_upload.calculate_happiness(share_placements) self.assertEqual(2, happy) - - def test_bar(self): - peers = {'peer0', 'peer1', 'peer2', 'peer3'} - shares = {'share0', 'share1', 'share2'} - readonly_peers = {'peer0'} - servermap = { - 'peer0': {'share2', 'share0'}, - 'peer1': {'share1'}, - } - h = happiness_upload.Happiness_Upload(peers, readonly_peers, shares, servermap) - maps = h.generate_mappings() - print("maps:") - for k in sorted(maps.keys()): - print("{} -> {}".format(k, maps[k])) - - def test_foo(self): - peers = ['peer0', 'peer1'] - shares = ['share0', 'share1', 'share2'] - h = happiness_upload.Happiness_Upload(peers, [], shares, {}) - - # servermap must have all peers -> [share, share, share, ...] - graph = h._servermap_flow_graph( - peers, - shares, - { - 'peer0': ['share0', 'share1', 'share2'], - 'peer1': ['share1'], - }, - ) - peer_to_index = h._index_peers(peers, 1) - share_to_index, index_to_share = h._reindex_shares(shares, len(peers) + 1) - - print("graph:") - for row in graph: - print(row) - shareids = [3, 4, 5] - max_server_graph = h._compute_maximum_graph(graph, shareids) - print("max_server_graph:", max_server_graph) - for k, v in max_server_graph.items(): - print("{} -> {}".format(k, v)) - - mappings = h._convert_mappings(peer_to_index, index_to_share, max_server_graph) - print("mappings:", mappings) - used_peers, used_shares = h._extract_ids(mappings) - print("existing used peers", used_peers) - print("existing used shares", used_shares) - - unused_peers = peers - used_peers - unused_shares = shares - used_shares -