tahoe-lafs/src/allmydata/test/test_node.py

251 lines
9.7 KiB
Python
Raw Normal View History

import os, stat, sys, time
from twisted.trial import unittest
2007-05-24 00:55:04 +00:00
from twisted.internet import defer
from twisted.python import log
from foolscap.api import flushEventualQueue
import foolscap.logging.log
from twisted.application import service
from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
from allmydata.introducer.server import IntroducerNode
from allmydata.client import Client
from allmydata.util import fileutil, iputil
from allmydata.util.namespace import Namespace
2010-02-26 08:14:33 +00:00
import allmydata.test.common_util as testutil
class LoggingMultiService(service.MultiService):
    """MultiService used as the parent of test nodes; its log() is a no-op."""

    def log(self, msg, **kw):
        """Accept a log message plus arbitrary keywords and discard them."""
class TestNode(Node):
    """Node subclass for these tests with placeholder CERTFILE/PORTNUMFILE."""

    CERTFILE = 'DEFAULT_CERTFILE_BLANK'
    PORTNUMFILE = 'DEFAULT_PORTNUMFILE_BLANK'
class TestCase(testutil.SignalMixin, unittest.TestCase):
    """Tests that construct TestNode instances from on-disk base directories."""

    def setUp(self):
        """Run the SignalMixin setup and start a parent service for nodes."""
        testutil.SignalMixin.setUp(self)
        self.parent = LoggingMultiService()
        self.parent.startService()

    def tearDown(self):
        """Stop the parent service, then flush foolscap's eventual queue.

        Returns the Deferred chain so the test runner can wait on it.
        """
        log.msg("%s.tearDown" % self.__class__.__name__)
        testutil.SignalMixin.tearDown(self)
        d = defer.succeed(None)
        d.addCallback(lambda res: self.parent.stopService())
        d.addCallback(flushEventualQueue)
        return d

    def _test_location(self, basedir, expected_addresses, tub_port=None,
                       tub_location=None, local_addresses=None):
        """Write a tahoe.cfg, start a TestNode on it, and check its furl.

        basedir: directory created and used as the node's base directory.
        expected_addresses: strings that must each appear in the furl
            returned by registering the node with its own tub.
        tub_port: if truthy, written to the [node] section as 'tub.port'.
        tub_location: if not None, written as 'tub.location'.
        local_addresses: if truthy, iputil.get_local_addresses_sync is
            patched to return this list for the duration of the test.
        """
        fileutil.make_dirs(basedir)
        # 'with' guarantees the handle is closed even if a write raises.
        with open(os.path.join(basedir, 'tahoe.cfg'), 'wt') as f:
            f.write("[node]\n")
            if tub_port:
                f.write("tub.port = %d\n" % (tub_port,))
            if tub_location is not None:
                f.write("tub.location = %s\n" % (tub_location,))

        if local_addresses:
            self.patch(iputil, 'get_local_addresses_sync',
                       lambda: local_addresses)

        n = TestNode(basedir)
        n.setServiceParent(self.parent)
        furl = n.tub.registerReference(n)
        for address in expected_addresses:
            self.failUnlessIn(address, furl)

    def test_location1(self):
        """Checks the furl when tub.location names a single address."""
        return self._test_location(
            basedir="test_node/test_location1",
            expected_addresses=["192.0.2.0:1234"],
            tub_location="192.0.2.0:1234")

    def test_location2(self):
        """Checks the furl when tub.location names two addresses."""
        return self._test_location(
            basedir="test_node/test_location2",
            expected_addresses=["192.0.2.0:1234", "example.org:8091"],
            tub_location="192.0.2.0:1234,example.org:8091")

    def test_location_not_set(self):
        """Checks the autogenerated furl when tub.location is not set."""
        return self._test_location(
            basedir="test_node/test_location3",
            expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234"],
            tub_port=1234,
            local_addresses=["127.0.0.1", "192.0.2.0"])

    def test_location_auto_and_explicit(self):
        """Checks the autogenerated furl when tub.location contains 'AUTO'."""
        return self._test_location(
            basedir="test_node/test_location4",
            expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234",
                                "example.com:4321"],
            tub_port=1234,
            tub_location="AUTO,example.com:4321",
            local_addresses=["127.0.0.1", "192.0.2.0", "example.com:4321"])

    def test_tahoe_cfg_utf8(self):
        """A BOM-prefixed UTF-8 tahoe.cfg yields the non-ASCII nickname."""
        basedir = "test_node/test_tahoe_cfg_utf8"
        fileutil.make_dirs(basedir)
        with open(os.path.join(basedir, 'tahoe.cfg'), 'wt') as f:
            # U+FEFF at the start of the file is the byte-order mark.
            f.write(u"\uFEFF[node]\n".encode('utf-8'))
            f.write(u"nickname = \u2621\n".encode('utf-8'))

        n = TestNode(basedir)
        n.setServiceParent(self.parent)
        self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
                             u"\u2621")

    def test_tahoe_cfg_hash_in_name(self):
        """A '#' inside the nickname value must survive config parsing."""
        basedir = "test_node/test_cfg_hash_in_name"
        nickname = "Hash#Bang!"  # a clever nickname containing a hash
        fileutil.make_dirs(basedir)
        with open(os.path.join(basedir, 'tahoe.cfg'), 'wt') as f:
            f.write("[node]\n")
            f.write("nickname = %s\n" % (nickname,))

        n = TestNode(basedir)
        # failUnlessEqual reports both values when the comparison fails.
        self.failUnlessEqual(n.nickname, nickname)

    def test_private_config(self):
        """Exercise get_private_config and get_or_create_private_config."""
        basedir = "test_node/test_private_config"
        privdir = os.path.join(basedir, "private")
        fileutil.make_dirs(privdir)
        with open(os.path.join(privdir, 'already'), 'wt') as f:
            f.write("secret")

        n = TestNode(basedir)

        # Existing entry, missing entry with a default, missing entry without.
        self.failUnlessEqual(n.get_private_config("already"), "secret")
        self.failUnlessEqual(n.get_private_config("not", "default"), "default")
        self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")

        # A plain default value is stored and can be read back.
        value = n.get_or_create_private_config("new", "start")
        self.failUnlessEqual(value, "start")
        self.failUnlessEqual(n.get_private_config("new"), "start")

        # A callable default is invoked to produce the value ...
        counter = []

        def make_newer():
            counter.append("called")
            return "newer"

        value = n.get_or_create_private_config("newer", make_newer)
        self.failUnlessEqual(len(counter), 1)
        self.failUnlessEqual(value, "newer")
        self.failUnlessEqual(n.get_private_config("newer"), "newer")

        # ... but not again once the entry exists.
        value = n.get_or_create_private_config("newer", make_newer)
        self.failUnlessEqual(len(counter), 1)  # don't call unless necessary
        self.failUnlessEqual(value, "newer")

    def test_timestamp(self):
        """formatTimeTahoeStyle output contains 'Z' for float and int times."""
        # this modified logger doesn't seem to get used during the tests,
        # probably because we don't modify the LogObserver that trial
        # installs (only the one that twistd installs). So manually exercise
        # it a little bit.
        t = formatTimeTahoeStyle("ignored", time.time())
        self.failUnlessIn("Z", t)
        t2 = formatTimeTahoeStyle("ignored", int(time.time()))
        self.failUnlessIn("Z", t2)

    def test_secrets_dir(self):
        """Constructing a node creates the 'private' directory in basedir."""
        basedir = "test_node/test_secrets_dir"
        fileutil.make_dirs(basedir)
        n = TestNode(basedir)
        self.failUnless(isinstance(n, TestNode))
        self.failUnless(os.path.exists(os.path.join(basedir, "private")))

    def test_secrets_dir_protected(self):
        """The 'private' directory must not have the other-execute bit set."""
        platform = sys.platform.lower()
        if "win32" in platform or "cygwin" in platform:
            # We don't know how to test that unprivileged users can't read this
            # thing. (Also we don't know exactly how to set the permissions so
            # that unprivileged users can't read this thing.)
            raise unittest.SkipTest("We don't know how to set permissions on Windows.")
        basedir = "test_node/test_secrets_dir_protected"
        fileutil.make_dirs(basedir)
        n = TestNode(basedir)
        self.failUnless(isinstance(n, TestNode))
        privdir = os.path.join(basedir, "private")
        st = os.stat(privdir)
        bits = stat.S_IMODE(st[stat.ST_MODE])
        # stat.S_IXOTH has the value 0o001; using the named constant avoids a
        # Python-2-only octal literal.
        self.failUnless((bits & stat.S_IXOTH) == 0, bits)

    def test_logdir_is_str(self):
        """Node construction calls foolscap's setLogDir with a str argument."""
        basedir = "test_node/test_logdir_is_str"
        fileutil.make_dirs(basedir)

        # Mutable holder so the nested function can record that it ran.
        ns = Namespace()
        ns.called = False

        def call_setLogDir(logdir):
            ns.called = True
            self.failUnless(isinstance(logdir, str), logdir)

        self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)

        TestNode(basedir)
        self.failUnless(ns.called)
