mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-03-31 15:50:42 +00:00
Refactor some behavior into a mixin, and add tests for the behavior described in #778
This commit is contained in:
parent
362f204075
commit
ee9690b357
@ -1,5 +1,5 @@
|
||||
|
||||
import os
|
||||
import os, shutil
|
||||
from cStringIO import StringIO
|
||||
from twisted.trial import unittest
|
||||
from twisted.python.failure import Failure
|
||||
@ -9,7 +9,7 @@ from foolscap.api import fireEventually
|
||||
|
||||
import allmydata # for __full_version__
|
||||
from allmydata import uri, monitor, client
|
||||
from allmydata.immutable import upload
|
||||
from allmydata.immutable import upload, encode
|
||||
from allmydata.interfaces import FileTooLargeError, NoSharesError, \
|
||||
NotEnoughSharesError
|
||||
from allmydata.util.assertutil import precondition
|
||||
@ -17,6 +17,7 @@ from allmydata.util.deferredutil import DeferredListShouldSucceed
|
||||
from no_network import GridTestMixin
|
||||
from common_util import ShouldFailMixin
|
||||
from allmydata.storage_client import StorageFarmBroker
|
||||
from allmydata.storage.server import storage_index_to_dir
|
||||
|
||||
MiB = 1024*1024
|
||||
|
||||
@ -87,6 +88,15 @@ class Uploadable(unittest.TestCase):
|
||||
class ServerError(Exception):
|
||||
pass
|
||||
|
||||
class SetDEPMixin:
|
||||
def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
|
||||
p = {"k": k,
|
||||
"happy": happy,
|
||||
"n": n,
|
||||
"max_segment_size": max_segsize,
|
||||
}
|
||||
self.node.DEFAULT_ENCODING_PARAMETERS = p
|
||||
|
||||
class FakeStorageServer:
|
||||
def __init__(self, mode):
|
||||
self.mode = mode
|
||||
@ -234,21 +244,13 @@ def upload_filehandle(uploader, fh):
|
||||
u = upload.FileHandle(fh, convergence=None)
|
||||
return uploader.upload(u)
|
||||
|
||||
class GoodServer(unittest.TestCase, ShouldFailMixin):
|
||||
class GoodServer(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
|
||||
def setUp(self):
|
||||
self.node = FakeClient(mode="good")
|
||||
self.u = upload.Uploader()
|
||||
self.u.running = True
|
||||
self.u.parent = self.node
|
||||
|
||||
def set_encoding_parameters(self, k, happy, n, max_segsize=1*MiB):
|
||||
p = {"k": k,
|
||||
"happy": happy,
|
||||
"n": n,
|
||||
"max_segment_size": max_segsize,
|
||||
}
|
||||
self.node.DEFAULT_ENCODING_PARAMETERS = p
|
||||
|
||||
def _check_small(self, newuri, size):
|
||||
u = uri.from_string(newuri)
|
||||
self.failUnless(isinstance(u, uri.LiteralFileURI))
|
||||
@ -372,7 +374,7 @@ class GoodServer(unittest.TestCase, ShouldFailMixin):
|
||||
d.addCallback(self._check_large, SIZE_LARGE)
|
||||
return d
|
||||
|
||||
class ServerErrors(unittest.TestCase, ShouldFailMixin):
|
||||
class ServerErrors(unittest.TestCase, ShouldFailMixin, SetDEPMixin):
|
||||
def make_node(self, mode, num_servers=10):
|
||||
self.node = FakeClient(mode, num_servers)
|
||||
self.u = upload.Uploader()
|
||||
@ -672,7 +674,94 @@ class StorageIndex(unittest.TestCase):
|
||||
d.addCallback(_done)
|
||||
return d
|
||||
|
||||
class EncodingParameters(GridTestMixin, unittest.TestCase):
|
||||
class EncodingParameters(GridTestMixin, unittest.TestCase, SetDEPMixin,
|
||||
ShouldFailMixin):
|
||||
def _do_upload_with_broken_servers(self, servers_to_break):
|
||||
"""
|
||||
I act like a normal upload, but before I send the results of
|
||||
Tahoe2PeerSelector to the Encoder, I break the first servers_to_break
|
||||
PeerTrackers in the used_peers part of the return result.
|
||||
"""
|
||||
assert self.g, "I tried to find a grid at self.g, but failed"
|
||||
broker = self.g.clients[0].storage_broker
|
||||
sh = self.g.clients[0]._secret_holder
|
||||
data = upload.Data("data" * 10000, convergence="")
|
||||
data.encoding_param_k = 3
|
||||
data.encoding_param_happy = 4
|
||||
data.encoding_param_n = 10
|
||||
uploadable = upload.EncryptAnUploadable(data)
|
||||
encoder = encode.Encoder()
|
||||
encoder.set_encrypted_uploadable(uploadable)
|
||||
status = upload.UploadStatus()
|
||||
selector = upload.Tahoe2PeerSelector("dglev", "test", status)
|
||||
storage_index = encoder.get_param("storage_index")
|
||||
share_size = encoder.get_param("share_size")
|
||||
block_size = encoder.get_param("block_size")
|
||||
num_segments = encoder.get_param("num_segments")
|
||||
d = selector.get_shareholders(broker, sh, storage_index,
|
||||
share_size, block_size, num_segments,
|
||||
10, 4)
|
||||
def _have_shareholders((used_peers, already_peers)):
|
||||
assert servers_to_break <= len(used_peers)
|
||||
for index in xrange(servers_to_break):
|
||||
server = list(used_peers)[index]
|
||||
for share in server.buckets.keys():
|
||||
server.buckets[share].abort()
|
||||
buckets = {}
|
||||
for peer in used_peers:
|
||||
buckets.update(peer.buckets)
|
||||
encoder.set_shareholders(buckets)
|
||||
d = encoder.start()
|
||||
return d
|
||||
d.addCallback(_have_shareholders)
|
||||
return d
|
||||
|
||||
def _add_server_with_share(self, server_number, share_number=None,
|
||||
readonly=False):
|
||||
assert self.g, "I tried to find a grid at self.g, but failed"
|
||||
assert self.shares, "I tried to find shares at self.shares, but failed"
|
||||
ss = self.g.make_server(server_number, readonly)
|
||||
self.g.add_server(server_number, ss)
|
||||
if share_number:
|
||||
# Copy share i from the directory associated with the first
|
||||
# storage server to the directory associated with this one.
|
||||
old_share_location = self.shares[share_number][2]
|
||||
new_share_location = os.path.join(ss.storedir, "shares")
|
||||
si = uri.from_string(self.uri).get_storage_index()
|
||||
new_share_location = os.path.join(new_share_location,
|
||||
storage_index_to_dir(si))
|
||||
if not os.path.exists(new_share_location):
|
||||
os.makedirs(new_share_location)
|
||||
new_share_location = os.path.join(new_share_location,
|
||||
str(share_number))
|
||||
shutil.copy(old_share_location, new_share_location)
|
||||
shares = self.find_shares(self.uri)
|
||||
# Make sure that the storage server has the share.
|
||||
self.failUnless((share_number, ss.my_nodeid, new_share_location)
|
||||
in shares)
|
||||
|
||||
def _setup_and_upload(self):
|
||||
"""
|
||||
I set up a NoNetworkGrid with a single server and client,
|
||||
upload a file to it, store its uri in self.uri, and store its
|
||||
sharedata in self.shares.
|
||||
"""
|
||||
self.set_up_grid(num_clients=1, num_servers=1)
|
||||
client = self.g.clients[0]
|
||||
client.DEFAULT_ENCODING_PARAMETERS['happy'] = 1
|
||||
data = upload.Data("data" * 10000, convergence="")
|
||||
self.data = data
|
||||
d = client.upload(data)
|
||||
def _store_uri(ur):
|
||||
self.uri = ur.uri
|
||||
d.addCallback(_store_uri)
|
||||
d.addCallback(lambda ign:
|
||||
self.find_shares(self.uri))
|
||||
def _store_shares(shares):
|
||||
self.shares = shares
|
||||
d.addCallback(_store_shares)
|
||||
return d
|
||||
|
||||
def test_configure_parameters(self):
|
||||
self.basedir = self.mktemp()
|
||||
hooks = {0: self._set_up_nodes_extra_config}
|
||||
@ -692,6 +781,233 @@ class EncodingParameters(GridTestMixin, unittest.TestCase):
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
def _setUp(self, ns):
|
||||
# Used by test_happy_semantics and test_prexisting_share_behavior
|
||||
# to set up the grid.
|
||||
self.node = FakeClient(mode="good", num_servers=ns)
|
||||
self.u = upload.Uploader()
|
||||
self.u.running = True
|
||||
self.u.parent = self.node
|
||||
|
||||
def test_happy_semantics(self):
|
||||
self._setUp(2)
|
||||
DATA = upload.Data("kittens" * 10000, convergence="")
|
||||
# These parameters are unsatisfiable with the client that we've made
|
||||
# -- we'll use them to test that the semnatics work correctly.
|
||||
self.set_encoding_parameters(k=3, happy=5, n=10)
|
||||
d = self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
|
||||
"shares could only be placed on 2 servers "
|
||||
"(5 were requested)",
|
||||
self.u.upload, DATA)
|
||||
# Let's reset the client to have 10 servers
|
||||
d.addCallback(lambda ign:
|
||||
self._setUp(10))
|
||||
# These parameters are satisfiable with the client we've made.
|
||||
d.addCallback(lambda ign:
|
||||
self.set_encoding_parameters(k=3, happy=5, n=10))
|
||||
# this should work
|
||||
d.addCallback(lambda ign:
|
||||
self.u.upload(DATA))
|
||||
# Let's reset the client to have 7 servers
|
||||
# (this is less than n, but more than h)
|
||||
d.addCallback(lambda ign:
|
||||
self._setUp(7))
|
||||
# These encoding parameters should still be satisfiable with our
|
||||
# client setup
|
||||
d.addCallback(lambda ign:
|
||||
self.set_encoding_parameters(k=3, happy=5, n=10))
|
||||
# This, then, should work.
|
||||
d.addCallback(lambda ign:
|
||||
self.u.upload(DATA))
|
||||
return d
|
||||
|
||||
def test_problem_layouts(self):
|
||||
self.basedir = self.mktemp()
|
||||
# This scenario is at
|
||||
# http://allmydata.org/trac/tahoe/ticket/778#comment:52
|
||||
#
|
||||
# The scenario in comment:52 proposes that we have a layout
|
||||
# like:
|
||||
# server 1: share 1
|
||||
# server 2: share 1
|
||||
# server 3: share 1
|
||||
# server 4: shares 2 - 10
|
||||
# To get access to the shares, we will first upload to one
|
||||
# server, which will then have shares 1 - 10. We'll then
|
||||
# add three new servers, configure them to not accept any new
|
||||
# shares, then write share 1 directly into the serverdir of each.
|
||||
# Then each of servers 1 - 3 will report that they have share 1,
|
||||
# and will not accept any new share, while server 4 will report that
|
||||
# it has shares 2 - 10 and will accept new shares.
|
||||
# We'll then set 'happy' = 4, and see that an upload fails
|
||||
# (as it should)
|
||||
d = self._setup_and_upload()
|
||||
d.addCallback(lambda ign:
|
||||
self._add_server_with_share(1, 0, True))
|
||||
d.addCallback(lambda ign:
|
||||
self._add_server_with_share(2, 0, True))
|
||||
d.addCallback(lambda ign:
|
||||
self._add_server_with_share(3, 0, True))
|
||||
# Remove the first share from server 0.
|
||||
def _remove_share_0():
|
||||
share_location = self.shares[0][2]
|
||||
os.remove(share_location)
|
||||
d.addCallback(lambda ign:
|
||||
_remove_share_0())
|
||||
# Set happy = 4 in the client.
|
||||
def _prepare():
|
||||
client = self.g.clients[0]
|
||||
client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
|
||||
return client
|
||||
d.addCallback(lambda ign:
|
||||
_prepare())
|
||||
# Uploading data should fail
|
||||
d.addCallback(lambda client:
|
||||
self.shouldFail(NotEnoughSharesError, "test_happy_semantics",
|
||||
"shares could only be placed on 1 servers "
|
||||
"(4 were requested)",
|
||||
client.upload, upload.Data("data" * 10000,
|
||||
convergence="")))
|
||||
|
||||
|
||||
# This scenario is at
|
||||
# http://allmydata.org/trac/tahoe/ticket/778#comment:53
|
||||
#
|
||||
# Set up the grid to have one server
|
||||
def _change_basedir(ign):
|
||||
self.basedir = self.mktemp()
|
||||
d.addCallback(_change_basedir)
|
||||
d.addCallback(lambda ign:
|
||||
self._setup_and_upload())
|
||||
# We want to have a layout like this:
|
||||
# server 1: share 1
|
||||
# server 2: share 2
|
||||
# server 3: share 3
|
||||
# server 4: shares 1 - 10
|
||||
# (this is an expansion of Zooko's example because it is easier
|
||||
# to code, but it will fail in the same way)
|
||||
# To start, we'll create a server with shares 1-10 of the data
|
||||
# we're about to upload.
|
||||
# Next, we'll add three new servers to our NoNetworkGrid. We'll add
|
||||
# one share from our initial upload to each of these.
|
||||
# The counterintuitive ordering of the share numbers is to deal with
|
||||
# the permuting of these servers -- distributing the shares this
|
||||
# way ensures that the Tahoe2PeerSelector sees them in the order
|
||||
# described above.
|
||||
d.addCallback(lambda ign:
|
||||
self._add_server_with_share(server_number=1, share_number=2))
|
||||
d.addCallback(lambda ign:
|
||||
self._add_server_with_share(server_number=2, share_number=0))
|
||||
d.addCallback(lambda ign:
|
||||
self._add_server_with_share(server_number=3, share_number=1))
|
||||
# So, we now have the following layout:
|
||||
# server 0: shares 1 - 10
|
||||
# server 1: share 0
|
||||
# server 2: share 1
|
||||
# server 3: share 2
|
||||
# We want to change the 'happy' parameter in the client to 4.
|
||||
# We then want to feed the upload process a list of peers that
|
||||
# server 0 is at the front of, so we trigger Zooko's scenario.
|
||||
# Ideally, a reupload of our original data should work.
|
||||
def _reset_encoding_parameters(ign):
|
||||
client = self.g.clients[0]
|
||||
client.DEFAULT_ENCODING_PARAMETERS['happy'] = 4
|
||||
return client
|
||||
d.addCallback(_reset_encoding_parameters)
|
||||
# We need this to get around the fact that the old Data
|
||||
# instance already has a happy parameter set.
|
||||
d.addCallback(lambda client:
|
||||
client.upload(upload.Data("data" * 10000, convergence="")))
|
||||
return d
|
||||
|
||||
|
||||
def test_dropped_servers_in_encoder(self):
|
||||
def _set_basedir(ign=None):
|
||||
self.basedir = self.mktemp()
|
||||
_set_basedir()
|
||||
d = self._setup_and_upload();
|
||||
# Add 5 servers, with one share each from the original
|
||||
# Add a readonly server
|
||||
def _do_server_setup(ign):
|
||||
self._add_server_with_share(1, 1, True)
|
||||
self._add_server_with_share(2)
|
||||
self._add_server_with_share(3)
|
||||
self._add_server_with_share(4)
|
||||
self._add_server_with_share(5)
|
||||
d.addCallback(_do_server_setup)
|
||||
# remove the original server
|
||||
# (necessary to ensure that the Tahoe2PeerSelector will distribute
|
||||
# all the shares)
|
||||
def _remove_server(ign):
|
||||
server = self.g.servers_by_number[0]
|
||||
self.g.remove_server(server.my_nodeid)
|
||||
d.addCallback(_remove_server)
|
||||
# This should succeed.
|
||||
d.addCallback(lambda ign:
|
||||
self._do_upload_with_broken_servers(1))
|
||||
# Now, do the same thing over again, but drop 2 servers instead
|
||||
# of 1. This should fail.
|
||||
d.addCallback(_set_basedir)
|
||||
d.addCallback(lambda ign:
|
||||
self._setup_and_upload())
|
||||
d.addCallback(_do_server_setup)
|
||||
d.addCallback(_remove_server)
|
||||
d.addCallback(lambda ign:
|
||||
self.shouldFail(NotEnoughSharesError,
|
||||
"test_dropped_server_in_encoder", "",
|
||||
self._do_upload_with_broken_servers, 2))
|
||||
return d
|
||||
|
||||
|
||||
def test_servers_with_unique_shares(self):
|
||||
# servers_with_unique_shares expects a dict of
|
||||
# shnum => peerid as a preexisting shares argument.
|
||||
test1 = {
|
||||
1 : "server1",
|
||||
2 : "server2",
|
||||
3 : "server3",
|
||||
4 : "server4"
|
||||
}
|
||||
unique_servers = upload.servers_with_unique_shares(test1)
|
||||
self.failUnlessEqual(4, len(unique_servers))
|
||||
for server in ["server1", "server2", "server3", "server4"]:
|
||||
self.failUnlessIn(server, unique_servers)
|
||||
test1[4] = "server1"
|
||||
# Now there should only be 3 unique servers.
|
||||
unique_servers = upload.servers_with_unique_shares(test1)
|
||||
self.failUnlessEqual(3, len(unique_servers))
|
||||
for server in ["server1", "server2", "server3"]:
|
||||
self.failUnlessIn(server, unique_servers)
|
||||
# servers_with_unique_shares expects a set of PeerTracker
|
||||
# instances as a used_peers argument, but only uses the peerid
|
||||
# instance variable to assess uniqueness. So we feed it some fake
|
||||
# PeerTrackers whose only important characteristic is that they
|
||||
# have peerid set to something.
|
||||
class FakePeerTracker:
|
||||
pass
|
||||
trackers = []
|
||||
for server in ["server5", "server6", "server7", "server8"]:
|
||||
t = FakePeerTracker()
|
||||
t.peerid = server
|
||||
trackers.append(t)
|
||||
# Recall that there are 3 unique servers in test1. Since none of
|
||||
# those overlap with the ones in trackers, we should get 7 back
|
||||
unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
|
||||
self.failUnlessEqual(7, len(unique_servers))
|
||||
expected_servers = ["server" + str(i) for i in xrange(1, 9)]
|
||||
expected_servers.remove("server4")
|
||||
for server in expected_servers:
|
||||
self.failUnlessIn(server, unique_servers)
|
||||
# Now add an overlapping server to trackers.
|
||||
t = FakePeerTracker()
|
||||
t.peerid = "server1"
|
||||
trackers.append(t)
|
||||
unique_servers = upload.servers_with_unique_shares(test1, set(trackers))
|
||||
self.failUnlessEqual(7, len(unique_servers))
|
||||
for server in expected_servers:
|
||||
self.failUnlessIn(server, unique_servers)
|
||||
|
||||
|
||||
def _set_up_nodes_extra_config(self, clientdir):
|
||||
cfgfn = os.path.join(clientdir, "tahoe.cfg")
|
||||
oldcfg = open(cfgfn, "r").read()
|
||||
|
Loading…
x
Reference in New Issue
Block a user