mutable.py: checkpointing #303 work: retrieve does what I want, now starting in on publish

This commit is contained in:
Brian Warner 2008-04-04 17:09:26 -07:00
parent 68527b25fc
commit 2c939bfdd3
2 changed files with 1331 additions and 563 deletions

File diff suppressed because it is too large Load Diff

View File

@ -5,13 +5,14 @@ from twisted.trial import unittest
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.python import failure from twisted.python import failure
from allmydata import mutable, uri, dirnode, download from allmydata import mutable, uri, dirnode, download
from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a from allmydata.util.idlib import shortnodeid_b2a
from allmydata.util.hashutil import tagged_hash from allmydata.util.hashutil import tagged_hash
from allmydata.encode import NotEnoughPeersError from allmydata.encode import NotEnoughPeersError
from allmydata.interfaces import IURI, INewDirectoryURI, \ from allmydata.interfaces import IURI, INewDirectoryURI, \
IMutableFileURI, IUploadable, IFileURI IMutableFileURI, IUploadable, IFileURI
from allmydata.filenode import LiteralFileNode from allmydata.filenode import LiteralFileNode
from foolscap.eventual import eventually from foolscap.eventual import eventually, fireEventually
from foolscap.logging import log from foolscap.logging import log
import sha import sha
@ -110,7 +111,9 @@ class FakePublish(mutable.Publish):
def _do_read(self, ss, peerid, storage_index, shnums, readv): def _do_read(self, ss, peerid, storage_index, shnums, readv):
assert ss[0] == peerid assert ss[0] == peerid
assert shnums == [] assert shnums == []
return defer.maybeDeferred(self._storage.read, peerid, storage_index) d = fireEventually()
d.addCallback(lambda res: self._storage.read(peerid, storage_index))
return d
def _do_testreadwrite(self, peerid, secrets, def _do_testreadwrite(self, peerid, secrets,
tw_vectors, read_vector): tw_vectors, read_vector):
@ -182,7 +185,6 @@ class FakeClient:
return res return res
def get_permuted_peers(self, service_name, key): def get_permuted_peers(self, service_name, key):
# TODO: include_myself=True
""" """
@return: list of (peerid, connection,) @return: list of (peerid, connection,)
""" """
@ -303,6 +305,7 @@ class Publish(unittest.TestCase):
CONTENTS = "some initial contents" CONTENTS = "some initial contents"
fn.create(CONTENTS) fn.create(CONTENTS)
p = mutable.Publish(fn) p = mutable.Publish(fn)
r = mutable.Retrieve(fn)
# make some fake shares # make some fake shares
shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) ) shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
target_info = None target_info = None
@ -467,7 +470,27 @@ class Publish(unittest.TestCase):
class FakeRetrieve(mutable.Retrieve): class FakeRetrieve(mutable.Retrieve):
def _do_read(self, ss, peerid, storage_index, shnums, readv): def _do_read(self, ss, peerid, storage_index, shnums, readv):
d = defer.maybeDeferred(self._storage.read, peerid, storage_index) d = fireEventually()
d.addCallback(lambda res: self._storage.read(peerid, storage_index))
def _read(shares):
response = {}
for shnum in shares:
if shnums and shnum not in shnums:
continue
vector = response[shnum] = []
for (offset, length) in readv:
assert isinstance(offset, (int, long)), offset
assert isinstance(length, (int, long)), length
vector.append(shares[shnum][offset:offset+length])
return response
d.addCallback(_read)
return d
class FakeServermapUpdater(mutable.ServermapUpdater):
def _do_read(self, ss, peerid, storage_index, shnums, readv):
d = fireEventually()
d.addCallback(lambda res: self._storage.read(peerid, storage_index))
def _read(shares): def _read(shares):
response = {} response = {}
for shnum in shares: for shnum in shares:
@ -487,31 +510,217 @@ class FakeRetrieve(mutable.Retrieve):
count = mo.group(1) count = mo.group(1)
return FakePubKey(int(count)) return FakePubKey(int(count))
class Sharemap(unittest.TestCase):
def setUp(self):
# publish a file and create shares, which can then be manipulated
# later.
num_peers = 20
self._client = FakeClient(num_peers)
self._fn = FakeFilenode(self._client)
self._storage = FakeStorage()
d = self._fn.create("")
def _created(res):
p = FakePublish(self._fn)
p._storage = self._storage
contents = "New contents go here"
return p.publish(contents)
d.addCallback(_created)
return d
def make_servermap(self, storage, mode=mutable.MODE_CHECK):
smu = FakeServermapUpdater(self._fn, mutable.ServerMap(), mode)
smu._storage = storage
d = smu.update()
return d
def update_servermap(self, storage, oldmap, mode=mutable.MODE_CHECK):
smu = FakeServermapUpdater(self._fn, oldmap, mode)
smu._storage = storage
d = smu.update()
return d
def failUnlessOneRecoverable(self, sm, num_shares):
self.failUnlessEqual(len(sm.recoverable_versions()), 1)
self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
best = sm.best_recoverable_version()
self.failIfEqual(best, None)
self.failUnlessEqual(sm.recoverable_versions(), set([best]))
self.failUnlessEqual(len(sm.shares_available()), 1)
self.failUnlessEqual(sm.shares_available()[best], (num_shares, 3))
return sm
def test_basic(self):
s = self._storage # unmangled
d = defer.succeed(None)
ms = self.make_servermap
us = self.update_servermap
d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
# this more stops at k+epsilon, and epsilon=k, so 6 shares
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
# this mode stops at 'k' shares
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
# and can we re-use the same servermap? Note that these are sorted in
# increasing order of number of servers queried, since once a server
# gets into the servermap, we'll always ask it for an update.
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 3))
d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ENOUGH))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 6))
d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_WRITE))
d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
d.addCallback(lambda sm: us(s, sm, mode=mutable.MODE_ANYTHING))
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
return d
def failUnlessNoneRecoverable(self, sm):
self.failUnlessEqual(len(sm.recoverable_versions()), 0)
self.failUnlessEqual(len(sm.unrecoverable_versions()), 0)
best = sm.best_recoverable_version()
self.failUnlessEqual(best, None)
self.failUnlessEqual(len(sm.shares_available()), 0)
def test_no_shares(self):
s = self._storage
s._peers = {} # delete all shares
ms = self.make_servermap
d = defer.succeed(None)
d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
d.addCallback(lambda sm: self.failUnlessNoneRecoverable(sm))
return d
def failUnlessNotQuiteEnough(self, sm):
self.failUnlessEqual(len(sm.recoverable_versions()), 0)
self.failUnlessEqual(len(sm.unrecoverable_versions()), 1)
best = sm.best_recoverable_version()
self.failUnlessEqual(best, None)
self.failUnlessEqual(len(sm.shares_available()), 1)
self.failUnlessEqual(sm.shares_available().values()[0], (2,3) )
def test_not_quite_enough_shares(self):
s = self._storage
ms = self.make_servermap
num_shares = len(s._peers)
for peerid in s._peers:
s._peers[peerid] = {}
num_shares -= 1
if num_shares == 2:
break
# now there ought to be only two shares left
assert len([peerid for peerid in s._peers if s._peers[peerid]]) == 2
d = defer.succeed(None)
d.addCallback(lambda res: ms(s, mode=mutable.MODE_CHECK))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_ANYTHING))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_WRITE))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
d.addCallback(lambda res: ms(s, mode=mutable.MODE_ENOUGH))
d.addCallback(lambda sm: self.failUnlessNotQuiteEnough(sm))
return d
class Roundtrip(unittest.TestCase): class Roundtrip(unittest.TestCase):
def setup_for_publish(self, num_peers): def setUp(self):
c = FakeClient(num_peers) # publish a file and create shares, which can then be manipulated
fn = FakeFilenode(c) # later.
s = FakeStorage() self.CONTENTS = "New contents go here"
# .create usually returns a Deferred, but we happen to know it's num_peers = 20
# synchronous self._client = FakeClient(num_peers)
fn.create("") self._fn = FakeFilenode(self._client)
p = FakePublish(fn) self._storage = FakeStorage()
p._storage = s d = self._fn.create("")
r = FakeRetrieve(fn) def _created(res):
r._storage = s p = FakePublish(self._fn)
return c, s, fn, p, r p._storage = self._storage
return p.publish(self.CONTENTS)
d.addCallback(_created)
return d
def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None):
if oldmap is None:
oldmap = mutable.ServerMap()
smu = FakeServermapUpdater(self._fn, oldmap, mode)
smu._storage = self._storage
d = smu.update()
return d
def abbrev_verinfo(self, verinfo):
if verinfo is None:
return None
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = verinfo
return "%d-%s" % (seqnum, base32.b2a(root_hash)[:4])
def abbrev_verinfo_dict(self, verinfo_d):
output = {}
for verinfo,value in verinfo_d.items():
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = verinfo
output["%d-%s" % (seqnum, base32.b2a(root_hash)[:4])] = value
return output
def dump_servermap(self, servermap):
print "SERVERMAP", servermap
print "RECOVERABLE", [self.abbrev_verinfo(v)
for v in servermap.recoverable_versions()]
print "BEST", self.abbrev_verinfo(servermap.best_recoverable_version())
print "available", self.abbrev_verinfo_dict(servermap.shares_available())
def do_download(self, servermap, version=None):
if version is None:
version = servermap.best_recoverable_version()
r = FakeRetrieve(self._fn, servermap, version)
r._storage = self._storage
return r.download()
def test_basic(self): def test_basic(self):
c, s, fn, p, r = self.setup_for_publish(20) d = self.make_servermap()
contents = "New contents go here" def _do_retrieve(servermap):
d = p.publish(contents) self._smap = servermap
def _published(res): #self.dump_servermap(servermap)
return r.retrieve() self.failUnlessEqual(len(servermap.recoverable_versions()), 1)
d.addCallback(_published) return self.do_download(servermap)
d.addCallback(_do_retrieve)
def _retrieved(new_contents): def _retrieved(new_contents):
self.failUnlessEqual(contents, new_contents) self.failUnlessEqual(new_contents, self.CONTENTS)
d.addCallback(_retrieved)
# we should be able to re-use the same servermap, both with and
# without updating it.
d.addCallback(lambda res: self.do_download(self._smap))
d.addCallback(_retrieved)
d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
d.addCallback(lambda res: self.do_download(self._smap))
d.addCallback(_retrieved)
# clobbering the pubkey should make the servermap updater re-fetch it
def _clobber_pubkey(res):
self._fn._pubkey = None
d.addCallback(_clobber_pubkey)
d.addCallback(lambda res: self.make_servermap(oldmap=self._smap))
d.addCallback(lambda res: self.do_download(self._smap))
d.addCallback(_retrieved) d.addCallback(_retrieved)
return d return d
@ -538,144 +747,139 @@ class Roundtrip(unittest.TestCase):
d.addBoth(done) d.addBoth(done)
return d return d
def _corrupt_all(self, offset, substring, refetch_pubkey=False, def _corrupt(self, res, s, offset, shnums_to_corrupt=None):
should_succeed=False): # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
c, s, fn, p, r = self.setup_for_publish(20) # list of shnums to corrupt.
contents = "New contents go here" for peerid in s._peers:
d = p.publish(contents) shares = s._peers[peerid]
def _published(res): for shnum in shares:
if refetch_pubkey: if (shnums_to_corrupt is not None
# clear the pubkey, to force a fetch and shnum not in shnums_to_corrupt):
r._pubkey = None continue
for peerid in s._peers: data = shares[shnum]
shares = s._peers[peerid] (version,
for shnum in shares: seqnum,
data = shares[shnum] root_hash,
(version, IV,
seqnum, k, N, segsize, datalen,
root_hash, o) = mutable.unpack_header(data)
IV, if isinstance(offset, tuple):
k, N, segsize, datalen, offset1, offset2 = offset
o) = mutable.unpack_header(data) else:
if isinstance(offset, tuple): offset1 = offset
offset1, offset2 = offset offset2 = 0
else: if offset1 == "pubkey":
offset1 = offset real_offset = 107
offset2 = 0 elif offset1 in o:
if offset1 == "pubkey": real_offset = o[offset1]
real_offset = 107 else:
elif offset1 in o: real_offset = offset1
real_offset = o[offset1] real_offset = int(real_offset) + offset2
else: assert isinstance(real_offset, int), offset
real_offset = offset1 shares[shnum] = self.flip_bit(data, real_offset)
real_offset = int(real_offset) + offset2 return res
assert isinstance(real_offset, int), offset
shares[shnum] = self.flip_bit(data, real_offset) def _test_corrupt_all(self, offset, substring,
d.addCallback(_published) should_succeed=False, corrupt_early=True):
if should_succeed: d = defer.succeed(None)
d.addCallback(lambda res: r.retrieve()) if corrupt_early:
else: d.addCallback(self._corrupt, self._storage, offset)
d.addCallback(lambda res: d.addCallback(lambda res: self.make_servermap())
self.shouldFail(NotEnoughPeersError, if not corrupt_early:
"_corrupt_all(offset=%s)" % (offset,), d.addCallback(self._corrupt, self._storage, offset)
substring, def _do_retrieve(servermap):
r.retrieve)) ver = servermap.best_recoverable_version()
if ver is None and not should_succeed:
# no recoverable versions == not succeeding. The problem
# should be noted in the servermap's list of problems.
if substring:
allproblems = [str(f) for f in servermap.problems]
self.failUnless(substring in "".join(allproblems))
return
r = FakeRetrieve(self._fn, servermap, ver)
r._storage = self._storage
if should_succeed:
d1 = r.download()
d1.addCallback(lambda new_contents:
self.failUnlessEqual(new_contents, self.CONTENTS))
return d1
else:
return self.shouldFail(NotEnoughPeersError,
"_corrupt_all(offset=%s)" % (offset,),
substring,
r.download)
d.addCallback(_do_retrieve)
return d return d
def test_corrupt_all_verbyte(self): def test_corrupt_all_verbyte(self):
# when the version byte is not 0, we hit an assertion error in # when the version byte is not 0, we hit an assertion error in
# unpack_share(). # unpack_share().
return self._corrupt_all(0, "AssertionError") return self._test_corrupt_all(0, "AssertionError")
def test_corrupt_all_seqnum(self): def test_corrupt_all_seqnum(self):
# a corrupt sequence number will trigger a bad signature # a corrupt sequence number will trigger a bad signature
return self._corrupt_all(1, "signature is invalid") return self._test_corrupt_all(1, "signature is invalid")
def test_corrupt_all_R(self): def test_corrupt_all_R(self):
# a corrupt root hash will trigger a bad signature # a corrupt root hash will trigger a bad signature
return self._corrupt_all(9, "signature is invalid") return self._test_corrupt_all(9, "signature is invalid")
def test_corrupt_all_IV(self): def test_corrupt_all_IV(self):
# a corrupt salt/IV will trigger a bad signature # a corrupt salt/IV will trigger a bad signature
return self._corrupt_all(41, "signature is invalid") return self._test_corrupt_all(41, "signature is invalid")
def test_corrupt_all_k(self): def test_corrupt_all_k(self):
# a corrupt 'k' will trigger a bad signature # a corrupt 'k' will trigger a bad signature
return self._corrupt_all(57, "signature is invalid") return self._test_corrupt_all(57, "signature is invalid")
def test_corrupt_all_N(self): def test_corrupt_all_N(self):
# a corrupt 'N' will trigger a bad signature # a corrupt 'N' will trigger a bad signature
return self._corrupt_all(58, "signature is invalid") return self._test_corrupt_all(58, "signature is invalid")
def test_corrupt_all_segsize(self): def test_corrupt_all_segsize(self):
# a corrupt segsize will trigger a bad signature # a corrupt segsize will trigger a bad signature
return self._corrupt_all(59, "signature is invalid") return self._test_corrupt_all(59, "signature is invalid")
def test_corrupt_all_datalen(self): def test_corrupt_all_datalen(self):
# a corrupt data length will trigger a bad signature # a corrupt data length will trigger a bad signature
return self._corrupt_all(67, "signature is invalid") return self._test_corrupt_all(67, "signature is invalid")
def test_corrupt_all_pubkey(self): def test_corrupt_all_pubkey(self):
# a corrupt pubkey won't match the URI's fingerprint # a corrupt pubkey won't match the URI's fingerprint. We need to
return self._corrupt_all("pubkey", "pubkey doesn't match fingerprint", # remove the pubkey from the filenode, or else it won't bother trying
refetch_pubkey=True) # to update it.
self._fn._pubkey = None
return self._test_corrupt_all("pubkey",
"pubkey doesn't match fingerprint")
def test_corrupt_all_sig(self): def test_corrupt_all_sig(self):
# a corrupt signature is a bad one # a corrupt signature is a bad one
# the signature runs from about [543:799], depending upon the length # the signature runs from about [543:799], depending upon the length
# of the pubkey # of the pubkey
return self._corrupt_all("signature", "signature is invalid", return self._test_corrupt_all("signature", "signature is invalid")
refetch_pubkey=True)
def test_corrupt_all_share_hash_chain_number(self): def test_corrupt_all_share_hash_chain_number(self):
# a corrupt share hash chain entry will show up as a bad hash. If we # a corrupt share hash chain entry will show up as a bad hash. If we
# mangle the first byte, that will look like a bad hash number, # mangle the first byte, that will look like a bad hash number,
# causing an IndexError # causing an IndexError
return self._corrupt_all("share_hash_chain", "corrupt hashes") return self._test_corrupt_all("share_hash_chain", "corrupt hashes")
def test_corrupt_all_share_hash_chain_hash(self): def test_corrupt_all_share_hash_chain_hash(self):
# a corrupt share hash chain entry will show up as a bad hash. If we # a corrupt share hash chain entry will show up as a bad hash. If we
# mangle a few bytes in, that will look like a bad hash. # mangle a few bytes in, that will look like a bad hash.
return self._corrupt_all(("share_hash_chain",4), "corrupt hashes") return self._test_corrupt_all(("share_hash_chain",4), "corrupt hashes")
def test_corrupt_all_block_hash_tree(self): def test_corrupt_all_block_hash_tree(self):
return self._corrupt_all("block_hash_tree", "block hash tree failure") return self._test_corrupt_all("block_hash_tree",
"block hash tree failure")
def test_corrupt_all_block(self): def test_corrupt_all_block(self):
return self._corrupt_all("share_data", "block hash tree failure") return self._test_corrupt_all("share_data", "block hash tree failure")
def test_corrupt_all_encprivkey(self): def test_corrupt_all_encprivkey(self):
# a corrupted privkey won't even be noticed by the reader # a corrupted privkey won't even be noticed by the reader, only by a
return self._corrupt_all("enc_privkey", None, should_succeed=True) # writer.
return self._test_corrupt_all("enc_privkey", None, should_succeed=True)
def test_short_read(self):
c, s, fn, p, r = self.setup_for_publish(20)
contents = "New contents go here"
d = p.publish(contents)
def _published(res):
# force a short read, to make Retrieve._got_results re-send the
# queries. But don't make it so short that we can't read the
# header.
r._read_size = mutable.HEADER_LENGTH + 10
return r.retrieve()
d.addCallback(_published)
def _retrieved(new_contents):
self.failUnlessEqual(contents, new_contents)
d.addCallback(_retrieved)
return d
def test_basic_sequenced(self):
c, s, fn, p, r = self.setup_for_publish(20)
s._sequence = c._peerids[:]
contents = "New contents go here"
d = p.publish(contents)
def _published(res):
return r.retrieve()
d.addCallback(_published)
def _retrieved(new_contents):
self.failUnlessEqual(contents, new_contents)
d.addCallback(_retrieved)
return d
def test_basic_pubkey_at_end(self): def test_basic_pubkey_at_end(self):
# we corrupt the pubkey in all but the last 'k' shares, allowing the # we corrupt the pubkey in all but the last 'k' shares, allowing the
@ -683,33 +887,25 @@ class Roundtrip(unittest.TestCase):
# this is rather pessimistic: our Retrieve process will throw away # this is rather pessimistic: our Retrieve process will throw away
# the whole share if the pubkey is bad, even though the rest of the # the whole share if the pubkey is bad, even though the rest of the
# share might be good. # share might be good.
c, s, fn, p, r = self.setup_for_publish(20)
s._sequence = c._peerids[:] self._fn._pubkey = None
contents = "New contents go here" k = self._fn.get_required_shares()
d = p.publish(contents) N = self._fn.get_total_shares()
def _published(res): d = defer.succeed(None)
r._pubkey = None d.addCallback(self._corrupt, self._storage, "pubkey",
homes = [peerid for peerid in c._peerids shnums_to_corrupt=range(0, N-k))
if s._peers.get(peerid, {})] d.addCallback(lambda res: self.make_servermap())
k = fn.get_required_shares() def _do_retrieve(servermap):
homes_to_corrupt = homes[:-k] self.failUnless(servermap.problems)
for peerid in homes_to_corrupt: self.failUnless("pubkey doesn't match fingerprint"
shares = s._peers[peerid] in str(servermap.problems[0]))
for shnum in shares: ver = servermap.best_recoverable_version()
data = shares[shnum] r = FakeRetrieve(self._fn, servermap, ver)
(version, r._storage = self._storage
seqnum, return r.download()
root_hash, d.addCallback(_do_retrieve)
IV, d.addCallback(lambda new_contents:
k, N, segsize, datalen, self.failUnlessEqual(new_contents, self.CONTENTS))
o) = mutable.unpack_header(data)
offset = 107 # pubkey
shares[shnum] = self.flip_bit(data, offset)
return r.retrieve()
d.addCallback(_published)
def _retrieved(new_contents):
self.failUnlessEqual(contents, new_contents)
d.addCallback(_retrieved)
return d return d
def _encode(self, c, s, fn, k, n, data): def _encode(self, c, s, fn, k, n, data):
@ -741,6 +937,32 @@ class Roundtrip(unittest.TestCase):
d.addCallback(_published) d.addCallback(_published)
return d return d
class MultipleEncodings(unittest.TestCase):
def publish(self):
# publish a file and create shares, which can then be manipulated
# later.
self.CONTENTS = "New contents go here"
num_peers = 20
self._client = FakeClient(num_peers)
self._fn = FakeFilenode(self._client)
self._storage = FakeStorage()
d = self._fn.create("")
def _created(res):
p = FakePublish(self._fn)
p._storage = self._storage
return p.publish(self.CONTENTS)
d.addCallback(_created)
return d
def make_servermap(self, mode=mutable.MODE_ENOUGH, oldmap=None):
if oldmap is None:
oldmap = mutable.ServerMap()
smu = FakeServermapUpdater(self._fn, oldmap, mode)
smu._storage = self._storage
d = smu.update()
return d
def test_multiple_encodings(self): def test_multiple_encodings(self):
# we encode the same file in two different ways (3-of-10 and 4-of-9), # we encode the same file in two different ways (3-of-10 and 4-of-9),
# then mix up the shares, to make sure that download survives seeing # then mix up the shares, to make sure that download survives seeing
@ -842,3 +1064,87 @@ class Roundtrip(unittest.TestCase):
d.addCallback(_retrieved) d.addCallback(_retrieved)
return d return d
class Utils(unittest.TestCase):
def test_dict_of_sets(self):
ds = mutable.DictOfSets()
ds.add(1, "a")
ds.add(2, "b")
ds.add(2, "b")
ds.add(2, "c")
self.failUnlessEqual(ds[1], set(["a"]))
self.failUnlessEqual(ds[2], set(["b", "c"]))
ds.discard(3, "d") # should not raise an exception
ds.discard(2, "b")
self.failUnlessEqual(ds[2], set(["c"]))
ds.discard(2, "c")
self.failIf(2 in ds)
def _do_inside(self, c, x_start, x_length, y_start, y_length):
# we compare this against sets of integers
x = set(range(x_start, x_start+x_length))
y = set(range(y_start, y_start+y_length))
should_be_inside = x.issubset(y)
self.failUnlessEqual(should_be_inside, c._inside(x_start, x_length,
y_start, y_length),
str((x_start, x_length, y_start, y_length)))
def test_cache_inside(self):
c = mutable.ResponseCache()
x_start = 10
x_length = 5
for y_start in range(8, 17):
for y_length in range(8):
self._do_inside(c, x_start, x_length, y_start, y_length)
def _do_overlap(self, c, x_start, x_length, y_start, y_length):
# we compare this against sets of integers
x = set(range(x_start, x_start+x_length))
y = set(range(y_start, y_start+y_length))
overlap = bool(x.intersection(y))
self.failUnlessEqual(overlap, c._does_overlap(x_start, x_length,
y_start, y_length),
str((x_start, x_length, y_start, y_length)))
def test_cache_overlap(self):
c = mutable.ResponseCache()
x_start = 10
x_length = 5
for y_start in range(8, 17):
for y_length in range(8):
self._do_overlap(c, x_start, x_length, y_start, y_length)
def test_cache(self):
c = mutable.ResponseCache()
# xdata = base62.b2a(os.urandom(100))[:100]
xdata = "1Ex4mdMaDyOl9YnGBM3I4xaBF97j8OQAg1K3RBR01F2PwTP4HohB3XpACuku8Xj4aTQjqJIR1f36mEj3BCNjXaJmPBEZnnHL0U9l"
ydata = "4DCUQXvkEPnnr9Lufikq5t21JsnzZKhzxKBhLhrBB6iIcBOWRuT4UweDhjuKJUre8A4wOObJnl3Kiqmlj4vjSLSqUGAkUD87Y3vs"
nope = (None, None)
c.add("v1", 1, 0, xdata, "time0")
c.add("v1", 1, 2000, ydata, "time1")
self.failUnlessEqual(c.read("v2", 1, 10, 11), nope)
self.failUnlessEqual(c.read("v1", 2, 10, 11), nope)
self.failUnlessEqual(c.read("v1", 1, 0, 10), (xdata[:10], "time0"))
self.failUnlessEqual(c.read("v1", 1, 90, 10), (xdata[90:], "time0"))
self.failUnlessEqual(c.read("v1", 1, 300, 10), nope)
self.failUnlessEqual(c.read("v1", 1, 2050, 5), (ydata[50:55], "time1"))
self.failUnlessEqual(c.read("v1", 1, 0, 101), nope)
self.failUnlessEqual(c.read("v1", 1, 99, 1), (xdata[99:100], "time0"))
self.failUnlessEqual(c.read("v1", 1, 100, 1), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 9), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 10), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 11), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 15), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 19), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 20), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 21), nope)
self.failUnlessEqual(c.read("v1", 1, 1990, 25), nope)
self.failUnlessEqual(c.read("v1", 1, 1999, 25), nope)
# optional: join fragments
c = mutable.ResponseCache()
c.add("v1", 1, 0, xdata[:10], "time0")
c.add("v1", 1, 10, xdata[10:20], "time1")
#self.failUnlessEqual(c.read("v1", 1, 0, 20), (xdata[:20], "time0"))