Implements 'Servers of Happiness' algorithm for upload

These are Mark Berger's original commits, from ticket #1382
This commit is contained in:
Mark Berger 2013-06-17 13:38:49 -04:00 committed by meejah
parent 42d8a79c9d
commit 17cff7a176
11 changed files with 713 additions and 266 deletions

View File

@ -90,3 +90,69 @@ issues.
We don't use servers-of-happiness for mutable files yet; this fix will
likely come in Tahoe-LAFS version 1.13.
============================
Upload Strategy of Happiness
============================
As mentioned above, the uploader is good at detecting uploads that do not
pass the servers-of-happiness test, but its share distribution algorithm
does not always achieve happiness even when happiness is achievable. A new
placement algorithm designed to pass the servers-of-happiness test, titled
'Upload Strategy of Happiness', fixes these instances where the uploader
previously failed to achieve happiness.
Calculating Share Placements
============================
We calculate share placement like so (a simplified sketch follows the list):
1. Query 2n servers for existing shares.
2. Construct a bipartite graph of readonly servers to shares, where an edge
exists between an arbitrary readonly server s and an arbitrary share n if and only if s
holds n.
3. Calculate the maximum matching graph of the bipartite graph. The maximum matching
is the matching which contains the largest possible number of edges.
4. Construct a bipartite graph of servers to shares, removing any servers and
shares used in the maximum matching graph from step 3. Let an edge exist between
server s and share n if and only if s holds n.
5. Calculate the maximum matching graph of the new graph.
6. Construct a bipartite graph of servers to shares, removing any servers and
shares used in the maximum matching graphs from steps 3 and 5. Let an edge exist
between server s and share n if and only if s can hold n.
7. Calculate the maximum matching graph of the new graph.
8. Renew the shares on their respective servers from steps 3
and 5.
9. Place share n on server s if an edge exists between s and n in the
maximum matching graph from step 7.
10. If any placements from step 7 fail, remove the server from the set of possible
servers and regenerate the matchings.
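The following is an editor's sketch of steps 2-9, not part of this commit: it
performs the three passes of maximum bipartite matching, computed here with
simple augmenting paths rather than the Edmonds-Karp max-flow used by the real
implementation, and the helper names are hypothetical::

    def _maximum_matching(servers, shares, can_hold):
        """Maximum bipartite matching via augmenting paths (Kuhn's algorithm).
        Returns {share: server}; can_hold(server, share) says whether an edge
        exists between them."""
        match = {}  # share -> server
        def augment(server, seen):
            for share in shares:
                if share in seen or not can_hold(server, share):
                    continue
                seen.add(share)
                # Take the share if it is free, or if its current holder can
                # be re-routed to a different share.
                if share not in match or augment(match[share], seen):
                    match[share] = server
                    return True
            return False
        for server in servers:
            augment(server, set())
        return match

    def calculate_placements(servers, readonly_servers, shares, existing):
        """servers: every candidate server; readonly_servers: the subset that
        cannot accept new shares; existing: {server: set(shares)} of shares
        already found on the grid. Returns {share: server}."""
        holds = lambda s, sh: sh in existing.get(s, set())
        placements = {}
        # Pass 1 (steps 2-3): renew shares already held by readonly servers.
        placements.update(_maximum_matching(readonly_servers, shares, holds))
        # Pass 2 (steps 4-5): renew shares held by the remaining servers.
        unused = set(servers) - set(placements.values())
        unplaced = set(shares) - set(placements)
        placements.update(_maximum_matching(unused, unplaced, holds))
        # Pass 3 (steps 6-7): place whatever is left on any writable server.
        writable = set(servers) - set(readonly_servers) - set(placements.values())
        unplaced = set(shares) - set(placements)
        placements.update(_maximum_matching(writable, unplaced,
                                            lambda s, sh: True))
        return placements

Shares left unmatched after the third pass are the 'homeless' shares, which
the implementation then spreads as evenly as possible over the writable
servers, as described in the next section.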
Properties of Upload Strategy of Happiness
==========================================
The size of the maximum bipartite matching is bounded by the size of the smaller
set of vertices. Therefore in a situation where the set of servers is smaller
than the set of shares, placement is not generated for a subset of shares. In
this case the remaining shares are distributed as evenly as possible across the
set of writable servers.
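A minimal sketch of that even spreading (an editor's illustration, not part of
this commit), assuming a ``placements`` dict of share -> server such as the one
produced by the sketch above::

    import heapq

    def spread_leftovers(leftover_shares, writable_servers, placements):
        # Each leftover share goes to the writable server currently holding
        # the fewest shares; placements is updated in place.
        load = [(sum(1 for srv in placements.values() if srv == s), s)
                for s in writable_servers]
        heapq.heapify(load)
        for share in leftover_shares:
            count, server = heapq.heappop(load)
            placements[share] = server
            heapq.heappush(load, (count + 1, server))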
If the servers-of-happiness criteria can be met, the upload strategy of
happiness guarantees that H shares will be placed on the network. During file
repair, if the set of servers is larger than N, the algorithm will only attempt
to spread shares over N distinct servers. For both initial file upload and file
repair, N should be viewed as the maximum number of distinct servers shares
can be placed on, and H as the minimum number. The uploader will fail if
the number of distinct servers is less than H, and it will never attempt to
exceed N.
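As a hypothetical illustration (not part of this commit), the placements
produced by the earlier sketch can be checked against H like so; the happiness
of a matching is the number of distinct servers it pairs one-to-one with
shares::

    def upload_is_happy(placements, happy):
        # placements: {share: server} from the matching passes; each matched
        # server holds exactly one share there, so the number of distinct
        # servers equals the size of the matching.
        return len(set(placements.values())) >= happy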

View File

@ -63,6 +63,7 @@ class ShareFinder:
if not self._started:
si = self.verifycap.storage_index
servers = self._storage_broker.get_servers_for_psi(si)
servers.sort(key=lambda s: s.get_serverid())
self._servers = iter(servers)
self._started = True

View File

@ -0,0 +1,314 @@
from Queue import PriorityQueue
from allmydata.util.happinessutil import augmenting_path_for, residual_network
class Happiness_Upload:
"""
I handle the calculations involved with generating the maximum
matching graph for a file when given a set of peers, a set of shares,
and a servermap of 'peer' -> [shares].
For more information on the algorithm this class implements, refer to
docs/specifications/servers-of-happiness.rst
"""
def __init__(self, peers, readonly_peers, shares, servermap={}):
self._happiness = 0
self.homeless_shares = set()
self.peers = peers
self.readonly_peers = readonly_peers
self.shares = shares
self.servermap = servermap
def happiness(self):
return self._happiness
def generate_mappings(self):
"""
Generates the allocations the upload should make based on the given
information. We construct a dictionary of 'share_num' -> set(server_ids)
and return it to the caller. Each share should be placed on each server
in the corresponding set. Existing allocations appear as placements
because attempting to place an existing allocation will renew the share.
"""
# First calculate share placement for the readonly servers.
readonly_peers = self.readonly_peers
readonly_shares = set()
readonly_map = {}
for peer in self.servermap:
if peer in self.readonly_peers:
readonly_map.setdefault(peer, self.servermap[peer])
for share in self.servermap[peer]:
readonly_shares.add(share)
readonly_mappings = self._calculate_mappings(readonly_peers, readonly_shares, readonly_map)
used_peers, used_shares = self._extract_ids(readonly_mappings)
# Calculate share placement for the remaining existing allocations
peers = set(self.servermap.keys()) - used_peers
# Squash a list of sets into one set
shares = set(item for subset in self.servermap.values() for item in subset)
shares -= used_shares
servermap = self.servermap.copy()
for peer in self.servermap:
if peer in used_peers:
servermap.pop(peer, None)
else:
servermap[peer] = servermap[peer] - used_shares
if servermap[peer] == set():
servermap.pop(peer, None)
peers.remove(peer)
existing_mappings = self._calculate_mappings(peers, shares, servermap)
existing_peers, existing_shares = self._extract_ids(existing_mappings)
# Calculate share placement for the remaining peers and shares which
# won't be preserved by existing allocations.
peers = self.peers - existing_peers - used_peers
shares = self.shares - existing_shares - used_shares
new_mappings = self._calculate_mappings(peers, shares)
mappings = dict(readonly_mappings.items() + existing_mappings.items() + new_mappings.items())
self._calculate_happiness(mappings)
if len(self.homeless_shares) != 0:
all_shares = set(item for subset in self.servermap.values() for item in subset)
self._distribute_homeless_shares(mappings, all_shares)
return mappings
def _calculate_mappings(self, peers, shares, servermap=None):
"""
Given a set of peers, a set of shares, and a dictionary of server ->
set(shares), determine how the uploader should allocate shares. If a
servermap is supplied, determine which existing allocations should be
preserved. If servermap is None, calculate the maximum matching of the
bipartite graph (U, V, E) such that:
U = peers
V = shares
E = peers x shares
Returns a dictionary {share -> set(peer)}, indicating that the share
should be placed on each peer in the set. If a share's corresponding
value is None, the share can be placed on any server. Note that the returned
set normally contains exactly one peer, but it is possible to
duplicate shares by adding additional servers to the set.
"""
peer_to_index, index_to_peer = self._reindex(peers, 1)
share_to_index, index_to_share = self._reindex(shares, len(peers) + 1)
shareIndices = [share_to_index[s] for s in shares]
if servermap:
graph = self._servermap_flow_graph(peers, shares, servermap)
else:
peerIndices = [peer_to_index[peer] for peer in peers]
graph = self._flow_network(peerIndices, shareIndices)
max_graph = self._compute_maximum_graph(graph, shareIndices)
return self._convert_mappings(index_to_peer, index_to_share, max_graph)
def _compute_maximum_graph(self, graph, shareIndices):
"""
This is an implementation of the Ford-Fulkerson method for finding
a maximum flow in a flow network applied to a bipartite graph.
Specifically, it is the Edmonds-Karp algorithm, since it uses a
BFS to find the shortest augmenting path at each iteration, if one
exists.
The implementation here is an adaptation of an algorithm described in
"Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
"""
if graph == []:
return {}
dim = len(graph)
flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
residual_graph, residual_function = residual_network(graph, flow_function)
while augmenting_path_for(residual_graph):
path = augmenting_path_for(residual_graph)
# Delta is the largest amount that we can increase flow across
# all of the edges in path. Because of the way that the residual
# function is constructed, f[u][v] for a particular edge (u, v)
# is the amount of unused capacity on that edge. Taking the
# minimum of a list of those values for each edge in the
# augmenting path gives us our delta.
delta = min(map(lambda (u, v), rf=residual_function: rf[u][v],
path))
for (u, v) in path:
flow_function[u][v] += delta
flow_function[v][u] -= delta
residual_graph, residual_function = residual_network(graph,flow_function)
new_mappings = {}
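# In the final residual network, a matched share's adjacency list contains
# only the peer it was matched with, while an unmatched share still points
# only at the sink (index dim - 1); that is what the loop below checks.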
for shareIndex in shareIndices:
peer = residual_graph[shareIndex]
if peer == [dim - 1]:
new_mappings.setdefault(shareIndex, None)
else:
new_mappings.setdefault(shareIndex, peer[0])
return new_mappings
def _extract_ids(self, mappings):
shares = set()
peers = set()
for share in mappings:
if mappings[share] == None:
pass
else:
shares.add(share)
for item in mappings[share]:
peers.add(item)
return (peers, shares)
def _calculate_happiness(self, mappings):
"""
I calculate the happiness of the generated mappings and
create the set self.homeless_shares.
"""
self._happiness = 0
self.homeless_shares = set()
for share in mappings:
if mappings[share] is not None:
self._happiness += 1
else:
self.homeless_shares.add(share)
def _distribute_homeless_shares(self, mappings, shares):
"""
Shares which are not mapped to a peer in the maximum matching graph
still need to be placed on a server. This function attempts to
distribute those homeless shares as evenly as possible over the
available peers. If possible, a share will be placed on the server it
was originally on, signifying that the lease should be renewed instead.
"""
# First check to see if the leases can be renewed.
to_distribute = set()
for share in self.homeless_shares:
if share in shares:
for peer in self.servermap:
if share in self.servermap[peer]:
mappings[share] = set([peer])
break
else:
to_distribute.add(share)
# This builds a priority queue of peers with the number of shares
# each peer holds as the priority.
priority = {}
pQueue = PriorityQueue()
for peer in self.peers:
priority.setdefault(peer, 0)
for share in mappings:
if mappings[share] is not None:
for peer in mappings[share]:
if peer in self.peers:
priority[peer] += 1
if priority == {}:
return
for peer in priority:
pQueue.put((priority[peer], peer))
# Distribute the shares to peers with the lowest priority.
for share in to_distribute:
peer = pQueue.get()
mappings[share] = set([peer[1]])
pQueue.put((peer[0]+1, peer[1]))
def _convert_mappings(self, index_to_peer, index_to_share, maximum_graph):
"""
Now that a maximum matching has been found, convert the indices
back to their original ids so that the client can pass them to the
uploader.
"""
converted_mappings = {}
for share in maximum_graph:
peer = maximum_graph[share]
if peer == None:
converted_mappings.setdefault(index_to_share[share], None)
else:
converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]]))
return converted_mappings
def _servermap_flow_graph(self, peers, shares, servermap):
"""
Generates a flow network of peerIndices to shareIndices from a server map
of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a
directed graph where each edge has a capacity and each edge receives a flow.
The amount of flow on an edge cannot exceed the capacity of the edge." This
is necessary because, in order to find the maximum matching, the Edmonds-Karp algorithm
converts the problem into a maximum flow problem.
"""
if servermap == {}:
return []
peer_to_index, index_to_peer = self._reindex(peers, 1)
share_to_index, index_to_share = self._reindex(shares, len(peers) + 1)
graph = []
sink_num = len(peers) + len(shares) + 1
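# Vertex 0 is the source, vertices 1..len(peers) are the peers, the next
# len(shares) vertices are the shares, and sink_num is the sink; the
# adjacency lists below are built in that index order.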
graph.append([peer_to_index[peer] for peer in peers])
for peer in peers:
indexedShares = [share_to_index[s] for s in servermap[peer]]
graph.insert(peer_to_index[peer], indexedShares)
for share in shares:
graph.insert(share_to_index[share], [sink_num])
graph.append([])
return graph
def _reindex(self, items, base):
"""
I take an iterable of items and give each item an index to be used in
the construction of a flow network. Indices for these items start at base
and continue to base + len(items) - 1.
I return two dictionaries: ({item: index}, {index: item})
"""
item_to_index = {}
index_to_item = {}
for item in items:
item_to_index.setdefault(item, base)
index_to_item.setdefault(base, item)
base += 1
return (item_to_index, index_to_item)
def _flow_network(self, peerIndices, shareIndices):
"""
Given a set of peerIndices and a set of shareIndices, I create a flow network
to be used by _compute_maximum_graph. The return value is a two
dimensional list in the form of a flow network, where each index represents
a node, and the corresponding list represents all of the nodes it is connected
to.
This function is similar to allmydata.util.happinessutil.flow_network_for, but
we connect every peer with all shares instead of reflecting a supplied servermap.
"""
graph = []
# The first entry in our flow network is the source.
# Connect the source to every server.
graph.append(peerIndices)
sink_num = len(peerIndices + shareIndices) + 1
# Connect every server with every share it can possibly store.
for peerIndex in peerIndices:
graph.insert(peerIndex, shareIndices)
# Connect every share with the sink.
for shareIndex in shareIndices:
graph.insert(shareIndex, [sink_num])
# Add an empty entry for the sink.
graph.append([])
return graph
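A hypothetical usage sketch of the class above, added for illustration only
(the server ids and share numbers are made up):

    writable = set(["srv0", "srv1"])
    readonly = set(["srv2"])
    shares = set(range(6))
    servermap = {"srv2": set([0, 3])}   # shares already found on the grid
    h = Happiness_Upload(writable, readonly, shares, servermap)
    mappings = h.generate_mappings()    # {share: set([server_id])}
    print "happiness:", h.happiness()   # number of shares placed by the matching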

View File

@ -21,11 +21,12 @@ from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
NoServersError, InsufficientVersionError, UploadUnhappinessError, \
DEFAULT_MAX_SEGMENT_SIZE, IProgress
DEFAULT_MAX_SEGMENT_SIZE, IProgress, IPeerSelector
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES
from cStringIO import StringIO
from happiness_upload import Happiness_Upload
# this wants to live in storage, not here
@ -201,8 +202,68 @@ class ServerTracker:
def str_shareloc(shnum, bucketwriter):
return "%s: %s" % (shnum, bucketwriter.get_servername(),)
class PeerSelector():
implements(IPeerSelector)
def __init__(self, num_segments, total_shares, needed_shares, servers_of_happiness):
self.num_segments = num_segments
self.total_shares = total_shares
self.needed_shares = needed_shares
self.min_happiness = servers_of_happiness
self.existing_shares = {}
self.confirmed_allocations = {}
self.peers = set()
self.full_peers = set()
self.bad_peers = set()
def add_peer_with_share(self, peerid, shnum):
if peerid in self.existing_shares.keys():
self.existing_shares[peerid].add(shnum)
else:
self.existing_shares[peerid] = set([shnum])
def confirm_share_allocation(self, shnum, peer):
self.confirmed_allocations.setdefault(shnum, set()).add(peer)
def get_allocations(self):
return self.confirmed_allocations
def add_peer(self, peerid):
self.peers.add(peerid)
def mark_full_peer(self, peerid):
self.full_peers.add(peerid)
self.peers.remove(peerid)
def mark_bad_peer(self, peerid):
if peerid in self.peers:
self.peers.remove(peerid)
self.bad_peers.add(peerid)
elif peerid in self.full_peers:
self.full_peers.remove(peerid)
self.bad_peers.add(peerid)
def get_sharemap_of_preexisting_shares(self):
preexisting = dictutil.DictOfSets()
for server, shares in self.existing_shares.iteritems():
for share in shares:
preexisting.add(share, server)
return preexisting
def get_tasks(self):
shares = set(range(self.total_shares))
self.h = Happiness_Upload(self.peers, self.full_peers, shares, self.existing_shares)
return self.h.generate_mappings()
def is_healthy(self):
return self.min_happiness <= self.h.happiness()
class Tahoe2ServerSelector(log.PrefixingLogMixin):
peer_selector_class = PeerSelector
def __init__(self, upload_id, logparent=None, upload_status=None):
self.upload_id = upload_id
self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
@ -215,6 +276,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
self.log("starting", level=log.OPERATIONAL)
def __repr__(self):
return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
@ -234,6 +296,9 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
if self._status:
self._status.set_status("Contacting Servers..")
self.peer_selector = self.peer_selector_class(num_segments, total_shares,
needed_shares, servers_of_happiness)
self.total_shares = total_shares
self.servers_of_happiness = servers_of_happiness
self.needed_shares = needed_shares
@ -271,9 +336,15 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
v0 = server.get_rref().version
v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
writeable_servers = [server for server in all_servers
candidate_servers = all_servers[:2*total_shares]
for server in candidate_servers:
self.peer_selector.add_peer(server.get_serverid())
writeable_servers = [server for server in candidate_servers
if _get_maxsize(server) >= allocated_size]
readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
readonly_servers = set(candidate_servers) - set(writeable_servers)
for server in readonly_servers:
self.peer_selector.mark_full_peer(server.get_serverid())
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
@ -308,10 +379,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
# second-pass list and repeat the "second" pass (really the third,
# fourth, etc pass), until all shares are assigned, or we've run out
# of potential servers.
self.first_pass_trackers = _make_trackers(writeable_servers)
self.second_pass_trackers = [] # servers worth asking again
self.next_pass_trackers = [] # servers that we have asked again
self._started_second_pass = False
write_trackers = _make_trackers(writeable_servers)
# We don't try to allocate shares to these servers, since they've
# said that they're incapable of storing shares of the size that we'd
@ -337,11 +405,28 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
self.query_count += 1
self.log("asking server %s for any existing shares" %
(tracker.get_name(),), level=log.NOISY)
for tracker in write_trackers:
assert isinstance(tracker, ServerTracker)
d = tracker.query(set())
d.addBoth(self._handle_existing_write_response, tracker, set())
ds.append(d)
self.num_servers_contacted += 1
self.query_count += 1
self.log("asking server %s for any existing shares" %
(tracker.get_name(),), level=log.NOISY)
self.trackers = write_trackers + readonly_trackers
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
dl.addCallback(lambda ign: self._calculate_tasks())
dl.addCallback(lambda ign: self._request_another_allocation())
return dl
def _calculate_tasks(self):
self.tasks = self.peer_selector.get_tasks()
def _handle_existing_response(self, res, tracker):
"""
I handle responses to the queries sent by
@ -351,6 +436,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
if isinstance(res, failure.Failure):
self.log("%s got error during existing shares check: %s"
% (tracker.get_name(), res), level=log.UNUSUAL)
self.peer_selector.mark_bad_peer(serverid)
self.error_count += 1
self.bad_query_count += 1
else:
@ -361,10 +447,27 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
% (tracker.get_name(), tuple(sorted(buckets))),
level=log.NOISY)
for bucket in buckets:
self.peer_selector.add_peer_with_share(serverid, bucket)
self.preexisting_shares.setdefault(bucket, set()).add(serverid)
self.homeless_shares.discard(bucket)
self.full_count += 1
self.bad_query_count += 1
def _handle_existing_write_response(self, res, tracker, shares_to_ask):
"""
I handle the responses from the writable servers to the queries
asking which shares each server already holds.
"""
if isinstance(res, failure.Failure):
self.peer_selector.mark_bad_peer(tracker.get_serverid())
self.log("%s got error during server selection: %s" % (tracker, res),
level=log.UNUSUAL)
self.homeless_shares |= shares_to_ask
msg = ("last failure (from %s) was: %s" % (tracker, res))
self.last_failure_msg = msg
else:
(alreadygot, allocated) = res
for share in alreadygot:
self.peer_selector.add_peer_with_share(tracker.get_serverid(), share)
def _get_progress_message(self):
@ -386,12 +489,69 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
self.good_query_count, self.bad_query_count,
self.full_count, self.error_count))
def _get_next_allocation(self):
"""
Return the next share allocation that we need to make.
def _loop(self):
if not self.homeless_shares:
merged = merge_servers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if self.servers_of_happiness <= effective_happiness:
Specifically, I return a tuple (tracker, shares_to_ask), where
tracker is a ServerTracker instance and shares_to_ask is a set of
shares that we should store on that server. If there are no more
allocations to make, I return None.
"""
if len(self.trackers) == 0:
return None
tracker = self.trackers.pop(0)
# TODO: don't pre-convert all serverids to ServerTrackers
assert isinstance(tracker, ServerTracker)
shares_to_ask = set()
servermap = self.tasks
for shnum, tracker_id in servermap.items():
if tracker_id == None:
continue
if tracker.get_serverid() in tracker_id:
shares_to_ask.add(shnum)
if shnum in self.homeless_shares:
self.homeless_shares.remove(shnum)
if self._status:
self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
% (tracker.get_name(),
len(self.homeless_shares)))
return (tracker, shares_to_ask)
def _request_another_allocation(self):
allocation = self._get_next_allocation()
if allocation is not None:
tracker, shares_to_ask = allocation
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask)
return d
else:
# no more servers. If we haven't placed enough shares, we fail.
merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers)
effective_happiness = servers_of_happiness(self.peer_selector.get_allocations())
if effective_happiness < self.servers_of_happiness:
msg = failure_message(len(self.serverids_with_shares),
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
msg = ("server selection failed for %s: %s (%s), merged=%s" %
(self, msg, self._get_progress_message(),
pretty_print_shnum_to_servers(merged)))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
self.log(msg, level=log.UNUSUAL)
return self._failed(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
"self.use_trackers: %s, self.preexisting_shares: %s") \
% (self, self._get_progress_message(),
@ -401,129 +561,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
for st in self.use_trackers],
pretty_print_shnum_to_servers(self.preexisting_shares))
self.log(msg, level=log.OPERATIONAL)
return (self.use_trackers, self.preexisting_shares)
else:
# We're not okay right now, but maybe we can fix it by
# redistributing some shares. In cases where one or two
# servers has, before the upload, all or most of the
# shares for a given SI, this can work by allowing _loop
# a chance to spread those out over the other servers,
delta = self.servers_of_happiness - effective_happiness
shares = shares_by_server(self.preexisting_shares)
# Each server in shares maps to a set of shares stored on it.
# Since we want to keep at least one share on each server
# that has one (otherwise we'd only be making
# the situation worse by removing distinct servers),
# each server has len(its shares) - 1 to spread around.
shares_to_spread = sum([len(list(sharelist)) - 1
for (server, sharelist)
in shares.items()])
if delta <= len(self.first_pass_trackers) and \
shares_to_spread >= delta:
items = shares.items()
while len(self.homeless_shares) < delta:
# Loop through the allocated shares, removing
# one from each server that has more than one
# and putting it back into self.homeless_shares
# until we've done this delta times.
server, sharelist = items.pop()
if len(sharelist) > 1:
share = sharelist.pop()
self.homeless_shares.add(share)
self.preexisting_shares[share].remove(server)
if not self.preexisting_shares[share]:
del self.preexisting_shares[share]
items.append((server, sharelist))
for writer in self.use_trackers:
writer.abort_some_buckets(self.homeless_shares)
return self._loop()
else:
# Redistribution won't help us; fail.
server_count = len(self.serverids_with_shares)
failmsg = failure_message(server_count,
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
servmsg = servmsgtempl % (
self,
failmsg,
self._get_progress_message(),
pretty_print_shnum_to_servers(merged)
)
self.log(servmsg, level=log.INFREQUENT)
return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
return (self.use_trackers, self.peer_selector.get_sharemap_of_preexisting_shares())
if self.first_pass_trackers:
tracker = self.first_pass_trackers.pop(0)
# TODO: don't pre-convert all serverids to ServerTrackers
assert isinstance(tracker, ServerTracker)
shares_to_ask = set(sorted(self.homeless_shares)[:1])
self.homeless_shares -= shares_to_ask
self.query_count += 1
self.num_servers_contacted += 1
if self._status:
self._status.set_status("Contacting Servers [%s] (first query),"
" %d shares left.."
% (tracker.get_name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
self.second_pass_trackers)
return d
elif self.second_pass_trackers:
# ask a server that we've already asked.
if not self._started_second_pass:
self.log("starting second pass",
level=log.NOISY)
self._started_second_pass = True
num_shares = mathutil.div_ceil(len(self.homeless_shares),
len(self.second_pass_trackers))
tracker = self.second_pass_trackers.pop(0)
shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
self.homeless_shares -= shares_to_ask
self.query_count += 1
if self._status:
self._status.set_status("Contacting Servers [%s] (second query),"
" %d shares left.."
% (tracker.get_name(),
len(self.homeless_shares)))
d = tracker.query(shares_to_ask)
d.addBoth(self._got_response, tracker, shares_to_ask,
self.next_pass_trackers)
return d
elif self.next_pass_trackers:
# we've finished the second-or-later pass. Move all the remaining
# servers back into self.second_pass_trackers for the next pass.
self.second_pass_trackers.extend(self.next_pass_trackers)
self.next_pass_trackers[:] = []
return self._loop()
else:
# no more servers. If we haven't placed enough shares, we fail.
merged = merge_servers(self.preexisting_shares, self.use_trackers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness < self.servers_of_happiness:
msg = failure_message(len(self.serverids_with_shares),
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
msg = ("server selection failed for %s: %s (%s)" %
(self, msg, self._get_progress_message()))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
self.log(msg, level=log.UNUSUAL)
return self._failed(msg)
else:
# we placed enough to be happy, so we're done
if self._status:
self._status.set_status("Placed all shares")
msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
self.log(msg, level=log.OPERATIONAL)
return (self.use_trackers, self.preexisting_shares)
def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
def _got_response(self, res, tracker, shares_to_ask):
if isinstance(res, failure.Failure):
# This is unusual, and probably indicates a bug or a network
# problem.
@ -532,9 +573,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
self.error_count += 1
self.bad_query_count += 1
self.homeless_shares |= shares_to_ask
if (self.first_pass_trackers
or self.second_pass_trackers
or self.next_pass_trackers):
if (self.trackers):
# there is still hope, so just loop
pass
else:
@ -553,6 +592,7 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
level=log.NOISY)
progress = False
for s in alreadygot:
self.peer_selector.confirm_share_allocation(s, tracker.get_serverid())
self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
if s in self.homeless_shares:
self.homeless_shares.remove(s)
@ -565,6 +605,8 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
if allocated:
self.use_trackers.add(tracker)
progress = True
for s in allocated:
self.peer_selector.confirm_share_allocation(s, tracker.get_serverid())
if allocated or alreadygot:
self.serverids_with_shares.add(tracker.get_serverid())
@ -595,13 +637,10 @@ class Tahoe2ServerSelector(log.PrefixingLogMixin):
self.homeless_shares |= still_homeless
# Since they were unable to accept all of our requests, it
# is safe to assume that asking them again won't help.
else:
# if they *were* able to accept everything, they might be
# willing to accept even more.
put_tracker_here.append(tracker)
# now loop
return self._loop()
return self._request_another_allocation()
def _failed(self, msg):

View File

@ -730,6 +730,80 @@ class IReadable(Interface):
download-to-memory consumer.
"""
class IPeerSelector(Interface):
"""
I select peers for an upload, maximizing some measure of health.
I keep track of the state of a grid relative to a file. This means
that I know about all of the peers that parts of that file could be
placed on, and about shares that have been placed on those peers.
Given this, I assign shares to peers in a way that maximizes the
file's health according to whichever definition of health I am
programmed with. I tell the uploader whether or not my assignment is
healthy. I keep track of failures during the process and update my
conclusions appropriately.
"""
def add_peer_with_share(peerid, shnum):
"""
Update my internal state to reflect the fact that peer peerid
holds share shnum. Called for shares that are detected before
peer selection begins.
"""
def confirm_share_allocation(peerid, shnum):
"""
Confirm that an allocated peer=>share pairing has been
successfully established.
"""
def add_peers(peerids=set):
"""
Update my internal state to include the peers in peerids as
potential candidates for storing a file.
"""
def mark_full_peer(peerid):
"""
Mark the peer peerid as full. This means that any
peer-with-share relationships I know about for peerid remain
valid, but that peerid will not be assigned any new shares.
"""
def mark_bad_peer(peerid):
"""
Mark the peer peerid as bad. This is typically called when an
error is encountered when communicating with a peer. I will
disregard any existing peer => share relationships associated
with peerid, and will not attempt to assign it any more shares.
"""
def get_tasks():
"""
Return a tuple of tasks to our caller.
Specifically, return (queries, placements), where queries and
placements are both lists of things to do. Each query is a
request for our caller to ask a server about the shares it holds
for this upload; the results will be fed back into the
selector. Each placement is a request for some share or shares
to be placed on a server. A placement may be None, in which case
the selector thinks that the share is already placed as reliably
and correctly as it can be.
"""
def is_healthy():
"""
I return whether the share assignments I'm currently using
reflect a healthy file, based on my internal definitions.
"""
def needs_recomputation():
"""
I return True if the share assignments I last returned may have
become stale. This is a hint to the caller that they should call
get_tasks again.
"""
class IWriteable(Interface):
"""

View File

@ -401,14 +401,14 @@ class BalancingAct(GridTestMixin, unittest.TestCase):
0:[A] 1:[A] 2:[A] 3:[A,B,C,D,E]
4 good shares, but 5 good hosts
After deleting all instances of share #3 and repairing:
0:[A,B], 1:[A,C], 2:[A,D], 3:[E]
Still 4 good shares and 5 good hosts
0:[A], 1:[A,B], 2:[C,A], 3:[E]
Still 4 good shares but now 4 good hosts
"""
d.addCallback(_check_and_repair)
d.addCallback(_check_counts, 4, 5)
d.addCallback(lambda _: self.delete_shares_numbered(self.uri, [3]))
d.addCallback(_check_and_repair)
d.addCallback(_check_counts, 4, 5)
d.addCallback(_check_counts, 4, 4)
d.addCallback(lambda _: [self.g.break_server(sid)
for sid in self.g.get_all_serverids()])
d.addCallback(_check_and_repair)

View File

@ -295,7 +295,7 @@ class DownloadTest(_Base, unittest.TestCase):
# find the shares that were used and delete them
shares = self.n._cnode._node._shares
shnums = sorted([s._shnum for s in shares])
self.failUnlessEqual(shnums, [0,1,2,3])
self.failUnlessEqual(shnums, [2,4,6,7])
# break the RIBucketReader references
# (we don't break the RIStorageServer references, because that
@ -312,7 +312,7 @@ class DownloadTest(_Base, unittest.TestCase):
self.failUnlessEqual("".join(c.chunks), plaintext)
shares = self.n._cnode._node._shares
shnums = sorted([s._shnum for s in shares])
self.failIfEqual(shnums, [0,1,2,3])
self.failIfEqual(shnums, [2,4,6,7])
d.addCallback(_check_failover)
return d
@ -934,13 +934,13 @@ class Corruption(_Base, unittest.TestCase):
log.msg("corrupt %d" % which)
def _corruptor(s, debug=False):
return s[:which] + chr(ord(s[which])^0x01) + s[which+1:]
self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
self.corrupt_shares_numbered(imm_uri, [2], _corruptor)
def _corrupt_set(self, ign, imm_uri, which, newvalue):
log.msg("corrupt %d" % which)
def _corruptor(s, debug=False):
return s[:which] + chr(newvalue) + s[which+1:]
self.corrupt_shares_numbered(imm_uri, [0], _corruptor)
self.corrupt_shares_numbered(imm_uri, [2], _corruptor)
def test_each_byte(self):
# Setting catalog_detection=True performs an exhaustive test of the
@ -976,25 +976,25 @@ class Corruption(_Base, unittest.TestCase):
def _got_data(data):
self.failUnlessEqual(data, plaintext)
shnums = sorted([s._shnum for s in n._cnode._node._shares])
no_sh0 = bool(0 not in shnums)
sh0 = [s for s in n._cnode._node._shares if s._shnum == 0]
sh0_had_corruption = False
if sh0 and sh0[0].had_corruption:
sh0_had_corruption = True
no_sh2 = bool(2 not in shnums)
sh2 = [s for s in n._cnode._node._shares if s._shnum == 2]
sh2_had_corruption = False
if sh2 and sh2[0].had_corruption:
sh2_had_corruption = True
num_needed = len(n._cnode._node._shares)
if self.catalog_detection:
detected = no_sh0 or sh0_had_corruption or (num_needed!=3)
detected = no_sh2 or sh2_had_corruption or (num_needed!=3)
if not detected:
undetected.add(which, 1)
if expected == "no-sh0":
self.failIfIn(0, shnums)
elif expected == "0bad-need-3":
self.failIf(no_sh0)
self.failUnless(sh0[0].had_corruption)
if expected == "no-sh2":
self.failIfIn(2, shnums)
elif expected == "2bad-need-3":
self.failIf(no_sh2)
self.failUnless(sh2[0].had_corruption)
self.failUnlessEqual(num_needed, 3)
elif expected == "need-4th":
self.failIf(no_sh0)
self.failUnless(sh0[0].had_corruption)
self.failIf(no_sh2)
self.failUnless(sh2[0].had_corruption)
self.failIfEqual(num_needed, 3)
d.addCallback(_got_data)
return d
@ -1012,23 +1012,20 @@ class Corruption(_Base, unittest.TestCase):
# data-block-offset, and offset=48 is the first byte of the first
# data-block). Each one also specifies what sort of corruption
# we're expecting to see.
no_sh0_victims = [0,1,2,3] # container version
no_sh2_victims = [0,1,2,3] # container version
need3_victims = [ ] # none currently in this category
# when the offsets are corrupted, the Share will be unable to
# retrieve the data it wants (because it thinks that data lives
# off in the weeds somewhere), and Share treats DataUnavailable
# as abandon-this-share, so in general we'll be forced to look
# for a 4th share.
need_4th_victims = [12,13,14,15, # share version
24,25,26,27, # offset[data]
32,33,34,35, # offset[crypttext_hash_tree]
36,37,38,39, # offset[block_hashes]
44,45,46,47, # offset[UEB]
need_4th_victims = [12,13,14,15, # offset[data]
24,25,26,27, # offset[block_hashes]
]
need_4th_victims.append(48) # block data
need_4th_victims.append(36) # block data
# when corrupting hash trees, we must corrupt a value that isn't
# directly set from somewhere else. Since we download data from
# seg0, corrupt something on its hash chain, like [2] (the
# seg2, corrupt something on its hash chain, like [2] (the
# right-hand child of the root)
need_4th_victims.append(600+2*32) # block_hashes[2]
# Share.loop is pretty conservative: it abandons the share at the
@ -1039,15 +1036,15 @@ class Corruption(_Base, unittest.TestCase):
# the following fields (which are present in multiple shares)
# should fall into the "need3_victims" case instead of the
# "need_4th_victims" case.
need_4th_victims.append(376+2*32) # crypttext_hash_tree[2]
need_4th_victims.append(824) # share_hashes
need_4th_victims.append(994) # UEB length
need_4th_victims.append(998) # UEB
corrupt_me = ([(i,"no-sh0") for i in no_sh0_victims] +
[(i, "0bad-need-3") for i in need3_victims] +
corrupt_me = ([(i,"no-sh2") for i in no_sh2_victims] +
[(i, "2bad-need-3") for i in need3_victims] +
[(i, "need-4th") for i in need_4th_victims])
if self.catalog_detection:
corrupt_me = [(i, "") for i in range(len(self.sh0_orig))]
share_len = len(self.shares.values()[0])
corrupt_me = [(i, "") for i in range(share_len)]
# This is a workaround for ticket #2024.
corrupt_me = corrupt_me[0:8]+corrupt_me[12:]
for i,expected in corrupt_me:
# All these tests result in a successful download. What we're
# measuring is how many shares the downloader had to use.
@ -1055,7 +1052,7 @@ class Corruption(_Base, unittest.TestCase):
d.addCallback(_download, imm_uri, i, expected)
d.addCallback(lambda ign: self.restore_all_shares(self.shares))
d.addCallback(fireEventually)
corrupt_values = [(3, 2, "no-sh0"),
corrupt_values = [(3, 2, "no-sh2"),
(15, 2, "need-4th"), # share looks v2
]
for i,newvalue,expected in corrupt_values:
@ -1066,9 +1063,10 @@ class Corruption(_Base, unittest.TestCase):
return d
d.addCallback(_uploaded)
def _show_results(ign):
share_len = len(self.shares.values()[0])
print
print ("of [0:%d], corruption ignored in %s" %
(len(self.sh0_orig), undetected.dump()))
(share_len, undetected.dump()))
if self.catalog_detection:
d.addCallback(_show_results)
# of [0:2070], corruption ignored in len=1133:

View File

@ -233,7 +233,7 @@ class HungServerDownloadTest(GridTestMixin, ShouldFailMixin, PollMixin,
done = []
d = self._set_up(False, "test_5_overdue_immutable")
def _reduce_max_outstanding_requests_and_download(ign):
self._hang_shares(range(5))
self._hang_shares([2, 4, 6, 7, 3])
n = self.c0.create_node_from_uri(self.uri)
n._cnode._maybe_create_download_node()
self._sf = n._cnode._node._sharefinder

View File

@ -707,7 +707,7 @@ class Repairer(GridTestMixin, unittest.TestCase, RepairTestMixin,
def _then(ign):
ss = self.g.servers_by_number[0]
self.g.break_server(ss.my_nodeid, count=1)
self.delete_shares_numbered(self.uri, [9])
self.delete_shares_numbered(self.uri, [8])
return self.c0_filenode.check_and_repair(Monitor())
d.addCallback(_then)
def _check(rr):

View File

@ -11,7 +11,7 @@ import allmydata # for __full_version__
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError
from allmydata.util import log, base32
from allmydata.util import log, base32, fileutil
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from allmydata.test.no_network import GridTestMixin
@ -425,33 +425,13 @@ class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
d.addCallback(_check)
return d
def test_second_error(self):
# we want to make sure we make it to a third pass. This means that
# the first pass was insufficient to place all shares, and at least
# one of second pass servers (other than the last one) accepted a
# share (so we'll believe that a third pass will be useful). (if
# everyone but the last server throws an error, then we'll send all
# the remaining shares to the last server at the end of the second
# pass, and if that succeeds, we won't make it to a third pass).
#
# we can achieve this 97.5% of the time by using 40 servers, having
# 39 of them fail on the second request, leaving only one to succeed
# on the second request. (we need to keep the number of servers low
# enough to ensure a second pass with 100 shares).
mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
self.make_node(mode, 40)
d = upload_data(self.u, DATA)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
return d
def test_second_error_all(self):
self.make_node("second-fail")
d = self.shouldFail(UploadUnhappinessError, "second_error_all",
"server selection failed",
upload_data, self.u, DATA)
def _check((f,)):
self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
# there should also be a 'last failure was' message
self.failUnlessIn("ServerError", str(f.value))
d.addCallback(_check)
@ -515,7 +495,7 @@ class ServerSelection(unittest.TestCase):
for s in self.node.last_servers:
allocated = s.allocated
self.failUnlessEqual(len(allocated), 1)
self.failUnlessEqual(s.queries, 1)
self.failUnlessEqual(s.queries, 2)
d.addCallback(_check)
return d
@ -555,7 +535,7 @@ class ServerSelection(unittest.TestCase):
allocated = s.allocated
self.failUnless(len(allocated) in (1,2), len(allocated))
if len(allocated) == 1:
self.failUnlessEqual(s.queries, 1)
self.failUnlessEqual(s.queries, 2)
got_one.append(s)
else:
self.failUnlessEqual(s.queries, 2)
@ -634,6 +614,21 @@ class ServerSelection(unittest.TestCase):
d.addCallback(_check)
return d
def test_number_of_servers_contacted(self):
# This test ensures that Tahoe only contacts 2n servers
# during peer selection
self.make_client(40)
self.set_encoding_parameters(3, 7, 10)
data = self.get_data(SIZE_LARGE)
d = upload_data(self.u, data)
def _check(res):
servers_contacted = []
for s in self.node.last_servers:
if(s.queries != 0):
servers_contacted.append(s)
self.failUnlessEqual(len(servers_contacted), 20)
d.addCallback(_check)
return d
class StorageIndex(unittest.TestCase):
def test_params_must_matter(self):
@ -1202,7 +1197,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(lambda ign:
self.failUnless(self._has_happy_share_distribution()))
return d
test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
def test_happiness_with_some_readonly_servers(self):
# Try the following layout
@ -1597,7 +1591,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(_setup)
d.addCallback(lambda c:
self.shouldFail(UploadUnhappinessError, "test_query_counting",
"10 queries placed some shares",
"0 queries placed some shares",
c.upload, upload.Data("data" * 10000,
convergence="")))
# Now try with some readonly servers. We want to make sure that
@ -1650,7 +1644,7 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(_next)
d.addCallback(lambda c:
self.shouldFail(UploadUnhappinessError, "test_query_counting",
"1 queries placed some shares",
"0 queries placed some shares",
c.upload, upload.Data("data" * 10000,
convergence="")))
return d
@ -1867,44 +1861,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(lambda ign:
self.failUnless(self._has_happy_share_distribution()))
return d
test_problem_layout_comment_187.todo = "this isn't fixed yet"
def test_problem_layout_ticket_1118(self):
# #1118 includes a report from a user who hit an assertion in
# the upload code with this layout.
self.basedir = self.mktemp()
d = self._setup_and_upload(k=2, n=4)
# server 0: no shares
# server 1: shares 0, 3
# server 3: share 1
# server 2: share 2
# The order that they get queries is 0, 1, 3, 2
def _setup(ign):
self._add_server(server_number=0)
self._add_server_with_share(server_number=1, share_number=0)
self._add_server_with_share(server_number=2, share_number=2)
self._add_server_with_share(server_number=3, share_number=1)
# Copy shares
self._copy_share_to_server(3, 1)
self.delete_all_shares(self.get_serverdir(0))
client = self.g.clients[0]
client.encoding_params['happy'] = 4
return client
d.addCallback(_setup)
# Note: actually it should succeed! See
# test_problem_layout_ticket_1128. But ticket 1118 is just to
# make it realize that it has failed, so if it raises
# UploadUnhappinessError then we'll give it the green light
# for now.
d.addCallback(lambda ignored:
self.shouldFail(UploadUnhappinessError,
"test_problem_layout_ticket_1118",
"",
self.g.clients[0].upload, upload.Data("data" * 10000,
convergence="")))
return d
def test_problem_layout_ticket_1128(self):
# #1118 includes a report from a user who hit an assertion in
@ -1936,7 +1892,6 @@ class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
d.addCallback(lambda ign:
self.failUnless(self._has_happy_share_distribution()))
return d
test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
def test_upload_succeeds_with_some_homeless_shares(self):
# If the upload is forced to stop trying to place shares before

View File

@ -126,8 +126,8 @@ def servers_of_happiness(sharemap):
"""
if sharemap == {}:
return 0
sharemap = shares_by_server(sharemap)
graph = flow_network_for(sharemap)
servermap = shares_by_server(sharemap)
graph = flow_network_for(servermap)
# This is an implementation of the Ford-Fulkerson method for finding
# a maximum flow in a flow network applied to a bipartite graph.
# Specifically, it is the Edmonds-Karp algorithm, since it uses a
@ -154,7 +154,7 @@ def servers_of_happiness(sharemap):
flow_function[v][u] -= delta
residual_graph, residual_function = residual_network(graph,
flow_function)
num_servers = len(sharemap)
num_servers = len(servermap)
# The value of a flow is the total flow out of the source vertex
# (vertex 0, in our graph). We could just as well sum across all of
# f[0], but we know that vertex 0 only has edges to the servers in
@ -163,14 +163,14 @@ def servers_of_happiness(sharemap):
# matching on the bipartite graph described above.
return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
def flow_network_for(sharemap):
def flow_network_for(servermap):
"""
I take my argument, a dict of peerid -> set(shareid) mappings, and
turn it into a flow network suitable for use with Edmonds-Karp. I
then return the adjacency list representation of that network.
Specifically, I build G = (V, E), where:
V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
V = { peerid in servermap } U { shareid in servermap } U {s, t}
E = {(s, peerid) for each peerid}
U {(peerid, shareid) if peerid is to store shareid }
U {(shareid, t) for each shareid}
@ -185,16 +185,16 @@ def flow_network_for(sharemap):
# we re-index so that all of our vertices have integral indices, and
# that there aren't any holes. We start indexing at 1, so that we
# can add a source node at index 0.
sharemap, num_shares = reindex(sharemap, base_index=1)
num_servers = len(sharemap)
servermap, num_shares = reindex(servermap, base_index=1)
num_servers = len(servermap)
graph = [] # index -> [index], an adjacency list
# Add an entry at the top (index 0) that has an edge to every server
# in sharemap
graph.append(sharemap.keys())
# in servermap
graph.append(servermap.keys())
# For each server, add an entry that has an edge to every share that it
# contains (or will contain).
for k in sharemap:
graph.append(sharemap[k])
for k in servermap:
graph.append(servermap[k])
# For each share, add an entry that has an edge to the sink.
sink_num = num_servers + num_shares + 1
for i in xrange(num_shares):
@ -203,20 +203,20 @@ def flow_network_for(sharemap):
graph.append([])
return graph
def reindex(sharemap, base_index):
def reindex(servermap, base_index):
"""
Given sharemap, I map peerids and shareids to integers that don't
Given servermap, I map peerids and shareids to integers that don't
conflict with each other, so they're useful as indices in a graph. I
return a sharemap that is reindexed appropriately, and also the
number of distinct shares in the resulting sharemap as a convenience
return a servermap that is reindexed appropriately, and also the
number of distinct shares in the resulting servermap as a convenience
for my caller. base_index tells me where to start indexing.
"""
shares = {} # shareid -> vertex index
num = base_index
ret = {} # peerid -> [shareid], a reindexed sharemap.
ret = {} # peerid -> [shareid], a reindexed servermap.
# Number the servers first
for k in sharemap:
ret[num] = sharemap[k]
for k in servermap:
ret[num] = servermap[k]
num += 1
# Number the shares
for k in ret: