tahoe-lafs/src/allmydata/test/test_upload.py

1293 lines
51 KiB
Python
Raw Normal View History

2006-12-01 09:54:28 +00:00
import os, shutil
from cStringIO import StringIO
2006-12-01 09:54:28 +00:00
from twisted.trial import unittest
2007-03-30 21:54:33 +00:00
from twisted.python.failure import Failure
from twisted.python import log
from twisted.internet import defer
from foolscap.api import fireEventually
2006-12-01 09:54:28 +00:00
import allmydata # for __full_version__
Overhaul IFilesystemNode handling, to simplify tests and use POLA internally. * stop using IURI as an adapter * pass cap strings around instead of URI instances * move filenode/dirnode creation duties from Client to new NodeMaker class * move other Client duties to KeyGenerator, SecretHolder, History classes * stop passing Client reference to dirnode/filenode constructors - pass less-powerful references instead, like StorageBroker or Uploader * always create DirectoryNodes by wrapping a filenode (mutable for now) * remove some specialized mock classes from unit tests Detailed list of changes (done one at a time, then merged together) always pass a string to create_node_from_uri(), not an IURI instance always pass a string to IFilesystemNode constructors, not an IURI instance stop using IURI() as an adapter, switch on cap prefix in create_node_from_uri() client.py: move SecretHolder code out to a separate class test_web.py: hush pyflakes client.py: move NodeMaker functionality out into a separate object LiteralFileNode: stop storing a Client reference immutable Checker: remove Client reference, it only needs a SecretHolder immutable Upload: remove Client reference, leave SecretHolder and StorageBroker immutable Repairer: replace Client reference with StorageBroker and SecretHolder immutable FileNode: remove Client reference mutable.Publish: stop passing Client mutable.ServermapUpdater: get StorageBroker in constructor, not by peeking into Client reference MutableChecker: reference StorageBroker and History directly, not through Client mutable.FileNode: removed unused indirection to checker classes mutable.FileNode: remove Client reference client.py: move RSA key generation into a separate class, so it can be passed to the nodemaker move create_mutable_file() into NodeMaker test_dirnode.py: stop using FakeClient mockups, use NoNetworkGrid instead. This simplifies the code, but takes longer to run (17s instead of 6s). This should come down later when other cleanups make it possible to use simpler (non-RSA) fake mutable files for dirnode tests. test_mutable.py: clean up basedir names client.py: move create_empty_dirnode() into NodeMaker dirnode.py: get rid of DirectoryNode.create remove DirectoryNode.init_from_uri, refactor NodeMaker for customization, simplify test_web's mock Client to match stop passing Client to DirectoryNode, make DirectoryNode.create_with_mutablefile the normal DirectoryNode constructor, start removing client from NodeMaker remove Client from NodeMaker move helper status into History, pass History to web.Status instead of Client test_mutable.py: fix minor typo
2009-08-15 11:02:56 +00:00
from allmydata import uri, monitor, client
from allmydata.immutable import upload, encode
2009-11-11 22:45:42 +00:00
from allmydata.interfaces import FileTooLargeError, NoSharesError, \
NotEnoughSharesError
2007-07-13 22:09:01 +00:00
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import DeferredListShouldSucceed
from no_network import GridTestMixin
from common_util import ShouldFailMixin
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.server import storage_index_to_dir
2006-12-01 09:54:28 +00:00
MiB = 1024*1024
def extract_uri(results):
return results.uri
# Some of these took longer than 480 seconds on Zandr's arm box, but this may
# have been due to an earlier test ERROR'ing out due to timeout, which seems
# to screw up subsequent tests.
timeout = 960
class Uploadable(unittest.TestCase):
def shouldEqual(self, data, expected):
self.failUnless(isinstance(data, list))
for e in data:
self.failUnless(isinstance(e, str))
s = "".join(data)
self.failUnlessEqual(s, expected)
def test_filehandle_random_key(self):
return self._test_filehandle(convergence=None)
def test_filehandle_convergent_encryption(self):
return self._test_filehandle(convergence="some convergence string")
def _test_filehandle(self, convergence):
s = StringIO("a"*41)
u = upload.FileHandle(s, convergence=convergence)
d = u.get_size()
d.addCallback(self.failUnlessEqual, 41)
d.addCallback(lambda res: u.read(1))
d.addCallback(self.shouldEqual, "a")
d.addCallback(lambda res: u.read(80))
d.addCallback(self.shouldEqual, "a"*40)
d.addCallback(lambda res: u.close()) # this doesn't close the filehandle
d.addCallback(lambda res: s.close()) # that privilege is reserved for us
return d
def test_filename(self):
basedir = "upload/Uploadable/test_filename"
os.makedirs(basedir)
fn = os.path.join(basedir, "file")
f = open(fn, "w")
f.write("a"*41)
f.close()
u = upload.FileName(fn, convergence=None)
d = u.get_size()
d.addCallback(self.failUnlessEqual, 41)
d.addCallback(lambda res: u.read(1))
d.addCallback(self.shouldEqual, "a")
d.addCallback(lambda res: u.read(80))
d.addCallback(self.shouldEqual, "a"*40)
d.addCallback(lambda res: u.close())
return d
def test_data(self):
s = "a"*41
u = upload.Data(s, convergence=None)
d = u.get_size()
d.addCallback(self.failUnlessEqual, 41)
d.addCallback(lambda res: u.read(1))
d.addCallback(self.shouldEqual, "a")
d.addCallback(lambda res: u.read(80))
d.addCallback(self.shouldEqual, "a"*40)
d.addCallback(lambda res: u.close())
return d
class ServerError(Exception):
pass
class SetDEPMixin:
def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
p = {"k": k,
"happy": happy,
"n": n,
"max_segment_size": max_segsize,
}
self.node.DEFAULT_ENCODING_PARAMETERS = p
2007-07-13 22:09:01 +00:00
class FakeStorageServer:
def __init__(self, mode):
self.mode = mode
self.allocated = []
self.queries = 0
self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": 2**32 },
"application-version": str(allmydata.__full_version__),
}
if mode == "small":
self.version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
{ "maximum-immutable-share-size": 10 },
"application-version": str(allmydata.__full_version__),
}
2007-07-13 22:09:01 +00:00
def callRemote(self, methname, *args, **kwargs):
def _call():
meth = getattr(self, methname)
return meth(*args, **kwargs)
d = fireEventually()
2007-07-13 22:09:01 +00:00
d.addCallback(lambda res: _call())
return d
def allocate_buckets(self, storage_index, renew_secret, cancel_secret,
sharenums, share_size, canary):
2007-07-13 22:09:01 +00:00
#print "FakeStorageServer.allocate_buckets(num=%d, size=%d)" % (len(sharenums), share_size)
if self.mode == "first-fail":
if self.queries == 0:
raise ServerError
if self.mode == "second-fail":
if self.queries == 1:
raise ServerError
self.queries += 1
2007-07-13 22:09:01 +00:00
if self.mode == "full":
return (set(), {},)
elif self.mode == "already got them":
return (set(sharenums), {},)
else:
for shnum in sharenums:
self.allocated.append( (storage_index, shnum) )
2007-07-13 22:09:01 +00:00
return (set(),
dict([( shnum, FakeBucketWriter(share_size) )
for shnum in sharenums]),
)
class FakeBucketWriter:
# a diagnostic version of storageserver.BucketWriter
def __init__(self, size):
self.data = StringIO()
self.closed = False
self._size = size
def callRemote(self, methname, *args, **kwargs):
def _call():
meth = getattr(self, "remote_" + methname)
return meth(*args, **kwargs)
d = fireEventually()
2007-07-13 22:09:01 +00:00
d.addCallback(lambda res: _call())
return d
def remote_write(self, offset, data):
precondition(not self.closed)
precondition(offset >= 0)
precondition(offset+len(data) <= self._size,
"offset=%d + data=%d > size=%d" %
(offset, len(data), self._size))
self.data.seek(offset)
self.data.write(data)
def remote_close(self):
precondition(not self.closed)
self.closed = True
2007-03-30 21:54:33 +00:00
def remote_abort(self):
log.err(RuntimeError("uh oh, I was asked to abort"))
2006-12-01 09:54:28 +00:00
class FakeClient:
DEFAULT_ENCODING_PARAMETERS = {"k":25,
"happy": 75,
"n": 100,
"max_segment_size": 1*MiB,
}
def __init__(self, mode="good", num_servers=50):
self.num_servers = num_servers
if type(mode) is str:
mode = dict([i,mode] for i in range(num_servers))
peers = [ ("%20d"%fakeid, FakeStorageServer(mode[fakeid]))
for fakeid in range(self.num_servers) ]
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
for (serverid, server) in peers:
self.storage_broker.test_add_server(serverid, server)
self.last_peers = [p[1] for p in peers]
def log(self, *args, **kwargs):
pass
def get_encoding_parameters(self):
return self.DEFAULT_ENCODING_PARAMETERS
def get_storage_broker(self):
return self.storage_broker
_secret_holder = client.SecretHolder("lease secret", "convergence secret")
class GotTooFarError(Exception):
pass
class GiganticUploadable(upload.FileHandle):
def __init__(self, size):
self._size = size
self._fp = 0
def get_encryption_key(self):
return defer.succeed("\x00" * 16)
def get_size(self):
return defer.succeed(self._size)
def read(self, length):
left = self._size - self._fp
length = min(left, length)
self._fp += length
if self._fp > 1000000:
# terminate the test early.
raise GotTooFarError("we shouldn't be allowed to get this far")
return defer.succeed(["\x00" * length])
def close(self):
pass
DATA = """
Once upon a time, there was a beautiful princess named Buttercup. She lived
in a magical land where every file was stored securely among millions of
machines, and nobody ever worried about their data being lost ever again.
The End.
"""
assert len(DATA) > upload.Uploader.URI_LIT_SIZE_THRESHOLD
SIZE_ZERO = 0
SIZE_SMALL = 16
SIZE_LARGE = len(DATA)
def upload_data(uploader, data):
u = upload.Data(data, convergence=None)
return uploader.upload(u)
def upload_filename(uploader, filename):
u = upload.FileName(filename, convergence=None)
return uploader.upload(u)
def upload_filehandle(uploader, fh):
u = upload.FileHandle(fh, convergence=None)
return uploader.upload(u)
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
def setUp(self):
2007-03-30 21:54:33 +00:00
self.node = FakeClient(mode="good")
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
def _check_small(self, newuri, size):
2009-11-11 22:45:42 +00:00
u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.LiteralFileURI))
self.failUnlessEqual(len(u.data), size)
def _check_large(self, newuri, size):
2009-11-11 22:45:42 +00:00
u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.CHKFileURI))
self.failUnless(isinstance(u.get_storage_index(), str))
self.failUnlessEqual(len(u.get_storage_index()), 16)
self.failUnless(isinstance(u.key, str))
self.failUnlessEqual(len(u.key), 16)
self.failUnlessEqual(u.size, size)
def get_data(self, size):
return DATA[:size]
def test_too_large(self):
# we've removed the 4GiB share size limit (see ticket #346 for
# details), but still have an 8-byte field, so the limit is now
# 2**64, so make sure we reject files larger than that.
k = 3; happy = 7; n = 10
self.set_encoding_parameters(k, happy, n)
big = k*(2**64)
data1 = GiganticUploadable(big)
d = self.shouldFail(FileTooLargeError, "test_too_large-data1",
"This file is too large to be uploaded (data_size)",
self.u.upload, data1)
data2 = GiganticUploadable(big-3)
d.addCallback(lambda res:
self.shouldFail(FileTooLargeError,
"test_too_large-data2",
"This file is too large to be uploaded (offsets)",
self.u.upload, data2))
# I don't know where the actual limit is.. it depends upon how large
# the hash trees wind up. It's somewhere close to k*4GiB-ln2(size).
return d
def test_data_zero(self):
data = self.get_data(SIZE_ZERO)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_small, SIZE_ZERO)
return d
def test_data_small(self):
data = self.get_data(SIZE_SMALL)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_small, SIZE_SMALL)
return d
def test_data_large(self):
data = self.get_data(SIZE_LARGE)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
return d
def test_data_large_odd_segments(self):
data = self.get_data(SIZE_LARGE)
segsize = int(SIZE_LARGE / 2.5)
# we want 3 segments, since that's not a power of two
self.set_encoding_parameters(25, 75, 100, segsize)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
return d
def test_filehandle_zero(self):
data = self.get_data(SIZE_ZERO)
d = upload_filehandle(self.u, StringIO(data))
d.addCallback(extract_uri)
d.addCallback(self._check_small, SIZE_ZERO)
return d
def test_filehandle_small(self):
data = self.get_data(SIZE_SMALL)
d = upload_filehandle(self.u, StringIO(data))
d.addCallback(extract_uri)
d.addCallback(self._check_small, SIZE_SMALL)
return d
def test_filehandle_large(self):
data = self.get_data(SIZE_LARGE)
d = upload_filehandle(self.u, StringIO(data))
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
return d
def test_filename_zero(self):
fn = "Uploader-test_filename_zero.data"
f = open(fn, "wb")
data = self.get_data(SIZE_ZERO)
f.write(data)
f.close()
d = upload_filename(self.u, fn)
d.addCallback(extract_uri)
d.addCallback(self._check_small, SIZE_ZERO)
return d
def test_filename_small(self):
fn = "Uploader-test_filename_small.data"
f = open(fn, "wb")
data = self.get_data(SIZE_SMALL)
f.write(data)
f.close()
d = upload_filename(self.u, fn)
d.addCallback(extract_uri)
d.addCallback(self._check_small, SIZE_SMALL)
return d
def test_filename_large(self):
fn = "Uploader-test_filename_large.data"
f = open(fn, "wb")
data = self.get_data(SIZE_LARGE)
f.write(data)
f.close()
d = upload_filename(self.u, fn)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
return d
2007-03-30 21:54:33 +00:00
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
def make_node(self, mode, num_servers=10):
self.node = FakeClient(mode, num_servers)
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
def _check_large(self, newuri, size):
2009-11-11 22:45:42 +00:00
u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.CHKFileURI))
self.failUnless(isinstance(u.get_storage_index(), str))
self.failUnlessEqual(len(u.get_storage_index()), 16)
self.failUnless(isinstance(u.key, str))
self.failUnlessEqual(len(u.key), 16)
self.failUnlessEqual(u.size, size)
def test_first_error(self):
mode = dict([(0,"good")] + [(i,"first-fail") for i in range(1,10)])
self.make_node(mode)
d = upload_data(self.u, DATA)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
return d
def test_first_error_all(self):
self.make_node("first-fail")
d = self.shouldFail(NoSharesError, "first_error_all",
"peer selection failed",
upload_data, self.u, DATA)
def _check((f,)):
self.failUnlessIn("placed 0 shares out of 100 total", str(f.value))
# there should also be a 'last failure was' message
self.failUnlessIn("ServerError", str(f.value))
d.addCallback(_check)
return d
def test_second_error(self):
# we want to make sure we make it to a third pass. This means that
# the first pass was insufficient to place all shares, and at least
# one of second pass servers (other than the last one) accepted a
# share (so we'll believe that a third pass will be useful). (if
# everyone but the last server throws an error, then we'll send all
# the remaining shares to the last server at the end of the second
# pass, and if that succeeds, we won't make it to a third pass).
#
# we can achieve this 97.5% of the time by using 40 servers, having
# 39 of them fail on the second request, leaving only one to succeed
# on the second request. (we need to keep the number of servers low
# enough to ensure a second pass with 100 shares).
mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
self.make_node(mode, 40)
d = upload_data(self.u, DATA)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
return d
def test_second_error_all(self):
self.make_node("second-fail")
d = self.shouldFail(NotEnoughSharesError, "second_error_all",
"peer selection failed",
upload_data, self.u, DATA)
def _check((f,)):
self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
# there should also be a 'last failure was' message
self.failUnlessIn("ServerError", str(f.value))
d.addCallback(_check)
return d
2007-03-30 21:54:33 +00:00
class FullServer(unittest.TestCase):
def setUp(self):
self.node = FakeClient(mode="full")
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
def _should_fail(self, f):
self.failUnless(isinstance(f, Failure) and f.check(NoSharesError), f)
2007-03-30 21:54:33 +00:00
def test_data_large(self):
data = DATA
d = upload_data(self.u, data)
2007-03-30 21:54:33 +00:00
d.addBoth(self._should_fail)
return d
class PeerSelection(unittest.TestCase):
def make_client(self, num_servers=50):
self.node = FakeClient(mode="good", num_servers=num_servers)
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
def get_data(self, size):
return DATA[:size]
def _check_large(self, newuri, size):
2009-11-11 22:45:42 +00:00
u = uri.from_string(newuri)
self.failUnless(isinstance(u, uri.CHKFileURI))
self.failUnless(isinstance(u.get_storage_index(), str))
self.failUnlessEqual(len(u.get_storage_index()), 16)
self.failUnless(isinstance(u.key, str))
self.failUnlessEqual(len(u.key), 16)
self.failUnlessEqual(u.size, size)
def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
p = {"k": k,
"happy": happy,
"n": n,
"max_segment_size": max_segsize,
}
self.node.DEFAULT_ENCODING_PARAMETERS = p
def test_one_each(self):
# if we have 50 shares, and there are 50 peers, and they all accept a
# share, we should get exactly one share per peer
self.make_client()
data = self.get_data(SIZE_LARGE)
self.set_encoding_parameters(25, 30, 50)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
for p in self.node.last_peers:
allocated = p.allocated
self.failUnlessEqual(len(allocated), 1)
self.failUnlessEqual(p.queries, 1)
d.addCallback(_check)
return d
def test_two_each(self):
# if we have 100 shares, and there are 50 peers, and they all accept
# all shares, we should get exactly two shares per peer
self.make_client()
data = self.get_data(SIZE_LARGE)
self.set_encoding_parameters(50, 75, 100)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
for p in self.node.last_peers:
allocated = p.allocated
self.failUnlessEqual(len(allocated), 2)
self.failUnlessEqual(p.queries, 2)
d.addCallback(_check)
return d
def test_one_each_plus_one_extra(self):
# if we have 51 shares, and there are 50 peers, then one peer gets
# two shares and the rest get just one
self.make_client()
data = self.get_data(SIZE_LARGE)
self.set_encoding_parameters(24, 41, 51)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
got_one = []
got_two = []
for p in self.node.last_peers:
allocated = p.allocated
self.failUnless(len(allocated) in (1,2), len(allocated))
if len(allocated) == 1:
self.failUnlessEqual(p.queries, 1)
got_one.append(p)
else:
self.failUnlessEqual(p.queries, 2)
got_two.append(p)
self.failUnlessEqual(len(got_one), 49)
self.failUnlessEqual(len(got_two), 1)
d.addCallback(_check)
return d
def test_four_each(self):
# if we have 200 shares, and there are 50 peers, then each peer gets
# 4 shares. The design goal is to accomplish this with only two
# queries per peer.
self.make_client()
data = self.get_data(SIZE_LARGE)
self.set_encoding_parameters(100, 150, 200)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
for p in self.node.last_peers:
allocated = p.allocated
self.failUnlessEqual(len(allocated), 4)
self.failUnlessEqual(p.queries, 2)
d.addCallback(_check)
return d
def test_three_of_ten(self):
# if we have 10 shares and 3 servers, I want to see 3+3+4 rather than
# 4+4+2
self.make_client(3)
data = self.get_data(SIZE_LARGE)
self.set_encoding_parameters(3, 5, 10)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
counts = {}
for p in self.node.last_peers:
allocated = p.allocated
counts[len(allocated)] = counts.get(len(allocated), 0) + 1
histogram = [counts.get(i, 0) for i in range(5)]
self.failUnlessEqual(histogram, [0,0,0,2,1])
d.addCallback(_check)
return d
def test_some_big_some_small(self):
# 10 shares, 20 servers, but half the servers don't support a
# share-size large enough for our file
mode = dict([(i,{0:"good",1:"small"}[i%2]) for i in range(20)])
self.node = FakeClient(mode, num_servers=20)
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
data = self.get_data(SIZE_LARGE)
self.set_encoding_parameters(3, 5, 10)
d = upload_data(self.u, data)
d.addCallback(extract_uri)
d.addCallback(self._check_large, SIZE_LARGE)
def _check(res):
# we should have put one share each on the big peers, and zero
# shares on the small peers
total_allocated = 0
for p in self.node.last_peers:
if p.mode == "good":
self.failUnlessEqual(len(p.allocated), 1)
elif p.mode == "small":
self.failUnlessEqual(len(p.allocated), 0)
total_allocated += len(p.allocated)
self.failUnlessEqual(total_allocated, 10)
d.addCallback(_check)
return d
class StorageIndex(unittest.TestCase):
def test_params_must_matter(self):
DATA = "I am some data"
u = upload.Data(DATA, convergence="")
eu = upload.EncryptAnUploadable(u)
d1 = eu.get_storage_index()
# CHK means the same data should encrypt the same way
u = upload.Data(DATA, convergence="")
eu = upload.EncryptAnUploadable(u)
d1a = eu.get_storage_index()
# but if we use a different convergence string it should be different
u = upload.Data(DATA, convergence="wheee!")
eu = upload.EncryptAnUploadable(u)
d1salt1 = eu.get_storage_index()
# and if we add yet a different convergence it should be different again
u = upload.Data(DATA, convergence="NOT wheee!")
eu = upload.EncryptAnUploadable(u)
d1salt2 = eu.get_storage_index()
# and if we use the first string again it should be the same as last time
u = upload.Data(DATA, convergence="wheee!")
eu = upload.EncryptAnUploadable(u)
d1salt1a = eu.get_storage_index()
# and if we change the encoding parameters, it should be different (from the same convergence string with different encoding parameters)
u = upload.Data(DATA, convergence="")
u.encoding_param_k = u.default_encoding_param_k + 1
eu = upload.EncryptAnUploadable(u)
d2 = eu.get_storage_index()
# and if we use a random key, it should be different than the CHK
u = upload.Data(DATA, convergence=None)
eu = upload.EncryptAnUploadable(u)
d3 = eu.get_storage_index()
# and different from another instance
u = upload.Data(DATA, convergence=None)
eu = upload.EncryptAnUploadable(u)
d4 = eu.get_storage_index()
d = DeferredListShouldSucceed([d1,d1a,d1salt1,d1salt2,d1salt1a,d2,d3,d4])
def _done(res):
si1, si1a, si1salt1, si1salt2, si1salt1a, si2, si3, si4 = res
self.failUnlessEqual(si1, si1a)
self.failIfEqual(si1, si2)
self.failIfEqual(si1, si3)
self.failIfEqual(si1, si4)
self.failIfEqual(si3, si4)
self.failIfEqual(si1salt1, si1)
self.failIfEqual(si1salt1, si1salt2)
self.failIfEqual(si1salt2, si1)
self.failUnlessEqual(si1salt1, si1salt1a)
d.addCallback(_done)
return d
class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
ShouldFailMixin):
def _do_upload_with_broken_servers(self, servers_to_break):
"""
I act like a normal upload, but before I send the results of
Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
PeerTrackers in the used_peers part of the return result.
"""
assert self.g, "I tried to find a grid at self.g, but failed"
broker = self.g.clients[0].storage_broker
sh = self.g.clients[0]._secret_holder
data = upload.Data("data" * 10000, convergence="")
data.encoding_param_k = 3
data.encoding_param_happy = 4
data.encoding_param_n = 10
uploadable = upload.EncryptAnUploadable(data)
encoder = encode.Encoder()
encoder.set_encrypted_uploadable(uploadable)
status = upload.UploadStatus()
selector = upload.Tahoe2PeerSelector("dglev", "test", status)
storage_index = encoder.get_param("storage_index")
share_size = encoder.get_param("share_size")
block_size = encoder.get_param("block_size")
num_segments = encoder.get_param("num_segments")
d = selector.get_shareholders(broker, sh, storage_index,
share_size, block_size, num_segments,
10, 4)
def _have_shareholders((used_peers, already_peers)):
assert servers_to_break <= len(used_peers)
for index in xrange(servers_to_break):
server = list(used_peers)[index]
for share in server.buckets.keys():
server.buckets[share].abort()
buckets = {}
servermap = already_peers.copy()
for peer in used_peers:
buckets.update(peer.buckets)
for bucket in peer.buckets:
servermap[bucket] = peer.peerid
encoder.set_shareholders(buckets, servermap)
d = encoder.start()
return d
d.addCallback(_have_shareholders)
return d
def _add_server(self, server_number, readonly=False):
assert self.g, "I tried to find a grid at self.g, but failed"
assert self.shares, "I tried to find shares at self.shares, but failed"
ss = self.g.make_server(server_number, readonly)
self.g.add_server(server_number, ss)
def _add_server_with_share(self, server_number, share_number=None,
readonly=False):
self._add_server(server_number, readonly)
if share_number is not None:
self._copy_share_to_server(share_number, server_number)
def _copy_share_to_server(self, share_number, server_number):
ss = self.g.servers_by_number[server_number]
# Copy share i from the directory associated with the first
# storage server to the directory associated with this one.
assert self.g, "I tried to find a grid at self.g, but failed"
assert self.shares, "I tried to find shares at self.shares, but failed"
old_share_location = self.shares[share_number][2]
new_share_location = os.path.join(ss.storedir, "shares")
si = uri.from_string(self.uri).get_storage_index()
new_share_location = os.path.join(new_share_location,
storage_index_to_dir(si))
if not os.path.exists(new_share_location):
os.makedirs(new_share_location)
new_share_location = os.path.join(new_share_location,
str(share_number))
if old_share_location != new_share_location:
shutil.copy(old_share_location, new_share_location)
shares = self.find_shares(self.uri)
# Make sure that the storage server has the share.
self.failUnless((share_number, ss.my_nodeid, new_share_location)
in shares)
def _setup_and_upload(self):
"""
I set up a NoNetworkGrid with a single server and client,
upload a file to it, store its uri in self.uri, and store its
sharedata in self.shares.
"""
self.set_up_grid(num_clients=1, num_servers=1)
client = self.g.clients[0]
client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
data = upload.Data("data" * 10000, convergence="")
self.data = data
d = client.upload(data)
def _store_uri(ur):
self.uri = ur.uri
d.addCallback(_store_uri)
d.addCallback(lambda ign:
self.find_shares(self.uri))
def _store_shares(shares):
self.shares = shares
d.addCallback(_store_shares)
return d
def test_configure_parameters(self):
self.basedir = self.mktemp()
hooks = {0: self._set_up_nodes_extra_config}
self.set_up_grid(client_config_hooks=hooks)
c0 = self.g.clients[0]
DATA = "data" * 100
u = upload.Data(DATA, convergence="")
d = c0.upload(u)
d.addCallback(lambda ur: c0.create_node_from_uri(ur.uri))
m = monitor.Monitor()
d.addCallback(lambda fn: fn.check(m))
def _check(cr):
data = cr.get_data()
self.failUnlessEqual(data["count-shares-needed"], 7)
self.failUnlessEqual(data["count-shares-expected"], 12)
d.addCallback(_check)
return d
def _setUp(self, ns):
# Used by test_happy_semantics and test_prexisting_share_behavior
# to set up the grid.
self.node = FakeClient(mode="good", num_servers=ns)
self.u = upload.Uploader()
self.u.running = True
self.u.parent = self.node
def test_happy_semantics(self):
self._setUp(2)
DATA = upload.Data("kittens" * 10000, convergence="")
# These parameters are unsatisfiable with the client that we've made
# -- we'll use them to test that the semnatics work correctly.
self.set_encoding_parameters(k=3, happy=5, n=10)
d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
"shares could only be placed on 2 servers "
"(5 were requested)",
self.u.upload, DATA)
# Let's reset the client to have 10 servers
d.addCallback(lambda ign:
self._setUp(10))
# These parameters are satisfiable with the client we've made.
d.addCallback(lambda ign:
self.set_encoding_parameters(k=3, happy=5, n=10))
# this should work
d.addCallback(lambda ign:
self.u.upload(DATA))
# Let's reset the client to have 7 servers
# (this is less than n, but more than h)
d.addCallback(lambda ign:
self._setUp(7))
# These encoding parameters should still be satisfiable with our
# client setup
d.addCallback(lambda ign:
self.set_encoding_parameters(k=3, happy=5, n=10))
# This, then, should work.
d.addCallback(lambda ign:
self.u.upload(DATA))
return d
def test_problem_layout_comment_52(self):
def _basedir():
self.basedir = self.mktemp()
_basedir()
# This scenario is at
# http://allmydata.org/trac/tahoe/ticket/778#comment:52
#
# The scenario in comment:52 proposes that we have a layout
# like:
# server 1: share 1
# server 2: share 1
# server 3: share 1
# server 4: shares 2 - 10
# To get access to the shares, we will first upload to one
# server, which will then have shares 1 - 10. We'll then
# add three new servers, configure them to not accept any new
# shares, then write share 1 directly into the serverdir of each.
# Then each of servers 1 - 3 will report that they have share 1,
# and will not accept any new share, while server 4 will report that
# it has shares 2 - 10 and will accept new shares.
# We'll then set 'happy' = 4, and see that an upload fails
# (as it should)
d = self._setup_and_upload()
d.addCallback(lambda ign:
self._add_server_with_share(1, 0, True))
d.addCallback(lambda ign:
self._add_server_with_share(2, 0, True))
d.addCallback(lambda ign:
self._add_server_with_share(3, 0, True))
# Remove the first share from server 0.
def _remove_share_0():
share_location = self.shares[0][2]
os.remove(share_location)
d.addCallback(lambda ign:
_remove_share_0())
# Set happy = 4 in the client.
def _prepare():
client = self.g.clients[0]
client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
return client
d.addCallback(lambda ign:
_prepare())
# Uploading data should fail
d.addCallback(lambda client:
self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
"shares could only be placed on 2 servers "
"(4 were requested)",
client.upload, upload.Data("data" * 10000,
convergence="")))
# Do comment:52, but like this:
# server 2: empty
# server 3: share 0, read-only
# server 1: share 0, read-only
# server 0: shares 0-9
d.addCallback(lambda ign:
_basedir())
d.addCallback(lambda ign:
self._setup_and_upload())
d.addCallback(lambda ign:
self._add_server_with_share(server_number=2))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=3, share_number=0,
readonly=True))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=1, share_number=0,
readonly=True))
def _prepare2():
client = self.g.clients[0]
client.DEFAULT_ENCODING_PARAMETERS['happy'] = 3
return client
d.addCallback(lambda ign:
_prepare2())
d.addCallback(lambda client:
self.shouldFail(NotEnoughSharesError, "test_happy_sematics",
"shares could only be placed on 2 servers "
"(3 were requested)",
client.upload, upload.Data("data" * 10000,
convergence="")))
return d
def test_problem_layout_comment_53(self):
# This scenario is at
# http://allmydata.org/trac/tahoe/ticket/778#comment:53
#
# Set up the grid to have one server
def _change_basedir(ign):
self.basedir = self.mktemp()
_change_basedir(None)
d = self._setup_and_upload()
# We start by uploading all of the shares to one server (which has
# already been done above).
# Next, we'll add three new servers to our NoNetworkGrid. We'll add
# one share from our initial upload to each of these.
# The counterintuitive ordering of the share numbers is to deal with
# the permuting of these servers -- distributing the shares this
# way ensures that the Tahoe2PeerSelector sees them in the order
# described above.
d.addCallback(lambda ign:
self._add_server_with_share(server_number=1, share_number=2))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=2, share_number=0))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=3, share_number=1))
# So, we now have the following layout:
# server 0: shares 0 - 9
# server 1: share 2
# server 2: share 0
# server 3: share 1
# We want to change the 'happy' parameter in the client to 4.
# The Tahoe2PeerSelector will see the peers permuted as:
# 2, 3, 1, 0
# Ideally, a reupload of our original data should work.
def _reset_encoding_parameters(ign, happy=4):
client = self.g.clients[0]
client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
return client
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
# This scenario is basically comment:53, but with the order reversed;
# this means that the Tahoe2PeerSelector sees
# server 2: shares 1-10
# server 3: share 1
# server 1: share 2
# server 4: share 3
d.addCallback(_change_basedir)
d.addCallback(lambda ign:
self._setup_and_upload())
d.addCallback(lambda ign:
self._add_server_with_share(server_number=2, share_number=0))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=3, share_number=1))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=1, share_number=2))
# Copy all of the other shares to server number 2
def _copy_shares(ign):
for i in xrange(1, 10):
self._copy_share_to_server(i, 2)
d.addCallback(_copy_shares)
# Remove the first server, and add a placeholder with share 0
d.addCallback(lambda ign:
self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=4, share_number=0))
# Now try uploading.
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
# Try the same thing, but with empty servers after the first one
# We want to make sure that Tahoe2PeerSelector will redistribute
# shares as necessary, not simply discover an existing layout.
d.addCallback(_change_basedir)
d.addCallback(lambda ign:
self._setup_and_upload())
d.addCallback(lambda ign:
self._add_server(server_number=2))
d.addCallback(lambda ign:
self._add_server(server_number=3))
d.addCallback(lambda ign:
self._add_server(server_number=1))
d.addCallback(_copy_shares)
d.addCallback(lambda ign:
self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
d.addCallback(lambda ign:
self._add_server(server_number=4))
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
return d
def test_happiness_with_some_readonly_peers(self):
# Try the following layout
# server 2: shares 0-9
# server 4: share 0, read-only
# server 3: share 1, read-only
# server 1: share 2, read-only
self.basedir = self.mktemp()
d = self._setup_and_upload()
d.addCallback(lambda ign:
self._add_server_with_share(server_number=2, share_number=0))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=3, share_number=1,
readonly=True))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=1, share_number=2,
readonly=True))
# Copy all of the other shares to server number 2
def _copy_shares(ign):
for i in xrange(1, 10):
self._copy_share_to_server(i, 2)
d.addCallback(_copy_shares)
# Remove server 0, and add another in its place
d.addCallback(lambda ign:
self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=4, share_number=0,
readonly=True))
def _reset_encoding_parameters(ign, happy=4):
client = self.g.clients[0]
client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
return client
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
return d
def test_happiness_with_all_readonly_peers(self):
# server 3: share 1, read-only
# server 1: share 2, read-only
# server 2: shares 0-9, read-only
# server 4: share 0, read-only
# The idea with this test is to make sure that the survey of
# read-only peers doesn't undercount servers of happiness
self.basedir = self.mktemp()
d = self._setup_and_upload()
d.addCallback(lambda ign:
self._add_server_with_share(server_number=4, share_number=0,
readonly=True))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=3, share_number=1,
readonly=True))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=1, share_number=2,
readonly=True))
d.addCallback(lambda ign:
self._add_server_with_share(server_number=2, share_number=0,
readonly=True))
def _copy_shares(ign):
for i in xrange(1, 10):
self._copy_share_to_server(i, 2)
d.addCallback(_copy_shares)
d.addCallback(lambda ign:
self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
def _reset_encoding_parameters(ign, happy=4):
client = self.g.clients[0]
client.DEFAULT_ENCODING_PARAMETERS['happy'] = happy
return client
d.addCallback(_reset_encoding_parameters)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
return d
def test_dropped_servers_in_encoder(self):
def _set_basedir(ign=None):
self.basedir = self.mktemp()
_set_basedir()
d = self._setup_and_upload();
# Add 5 servers
def _do_server_setup(ign):
self._add_server_with_share(1)
self._add_server_with_share(2)
self._add_server_with_share(3)
self._add_server_with_share(4)
self._add_server_with_share(5)
d.addCallback(_do_server_setup)
# remove the original server
# (necessary to ensure that the Tahoe2PeerSelector will distribute
# all the shares)
def _remove_server(ign):
server = self.g.servers_by_number[0]
self.g.remove_server(server.my_nodeid)
d.addCallback(_remove_server)
# This should succeed.
d.addCallback(lambda ign:
self._do_upload_with_broken_servers(1))
# Now, do the same thing over again, but drop 2 servers instead
# of 1. This should fail.
d.addCallback(_set_basedir)
d.addCallback(lambda ign:
self._setup_and_upload())
d.addCallback(_do_server_setup)
d.addCallback(_remove_server)
d.addCallback(lambda ign:
self.shouldFail(NotEnoughSharesError,
"test_dropped_servers_in_encoder",
"lost too many servers during upload "
"(still have 3, want 4)",
self._do_upload_with_broken_servers, 2))
# Now do the same thing over again, but make some of the servers
# readonly, break some of the ones that aren't, and make sure that
# happiness accounting is preserved.
d.addCallback(_set_basedir)
d.addCallback(lambda ign:
self._setup_and_upload())
def _do_server_setup_2(ign):
self._add_server_with_share(1)
self._add_server_with_share(2)
self._add_server_with_share(3)
self._add_server_with_share(4, 7, readonly=True)
self._add_server_with_share(5, 8, readonly=True)
d.addCallback(_do_server_setup_2)
d.addCallback(_remove_server)
d.addCallback(lambda ign:
self._do_upload_with_broken_servers(1))
d.addCallback(_set_basedir)
d.addCallback(lambda ign:
self._setup_and_upload())
d.addCallback(_do_server_setup_2)
d.addCallback(_remove_server)
d.addCallback(lambda ign:
self.shouldFail(NotEnoughSharesError,
"test_dropped_servers_in_encoder",
"lost too many servers during upload "
"(still have 3, want 4)",
self._do_upload_with_broken_servers, 2))
return d
def test_servers_with_unique_shares(self):
# servers_with_unique_shares expects a dict of
# shnum => peerid as a preexisting shares argument.
test1 = {
1 : "server1",
2 : "server2",
3 : "server3",
4 : "server4"
}
unique_servers = upload.servers_with_unique_shares(test1)
self.failUnlessEqual(4, len(unique_servers))
for server in ["server1", "server2", "server3", "server4"]:
self.failUnlessIn(server, unique_servers)
test1[4] = "server1"
# Now there should only be 3 unique servers.
unique_servers = upload.servers_with_unique_shares(test1)
self.failUnlessEqual(3, len(unique_servers))
for server in ["server1", "server2", "server3"]:
self.failUnlessIn(server, unique_servers)
# servers_with_unique_shares expects to receive some object with
# a peerid attribute. So we make a FakePeerTracker whose only
# job is to have a peerid attribute.
class FakePeerTracker:
pass
trackers = []
for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
t = FakePeerTracker()
t.peerid = server
t.buckets = [i]
trackers.append(t)
# Recall that there are 3 unique servers in test1. Since none of
# those overlap with the ones in trackers, we should get 7 back
unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
self.failUnlessEqual(7, len(unique_servers))
expected_servers = ["server" + str(i) for i in xrange(1, 9)]
expected_servers.remove("server4")
for server in expected_servers:
self.failUnlessIn(server, unique_servers)
# Now add an overlapping server to trackers.
t = FakePeerTracker()
t.peerid = "server1"
t.buckets = [1]
trackers.append(t)
unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
self.failUnlessEqual(7, len(unique_servers))
for server in expected_servers:
self.failUnlessIn(server, unique_servers)
test = {}
unique_servers = upload.servers_with_unique_shares(test)
self.failUnlessEqual(0, len(test))
2009-11-04 12:13:24 +00:00
def test_shares_by_server(self):
test = dict([(i, "server%d" % i) for i in xrange(1, 5)])
2009-11-04 12:13:24 +00:00
shares_by_server = upload.shares_by_server(test)
self.failUnlessEqual(set([1]), shares_by_server["server1"])
self.failUnlessEqual(set([2]), shares_by_server["server2"])
self.failUnlessEqual(set([3]), shares_by_server["server3"])
self.failUnlessEqual(set([4]), shares_by_server["server4"])
test1 = {
1 : "server1",
2 : "server1",
3 : "server1",
4 : "server2",
5 : "server2"
}
shares_by_server = upload.shares_by_server(test1)
self.failUnlessEqual(set([1, 2, 3]), shares_by_server["server1"])
self.failUnlessEqual(set([4, 5]), shares_by_server["server2"])
def test_existing_share_detection(self):
self.basedir = self.mktemp()
d = self._setup_and_upload()
# Our final setup should look like this:
# server 1: shares 1 - 10, read-only
# server 2: empty
# server 3: empty
# server 4: empty
# The purpose of this test is to make sure that the peer selector
# knows about the shares on server 1, even though it is read-only.
# It used to simply filter these out, which would cause the test
# to fail when servers_of_happiness = 4.
d.addCallback(lambda ign:
self._add_server_with_share(1, 0, True))
d.addCallback(lambda ign:
self._add_server_with_share(2))
d.addCallback(lambda ign:
self._add_server_with_share(3))
d.addCallback(lambda ign:
self._add_server_with_share(4))
def _copy_shares(ign):
for i in xrange(1, 10):
self._copy_share_to_server(i, 1)
d.addCallback(_copy_shares)
d.addCallback(lambda ign:
self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
def _prepare_client(ign):
client = self.g.clients[0]
client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
return client
d.addCallback(_prepare_client)
d.addCallback(lambda client:
client.upload(upload.Data("data" * 10000, convergence="")))
return d
def test_should_add_server(self):
shares = dict([(i, "server%d" % i) for i in xrange(10)])
self.failIf(upload.should_add_server(shares, "server1", 4))
shares[4] = "server1"
self.failUnless(upload.should_add_server(shares, "server4", 4))
shares = {}
self.failUnless(upload.should_add_server(shares, "server1", 1))
def _set_up_nodes_extra_config(self, clientdir):
cfgfn = os.path.join(clientdir, "tahoe.cfg")
oldcfg = open(cfgfn, "r").read()
f = open(cfgfn, "wt")
f.write(oldcfg)
f.write("\n")
f.write("[client]\n")
f.write("shares.needed = 7\n")
f.write("shares.total = 12\n")
f.write("\n")
f.close()
return None
2007-04-24 00:30:40 +00:00
# TODO:
# upload with exactly 75 peers (shares_of_happiness)
# have a download fail
# cancel a download (need to implement more cancel stuff)