upload.py: implement Tahoe2 peer-selection algorithm

Brian Warner 2007-09-16 01:24:07 -07:00
parent f24d7450a7
commit 979d12cd42
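
In outline, the new Tahoe2 algorithm walks the permuted peer list and asks
each peer for a single share; a peer that accepts everything it is asked for
becomes worth asking again, and once every peer has been contacted once, the
remaining homeless shares are split across those willing peers in
div_ceil-sized batches. The upload fails with NotEnoughPeersError only if
fewer than shares_of_happiness shares could be placed. What follows is a
minimal synchronous sketch of that loop, not the committed code: it omits
the Deferred plumbing, the renewal/cancel secrets, and the alreadygot
bookkeeping, and ask(peer, shares) is a hypothetical stand-in for
PeerTracker.query that returns the set of shares the peer accepted.

    # A synchronous sketch of the placement loop in the diff below.
    # ask(peer, shares) is a hypothetical stand-in for PeerTracker.query;
    # it returns the subset of `shares` that the peer accepted.
    def tahoe2_place(peers, total_shares, shares_of_happiness, ask):
        homeless = list(range(total_shares))
        uncontacted = list(peers)  # already permuted by storage index
        contacted = []             # peers that accepted everything so far
        used = set()               # peers now holding at least one share
        while homeless:
            if uncontacted:
                # first pass: ask each peer for a single share
                peer = uncontacted.pop(0)
                asked = set([homeless.pop(0)])
            elif contacted:
                # second pass: split the leftovers across the willing peers
                n = (len(homeless) + len(contacted) - 1) // len(contacted)
                asked = set(homeless[:n])
                homeless = homeless[n:]
                peer = contacted.pop(0)
            else:
                break  # no peers left to ask
            accepted = ask(peer, asked)
            if accepted:
                used.add(peer)
            # rejected shares go back to the front of the homeless list
            homeless = list(asked - accepted) + homeless
            if accepted == asked:
                # they took everything we offered: worth asking again
                contacted.append(peer)
        if total_shares - len(homeless) < shares_of_happiness:
            raise RuntimeError("not enough peers")  # NotEnoughPeersError below
        return used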


@@ -1,7 +1,7 @@
 import os
 from zope.interface import implements
-from twisted.python import log
+from twisted.python import log, failure
 from twisted.internet import defer
 from twisted.application import service
 from foolscap import Referenceable
@@ -11,6 +11,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
      bucket_cancel_secret_hash, plaintext_hasher, \
      storage_index_chk_hash, plaintext_segment_hasher, key_hasher
 from allmydata import encode, storage, hashtree, uri
+from allmydata.util import idlib, mathutil
 from allmydata.interfaces import IUploadable, IUploader, IEncryptedUploadable
 from allmydata.Crypto.Cipher import AES
@@ -59,6 +60,11 @@ class PeerTracker:
         self.renew_secret = bucket_renewal_secret
         self.cancel_secret = bucket_cancel_secret
 
+    def __repr__(self):
+        return ("<PeerTracker for peer %s and SI %s>"
+                % (idlib.b2a(self.peerid)[:4],
+                   idlib.b2a(self.storage_index)[:6]))
+
     def query(self, sharenums):
         if not self._storageserver:
             d = self.connection.callRemote("get_service", "storageserver")
@@ -95,6 +101,12 @@ class PeerTracker:
 
 class Tahoe3PeerSelector:
 
+    def __init__(self, upload_id):
+        self.upload_id = upload_id
+
+    def __repr__(self):
+        return "<Tahoe3PeerSelector for upload %s>" % self.upload_id
+
     def get_shareholders(self, client,
                          storage_index, share_size, block_size,
                          num_segments, total_shares, shares_of_happiness,
@@ -138,6 +150,7 @@ class Tahoe3PeerSelector:
                                                           peerid),
                                  )
                      for permutedid, peerid, conn in peers ]
+        self.all_peers = trackers
         self.usable_peers = set(trackers) # this set shrinks over time
         self.used_peers = set() # while this set grows
         self.unallocated_sharenums = set(range(total_shares)) # this one shrinks
@@ -262,6 +275,197 @@ class Tahoe3PeerSelector:
         log.msg("%s._got_error(%s, %s)" % (self, f, peer,))
         self.usable_peers.remove(peer)
 
+
+class Tahoe2PeerSelector:
+
+    def __init__(self, upload_id):
+        self.upload_id = upload_id
+        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
+        self.error_count = 0
+        self.num_peers_contacted = 0
+        self.last_failure_msg = None
+
+    def __repr__(self):
+        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
+
+    def get_shareholders(self, client,
+                         storage_index, share_size, block_size,
+                         num_segments, total_shares, shares_of_happiness,
+                         push_to_ourselves):
"""
@return: a set of PeerTracker instances that have agreed to hold some
shares for us
"""
self.total_shares = total_shares
self.shares_of_happiness = shares_of_happiness
self.homeless_shares = range(total_shares)
# self.uncontacted_peers = list() # peers we haven't asked yet
self.contacted_peers = list() # peers worth asking again
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> PeerTracker holding the share
peers = client.get_permuted_peers(storage_index, push_to_ourselves)
if not peers:
raise encode.NotEnoughPeersError("client gave us zero peers")
# figure out how much space to ask for
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
# (instead of a HashTree) because we don't require actual hashing
# just to count the levels.
ht = hashtree.IncompleteHashTree(total_shares)
num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
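+        # (for N total shares this is roughly ceil(log2(N)) sibling hashes
+        # along the path from leaf 0 to the root, plus the leaf hash itself)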
+        # decide upon the renewal/cancel secrets, to include them in the
+        # allocate_buckets query.
+        client_renewal_secret = client.get_renewal_secret()
+        client_cancel_secret = client.get_cancel_secret()
+
+        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
+                                                       storage_index)
+        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
+                                                     storage_index)
+
+        trackers = [ PeerTracker(peerid, permutedid, conn,
+                                 share_size, block_size,
+                                 num_segments, num_share_hashes,
+                                 storage_index,
+                                 bucket_renewal_secret_hash(file_renewal_secret,
+                                                            peerid),
+                                 bucket_cancel_secret_hash(file_cancel_secret,
+                                                           peerid),
+                                 )
+                     for permutedid, peerid, conn in peers ]
+        self.uncontacted_peers = trackers
+
+        d = defer.maybeDeferred(self._loop)
+        return d
+    def _loop(self):
+        if not self.homeless_shares:
+            # all done
+            msg = ("placed all %d shares, "
+                   "sent %d queries to %d peers, "
+                   "%d queries placed some shares, %d placed none, "
+                   "got %d errors" %
+                   (self.total_shares,
+                    self.query_count, self.num_peers_contacted,
+                    self.good_query_count, self.bad_query_count,
+                    self.error_count))
+            log.msg("peer selection successful for %s: %s" % (self, msg))
+            return self.use_peers
+
+        if self.uncontacted_peers:
+            peer = self.uncontacted_peers.pop(0)
+            # TODO: don't pre-convert all peerids to PeerTrackers
+            assert isinstance(peer, PeerTracker)
+            shares_to_ask = set([self.homeless_shares.pop(0)])
+            self.query_count += 1
+            self.num_peers_contacted += 1
+            d = peer.query(shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask)
+            return d
+        elif self.contacted_peers:
+            # ask a peer that we've already asked.
+            num_shares = mathutil.div_ceil(len(self.homeless_shares),
+                                           len(self.contacted_peers))
+            shares_to_ask = set(self.homeless_shares[:num_shares])
+            self.homeless_shares[:num_shares] = []
+            peer = self.contacted_peers.pop(0)
+            self.query_count += 1
+            d = peer.query(shares_to_ask)
+            d.addBoth(self._got_response, peer, shares_to_ask)
+            return d
+        else:
+            # no more peers. If we haven't placed enough shares, we fail.
+            placed_shares = self.total_shares - len(self.homeless_shares)
+            if placed_shares < self.shares_of_happiness:
+                msg = ("placed %d shares out of %d total (%d homeless), "
+                       "sent %d queries to %d peers, "
+                       "%d queries placed some shares, %d placed none, "
+                       "got %d errors" %
+                       (self.total_shares - len(self.homeless_shares),
+                        self.total_shares, len(self.homeless_shares),
+                        self.query_count, self.num_peers_contacted,
+                        self.good_query_count, self.bad_query_count,
+                        self.error_count))
+                msg = "peer selection failed for %s: %s" % (self, msg)
+                if self.last_failure_msg:
+                    msg += " (%s)" % (self.last_failure_msg,)
+                log.msg(msg)
+                raise encode.NotEnoughPeersError(msg)
+            else:
+                # we placed enough to be happy, so we're done
+                return self.use_peers
+    def _got_response(self, res, peer, shares_to_ask):
+        if isinstance(res, failure.Failure):
+            # This is unusual, and probably indicates a bug or a network
+            # problem.
+            log.msg("%s got error during peer selection: %s" % (peer, res))
+            self.error_count += 1
+            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
+            if self.uncontacted_peers or self.contacted_peers:
+                # there is still hope, so just loop
+                pass
+            else:
+                # No more peers, so this upload might fail (it depends upon
+                # whether we've hit shares_of_happiness or not). Log the last
+                # failure we got: if a coding error causes all peers to fail
+                # in the same way, this allows the common failure to be seen
+                # by the uploader and should help with debugging
+                msg = ("last failure (from %s) was: %s" % (peer, res))
+                self.last_failure_msg = msg
+        else:
+            (alreadygot, allocated) = res
+            progress = False
+            for s in alreadygot:
+                self.preexisting_shares[s] = peer
+                if s in self.homeless_shares:
+                    self.homeless_shares.remove(s)
+                    progress = True
+
+            # the PeerTracker will remember which shares were allocated on
+            # that peer. We just have to remember to use them.
+            if allocated:
+                self.use_peers.add(peer)
+                progress = True
+
+            not_yet_present = set(shares_to_ask) - set(alreadygot)
+            still_homeless = not_yet_present - set(allocated)
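+            # e.g. if we asked for shares set([3,4]) and the peer reported
+            # alreadygot=(3,) and allocated=(4,), nothing is still homeless;
+            # if instead it reported alreadygot=() and allocated=(4,), then
+            # share 3 remains homeless and will be retried elsewhere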
+            if progress:
+                # they accepted or already had at least one share, so
+                # progress has been made
+                self.good_query_count += 1
+            else:
+                self.bad_query_count += 1
+
+            if still_homeless:
+                # In networks with lots of space, this is very unusual and
+                # probably indicates an error. In networks with peers that
+                # are full, it is merely unusual. In networks that are very
+                # full, it is common, and many uploads will fail. In most
+                # cases, this is obviously not fatal, and we'll just use some
+                # other peers.
+
+                # some shares are still homeless, keep trying to find them a
+                # home. The ones that were rejected get first priority.
+                self.homeless_shares = (list(still_homeless)
+                                        + self.homeless_shares)
+                # They were unable to accept all of our requests, so it is
+                # safe to assume that asking them again won't help.
+            else:
+                # if they *were* able to accept everything, they might be
+                # willing to accept even more.
+                self.contacted_peers.append(peer)
+
+        # now loop
+        return self._loop()
+
 
 class EncryptAnUploadable:
     """This is a wrapper that takes an IUploadable and provides
@@ -415,9 +619,10 @@ class CHKUploader:
         return d
 
     def locate_all_shareholders(self, encoder):
-        peer_selector = self.peer_selector_class()
-
         storage_index = encoder.get_param("storage_index")
+        upload_id = idlib.b2a(storage_index)[:6]
+        peer_selector = self.peer_selector_class(upload_id)
+
         share_size = encoder.get_param("share_size")
         block_size = encoder.get_param("block_size")
         num_segments = encoder.get_param("num_segments")
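
For orientation, this is roughly how the updated
CHKUploader.locate_all_shareholders drives the new selector; `encoder` and
`client` are assumed to be in scope as in the method above, the numeric
parameters are invented for illustration, and get_shareholders returns a
Deferred that eventually fires with the set of PeerTrackers to use.

    storage_index = encoder.get_param("storage_index")
    peer_selector = Tahoe2PeerSelector(idlib.b2a(storage_index)[:6])
    d = peer_selector.get_shareholders(client, storage_index,
                                       share_size=1000, block_size=1000,
                                       num_segments=1, total_shares=100,
                                       shares_of_happiness=75,
                                       push_to_ourselves=True)
    def _done(used_peers):
        for peer in used_peers:
            log.msg("will upload to %s" % (peer,))
    d.addCallback(_done)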