mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-15 09:19:49 +00:00
ffc2f48cfe
This can be done synchronously because we now know the port number earlier. This still uses get_local_addresses_sync() (not _async) to do automatic IP-address detection if the config file didn't set tub.location or used the special word "AUTO" in it. The new implementation slightly changes the mapping from tub.location to the assigned location string. The old code removed all instances of "AUTO" from the location and then extended the hints with the local ones (so "hint1:AUTO:hint2" turns into "hint1:hint2:auto1:auto2"). The new code exactly replaces each "AUTO" with the local hints (so that example turns into "hint1:auto1:auto2:hint2", and a silly "hint1:AUTO:AUTO" would turn into "hint1:auto1:auto2:auto1:auto2"). This is unlikely to affect anybody.
181 lines
7.3 KiB
Python
181 lines
7.3 KiB
Python
|
|
import os, stat, sys, time
|
|
|
|
from twisted.trial import unittest
|
|
from twisted.internet import defer
|
|
from twisted.python import log
|
|
|
|
from foolscap.api import flushEventualQueue
|
|
import foolscap.logging.log
|
|
|
|
from twisted.application import service
|
|
from allmydata.node import Node, formatTimeTahoeStyle, MissingConfigEntry
|
|
from allmydata.util import fileutil, iputil
|
|
from allmydata.util.namespace import Namespace
|
|
import allmydata.test.common_util as testutil
|
|
|
|
|
|
class LoggingMultiService(service.MultiService):
|
|
def log(self, msg, **kw):
|
|
pass
|
|
|
|
class TestNode(Node):
|
|
CERTFILE='DEFAULT_CERTFILE_BLANK'
|
|
PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
|
|
|
|
class TestCase(testutil.SignalMixin, unittest.TestCase):
|
|
def setUp(self):
|
|
testutil.SignalMixin.setUp(self)
|
|
self.parent = LoggingMultiService()
|
|
self.parent.startService()
|
|
def tearDown(self):
|
|
log.msg("%s.tearDown" % self.__class__.__name__)
|
|
testutil.SignalMixin.tearDown(self)
|
|
d = defer.succeed(None)
|
|
d.addCallback(lambda res: self.parent.stopService())
|
|
d.addCallback(flushEventualQueue)
|
|
return d
|
|
|
|
def _test_location(self, basedir, expected_addresses, tub_port=None, tub_location=None, local_addresses=None):
|
|
fileutil.make_dirs(basedir)
|
|
f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
|
|
f.write("[node]\n")
|
|
if tub_port:
|
|
f.write("tub.port = %d\n" % (tub_port,))
|
|
if tub_location is not None:
|
|
f.write("tub.location = %s\n" % (tub_location,))
|
|
f.close()
|
|
|
|
if local_addresses:
|
|
self.patch(iputil, 'get_local_addresses_sync',
|
|
lambda: local_addresses)
|
|
|
|
n = TestNode(basedir)
|
|
n.setServiceParent(self.parent)
|
|
furl = n.tub.registerReference(n)
|
|
for address in expected_addresses:
|
|
self.failUnlessIn(address, furl)
|
|
|
|
def test_location1(self):
|
|
return self._test_location(basedir="test_node/test_location1",
|
|
expected_addresses=["192.0.2.0:1234"],
|
|
tub_location="192.0.2.0:1234")
|
|
|
|
def test_location2(self):
|
|
return self._test_location(basedir="test_node/test_location2",
|
|
expected_addresses=["192.0.2.0:1234", "example.org:8091"],
|
|
tub_location="192.0.2.0:1234,example.org:8091")
|
|
|
|
def test_location_not_set(self):
|
|
"""Checks the autogenerated furl when tub.location is not set."""
|
|
return self._test_location(basedir="test_node/test_location3",
|
|
expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234"],
|
|
tub_port=1234,
|
|
local_addresses=["127.0.0.1", "192.0.2.0"])
|
|
|
|
def test_location_auto_and_explicit(self):
|
|
"""Checks the autogenerated furl when tub.location contains 'AUTO'."""
|
|
return self._test_location(basedir="test_node/test_location4",
|
|
expected_addresses=["127.0.0.1:1234", "192.0.2.0:1234", "example.com:4321"],
|
|
tub_port=1234,
|
|
tub_location="AUTO,example.com:4321",
|
|
local_addresses=["127.0.0.1", "192.0.2.0", "example.com:4321"])
|
|
|
|
def test_tahoe_cfg_utf8(self):
|
|
basedir = "test_node/test_tahoe_cfg_utf8"
|
|
fileutil.make_dirs(basedir)
|
|
f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
|
|
f.write(u"\uFEFF[node]\n".encode('utf-8'))
|
|
f.write(u"nickname = \u2621\n".encode('utf-8'))
|
|
f.close()
|
|
|
|
n = TestNode(basedir)
|
|
n.setServiceParent(self.parent)
|
|
self.failUnlessEqual(n.get_config("node", "nickname").decode('utf-8'),
|
|
u"\u2621")
|
|
|
|
def test_tahoe_cfg_hash_in_name(self):
|
|
basedir = "test_node/test_cfg_hash_in_name"
|
|
nickname = "Hash#Bang!" # a clever nickname containing a hash
|
|
fileutil.make_dirs(basedir)
|
|
f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
|
|
f.write("[node]\n")
|
|
f.write("nickname = %s\n" % (nickname,))
|
|
f.close()
|
|
n = TestNode(basedir)
|
|
self.failUnless(n.nickname == nickname)
|
|
|
|
def test_private_config(self):
|
|
basedir = "test_node/test_private_config"
|
|
privdir = os.path.join(basedir, "private")
|
|
fileutil.make_dirs(privdir)
|
|
f = open(os.path.join(privdir, 'already'), 'wt')
|
|
f.write("secret")
|
|
f.close()
|
|
|
|
n = TestNode(basedir)
|
|
self.failUnlessEqual(n.get_private_config("already"), "secret")
|
|
self.failUnlessEqual(n.get_private_config("not", "default"), "default")
|
|
self.failUnlessRaises(MissingConfigEntry, n.get_private_config, "not")
|
|
value = n.get_or_create_private_config("new", "start")
|
|
self.failUnlessEqual(value, "start")
|
|
self.failUnlessEqual(n.get_private_config("new"), "start")
|
|
counter = []
|
|
def make_newer():
|
|
counter.append("called")
|
|
return "newer"
|
|
value = n.get_or_create_private_config("newer", make_newer)
|
|
self.failUnlessEqual(len(counter), 1)
|
|
self.failUnlessEqual(value, "newer")
|
|
self.failUnlessEqual(n.get_private_config("newer"), "newer")
|
|
|
|
value = n.get_or_create_private_config("newer", make_newer)
|
|
self.failUnlessEqual(len(counter), 1) # don't call unless necessary
|
|
self.failUnlessEqual(value, "newer")
|
|
|
|
def test_timestamp(self):
|
|
# this modified logger doesn't seem to get used during the tests,
|
|
# probably because we don't modify the LogObserver that trial
|
|
# installs (only the one that twistd installs). So manually exercise
|
|
# it a little bit.
|
|
t = formatTimeTahoeStyle("ignored", time.time())
|
|
self.failUnless("Z" in t)
|
|
t2 = formatTimeTahoeStyle("ignored", int(time.time()))
|
|
self.failUnless("Z" in t2)
|
|
|
|
def test_secrets_dir(self):
|
|
basedir = "test_node/test_secrets_dir"
|
|
fileutil.make_dirs(basedir)
|
|
n = TestNode(basedir)
|
|
self.failUnless(isinstance(n, TestNode))
|
|
self.failUnless(os.path.exists(os.path.join(basedir, "private")))
|
|
|
|
def test_secrets_dir_protected(self):
|
|
if "win32" in sys.platform.lower() or "cygwin" in sys.platform.lower():
|
|
# We don't know how to test that unprivileged users can't read this
|
|
# thing. (Also we don't know exactly how to set the permissions so
|
|
# that unprivileged users can't read this thing.)
|
|
raise unittest.SkipTest("We don't know how to set permissions on Windows.")
|
|
basedir = "test_node/test_secrets_dir_protected"
|
|
fileutil.make_dirs(basedir)
|
|
n = TestNode(basedir)
|
|
self.failUnless(isinstance(n, TestNode))
|
|
privdir = os.path.join(basedir, "private")
|
|
st = os.stat(privdir)
|
|
bits = stat.S_IMODE(st[stat.ST_MODE])
|
|
self.failUnless(bits & 0001 == 0, bits)
|
|
|
|
def test_logdir_is_str(self):
|
|
basedir = "test_node/test_logdir_is_str"
|
|
fileutil.make_dirs(basedir)
|
|
|
|
ns = Namespace()
|
|
ns.called = False
|
|
def call_setLogDir(logdir):
|
|
ns.called = True
|
|
self.failUnless(isinstance(logdir, str), logdir)
|
|
self.patch(foolscap.logging.log, 'setLogDir', call_setLogDir)
|
|
|
|
TestNode(basedir)
|
|
self.failUnless(ns.called)
|