upload: make peer-selection a bit more uniform. Closes #132.
This commit is contained in:
parent 24e6ccddce
commit 808f851589
@@ -132,11 +132,12 @@ class FakeBucketWriter:
         self.closed = True
 
 class FakeClient:
-    def __init__(self, mode="good"):
+    def __init__(self, mode="good", num_servers=50):
         self.mode = mode
+        self.num_servers = num_servers
     def get_permuted_peers(self, storage_index, include_myself):
         peers = [ ("%20d"%fakeid, "%20d"%fakeid, FakePeer(self.mode),)
-                  for fakeid in range(50) ]
+                  for fakeid in range(self.num_servers) ]
         self.last_peers = [p[2] for p in peers]
         return peers
     def get_push_to_ourselves(self):
@@ -276,8 +277,9 @@ class FullServer(unittest.TestCase):
         return d
 
 class PeerSelection(unittest.TestCase):
-    def setUp(self):
-        self.node = FakeClient(mode="good")
+
+    def make_client(self, num_servers=50):
+        self.node = FakeClient(mode="good", num_servers=num_servers)
         self.u = upload.Uploader()
         self.u.running = True
         self.u.parent = self.node
@@ -298,6 +300,7 @@ class PeerSelection(unittest.TestCase):
         # if we have 50 shares, and there are 50 peers, and they all accept a
         # share, we should get exactly one share per peer
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (25, 30, 50)
         d = self.u.upload_data(data)
@@ -314,6 +317,7 @@ class PeerSelection(unittest.TestCase):
         # if we have 100 shares, and there are 50 peers, and they all accept
         # all shares, we should get exactly two shares per peer
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (50, 75, 100)
         d = self.u.upload_data(data)
@@ -330,6 +334,7 @@ class PeerSelection(unittest.TestCase):
         # if we have 51 shares, and there are 50 peers, then one peer gets
         # two shares and the rest get just one
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (24, 41, 51)
         d = self.u.upload_data(data)
@@ -356,6 +361,7 @@ class PeerSelection(unittest.TestCase):
         # 4 shares. The design goal is to accomplish this with only two
         # queries per peer.
 
+        self.make_client()
         data = self.get_data(SIZE_LARGE)
         self.u.DEFAULT_ENCODING_PARAMETERS = (100, 150, 200)
         d = self.u.upload_data(data)
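The distributions asserted by the tests above follow directly from the third element of DEFAULT_ENCODING_PARAMETERS (which, matching the comments, is the total number of shares) and the 50 FakeClient servers. A quick arithmetic check, for illustration only and not part of the patch:

# Total shares used by the four tests above, spread over 50 fake servers.
for total_shares in (50, 100, 51, 200):
    num_servers = 50
    base, extra = divmod(total_shares, num_servers)
    # e.g. 51 shares -> 49 servers get 1 share, 1 server gets 2
    print("%d shares: %d servers get %d, %d get %d"
          % (total_shares, num_servers - extra, base, extra, base + 1))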
@@ -368,6 +374,25 @@ class PeerSelection(unittest.TestCase):
         d.addCallback(_check)
         return d
 
+    def test_three_of_ten(self):
+        # if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
+        # 4+4+2
+
+        self.make_client(3)
+        data = self.get_data(SIZE_LARGE)
+        self.u.DEFAULT_ENCODING_PARAMETERS = (3, 5, 10)
+        d = self.u.upload_data(data)
+        d.addCallback(self._check_large, SIZE_LARGE)
+        def _check(res):
+            counts = {}
+            for p in self.node.last_peers:
+                allocated = p.ss.allocated
+                counts[len(allocated)] = counts.get(len(allocated), 0) + 1
+            histogram = [counts.get(i, 0) for i in range(5)]
+            self.failUnlessEqual(histogram, [0,0,0,2,1])
+        d.addCallback(_check)
+        return d
+
 
     # TODO:
     # upload with exactly 75 peers (shares_of_happiness)
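The histogram [0,0,0,2,1] asserted by test_three_of_ten means two servers end up holding 3 shares and one holds 4. That split falls out of recomputing div_ceil(remaining homeless shares, remaining peers in the pass) before each query, as the patched _loop in the uploader (next hunks) does. Here is a standalone sketch of just that arithmetic, assuming every peer accepts everything it is asked to hold; the helper names are illustrative, not tahoe code:

def div_ceil(n, d):
    # same result as allmydata.util.mathutil.div_ceil for positive ints
    return (n + d - 1) // d

def spread(num_shares, num_peers):
    homeless = list(range(num_shares))
    peers = ["peer%d" % i for i in range(num_peers)]
    placed = dict([(p, 0) for p in peers])
    while homeless and peers:
        num = div_ceil(len(homeless), len(peers))   # recomputed every query
        peer = peers.pop(0)
        placed[peer] += len(homeless[:num])
        homeless[:num] = []
    return placed

# 10 shares over 3 servers: ceil(10/3)=4, then ceil(6/2)=3, then ceil(3/1)=3
assert sorted(spread(10, 3).values()) == [3, 3, 4]

Handing every peer in a pass the same fixed div_ceil(10, 3) = 4 shares, roughly what the removed shares_per_peer logic did, is what produced the 4+4+2 split the test comment objects to.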
@@ -124,7 +124,8 @@ class Tahoe2PeerSelector:
 
         self.homeless_shares = range(total_shares)
         # self.uncontacted_peers = list() # peers we haven't asked yet
-        self.contacted_peers = ["start"] # peers worth asking again
+        self.contacted_peers = [] # peers worth asking again
+        self.contacted_peers2 = [] # peers that we have asked again
         self.use_peers = set() # PeerTrackers that have shares assigned to them
         self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
 
@@ -189,25 +190,27 @@ class Tahoe2PeerSelector:
             self.query_count += 1
             self.num_peers_contacted += 1
             d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask,
+                      self.contacted_peers)
             return d
-        elif len(self.contacted_peers) > 1:
+        elif self.contacted_peers:
             # ask a peer that we've already asked.
+            num_shares = mathutil.div_ceil(len(self.homeless_shares),
+                                           len(self.contacted_peers))
             peer = self.contacted_peers.pop(0)
-            if peer == "start":
-                # we're at the beginning of the list, so re-calculate
-                # shares_per_peer
-                num_shares = mathutil.div_ceil(len(self.homeless_shares),
-                                               len(self.contacted_peers))
-                self.shares_per_peer = num_shares
-                self.contacted_peers.append("start")
-                peer = self.contacted_peers.pop(0)
-            shares_to_ask = set(self.homeless_shares[:self.shares_per_peer])
-            self.homeless_shares[:self.shares_per_peer] = []
+            shares_to_ask = set(self.homeless_shares[:num_shares])
+            self.homeless_shares[:num_shares] = []
             self.query_count += 1
             d = peer.query(shares_to_ask)
-            d.addBoth(self._got_response, peer, shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask,
+                      self.contacted_peers2)
             return d
+        elif self.contacted_peers2:
+            # we've finished the second-or-later pass. Move all the remaining
+            # peers back into self.contacted_peers for the next pass.
+            self.contacted_peers.extend(self.contacted_peers2)
+            self.contacted_peers2[:] = []
+            return self._loop()
         else:
             # no more peers. If we haven't placed enough shares, we fail.
             placed_shares = self.total_shares - len(self.homeless_shares)
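The reworked _loop drains contacted_peers one query at a time, parks each responsive peer in contacted_peers2 (via the put_peer_here argument added to _got_response in the next hunks), and when contacted_peers runs dry moves everything back for another pass. A simplified standalone model of that flow, with per-peer capacities standing in for the real PeerTracker and remote-bucket machinery; all names here are illustrative:

def div_ceil(n, d):
    return (n + d - 1) // d   # same as allmydata.util.mathutil.div_ceil

def place_shares(num_shares, capacity):
    homeless = list(range(num_shares))
    contacted = sorted(capacity)        # peers worth asking (first pass)
    contacted2 = []                     # peers that we have asked again
    placed = dict([(p, 0) for p in contacted])
    while homeless:
        if contacted:
            num = div_ceil(len(homeless), len(contacted))
            peer = contacted.pop(0)
            ask, homeless = homeless[:num], homeless[num:]
            accepted = ask[:capacity[peer] - placed[peer]]
            placed[peer] += len(accepted)
            homeless = ask[len(accepted):] + homeless   # refused shares go back
            if accepted:
                contacted2.append(peer)   # still worth asking on a later pass
        elif contacted2:
            contacted.extend(contacted2)  # start the next pass
            contacted2[:] = []
        else:
            break   # nobody will take anything more; the upload may fail
    return placed, homeless

# Shares refused by the nearly-full peers come back around, on later passes,
# to the peer that still has room:
placed, left = place_shares(10, {"a": 8, "b": 1, "c": 1})
assert left == [] and placed == {"a": 8, "b": 1, "c": 1}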
@@ -230,14 +233,16 @@ class Tahoe2PeerSelector:
                 # we placed enough to be happy, so we're done
                 return self.use_peers
 
-    def _got_response(self, res, peer, shares_to_ask):
+    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
         if isinstance(res, failure.Failure):
             # This is unusual, and probably indicates a bug or a network
             # problem.
             log.msg("%s got error during peer selection: %s" % (peer, res))
             self.error_count += 1
             self.homeless_shares = list(shares_to_ask) + self.homeless_shares
-            if self.uncontacted_peers or len(self.contacted_peers) > 1:
+            if (self.uncontacted_peers
+                or self.contacted_peers
+                or self.contacted_peers2):
                 # there is still hope, so just loop
                 pass
             else:
@@ -290,7 +295,7 @@ class Tahoe2PeerSelector:
             else:
                 # if they *were* able to accept everything, they might be
                 # willing to accept even more.
-                self.contacted_peers.append(peer)
+                put_peer_here.append(peer)
 
         # now loop
         return self._loop()
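The extra argument threaded through d.addBoth is what makes the routing work: Twisted passes any additional positional arguments given to addBoth straight through to the callback, so _got_response can append a responsive peer to whichever list the caller chose (contacted_peers after a first-pass query, contacted_peers2 after an ask-again query). A minimal illustration with assumed stand-in values, not tahoe code:

from twisted.internet import defer

def got_response(res, peer, shares_to_ask, put_peer_here):
    # res is the query result (or a Failure); the trailing arguments arrive
    # exactly as they were handed to addBoth.
    put_peer_here.append(peer)
    return res

contacted_peers2 = []
d = defer.succeed(set([3, 4]))               # stand-in for peer.query(...)
d.addBoth(got_response, "peerA", set([3, 4]), contacted_peers2)
assert contacted_peers2 == ["peerA"]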