Fix up the behavior of #778, per reviewers' comments

  - Make some important utility functions clearer and more thoroughly
    documented.
  - Assert in upload.servers_of_happiness that the buckets attributes
    of PeerTrackers passed to it are mutually disjoint.
  - Get rid of some silly non-Pythonisms that I didn't see when I first
    wrote these patches.
  - Make sure that should_add_server returns true when queried about a 
    shnum that it doesn't know about yet.
  - Change Tahoe2PeerSelector.preexisting_shares to map a shareid to a set
    of peerids, alter dependencies to deal with that.
  - Remove upload.should_add_server, because it is no longer necessary.
  - Move upload.servers_of_happiness and upload.shares_by_server to a utility
    file.
  - Change some points in Tahoe2PeerSelector.
  - Compute servers_of_happiness using a bipartite matching algorithm that
    we know is optimal instead of an ad-hoc greedy algorithm that isn't
    (see the sketch below).
  - Change servers_of_happiness to just take a sharemap as an argument,
    change its callers to merge existing_shares and used_peers before 
    calling it.
  - Change an error message in the encoder to be more appropriate for 
    servers of happiness.
  - Clarify the wording of an error message in immutable/upload.py
  - Refactor a happiness failure message to happinessutil.py, and make
    immutable/upload.py and immutable/encode.py use it.
  - Move the word "only" as far to the right as possible in failure 
    messages.
  - Use a better definition of progress during peer selection.
  - Do read-only peer share detection queries in parallel, not sequentially.
  - Clean up logging semantics; print the query statistics whenever an
    upload is unsuccessful, not just in one case.
Kevan Carstensen 2010-05-13 17:49:17 -07:00
parent 9bc71d3da0
commit e225f573b9
5 changed files with 436 additions and 180 deletions
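
As a rough illustration of the new happiness metric (an editor's sketch, not part of the patch): servers_of_happiness() reports the size of a maximum matching between servers and shares, so a server is only credited for a share that no other credited server already covers. The snippet assumes the allmydata.util.happinessutil module added by this commit is importable; the sharemaps are made-up toy data with placeholder peerids.

from allmydata.util.happinessutil import servers_of_happiness

# Two servers that can be credited with two different shares: happiness is 2.
assert servers_of_happiness({0: set(['A']), 1: set(['A', 'B'])}) == 2

# Two servers that both hold only the same single share: at most one of
# them can be matched to it, so happiness is 1, not 2. A naive "count
# every server that appears" rule would overcount here.
assert servers_of_happiness({0: set(['A', 'B'])}) == 1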

src/allmydata/immutable/encode.py  View File

@@ -7,7 +7,7 @@ from foolscap.api import fireEventually
from allmydata import uri
from allmydata.storage.server import si_b2a
from allmydata.hashtree import HashTree
from allmydata.util import mathutil, hashutil, base32, log
from allmydata.util import mathutil, hashutil, base32, log, happinessutil
from allmydata.util.assertutil import _assert, precondition
from allmydata.codec import CRSEncoder
from allmydata.interfaces import IEncoder, IStorageBucketWriter, \
@@ -198,6 +198,8 @@ class Encoder(object):
assert IStorageBucketWriter.providedBy(landlords[k])
self.landlords = landlords.copy()
assert isinstance(servermap, dict)
for v in servermap.itervalues():
assert isinstance(v, set)
self.servermap = servermap.copy()
def start(self):
@@ -484,26 +486,33 @@ class Encoder(object):
level=log.UNUSUAL, failure=why)
if shareid in self.landlords:
self.landlords[shareid].abort()
peerid = self.landlords[shareid].get_peerid()
assert peerid
del self.landlords[shareid]
self.servermap[shareid].remove(peerid)
if not self.servermap[shareid]:
del self.servermap[shareid]
else:
# even more UNUSUAL
self.log("they weren't in our list of landlords", parent=ln,
level=log.WEIRD, umid="TQGFRw")
del(self.servermap[shareid])
servers_left = list(set(self.servermap.values()))
if len(servers_left) < self.servers_of_happiness:
msg = "lost too many servers during upload (still have %d, want %d): %s" % \
(len(servers_left),
self.servers_of_happiness, why)
happiness = happinessutil.servers_of_happiness(self.servermap)
if happiness < self.servers_of_happiness:
peerids = set(happinessutil.shares_by_server(self.servermap).keys())
msg = happinessutil.failure_message(len(peerids),
self.required_shares,
self.servers_of_happiness,
happiness)
msg = "%s: %s" % (msg, why)
raise UploadUnhappinessError(msg)
self.log("but we can still continue with %s shares, we'll be happy "
"with at least %s" % (len(servers_left),
"with at least %s" % (happiness,
self.servers_of_happiness),
parent=ln)
def _gather_responses(self, dl):
d = defer.DeferredList(dl, fireOnOneErrback=True)
def _eatNotEnoughSharesError(f):
def _eatUploadUnhappinessError(f):
# all exceptions that occur while talking to a peer are handled
# in _remove_shareholder. That might raise UploadUnhappinessError,
# which will cause the DeferredList to errback but which should
@@ -513,7 +522,7 @@ class Encoder(object):
f.trap(UploadUnhappinessError)
return None
for d0 in dl:
d0.addErrback(_eatNotEnoughSharesError)
d0.addErrback(_eatUploadUnhappinessError)
return d
def finish_hashing(self):

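To make the encoder change above concrete, here is a small illustrative sketch (an editor's addition, using the happinessutil helpers introduced by this commit and toy placeholder peerids) of the bookkeeping that _remove_shareholder now performs: the lost peer is removed from that share's entry in the shnum -> set(peerid) servermap, empty entries are deleted, and happiness is recomputed from whatever is left.

from allmydata.util.happinessutil import servers_of_happiness, shares_by_server

servermap = {0: set(['A']), 1: set(['B'])}   # shnum -> set(peerid), toy data
assert servers_of_happiness(servermap) == 2

# Server B's bucket for share 1 is aborted: B is removed from that share's
# entry, and the now-empty entry is deleted outright.
servermap[1].remove('B')
if not servermap[1]:
    del servermap[1]

happiness = servers_of_happiness(servermap)
assert happiness == 1
# The failure message reports how many distinct peers still hold shares.
assert len(set(shares_by_server(servermap).keys())) == 1
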
src/allmydata/immutable/layout.py  View File

@@ -242,6 +242,12 @@ class WriteBucketProxy:
def abort(self):
return self._rref.callRemoteOnly("abort")
def get_peerid(self):
if self._nodeid:
return self._nodeid
return None
class WriteBucketProxy_v2(WriteBucketProxy):
fieldsize = 8
fieldstruct = ">Q"

src/allmydata/immutable/upload.py  View File

@@ -13,6 +13,9 @@ from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
shares_by_server, merge_peers, \
failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -113,10 +116,9 @@ class PeerTracker:
d.addCallback(self._got_reply)
return d
def query_allocated(self):
d = self._storageserver.callRemote("get_buckets",
self.storage_index)
return d
def ask_about_existing_shares(self):
return self._storageserver.callRemote("get_buckets",
self.storage_index)
def _got_reply(self, (alreadygot, buckets)):
#log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
@@ -132,52 +134,6 @@ class PeerTracker:
self.buckets.update(b)
return (alreadygot, set(b.keys()))
def servers_with_unique_shares(existing_shares, used_peers=None):
"""
I accept a dict of shareid -> peerid mappings (and optionally a list
of PeerTracker instances) and return a list of servers that have shares.
"""
servers = []
existing_shares = existing_shares.copy()
if used_peers:
peerdict = {}
for peer in used_peers:
peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
for k in peerdict.keys():
if existing_shares.has_key(k):
# Prevent overcounting; favor the bucket, and not the
# preexisting share.
del(existing_shares[k])
peers = list(used_peers.copy())
# We do this because the preexisting shares list goes by peerid.
peers = [x.peerid for x in peers]
servers.extend(peers)
servers.extend(existing_shares.values())
return list(set(servers))
def shares_by_server(existing_shares):
"""
I accept a dict of shareid -> peerid mappings, and return a dict
of peerid -> shareid mappings
"""
servers = {}
for server in set(existing_shares.values()):
servers[server] = set([x for x in existing_shares.keys()
if existing_shares[x] == server])
return servers
def should_add_server(existing_shares, server, bucket):
"""
I tell my caller whether the servers_of_happiness number will be
increased or decreased if a particular server is added as the peer
already holding a particular share. I take a dictionary, a peerid,
and a bucket as arguments, and return a boolean.
"""
old_size = len(servers_with_unique_shares(existing_shares))
new_candidate = existing_shares.copy()
new_candidate[bucket] = server
new_size = len(servers_with_unique_shares(new_candidate))
return old_size < new_size
class Tahoe2PeerSelector:
@@ -203,8 +159,8 @@ class Tahoe2PeerSelector:
@return: (used_peers, already_peers), where used_peers is a set of
PeerTracker instances that have agreed to hold some shares
for us (the shnum is stashed inside the PeerTracker),
and already_peers is a dict mapping shnum to a peer
which claims to already have the share.
and already_peers is a dict mapping shnum to a set of peers
which claim to already have the share.
"""
if self._status:
@@ -215,25 +171,21 @@ class Tahoe2PeerSelector:
self.needed_shares = needed_shares
self.homeless_shares = range(total_shares)
# self.uncontacted_peers = list() # peers we haven't asked yet
self.contacted_peers = [] # peers worth asking again
self.contacted_peers2 = [] # peers that we have asked again
self._started_second_pass = False
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> peerid holding the share
# We don't try to allocate shares to these servers, since they've
# said that they're incapable of storing shares of the size that
# we'd want to store. We keep them around because they may have
# existing shares for this storage index, which we want to know
# about for accurate servers_of_happiness accounting
self.readonly_peers = []
# These peers have shares -- any shares -- for our SI. We keep track
# of these to write an error message with them later.
self.peers_with_shares = []
peers = storage_broker.get_servers_for_index(storage_index)
if not peers:
raise NoServersError("client gave us zero peers")
self.preexisting_shares = {} # shareid => set(peerids) holding shareid
# We don't try to allocate shares to these servers, since they've said
# that they're incapable of storing shares of the size that we'd want
# to store. We keep them around because they may have existing shares
# for this storage index, which we want to know about for accurate
# servers_of_happiness accounting
# (this is eventually a list, but it is initialized later)
self.readonly_peers = None
# These peers have shares -- any shares -- for our SI. We keep
# track of these to write an error message with them later.
self.peers_with_shares = set()
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
@@ -247,6 +199,9 @@ class Tahoe2PeerSelector:
num_share_hashes, EXTENSION_SIZE,
None)
allocated_size = wbp.get_allocated_size()
all_peers = storage_broker.get_servers_for_index(storage_index)
if not all_peers:
raise NoServersError("client gave us zero peers")
# filter the list of peers according to which ones can accommodate
# this request. This excludes older peers (which used a 4-byte size
@@ -256,10 +211,9 @@ class Tahoe2PeerSelector:
(peerid, conn) = peer
v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
return v1["maximum-immutable-share-size"]
new_peers = [peer for peer in peers
if _get_maxsize(peer) >= allocated_size]
old_peers = list(set(peers).difference(set(new_peers)))
peers = new_peers
writable_peers = [peer for peer in all_peers
if _get_maxsize(peer) >= allocated_size]
readonly_peers = set(all_peers[:2*total_shares]) - set(writable_peers)
# decide upon the renewal/cancel secrets, to include them in the
# allocate_buckets query.
@@ -271,41 +225,46 @@ class Tahoe2PeerSelector:
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
storage_index)
def _make_trackers(peers):
return [ PeerTracker(peerid, conn,
share_size, block_size,
num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret_hash(file_renewal_secret,
peerid),
bucket_cancel_secret_hash(file_cancel_secret,
peerid))
return [PeerTracker(peerid, conn,
share_size, block_size,
num_segments, num_share_hashes,
storage_index,
bucket_renewal_secret_hash(file_renewal_secret,
peerid),
bucket_cancel_secret_hash(file_cancel_secret,
peerid))
for (peerid, conn) in peers]
self.uncontacted_peers = _make_trackers(peers)
self.readonly_peers = _make_trackers(old_peers)
# Talk to the readonly servers to get an idea of what servers
# have what shares (if any) for this storage index
d = defer.maybeDeferred(self._existing_shares)
d.addCallback(lambda ign: self._loop())
return d
def _existing_shares(self):
if self.readonly_peers:
peer = self.readonly_peers.pop()
self.uncontacted_peers = _make_trackers(writable_peers)
self.readonly_peers = _make_trackers(readonly_peers)
# We now ask peers that can't hold any new shares about existing
# shares that they might have for our SI. Once this is done, we
# start placing the shares that we haven't already accounted
# for.
ds = []
if self._status and self.readonly_peers:
self._status.set_status("Contacting readonly peers to find "
"any existing shares")
for peer in self.readonly_peers:
assert isinstance(peer, PeerTracker)
d = peer.query_allocated()
d = peer.ask_about_existing_shares()
d.addBoth(self._handle_existing_response, peer.peerid)
ds.append(d)
self.num_peers_contacted += 1
self.query_count += 1
log.msg("asking peer %s for any existing shares for upload id %s"
log.msg("asking peer %s for any existing shares for "
"upload id %s"
% (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
level=log.NOISY, parent=self._log_parent)
if self._status:
self._status.set_status("Contacting Peer %s to find "
"any existing shares"
% idlib.shortnodeid_b2a(peer.peerid))
return d
dl = defer.DeferredList(ds)
dl.addCallback(lambda ign: self._loop())
return dl
def _handle_existing_response(self, res, peer):
"""
I handle responses to the queries sent by
Tahoe2PeerSelector._existing_shares.
"""
if isinstance(res, failure.Failure):
log.msg("%s got error during existing shares check: %s"
% (idlib.shortnodeid_b2a(peer), res),
@@ -315,18 +274,17 @@ class Tahoe2PeerSelector:
else:
buckets = res
if buckets:
self.peers_with_shares.append(peer)
self.peers_with_shares.add(peer)
log.msg("response from peer %s: alreadygot=%s"
% (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
level=log.NOISY, parent=self._log_parent)
for bucket in buckets:
if should_add_server(self.preexisting_shares, peer, bucket):
self.preexisting_shares[bucket] = peer
if self.homeless_shares and bucket in self.homeless_shares:
self.homeless_shares.remove(bucket)
self.preexisting_shares.setdefault(bucket, set()).add(peer)
if self.homeless_shares and bucket in self.homeless_shares:
self.homeless_shares.remove(bucket)
self.full_count += 1
self.bad_query_count += 1
return self._existing_shares()
def _get_progress_message(self):
if not self.homeless_shares:
@@ -350,16 +308,20 @@ class Tahoe2PeerSelector:
def _loop(self):
if not self.homeless_shares:
effective_happiness = servers_with_unique_shares(
self.preexisting_shares,
self.use_peers)
if self.servers_of_happiness <= len(effective_happiness):
merged = merge_peers(self.preexisting_shares, self.use_peers)
effective_happiness = servers_of_happiness(merged)
if self.servers_of_happiness <= effective_happiness:
msg = ("peer selection successful for %s: %s" % (self,
self._get_progress_message()))
log.msg(msg, parent=self._log_parent)
return (self.use_peers, self.preexisting_shares)
else:
delta = self.servers_of_happiness - len(effective_happiness)
# We're not okay right now, but maybe we can fix it by
# redistributing some shares. In cases where one or two
# servers have, before the upload, all or most of the
# shares for a given SI, this can work by allowing _loop
# a chance to spread those out over the other peers.
delta = self.servers_of_happiness - effective_happiness
shares = shares_by_server(self.preexisting_shares)
# Each server in shares maps to a set of shares stored on it.
# Since we want to keep at least one share on each server
@@ -371,60 +333,32 @@ class Tahoe2PeerSelector:
in shares.items()])
if delta <= len(self.uncontacted_peers) and \
shares_to_spread >= delta:
# Loop through the allocated shares, removing
# one from each server that has more than one and putting
# it back into self.homeless_shares until we've done
# this delta times.
items = shares.items()
while len(self.homeless_shares) < delta:
servernum, sharelist = items.pop()
# Loop through the allocated shares, removing
# one from each server that has more than one
# and putting it back into self.homeless_shares
# until we've done this delta times.
server, sharelist = items.pop()
if len(sharelist) > 1:
share = sharelist.pop()
self.homeless_shares.append(share)
del(self.preexisting_shares[share])
items.append((servernum, sharelist))
self.preexisting_shares[share].remove(server)
if not self.preexisting_shares[share]:
del self.preexisting_shares[share]
items.append((server, sharelist))
return self._loop()
else:
peer_count = len(list(set(self.peers_with_shares)))
# Redistribution won't help us; fail.
peer_count = len(self.peers_with_shares)
# If peer_count < needed_shares, then the second error
# message is nonsensical, so we use this one.
if peer_count < self.needed_shares:
msg = ("shares could only be placed or found on %d "
"server(s). "
"We were asked to place shares on at least %d "
"server(s) such that any %d of them have "
"enough shares to recover the file." %
(peer_count,
self.servers_of_happiness,
self.needed_shares))
# Otherwise, if we've placed on at least needed_shares
# peers, but there isn't an x-happy subset of those peers
# for x < needed_shares, we use this error message.
elif len(effective_happiness) < self.needed_shares:
msg = ("shares could be placed or found on %d "
"server(s), but they are not spread out evenly "
"enough to ensure that any %d of these servers "
"would have enough shares to recover the file. "
"We were asked to place "
"shares on at least %d servers such that any "
"%d of them have enough shares to recover the "
"file." %
(peer_count,
self.needed_shares,
self.servers_of_happiness,
self.needed_shares))
# Otherwise, if there is an x-happy subset of peers where
# x >= needed_shares, but x < shares_of_happiness, then
# we use this message.
else:
msg = ("shares could only be placed on %d server(s) "
"such that any %d of them have enough shares "
"to recover the file, but we were asked to use "
"at least %d such servers." %
(len(effective_happiness),
self.needed_shares,
self.servers_of_happiness))
raise UploadUnhappinessError(msg)
msg = failure_message(peer_count,
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
raise UploadUnhappinessError("%s (%s)" % (msg,
self._get_progress_message()))
if self.uncontacted_peers:
peer = self.uncontacted_peers.pop(0)
@@ -473,11 +407,15 @@ class Tahoe2PeerSelector:
else:
# no more peers. If we haven't placed enough shares, we fail.
placed_shares = self.total_shares - len(self.homeless_shares)
effective_happiness = servers_with_unique_shares(
self.preexisting_shares,
self.use_peers)
if len(effective_happiness) < self.servers_of_happiness:
msg = ("peer selection failed for %s: %s" % (self,
merged = merge_peers(self.preexisting_shares, self.use_peers)
effective_happiness = servers_of_happiness(merged)
if effective_happiness < self.servers_of_happiness:
msg = failure_message(len(self.peers_with_shares),
self.needed_shares,
self.servers_of_happiness,
effective_happiness)
msg = ("peer selection failed for %s: %s (%s)" % (self,
msg,
self._get_progress_message()))
if self.last_failure_msg:
msg += " (%s)" % (self.last_failure_msg,)
@@ -519,11 +457,12 @@ class Tahoe2PeerSelector:
level=log.NOISY, parent=self._log_parent)
progress = False
for s in alreadygot:
if should_add_server(self.preexisting_shares,
peer.peerid, s):
self.preexisting_shares[s] = peer.peerid
if s in self.homeless_shares:
self.homeless_shares.remove(s)
self.preexisting_shares.setdefault(s, set()).add(peer.peerid)
if s in self.homeless_shares:
self.homeless_shares.remove(s)
progress = True
elif s in shares_to_ask:
progress = True
# the PeerTracker will remember which shares were allocated on
# that peer. We just have to remember to use them.
@ -532,14 +471,16 @@ class Tahoe2PeerSelector:
progress = True
if allocated or alreadygot:
self.peers_with_shares.append(peer.peerid)
self.peers_with_shares.add(peer.peerid)
not_yet_present = set(shares_to_ask) - set(alreadygot)
still_homeless = not_yet_present - set(allocated)
if progress:
# they accepted or already had at least one share, so
# progress has been made
# They accepted at least one of the shares that we asked
# them to accept, or they had a share that we didn't ask
# them to accept but that we hadn't placed yet, so this
# was a productive query
self.good_query_count += 1
else:
self.bad_query_count += 1
@@ -938,8 +879,8 @@ class CHKUploader:
def set_shareholders(self, (used_peers, already_peers), encoder):
"""
@param used_peers: a sequence of PeerTracker objects
@param already_peers: a dict mapping sharenum to a peerid that
claims to already have this share
@param already_peers: a dict mapping sharenum to a set of peerids
that claim to already have this share
"""
self.log("_send_shares, used_peers is %s" % (used_peers,))
# record already-present shares in self._results
@@ -954,7 +895,7 @@ class CHKUploader:
buckets.update(peer.buckets)
for shnum in peer.buckets:
self._peer_trackers[shnum] = peer
servermap[shnum] = peer.peerid
servermap.setdefault(shnum, set()).add(peer.peerid)
assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
encoder.set_shareholders(buckets, servermap)

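The selector above now repeatedly merges the shares it has already found (preexisting_shares) with the shares it has just placed (the PeerTrackers in use_peers) before measuring happiness. Below is a minimal sketch of that merge, an editor's illustration with a hypothetical FakeTracker stand-in (merge_peers only reads a tracker's peerid and buckets attributes) and toy data:

from allmydata.util.happinessutil import merge_peers, servers_of_happiness

class FakeTracker:
    # Hypothetical stand-in for PeerTracker; only .peerid and .buckets matter here.
    def __init__(self, peerid, buckets):
        self.peerid = peerid
        self.buckets = buckets   # shnum -> bucket writer; the values are unused

preexisting = {0: set(['A'])}                         # share 0 already lives on server A
placed = set([FakeTracker('B', {1: None, 2: None})])  # server B agreed to take shares 1 and 2

merged = merge_peers(preexisting, placed)
assert merged == {0: set(['A']), 1: set(['B']), 2: set(['B'])}
assert servers_of_happiness(merged) == 2
# merge_peers works on a copy, so preexisting itself is left untouched.
assert preexisting == {0: set(['A'])}
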
src/allmydata/interfaces.py  View File

@@ -1345,7 +1345,8 @@ class IEncoder(Interface):
must be a dictionary that maps share number (an integer ranging from
0 to n-1) to an instance that provides IStorageBucketWriter.
'servermap' is a dictionary that maps share number (as defined above)
to a peerid. This must be performed before start() can be called."""
to a set of peerids. This must be performed before start() can be
called."""
def start():
"""Begin the encode/upload process. This involves reading encrypted

src/allmydata/util/happinessutil.py  View File

@@ -0,0 +1,299 @@
"""
I contain utilities useful for calculating servers_of_happiness, and for
reporting it in messages
"""
def failure_message(peer_count, k, happy, effective_happy):
# If peer_count < needed_shares, this error message makes more
# sense than any of the others, so use it.
if peer_count < k:
msg = ("shares could be placed or found on only %d "
"server(s). "
"We were asked to place shares on at least %d "
"server(s) such that any %d of them have "
"enough shares to recover the file." %
(peer_count, happy, k))
# Otherwise, if we've placed on at least needed_shares
# peers, but there isn't an x-happy subset of those peers
# for x >= needed_shares, we use this error message.
elif effective_happy < k:
msg = ("shares could be placed or found on %d "
"server(s), but they are not spread out evenly "
"enough to ensure that any %d of these servers "
"would have enough shares to recover the file. "
"We were asked to place "
"shares on at least %d servers such that any "
"%d of them have enough shares to recover the "
"file." %
(peer_count, k, happy, k))
# Otherwise, if there is an x-happy subset of peers where
# x >= needed_shares, but x < servers_of_happiness, then
# we use this message.
else:
msg = ("shares could be placed on only %d server(s) "
"such that any %d of them have enough shares "
"to recover the file, but we were asked to "
"place shares on at least %d such servers." %
(effective_happy, k, happy))
return msg
def shares_by_server(servermap):
"""
I accept a dict of shareid -> set(peerid) mappings, and return a
dict of peerid -> set(shareid) mappings. My argument is a dictionary
with sets of peers, indexed by shares, and I transform that into a
dictionary of sets of shares, indexed by peerids.
"""
ret = {}
for shareid, peers in servermap.iteritems():
assert isinstance(peers, set)
for peerid in peers:
ret.setdefault(peerid, set()).add(shareid)
return ret
def merge_peers(servermap, used_peers=None):
"""
I accept a dict of shareid -> set(peerid) mappings, and optionally a
set of PeerTrackers. If no set of PeerTrackers is provided, I return
my first argument unmodified. Otherwise, I update a copy of my first
argument to include the shareid -> peerid mappings implied in the
set of PeerTrackers, returning the resulting dict.
"""
if not used_peers:
return servermap
assert(isinstance(servermap, dict))
assert(isinstance(used_peers, set))
# Since we mutate servermap, and are called outside of a
# context where it is okay to do that, make a copy of servermap and
# work with it.
servermap = servermap.copy()
for peer in used_peers:
for shnum in peer.buckets:
servermap.setdefault(shnum, set()).add(peer.peerid)
return servermap
def servers_of_happiness(sharemap):
"""
I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
return the 'servers_of_happiness' number that sharemap results in.
To calculate the 'servers_of_happiness' number for the sharemap, I
construct a bipartite graph with servers in one partition of vertices
and shares in the other, and with an edge between a server s and a share t
if s is to store t. I then compute the size of a maximum matching in
the resulting graph; this is then returned as the 'servers_of_happiness'
for my arguments.
For example, consider the following layout:
server 1: shares 1, 2, 3, 4
server 2: share 6
server 3: share 3
server 4: share 4
server 5: share 2
From this, we can construct the following graph:
L = {server 1, server 2, server 3, server 4, server 5}
R = {share 1, share 2, share 3, share 4, share 6}
V = L U R
E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
(server 1, share 4), (server 2, share 6), (server 3, share 3),
(server 4, share 4), (server 5, share 2)}
G = (V, E)
Note that G is bipartite since every edge in E has one endpoint in L
and one endpoint in R.
A matching in a graph G is a subset M of E such that, for any vertex
v in V, v is incident to at most one edge of M. A maximum matching
in G is a matching that is no smaller than any other matching. For
this graph, a matching of cardinality 5 is:
M = {(server 1, share 1), (server 2, share 6),
(server 3, share 3), (server 4, share 4),
(server 5, share 2)}
Since G is bipartite, and since |L| = 5, we cannot have an M' such
that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
as long as k <= 5, we can see that the layout above has
servers_of_happiness = 5, which matches the results here.
"""
if sharemap == {}:
return 0
sharemap = shares_by_server(sharemap)
graph = flow_network_for(sharemap)
# This is an implementation of the Ford-Fulkerson method for finding
# a maximum flow in a flow network applied to a bipartite graph.
# Specifically, it is the Edmonds-Karp algorithm, since it uses a
# BFS to find the shortest augmenting path at each iteration, if one
# exists.
#
# The implementation here is an adaptation of an algorithm described in
# "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
dim = len(graph)
flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
residual_graph, residual_function = residual_network(graph, flow_function)
while augmenting_path_for(residual_graph):
path = augmenting_path_for(residual_graph)
# Delta is the largest amount that we can increase flow across
# all of the edges in path. Because of the way that the residual
# function is constructed, f[u][v] for a particular edge (u, v)
# is the amount of unused capacity on that edge. Taking the
# minimum of a list of those values for each edge in the
# augmenting path gives us our delta.
delta = min(map(lambda (u, v): residual_function[u][v], path))
for (u, v) in path:
flow_function[u][v] += delta
flow_function[v][u] -= delta
residual_graph, residual_function = residual_network(graph,
flow_function)
num_servers = len(sharemap)
# The value of a flow is the total flow out of the source vertex
# (vertex 0, in our graph). We could just as well sum across all of
# f[0], but we know that vertex 0 only has edges to the servers in
# our graph, so we can stop after summing flow across those. The
# value of a flow computed in this way is the size of a maximum
# matching on the bipartite graph described above.
return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
def flow_network_for(sharemap):
"""
I take my argument, a dict of peerid -> set(shareid) mappings, and
turn it into a flow network suitable for use with Edmonds-Karp. I
then return the adjacency list representation of that network.
Specifically, I build G = (V, E), where:
V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
E = {(s, peerid) for each peerid}
U {(peerid, shareid) if peerid is to store shareid }
U {(shareid, t) for each shareid}
s and t will be source and sink nodes when my caller starts treating
the graph I return like a flow network. Without s and t, the
returned graph is bipartite.
"""
# Servers don't have integral identifiers, and we can't make any
# assumptions about the way shares are indexed -- it's possible that
# there are missing shares, for example. So before making a graph,
# we re-index so that all of our vertices have integral indices, and
# that there aren't any holes. We start indexing at 1, so that we
# can add a source node at index 0.
sharemap, num_shares = reindex(sharemap, base_index=1)
num_servers = len(sharemap)
graph = [] # index -> [index], an adjacency list
# Add an entry at the top (index 0) that has an edge to every server
# in sharemap
graph.append(sharemap.keys())
# For each server, add an entry that has an edge to every share that it
# contains (or will contain).
for k in sharemap:
graph.append(sharemap[k])
# For each share, add an entry that has an edge to the sink.
sink_num = num_servers + num_shares + 1
for i in xrange(num_shares):
graph.append([sink_num])
# Add an empty entry for the sink, which has no outbound edges.
graph.append([])
return graph
def reindex(sharemap, base_index):
"""
Given sharemap, I map peerids and shareids to integers that don't
conflict with each other, so they're useful as indices in a graph. I
return a sharemap that is reindexed appropriately, and also the
number of distinct shares in the resulting sharemap as a convenience
for my caller. base_index tells me where to start indexing.
"""
shares = {} # shareid -> vertex index
num = base_index
ret = {} # peerid -> [shareid], a reindexed sharemap.
# Number the servers first
for k in sharemap:
ret[num] = sharemap[k]
num += 1
# Number the shares
for k in ret:
for shnum in ret[k]:
if not shares.has_key(shnum):
shares[shnum] = num
num += 1
ret[k] = map(lambda x: shares[x], ret[k])
return (ret, len(shares))
def residual_network(graph, f):
"""
I return the residual network and residual capacity function of the
flow network represented by my graph and f arguments. graph is a
flow network in adjacency-list form, and f is a flow in graph.
"""
new_graph = [[] for i in xrange(len(graph))]
cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
for i in xrange(len(graph)):
for v in graph[i]:
if f[i][v] == 1:
# We add an edge (v, i) with cf[v,i] = 1. This means
# that we can remove 1 unit of flow from the edge (i, v)
new_graph[v].append(i)
cf[v][i] = 1
cf[i][v] = -1
else:
# We add the edge (i, v), since we're not using it right
# now.
new_graph[i].append(v)
cf[i][v] = 1
cf[v][i] = -1
return (new_graph, cf)
def augmenting_path_for(graph):
"""
I return an augmenting path, if there is one, from the source node
to the sink node in the flow network represented by my graph argument.
If there is no augmenting path, I return False. I assume that the
source node is at index 0 of graph, and the sink node is at the last
index. I also assume that graph is a flow network in adjacency list
form.
"""
bfs_tree = bfs(graph, 0)
if bfs_tree[len(graph) - 1]:
n = len(graph) - 1
path = [] # [(u, v)], where u and v are vertices in the graph
while n != 0:
path.insert(0, (bfs_tree[n], n))
n = bfs_tree[n]
return path
return False
def bfs(graph, s):
"""
Perform a BFS on graph starting at s, where graph is a graph in
adjacency list form, and s is a node in graph. I return the
predecessor table that the BFS generates.
"""
# This is an adaptation of the BFS described in "Introduction to
# Algorithms", Cormen et al, 2nd ed., p. 532.
# WHITE vertices are those that we haven't seen or explored yet.
WHITE = 0
# GRAY vertices are those we have seen, but haven't explored yet
GRAY = 1
# BLACK vertices are those we have seen and explored
BLACK = 2
color = [WHITE for i in xrange(len(graph))]
predecessor = [None for i in xrange(len(graph))]
distance = [-1 for i in xrange(len(graph))]
queue = [s] # vertices that we haven't explored yet.
color[s] = GRAY
distance[s] = 0
while queue:
n = queue.pop(0)
for v in graph[n]:
if color[v] == WHITE:
color[v] = GRAY
distance[v] = distance[n] + 1
predecessor[v] = n
queue.append(v)
color[n] = BLACK
return predecessor
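
As a quick sanity check of the matching computation (an editor's addition, not part of the file above), the worked example from the servers_of_happiness docstring can be fed in directly, assuming the functions above are in scope; the server names are placeholders for real peerids:

# Layout from the docstring: server 1 holds shares 1-4, servers 2-5 hold one
# share each. Expressed as a shareid -> set(peerid) sharemap:
sharemap = {
    1: set(['server1']),
    2: set(['server1', 'server5']),
    3: set(['server1', 'server3']),
    4: set(['server1', 'server4']),
    6: set(['server2']),
}
assert servers_of_happiness(sharemap) == 5
# Inverted, as shares_by_server and the error-message bookkeeping see it:
assert shares_by_server(sharemap) == {
    'server1': set([1, 2, 3, 4]),
    'server2': set([6]),
    'server3': set([3]),
    'server4': set([4]),
    'server5': set([2]),
}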