mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-18 10:46:24 +00:00
encode: tolerate lost peers, as long as we still get enough shares out. Closes #17.
This commit is contained in:
parent
fcd7842ff2
commit
6bb9debc16
@ -58,6 +58,9 @@ hash tree is put into the URI.
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
class NotEnoughPeersError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
KiB=1024
|
KiB=1024
|
||||||
MiB=1024*KiB
|
MiB=1024*KiB
|
||||||
GiB=1024*MiB
|
GiB=1024*MiB
|
||||||
@ -67,6 +70,7 @@ PiB=1024*TiB
|
|||||||
class Encoder(object):
|
class Encoder(object):
|
||||||
implements(IEncoder)
|
implements(IEncoder)
|
||||||
NEEDED_SHARES = 25
|
NEEDED_SHARES = 25
|
||||||
|
SHARES_OF_HAPPINESS = 75
|
||||||
TOTAL_SHARES = 100
|
TOTAL_SHARES = 100
|
||||||
MAX_SEGMENT_SIZE = 2*MiB
|
MAX_SEGMENT_SIZE = 2*MiB
|
||||||
|
|
||||||
@ -74,9 +78,12 @@ class Encoder(object):
|
|||||||
object.__init__(self)
|
object.__init__(self)
|
||||||
self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
|
self.MAX_SEGMENT_SIZE = options.get("max_segment_size",
|
||||||
self.MAX_SEGMENT_SIZE)
|
self.MAX_SEGMENT_SIZE)
|
||||||
k,n = options.get("needed_and_total_shares",
|
k,happy,n = options.get("needed_and_happy_and_total_shares",
|
||||||
(self.NEEDED_SHARES, self.TOTAL_SHARES))
|
(self.NEEDED_SHARES,
|
||||||
|
self.SHARES_OF_HAPPINESS,
|
||||||
|
self.TOTAL_SHARES))
|
||||||
self.NEEDED_SHARES = k
|
self.NEEDED_SHARES = k
|
||||||
|
self.SHARES_OF_HAPPINESS = happy
|
||||||
self.TOTAL_SHARES = n
|
self.TOTAL_SHARES = n
|
||||||
self.thingA_data = {}
|
self.thingA_data = {}
|
||||||
|
|
||||||
@ -91,6 +98,7 @@ class Encoder(object):
|
|||||||
|
|
||||||
self.num_shares = self.TOTAL_SHARES
|
self.num_shares = self.TOTAL_SHARES
|
||||||
self.required_shares = self.NEEDED_SHARES
|
self.required_shares = self.NEEDED_SHARES
|
||||||
|
self.shares_of_happiness = self.SHARES_OF_HAPPINESS
|
||||||
|
|
||||||
self.segment_size = min(self.MAX_SEGMENT_SIZE, self.file_size)
|
self.segment_size = min(self.MAX_SEGMENT_SIZE, self.file_size)
|
||||||
# this must be a multiple of self.required_shares
|
# this must be a multiple of self.required_shares
|
||||||
@ -246,7 +254,7 @@ class Encoder(object):
|
|||||||
dl.append(d)
|
dl.append(d)
|
||||||
subshare_hash = block_hash(subshare)
|
subshare_hash = block_hash(subshare)
|
||||||
self.subshare_hashes[shareid].append(subshare_hash)
|
self.subshare_hashes[shareid].append(subshare_hash)
|
||||||
dl = defer.DeferredList(dl)
|
dl = defer.DeferredList(dl, fireOnOneErrback=True, consumeErrors=True)
|
||||||
def _logit(res):
|
def _logit(res):
|
||||||
log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments))
|
log.msg("%s uploaded %s / %s bytes of your file." % (self, self.segment_size*(segnum+1), self.segment_size*self.num_segments))
|
||||||
return res
|
return res
|
||||||
@ -257,7 +265,18 @@ class Encoder(object):
|
|||||||
if shareid not in self.landlords:
|
if shareid not in self.landlords:
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
sh = self.landlords[shareid]
|
sh = self.landlords[shareid]
|
||||||
return sh.callRemote("put_block", segment_num, subshare)
|
d = sh.callRemote("put_block", segment_num, subshare)
|
||||||
|
d.addErrback(self._remove_shareholder, shareid,
|
||||||
|
"segnum=%d" % segment_num)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _remove_shareholder(self, why, shareid, where):
|
||||||
|
log.msg("error while sending %s to shareholder=%d: %s" %
|
||||||
|
(where, shareid, why)) # UNUSUAL
|
||||||
|
del self.landlords[shareid]
|
||||||
|
if len(self.landlords) < self.shares_of_happiness:
|
||||||
|
msg = "lost too many shareholders during upload"
|
||||||
|
raise NotEnoughPeersError(msg)
|
||||||
|
|
||||||
def send_all_subshare_hash_trees(self):
|
def send_all_subshare_hash_trees(self):
|
||||||
log.msg("%s sending subshare hash trees" % self)
|
log.msg("%s sending subshare hash trees" % self)
|
||||||
@ -266,7 +285,8 @@ class Encoder(object):
|
|||||||
# hashes is a list of the hashes of all subshares that were sent
|
# hashes is a list of the hashes of all subshares that were sent
|
||||||
# to shareholder[shareid].
|
# to shareholder[shareid].
|
||||||
dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
|
dl.append(self.send_one_subshare_hash_tree(shareid, hashes))
|
||||||
return defer.DeferredList(dl)
|
return defer.DeferredList(dl, fireOnOneErrback=True,
|
||||||
|
consumeErrors=True)
|
||||||
|
|
||||||
def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
|
def send_one_subshare_hash_tree(self, shareid, subshare_hashes):
|
||||||
t = HashTree(subshare_hashes)
|
t = HashTree(subshare_hashes)
|
||||||
@ -278,7 +298,9 @@ class Encoder(object):
|
|||||||
if shareid not in self.landlords:
|
if shareid not in self.landlords:
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
sh = self.landlords[shareid]
|
sh = self.landlords[shareid]
|
||||||
return sh.callRemote("put_block_hashes", all_hashes)
|
d = sh.callRemote("put_block_hashes", all_hashes)
|
||||||
|
d.addErrback(self._remove_shareholder, shareid, "put_block_hashes")
|
||||||
|
return d
|
||||||
|
|
||||||
def send_all_share_hash_trees(self):
|
def send_all_share_hash_trees(self):
|
||||||
# each bucket gets a set of share hash tree nodes that are needed to
|
# each bucket gets a set of share hash tree nodes that are needed to
|
||||||
@ -300,37 +322,49 @@ class Encoder(object):
|
|||||||
needed_hash_indices = t.needed_hashes(i, include_leaf=True)
|
needed_hash_indices = t.needed_hashes(i, include_leaf=True)
|
||||||
hashes = [(hi, t[hi]) for hi in needed_hash_indices]
|
hashes = [(hi, t[hi]) for hi in needed_hash_indices]
|
||||||
dl.append(self.send_one_share_hash_tree(i, hashes))
|
dl.append(self.send_one_share_hash_tree(i, hashes))
|
||||||
return defer.DeferredList(dl)
|
return defer.DeferredList(dl, fireOnOneErrback=True,
|
||||||
|
consumeErrors=True)
|
||||||
|
|
||||||
def send_one_share_hash_tree(self, shareid, needed_hashes):
|
def send_one_share_hash_tree(self, shareid, needed_hashes):
|
||||||
if shareid not in self.landlords:
|
if shareid not in self.landlords:
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
sh = self.landlords[shareid]
|
sh = self.landlords[shareid]
|
||||||
return sh.callRemote("put_share_hashes", needed_hashes)
|
d = sh.callRemote("put_share_hashes", needed_hashes)
|
||||||
|
d.addErrback(self._remove_shareholder, shareid, "put_share_hashes")
|
||||||
|
return d
|
||||||
|
|
||||||
def send_thingA_to_all_shareholders(self):
|
def send_thingA_to_all_shareholders(self):
|
||||||
log.msg("%s: sending thingA" % self)
|
log.msg("%s: sending thingA" % self)
|
||||||
thingA = bencode.bencode(self.thingA_data)
|
thingA = bencode.bencode(self.thingA_data)
|
||||||
self.thingA_hash = thingA_hash(thingA)
|
self.thingA_hash = thingA_hash(thingA)
|
||||||
dl = []
|
dl = []
|
||||||
for sh in self.landlords.values():
|
for shareid in self.landlords.keys():
|
||||||
dl.append(self.send_thingA(sh, thingA))
|
dl.append(self.send_thingA(shareid, thingA))
|
||||||
return defer.DeferredList(dl)
|
return defer.DeferredList(dl, fireOnOneErrback=True,
|
||||||
|
consumeErrors=True)
|
||||||
|
|
||||||
def send_thingA(self, sh, thingA):
|
def send_thingA(self, shareid, thingA):
|
||||||
return sh.callRemote("put_thingA", thingA)
|
sh = self.landlords[shareid]
|
||||||
|
d = sh.callRemote("put_thingA", thingA)
|
||||||
|
d.addErrback(self._remove_shareholder, shareid, "put_thingA")
|
||||||
|
return d
|
||||||
|
|
||||||
def close_all_shareholders(self):
|
def close_all_shareholders(self):
|
||||||
log.msg("%s: closing shareholders" % self)
|
log.msg("%s: closing shareholders" % self)
|
||||||
dl = []
|
dl = []
|
||||||
for shareid in self.landlords:
|
for shareid in self.landlords:
|
||||||
dl.append(self.landlords[shareid].callRemote("close"))
|
d = self.landlords[shareid].callRemote("close")
|
||||||
return defer.DeferredList(dl)
|
d.addErrback(self._remove_shareholder, shareid, "close")
|
||||||
|
dl.append(d)
|
||||||
|
return defer.DeferredList(dl, fireOnOneErrback=True,
|
||||||
|
consumeErrors=True)
|
||||||
|
|
||||||
def done(self):
|
def done(self):
|
||||||
log.msg("%s: upload done" % self)
|
log.msg("%s: upload done" % self)
|
||||||
return self.thingA_hash
|
return self.thingA_hash
|
||||||
|
|
||||||
def err(self, f):
|
def err(self, f):
|
||||||
log.msg("%s: upload failed: %s" % (self, f))
|
log.msg("%s: upload failed: %s" % (self, f)) # UNUSUAL
|
||||||
|
if f.check(defer.FirstError):
|
||||||
|
return f.value.subFailure
|
||||||
return f
|
return f
|
||||||
|
@ -41,6 +41,9 @@ class FakeStorageServer:
|
|||||||
else:
|
else:
|
||||||
return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),)
|
return (set(), dict([(shnum, FakeBucketWriter(),) for shnum in sharenums]),)
|
||||||
|
|
||||||
|
class LostPeerError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
class FakeBucketWriter:
|
class FakeBucketWriter:
|
||||||
# these are used for both reading and writing
|
# these are used for both reading and writing
|
||||||
def __init__(self, mode="good"):
|
def __init__(self, mode="good"):
|
||||||
@ -59,8 +62,10 @@ class FakeBucketWriter:
|
|||||||
def put_block(self, segmentnum, data):
|
def put_block(self, segmentnum, data):
|
||||||
assert not self.closed
|
assert not self.closed
|
||||||
assert segmentnum not in self.blocks
|
assert segmentnum not in self.blocks
|
||||||
|
if self.mode == "lost" and segmentnum >= 1:
|
||||||
|
raise LostPeerError("I'm going away now")
|
||||||
self.blocks[segmentnum] = data
|
self.blocks[segmentnum] = data
|
||||||
|
|
||||||
def put_block_hashes(self, blockhashes):
|
def put_block_hashes(self, blockhashes):
|
||||||
assert not self.closed
|
assert not self.closed
|
||||||
assert self.block_hashes is None
|
assert self.block_hashes is None
|
||||||
@ -215,18 +220,19 @@ class Encode(unittest.TestCase):
|
|||||||
return self.do_encode(25, 101, 100, 5, 15, 8)
|
return self.do_encode(25, 101, 100, 5, 15, 8)
|
||||||
|
|
||||||
class Roundtrip(unittest.TestCase):
|
class Roundtrip(unittest.TestCase):
|
||||||
def send_and_recover(self, k_and_n=(25,100),
|
def send_and_recover(self, k_and_happy_and_n=(25,75,100),
|
||||||
AVAILABLE_SHARES=None,
|
AVAILABLE_SHARES=None,
|
||||||
datalen=76,
|
datalen=76,
|
||||||
max_segment_size=25,
|
max_segment_size=25,
|
||||||
bucket_modes={}):
|
bucket_modes={},
|
||||||
NUM_SHARES = k_and_n[1]
|
):
|
||||||
|
NUM_SHARES = k_and_happy_and_n[2]
|
||||||
if AVAILABLE_SHARES is None:
|
if AVAILABLE_SHARES is None:
|
||||||
AVAILABLE_SHARES = NUM_SHARES
|
AVAILABLE_SHARES = NUM_SHARES
|
||||||
data = make_data(datalen)
|
data = make_data(datalen)
|
||||||
# force use of multiple segments
|
# force use of multiple segments
|
||||||
options = {"max_segment_size": max_segment_size,
|
options = {"max_segment_size": max_segment_size,
|
||||||
"needed_and_total_shares": k_and_n}
|
"needed_and_happy_and_total_shares": k_and_happy_and_n}
|
||||||
e = encode.Encoder(options)
|
e = encode.Encoder(options)
|
||||||
nonkey = "\x00" * 16
|
nonkey = "\x00" * 16
|
||||||
e.setup(StringIO(data), nonkey)
|
e.setup(StringIO(data), nonkey)
|
||||||
@ -275,7 +281,8 @@ class Roundtrip(unittest.TestCase):
|
|||||||
fd._got_thingA(thingA_data)
|
fd._got_thingA(thingA_data)
|
||||||
for shnum in range(AVAILABLE_SHARES):
|
for shnum in range(AVAILABLE_SHARES):
|
||||||
bucket = all_shareholders[shnum]
|
bucket = all_shareholders[shnum]
|
||||||
fd.add_share_bucket(shnum, bucket)
|
if bucket.closed:
|
||||||
|
fd.add_share_bucket(shnum, bucket)
|
||||||
fd._got_all_shareholders(None)
|
fd._got_all_shareholders(None)
|
||||||
fd._create_validated_buckets(None)
|
fd._create_validated_buckets(None)
|
||||||
d2 = fd._download_all_segments(None)
|
d2 = fd._download_all_segments(None)
|
||||||
@ -289,7 +296,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
return d
|
return d
|
||||||
|
|
||||||
def test_not_enough_shares(self):
|
def test_not_enough_shares(self):
|
||||||
d = self.send_and_recover((4,10), AVAILABLE_SHARES=2)
|
d = self.send_and_recover((4,8,10), AVAILABLE_SHARES=2)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self.failUnless(isinstance(res, Failure))
|
self.failUnless(isinstance(res, Failure))
|
||||||
self.failUnless(res.check(download.NotEnoughPeersError))
|
self.failUnless(res.check(download.NotEnoughPeersError))
|
||||||
@ -329,7 +336,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(6)]
|
for i in range(6)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(6, 10)])
|
for i in range(6, 10)])
|
||||||
return self.send_and_recover((4,10), bucket_modes=modemap)
|
return self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
|
|
||||||
def test_bad_blocks_failure(self):
|
def test_bad_blocks_failure(self):
|
||||||
# the first 7 servers have bad blocks, which will be caught by the
|
# the first 7 servers have bad blocks, which will be caught by the
|
||||||
@ -338,7 +345,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(7)]
|
for i in range(7)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(7, 10)])
|
for i in range(7, 10)])
|
||||||
d = self.send_and_recover((4,10), bucket_modes=modemap)
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self.failUnless(isinstance(res, Failure))
|
self.failUnless(isinstance(res, Failure))
|
||||||
self.failUnless(res.check(download.NotEnoughPeersError))
|
self.failUnless(res.check(download.NotEnoughPeersError))
|
||||||
@ -352,7 +359,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(6)]
|
for i in range(6)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(6, 10)])
|
for i in range(6, 10)])
|
||||||
return self.send_and_recover((4,10), bucket_modes=modemap)
|
return self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
|
|
||||||
def test_bad_blockhashes_failure(self):
|
def test_bad_blockhashes_failure(self):
|
||||||
# the first 7 servers have bad block hashes, so the blockhash tree
|
# the first 7 servers have bad block hashes, so the blockhash tree
|
||||||
@ -361,7 +368,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(7)]
|
for i in range(7)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(7, 10)])
|
for i in range(7, 10)])
|
||||||
d = self.send_and_recover((4,10), bucket_modes=modemap)
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self.failUnless(isinstance(res, Failure))
|
self.failUnless(isinstance(res, Failure))
|
||||||
self.failUnless(res.check(download.NotEnoughPeersError))
|
self.failUnless(res.check(download.NotEnoughPeersError))
|
||||||
@ -375,7 +382,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(6)]
|
for i in range(6)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(6, 10)])
|
for i in range(6, 10)])
|
||||||
return self.send_and_recover((4,10), bucket_modes=modemap)
|
return self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
|
|
||||||
def test_bad_sharehashes_failure(self):
|
def test_bad_sharehashes_failure(self):
|
||||||
# the first 7 servers have bad block hashes, so the sharehash tree
|
# the first 7 servers have bad block hashes, so the sharehash tree
|
||||||
@ -384,7 +391,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(7)]
|
for i in range(7)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(7, 10)])
|
for i in range(7, 10)])
|
||||||
d = self.send_and_recover((4,10), bucket_modes=modemap)
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self.failUnless(isinstance(res, Failure))
|
self.failUnless(isinstance(res, Failure))
|
||||||
self.failUnless(res.check(download.NotEnoughPeersError))
|
self.failUnless(res.check(download.NotEnoughPeersError))
|
||||||
@ -398,7 +405,7 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(6)]
|
for i in range(6)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(6, 10)])
|
for i in range(6, 10)])
|
||||||
return self.send_and_recover((4,10), bucket_modes=modemap)
|
return self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
|
|
||||||
def test_missing_sharehashes_failure(self):
|
def test_missing_sharehashes_failure(self):
|
||||||
# the first 7 servers are missing their sharehashes, so the
|
# the first 7 servers are missing their sharehashes, so the
|
||||||
@ -407,10 +414,41 @@ class Roundtrip(unittest.TestCase):
|
|||||||
for i in range(7)]
|
for i in range(7)]
|
||||||
+ [(i, "good")
|
+ [(i, "good")
|
||||||
for i in range(7, 10)])
|
for i in range(7, 10)])
|
||||||
d = self.send_and_recover((4,10), bucket_modes=modemap)
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
def _done(res):
|
def _done(res):
|
||||||
self.failUnless(isinstance(res, Failure))
|
self.failUnless(isinstance(res, Failure))
|
||||||
self.failUnless(res.check(download.NotEnoughPeersError))
|
self.failUnless(res.check(download.NotEnoughPeersError))
|
||||||
d.addBoth(_done)
|
d.addBoth(_done)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def test_lost_one_shareholder(self):
|
||||||
|
# we have enough shareholders when we start, but one segment in we
|
||||||
|
# lose one of them. The upload should still succeed, as long as we
|
||||||
|
# still have 'shares_of_happiness' peers left.
|
||||||
|
modemap = dict([(i, "good") for i in range(9)] +
|
||||||
|
[(i, "lost") for i in range(9, 10)])
|
||||||
|
return self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
|
|
||||||
|
def test_lost_many_shareholders(self):
|
||||||
|
# we have enough shareholders when we start, but one segment in we
|
||||||
|
# lose all but one of them. The upload should fail.
|
||||||
|
modemap = dict([(i, "good") for i in range(1)] +
|
||||||
|
[(i, "lost") for i in range(1, 10)])
|
||||||
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
|
def _done(res):
|
||||||
|
self.failUnless(isinstance(res, Failure))
|
||||||
|
self.failUnless(res.check(encode.NotEnoughPeersError))
|
||||||
|
d.addBoth(_done)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def test_lost_all_shareholders(self):
|
||||||
|
# we have enough shareholders when we start, but one segment in we
|
||||||
|
# lose all of them. The upload should fail.
|
||||||
|
modemap = dict([(i, "lost") for i in range(10)])
|
||||||
|
d = self.send_and_recover((4,8,10), bucket_modes=modemap)
|
||||||
|
def _done(res):
|
||||||
|
self.failUnless(isinstance(res, Failure))
|
||||||
|
self.failUnless(res.check(encode.NotEnoughPeersError))
|
||||||
|
d.addBoth(_done)
|
||||||
|
return d
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user