mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-02 01:08:20 +00:00
upload: rearrange peer-selection code to be more readable, and fix a silly bug
This commit is contained in:
parent
460ed84562
commit
ccdc2622d4
@ -6,6 +6,7 @@ from twisted.application import service
|
||||
from foolscap import Referenceable
|
||||
|
||||
from allmydata.util import idlib, bencode
|
||||
from allmydata.util.idlib import peerid_to_short_string as shortid
|
||||
from allmydata.util.deferredutil import DeferredListShouldSucceed
|
||||
from allmydata import codec
|
||||
|
||||
@ -85,11 +86,8 @@ class FileUploader:
|
||||
for p in self.permuted:
|
||||
assert isinstance(p, str)
|
||||
# we will shrink self.permuted as we give up on peers
|
||||
self.peer_index = 0
|
||||
self.goodness_points = 0
|
||||
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
|
||||
|
||||
d = defer.maybeDeferred(self._check_next_peer)
|
||||
d = defer.maybeDeferred(self._find_peers)
|
||||
d.addCallback(self._got_enough_peers)
|
||||
d.addCallback(self._compute_uri)
|
||||
return d
|
||||
@ -97,6 +95,27 @@ class FileUploader:
|
||||
def _compute_uri(self, params):
|
||||
return "URI:%s" % bencode.bencode((self._verifierid, params))
|
||||
|
||||
def _build_not_enough_peers_error(self):
|
||||
yes = ",".join([shortid(p) for p in self.peers_who_said_yes])
|
||||
no = ",".join([shortid(p) for p in self.peers_who_said_no])
|
||||
err = ",".join([shortid(p) for p in self.peers_who_had_errors])
|
||||
msg = ("%s goodness, want %s, have %d "
|
||||
"landlords, %d total peers, "
|
||||
"peers:yes=%s;no=%s;err=%s" %
|
||||
(self.goodness_points, self.target_goodness,
|
||||
len(self.landlords), self._total_peers,
|
||||
yes, no, err))
|
||||
return msg
|
||||
|
||||
def _find_peers(self):
|
||||
# this returns a Deferred which fires (with a meaningless value) when
|
||||
# enough peers are found, or errbacks with a NotEnoughPeersError if
|
||||
# not.
|
||||
self.peer_index = 0
|
||||
self.goodness_points = 0
|
||||
self.landlords = [] # list of (peerid, bucket_num, remotebucket)
|
||||
return self._check_next_peer()
|
||||
|
||||
def _check_next_peer(self):
|
||||
if self.debug:
|
||||
log.msg("FileUploader._check_next_peer: %d permuted, %d goodness"
|
||||
@ -104,76 +123,75 @@ class FileUploader:
|
||||
(len(self.permuted), self.goodness_points,
|
||||
self.target_goodness, len(self.landlords),
|
||||
self._total_peers))
|
||||
if len(self.permuted) == 0:
|
||||
# there are no more to check
|
||||
yes = ",".join([idlib.peerid_to_short_string(p)
|
||||
for p in self.peers_who_said_yes])
|
||||
no = ",".join([idlib.peerid_to_short_string(p)
|
||||
for p in self.peers_who_said_no])
|
||||
err = ",".join([idlib.peerid_to_short_string(p)
|
||||
for p in self.peers_who_had_errors])
|
||||
msg = ("%s goodness, want %s, have %d "
|
||||
"landlords, %d total peers, "
|
||||
"peers:yes=%s;no=%s;err=%s" %
|
||||
(self.goodness_points, self.target_goodness,
|
||||
len(self.landlords), self._total_peers,
|
||||
yes, no, err))
|
||||
if (self.goodness_points >= self.target_goodness and
|
||||
len(self.landlords) >= self.min_shares):
|
||||
if self.debug: print " we're done!"
|
||||
return "done"
|
||||
if not self.permuted:
|
||||
# we've run out of peers to check without finding enough, which
|
||||
# means we won't be able to upload this file. Bummer.
|
||||
msg = self._build_not_enough_peers_error()
|
||||
log.msg("NotEnoughPeersError: %s" % msg)
|
||||
raise NotEnoughPeersError(msg)
|
||||
|
||||
# otherwise we use self.peer_index to rotate through all the usable
|
||||
# peers. It gets inremented elsewhere, but wrapped here.
|
||||
if self.peer_index >= len(self.permuted):
|
||||
self.peer_index = 0
|
||||
|
||||
peerid = self.permuted[self.peer_index]
|
||||
|
||||
d = self._check_peer(peerid)
|
||||
d.addCallback(lambda res: self._check_next_peer())
|
||||
return d
|
||||
|
||||
def _check_peer(self, peerid):
|
||||
# contact a single peer, and ask them to hold a share. If they say
|
||||
# yes, we update self.landlords and self.goodness_points, and
|
||||
# increment self.peer_index. If they say no, or are uncontactable, we
|
||||
# remove them from self.permuted. This returns a Deferred which never
|
||||
# errbacks.
|
||||
|
||||
bucket_num = len(self.landlords)
|
||||
d = self._peer.get_remote_service(peerid, "storageserver")
|
||||
def _got_peer(service):
|
||||
bucket_num = len(self.landlords)
|
||||
if self.debug: print "asking %s" % idlib.b2a(peerid)
|
||||
if self.debug: print "asking %s" % shortid(peerid)
|
||||
d2 = service.callRemote("allocate_bucket",
|
||||
verifierid=self._verifierid,
|
||||
bucket_num=bucket_num,
|
||||
size=self._share_size,
|
||||
leaser=self._peer.nodeid,
|
||||
canary=Referenceable())
|
||||
def _allocate_response(bucket):
|
||||
if self.debug:
|
||||
print " peerid %s will grant us a lease" % idlib.b2a(peerid)
|
||||
self.peers_who_said_yes.append(peerid)
|
||||
self.landlords.append( (peerid, bucket_num, bucket) )
|
||||
self.goodness_points += 1
|
||||
if (self.goodness_points >= self.target_goodness and
|
||||
len(self.landlords) >= self.min_shares):
|
||||
if self.debug: print " we're done!"
|
||||
raise HaveAllPeersError()
|
||||
# otherwise we fall through to allocate more peers
|
||||
d2.addCallback(_allocate_response)
|
||||
return d2
|
||||
d.addCallback(_got_peer)
|
||||
def _done_with_peer(res):
|
||||
if self.debug: print "done with peer %s:" % idlib.b2a(peerid)
|
||||
if isinstance(res, failure.Failure):
|
||||
if res.check(HaveAllPeersError):
|
||||
if self.debug: print " all done"
|
||||
# we're done!
|
||||
return
|
||||
if res.check(TooFullError):
|
||||
if self.debug: print " too full"
|
||||
self.peers_who_said_no.append(peerid)
|
||||
elif res.check(IndexError):
|
||||
if self.debug: print " no connection"
|
||||
self.peers_who_had_errors.append(peerid)
|
||||
else:
|
||||
if self.debug: print " other error:", res
|
||||
self.peers_who_had_errors.append(peerid)
|
||||
self.permuted.remove(peerid) # this peer was unusable
|
||||
|
||||
def _allocate_response(bucket):
|
||||
if self.debug:
|
||||
print " peerid %s will grant us a lease" % shortid(peerid)
|
||||
self.peers_who_said_yes.append(peerid)
|
||||
self.landlords.append( (peerid, bucket_num, bucket) )
|
||||
self.goodness_points += 1
|
||||
self.peer_index += 1
|
||||
|
||||
d.addCallback(_allocate_response)
|
||||
|
||||
def _err(f):
|
||||
if self.debug: print "err from peer %s:" % idlib.b2a(peerid)
|
||||
assert isinstance(f, failure.Failure)
|
||||
if f.check(TooFullError):
|
||||
if self.debug: print " too full"
|
||||
self.peers_who_said_no.append(peerid)
|
||||
elif f.check(IndexError):
|
||||
if self.debug: print " no connection"
|
||||
self.peers_who_had_errors.append(peerid)
|
||||
else:
|
||||
if self.debug: print " they gave us a lease"
|
||||
# we get here for either good peers (when we still need
|
||||
# more), or after checking a bad peer (and thus still need
|
||||
# more). So now we need to grab a new peer.
|
||||
self.peer_index += 1
|
||||
return self._check_next_peer()
|
||||
d.addBoth(_done_with_peer)
|
||||
if self.debug: print " other error:", res
|
||||
self.peers_who_had_errors.append(peerid)
|
||||
log.msg("FileUploader._check_peer(%s): err" % shortid(peerid))
|
||||
log.msg(f)
|
||||
self.permuted.remove(peerid) # this peer was unusable
|
||||
return None
|
||||
d.addErrback(_err)
|
||||
return d
|
||||
|
||||
def _got_enough_peers(self, res):
|
||||
|
Loading…
x
Reference in New Issue
Block a user