mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-08 11:24:25 +00:00
upload: remove Tahoe3 peer-selection algorithm
This commit is contained in:
parent
baa16087cd
commit
8a251d8670
@ -16,7 +16,6 @@ from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
|
||||
from allmydata.Crypto.Cipher import AES
|
||||
|
||||
from cStringIO import StringIO
|
||||
import collections, random
|
||||
|
||||
|
||||
class HaveAllPeersError(Exception):
|
||||
@ -99,182 +98,6 @@ class PeerTracker:
|
||||
self.buckets.update(b)
|
||||
return (alreadygot, set(b.keys()))
|
||||
|
||||
class Tahoe3PeerSelector:
|
||||
|
||||
def __init__(self, upload_id):
|
||||
self.upload_id = upload_id
|
||||
|
||||
def __repr__(self):
|
||||
return "<Tahoe3PeerSelector for upload %s>" % self.upload_id
|
||||
|
||||
def get_shareholders(self, client,
|
||||
storage_index, share_size, block_size,
|
||||
num_segments, total_shares, shares_of_happiness,
|
||||
push_to_ourselves):
|
||||
"""
|
||||
@return: a set of PeerTracker instances that have agreed to hold some
|
||||
shares for us
|
||||
"""
|
||||
|
||||
self.total_shares = total_shares
|
||||
self.shares_of_happiness = shares_of_happiness
|
||||
|
||||
# we are responsible for locating the shareholders. self._encoder is
|
||||
# responsible for handling the data and sending out the shares.
|
||||
peers = client.get_permuted_peers(storage_index, push_to_ourselves)
|
||||
|
||||
assert peers, "peer selection left us with zero peers for our data"
|
||||
|
||||
# this needed_hashes computation should mirror
|
||||
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
|
||||
# (instead of a HashTree) because we don't require actual hashing
|
||||
# just to count the levels.
|
||||
ht = hashtree.IncompleteHashTree(total_shares)
|
||||
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
|
||||
|
||||
client_renewal_secret = client.get_renewal_secret()
|
||||
client_cancel_secret = client.get_cancel_secret()
|
||||
|
||||
file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
|
||||
storage_index)
|
||||
file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
|
||||
storage_index)
|
||||
|
||||
trackers = [ PeerTracker(peerid, permutedid, conn,
|
||||
share_size, block_size,
|
||||
num_segments, num_share_hashes,
|
||||
storage_index,
|
||||
bucket_renewal_secret_hash(file_renewal_secret,
|
||||
peerid),
|
||||
bucket_cancel_secret_hash(file_cancel_secret,
|
||||
peerid),
|
||||
)
|
||||
for permutedid, peerid, conn in peers ]
|
||||
self.all_peers = trackers
|
||||
self.usable_peers = set(trackers) # this set shrinks over time
|
||||
self.used_peers = set() # while this set grows
|
||||
self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
|
||||
|
||||
return self._locate_more_shareholders()
|
||||
|
||||
def _locate_more_shareholders(self):
|
||||
d = self._query_peers()
|
||||
d.addCallback(self._located_some_shareholders)
|
||||
return d
|
||||
|
||||
def _located_some_shareholders(self, res):
|
||||
log.msg("_located_some_shareholders")
|
||||
log.msg(" still need homes for %d shares, still have %d usable peers"
|
||||
% (len(self.unallocated_sharenums), len(self.usable_peers)))
|
||||
if not self.unallocated_sharenums:
|
||||
# Finished allocating places for all shares.
|
||||
log.msg("%s._locate_all_shareholders() "
|
||||
"Finished allocating places for all shares." % self)
|
||||
log.msg("used_peers is %s" % (self.used_peers,))
|
||||
return self.used_peers
|
||||
if not self.usable_peers:
|
||||
# Ran out of peers who have space.
|
||||
log.msg("%s._locate_all_shareholders() "
|
||||
"Ran out of peers who have space." % self)
|
||||
margin = self.total_shares - self.shares_of_happiness
|
||||
if len(self.unallocated_sharenums) < margin:
|
||||
# But we allocated places for enough shares.
|
||||
log.msg("%s._locate_all_shareholders() "
|
||||
"But we allocated places for enough shares.")
|
||||
return self.used_peers
|
||||
raise encode.NotEnoughPeersError
|
||||
# we need to keep trying
|
||||
return self._locate_more_shareholders()
|
||||
|
||||
def _create_ring_of_things(self):
|
||||
PEER = 1 # must sort later than SHARE, for consistency with download
|
||||
SHARE = 0
|
||||
# ring_of_things is a list of (position_in_ring, whatami, x) where
|
||||
# whatami is SHARE if x is a sharenum or else PEER if x is a
|
||||
# PeerTracker instance
|
||||
ring_of_things = []
|
||||
ring_of_things.extend([ (peer.permutedid, PEER, peer,)
|
||||
for peer in self.usable_peers ])
|
||||
shares = [ (i * 2**160 / self.total_shares, SHARE, i)
|
||||
for i in self.unallocated_sharenums]
|
||||
ring_of_things.extend(shares)
|
||||
ring_of_things.sort()
|
||||
ring_of_things = collections.deque(ring_of_things)
|
||||
return ring_of_things
|
||||
|
||||
def _query_peers(self):
|
||||
"""
|
||||
@return: a deferred that fires when all queries have resolved
|
||||
"""
|
||||
PEER = 1
|
||||
SHARE = 0
|
||||
ring = self._create_ring_of_things()
|
||||
|
||||
# Choose a random starting point, talk to that peer.
|
||||
ring.rotate(random.randrange(0, len(ring)))
|
||||
|
||||
# Walk backwards to find a peer. We know that we'll eventually find
|
||||
# one because we earlier asserted that there was at least one.
|
||||
while ring[0][1] != PEER:
|
||||
ring.rotate(-1)
|
||||
peer = ring[0][2]
|
||||
assert isinstance(peer, PeerTracker), peer
|
||||
ring.rotate(-1)
|
||||
|
||||
# loop invariant: at the top of the loop, we are always one step to
|
||||
# the left of a peer, which is stored in the peer variable.
|
||||
outstanding_queries = []
|
||||
sharenums_to_query = set()
|
||||
for i in range(len(ring)):
|
||||
if ring[0][1] == SHARE:
|
||||
sharenums_to_query.add(ring[0][2])
|
||||
else:
|
||||
if True or sharenums_to_query:
|
||||
d = peer.query(sharenums_to_query)
|
||||
d.addCallbacks(self._got_response, self._got_error, callbackArgs=(peer, sharenums_to_query), errbackArgs=(peer,))
|
||||
outstanding_queries.append(d)
|
||||
d.addErrback(log.err)
|
||||
peer = ring[0][2]
|
||||
sharenums_to_query = set()
|
||||
ring.rotate(-1)
|
||||
|
||||
return defer.DeferredList(outstanding_queries)
|
||||
|
||||
def _got_response(self, (alreadygot, allocated), peer, shares_we_requested):
|
||||
"""
|
||||
@type alreadygot: a set of sharenums
|
||||
@type allocated: a set of sharenums
|
||||
"""
|
||||
# TODO: some future version of Foolscap might not convert inbound
|
||||
# sets into sets.Set on us, even when we're using 2.4
|
||||
alreadygot = set(alreadygot)
|
||||
allocated = set(allocated)
|
||||
#log.msg("%s._got_response(%s, %s, %s): "
|
||||
# "self.unallocated_sharenums: %s, unhandled: %s"
|
||||
# % (self, (alreadygot, allocated), peer, shares_we_requested,
|
||||
# self.unallocated_sharenums,
|
||||
# shares_we_requested - alreadygot - allocated))
|
||||
self.unallocated_sharenums -= alreadygot
|
||||
self.unallocated_sharenums -= allocated
|
||||
|
||||
if allocated:
|
||||
self.used_peers.add(peer)
|
||||
|
||||
if shares_we_requested - alreadygot - allocated:
|
||||
# Then he didn't accept some of the shares, so he's full.
|
||||
|
||||
#log.msg("%s._got_response(%s, %s, %s): "
|
||||
# "self.unallocated_sharenums: %s, unhandled: %s HE'S FULL"
|
||||
# % (self,
|
||||
# (alreadygot, allocated), peer, shares_we_requested,
|
||||
# self.unallocated_sharenums,
|
||||
# shares_we_requested - alreadygot - allocated))
|
||||
self.usable_peers.remove(peer)
|
||||
|
||||
def _got_error(self, f, peer):
|
||||
log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
|
||||
self.usable_peers.remove(peer)
|
||||
|
||||
class Tahoe2PeerSelector:
|
||||
|
||||
def __init__(self, upload_id):
|
||||
|
Loading…
x
Reference in New Issue
Block a user