Eliminate mock dependency.

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood 2015-07-17 22:03:53 +01:00
parent 3ae6ceb6a8
commit c830419e04
11 changed files with 254 additions and 179 deletions

View File

@ -44,10 +44,6 @@ install_requires = [
# pycrypto 2.4 doesn't work due to <https://bugs.launchpad.net/pycrypto/+bug/881130>
"pycrypto >= 2.1.0, != 2.2, != 2.4",
# <http://www.voidspace.org.uk/python/mock/>, 0.8.0 provides "call"
# mock 1.1.x seems to cause problems on several buildslaves.
"mock >= 0.8.0, <= 1.0.1",
# pycryptopp-0.6.0 includes ed25519
"pycryptopp >= 0.6.0",
@ -74,7 +70,6 @@ package_imports = [
('simplejson', 'simplejson'),
('pycrypto', 'Crypto'),
('pyasn1', 'pyasn1'),
('mock', 'mock'),
('service-identity', 'service_identity'),
('characteristic', 'characteristic'),
('pyasn1-modules', 'pyasn1_modules'),

View File

@ -1,13 +1,14 @@
import os.path
from twisted.trial import unittest
from cStringIO import StringIO
import urllib, sys
from mock import Mock, call
from twisted.trial import unittest
from twisted.python.monkey import MonkeyPatcher
import allmydata
from allmydata.util import fileutil, hashutil, base32, keyutil
from allmydata.util.namespace import Namespace
from allmydata import uri
from allmydata.immutable import upload
from allmydata.dirnode import normalize
@ -541,22 +542,29 @@ class CLI(CLITestMixin, unittest.TestCase):
def test_exception_catcher(self):
self.basedir = "cli/exception_catcher"
runner_mock = Mock()
sys_exit_mock = Mock()
stderr = StringIO()
self.patch(sys, "argv", ["tahoe"])
self.patch(runner, "runner", runner_mock)
self.patch(sys, "exit", sys_exit_mock)
self.patch(sys, "stderr", stderr)
exc = Exception("canary")
ns = Namespace()
ns.runner_called = False
def call_runner(args, install_node_control=True):
ns.runner_called = True
self.failUnlessEqual(install_node_control, True)
raise exc
runner_mock.side_effect = call_runner
runner.run()
self.failUnlessEqual(runner_mock.call_args_list, [call([], install_node_control=True)])
self.failUnlessEqual(sys_exit_mock.call_args_list, [call(1)])
ns.sys_exit_called = False
def call_sys_exit(exitcode):
ns.sys_exit_called = True
self.failUnlessEqual(exitcode, 1)
patcher = MonkeyPatcher((runner, 'runner', call_runner),
(sys, 'argv', ["tahoe"]),
(sys, 'exit', call_sys_exit),
(sys, 'stderr', stderr))
patcher.runWithPatches(runner.run)
self.failUnless(ns.runner_called)
self.failUnless(ns.sys_exit_called)
self.failUnlessIn(str(exc), stderr.getvalue())

View File

@ -1,13 +1,16 @@
import os.path
from twisted.trial import unittest
from cStringIO import StringIO
import re
from mock import patch
from twisted.trial import unittest
from twisted.python.monkey import MonkeyPatcher
import __builtin__
from allmydata.util import fileutil
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.encodingutil import get_io_encoding, unicode_to_argv
from allmydata.util.namespace import Namespace
from allmydata.scripts import cli, backupdb
from .common_util import StallMixin
from .no_network import GridTestMixin
@ -328,20 +331,25 @@ class Backup(GridTestMixin, CLITestMixin, StallMixin, unittest.TestCase):
self._check_filtering(filtered, root_listdir, (u'lib.a', u'_darcs', u'subdir'),
(nice_doc,))
@patch('__builtin__.file')
def test_exclude_from_tilde_expansion(self, mock):
def test_exclude_from_tilde_expansion(self):
basedir = "cli/Backup/exclude_from_tilde_expansion"
fileutil.make_dirs(basedir)
nodeurl_path = os.path.join(basedir, 'node.url')
fileutil.write(nodeurl_path, 'http://example.net:2357/')
def parse(args): return parse_options(basedir, "backup", args)
# ensure that tilde expansion is performed on exclude-from argument
exclude_file = u'~/.tahoe/excludes.dummy'
mock.return_value = StringIO()
parse(['--exclude-from', unicode_to_argv(exclude_file), 'from', 'to'])
self.failUnlessIn(((abspath_expanduser_unicode(exclude_file),), {}), mock.call_args_list)
ns = Namespace()
ns.called = False
def call_file(name, *args):
ns.called = True
self.failUnlessEqual(name, abspath_expanduser_unicode(exclude_file))
return StringIO()
patcher = MonkeyPatcher((__builtin__, 'file', call_file))
patcher.runWithPatches(parse_options, basedir, "backup", ['--exclude-from', unicode_to_argv(exclude_file), 'from', 'to'])
self.failUnless(ns.called)
def test_ignore_symlinks(self):
if not hasattr(os, 'symlink'):

View File

@ -1,8 +1,12 @@
import os, sys
import twisted
from twisted.trial import unittest
from twisted.application import service
import allmydata
import allmydata.frontends.drop_upload
import allmydata.util.log
from allmydata.node import Node, OldConfigError, OldConfigOptionError, MissingConfigEntry, UnescapedHashError
from allmydata.frontends.auth import NeedRootcapLookupScheme
from allmydata import client
@ -14,7 +18,6 @@ from allmydata.interfaces import IFilesystemNode, IFileNode, \
from foolscap.api import flushEventualQueue
import allmydata.test.common_util as testutil
import mock
BASECONFIG = ("[client]\n"
"introducer.furl = \n"
@ -55,8 +58,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
client.Client(basedir)
@mock.patch('twisted.python.log.msg')
def test_error_on_old_config_files(self, mock_log_msg):
def test_error_on_old_config_files(self):
basedir = "test_client.Basic.test_error_on_old_config_files"
os.mkdir(basedir)
fileutil.write(os.path.join(basedir, "tahoe.cfg"),
@ -69,6 +71,9 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
fileutil.write(os.path.join(basedir, "readonly_storage"), "")
fileutil.write(os.path.join(basedir, "debug_discard_storage"), "")
logged_messages = []
self.patch(twisted.python.log, 'msg', logged_messages.append)
e = self.failUnlessRaises(OldConfigError, client.Client, basedir)
abs_basedir = fileutil.abspath_expanduser_unicode(unicode(basedir)).encode(sys.getfilesystemencoding())
self.failUnlessIn(os.path.join(abs_basedir, "introducer.furl"), e.args[0])
@ -78,18 +83,18 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
for oldfile in ['introducer.furl', 'no_storage', 'readonly_storage',
'debug_discard_storage']:
logged = [ m for m in mock_log_msg.call_args_list if
("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m[0][0]) and oldfile in str(m[0][0])) ]
self.failUnless(logged, (oldfile, mock_log_msg.call_args_list))
logged = [ m for m in logged_messages if
("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m) and oldfile in str(m)) ]
self.failUnless(logged, (oldfile, logged_messages))
for oldfile in [
'nickname', 'webport', 'keepalive_timeout', 'log_gatherer.furl',
'disconnect_timeout', 'advertised_ip_addresses', 'helper.furl',
'key_generator.furl', 'stats_gatherer.furl', 'sizelimit',
'run_helper']:
logged = [ m for m in mock_log_msg.call_args_list if
("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m[0][0]) and oldfile in str(m[0][0])) ]
self.failIf(logged, oldfile)
logged = [ m for m in logged_messages if
("Found pre-Tahoe-LAFS-v1.3 configuration file" in str(m) and oldfile in str(m)) ]
self.failIf(logged, (oldfile, logged_messages))
def test_secrets(self):
basedir = "test_client.Basic.test_secrets"
@ -297,9 +302,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
_check("helper.furl = None", None)
_check("helper.furl = pb://blah\n", "pb://blah")
@mock.patch('allmydata.util.log.msg')
@mock.patch('allmydata.frontends.drop_upload.DropUploader')
def test_create_drop_uploader(self, mock_drop_uploader, mock_log_msg):
def test_create_drop_uploader(self):
class MockDropUploader(service.MultiService):
name = 'drop-upload'
@ -310,7 +313,7 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
self.local_dir_utf8 = local_dir_utf8
self.inotify = inotify
mock_drop_uploader.side_effect = MockDropUploader
self.patch(allmydata.frontends.drop_upload, 'DropUploader', MockDropUploader)
upload_dircap = "URI:DIR2:blah"
local_dir_utf8 = u"loc\u0101l_dir".encode('utf-8')
@ -347,7 +350,14 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
class Boom(Exception):
pass
mock_drop_uploader.side_effect = Boom()
def BoomDropUploader(client, upload_dircap, local_dir_utf8, inotify=None):
raise Boom()
logged_messages = []
def mock_log(*args, **kwargs):
logged_messages.append("%r %r" % (args, kwargs))
self.patch(allmydata.util.log, 'msg', mock_log)
self.patch(allmydata.frontends.drop_upload, 'DropUploader', BoomDropUploader)
basedir2 = "test_client.Basic.test_create_drop_uploader2"
os.mkdir(basedir2)
@ -360,8 +370,8 @@ class Basic(testutil.ReallyEqualMixin, unittest.TestCase):
fileutil.write(os.path.join(basedir2, "private", "drop_upload_dircap"), "URI:DIR2:blah")
c2 = client.Client(basedir2)
self.failUnlessRaises(KeyError, c2.getServiceNamed, 'drop-upload')
self.failUnless([True for arg in mock_log_msg.call_args_list if "Boom" in repr(arg)],
mock_log_msg.call_args_list)
self.failUnless([True for arg in logged_messages if "Boom" in arg],
logged_messages)
def flush_but_dont_ignore(res):

View File

@ -56,10 +56,11 @@ if __name__ == "__main__":
shutil.rmtree(tmpdir)
sys.exit(0)
from twisted.trial import unittest
from mock import patch
import os, sys, locale
from twisted.trial import unittest
from allmydata.test.common_util import ReallyEqualMixin
from allmydata.util import encodingutil, fileutil
from allmydata.util.encodingutil import argv_to_unicode, unicode_to_url, \
@ -70,10 +71,15 @@ from allmydata.dirnode import normalize
from twisted.python import usage
class EncodingUtilErrors(ReallyEqualMixin, unittest.TestCase):
@patch('sys.stdout')
def test_get_io_encoding(self, mock_stdout):
class MockStdout(object):
pass
class EncodingUtilErrors(ReallyEqualMixin, unittest.TestCase):
def test_get_io_encoding(self):
mock_stdout = MockStdout()
self.patch(sys, 'stdout', mock_stdout)
mock_stdout.encoding = 'UTF-8'
_reload()
self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
@ -94,29 +100,25 @@ class EncodingUtilErrors(ReallyEqualMixin, unittest.TestCase):
else:
self.failUnlessRaises(AssertionError, _reload)
@patch('locale.getpreferredencoding')
def test_get_io_encoding_not_from_stdout(self, mock_locale_getpreferredencoding):
locale # hush pyflakes
mock_locale_getpreferredencoding.return_value = 'koi8-r'
def test_get_io_encoding_not_from_stdout(self):
preferredencoding = 'koi8-r'
def call_locale_getpreferredencoding():
return preferredencoding
self.patch(locale, 'getpreferredencoding', call_locale_getpreferredencoding)
mock_stdout = MockStdout()
self.patch(sys, 'stdout', mock_stdout)
class DummyStdout:
pass
old_stdout = sys.stdout
sys.stdout = DummyStdout()
try:
expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
_reload()
self.failUnlessReallyEqual(get_io_encoding(), expected)
expected = sys.platform == "win32" and 'utf-8' or 'koi8-r'
_reload()
self.failUnlessReallyEqual(get_io_encoding(), expected)
sys.stdout.encoding = None
_reload()
self.failUnlessReallyEqual(get_io_encoding(), expected)
mock_stdout.encoding = None
_reload()
self.failUnlessReallyEqual(get_io_encoding(), expected)
mock_locale_getpreferredencoding.return_value = None
_reload()
self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
finally:
sys.stdout = old_stdout
preferredencoding = None
_reload()
self.failUnlessReallyEqual(get_io_encoding(), 'utf-8')
def test_argv_to_unicode(self):
encodingutil.io_encoding = 'utf-8'
@ -128,18 +130,18 @@ class EncodingUtilErrors(ReallyEqualMixin, unittest.TestCase):
encodingutil.io_encoding = 'koi8-r'
self.failUnlessRaises(UnicodeEncodeError, unicode_to_output, lumiere_nfc)
@patch('os.listdir')
def test_no_unicode_normalization(self, mock):
def test_no_unicode_normalization(self):
# Pretend to run on a Unicode platform.
# We normalized to NFC in 1.7beta, but we now don't.
orig_platform = sys.platform
try:
sys.platform = 'darwin'
mock.return_value = [Artonwall_nfd]
_reload()
self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
finally:
sys.platform = orig_platform
# listdir_unicode normalized to NFC in 1.7beta, but now doesn't.
def call_os_listdir(path):
return [Artonwall_nfd]
self.patch(os, 'listdir', call_os_listdir)
self.patch(sys, 'platform', 'darwin')
_reload()
self.failUnlessReallyEqual(listdir_unicode(u'/dummy'), [Artonwall_nfd])
# The following tests apply only to platforms that don't store filenames as
# Unicode entities on the filesystem.
@ -153,16 +155,21 @@ class EncodingUtilNonUnicodePlatform(unittest.TestCase):
sys.platform = self.original_platform
_reload()
@patch('sys.getfilesystemencoding')
@patch('os.listdir')
def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
def test_listdir_unicode(self):
# What happens if latin1-encoded filenames are encountered on an UTF-8
# filesystem?
mock_listdir.return_value = [
lumiere_nfc.encode('utf-8'),
lumiere_nfc.encode('latin1')]
def call_os_listdir(path):
return [
lumiere_nfc.encode('utf-8'),
lumiere_nfc.encode('latin1')
]
self.patch(os, 'listdir', call_os_listdir)
sys_filesystemencoding = 'utf-8'
def call_sys_getfilesystemencoding():
return sys_filesystemencoding
self.patch(sys, 'getfilesystemencoding', call_sys_getfilesystemencoding)
mock_getfilesystemencoding.return_value = 'utf-8'
_reload()
self.failUnlessRaises(FilenameEncodingError,
listdir_unicode,
@ -170,7 +177,7 @@ class EncodingUtilNonUnicodePlatform(unittest.TestCase):
# We're trying to list a directory whose name cannot be represented in
# the filesystem encoding. This should fail.
mock_getfilesystemencoding.return_value = 'ascii'
sys_filesystemencoding = 'ascii'
_reload()
self.failUnlessRaises(FilenameEncodingError,
listdir_unicode,
@ -186,12 +193,14 @@ class EncodingUtil(ReallyEqualMixin):
sys.platform = self.original_platform
_reload()
@patch('sys.stdout')
def test_argv_to_unicode(self, mock):
def test_argv_to_unicode(self):
if 'argv' not in dir(self):
return
mock.encoding = self.io_encoding
mock_stdout = MockStdout()
mock_stdout.encoding = self.io_encoding
self.patch(sys, 'stdout', mock_stdout)
argu = lumiere_nfc
argv = self.argv
_reload()
@ -200,12 +209,14 @@ class EncodingUtil(ReallyEqualMixin):
def test_unicode_to_url(self):
self.failUnless(unicode_to_url(lumiere_nfc), "lumi\xc3\xa8re")
@patch('sys.stdout')
def test_unicode_to_output(self, mock):
def test_unicode_to_output(self):
if 'argv' not in dir(self):
return
mock.encoding = self.io_encoding
mock_stdout = MockStdout()
mock_stdout.encoding = self.io_encoding
self.patch(sys, 'stdout', mock_stdout)
_reload()
self.failUnlessReallyEqual(unicode_to_output(lumiere_nfc), self.argv)
@ -221,9 +232,7 @@ class EncodingUtil(ReallyEqualMixin):
_reload()
self.failUnlessReallyEqual(unicode_platform(), matrix[self.platform])
@patch('sys.getfilesystemencoding')
@patch('os.listdir')
def test_listdir_unicode(self, mock_listdir, mock_getfilesystemencoding):
def test_listdir_unicode(self):
if 'dirlist' not in dir(self):
return
@ -234,8 +243,13 @@ class EncodingUtil(ReallyEqualMixin):
"that we are testing for the benefit of a different platform."
% (self.filesystem_encoding,))
mock_listdir.return_value = self.dirlist
mock_getfilesystemencoding.return_value = self.filesystem_encoding
def call_os_listdir(path):
return self.dirlist
self.patch(os, 'listdir', call_os_listdir)
def call_sys_getfilesystemencoding():
return self.filesystem_encoding
self.patch(sys, 'getfilesystemencoding', call_sys_getfilesystemencoding)
_reload()
filenames = listdir_unicode(u'/dummy')

View File

@ -1,8 +1,8 @@
import random
from twisted.trial import unittest
from twisted.internet import defer
import mock
from foolscap.api import eventually
from allmydata.test import common
@ -16,6 +16,11 @@ from allmydata.interfaces import NotEnoughSharesError
from allmydata.immutable.upload import Data
from allmydata.immutable.downloader import finder
class MockShareHashTree(object):
def needed_hashes(self):
return False
class MockNode(object):
def __init__(self, check_reneging, check_fetch_failed):
self.got = 0
@ -27,8 +32,7 @@ class MockNode(object):
self.check_fetch_failed = check_fetch_failed
self._si_prefix='aa'
self.have_UEB = True
self.share_hash_tree = mock.Mock()
self.share_hash_tree.needed_hashes.return_value = False
self.share_hash_tree = MockShareHashTree()
self.on_want_more_shares = None
def when_finished(self):
@ -75,6 +79,9 @@ class TestShareFinder(unittest.TestCase):
rcap = uri.CHKFileURI('a'*32, 'a'*32, 3, 99, 100)
vcap = rcap.get_verify_cap()
class MockBuckets(object):
pass
class MockServer(object):
def __init__(self, buckets):
self.version = {
@ -98,6 +105,7 @@ class TestShareFinder(unittest.TestCase):
self.s.hungry()
eventually(_give_buckets_and_hunger_again)
return d
class MockIServer(object):
def __init__(self, serverid, rref):
self.serverid = serverid
@ -111,15 +119,28 @@ class TestShareFinder(unittest.TestCase):
def get_version(self):
return self.rref.version
mockserver1 = MockServer({1: mock.Mock(), 2: mock.Mock()})
class MockStorageBroker(object):
def __init__(self, servers):
self.servers = servers
def get_servers_for_psi(self, si):
return self.servers
class MockDownloadStatus(object):
def add_dyhb_request(self, server, when):
return MockDYHBEvent()
class MockDYHBEvent(object):
def finished(self, shnums, when):
pass
mockserver1 = MockServer({1: MockBuckets(), 2: MockBuckets()})
mockserver2 = MockServer({})
mockserver3 = MockServer({3: mock.Mock()})
mockstoragebroker = mock.Mock()
mockserver3 = MockServer({3: MockBuckets()})
servers = [ MockIServer("ms1", mockserver1),
MockIServer("ms2", mockserver2),
MockIServer("ms3", mockserver3), ]
mockstoragebroker.get_servers_for_psi.return_value = servers
mockdownloadstatus = mock.Mock()
mockstoragebroker = MockStorageBroker(servers)
mockdownloadstatus = MockDownloadStatus()
mocknode = MockNode(check_reneging=True, check_fetch_failed=True)
s = finder.ShareFinder(mockstoragebroker, vcap, mocknode, mockdownloadstatus)

View File

@ -1,14 +1,14 @@
from twisted.trial import unittest
from twisted.python.monkey import MonkeyPatcher
import allmydata
import mock
import __builtin__
real_import_func = __import__
class T(unittest.TestCase):
@mock.patch('__builtin__.__import__')
def test_report_import_error(self, mockimport):
def test_report_import_error(self):
real_import_func = __import__
def raiseIE_from_this_particular_func(name, *args):
if name == "foolscap":
marker = "wheeeyo"
@ -16,9 +16,10 @@ class T(unittest.TestCase):
else:
return real_import_func(name, *args)
mockimport.side_effect = raiseIE_from_this_particular_func
# Let's run as little code as possible with __import__ patched.
patcher = MonkeyPatcher((__builtin__, '__import__', raiseIE_from_this_particular_func))
vers_and_locs = patcher.runWithPatches(allmydata.get_package_versions_and_locations)
vers_and_locs = allmydata.get_package_versions_and_locations()
for (pkgname, stuff) in vers_and_locs:
if pkgname == 'foolscap':
self.failUnless('wheeeyo' in str(stuff[2]), stuff)

View File

@ -1,48 +1,56 @@
import mock
from twisted.trial.unittest import TestCase
from allmydata.web.common import get_filenode_metadata, SDMF_VERSION, MDMF_VERSION
class CommonFixture(object):
def setUp(self):
self.mockfilenode = mock.Mock()
class MockFileNode(object):
def __init__(self, size, mutable_version=None):
self.size = size
self.mutable_version = mutable_version
def get_size(self):
return self.size
def is_mutable(self):
return self.mutable_version is not None
def get_version(self):
if self.mutable_version is None:
raise AttributeError()
return self.mutable_version
class CommonFixture(object):
def test_size_is_0(self):
"""If get_size doesn't return None the returned metadata must contain "size"."""
self.mockfilenode.get_size.return_value = 0
metadata = get_filenode_metadata(self.mockfilenode)
self.failUnlessIn('size', metadata)
mockfilenode = MockFileNode(0, mutable_version=self.mutable_version)
metadata = get_filenode_metadata(mockfilenode)
self.failUnlessEqual(metadata['size'], 0)
def test_size_is_1000(self):
"""1000 is sufficiently large to guarantee the cap is not a literal."""
self.mockfilenode.get_size.return_value = 1000
metadata = get_filenode_metadata(self.mockfilenode)
self.failUnlessIn('size', metadata)
mockfilenode = MockFileNode(1000, mutable_version=self.mutable_version)
metadata = get_filenode_metadata(mockfilenode)
self.failUnlessEqual(metadata['size'], 1000)
def test_size_is_None(self):
"""If get_size returns None the returned metadata must not contain "size"."""
self.mockfilenode.get_size.return_value = None
metadata = get_filenode_metadata(self.mockfilenode)
mockfilenode = MockFileNode(None, mutable_version=self.mutable_version)
metadata = get_filenode_metadata(mockfilenode)
self.failIfIn('size', metadata)
class Test_GetFileNodeMetaData_Immutable(CommonFixture, TestCase):
def setUp(self):
CommonFixture.setUp(self)
self.mockfilenode.is_mutable.return_value = False
self.mutable_version = None
class Test_GetFileNodeMetaData_SDMF(CommonFixture, TestCase):
def setUp(self):
CommonFixture.setUp(self)
self.mockfilenode.is_mutable.return_value = True
self.mockfilenode.get_version.return_value = SDMF_VERSION
self.mutable_version = SDMF_VERSION
class Test_GetFileNodeMetaData_MDMF(CommonFixture, TestCase):
def setUp(self):
CommonFixture.setUp(self)
self.mockfilenode.is_mutable.return_value = True
self.mockfilenode.get_version.return_value = MDMF_VERSION
self.mutable_version = MDMF_VERSION

View File

@ -1,17 +1,20 @@
import os, stat, sys, time
from twisted.trial import unittest
from twisted.internet import defer
from twisted.python import log
from mock import patch
from foolscap.api import flushEventualQueue
import foolscap.logging.log
from twisted.application import service
from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
from allmydata.util import fileutil, iputil
from allmydata.util.namespace import Namespace
import allmydata.test.common_util as testutil
class LoggingMultiService(service.MultiService):
def log(self, msg, **kw):
pass
@ -169,14 +172,16 @@ class TestCase(testutil.SignalMixin, unittest.TestCase):
bits = stat.S_IMODE(st[stat.ST_MODE])
self.failUnless(bits & 0001 == 0, bits)
@patch("foolscap.logging.log.setLogDir")
def test_logdir_is_str(self, mock_setLogDir):
def test_logdir_is_str(self):
basedir = "test_node/test_logdir_is_str"
fileutil.make_dirs(basedir)
ns = Namespace()
ns.called = False
def call_setLogDir(logdir):
ns.called = True
self.failUnless(isinstance(logdir, str), logdir)
mock_setLogDir.side_effect = call_setLogDir
self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
TestNode(basedir)
self.failUnless(mock_setLogDir.called)
self.failUnless(ns.called)

View File

@ -1,6 +1,5 @@
import time, os.path, platform, stat, re, simplejson, struct, shutil
import mock
import time, os.path, platform, stat, re, simplejson, struct, shutil
from twisted.trial import unittest
@ -138,7 +137,13 @@ class Bucket(unittest.TestCase):
fileutil.write(final, share_file_data)
mockstorageserver = mock.Mock()
class MockStorageServer(object):
def add_latency(self, category, latency):
pass
def count(self, name, delta=1):
pass
mockstorageserver = MockStorageServer()
# Now read from it.
br = BucketReader(mockstorageserver, final)
@ -513,15 +518,19 @@ class Server(unittest.TestCase):
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_reserved_space(self, mock_get_disk_stats):
reserved_space=10000
mock_get_disk_stats.return_value = {
'free_for_nonroot': 15000,
'avail': max(15000 - reserved_space, 0),
}
def test_reserved_space(self):
reserved = 10000
allocated = 0
ss = self.create("test_reserved_space", reserved_space=reserved_space)
def call_get_disk_stats(whichdir, reserved_space=0):
self.failUnlessEqual(reserved_space, reserved)
return {
'free_for_nonroot': 15000 - allocated,
'avail': max(15000 - allocated - reserved_space, 0),
}
self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
ss = self.create("test_reserved_space", reserved_space=reserved)
# 15k available, 10k reserved, leaves 5k for shares
# a newly created and filled share incurs this much overhead, beyond
@ -558,15 +567,9 @@ class Server(unittest.TestCase):
del bw
self.failUnlessEqual(len(ss._active_writers), 0)
# this also changes the amount reported as available by call_get_disk_stats
allocated = 1001 + OVERHEAD + LEASE_SIZE
# we have to manually increase available, since we're not doing real
# disk measurements
mock_get_disk_stats.return_value = {
'free_for_nonroot': 15000 - allocated,
'avail': max(15000 - allocated - reserved_space, 0),
}
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
@ -4007,9 +4010,10 @@ class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
d = self.render1(page, args={"t": ["json"]})
return d
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_no_disk_stats(self, mock_get_disk_stats):
mock_get_disk_stats.side_effect = AttributeError()
def test_status_no_disk_stats(self):
def call_get_disk_stats(whichdir, reserved_space=0):
raise AttributeError()
self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
# Some platforms may have no disk stats API. Make sure the code can handle that
# (test runs on all platforms).
@ -4026,9 +4030,10 @@ class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
self.failUnlessIn("Space Available to Tahoe: ?", s)
self.failUnless(ss.get_available_space() is None)
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_bad_disk_stats(self, mock_get_disk_stats):
mock_get_disk_stats.side_effect = OSError()
def test_status_bad_disk_stats(self):
def call_get_disk_stats(whichdir, reserved_space=0):
raise OSError()
self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
# If the API to get disk stats exists but a call to it fails, then the status should
# show that no shares will be accepted, and get_available_space() should be 0.
@ -4045,34 +4050,36 @@ class WebStatus(unittest.TestCase, pollmixin.PollMixin, WebRenderingMixin):
self.failUnlessIn("Space Available to Tahoe: ?", s)
self.failUnlessEqual(ss.get_available_space(), 0)
@mock.patch('allmydata.util.fileutil.get_disk_stats')
def test_status_right_disk_stats(self, mock_get_disk_stats):
def test_status_right_disk_stats(self):
GB = 1000000000
total = 5*GB
free_for_root = 4*GB
free_for_nonroot = 3*GB
reserved_space = 1*GB
used = total - free_for_root
avail = max(free_for_nonroot - reserved_space, 0)
mock_get_disk_stats.return_value = {
'total': total,
'free_for_root': free_for_root,
'free_for_nonroot': free_for_nonroot,
'used': used,
'avail': avail,
}
reserved = 1*GB
basedir = "storage/WebStatus/status_right_disk_stats"
fileutil.make_dirs(basedir)
ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved_space)
ss = StorageServer(basedir, "\x00" * 20, reserved_space=reserved)
expecteddir = ss.sharedir
def call_get_disk_stats(whichdir, reserved_space=0):
self.failUnlessEqual(whichdir, expecteddir)
self.failUnlessEqual(reserved_space, reserved)
used = total - free_for_root
avail = max(free_for_nonroot - reserved_space, 0)
return {
'total': total,
'free_for_root': free_for_root,
'free_for_nonroot': free_for_nonroot,
'used': used,
'avail': avail,
}
self.patch(fileutil, 'get_disk_stats', call_get_disk_stats)
ss.setServiceParent(self.s)
w = StorageStatus(ss)
html = w.renderSynchronously()
self.failIf([True for args in mock_get_disk_stats.call_args_list if args != ((expecteddir, reserved_space), {})],
mock_get_disk_stats.call_args_list)
self.failUnlessIn("<h1>Storage Server Status</h1>", html)
s = remove_tags(html)
self.failUnlessIn("Total disk space: 5.00 GB", s)

View File

@ -13,13 +13,11 @@ from xml.dom import minidom
import allmydata.web
import mock
# junk to appease pyflakes's outrage
[
accessors, appserver, static, rend, url, util, query, i18n, flat, guard, stan, testutil,
context, flatmdom, flatstan, twist, webform, processors, annotate, iformless, Decimal,
minidom, allmydata, mock,
minidom, allmydata,
]
from allmydata.scripts import runner