mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-25 21:59:19 +00:00
test_mutable: more test coverage, building up a framework to cause reads to occur in a specific order
This commit is contained in:
parent
7aee162049
commit
57bd23f35f
@ -1641,6 +1641,7 @@ class MutableFileNode:
|
|||||||
publish_class = Publish
|
publish_class = Publish
|
||||||
retrieve_class = Retrieve
|
retrieve_class = Retrieve
|
||||||
SIGNATURE_KEY_SIZE = 2048
|
SIGNATURE_KEY_SIZE = 2048
|
||||||
|
DEFAULT_ENCODING = (3, 10)
|
||||||
|
|
||||||
def __init__(self, client):
|
def __init__(self, client):
|
||||||
self._client = client
|
self._client = client
|
||||||
@ -1685,8 +1686,8 @@ class MutableFileNode:
|
|||||||
contents. Returns a Deferred that fires (with the MutableFileNode
|
contents. Returns a Deferred that fires (with the MutableFileNode
|
||||||
instance you should use) when it completes.
|
instance you should use) when it completes.
|
||||||
"""
|
"""
|
||||||
self._required_shares = 3
|
self._required_shares, self._total_shares = self.DEFAULT_ENCODING
|
||||||
self._total_shares = 10
|
|
||||||
d = defer.maybeDeferred(self._generate_pubprivkeys)
|
d = defer.maybeDeferred(self._generate_pubprivkeys)
|
||||||
def _generated( (pubkey, privkey) ):
|
def _generated( (pubkey, privkey) ):
|
||||||
self._pubkey, self._privkey = pubkey, privkey
|
self._pubkey, self._privkey = pubkey, privkey
|
||||||
|
@ -2,14 +2,16 @@
|
|||||||
import itertools, struct, re
|
import itertools, struct, re
|
||||||
from cStringIO import StringIO
|
from cStringIO import StringIO
|
||||||
from twisted.trial import unittest
|
from twisted.trial import unittest
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer, reactor
|
||||||
from twisted.python import failure, log
|
from twisted.python import failure
|
||||||
from allmydata import mutable, uri, dirnode, download
|
from allmydata import mutable, uri, dirnode, download
|
||||||
from allmydata.util.hashutil import tagged_hash
|
from allmydata.util.hashutil import tagged_hash
|
||||||
from allmydata.encode import NotEnoughPeersError
|
from allmydata.encode import NotEnoughPeersError
|
||||||
from allmydata.interfaces import IURI, INewDirectoryURI, \
|
from allmydata.interfaces import IURI, INewDirectoryURI, \
|
||||||
IMutableFileURI, IUploadable, IFileURI
|
IMutableFileURI, IUploadable, IFileURI
|
||||||
from allmydata.filenode import LiteralFileNode
|
from allmydata.filenode import LiteralFileNode
|
||||||
|
from foolscap.eventual import eventually
|
||||||
|
from foolscap.logging import log
|
||||||
import sha
|
import sha
|
||||||
|
|
||||||
#from allmydata.test.common import FakeMutableFileNode
|
#from allmydata.test.common import FakeMutableFileNode
|
||||||
@ -59,10 +61,35 @@ class FakeStorage:
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self._peers = {}
|
self._peers = {}
|
||||||
|
# _sequence is used to cause the responses to occur in a specific
|
||||||
|
# order. If it is in use, then we will defer queries instead of
|
||||||
|
# answering them right away, accumulating the Deferreds in a dict. We
|
||||||
|
# don't know exactly how many queries we'll get, so exactly one
|
||||||
|
# second after the first query arrives, we will release them all (in
|
||||||
|
# order).
|
||||||
|
self._sequence = None
|
||||||
|
self._pending = {}
|
||||||
|
|
||||||
def read(self, peerid, storage_index):
|
def read(self, peerid, storage_index):
|
||||||
shares = self._peers.get(peerid, {})
|
shares = self._peers.get(peerid, {})
|
||||||
return shares
|
if self._sequence is None:
|
||||||
|
return shares
|
||||||
|
d = defer.Deferred()
|
||||||
|
if not self._pending:
|
||||||
|
reactor.callLater(1.0, self._fire_readers)
|
||||||
|
self._pending[peerid] = (d, shares)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def _fire_readers(self):
|
||||||
|
pending = self._pending
|
||||||
|
self._pending = {}
|
||||||
|
extra = []
|
||||||
|
for peerid in self._sequence:
|
||||||
|
if peerid in pending:
|
||||||
|
d, shares = pending.pop(peerid)
|
||||||
|
eventually(d.callback, shares)
|
||||||
|
for (d, shares) in pending.items():
|
||||||
|
eventually(d.callback, shares)
|
||||||
|
|
||||||
def write(self, peerid, storage_index, shnum, offset, data):
|
def write(self, peerid, storage_index, shnum, offset, data):
|
||||||
if peerid not in self._peers:
|
if peerid not in self._peers:
|
||||||
@ -80,7 +107,7 @@ class FakePublish(mutable.Publish):
|
|||||||
def _do_read(self, ss, peerid, storage_index, shnums, readv):
|
def _do_read(self, ss, peerid, storage_index, shnums, readv):
|
||||||
assert ss[0] == peerid
|
assert ss[0] == peerid
|
||||||
assert shnums == []
|
assert shnums == []
|
||||||
return defer.succeed(self._storage.read(peerid, storage_index))
|
return defer.maybeDeferred(self._storage.read, peerid, storage_index)
|
||||||
|
|
||||||
def _do_testreadwrite(self, peerid, secrets,
|
def _do_testreadwrite(self, peerid, secrets,
|
||||||
tw_vectors, read_vector):
|
tw_vectors, read_vector):
|
||||||
@ -432,16 +459,18 @@ class Publish(unittest.TestCase):
|
|||||||
|
|
||||||
class FakeRetrieve(mutable.Retrieve):
|
class FakeRetrieve(mutable.Retrieve):
|
||||||
def _do_read(self, ss, peerid, storage_index, shnums, readv):
|
def _do_read(self, ss, peerid, storage_index, shnums, readv):
|
||||||
shares = self._storage.read(peerid, storage_index)
|
d = defer.maybeDeferred(self._storage.read, peerid, storage_index)
|
||||||
|
def _read(shares):
|
||||||
response = {}
|
response = {}
|
||||||
for shnum in shares:
|
for shnum in shares:
|
||||||
if shnums and shnum not in shnums:
|
if shnums and shnum not in shnums:
|
||||||
continue
|
continue
|
||||||
vector = response[shnum] = []
|
vector = response[shnum] = []
|
||||||
for (offset, length) in readv:
|
for (offset, length) in readv:
|
||||||
vector.append(shares[shnum][offset:offset+length])
|
vector.append(shares[shnum][offset:offset+length])
|
||||||
return defer.succeed(response)
|
return response
|
||||||
|
d.addCallback(_read)
|
||||||
|
return d
|
||||||
|
|
||||||
def _deserialize_pubkey(self, pubkey_s):
|
def _deserialize_pubkey(self, pubkey_s):
|
||||||
mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s)
|
mo = re.search(r"^PUBKEY-(\d+)$", pubkey_s)
|
||||||
@ -626,3 +655,154 @@ class Roundtrip(unittest.TestCase):
|
|||||||
self.failUnlessEqual(contents, new_contents)
|
self.failUnlessEqual(contents, new_contents)
|
||||||
d.addCallback(_retrieved)
|
d.addCallback(_retrieved)
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
def test_basic_sequenced(self):
|
||||||
|
c, s, fn, p, r = self.setup_for_publish(20)
|
||||||
|
s._sequence = c._peerids[:]
|
||||||
|
contents = "New contents go here"
|
||||||
|
d = p.publish(contents)
|
||||||
|
def _published(res):
|
||||||
|
return r.retrieve()
|
||||||
|
d.addCallback(_published)
|
||||||
|
def _retrieved(new_contents):
|
||||||
|
self.failUnlessEqual(contents, new_contents)
|
||||||
|
d.addCallback(_retrieved)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def test_basic_pubkey_at_end(self):
|
||||||
|
# we corrupt the pubkey in all but the last 'k' shares, allowing the
|
||||||
|
# download to succeed but forcing a bunch of retries first. Note that
|
||||||
|
# this is rather pessimistic: our Retrieve process will throw away
|
||||||
|
# the whole share if the pubkey is bad, even though the rest of the
|
||||||
|
# share might be good.
|
||||||
|
c, s, fn, p, r = self.setup_for_publish(20)
|
||||||
|
s._sequence = c._peerids[:]
|
||||||
|
contents = "New contents go here"
|
||||||
|
d = p.publish(contents)
|
||||||
|
def _published(res):
|
||||||
|
r._pubkey = None
|
||||||
|
homes = [peerid for peerid in c._peerids
|
||||||
|
if s._peers.get(peerid, {})]
|
||||||
|
k = fn.get_required_shares()
|
||||||
|
homes_to_corrupt = homes[:-k]
|
||||||
|
for peerid in homes_to_corrupt:
|
||||||
|
shares = s._peers[peerid]
|
||||||
|
for shnum in shares:
|
||||||
|
data = shares[shnum]
|
||||||
|
(version,
|
||||||
|
seqnum,
|
||||||
|
root_hash,
|
||||||
|
IV,
|
||||||
|
k, N, segsize, datalen,
|
||||||
|
o) = mutable.unpack_header(data)
|
||||||
|
offset = 107 # pubkey
|
||||||
|
shares[shnum] = self.flip_bit(data, offset)
|
||||||
|
return r.retrieve()
|
||||||
|
d.addCallback(_published)
|
||||||
|
def _retrieved(new_contents):
|
||||||
|
self.failUnlessEqual(contents, new_contents)
|
||||||
|
d.addCallback(_retrieved)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def OFF_test_multiple_encodings(self): # not finished yet
|
||||||
|
# we encode the same file in two different ways (3-of-10 and 4-of-9),
|
||||||
|
# then mix up the shares, to make sure that download survives seeing
|
||||||
|
# a variety of encodings. This is actually kind of tricky to set up.
|
||||||
|
c, s, fn, p, r = self.setup_for_publish(20)
|
||||||
|
# we ignore fn, p, and r
|
||||||
|
|
||||||
|
# force share retrieval to occur in this order
|
||||||
|
s._sequence = c._peerids[:]
|
||||||
|
|
||||||
|
fn1 = FakeFilenode(c)
|
||||||
|
fn1.DEFAULT_ENCODING = (3,10)
|
||||||
|
fn1.create("")
|
||||||
|
p1 = FakePublish(fn1)
|
||||||
|
p1._storage = s
|
||||||
|
|
||||||
|
# and we make a second filenode with the same key but different
|
||||||
|
# encoding
|
||||||
|
fn2 = FakeFilenode(c)
|
||||||
|
# init_from_uri populates _uri, _writekey, _readkey, _storage_index,
|
||||||
|
# and _fingerprint
|
||||||
|
fn2.init_from_uri(fn1.get_uri())
|
||||||
|
# then we copy over other fields that are normally fetched from the
|
||||||
|
# existing shares
|
||||||
|
fn2._pubkey = fn1._pubkey
|
||||||
|
fn2._privkey = fn1._privkey
|
||||||
|
fn2._encprivkey = fn1._encprivkey
|
||||||
|
fn2._current_seqnum = 0
|
||||||
|
fn2._current_roothash = "\x00" * 32
|
||||||
|
# and set the encoding parameters to something completely different
|
||||||
|
fn2._required_shares = 4
|
||||||
|
fn2._total_shares = 9
|
||||||
|
|
||||||
|
p2 = FakePublish(fn2)
|
||||||
|
p2._storage = s
|
||||||
|
|
||||||
|
# we make a retrieval object that doesn't know what encoding
|
||||||
|
# parameters to use
|
||||||
|
fn3 = FakeFilenode(c)
|
||||||
|
fn3.init_from_uri(fn1.get_uri())
|
||||||
|
r3 = FakeRetrieve(fn3)
|
||||||
|
r3._storage = s
|
||||||
|
|
||||||
|
# now we upload a file through fn1, and grab its shares
|
||||||
|
contents1 = "Contents for encoding 1 (3-of-10) go here"
|
||||||
|
contents2 = "Contents for encoding 2 (4-of-9) go here"
|
||||||
|
d = p1.publish(contents1)
|
||||||
|
def _published_1(res):
|
||||||
|
self._shares1 = s._peers
|
||||||
|
s._peers = {}
|
||||||
|
# and upload it through fn2
|
||||||
|
return p2.publish(contents2)
|
||||||
|
d.addCallback(_published_1)
|
||||||
|
def _published_2(res):
|
||||||
|
self._shares2 = s._peers
|
||||||
|
s._peers = {}
|
||||||
|
d.addCallback(_published_2)
|
||||||
|
def _merge(res):
|
||||||
|
log.msg("merging sharelists")
|
||||||
|
print len(self._shares1), len(self._shares2)
|
||||||
|
from allmydata.util import idlib
|
||||||
|
# we rearrange the shares, removing them from their original
|
||||||
|
# homes.
|
||||||
|
shares1 = self._shares1.values()
|
||||||
|
shares2 = self._shares2.values()
|
||||||
|
|
||||||
|
print len(shares1), len(shares2)
|
||||||
|
# then we place shares in the following order:
|
||||||
|
# 4-of-9 a s2
|
||||||
|
# 4-of-9 b s2
|
||||||
|
# 4-of-9 c s2
|
||||||
|
# 3-of-9 d s1
|
||||||
|
# 3-of-9 e s1
|
||||||
|
# 4-of-9 f s2
|
||||||
|
# 3-of-9 g s1
|
||||||
|
# so that neither form can be recovered until fetch [f]. Later,
|
||||||
|
# when we implement code that handles multiple versions, we can
|
||||||
|
# use this framework to assert that all recoverable versions are
|
||||||
|
# retrieved, and test that 'epsilon' does its job
|
||||||
|
places = [2, 2, 2, 1, 1, 2, 1]
|
||||||
|
for i in range(len(s._sequence)):
|
||||||
|
peerid = s._sequence[i]
|
||||||
|
if not places:
|
||||||
|
print idlib.shortnodeid_b2a(peerid), "-", "-"
|
||||||
|
break
|
||||||
|
which = places.pop(0)
|
||||||
|
if which == 1:
|
||||||
|
print idlib.shortnodeid_b2a(peerid), "1", "-"
|
||||||
|
s._peers[peerid] = shares1.pop(0)
|
||||||
|
else:
|
||||||
|
print idlib.shortnodeid_b2a(peerid), "-", "2"
|
||||||
|
s._peers[peerid] = shares2.pop(0)
|
||||||
|
# we don't bother placing any other shares
|
||||||
|
log.msg("merge done")
|
||||||
|
d.addCallback(_merge)
|
||||||
|
d.addCallback(lambda res: r3.retrieve())
|
||||||
|
def _retrieved(new_contents):
|
||||||
|
# the current specified behavior is "first version recoverable"
|
||||||
|
self.failUnlessEqual(new_contents, contents2)
|
||||||
|
d.addCallback(_retrieved)
|
||||||
|
return d
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user