WIP: refactoring (squahshed all meejah's commits)

This commit is contained in:
meejah 2016-10-11 16:47:53 -06:00
parent 211dc602fa
commit adb9a98383
6 changed files with 477 additions and 27 deletions

View File

@ -112,6 +112,10 @@ We calculate share placement like so:
1. Query all servers for existing shares.
1a. Query remaining space from all servers. Every server that has
enough free space is considered "readwrite" and every server with too
little space is "readonly".
2. Construct a bipartite graph G1 of *readonly* servers to pre-existing
shares, where an edge exists between an arbitrary readonly server S and an
arbitrary share T if and only if S currently holds T.
@ -132,12 +136,11 @@ We calculate share placement like so:
5. Calculate a maximum matching graph of G2, call this M2, again preferring
earlier servers.
6. Construct a bipartite graph G3 of (only readwrite) servers to shares. Let
an edge exist between server S and share T if and only if S already has T,
or *could* hold T (i.e. S has enough available space to hold a share of at
least T's size). Then remove (from G3) any servers and shares used in M1
or M2 (note that we retain servers/shares that were in G1/G2 but *not* in
the M1/M2 subsets)
6. Construct a bipartite graph G3 of (only readwrite) servers to
shares (some shares may already exist on a server). Then remove
(from G3) any servers and shares used in M1 or M2 (note that we
retain servers/shares that were in G1/G2 but *not* in the M1/M2
subsets)
7. Calculate a maximum matching graph of G3, call this M3, preferring earlier
servers. The final placement table is the union of M1+M2+M3.

View File

@ -1,7 +1,322 @@
from Queue import PriorityQueue
from allmydata.util.happinessutil import augmenting_path_for, residual_network
class Happiness_Upload:
def _query_all_shares(servermap, readonly_peers):
readonly_shares = set()
readonly_map = {}
for peer in servermap:
print("peer", peer)
if peer in readonly_peers:
readonly_map.setdefault(peer, servermap[peer])
for share in servermap[peer]:
readonly_shares.add(share)
return readonly_shares
def _convert_mappings(index_to_peer, index_to_share, maximum_graph):
"""
Now that a maximum spanning graph has been found, convert the indexes
back to their original ids so that the client can pass them to the
uploader.
"""
converted_mappings = {}
for share in maximum_graph:
peer = maximum_graph[share]
if peer == None:
converted_mappings.setdefault(index_to_share[share], None)
else:
converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]]))
return converted_mappings
def _compute_maximum_graph(graph, shareIndices):
"""
This is an implementation of the Ford-Fulkerson method for finding
a maximum flow in a flow network applied to a bipartite graph.
Specifically, it is the Edmonds-Karp algorithm, since it uses a
breadth-first search to find the shortest augmenting path at each
iteration, if one exists.
The implementation here is an adapation of an algorithm described in
"Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
"""
if graph == []:
return {}
dim = len(graph)
flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
residual_graph, residual_function = residual_network(graph, flow_function)
while augmenting_path_for(residual_graph):
path = augmenting_path_for(residual_graph)
# Delta is the largest amount that we can increase flow across
# all of the edges in path. Because of the way that the residual
# function is constructed, f[u][v] for a particular edge (u, v)
# is the amount of unused capacity on that edge. Taking the
# minimum of a list of those values for each edge in the
# augmenting path gives us our delta.
delta = min(map(lambda (u, v), rf=residual_function: rf[u][v],
path))
for (u, v) in path:
flow_function[u][v] += delta
flow_function[v][u] -= delta
residual_graph, residual_function = residual_network(graph,flow_function)
new_mappings = {}
for shareIndex in shareIndices:
peer = residual_graph[shareIndex]
if peer == [dim - 1]:
new_mappings.setdefault(shareIndex, None)
else:
new_mappings.setdefault(shareIndex, peer[0])
return new_mappings
def _flow_network(peerIndices, shareIndices):
"""
Given set of peerIndices and a set of shareIndices, I create a flow network
to be used by _compute_maximum_graph. The return value is a two
dimensional list in the form of a flow network, where each index represents
a node, and the corresponding list represents all of the nodes it is connected
to.
This function is similar to allmydata.util.happinessutil.flow_network_for, but
we connect every peer with all shares instead of reflecting a supplied servermap.
"""
graph = []
# The first entry in our flow network is the source.
# Connect the source to every server.
graph.append(peerIndices)
sink_num = len(peerIndices + shareIndices) + 1
# Connect every server with every share it can possibly store.
for peerIndex in peerIndices:
graph.insert(peerIndex, shareIndices)
# Connect every share with the sink.
for shareIndex in shareIndices:
graph.insert(shareIndex, [sink_num])
# Add an empty entry for the sink.
graph.append([])
return graph
def _servermap_flow_graph(peers, shares, servermap):
"""
Generates a flow network of peerIndices to shareIndices from a server map
of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a
directed graph where each edge has a capacity and each edge receives a flow.
The amount of flow on an edge cannot exceed the capacity of the edge." This
is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm
converts the problem into a maximum flow problem.
"""
if servermap == {}:
return []
peer_to_index, index_to_peer = _reindex(peers, 1)
share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
graph = []
sink_num = len(peers) + len(shares) + 1
graph.append([peer_to_index[peer] for peer in peers])
for peer in peers:
indexedShares = [share_to_index[s] for s in servermap[peer]]
graph.insert(peer_to_index[peer], indexedShares)
for share in shares:
graph.insert(share_to_index[share], [sink_num])
graph.append([])
return graph
def _reindex(items, base):
"""
I take an iteratble of items and give each item an index to be used in
the construction of a flow network. Indices for these items start at base
and continue to base + len(items) - 1.
I return two dictionaries: ({item: index}, {index: item})
"""
item_to_index = {}
index_to_item = {}
for item in items:
item_to_index.setdefault(item, base)
index_to_item.setdefault(base, item)
base += 1
return (item_to_index, index_to_item)
def _maximum_matching_graph(graph, servermap):
"""
:param graph: an iterable of (server, share) 2-tuples
Calculate the maximum matching of the bipartite graph (U, V, E)
such that:
U = peers
V = shares
E = peers x shares
Returns a dictionary {share -> set(peer)}, indicating that the share
should be placed on each peer in the set. If a share's corresponding
value is None, the share can be placed on any server. Note that the set
of peers should only be one peer when returned.
"""
peers = [x[0] for x in graph]
shares = [x[1] for x in graph]
peer_to_index, index_to_peer = _reindex(peers, 1)
share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
shareIndices = [share_to_index[s] for s in shares]
if servermap:
graph = _servermap_flow_graph(peers, shares, servermap)
else:
peerIndices = [peer_to_index[peer] for peer in peers]
graph = _flow_network(peerIndices, shareIndices)
max_graph = _compute_maximum_graph(graph, shareIndices)
return _convert_mappings(index_to_peer, index_to_share, max_graph)
def _filter_g3(g3, m1, m2):
"""
This implements the last part of 'step 6' in the spec, "Then
remove (from G3) any servers and shares used in M1 or M2 (note
that we retain servers/shares that were in G1/G2 but *not* in the
M1/M2 subsets)"
"""
# m1, m2 are dicts from share -> set(peers)
# (but I think the set size is always 1 .. so maybe we could fix that everywhere)
m12_servers = reduce(lambda a, b: a.union(b), m1.values() + m2.values())
m12_shares = set(m1.keys() + m2.keys())
new_g3 = set()
for edge in g3:
if edge[0] not in m12_servers and edge[1] not in m12_shares:
new_g3.add(edge)
return new_g3
def _merge_dicts(result, inc):
"""
given two dicts mapping key -> set(), merge the *values* of the
'inc' dict into the value of the 'result' dict if the value is not
None.
Note that this *mutates* 'result'
"""
for k, v in inc.items():
existing = result.get(k, None)
if existing is None:
result[k] = v
elif v is not None:
result[k] = existing.union(v)
def share_placement(peers, readonly_peers, shares, peers_to_shares={}):
"""
:param servers: ordered list of servers, "Maybe *2N* of them."
working from servers-of-happiness.rst, in kind-of pseudo-code
"""
# "1. Query all servers for existing shares."
#shares = _query_all_shares(servers, peers)
#print("shares", shares)
# "2. Construct a bipartite graph G1 of *readonly* servers to pre-existing
# shares, where an edge exists between an arbitrary readonly server S and an
# arbitrary share T if and only if S currently holds T."
g1 = set()
for share in shares:
for server in peers:
if server in readonly_peers and share in peers_to_shares.get(server, set()):
g1.add((server, share))
# 3. Calculate a maximum matching graph of G1 (a set of S->T edges that has or
# is-tied-for the highest "happiness score"). There is a clever efficient
# algorithm for this, named "Ford-Fulkerson". There may be more than one
# maximum matching for this graph; we choose one of them arbitrarily, but
# prefer earlier servers. Call this particular placement M1. The placement
# maps shares to servers, where each share appears at most once, and each
# server appears at most once.
m1 = _maximum_matching_graph(g1, peers_to_shares)#peers, shares)
if False:
print("M1:")
for k, v in m1.items():
print(" {}: {}".format(k, v))
# 4. Construct a bipartite graph G2 of readwrite servers to pre-existing
# shares. Then remove any edge (from G2) that uses a server or a share found
# in M1. Let an edge exist between server S and share T if and only if S
# already holds T.
g2 = set()
for g2_server, g2_shares in peers_to_shares.items():
for share in g2_shares:
g2.add((g2_server, share))
for server, share in m1.items():
for g2server, g2share in g2:
if g2server == server or g2share == share:
g2.remove((g2server, g2share))
# 5. Calculate a maximum matching graph of G2, call this M2, again preferring
# earlier servers.
m2 = _maximum_matching_graph(g2, peers_to_shares)
if False:
print("M2:")
for k, v in m2.items():
print(" {}: {}".format(k, v))
# 6. Construct a bipartite graph G3 of (only readwrite) servers to
# shares (some shares may already exist on a server). Then remove
# (from G3) any servers and shares used in M1 or M2 (note that we
# retain servers/shares that were in G1/G2 but *not* in the M1/M2
# subsets)
# meejah: does that last sentence mean remove *any* edge with any
# server in M1?? or just "remove any edge found in M1/M2"? (Wait,
# is that last sentence backwards? G1 a subset of M1?)
readwrite = set(peers).difference(set(readonly_peers))
g3 = [
(server, share) for server in readwrite for share in shares
]
g3 = _filter_g3(g3, m1, m2)
if False:
print("G3:")
for srv, shr in g3:
print(" {}->{}".format(srv, shr))
# 7. Calculate a maximum matching graph of G3, call this M3, preferring earlier
# servers. The final placement table is the union of M1+M2+M3.
m3 = _maximum_matching_graph(g3, {})#, peers_to_shares)
answer = dict()
_merge_dicts(answer, m1)
_merge_dicts(answer, m2)
_merge_dicts(answer, m3)
# anything left over that has "None" instead of a 1-set of peers
# should be part of the "evenly distribute amongst readwrite
# servers" thing.
# See "Properties of Upload Strategy of Happiness" in the spec:
# "The size of the maximum bipartite matching is bounded by the size of the smaller
# set of vertices. Therefore in a situation where the set of servers is smaller
# than the set of shares, placement is not generated for a subset of shares. In
# this case the remaining shares are distributed as evenly as possible across the
# set of writable servers."
def peer_generator():
while True:
for peer in readwrite:
yield peer
round_robin_peers = peer_generator()
for k, v in answer.items():
if v is None:
answer[k] = {next(round_robin_peers)}
# XXX we should probably actually return share->peer instead of
# share->set(peer) where the set-size is 1 because sets are a pain
# to deal with (i.e. no indexing).
return answer
class HappinessUpload:
"""
I handle the calculations involved with generating the maximum
spanning graph for a file when given a set of peers, a set of shares,
@ -11,6 +326,7 @@ class Happiness_Upload:
docs/specifications/servers-of-happiness.rst
"""
# HappinessUpload(self.peers, self.full_peers, shares, self.existing_shares)
def __init__(self, peers, readonly_peers, shares, servermap={}):
self._happiness = 0
self.homeless_shares = set()

