Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git, synced 2025-01-19 19:26:25 +00:00
Eliminate overcounting of servers_of_happiness in Tahoe2PeerSelector; also reorganize some things.
parent 4e29060847
commit 320582be5a
@@ -138,8 +138,21 @@ class PeerTracker:
         return (alreadygot, set(b.keys()))
 
+def servers_with_unique_shares(existing_shares, used_peers=None):
+    """
+    I accept a dict of shareid -> peerid mappings (and optionally a list
+    of PeerTracker instances) and return a list of servers that have shares.
+    """
+    servers = []
+    existing_shares = existing_shares.copy()
+    if used_peers:
+        peerdict = {}
+        for peer in used_peers:
+            peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
+        for k in peerdict.keys():
+            if existing_shares.has_key(k):
+                # Prevent overcounting; favor the bucket, and not the
+                # preexisting share.
+                del(existing_shares[k])
+        peers = list(used_peers.copy())
+        # We do this because the preexisting shares list goes by peerid.
+        peers = [x.peerid for x in peers]
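The overlap removal above can be exercised in isolation. A minimal standalone sketch, assuming shareid -> peerid dicts and tracker objects with .peerid and .buckets attributes (FakeTracker and the server names are invented for illustration, not part of the patch):

    # Minimal sketch of the overlap removal (all names invented).
    class FakeTracker:
        def __init__(self, peerid, buckets):
            self.peerid = peerid
            self.buckets = buckets

    existing = {0: "serverA", 1: "serverB"}       # preexisting shares
    trackers = [FakeTracker("serverC", [1, 2])]   # share 1 also allocated here

    existing = existing.copy()
    for t in trackers:
        for shnum in t.buckets:
            existing.pop(shnum, None)   # favor the bucket over the old share

    servers = set(existing.values())
    servers.update(t.peerid for t in trackers)
    print(sorted(servers))   # ['serverA', 'serverC']: share 1 is counted once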
@@ -148,12 +161,29 @@ def servers_with_unique_shares(existing_shares, used_peers=None):
+    return list(set(servers))
+
 def shares_by_server(existing_shares):
     """
     I accept a dict of shareid -> peerid mappings, and return a dict
     of peerid -> shareid mappings
     """
     servers = {}
     for server in set(existing_shares.values()):
         servers[server] = set([x for x in existing_shares.keys()
                               if existing_shares[x] == server])
     return servers
 
+def should_add_server(existing_shares, server, bucket):
+    """
+    I tell my caller whether the servers_of_happiness number will be
+    increased or decreased if a particular server is added as the peer
+    already holding a particular share. I take a dictionary, a peerid,
+    and a bucket as arguments, and return a boolean.
+    """
+    old_size = len(servers_with_unique_shares(existing_shares))
+    new_candidate = existing_shares.copy()
+    new_candidate[bucket] = server
+    new_size = len(servers_with_unique_shares(new_candidate))
+    return old_size < new_size
+
 class Tahoe2PeerSelector:
 
     def __init__(self, upload_id, logparent=None, upload_status=None):
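A hedged illustration of how these helpers behave. These are simplified stand-ins over invented data; the real servers_with_unique_shares also accepts PeerTracker lists via used_peers:

    # Simplified stand-ins for the helpers above (invented data).
    def shares_by_server(existing_shares):
        servers = {}
        for shnum, server in existing_shares.items():
            servers.setdefault(server, set()).add(shnum)
        return servers

    def should_add_server(existing_shares, server, bucket):
        candidate = dict(existing_shares)
        candidate[bucket] = server
        return len(set(candidate.values())) > len(set(existing_shares.values()))

    shares = {0: "A", 1: "A", 2: "B"}
    print(shares_by_server(shares))           # {'A': {0, 1}, 'B': {2}}
    print(should_add_server(shares, "C", 1))  # True: C is a new distinct server
    print(should_add_server(shares, "A", 3))  # False: A already holds shares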
@@ -261,14 +291,15 @@ class Tahoe2PeerSelector:
             peer = self.readonly_peers.pop()
             assert isinstance(peer, PeerTracker)
             d = peer.query_allocated()
-            d.addCallback(self._handle_allocate_response)
+            d.addCallback(self._handle_existing_response)
             return d
 
-    def _handle_allocate_response(self, (peer, buckets)):
+    def _handle_existing_response(self, (peer, buckets)):
         for bucket in buckets:
-            self.preexisting_shares[bucket] = peer
-            if self.homeless_shares:
-                self.homeless_shares.remove(bucket)
+            if should_add_server(self.preexisting_shares, peer, bucket):
+                self.preexisting_shares[bucket] = peer
+                if self.homeless_shares and bucket in self.homeless_shares:
+                    self.homeless_shares.remove(bucket)
         return self._existing_shares()
 
     def _loop(self):
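The guarded update replaces the unconditional bookkeeping. A hedged sketch of why the guard matters, over invented peerids (distinct is a stand-in for the should_add_server test):

    # Without the guard, a later report of an already-mapped share overwrites
    # the earlier mapping even when it does not raise the distinct-server count.
    def distinct(shares):
        return len(set(shares.values()))

    preexisting = {0: "A", 1: "A"}
    for peer, bucket in [("A", 2), ("B", 1)]:
        candidate = dict(preexisting)
        candidate[bucket] = peer
        if distinct(candidate) > distinct(preexisting):   # should_add_server
            preexisting[bucket] = peer
    print(preexisting)   # {0: 'A', 1: 'B'}: only the happiness-raising report kept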
@@ -312,10 +343,10 @@ class Tahoe2PeerSelector:
                 items.append((servernum, sharelist))
             return self._loop()
         else:
-            raise NotEnoughSharesError("shares could only be placed on %d "
-                                       "servers (%d were requested)" %
-                                       (len(effective_happiness),
-                                       self.servers_of_happiness))
+            raise NotEnoughSharesError("shares could only be placed "
+                                       "on %d servers (%d were requested)" %
+                                       (len(effective_happiness),
+                                       self.servers_of_happiness))
 
         if self.uncontacted_peers:
             peer = self.uncontacted_peers.pop(0)
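Only the string wrapping changes here; a standalone sanity check (counts invented) that the rewrapped message is byte-for-byte identical:

    old = ("shares could only be placed on %d "
           "servers (%d were requested)")
    new = ("shares could only be placed "
           "on %d servers (%d were requested)")
    assert old == new
    print(new % (3, 7))   # shares could only be placed on 3 servers (7 were requested)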
@@ -391,7 +422,7 @@ class Tahoe2PeerSelector:
             # we placed enough to be happy, so we're done
             if self._status:
                 self._status.set_status("Placed all shares")
-            return self.use_peers
+            return (self.use_peers, self.preexisting_shares)
 
     def _got_response(self, res, peer, shares_to_ask, put_peer_here):
         if isinstance(res, failure.Failure):
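The success path now hands back the preexisting-share map alongside the allocated peers, so callers must unpack a 2-tuple. A hedged caller-side sketch (the values are invented placeholders, not the real types):

    use_peers, preexisting_shares = ({"trackerA", "trackerB"}, {0: "serverC"})
    print(len(use_peers), len(preexisting_shares))   # 2 1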
@@ -422,16 +453,12 @@ class Tahoe2PeerSelector:
                      level=log.NOISY, parent=self._log_parent)
             progress = False
             for s in alreadygot:
-                if self.preexisting_shares.has_key(s):
-                    old_size = len(servers_with_unique_shares(self.preexisting_shares))
-                    new_candidate = self.preexisting_shares.copy()
-                    new_candidate[s] = peer.peerid
-                    new_size = len(servers_with_unique_shares(new_candidate))
-                    if old_size >= new_size: continue
-                self.preexisting_shares[s] = peer.peerid
-                if s in self.homeless_shares:
-                    self.homeless_shares.remove(s)
-                progress = True
+                if should_add_server(self.preexisting_shares,
+                                     peer.peerid, s):
+                    self.preexisting_shares[s] = peer.peerid
+                    if s in self.homeless_shares:
+                        self.homeless_shares.remove(s)
+                    progress = True
 
             # the PeerTracker will remember which shares were allocated on
             # that peer. We just have to remember to use them.
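As a closing check, the extracted should_add_server makes the same increase-or-not decision the old inline comparison computed. A minimal sketch under invented data; this simplified stand-in ignores the optional used_peers argument:

    def servers_with_unique_shares(existing_shares):
        return list(set(existing_shares.values()))

    def should_add_server(existing_shares, server, bucket):
        old_size = len(servers_with_unique_shares(existing_shares))
        new_candidate = existing_shares.copy()
        new_candidate[bucket] = server
        return old_size < len(servers_with_unique_shares(new_candidate))

    preexisting = {0: "A", 1: "B"}
    for peerid, s in [("B", 0), ("C", 1), ("C", 5)]:
        candidate = preexisting.copy()
        candidate[s] = peerid
        inline = len(set(candidate.values())) > len(set(preexisting.values()))
        assert inline == should_add_server(preexisting, peerid, s)
    print("helper and inline check agree")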