mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
Make correction to docstring for Tahoe2ServerSelector's _handle_existing_response.
Add comments 10 and 8 from the servers-of-happiness spec. Fix bug in _filter_g3 for servers of happiness. Remove usage of the HappinessUpload class: here we modify the PeerSelector class, and we make sure to correctly calculate the happiness value by ignoring keys whose values are None. Remove HappinessUpload and its tests. Replace the helper servers_of_happiness: its previous implementation is replaced with a new wrapper function that uses share_placement.
This commit is contained in:
parent
adb9a98383
commit
42011e775d
@@ -1,11 +1,82 @@
from Queue import PriorityQueue
from allmydata.util.happinessutil import augmenting_path_for, residual_network


def augmenting_path_for(graph):
    """
    I return an augmenting path, if there is one, from the source node
    to the sink node in the flow network represented by my graph argument.
    If there is no augmenting path, I return False. I assume that the
    source node is at index 0 of graph, and the sink node is at the last
    index. I also assume that graph is a flow network in adjacency list
    form.
    """
    bfs_tree = bfs(graph, 0)
    if bfs_tree[len(graph) - 1]:
        n = len(graph) - 1
        path = [] # [(u, v)], where u and v are vertices in the graph
        while n != 0:
            path.insert(0, (bfs_tree[n], n))
            n = bfs_tree[n]
        return path
    return False

def bfs(graph, s):
    """
    Perform a BFS on graph starting at s, where graph is a graph in
    adjacency list form, and s is a node in graph. I return the
    predecessor table that the BFS generates.
    """
    # This is an adaptation of the BFS described in "Introduction to
    # Algorithms", Cormen et al, 2nd ed., p. 532.
    # WHITE vertices are those that we haven't seen or explored yet.
    WHITE = 0
    # GRAY vertices are those we have seen, but haven't explored yet
    GRAY = 1
    # BLACK vertices are those we have seen and explored
    BLACK = 2
    color = [WHITE for i in xrange(len(graph))]
    predecessor = [None for i in xrange(len(graph))]
    distance = [-1 for i in xrange(len(graph))]
    queue = [s] # vertices that we haven't explored yet.
    color[s] = GRAY
    distance[s] = 0
    while queue:
        n = queue.pop(0)
        for v in graph[n]:
            if color[v] == WHITE:
                color[v] = GRAY
                distance[v] = distance[n] + 1
                predecessor[v] = n
                queue.append(v)
        color[n] = BLACK
    return predecessor
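As a quick sanity check (illustrative only, not part of the commit), here is how bfs and augmenting_path_for behave on a minimal three-node flow network:

    # Illustrative only: a 3-node flow network in adjacency-list form,
    # where index 0 is the source and the last index (2) is the sink.
    graph = [
        [1],  # source -> node 1
        [2],  # node 1 -> sink
        [],   # sink has no outgoing edges
    ]

    print bfs(graph, 0)               # [None, 0, 1] -- predecessor table
    print augmenting_path_for(graph)  # [(0, 1), (1, 2)] -- source-to-sink path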

def residual_network(graph, f):
    """
    I return the residual network and residual capacity function of the
    flow network represented by my graph and f arguments. graph is a
    flow network in adjacency-list form, and f is a flow in graph.
    """
    new_graph = [[] for i in xrange(len(graph))]
    cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
    for i in xrange(len(graph)):
        for v in graph[i]:
            if f[i][v] == 1:
                # We add an edge (v, i) with cf[v,i] = 1. This means
                # that we can remove 1 unit of flow from the edge (i, v)
                new_graph[v].append(i)
                cf[v][i] = 1
                cf[i][v] = -1
            else:
                # We add the edge (i, v), since we're not using it right
                # now.
                new_graph[i].append(v)
                cf[i][v] = 1
                cf[v][i] = -1
    return (new_graph, cf)
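For example (again illustrative, not part of the commit), with an all-zero flow the residual network mirrors the original graph, and every existing edge gets residual capacity 1:

    # Illustrative only: zero flow means the residual network is just
    # the original graph.
    graph = [[1], [2], []]
    flow = [[0 for _ in graph] for _ in graph]

    residual, capacity = residual_network(graph, flow)
    print residual        # [[1], [2], []]
    print capacity[0][1]  # 1  (unused forward capacity)
    print capacity[1][0]  # -1 (matching reverse entry)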

def _query_all_shares(servermap, readonly_peers):
    readonly_shares = set()
    readonly_map = {}
    for peer in servermap:
        print("peer", peer)
        if peer in readonly_peers:
            readonly_map.setdefault(peer, servermap[peer])
            for share in servermap[peer]:
@@ -158,7 +229,6 @@ def _maximum_matching_graph(graph, servermap):
    """
    peers = [x[0] for x in graph]
    shares = [x[1] for x in graph]

    peer_to_index, index_to_peer = _reindex(peers, 1)
    share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
    shareIndices = [share_to_index[s] for s in shares]
@@ -178,9 +248,11 @@ def _filter_g3(g3, m1, m2):
    that we retain servers/shares that were in G1/G2 but *not* in the
    M1/M2 subsets)"
    """
    # m1, m2 are dicts from share -> set(peers)
    # (but I think the set size is always 1 .. so maybe we could fix that everywhere)
    m12_servers = reduce(lambda a, b: a.union(b), m1.values() + m2.values())
    sequence = m1.values() + m2.values()
    sequence = filter(lambda x: x is not None, sequence)
    if len(sequence) == 0:
        return g3
    m12_servers = reduce(lambda a, b: a.union(b), sequence)
    m12_shares = set(m1.keys() + m2.keys())
    new_g3 = set()
    for edge in g3:
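The bug this hunk fixes appears to be in the old reduce call, which ran over the raw m1.values() + m2.values(): it fails when a share mapped to None and when both dicts are empty. A minimal reproduction (illustrative, assuming the Python 2 semantics used throughout this file):

    # Illustrative only: why the unfiltered reduce fails.
    m1 = {'share0': None}   # share0 could not be placed
    m2 = {}

    values = m1.values() + m2.values()
    # reduce(lambda a, b: a.union(b), values)  # AttributeError: None has no .union
    # reduce(lambda a, b: a.union(b), [])      # TypeError: empty sequence

    # The fixed code drops the Nones first and short-circuits on empty input:
    sequence = filter(lambda x: x is not None, values)
    print sequence   # []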
@@ -204,12 +276,19 @@ def _merge_dicts(result, inc):
        elif v is not None:
            result[k] = existing.union(v)

def calculate_happiness(mappings):
    """
    I calculate the happiness of the generated mappings
    """
    happiness = 0
    for share in mappings:
        if mappings[share] is not None:
            happiness += 1
    return happiness
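Illustratively (not part of the commit), a placement map that found homes for two of three shares scores a happiness of 2:

    # Illustrative only: happiness counts the shares that received a placement.
    mappings = {
        'share0': set(['peer0']),
        'share1': None,            # homeless share
        'share2': set(['peer1']),
    }
    print calculate_happiness(mappings)   # 2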

def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
    """
    :param servers: ordered list of servers, "Maybe *2N* of them."

    working from servers-of-happiness.rst, in kind-of pseudo-code
    """
    # "1. Query all servers for existing shares."
    #shares = _query_all_shares(servers, peers)
@@ -231,7 +310,7 @@ def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
    # prefer earlier servers. Call this particular placement M1. The placement
    # maps shares to servers, where each share appears at most once, and each
    # server appears at most once.
    m1 = _maximum_matching_graph(g1, peers_to_shares)#peers, shares)
    m1 = _maximum_matching_graph(g1, peers_to_shares)
    if False:
        print("M1:")
        for k, v in m1.items():
@@ -274,7 +353,6 @@ def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
    g3 = [
        (server, share) for server in readwrite for share in shares
    ]

    g3 = _filter_g3(g3, m1, m2)
    if False:
        print("G3:")
@@ -315,316 +393,3 @@ def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
    # share->set(peer) where the set-size is 1 because sets are a pain
    # to deal with (i.e. no indexing).
    return answer
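A sketch of the new entry point (illustrative; the data echoes the test further down in this diff, and the expected placements are an assumption based on that test):

    # Illustrative only: share_placement returns {share -> set(peer)}.
    shares = {'share0', 'share1', 'share2'}
    peers = {'peer0', 'peer1'}
    readonly_peers = set()
    peers_to_shares = {
        'peer0': {'share0'},
        'peer1': {'share2'},
    }

    places = share_placement(peers, readonly_peers, shares, peers_to_shares)
    # Expect existing placements to be preserved, e.g.
    # 'peer0' in places['share0'] and 'peer1' in places['share2'].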

class HappinessUpload:
    """
    I handle the calculations involved with generating the maximum
    spanning graph for a file when given a set of peers, a set of shares,
    and a servermap of 'peer' -> [shares].

    For more information on the algorithm this class implements, refer to
    docs/specifications/servers-of-happiness.rst
    """

    # HappinessUpload(self.peers, self.full_peers, shares, self.existing_shares)
    def __init__(self, peers, readonly_peers, shares, servermap={}):
        self._happiness = 0
        self.homeless_shares = set()
        self.peers = peers
        self.readonly_peers = readonly_peers
        self.shares = shares
        self.servermap = servermap

    def happiness(self):
        return self._happiness

    def generate_mappings(self):
        """
        Generates the allocations the upload should use, based on the given
        information. We construct a dictionary of 'share_num' -> set(server_ids)
        and return it to the caller. Each share should be placed on each server
        in the corresponding set. Existing allocations appear as placements
        because attempting to place an existing allocation will renew the share.
        """

        # First calculate share placement for the readonly servers.
        readonly_peers = self.readonly_peers
        readonly_shares = set()
        readonly_map = {}
        for peer in self.servermap:
            if peer in self.readonly_peers:
                readonly_map.setdefault(peer, self.servermap[peer])
                for share in self.servermap[peer]:
                    readonly_shares.add(share)

        readonly_mappings = self._calculate_mappings(readonly_peers, readonly_shares, readonly_map)
        used_peers, used_shares = self._extract_ids(readonly_mappings)

        # Calculate share placement for the remaining existing allocations
        peers = set(self.servermap.keys()) - used_peers
        # Squash a list of sets into one set
        shares = set(item for subset in self.servermap.values() for item in subset)
        shares -= used_shares
        servermap = self.servermap.copy()
        for peer in self.servermap:
            if peer in used_peers:
                servermap.pop(peer, None)
            else:
                servermap[peer] = servermap[peer] - used_shares
                if servermap[peer] == set():
                    servermap.pop(peer, None)
                    peers.remove(peer)

        existing_mappings = self._calculate_mappings(peers, shares, servermap)
        existing_peers, existing_shares = self._extract_ids(existing_mappings)

        # Calculate share placement for the remaining peers and shares which
        # won't be preserved by existing allocations.
        peers = self.peers - existing_peers - used_peers
        shares = self.shares - existing_shares - used_shares
        new_mappings = self._calculate_mappings(peers, shares)

        mappings = dict(readonly_mappings.items() + existing_mappings.items() + new_mappings.items())
        self._calculate_happiness(mappings)
        if len(self.homeless_shares) != 0:
            all_shares = set(item for subset in self.servermap.values() for item in subset)
            self._distribute_homeless_shares(mappings, all_shares)

        return mappings

    def _calculate_mappings(self, peers, shares, servermap=None):
        """
        Given a set of peers, a set of shares, and a dictionary of server ->
        set(shares), determine how the uploader should allocate shares. If a
        servermap is supplied, determine which existing allocations should be
        preserved. If servermap is None, calculate the maximum matching of the
        bipartite graph (U, V, E) such that:

            U = peers
            V = shares
            E = peers x shares

        Returns a dictionary {share -> set(peer)}, indicating that the share
        should be placed on each peer in the set. If a share's corresponding
        value is None, the share can be placed on any server. Note that the set
        of peers should only be one peer when returned, but it is possible to
        duplicate shares by adding additional servers to the set.
        """
        peer_to_index, index_to_peer = self._reindex(peers, 1)
        share_to_index, index_to_share = self._reindex(shares, len(peers) + 1)
        shareIndices = [share_to_index[s] for s in shares]
        if servermap:
            graph = self._servermap_flow_graph(peers, shares, servermap)
        else:
            peerIndices = [peer_to_index[peer] for peer in peers]
            graph = self._flow_network(peerIndices, shareIndices)
        max_graph = self._compute_maximum_graph(graph, shareIndices)
        return self._convert_mappings(index_to_peer, index_to_share, max_graph)

    def _compute_maximum_graph(self, graph, shareIndices):
        """
        This is an implementation of the Ford-Fulkerson method for finding
        a maximum flow in a flow network applied to a bipartite graph.
        Specifically, it is the Edmonds-Karp algorithm, since it uses a
        BFS to find the shortest augmenting path at each iteration, if one
        exists.

        The implementation here is an adaptation of an algorithm described in
        "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
        """

        if graph == []:
            return {}

        dim = len(graph)
        flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
        residual_graph, residual_function = residual_network(graph, flow_function)

        while augmenting_path_for(residual_graph):
            path = augmenting_path_for(residual_graph)
            # Delta is the largest amount that we can increase flow across
            # all of the edges in path. Because of the way that the residual
            # function is constructed, f[u][v] for a particular edge (u, v)
            # is the amount of unused capacity on that edge. Taking the
            # minimum of a list of those values for each edge in the
            # augmenting path gives us our delta.
            delta = min(map(lambda (u, v), rf=residual_function: rf[u][v],
                            path))
            for (u, v) in path:
                flow_function[u][v] += delta
                flow_function[v][u] -= delta
            residual_graph, residual_function = residual_network(graph, flow_function)

        new_mappings = {}
        for shareIndex in shareIndices:
            peer = residual_graph[shareIndex]
            if peer == [dim - 1]:
                new_mappings.setdefault(shareIndex, None)
            else:
                new_mappings.setdefault(shareIndex, peer[0])

        return new_mappings

    def _extract_ids(self, mappings):
        shares = set()
        peers = set()
        for share in mappings:
            if mappings[share] == None:
                pass
            else:
                shares.add(share)
                for item in mappings[share]:
                    peers.add(item)
        return (peers, shares)

    def _calculate_happiness(self, mappings):
        """
        I calculate the happiness of the generated mappings and
        create the set self.homeless_shares.
        """
        self._happiness = 0
        self.homeless_shares = set()
        for share in mappings:
            if mappings[share] is not None:
                self._happiness += 1
            else:
                self.homeless_shares.add(share)

    def _distribute_homeless_shares(self, mappings, shares):
        """
        Shares which are not mapped to a peer in the maximum spanning graph
        still need to be placed on a server. This function attempts to
        distribute those homeless shares as evenly as possible over the
        available peers. If possible a share will be placed on the server it was
        originally on, signifying the lease should be renewed instead.
        """

        # First check to see if the leases can be renewed.
        to_distribute = set()

        for share in self.homeless_shares:
            if share in shares:
                for peer in self.servermap:
                    if share in self.servermap[peer]:
                        mappings[share] = set([peer])
                        break
            else:
                to_distribute.add(share)

        # This builds a priority queue of peers with the number of shares
        # each peer holds as the priority.

        priority = {}
        pQueue = PriorityQueue()
        for peer in self.peers:
            priority.setdefault(peer, 0)
        for share in mappings:
            if mappings[share] is not None:
                for peer in mappings[share]:
                    if peer in self.peers:
                        priority[peer] += 1

        if priority == {}:
            return

        for peer in priority:
            pQueue.put((priority[peer], peer))

        # Distribute the shares to peers with the lowest priority.
        for share in to_distribute:
            peer = pQueue.get()
            mappings[share] = set([peer[1]])
            pQueue.put((peer[0]+1, peer[1]))

    def _convert_mappings(self, index_to_peer, index_to_share, maximum_graph):
        """
        Now that a maximum spanning graph has been found, convert the indexes
        back to their original ids so that the client can pass them to the
        uploader.
        """

        converted_mappings = {}
        for share in maximum_graph:
            peer = maximum_graph[share]
            if peer == None:
                converted_mappings.setdefault(index_to_share[share], None)
            else:
                converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]]))
        return converted_mappings

    def _servermap_flow_graph(self, peers, shares, servermap):
        """
        Generates a flow network of peerIndices to shareIndices from a server map
        of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a
        directed graph where each edge has a capacity and each edge receives a flow.
        The amount of flow on an edge cannot exceed the capacity of the edge." This
        is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm
        converts the problem into a maximum flow problem.
        """
        if servermap == {}:
            return []

        peer_to_index, index_to_peer = self._reindex(peers, 1)
        share_to_index, index_to_share = self._reindex(shares, len(peers) + 1)
        graph = []
        sink_num = len(peers) + len(shares) + 1
        graph.append([peer_to_index[peer] for peer in peers])
        for peer in peers:
            indexedShares = [share_to_index[s] for s in servermap[peer]]
            graph.insert(peer_to_index[peer], indexedShares)
        for share in shares:
            graph.insert(share_to_index[share], [sink_num])
        graph.append([])
        return graph

    def _reindex(self, items, base):
        """
        I take an iterable of items and give each item an index to be used in
        the construction of a flow network. Indices for these items start at base
        and continue to base + len(items) - 1.

        I return two dictionaries: ({item: index}, {index: item})
        """
        item_to_index = {}
        index_to_item = {}
        for item in items:
            item_to_index.setdefault(item, base)
            index_to_item.setdefault(base, item)
            base += 1
        return (item_to_index, index_to_item)

    def _flow_network(self, peerIndices, shareIndices):
        """
        Given set of peerIndices and a set of shareIndices, I create a flow network
        to be used by _compute_maximum_graph. The return value is a two
        dimensional list in the form of a flow network, where each index represents
        a node, and the corresponding list represents all of the nodes it is connected
        to.

        This function is similar to allmydata.util.happinessutil.flow_network_for, but
        we connect every peer with all shares instead of reflecting a supplied servermap.
        """
        graph = []
        # The first entry in our flow network is the source.
        # Connect the source to every server.
        graph.append(peerIndices)
        sink_num = len(peerIndices + shareIndices) + 1
        # Connect every server with every share it can possibly store.
        for peerIndex in peerIndices:
            graph.insert(peerIndex, shareIndices)
        # Connect every share with the sink.
        for shareIndex in shareIndices:
            graph.insert(shareIndex, [sink_num])
        # Add an empty entry for the sink.
        graph.append([])
        return graph
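To make the adjacency-list layout concrete, here is the network _flow_network builds for two peers (indices 1-2) and two shares (indices 3-4); this sketch is illustrative, not part of the commit:

    # Illustrative only: the adjacency-list shape these builders produce.
    # Index 0 is the source, 1..2 are peers, 3..4 are shares, 5 is the sink.
    graph = [
        [1, 2],    # source -> every peer
        [3, 4],    # peer 1 -> every share it may store
        [3, 4],    # peer 2 -> every share it may store
        [5],       # share 3 -> sink
        [5],       # share 4 -> sink
        [],        # sink has no outgoing edges
    ]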
@@ -25,7 +25,7 @@ from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO
from happiness_upload import HappinessUpload
from happiness_upload import share_placement, calculate_happiness


# this wants to live in storage, not here
@@ -252,12 +252,12 @@ class PeerSelector():

    def get_tasks(self):
        shares = set(range(self.total_shares))
        self.h = HappinessUpload(self.peers, self.full_peers, shares, self.existing_shares)
        return self.h.generate_mappings()
        self.happiness_mappings = share_placement(self.peers, self.full_peers, shares, self.existing_shares)
        self.happiness = calculate_happiness(self.happiness_mappings)
        return self.happiness_mappings

    def is_healthy(self):
        return self.min_happiness <= self.h.happiness()
        return self.min_happiness <= self.happiness
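The net effect of this hunk: placement moves from a stateful helper object to two pure functions, with happiness kept on the selector itself. A before/after sketch (illustrative, in comments):

    # Illustrative only: the shape of the API change in get_tasks().
    # Before: a stateful helper computed placements and held happiness.
    #   h = HappinessUpload(peers, full_peers, shares, existing_shares)
    #   mappings = h.generate_mappings()
    #   healthy = min_happiness <= h.happiness()
    # After: two pure functions.
    #   mappings = share_placement(peers, full_peers, shares, existing_shares)
    #   happiness = calculate_happiness(mappings)
    #   healthy = min_happiness <= happiness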
class Tahoe2ServerSelector(log.PrefixingLogMixin):

@@ -438,7 +438,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector._existing_shares.
        Tahoe2ServerSelector.get_shareholders.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
@@ -533,10 +533,20 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):


    def _request_another_allocation(self):
        """
        see docs/specifications/servers-of-happiness.rst
        10. If any placements from step 9 fail, mark the server as read-only. Go back
        to step 2 (since we may discover a server is/has-become read-only, or has
        failed, during step 9).
        """
        allocation = self._get_next_allocation()
        if allocation is not None:
            tracker, shares_to_ask = allocation

            # see docs/specifications/servers-of-happiness.rst
            # 8. Renew the shares on their respective servers from M1 and M2.
            d = tracker.query(shares_to_ask)

            d.addBoth(self._got_response, tracker, shares_to_ask)
            return d

@@ -544,6 +554,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
        # no more servers. If we haven't placed enough shares, we fail.
        merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
        effective_happiness = servers_of_happiness(self.peer_selector.get_allocations())
        #effective_happiness = self.peer_selector.happiness
        print "effective_happiness %s" % effective_happiness
        if effective_happiness < self.servers_of_happiness:
            msg = failure_message(len(self.serverids_with_shares),
                                  self.needed_shares,
@@ -2,7 +2,6 @@

from twisted.trial import unittest
from allmydata.immutable import happiness_upload
from allmydata.util.happinessutil import augmenting_path_for, residual_network


class HappinessUtils(unittest.TestCase):
@@ -20,7 +19,7 @@ class HappinessUtils(unittest.TestCase):
        )
        flow = [[0 for _ in graph] for _ in graph]

        residual, capacity = residual_network(graph, flow)
        residual, capacity = happiness_upload.residual_network(graph, flow)

        # XXX no idea if these are right; hand-verify
        self.assertEqual(residual, [[1], [2], [3], []])
@@ -29,19 +28,6 @@ class HappinessUtils(unittest.TestCase):

class Happiness(unittest.TestCase):

    def test_original_easy(self):
        shares = {'share0', 'share1', 'share2'}
        peers = {'peer0', 'peer1'}
        readonly_peers = set()
        servermap = {
            'peer0': {'share0'},
            'peer1': {'share2'},
        }
        places0 = happiness_upload.HappinessUpload(peers, readonly_peers, shares, servermap).generate_mappings()

        self.assertTrue('peer0' in places0['share0'])
        self.assertTrue('peer1' in places0['share2'])

    def test_placement_simple(self):

        shares = {'share0', 'share1', 'share2'}
@@ -56,15 +42,11 @@ class Happiness(unittest.TestCase):
        }

        places0 = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
        places1 = happiness_upload.HappinessUpload(peers, readonly_peers, shares).generate_mappings()

        if False:
            print("places0")
            for k, v in places0.items():
                print(" {} -> {}".format(k, v))
            print("places1")
            for k, v in places1.items():
                print(" {} -> {}".format(k, v))

        self.assertEqual(
            places0,
@@ -105,7 +87,6 @@ class Happiness(unittest.TestCase):
        }

        places0 = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
        places1 = happiness_upload.HappinessUpload(peers, readonly_peers, shares).generate_mappings()

        # share N maps to peer N
        # i.e. this says that share0 should be on peer0, share1 should
@@ -4,6 +4,7 @@ reporting it in messages
"""

from copy import deepcopy
from allmydata.immutable.happiness_upload import share_placement, calculate_happiness

def failure_message(peer_count, k, happy, effective_happy):
    # If peer_count < needed_shares, this error message makes more
@@ -78,225 +79,13 @@ def merge_servers(servermap, upload_trackers=None):
    return servermap

def servers_of_happiness(sharemap):
    """
    I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
    return the 'servers_of_happiness' number that sharemap results in.

    To calculate the 'servers_of_happiness' number for the sharemap, I
    construct a bipartite graph with servers in one partition of vertices
    and shares in the other, and with an edge between a server s and a share t
    if s is to store t. I then compute the size of a maximum matching in
    the resulting graph; this is then returned as the 'servers_of_happiness'
    for my arguments.

    For example, consider the following layout:

      server 1: shares 1, 2, 3, 4
      server 2: share 6
      server 3: share 3
      server 4: share 4
      server 5: share 2

    From this, we can construct the following graph:

      L = {server 1, server 2, server 3, server 4, server 5}
      R = {share 1, share 2, share 3, share 4, share 6}
      V = L U R
      E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
           (server 1, share 4), (server 2, share 6), (server 3, share 3),
           (server 4, share 4), (server 5, share 2)}
      G = (V, E)

    Note that G is bipartite since every edge in e has one endpoint in L
    and one endpoint in R.

    A matching in a graph G is a subset M of E such that, for any vertex
    v in V, v is incident to at most one edge of M. A maximum matching
    in G is a matching that is no smaller than any other matching. For
    this graph, a matching of cardinality 5 is:

      M = {(server 1, share 1), (server 2, share 6),
           (server 3, share 3), (server 4, share 4),
           (server 5, share 2)}

    Since G is bipartite, and since |L| = 5, we cannot have an M' such
    that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
    as long as k <= 5, we can see that the layout above has
    servers_of_happiness = 5, which matches the results here.
    """
    if sharemap == {}:
        return 0
    servermap = shares_by_server(sharemap)
    graph = flow_network_for(servermap)
    # This is an implementation of the Ford-Fulkerson method for finding
    # a maximum flow in a flow network applied to a bipartite graph.
    # Specifically, it is the Edmonds-Karp algorithm, since it uses a
    # BFS to find the shortest augmenting path at each iteration, if one
    # exists.
    #
    # The implementation here is an adaptation of an algorithm described in
    # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
    dim = len(graph)
    flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
    residual_graph, residual_function = residual_network(graph, flow_function)
    while augmenting_path_for(residual_graph):
        path = augmenting_path_for(residual_graph)
        # Delta is the largest amount that we can increase flow across
        # all of the edges in path. Because of the way that the residual
        # function is constructed, f[u][v] for a particular edge (u, v)
        # is the amount of unused capacity on that edge. Taking the
        # minimum of a list of those values for each edge in the
        # augmenting path gives us our delta.
        delta = min(map(lambda (u, v), rf=residual_function: rf[u][v],
                        path))
        for (u, v) in path:
            flow_function[u][v] += delta
            flow_function[v][u] -= delta
        residual_graph, residual_function = residual_network(graph,
                                                             flow_function)
    num_servers = len(servermap)
    # The value of a flow is the total flow out of the source vertex
    # (vertex 0, in our graph). We could just as well sum across all of
    # f[0], but we know that vertex 0 only has edges to the servers in
    # our graph, so we can stop after summing flow across those. The
    # value of a flow computed in this way is the size of a maximum
    # matching on the bipartite graph described above.
    return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
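The worked example in the docstring above can be re-checked against the replacement helpers; this sketch is illustrative, and it assumes share_placement preserves the existing placements as the spec intends:

    # Illustrative only: the docstring layout, fed through the new helpers.
    peers = {'server1', 'server2', 'server3', 'server4', 'server5'}
    shares = {'share1', 'share2', 'share3', 'share4', 'share6'}
    peers_to_shares = {
        'server1': {'share1', 'share2', 'share3', 'share4'},
        'server2': {'share6'},
        'server3': {'share3'},
        'server4': {'share4'},
        'server5': {'share2'},
    }

    places = share_placement(peers, set(), shares, peers_to_shares)
    print calculate_happiness(places)   # expected: 5, matching the docstring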
def flow_network_for(servermap):
    """
    I take my argument, a dict of peerid -> set(shareid) mappings, and
    turn it into a flow network suitable for use with Edmonds-Karp. I
    then return the adjacency list representation of that network.

    Specifically, I build G = (V, E), where:
      V = { peerid in servermap } U { shareid in servermap } U {s, t}
      E = {(s, peerid) for each peerid}
          U {(peerid, shareid) if peerid is to store shareid }
          U {(shareid, t) for each shareid}

    s and t will be source and sink nodes when my caller starts treating
    the graph I return like a flow network. Without s and t, the
    returned graph is bipartite.
    """
    # Servers don't have integral identifiers, and we can't make any
    # assumptions about the way shares are indexed -- it's possible that
    # there are missing shares, for example. So before making a graph,
    # we re-index so that all of our vertices have integral indices, and
    # that there aren't any holes. We start indexing at 1, so that we
    # can add a source node at index 0.
    servermap, num_shares = reindex(servermap, base_index=1)
    num_servers = len(servermap)
    graph = [] # index -> [index], an adjacency list
    # Add an entry at the top (index 0) that has an edge to every server
    # in servermap
    graph.append(servermap.keys())
    # For each server, add an entry that has an edge to every share that it
    # contains (or will contain).
    for k in servermap:
        graph.append(servermap[k])
    # For each share, add an entry that has an edge to the sink.
    sink_num = num_servers + num_shares + 1
    for i in xrange(num_shares):
        graph.append([sink_num])
    # Add an empty entry for the sink, which has no outbound edges.
    graph.append([])
    return graph
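Concretely (illustrative, and assuming the usual small-int dict iteration order of Python 2 for this tiny map), a two-server, two-share servermap produces:

    # Illustrative only: expected output shape for a 2-server, 2-share map.
    servermap = {'peerA': set([1]), 'peerB': set([2])}
    graph = flow_network_for(servermap)
    # After reindexing, servers occupy indices 1-2 and shares indices 3-4:
    # graph == [[1, 2], [3], [4], [5], [5], []]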
def reindex(servermap, base_index):
    """
    Given servermap, I map peerids and shareids to integers that don't
    conflict with each other, so they're useful as indices in a graph. I
    return a servermap that is reindexed appropriately, and also the
    number of distinct shares in the resulting servermap as a convenience
    for my caller. base_index tells me where to start indexing.
    """
    shares = {} # shareid -> vertex index
    num = base_index
    ret = {} # peerid -> [shareid], a reindexed servermap.
    # Number the servers first
    for k in servermap:
        ret[num] = servermap[k]
        num += 1
    # Number the shares
    for k in ret:
        for shnum in ret[k]:
            if not shares.has_key(shnum):
                shares[shnum] = num
                num += 1
        ret[k] = map(lambda x: shares[x], ret[k])
    return (ret, len(shares))
def residual_network(graph, f):
    """
    I return the residual network and residual capacity function of the
    flow network represented by my graph and f arguments. graph is a
    flow network in adjacency-list form, and f is a flow in graph.
    """
    new_graph = [[] for i in xrange(len(graph))]
    cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
    for i in xrange(len(graph)):
        for v in graph[i]:
            if f[i][v] == 1:
                # We add an edge (v, i) with cf[v,i] = 1. This means
                # that we can remove 1 unit of flow from the edge (i, v)
                new_graph[v].append(i)
                cf[v][i] = 1
                cf[i][v] = -1
            else:
                # We add the edge (i, v), since we're not using it right
                # now.
                new_graph[i].append(v)
                cf[i][v] = 1
                cf[v][i] = -1
    return (new_graph, cf)

def augmenting_path_for(graph):
    """
    I return an augmenting path, if there is one, from the source node
    to the sink node in the flow network represented by my graph argument.
    If there is no augmenting path, I return False. I assume that the
    source node is at index 0 of graph, and the sink node is at the last
    index. I also assume that graph is a flow network in adjacency list
    form.
    """
    bfs_tree = bfs(graph, 0)
    if bfs_tree[len(graph) - 1]:
        n = len(graph) - 1
        path = [] # [(u, v)], where u and v are vertices in the graph
        while n != 0:
            path.insert(0, (bfs_tree[n], n))
            n = bfs_tree[n]
        return path
    return False

def bfs(graph, s):
    """
    Perform a BFS on graph starting at s, where graph is a graph in
    adjacency list form, and s is a node in graph. I return the
    predecessor table that the BFS generates.
    """
    # This is an adaptation of the BFS described in "Introduction to
    # Algorithms", Cormen et al, 2nd ed., p. 532.
    # WHITE vertices are those that we haven't seen or explored yet.
    WHITE = 0
    # GRAY vertices are those we have seen, but haven't explored yet
    GRAY = 1
    # BLACK vertices are those we have seen and explored
    BLACK = 2
    color = [WHITE for i in xrange(len(graph))]
    predecessor = [None for i in xrange(len(graph))]
    distance = [-1 for i in xrange(len(graph))]
    queue = [s] # vertices that we haven't explored yet.
    color[s] = GRAY
    distance[s] = 0
    while queue:
        n = queue.pop(0)
        for v in graph[n]:
            if color[v] == WHITE:
                color[v] = GRAY
                distance[v] = distance[n] + 1
                predecessor[v] = n
                queue.append(v)
        color[n] = BLACK
    return predecessor
    peers = sharemap.values()
    if len(peers) == 1:
        peers = peers[0]
    else:
        peers = [list(x)[0] for x in peers] # XXX
    shares = sharemap.keys()
    readonly_peers = set() # XXX
    peers_to_shares = shares_by_server(sharemap)
    places0 = share_placement(peers, readonly_peers, shares, peers_to_shares)
    return calculate_happiness(places0)
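End to end, the rewritten wrapper reduces a sharemap to a happiness count; an illustrative call (expected value based on the matching logic above, not a verified output):

    # Illustrative only: the new servers_of_happiness wrapper in use.
    sharemap = {
        0: set(['peerA']),   # share 0 already lives on peerA
        1: set(['peerB']),   # share 1 already lives on peerB
    }
    print servers_of_happiness(sharemap)   # expected: 2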