mirror of https://github.com/tahoe-lafs/tahoe-lafs.git
Alter 'immutable/encode.py' and 'immutable/upload.py' to use servers_of_happiness instead of shares_of_happiness.
commit 8c71df53f9 (parent b2d8a7cec2)
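
Note on the rename: shares_of_happiness counted placed shares, so an upload that crammed many shares onto a couple of servers could still be reported as healthy, while servers_of_happiness counts the distinct servers holding at least one share. A rough illustration of the difference (the dict and names below are hypothetical, not taken from the patch):

# Hypothetical illustration of the two success criteria; not part of the patch.
# sharemap: share number -> id of the server that accepted it.
sharemap = {0: "server-A", 1: "server-A", 2: "server-A",
            3: "server-B", 4: "server-B", 5: "server-B", 6: "server-B"}

shares_placed = len(sharemap)                    # 7 shares placed
distinct_servers = len(set(sharemap.values()))   # but only 2 distinct servers
print(shares_placed, distinct_servers)           # 7 2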
immutable/encode.py

@@ -118,7 +118,7 @@ class Encoder(object):
         assert not self._codec
         k, happy, n, segsize = params
         self.required_shares = k
-        self.shares_of_happiness = happy
+        self.servers_of_happiness = happy
         self.num_shares = n
         self.segment_size = segsize
         self.log("got encoding parameters: %d/%d/%d %d" % (k,happy,n, segsize))
@@ -176,7 +176,7 @@ class Encoder(object):
         if name == "storage_index":
             return self._storage_index
         elif name == "share_counts":
-            return (self.required_shares, self.shares_of_happiness,
+            return (self.required_shares, self.servers_of_happiness,
                     self.num_shares)
         elif name == "num_segments":
             return self.num_segments
@@ -191,11 +191,13 @@ class Encoder(object):
         else:
             raise KeyError("unknown parameter name '%s'" % name)

-    def set_shareholders(self, landlords):
+    def set_shareholders(self, landlords, servermap):
         assert isinstance(landlords, dict)
         for k in landlords:
             assert IStorageBucketWriter.providedBy(landlords[k])
         self.landlords = landlords.copy()
+        assert isinstance(servermap, dict)
+        self.servermap = servermap.copy()

     def start(self):
         """ Returns a Deferred that will fire with the verify cap (an instance of
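
The new servermap argument is the dict that CHKUploader.set_shareholders builds further down in this patch: keys are share numbers, values are the peerids that accepted those shares. A sketch of the expected shape, with placeholder strings standing in for real binary peerids:

# Hypothetical shape of the servermap passed to Encoder.set_shareholders().
servermap = {0: "peerid-1",
             1: "peerid-1",
             2: "peerid-2",
             3: "peerid-3"}
# encoder.set_shareholders(buckets, servermap)   # as called by CHKUploader below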
@@ -486,16 +488,19 @@ class Encoder(object):
             # even more UNUSUAL
             self.log("they weren't in our list of landlords", parent=ln,
                      level=log.WEIRD, umid="TQGFRw")
-        if len(self.landlords) < self.shares_of_happiness:
-            msg = "lost too many shareholders during upload (still have %d, want %d): %s" % \
-                  (len(self.landlords), self.shares_of_happiness, why)
-            if self.landlords:
+        del(self.servermap[shareid])
+        servers_left = list(set(self.servermap.values()))
+        if len(servers_left) < self.servers_of_happiness:
+            msg = "lost too many servers during upload (still have %d, want %d): %s" % \
+                  (len(servers_left),
+                   self.servers_of_happiness, why)
+            if servers_left:
                 raise NotEnoughSharesError(msg)
             else:
                 raise NoSharesError(msg)
         self.log("but we can still continue with %s shares, we'll be happy "
-                 "with at least %s" % (len(self.landlords),
-                                       self.shares_of_happiness),
+                 "with at least %s" % (len(servers_left),
+                                       self.servers_of_happiness),
                  parent=ln)

     def _gather_responses(self, dl):
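
With the servermap available, losing a shareholder is now judged by how many distinct servers still hold shares, not by how many buckets remain. A small standalone sketch of the same computation (illustrative data only):

# Illustrative re-run of the servers_left computation above; data is made up.
servermap = {0: "peerid-1", 1: "peerid-1", 2: "peerid-2", 3: "peerid-3"}
servers_of_happiness = 3

shareid = 1                                       # this share's connection was lost
del(servermap[shareid])
servers_left = list(set(servermap.values()))      # peerid-1, peerid-2, peerid-3
print(len(servers_left) >= servers_of_happiness)  # True: peerid-1 still holds share 0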
immutable/upload.py

@@ -128,6 +128,23 @@ class PeerTracker:
         self.buckets.update(b)
         return (alreadygot, set(b.keys()))

+def servers_with_unique_shares(existing_shares, used_peers=None):
+    servers = []
+    if used_peers:
+        peers = list(used_peers.copy())
+        # We do this because the preexisting shares list goes by peerid.
+        peers = [x.peerid for x in peers]
+        servers.extend(peers)
+    servers.extend(existing_shares.values())
+    return list(set(servers))
+
+def shares_by_server(existing_shares):
+    servers = {}
+    for server in set(existing_shares.values()):
+        servers[server] = set([x for x in existing_shares.keys()
+                               if existing_shares[x] == server])
+    return servers
+
 class Tahoe2PeerSelector:

     def __init__(self, upload_id, logparent=None, upload_status=None):
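
A quick usage sketch for the two helpers added above. The PeerTracker stand-in below is hypothetical; only its peerid attribute matters to servers_with_unique_shares:

# Hypothetical usage of the helpers introduced above; not part of the patch.
class FakePeerTracker:            # stand-in for PeerTracker; only .peerid is read
    def __init__(self, peerid):
        self.peerid = peerid

existing_shares = {0: "peerid-1", 1: "peerid-1", 2: "peerid-2"}   # shnum -> peerid
used_peers = set([FakePeerTracker("peerid-3")])

print(servers_with_unique_shares(existing_shares, used_peers))
# -> the three distinct peerids, in no particular order
print(shares_by_server(existing_shares))
# -> peerid-1 maps to the set of shares 0 and 1, peerid-2 to the set of share 2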
@@ -144,7 +161,7 @@ class Tahoe2PeerSelector:

     def get_shareholders(self, storage_broker, secret_holder,
                          storage_index, share_size, block_size,
-                         num_segments, total_shares, shares_of_happiness):
+                         num_segments, total_shares, servers_of_happiness):
         """
         @return: (used_peers, already_peers), where used_peers is a set of
                  PeerTracker instances that have agreed to hold some shares
@@ -157,7 +174,7 @@ class Tahoe2PeerSelector:
         self._status.set_status("Contacting Peers..")

         self.total_shares = total_shares
-        self.shares_of_happiness = shares_of_happiness
+        self.servers_of_happiness = servers_of_happiness

         self.homeless_shares = range(total_shares)
         # self.uncontacted_peers = list() # peers we haven't asked yet
@@ -222,20 +239,52 @@ class Tahoe2PeerSelector:
         d = defer.maybeDeferred(self._loop)
         return d

     def _loop(self):
         if not self.homeless_shares:
-            # all done
-            msg = ("placed all %d shares, "
-                   "sent %d queries to %d peers, "
-                   "%d queries placed some shares, %d placed none, "
-                   "got %d errors" %
-                   (self.total_shares,
-                    self.query_count, self.num_peers_contacted,
-                    self.good_query_count, self.bad_query_count,
-                    self.error_count))
-            log.msg("peer selection successful for %s: %s" % (self, msg),
+            effective_happiness = servers_with_unique_shares(
+                                                   self.preexisting_shares,
+                                                   self.use_peers)
+            if self.servers_of_happiness <= len(effective_happiness):
+                msg = ("placed all %d shares, "
+                       "sent %d queries to %d peers, "
+                       "%d queries placed some shares, %d placed none, "
+                       "got %d errors" %
+                       (self.total_shares,
+                        self.query_count, self.num_peers_contacted,
+                        self.good_query_count, self.bad_query_count,
+                        self.error_count))
+                log.msg("peer selection successful for %s: %s" % (self, msg),
                     parent=self._log_parent)
-            return (self.use_peers, self.preexisting_shares)
+                return (self.use_peers, self.preexisting_shares)
+            else:
+                delta = self.servers_of_happiness - len(effective_happiness)
+                shares = shares_by_server(self.preexisting_shares)
+                # Each server in shares maps to a set of shares stored on it.
+                # Since we want to keep at least one share on each server
+                # that has one (otherwise we'd only be making
+                # the situation worse by removing distinct servers),
+                # each server has len(its shares) - 1 to spread around.
+                shares_to_spread = sum([len(list(sharelist)) - 1
+                                        for (server, sharelist)
+                                        in shares.items()])
+                if delta <= len(self.uncontacted_peers) and \
+                   shares_to_spread >= delta:
+                    # Loop through the allocated shares, removing
+                    items = shares.items()
+                    while len(self.homeless_shares) < delta:
+                        servernum, sharelist = items.pop()
+                        if len(sharelist) > 1:
+                            share = sharelist.pop()
+                            self.homeless_shares.append(share)
+                            del(self.preexisting_shares[share])
+                            items.append((servernum, sharelist))
+                    return self._loop()
+                else:
+                    raise NotEnoughSharesError("shares could only be placed on %d "
+                                               "servers (%d were requested)" %
+                                               (len(effective_happiness),
+                                                self.servers_of_happiness))

         if self.uncontacted_peers:
             peer = self.uncontacted_peers.pop(0)
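
The redistribution branch above only fires when there are both enough uncontacted peers to absorb the freed shares and enough doubled-up shares to free (each server keeps at least one). A standalone walk-through of that arithmetic with made-up numbers:

# Hypothetical walk-through of the redistribution check in _loop(); data is made up.
preexisting_shares = {0: "peerid-1", 1: "peerid-1", 2: "peerid-1", 3: "peerid-2"}
servers_of_happiness = 4

effective_happiness = len(set(preexisting_shares.values()))   # 2 distinct servers
delta = servers_of_happiness - effective_happiness            # need 2 more servers

by_server = {}
for shnum, peerid in preexisting_shares.items():
    by_server.setdefault(peerid, set()).add(shnum)
# Each server can give up all of its shares except one.
shares_to_spread = sum([len(sharelist) - 1 for sharelist in by_server.values()])  # 2

uncontacted_peers = ["peerid-3", "peerid-4"]
print(delta <= len(uncontacted_peers) and shares_to_spread >= delta)   # True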
@@ -284,15 +333,18 @@ class Tahoe2PeerSelector:
         else:
             # no more peers. If we haven't placed enough shares, we fail.
-            placed_shares = self.total_shares - len(self.homeless_shares)
-            if placed_shares < self.shares_of_happiness:
+            effective_happiness = servers_with_unique_shares(
+                                                   self.preexisting_shares,
+                                                   self.use_peers)
+            if len(effective_happiness) < self.servers_of_happiness:
                 msg = ("placed %d shares out of %d total (%d homeless), "
-                       "want to place %d, "
+                       "want to place on %d servers, "
                        "sent %d queries to %d peers, "
                        "%d queries placed some shares, %d placed none, "
                        "got %d errors" %
                        (self.total_shares - len(self.homeless_shares),
                         self.total_shares, len(self.homeless_shares),
-                        self.shares_of_happiness,
+                        self.servers_of_happiness,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.error_count))
@@ -339,6 +391,12 @@ class Tahoe2PeerSelector:
                      level=log.NOISY, parent=self._log_parent)
             progress = False
             for s in alreadygot:
+                if self.preexisting_shares.has_key(s):
+                    old_size = len(servers_with_unique_shares(self.preexisting_shares))
+                    new_candidate = self.preexisting_shares.copy()
+                    new_candidate[s] = peer.peerid
+                    new_size = len(servers_with_unique_shares(new_candidate))
+                    if old_size >= new_size: continue
                 self.preexisting_shares[s] = peer.peerid
                 if s in self.homeless_shares:
                     self.homeless_shares.remove(s)
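
The new guard above only re-attributes an already-known share to the responding peer when doing so would increase the number of distinct servers; otherwise the earlier attribution is kept. A minimal sketch of that comparison (illustrative data):

# Illustrative version of the old_size/new_size comparison above; data is made up.
preexisting_shares = {0: "peerid-1", 1: "peerid-2"}
responding_peerid = "peerid-3"
s = 0                                             # the responding peer also has share 0

old_size = len(set(preexisting_shares.values()))  # 2
new_candidate = preexisting_shares.copy()
new_candidate[s] = responding_peerid
new_size = len(set(new_candidate.values()))       # still 2: no improvement
if new_size > old_size:
    preexisting_shares[s] = responding_peerid     # not taken in this example
print(preexisting_shares)                         # {0: "peerid-1", 1: "peerid-2"}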
@@ -764,12 +822,14 @@ class CHKUploader:
         for peer in used_peers:
             assert isinstance(peer, PeerTracker)
         buckets = {}
+        servermap = already_peers.copy()
         for peer in used_peers:
             buckets.update(peer.buckets)
             for shnum in peer.buckets:
                 self._peer_trackers[shnum] = peer
+                servermap[shnum] = peer.peerid
         assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
-        encoder.set_shareholders(buckets)
+        encoder.set_shareholders(buckets, servermap)

     def _encrypted_done(self, verifycap):
         """ Returns a Deferred that will fire with the UploadResults instance. """