# tahoe.cfg text written by the ClientNotListening tests: 'tub.port' is left
# empty and 'tub.location' is commented out. The text ends inside the
# [client] section, so anything a test appends lands in that section unless
# the appended text starts a new section header.
NO_LISTEN_CONFIG = """
[node]
tub.port =
#tub.location =
[client]
introducer.furl = empty
"""
class ClientNotListening(unittest.TestCase):
    """Client construction when tahoe.cfg leaves 'tub.port' empty."""

    def _write_config(self, basedir, *fragments):
        """Create basedir and write its tahoe.cfg.

        The file holds NO_LISTEN_CONFIG followed by each string in
        fragments, written verbatim and in order.
        """
        fileutil.make_dirs(basedir)
        # 'with' guarantees the handle is closed even if a write raises.
        with open(os.path.join(basedir, 'tahoe.cfg'), 'wt') as f:
            f.write(NO_LISTEN_CONFIG)
            for fragment in fragments:
                f.write(fragment)

    def test_port_none(self):
        """With storage disabled, the client's tub has no listeners."""
        basedir = "test_node/test_port_none"
        self._write_config(basedir,
                           "[storage]\n",
                           "enabled = false\n")
        n = Client(basedir)
        self.assertEqual(n.tub.getListeners(), [])

    def test_port_none_location_none(self):
        """As test_port_none, with an extra empty 'tub.location =' line."""
        basedir = "test_node/test_port_none_location_none"
        # NOTE(review): NO_LISTEN_CONFIG ends in the [client] section, so this
        # "tub.location =" line is appended under [client], not [node] --
        # confirm that is what this test means to exercise.
        self._write_config(basedir,
                           "tub.location =\n",
                           "[storage]\n",
                           "enabled = false\n")
        n = Client(basedir)
        self.assertEqual(n.tub.getListeners(), [])

    def test_port_none_storage(self):
        """Enabling storage without a listening tub raises ValueError."""
        basedir = "test_node/test_port_none_storage"
        self._write_config(basedir,
                           "[storage]\n",
                           "enabled = true")
        e = self.assertRaises(ValueError, Client, basedir)
        self.assertIn("storage is enabled, but tub is not listening", str(e))

    def test_port_none_helper(self):
        """Enabling the helper without a listening tub raises ValueError."""
        basedir = "test_node/test_port_none_helper"
        self._write_config(basedir,
                           "[storage]\n",
                           "enabled = false\n",
                           "[helper]\n",
                           "enabled = true")
        e = self.assertRaises(ValueError, Client, basedir)
        self.assertIn("helper is enabled, but tub is not listening", str(e))
class IntroducerNotListening(unittest.TestCase):
    """IntroducerNode construction when tahoe.cfg leaves 'tub.port' empty."""

    def test_port_none_introducer(self):
        """An introducer with an empty 'tub.port' raises ValueError."""
        basedir = "test_node/test_port_none_introducer"
        fileutil.make_dirs(basedir)
        # 'with' guarantees the handle is closed even if a write raises.
        with open(os.path.join(basedir, 'tahoe.cfg'), 'wt') as f:
            f.write("[node]\n")
            f.write("tub.port = \n")
            f.write("#tub.location = \n")
        e = self.assertRaises(ValueError, IntroducerNode, basedir)
        self.assertIn("we are Introducer, but tub is not listening", str(e))