fix happiness calculation

unit-test for happiness calculation

unused function

put old servers_of_happiness() calculation back for now

test for calculate_happiness

remove some redundant functions
This commit is contained in:
meejah 2017-01-20 14:58:49 -07:00
parent 42011e775d
commit ef17ef2c62
5 changed files with 210 additions and 38 deletions

@ -73,17 +73,6 @@ def residual_network(graph, f):
cf[v][i] = -1
return (new_graph, cf)
def _query_all_shares(servermap, readonly_peers):
readonly_shares = set()
readonly_map = {}
for peer in servermap:
if peer in readonly_peers:
readonly_map.setdefault(peer, servermap[peer])
for share in servermap[peer]:
readonly_shares.add(share)
return readonly_shares
def _convert_mappings(index_to_peer, index_to_share, maximum_graph):
"""
Now that a maximum spanning graph has been found, convert the indexes
@ -276,24 +265,19 @@ def _merge_dicts(result, inc):
elif v is not None:
result[k] = existing.union(v)
def calculate_happiness(mappings):
"""
I calculate the happiness of the generated mappings
"""
happiness = 0
for share in mappings:
if mappings[share] is not None:
happiness += 1
return happiness
unique_peers = {list(v)[0] for k, v in mappings.items()}
return len(unique_peers)
def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
"""
:param servers: ordered list of servers, "Maybe *2N* of them."
"""
# "1. Query all servers for existing shares."
#shares = _query_all_shares(servers, peers)
#print("shares", shares)
# "2. Construct a bipartite graph G1 of *readonly* servers to pre-existing
# shares, where an edge exists between an arbitrary readonly server S and an
# arbitrary share T if and only if S currently holds T."

@ -201,6 +201,7 @@ class ServerTracker:
def str_shareloc(shnum, bucketwriter):
return "%s: %s" % (shnum, bucketwriter.get_servername(),)
class PeerSelector():
implements(IPeerSelector)
@ -259,6 +260,7 @@ class PeerSelector():
def is_healthy(self):
return self.min_happiness <= self.happiness
class Tahoe2ServerSelector(log.PrefixingLogMixin):
peer_selector_class = PeerSelector
@ -554,13 +556,13 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# no more servers. If we haven't placed enough shares, we fail.
merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
effective_happiness = servers_of_happiness(self.peer_selector.get_allocations())
#effective_happiness = self.peer_selector.happiness
print "effective_happiness %s" % effective_happiness
if effective_happiness < self.servers_of_happiness:
msg = failure_message(len(self.serverids_with_shares),
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
msg = failure_message(
peer_count=len(self.serverids_with_shares),
k=self.needed_shares,
happy=self.servers_of_happiness,
effective_happy=effective_happiness,
)
msg = ("server selection failed for %s: %s (%s), merged=%s" %
(self, msg, self._get_progress_message(),
pretty_print_shnum_to_servers(merged)))

@ -96,3 +96,38 @@ class Happiness(unittest.TestCase):
for i in range(10)
}
self.assertEqual(expected, places0)
def test_unhappy(self):
shares = {
'share1', 'share2', 'share3', 'share4', 'share5',
}
peers = {
'peer1', 'peer2', 'peer3', 'peer4',
}
readonly_peers = set()
peers_to_shares = {
}
places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
happiness = happiness_upload.calculate_happiness(places)
self.assertEqual(4, happiness)
def test_calc_happy(self):
sharemap = {
0: set(["\x0e\xd6\xb3>\xd6\x85\x9d\x94')'\xf03:R\x88\xf1\x04\x1b\xa4",
'\x8de\x1cqM\xba\xc3\x0b\x80\x9aC<5\xfc$\xdc\xd5\xd3\x8b&',
'\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t',
'\xc4\x83\x9eJ\x7f\xac| .\xc90\xf4b\xe4\x92\xbe\xaa\xe6\t\x80']),
1: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
2: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
3: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
4: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
5: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
6: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
7: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
8: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
9: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']),
}
happy = happiness_upload.calculate_happiness(sharemap)
self.assertEqual(2, happy)

@ -940,7 +940,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
self.basedir = "upload/EncodingParameters/aborted_shares"
self.set_up_grid(num_servers=4)
c = self.g.clients[0]
DATA = upload.Data(100* "kittens", convergence="")
DATA = upload.Data(100 * "kittens", convergence="")
# These parameters are unsatisfiable with only 4 servers, but should
# work with 5, as long as the original 4 are not stuck in the open
# BucketWriter state (open() but not

@ -4,7 +4,12 @@ reporting it in messages
"""
from copy import deepcopy
from allmydata.immutable.happiness_upload import share_placement, calculate_happiness
from allmydata.immutable.happiness_upload import share_placement
from allmydata.immutable.happiness_upload import calculate_happiness
from allmydata.immutable.happiness_upload import residual_network
from allmydata.immutable.happiness_upload import bfs
from allmydata.immutable.happiness_upload import augmenting_path_for
def failure_message(peer_count, k, happy, effective_happy):
# If peer_count < needed_shares, this error message makes more
@ -78,14 +83,160 @@ def merge_servers(servermap, upload_trackers=None):
servermap.setdefault(shnum, set()).add(tracker.get_serverid())
return servermap
def servers_of_happiness(sharemap):
peers = sharemap.values()
if len(peers) == 1:
peers = peers[0]
else:
peers = [list(x)[0] for x in peers] # XXX
shares = sharemap.keys()
readonly_peers = set() # XXX
peers_to_shares = shares_by_server(sharemap)
places0 = share_placement(peers, readonly_peers, shares, peers_to_shares)
return calculate_happiness(places0)
"""
I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
return the 'servers_of_happiness' number that sharemap results in.
To calculate the 'servers_of_happiness' number for the sharemap, I
construct a bipartite graph with servers in one partition of vertices
and shares in the other, and with an edge between a server s and a share t
if s is to store t. I then compute the size of a maximum matching in
the resulting graph; this is then returned as the 'servers_of_happiness'
for my arguments.
For example, consider the following layout:
server 1: shares 1, 2, 3, 4
server 2: share 6
server 3: share 3
server 4: share 4
server 5: share 2
From this, we can construct the following graph:
L = {server 1, server 2, server 3, server 4, server 5}
R = {share 1, share 2, share 3, share 4, share 6}
V = L U R
E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
(server 1, share 4), (server 2, share 6), (server 3, share 3),
(server 4, share 4), (server 5, share 2)}
G = (V, E)
Note that G is bipartite since every edge in e has one endpoint in L
and one endpoint in R.
A matching in a graph G is a subset M of E such that, for any vertex
v in V, v is incident to at most one edge of M. A maximum matching
in G is a matching that is no smaller than any other matching. For
this graph, a matching of cardinality 5 is:
M = {(server 1, share 1), (server 2, share 6),
(server 3, share 3), (server 4, share 4),
(server 5, share 2)}
Since G is bipartite, and since |L| = 5, we cannot have an M' such
that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
as long as k <= 5, we can see that the layout above has
servers_of_happiness = 5, which matches the results here.
"""
if sharemap == {}:
return 0
servermap = shares_by_server(sharemap)
graph = _flow_network_for(servermap)
# XXX this core stuff is identical to
# happiness_upload._compute_maximum_graph and we should find a way
# to share the code.
# This is an implementation of the Ford-Fulkerson method for finding
# a maximum flow in a flow network applied to a bipartite graph.
# Specifically, it is the Edmonds-Karp algorithm, since it uses a
# BFS to find the shortest augmenting path at each iteration, if one
# exists.
#
# The implementation here is an adapation of an algorithm described in
# "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
dim = len(graph)
flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
residual_graph, residual_function = residual_network(graph, flow_function)
while augmenting_path_for(residual_graph):
path = augmenting_path_for(residual_graph)
# Delta is the largest amount that we can increase flow across
# all of the edges in path. Because of the way that the residual
# function is constructed, f[u][v] for a particular edge (u, v)
# is the amount of unused capacity on that edge. Taking the
# minimum of a list of those values for each edge in the
# augmenting path gives us our delta.
delta = min(map(lambda (u, v), rf=residual_function: rf[u][v],
path))
for (u, v) in path:
flow_function[u][v] += delta
flow_function[v][u] -= delta
residual_graph, residual_function = residual_network(graph,
flow_function)
num_servers = len(servermap)
# The value of a flow is the total flow out of the source vertex
# (vertex 0, in our graph). We could just as well sum across all of
# f[0], but we know that vertex 0 only has edges to the servers in
# our graph, so we can stop after summing flow across those. The
# value of a flow computed in this way is the size of a maximum
# matching on the bipartite graph described above.
return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
def _flow_network_for(servermap):
"""
I take my argument, a dict of peerid -> set(shareid) mappings, and
turn it into a flow network suitable for use with Edmonds-Karp. I
then return the adjacency list representation of that network.
Specifically, I build G = (V, E), where:
V = { peerid in servermap } U { shareid in servermap } U {s, t}
E = {(s, peerid) for each peerid}
U {(peerid, shareid) if peerid is to store shareid }
U {(shareid, t) for each shareid}
s and t will be source and sink nodes when my caller starts treating
the graph I return like a flow network. Without s and t, the
returned graph is bipartite.
"""
# Servers don't have integral identifiers, and we can't make any
# assumptions about the way shares are indexed -- it's possible that
# there are missing shares, for example. So before making a graph,
# we re-index so that all of our vertices have integral indices, and
# that there aren't any holes. We start indexing at 1, so that we
# can add a source node at index 0.
servermap, num_shares = _reindex(servermap, base_index=1)
num_servers = len(servermap)
graph = [] # index -> [index], an adjacency list
# Add an entry at the top (index 0) that has an edge to every server
# in servermap
graph.append(servermap.keys())
# For each server, add an entry that has an edge to every share that it
# contains (or will contain).
for k in servermap:
graph.append(servermap[k])
# For each share, add an entry that has an edge to the sink.
sink_num = num_servers + num_shares + 1
for i in xrange(num_shares):
graph.append([sink_num])
# Add an empty entry for the sink, which has no outbound edges.
graph.append([])
return graph
# XXX warning: this is different from happiness_upload's _reindex!
def _reindex(servermap, base_index):
"""
Given servermap, I map peerids and shareids to integers that don't
conflict with each other, so they're useful as indices in a graph. I
return a servermap that is reindexed appropriately, and also the
number of distinct shares in the resulting servermap as a convenience
for my caller. base_index tells me where to start indexing.
"""
shares = {} # shareid -> vertex index
num = base_index
ret = {} # peerid -> [shareid], a reindexed servermap.
# Number the servers first
for k in servermap:
ret[num] = servermap[k]
num += 1
# Number the shares
for k in ret:
for shnum in ret[k]:
if not shares.has_key(shnum):
shares[shnum] = num
num += 1
ret[k] = map(lambda x: shares[x], ret[k])
return (ret, len(shares))