From ef17ef2c62dc8de3f98c5c251baeccb8b44a16a7 Mon Sep 17 00:00:00 2001
From: meejah
Date: Fri, 20 Jan 2017 14:58:49 -0700
Subject: [PATCH] fix happiness calculation

unit-test for happiness calculation
unused function
put old servers_of_happiness() calculation back for now
test for calculate_happiness
remove some redundant functions
---
 src/allmydata/immutable/happiness_upload.py |  24 +--
 src/allmydata/immutable/upload.py           |  14 +-
 src/allmydata/test/test_happiness.py        |  35 ++++
 src/allmydata/test/test_upload.py           |   2 +-
 src/allmydata/util/happinessutil.py         | 173 ++++++++++++++++++--
 5 files changed, 210 insertions(+), 38 deletions(-)

diff --git a/src/allmydata/immutable/happiness_upload.py b/src/allmydata/immutable/happiness_upload.py
index 40d6e223d..0d2421cbb 100644
--- a/src/allmydata/immutable/happiness_upload.py
+++ b/src/allmydata/immutable/happiness_upload.py
@@ -73,17 +73,6 @@ def residual_network(graph, f):
                 cf[v][i] = -1
     return (new_graph, cf)
 
-def _query_all_shares(servermap, readonly_peers):
-    readonly_shares = set()
-    readonly_map = {}
-    for peer in servermap:
-        if peer in readonly_peers:
-            readonly_map.setdefault(peer, servermap[peer])
-            for share in servermap[peer]:
-                readonly_shares.add(share)
-    return readonly_shares
-
-
 def _convert_mappings(index_to_peer, index_to_share, maximum_graph):
     """
     Now that a maximum spanning graph has been found, convert the indexes
@@ -276,24 +265,19 @@ def _merge_dicts(result, inc):
         elif v is not None:
             result[k] = existing.union(v)
 
+
 def calculate_happiness(mappings):
     """
     I calculate the happiness of the generated mappings
     """
-    happiness = 0
-    for share in mappings:
-        if mappings[share] is not None:
-            happiness += 1
-    return happiness
+    unique_peers = {list(v)[0] for k, v in mappings.items()}
+    return len(unique_peers)
+
 
 def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
     """
     :param servers: ordered list of servers, "Maybe *2N* of them."
     """
-    # "1. Query all servers for existing shares."
-    #shares = _query_all_shares(servers, peers)
-    #print("shares", shares)
-
     # "2. Construct a bipartite graph G1 of *readonly* servers to pre-existing
     # shares, where an edge exists between an arbitrary readonly server S and an
     # arbitrary share T if and only if S currently holds T."
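Note: the calculate_happiness() rewrite above changes the metric from "how many shares were placed at all" to "how many distinct peers hold a share". A minimal sketch of the new expression, using hypothetical share/peer names and assuming every share maps to a non-empty set of peers:

    # Two shares land on peerA, one on peerB: happiness should be 2.
    mappings = {
        'share0': set(['peerA']),
        'share1': set(['peerA']),
        'share2': set(['peerB']),
    }
    unique_peers = {list(v)[0] for k, v in mappings.items()}
    assert len(unique_peers) == 2

One caveat: list(v)[0] picks an arbitrary peer when a share maps to several, so this count can undershoot the true maximum-matching happiness; presumably that is why the patch also restores the flow-based servers_of_happiness() in happinessutil.py below.
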
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index d37f94dd8..1907d5354 100644
--- a/src/allmydata/immutable/upload.py
+++ b/src/allmydata/immutable/upload.py
@@ -201,6 +201,7 @@ class ServerTracker:
 def str_shareloc(shnum, bucketwriter):
     return "%s: %s" % (shnum, bucketwriter.get_servername(),)
 
+
 class PeerSelector():
     implements(IPeerSelector)
 
@@ -259,6 +260,7 @@ class PeerSelector():
     def is_healthy(self):
         return self.min_happiness <= self.happiness
 
+
 class Tahoe2ServerSelector(log.PrefixingLogMixin):
 
     peer_selector_class = PeerSelector
@@ -554,13 +556,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
             # no more servers. If we haven't placed enough shares, we fail.
             merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
             effective_happiness = servers_of_happiness(self.peer_selector.get_allocations())
-            #effective_happiness = self.peer_selector.happiness
-            print "effective_happiness %s" % effective_happiness
             if effective_happiness < self.servers_of_happiness:
-                msg = failure_message(len(self.serverids_with_shares),
-                                      self.needed_shares,
-                                      self.servers_of_happiness,
-                                      effective_happiness)
+                msg = failure_message(
+                    peer_count=len(self.serverids_with_shares),
+                    k=self.needed_shares,
+                    happy=self.servers_of_happiness,
+                    effective_happy=effective_happiness,
+                )
                 msg = ("server selection failed for %s: %s (%s), merged=%s" %
                        (self, msg, self._get_progress_message(),
                         pretty_print_shnum_to_servers(merged)))
diff --git a/src/allmydata/test/test_happiness.py b/src/allmydata/test/test_happiness.py
index 7e408d172..4fc72ee00 100644
--- a/src/allmydata/test/test_happiness.py
+++ b/src/allmydata/test/test_happiness.py
@@ -96,3 +96,38 @@ class Happiness(unittest.TestCase):
             for i in range(10)
         }
         self.assertEqual(expected, places0)
+
+    def test_unhappy(self):
+
+        shares = {
+            'share1', 'share2', 'share3', 'share4', 'share5',
+        }
+        peers = {
+            'peer1', 'peer2', 'peer3', 'peer4',
+        }
+        readonly_peers = set()
+        peers_to_shares = {
+        }
+
+        places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
+        happiness = happiness_upload.calculate_happiness(places)
+        self.assertEqual(4, happiness)
+
+    def test_calc_happy(self):
+        sharemap = {
+            0: set(["\x0e\xd6\xb3>\xd6\x85\x9d\x94')'\xf03:R\x88\xf1\x04\x1b\xa4",
+                    '\x8de\x1cqM\xba\xc3\x0b\x80\x9aC<5\xfc$\xdc\xd5\xd3\x8b&',
+                    '\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t',
+                    '\xc4\x83\x9eJ\x7f\xac| .\xc90\xf4b\xe4\x92\xbe\xaa\xe6\t\x80']),
+            1: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            2: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            3: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            4: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            5: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            6: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            7: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            8: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+            9: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
+        }
+        happy = happiness_upload.calculate_happiness(sharemap)
+        self.assertEqual(2, happy)
diff --git a/src/allmydata/test/test_upload.py b/src/allmydata/test/test_upload.py
index 09873eebf..f9fd5c4ae 100644
--- a/src/allmydata/test/test_upload.py
+++ b/src/allmydata/test/test_upload.py
@@ -940,7 +940,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
         self.basedir = "upload/EncodingParameters/aborted_shares"
         self.set_up_grid(num_servers=4)
         c = self.g.clients[0]
-        DATA = upload.Data(100* "kittens", convergence="")
+        DATA = upload.Data(100 * "kittens", convergence="")
         # These parameters are unsatisfiable with only 4 servers, but should
         # work with 5, as long as the original 4 are not stuck in the open
         # BucketWriter state (open() but not
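Note: test_calc_happy pins down the matching semantics. Shares 1 through 9 all live on a single server, and only share 0 can be matched elsewhere, so a maximum bipartite matching pairs at most two distinct servers with shares; hence the expected happiness of 2. A brute-force check of that reasoning, with hypothetical short peer names standing in for the binary server ids (a sketch, not part of the patch):

    def max_matching(shares, sharemap, used=frozenset()):
        # Exhaustive maximum-bipartite-matching size; fine for tiny maps.
        if not shares:
            return 0
        first, rest = shares[0], shares[1:]
        best = max_matching(rest, sharemap, used)  # leave `first` unmatched
        for peer in sharemap[first] - used:
            best = max(best, 1 + max_matching(rest, sharemap, used | set([peer])))
        return best

    sharemap = {0: set(['p1', 'p2', 'p3', 'p4'])}
    sharemap.update(dict((i, set(['p3'])) for i in range(1, 10)))
    assert max_matching(list(sharemap), sharemap) == 2
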
diff --git a/src/allmydata/util/happinessutil.py b/src/allmydata/util/happinessutil.py
index 52be6e80c..c5bc03330 100644
--- a/src/allmydata/util/happinessutil.py
+++ b/src/allmydata/util/happinessutil.py
@@ -4,7 +4,12 @@ reporting it in messages
 """
 
 from copy import deepcopy
-from allmydata.immutable.happiness_upload import share_placement, calculate_happiness
+from allmydata.immutable.happiness_upload import share_placement
+from allmydata.immutable.happiness_upload import calculate_happiness
+from allmydata.immutable.happiness_upload import residual_network
+from allmydata.immutable.happiness_upload import bfs
+from allmydata.immutable.happiness_upload import augmenting_path_for
+
 
 def failure_message(peer_count, k, happy, effective_happy):
     # If peer_count < needed_shares, this error message makes more
@@ -78,14 +83,160 @@ def merge_servers(servermap, upload_trackers=None):
             servermap.setdefault(shnum, set()).add(tracker.get_serverid())
     return servermap
 
+
 def servers_of_happiness(sharemap):
-    peers = sharemap.values()
-    if len(peers) == 1:
-        peers = peers[0]
-    else:
-        peers = [list(x)[0] for x in peers] # XXX
-    shares = sharemap.keys()
-    readonly_peers = set() # XXX
-    peers_to_shares = shares_by_server(sharemap)
-    places0 = share_placement(peers, readonly_peers, shares, peers_to_shares)
-    return calculate_happiness(places0)
+    """
+    I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
+    return the 'servers_of_happiness' number that sharemap results in.
+
+    To calculate the 'servers_of_happiness' number for the sharemap, I
+    construct a bipartite graph with servers in one partition of vertices
+    and shares in the other, and with an edge between a server s and a share t
+    if s is to store t. I then compute the size of a maximum matching in
+    the resulting graph; this is then returned as the 'servers_of_happiness'
+    for my arguments.
+
+    For example, consider the following layout:
+
+      server 1: shares 1, 2, 3, 4
+      server 2: share 6
+      server 3: share 3
+      server 4: share 4
+      server 5: share 2
+
+    From this, we can construct the following graph:
+
+      L = {server 1, server 2, server 3, server 4, server 5}
+      R = {share 1, share 2, share 3, share 4, share 6}
+      V = L U R
+      E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
+           (server 1, share 4), (server 2, share 6), (server 3, share 3),
+           (server 4, share 4), (server 5, share 2)}
+      G = (V, E)
+
+    Note that G is bipartite since every edge in E has one endpoint in L
+    and one endpoint in R.
+
+    A matching in a graph G is a subset M of E such that, for any vertex
+    v in V, v is incident to at most one edge of M. A maximum matching
+    in G is a matching that is no smaller than any other matching. For
+    this graph, a matching of cardinality 5 is:
+
+      M = {(server 1, share 1), (server 2, share 6),
+           (server 3, share 3), (server 4, share 4),
+           (server 5, share 2)}
+
+    Since G is bipartite, and since |L| = 5, we cannot have an M' such
+    that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
+    as long as k <= 5, we can see that the layout above has
+    servers_of_happiness = 5, which matches the results here.
+    """
+    if sharemap == {}:
+        return 0
+    servermap = shares_by_server(sharemap)
+    graph = _flow_network_for(servermap)
+
+    # XXX this core stuff is identical to
+    # happiness_upload._compute_maximum_graph and we should find a way
+    # to share the code.
+
+    # This is an implementation of the Ford-Fulkerson method for finding
+    # a maximum flow in a flow network applied to a bipartite graph.
+    # Specifically, it is the Edmonds-Karp algorithm, since it uses a
+    # BFS to find the shortest augmenting path at each iteration, if one
+    # exists.
+    #
+    # The implementation here is an adaptation of an algorithm described in
+    # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
+    dim = len(graph)
+    flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
+    residual_graph, residual_function = residual_network(graph, flow_function)
+    while augmenting_path_for(residual_graph):
+        path = augmenting_path_for(residual_graph)
+        # Delta is the largest amount that we can increase flow across
+        # all of the edges in path. Because of the way that the residual
+        # function is constructed, f[u][v] for a particular edge (u, v)
+        # is the amount of unused capacity on that edge. Taking the
+        # minimum of a list of those values for each edge in the
+        # augmenting path gives us our delta.
+        delta = min(map(lambda (u, v), rf=residual_function: rf[u][v],
+                        path))
+        for (u, v) in path:
+            flow_function[u][v] += delta
+            flow_function[v][u] -= delta
+        residual_graph, residual_function = residual_network(graph,
+                                                             flow_function)
+    num_servers = len(servermap)
+    # The value of a flow is the total flow out of the source vertex
+    # (vertex 0, in our graph). We could just as well sum across all of
+    # f[0], but we know that vertex 0 only has edges to the servers in
+    # our graph, so we can stop after summing flow across those. The
+    # value of a flow computed in this way is the size of a maximum
+    # matching on the bipartite graph described above.
+    return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
+
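Note: the restored servers_of_happiness() can be sanity-checked against the example in its own docstring; that layout admits a perfect matching of five servers to five shares, so the flow computation should report 5. A usage sketch (shareid -> set(peerid), as the docstring describes):

    from allmydata.util.happinessutil import servers_of_happiness

    sharemap = {
        1: set(['server1']),
        2: set(['server1', 'server5']),
        3: set(['server1', 'server3']),
        4: set(['server1', 'server4']),
        6: set(['server2']),
    }
    assert servers_of_happiness(sharemap) == 5
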
+def _flow_network_for(servermap):
+    """
+    I take my argument, a dict of peerid -> set(shareid) mappings, and
+    turn it into a flow network suitable for use with Edmonds-Karp. I
+    then return the adjacency list representation of that network.
+
+    Specifically, I build G = (V, E), where:
+      V = { peerid in servermap } U { shareid in servermap } U {s, t}
+      E = {(s, peerid) for each peerid}
+          U {(peerid, shareid) if peerid is to store shareid }
+          U {(shareid, t) for each shareid}
+
+    s and t will be source and sink nodes when my caller starts treating
+    the graph I return like a flow network. Without s and t, the
+    returned graph is bipartite.
+    """
+    # Servers don't have integral identifiers, and we can't make any
+    # assumptions about the way shares are indexed -- it's possible that
+    # there are missing shares, for example. So before making a graph,
+    # we re-index so that all of our vertices have integral indices, and
+    # that there aren't any holes. We start indexing at 1, so that we
+    # can add a source node at index 0.
+    servermap, num_shares = _reindex(servermap, base_index=1)
+    num_servers = len(servermap)
+    graph = [] # index -> [index], an adjacency list
+    # Add an entry at the top (index 0) that has an edge to every server
+    # in servermap
+    graph.append(servermap.keys())
+    # For each server, add an entry that has an edge to every share that it
+    # contains (or will contain).
+    for k in servermap:
+        graph.append(servermap[k])
+    # For each share, add an entry that has an edge to the sink.
+    sink_num = num_servers + num_shares + 1
+    for i in xrange(num_shares):
+        graph.append([sink_num])
+    # Add an empty entry for the sink, which has no outbound edges.
+    graph.append([])
+    return graph
+
+
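Note: for intuition, this is the shape of adjacency list _flow_network_for() builds for a two-server map. The integer assignment below is one possibility; the actual indices depend on dict iteration order inside _reindex() (hypothetical ids, not from the patch):

    servermap = {'peerA': set(['shareX']), 'peerB': set(['shareX', 'shareY'])}
    # Possible re-indexing: peerA -> 1, peerB -> 2, shareX -> 3, shareY -> 4
    graph = [
        [1, 2],  # 0: source, with an edge to every server
        [3],     # 1: peerA -> shareX
        [3, 4],  # 2: peerB -> shareX, shareY
        [5],     # 3: shareX -> sink
        [5],     # 4: shareY -> sink
        [],      # 5: sink, no outbound edges
    ]
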
+ """ + shares = {} # shareid -> vertex index + num = base_index + ret = {} # peerid -> [shareid], a reindexed servermap. + # Number the servers first + for k in servermap: + ret[num] = servermap[k] + num += 1 + # Number the shares + for k in ret: + for shnum in ret[k]: + if not shares.has_key(shnum): + shares[shnum] = num + num += 1 + ret[k] = map(lambda x: shares[x], ret[k]) + return (ret, len(shares))