View File

@ -14,8 +14,7 @@ from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
shares_by_server, merge_servers, \
failure_message
merge_servers, failure_message
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@ -26,7 +25,7 @@ from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
from cStringIO import StringIO
from happiness_upload import Happiness_Upload
from happiness_upload import HappinessUpload
# this wants to live in storage, not here
@ -161,14 +160,14 @@ class ServerTracker:
sharenums,
self.allocated_size,
canary=Referenceable())
d.addCallback(self._got_reply)
d.addCallback(self._buckets_allocated)
return d
def ask_about_existing_shares(self):
rref = self._server.get_rref()
return rref.callRemote("get_buckets", self.storage_index)
def _got_reply(self, (alreadygot, buckets)):
def _buckets_allocated(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
b = {}
for sharenum, rref in buckets.iteritems():
@ -253,7 +252,7 @@ class PeerSelector():
def get_tasks(self):
shares = set(range(self.total_shares))
self.h = Happiness_Upload(self.peers, self.full_peers, shares, self.existing_shares)
self.h = HappinessUpload(self.peers, self.full_peers, shares, self.existing_shares)
return self.h.generate_mappings()
def is_healthy(self):
@ -324,6 +323,11 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
share_size, 0, num_segments,
num_share_hashes, EXTENSION_SIZE)
allocated_size = wbp.get_allocated_size()
# see docs/specifications/servers-of-happiness.rst
# 0. Start with an ordered list of servers. Maybe *2N* of them.
#
all_servers = storage_broker.get_servers_for_psi(storage_index)
if not all_servers:
raise NoServersError("client gave us zero servers")
@ -388,6 +392,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# servers_of_happiness accounting, then we forget about them.
readonly_trackers = _make_trackers(readonly_servers)
# see docs/specifications/servers-of-happiness.rst
# 1. Query all servers for existing shares.
#
# We now ask servers that can't hold any new shares about existing
# shares that they might have for our SI. Once this is done, we
# start placing the shares that we haven't already accounted
@ -985,22 +993,28 @@ class CHKUploader:
return defer.succeed(None)
return self._encoder.abort()
@defer.inlineCallbacks
def start_encrypted(self, encrypted):
""" Returns a Deferred that will fire with the UploadResults instance. """
"""
Returns a Deferred that will fire with the UploadResults instance.
"""
eu = IEncryptedUploadable(encrypted)
started = time.time()
self._encoder = e = encode.Encoder(
# would be Really Nice to make Encoder just a local; only
# abort() really needs self._encoder ...
self._encoder = encode.Encoder(
self._log_number,
self._upload_status,
progress=self._progress,
)
d = e.set_encrypted_uploadable(eu)
d.addCallback(self.locate_all_shareholders, started)
d.addCallback(self.set_shareholders, e)
d.addCallback(lambda res: e.start())
d.addCallback(self._encrypted_done)
return d
# this just returns itself
yield self._encoder.set_encrypted_uploadable(eu)
(upload_trackers, already_serverids) = yield self.locate_all_shareholders(self._encoder, started)
yield self.set_shareholders(upload_trackers, already_serverids, self._encoder)
verifycap = yield self._encoder.start()
results = yield self._encrypted_done(verifycap)
defer.returnValue(results)
def locate_all_shareholders(self, encoder, started):
server_selection_started = now = time.time()
@ -1031,13 +1045,13 @@ class CHKUploader:
d.addCallback(_done)
return d
def set_shareholders(self, (upload_trackers, already_serverids), encoder):
def set_shareholders(self, upload_trackers, already_serverids, encoder):
"""
@param upload_trackers: a sequence of ServerTracker objects that
:param upload_trackers: a sequence of ServerTracker objects that
have agreed to hold some shares for us (the
shareids are stashed inside the ServerTracker)
@paran already_serverids: a dict mapping sharenum to a set of
:param already_serverids: a dict mapping sharenum to a set of
serverids for servers that claim to already
have this share
"""

View File

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
from twisted.trial import unittest
from allmydata.immutable import happiness_upload
from allmydata.util.happinessutil import augmenting_path_for, residual_network
class HappinessUtils(unittest.TestCase):
"""
test-cases for utility functions augmenting_path_for and residual_network
"""
def test_residual_0(self):
graph = happiness_upload._servermap_flow_graph(
['peer0'],
['share0'],
servermap={
'peer0': ['share0'],
}
)
flow = [[0 for _ in graph] for _ in graph]
residual, capacity = residual_network(graph, flow)
# XXX no idea if these are right; hand-verify
self.assertEqual(residual, [[1], [2], [3], []])
self.assertEqual(capacity, [[0, 1, 0, 0], [-1, 0, 1, 0], [0, -1, 0, 1], [0, 0, -1, 0]])
class Happiness(unittest.TestCase):
def test_original_easy(self):
shares = {'share0', 'share1', 'share2'}
peers = {'peer0', 'peer1'}
readonly_peers = set()
servermap = {
'peer0': {'share0'},
'peer1': {'share2'},
}
places0 = happiness_upload.HappinessUpload(peers, readonly_peers, shares, servermap).generate_mappings()
self.assertTrue('peer0' in places0['share0'])
self.assertTrue('peer1' in places0['share2'])
def test_placement_simple(self):
shares = {'share0', 'share1', 'share2'}
peers = {
'peer0',
'peer1',
}
readonly_peers = {'peer0'}
peers_to_shares = {
'peer0': {'share2'},
'peer1': [],
}
places0 = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
places1 = happiness_upload.HappinessUpload(peers, readonly_peers, shares).generate_mappings()
if False:
print("places0")
for k, v in places0.items():
print(" {} -> {}".format(k, v))
print("places1")
for k, v in places1.items():
print(" {} -> {}".format(k, v))
self.assertEqual(
places0,
{
'share0': {'peer1'},
'share1': {'peer1'},
'share2': {'peer0'},
}
)
def test_placement_1(self):
shares = {
'share0', 'share1', 'share2',
'share3', 'share4', 'share5',
'share7', 'share8', 'share9',
}
peers = {
'peer0', 'peer1', 'peer2', 'peer3',
'peer4', 'peer5', 'peer6', 'peer7',
'peer8', 'peer9', 'peerA', 'peerB',
}
readonly_peers = {'peer0', 'peer1', 'peer2', 'peer3'}
peers_to_shares = {
'peer0': {'share0'},
'peer1': {'share1'},
'peer2': {'share2'},
'peer3': {'share3'},
'peer4': {'share4'},
'peer5': {'share5'},
'peer6': {'share6'},
'peer7': {'share7'},
'peer8': {'share8'},
'peer9': {'share9'},
'peerA': set(),
'peerB': set(),
}
places0 = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares)
places1 = happiness_upload.HappinessUpload(peers, readonly_peers, shares).generate_mappings()
# share N maps to peer N
# i.e. this says that share0 should be on peer0, share1 should
# be on peer1, etc.
expected = {
'share{}'.format(i): {'peer{}'.format(i)}
for i in range(10)
}
self.assertEqual(expected, places0)

View File

@ -11,13 +11,13 @@ import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util import log, base32, fileutil
from allmydata.util import log, base32
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common_util import ShouldFailMixin
from allmydata.util.happinessutil import servers_of_happiness, \
shares_by_server, merge_servers
shares_by_server, merge_servers
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir
from allmydata.client import Client

View File

@ -1094,7 +1094,7 @@ class Grid(GridTestMixin, WebErrorMixin, ShouldFailMixin, testutil.ReallyEqualMi
" overdue= unused= need 3. Last failure: None")
msg2 = msgbase + (" ran out of shares:"
" complete="
" pending=Share(sh0-on-xgru5)"
" pending=Share(sh0-on-ysbz4st7)"
" overdue= unused= need 3. Last failure: None")
self.failUnless(body == msg1 or body == msg2, body)
d.addCallback(_check_one_share)