# tahoe-lafs/src/allmydata/test/test_mutable.py

import itertools, struct, re
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import failure, log
from allmydata import mutable, uri, dirnode, download
from allmydata.util.hashutil import tagged_hash
from allmydata.encode import NotEnoughPeersError
from allmydata.interfaces import IURI, INewDirectoryURI, \
     IMutableFileURI, IUploadable, IFileURI
from allmydata.filenode import LiteralFileNode
import sha

#from allmydata.test.common import FakeMutableFileNode
#FakeFilenode = FakeMutableFileNode

class FakeFilenode(mutable.MutableFileNode):
    counter = itertools.count(1)
    all_contents = {}
    all_rw_friends = {}

    def create(self, initial_contents):
        d = mutable.MutableFileNode.create(self, initial_contents)
        def _then(res):
            self.all_contents[self.get_uri()] = initial_contents
            return res
        d.addCallback(_then)
        return d
    def init_from_uri(self, myuri):
        mutable.MutableFileNode.init_from_uri(self, myuri)
        return self
    def _generate_pubprivkeys(self):
        count = self.counter.next()
        return FakePubKey(count), FakePrivKey(count)
    def _publish(self, initial_contents):
        self.all_contents[self.get_uri()] = initial_contents
        return defer.succeed(self)
    def download_to_data(self):
        if self.is_readonly():
            assert self.all_rw_friends.has_key(self.get_uri()), \
                   (self.get_uri(), id(self.all_rw_friends))
            return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]])
        else:
            return defer.succeed(self.all_contents[self.get_uri()])
    def replace(self, newdata):
        self.all_contents[self.get_uri()] = newdata
        return defer.succeed(None)

class FakeStorage:
    # This class replaces the collection of storage servers, allowing the
    # tests to examine and manipulate the published shares. It also lets us
    # control the order in which read queries are answered, to exercise more
    # of the error-handling code in mutable.Retrieve .
    #
    # Note that we ignore the storage index: this FakeStorage instance can
    # only be used for a single storage index.

    def __init__(self):
        self._peers = {}
    def read(self, peerid, storage_index):
        shares = self._peers.get(peerid, {})
        return shares
    def write(self, peerid, storage_index, shnum, offset, data):
        if peerid not in self._peers:
            self._peers[peerid] = {}
        shares = self._peers[peerid]
        f = StringIO()
        f.write(shares.get(shnum, ""))
        f.seek(offset)
        f.write(data)
        shares[shnum] = f.getvalue()
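
# Illustrative sketch (not exercised by the tests themselves): write()
# splices data into the in-memory share string at the given offset, so
# overlapping writes behave like a file opened for in-place update:
#
#   s = FakeStorage()
#   s.write("peer1", "si", 0, 0, "abcdef")
#   s.write("peer1", "si", 0, 2, "XY")
#   assert s.read("peer1", "si") == {0: "abXYef"}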

class FakePublish(mutable.Publish):
    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        assert ss[0] == peerid
        assert shnums == []
        return defer.succeed(self._storage.read(peerid, storage_index))

    def _do_testreadwrite(self, peerid, secrets,
                          tw_vectors, read_vector):
        storage_index = self._node._uri.storage_index
        # always-pass: parrot the test vectors back to them.
        readv = {}
        for shnum, (testv, writev, new_length) in tw_vectors.items():
            for (offset, length, op, specimen) in testv:
                assert op in ("le", "eq", "ge")
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
            for (offset, data) in writev:
                self._storage.write(peerid, storage_index, shnum, offset, data)
        answer = (True, readv)
        return defer.succeed(answer)
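
# Note (background assumption, not shown in this file): a real storage
# server evaluates each (offset, length, op, specimen) test vector against
# the share's current contents and applies the write vectors only if every
# test passes. The always-pass fake above just echoes the specimens back,
# so Publish always sees exactly what it expected and is never "surprised".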

class FakeNewDirectoryNode(dirnode.NewDirectoryNode):
    filenode_class = FakeFilenode

class FakeClient:
    def __init__(self, num_peers=10):
        self._num_peers = num_peers
        self._peerids = [tagged_hash("peerid", "%d" % i)[:20]
                         for i in range(self._num_peers)]
        self.nodeid = "fakenodeid"

    def log(self, msg, **kw):
        return log.msg(msg, **kw)

    def get_renewal_secret(self):
        return "I hereby permit you to renew my files"
    def get_cancel_secret(self):
        return "I hereby permit you to cancel my leases"

    def create_empty_dirnode(self):
        n = FakeNewDirectoryNode(self)
        d = n.create()
        d.addCallback(lambda res: n)
        return d

    def create_dirnode_from_uri(self, u):
        return FakeNewDirectoryNode(self).init_from_uri(u)

    def create_mutable_file(self, contents=""):
        n = FakeFilenode(self)
        d = n.create(contents)
        d.addCallback(lambda res: n)
        return d

    def create_node_from_uri(self, u):
        u = IURI(u)
        if INewDirectoryURI.providedBy(u):
            return self.create_dirnode_from_uri(u)
        if IFileURI.providedBy(u):
            if isinstance(u, uri.LiteralFileURI):
                return LiteralFileNode(u, self)
            else:
                # CHK
                raise RuntimeError("not simulated")
        assert IMutableFileURI.providedBy(u), u
        res = FakeFilenode(self).init_from_uri(u)
        return res

    def get_permuted_peers(self, service_name, key):
        # TODO: include_myself=True
        """
        @return: list of (peerid, connection,)
        """
        peers_and_connections = [(pid, (pid,)) for pid in self._peerids]
        results = []
        for peerid, connection in peers_and_connections:
            assert isinstance(peerid, str)
            permuted = sha.new(key + peerid).digest()
            results.append((permuted, peerid, connection))
        results.sort()
        results = [ (r[1],r[2]) for r in results]
        return results
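
    # Illustrative note: the method above reproduces Tahoe's permuted
    # peerlist idea -- ranking peers by sha(key + peerid) gives each storage
    # index its own deterministic ordering of the same peer set. A minimal
    # standalone sketch of the same computation:
    #
    #   def permute(peerids, key):
    #       return sorted(peerids,
    #                     key=lambda pid: sha.new(key + pid).digest())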

    def upload(self, uploadable):
        assert IUploadable.providedBy(uploadable)
        d = uploadable.get_size()
        d.addCallback(lambda length: uploadable.read(length))
        #d.addCallback(self.create_mutable_file)
        def _got_data(datav):
            data = "".join(datav)
            #newnode = FakeFilenode(self)
            return uri.LiteralFileURI(data)
        d.addCallback(_got_data)
        return d

class FakePubKey:
    def __init__(self, count):
        self.count = count
    def serialize(self):
        return "PUBKEY-%d" % self.count
    def verify(self, msg, signature):
        if signature[:5] != "SIGN(":
            return False
        if signature[5:-1] != msg:
            return False
        if signature[-1] != ")":
            return False
        return True

class FakePrivKey:
    def __init__(self, count):
        self.count = count
    def serialize(self):
        return "PRIVKEY-%d" % self.count
    def sign(self, data):
        return "SIGN(%s)" % data

class Filenode(unittest.TestCase):
    def setUp(self):
        self.client = FakeClient()

    def test_create(self):
        d = self.client.create_mutable_file()
        def _created(n):
            d = n.replace("contents 1")
            d.addCallback(lambda res: self.failUnlessIdentical(res, None))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.replace("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            d.addCallback(lambda res: n.download(download.Data()))
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            return d
        d.addCallback(_created)
        return d

    def test_create_with_initial_contents(self):
        d = self.client.create_mutable_file("contents 1")
        def _created(n):
            d = n.download_to_data()
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
            d.addCallback(lambda res: n.replace("contents 2"))
            d.addCallback(lambda res: n.download_to_data())
            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
            return d
        d.addCallback(_created)
        return d

class Publish(unittest.TestCase):
    def test_encrypt(self):
        c = FakeClient()
        fn = FakeFilenode(c)
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        CONTENTS = "some initial contents"
        fn.create(CONTENTS)
        p = mutable.Publish(fn)
        target_info = None
        d = defer.maybeDeferred(p._encrypt_and_encode, target_info,
                                CONTENTS, "READKEY", "IV"*8, 3, 10)
        def _done( ((shares, share_ids),
                    required_shares, total_shares,
                    segsize, data_length, target_info2) ):
            self.failUnlessEqual(len(shares), 10)
            for sh in shares:
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 7)
            self.failUnlessEqual(len(share_ids), 10)
            self.failUnlessEqual(required_shares, 3)
            self.failUnlessEqual(total_shares, 10)
            self.failUnlessEqual(segsize, 21)
            self.failUnlessEqual(data_length, len(CONTENTS))
            self.failUnlessIdentical(target_info, target_info2)
        d.addCallback(_done)
        return d
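
    # The arithmetic behind the assertions above: with k=3 required shares
    # and a single 21-byte segment, erasure coding emits 21/3 = 7-byte
    # blocks, one per each of the 10 total shares (a 3-of-10 encoding).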

    def test_generate(self):
        c = FakeClient()
        fn = FakeFilenode(c)
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        CONTENTS = "some initial contents"
        fn.create(CONTENTS)
        p = mutable.Publish(fn)
        r = mutable.Retrieve(fn)
        # make some fake shares
        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
        target_info = None
        p._privkey = FakePrivKey(0)
        p._encprivkey = "encprivkey"
        p._pubkey = FakePubKey(0)
        d = defer.maybeDeferred(p._generate_shares,
                                (shares_and_ids,
                                 3, 10,
                                 21, # segsize
                                 len(CONTENTS),
                                 target_info),
                                3, # seqnum
                                "IV"*8)
        def _done( (seqnum, root_hash, final_shares, target_info2) ):
            self.failUnlessEqual(seqnum, 3)
            self.failUnlessEqual(len(root_hash), 32)
            self.failUnless(isinstance(final_shares, dict))
            self.failUnlessEqual(len(final_shares), 10)
            self.failUnlessEqual(sorted(final_shares.keys()), range(10))
            for i,sh in final_shares.items():
                self.failUnless(isinstance(sh, str))
                self.failUnlessEqual(len(sh), 381)
                # feed the share through the unpacker as a sanity-check
                pieces = mutable.unpack_share(sh)
                (u_seqnum, u_root_hash, IV, k, N, segsize, datalen,
                 pubkey, signature, share_hash_chain, block_hash_tree,
                 share_data, enc_privkey) = pieces
                self.failUnlessEqual(u_seqnum, 3)
                self.failUnlessEqual(u_root_hash, root_hash)
                self.failUnlessEqual(k, 3)
                self.failUnlessEqual(N, 10)
                self.failUnlessEqual(segsize, 21)
                self.failUnlessEqual(datalen, len(CONTENTS))
                self.failUnlessEqual(pubkey, FakePubKey(0).serialize())
                sig_material = struct.pack(">BQ32s16s BBQQ",
                                           0, seqnum, root_hash, IV,
                                           k, N, segsize, datalen)
                self.failUnlessEqual(signature,
                                     FakePrivKey(0).sign(sig_material))
                self.failUnless(isinstance(share_hash_chain, dict))
                self.failUnlessEqual(len(share_hash_chain), 4) # ln2(10)++
                for shnum,share_hash in share_hash_chain.items():
                    self.failUnless(isinstance(shnum, int))
                    self.failUnless(isinstance(share_hash, str))
                    self.failUnlessEqual(len(share_hash), 32)
                self.failUnless(isinstance(block_hash_tree, list))
                self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
                self.failUnlessEqual(IV, "IV"*8)
                self.failUnlessEqual(len(share_data), len("%07d" % 1))
                self.failUnlessEqual(enc_privkey, "encprivkey")
            self.failUnlessIdentical(target_info, target_info2)
        d.addCallback(_done)
        return d

    def setup_for_sharemap(self, num_peers):
        c = FakeClient(num_peers)
        fn = FakeFilenode(c)
        s = FakeStorage()
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        CONTENTS = "some initial contents"
        fn.create(CONTENTS)
        p = FakePublish(fn)
        p._storage_index = "\x00"*32
        p._new_seqnum = 3
        p._read_size = 1000
        #r = mutable.Retrieve(fn)
        p._storage = s
        return c, p

    def shouldFail(self, expected_failure, which, call, *args, **kwargs):
        substring = kwargs.pop("substring", None)
        d = defer.maybeDeferred(call, *args, **kwargs)
        def _done(res):
            if isinstance(res, failure.Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(_done)
        return d

    def test_sharemap_20newpeers(self):
        c, p = self.setup_for_sharemap(20)
        total_shares = 10
        d = p._query_peers(total_shares)
        def _done(target_info):
            (target_map, shares_per_peer) = target_info
            shares_per_peer = {}
            for shnum in target_map:
                for (peerid, old_seqnum, old_R) in target_map[shnum]:
                    #print "shnum[%d]: send to %s [oldseqnum=%s]" % \
                    #      (shnum, idlib.b2a(peerid), old_seqnum)
                    if peerid not in shares_per_peer:
                        shares_per_peer[peerid] = 1
                    else:
                        shares_per_peer[peerid] += 1
            # verify that we're sending only one share per peer
            for peerid, count in shares_per_peer.items():
                self.failUnlessEqual(count, 1)
        d.addCallback(_done)
        return d

    def test_sharemap_3newpeers(self):
        c, p = self.setup_for_sharemap(3)
        total_shares = 10
        d = p._query_peers(total_shares)
        def _done(target_info):
            (target_map, shares_per_peer) = target_info
            shares_per_peer = {}
            for shnum in target_map:
                for (peerid, old_seqnum, old_R) in target_map[shnum]:
                    if peerid not in shares_per_peer:
                        shares_per_peer[peerid] = 1
                    else:
                        shares_per_peer[peerid] += 1
            # verify that we're sending 3 or 4 shares per peer
            for peerid, count in shares_per_peer.items():
                self.failUnless(count in (3,4), count)
        d.addCallback(_done)
        return d

    def test_sharemap_nopeers(self):
        c, p = self.setup_for_sharemap(0)
        total_shares = 10
        d = self.shouldFail(NotEnoughPeersError, "test_sharemap_nopeers",
                            p._query_peers, total_shares)
        return d

    def test_write(self):
        total_shares = 10
        c, p = self.setup_for_sharemap(20)
        p._privkey = FakePrivKey(0)
        p._encprivkey = "encprivkey"
        p._pubkey = FakePubKey(0)
        # make some fake shares
        CONTENTS = "some initial contents"
        shares_and_ids = ( ["%07d" % i for i in range(10)], range(10) )
        d = defer.maybeDeferred(p._query_peers, total_shares)
        IV = "IV"*8
        d.addCallback(lambda target_info:
                      p._generate_shares( (shares_and_ids,
                                           3, total_shares,
                                           21, # segsize
                                           len(CONTENTS),
                                           target_info),
                                          3, # seqnum
                                          IV))
        d.addCallback(p._send_shares, IV)
        def _done((surprised, dispatch_map)):
            self.failIf(surprised, "surprised!")
        d.addCallback(_done)
        return d

class FakeRetrieve(mutable.Retrieve):
    def _do_read(self, ss, peerid, storage_index, shnums, readv):
        shares = self._storage.read(peerid, storage_index)
        response = {}
        for shnum in shares:
            if shnums and shnum not in shnums:
                continue
            vector = response[shnum] = []
            for (offset, length) in readv:
                vector.append(shares[shnum][offset:offset+length])
        return defer.succeed(response)
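
    # Note: readv is a list of (offset, length) pairs, and the response
    # maps shnum -> [share[offset:offset+length] for each pair], mirroring
    # what a real storage server would return from its stored shares.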

    def _deserialize_pubkey(self, pubkey_s):
        mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s)
        if not mo:
            raise RuntimeError("mangled pubkey")
        count = mo.group(1)
        return FakePubKey(int(count))

class Roundtrip(unittest.TestCase):
    def setup_for_publish(self, num_peers):
        c = FakeClient(num_peers)
        fn = FakeFilenode(c)
        s = FakeStorage()
        # .create usually returns a Deferred, but we happen to know it's
        # synchronous
        fn.create("")
        p = FakePublish(fn)
        p._storage = s
        r = FakeRetrieve(fn)
        r._storage = s
        return c, s, fn, p, r

    def test_basic(self):
        c, s, fn, p, r = self.setup_for_publish(20)
        contents = "New contents go here"
        d = p.publish(contents)
        def _published(res):
            return r.retrieve()
        d.addCallback(_published)
        def _retrieved(new_contents):
            self.failUnlessEqual(contents, new_contents)
        d.addCallback(_retrieved)
        return d

    def flip_bit(self, original, byte_offset):
        return (original[:byte_offset] +
                chr(ord(original[byte_offset]) ^ 0x01) +
                original[byte_offset+1:])
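
    # e.g. flip_bit("AB", 0) == "@B", since ord("A") ^ 0x01 == 0x40 == ord("@")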

    def shouldFail(self, expected_failure, which, substring,
                   callable, *args, **kwargs):
        assert substring is None or isinstance(substring, str)
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, failure.Failure):
                res.trap(expected_failure)
                if substring:
                    self.failUnless(substring in str(res),
                                    "substring '%s' not in '%s'"
                                    % (substring, str(res)))
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d

    def _corrupt_all(self, offset, substring, refetch_pubkey=False,
                     should_succeed=False):
        c, s, fn, p, r = self.setup_for_publish(20)
        contents = "New contents go here"
        d = p.publish(contents)
        def _published(res):
            if refetch_pubkey:
                # clear the pubkey, to force a fetch
                r._pubkey = None
            for peerid in s._peers:
                shares = s._peers[peerid]
                for shnum in shares:
                    data = shares[shnum]
                    (version,
                     seqnum,
                     root_hash,
                     IV,
                     k, N, segsize, datalen,
                     o) = mutable.unpack_header(data)
                    if isinstance(offset, tuple):
                        offset1, offset2 = offset
                    else:
                        offset1 = offset
                        offset2 = 0
                    if offset1 == "pubkey":
                        real_offset = 107
                    elif offset1 in o:
                        real_offset = o[offset1]
                    else:
                        real_offset = offset1
                    real_offset = int(real_offset) + offset2
                    assert isinstance(real_offset, int), offset
                    shares[shnum] = self.flip_bit(data, real_offset)
        d.addCallback(_published)
        if should_succeed:
            d.addCallback(lambda res: r.retrieve())
        else:
            d.addCallback(lambda res:
                          self.shouldFail(NotEnoughPeersError,
                                          "_corrupt_all(offset=%s)" % (offset,),
                                          substring,
                                          r.retrieve))
        return d
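
    # Header offsets used by the tests below, reconstructed from the
    # ">BQ32s16s BBQQ" prefix format checked in test_generate:
    #   0 = version byte, 1 = seqnum, 9 = root hash R, 41 = IV/salt,
    #   57 = k, 58 = N, 59 = segsize, 67 = datalen; the pubkey starts
    #   at 107 (hardcoded in _corrupt_all above).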

    def test_corrupt_all_verbyte(self):
        # when the version byte is not 0, we hit an assertion error in
        # unpack_share().
        return self._corrupt_all(0, "AssertionError")

    def test_corrupt_all_seqnum(self):
        # a corrupt sequence number will trigger a bad signature
        return self._corrupt_all(1, "signature is invalid")

    def test_corrupt_all_R(self):
        # a corrupt root hash will trigger a bad signature
        return self._corrupt_all(9, "signature is invalid")

    def test_corrupt_all_IV(self):
        # a corrupt salt/IV will trigger a bad signature
        return self._corrupt_all(41, "signature is invalid")

    def test_corrupt_all_k(self):
        # a corrupt 'k' will trigger a bad signature
        return self._corrupt_all(57, "signature is invalid")

    def test_corrupt_all_N(self):
        # a corrupt 'N' will trigger a bad signature
        return self._corrupt_all(58, "signature is invalid")

    def test_corrupt_all_segsize(self):
        # a corrupt segsize will trigger a bad signature
        return self._corrupt_all(59, "signature is invalid")

    def test_corrupt_all_datalen(self):
        # a corrupt data length will trigger a bad signature
        return self._corrupt_all(67, "signature is invalid")

    def test_corrupt_all_pubkey(self):
        # a corrupt pubkey won't match the URI's fingerprint
        return self._corrupt_all("pubkey", "pubkey doesn't match fingerprint",
                                 refetch_pubkey=True)

    def test_corrupt_all_sig(self):
        # a corrupt signature is a bad one
        # the signature runs from about [543:799], depending upon the length
        # of the pubkey
        return self._corrupt_all("signature", "signature is invalid",
                                 refetch_pubkey=True)

    def test_corrupt_all_share_hash_chain_number(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle the first byte, that will look like a bad hash number,
        # causing an IndexError
        return self._corrupt_all("share_hash_chain", "corrupt hashes")

    def test_corrupt_all_share_hash_chain_hash(self):
        # a corrupt share hash chain entry will show up as a bad hash. If we
        # mangle a few bytes in, that will look like a bad hash.
        return self._corrupt_all(("share_hash_chain",4), "corrupt hashes")

    def test_corrupt_all_block_hash_tree(self):
        return self._corrupt_all("block_hash_tree", "block hash tree failure")

    def test_corrupt_all_block(self):
        return self._corrupt_all("share_data", "block hash tree failure")

    def test_corrupt_all_encprivkey(self):
        # a corrupted privkey won't even be noticed by the reader
        return self._corrupt_all("enc_privkey", None, should_succeed=True)

    def test_short_read(self):
        c, s, fn, p, r = self.setup_for_publish(20)
        contents = "New contents go here"
        d = p.publish(contents)
        def _published(res):
            # force a short read, to make Retrieve._got_results re-send the
            # queries. But don't make it so short that we can't read the
            # header.
            r._read_size = mutable.HEADER_LENGTH + 10
            return r.retrieve()
        d.addCallback(_published)
        def _retrieved(new_contents):
            self.failUnlessEqual(contents, new_contents)
        d.addCallback(_retrieved)
        return d