2007-05-23 22:08:55 +00:00
|
|
|
|
2007-12-17 23:39:54 +00:00
|
|
|
import os, stat, sys, time
|
2007-05-23 22:08:55 +00:00
|
|
|
from twisted.trial import unittest
|
2007-05-24 00:55:04 +00:00
|
|
|
from twisted.internet import defer
|
2007-05-23 22:08:55 +00:00
|
|
|
from twisted.python import log
|
|
|
|
|
2007-12-13 03:31:01 +00:00
|
|
|
from foolscap.eventual import flushEventualQueue
|
2007-05-23 22:08:55 +00:00
|
|
|
from twisted.application import service
|
2007-10-31 07:54:42 +00:00
|
|
|
from allmydata.node import Node, formatTimeTahoeStyle
|
2008-10-29 04:28:31 +00:00
|
|
|
from allmydata.util import fileutil
|
|
|
|
import common_util as testutil
|
2007-05-23 22:08:55 +00:00
|
|
|
|
|
|
|
class LoggingMultiService(service.MultiService):
|
2007-11-20 01:23:18 +00:00
|
|
|
def log(self, msg, **kw):
|
2007-05-23 22:08:55 +00:00
|
|
|
pass
|
|
|
|
|
|
|
|
class TestNode(Node):
|
|
|
|
CERTFILE='DEFAULT_CERTFILE_BLANK'
|
|
|
|
PORTNUMFILE='DEFAULT_PORTNUMFILE_BLANK'
|
|
|
|
|
2009-02-23 21:43:12 +00:00
|
|
|
class TestCase(testutil.SignalMixin, unittest.TestCase):
|
2007-05-23 22:08:55 +00:00
|
|
|
def setUp(self):
|
2009-02-23 21:43:12 +00:00
|
|
|
testutil.SignalMixin.setUp(self)
|
2007-05-23 22:08:55 +00:00
|
|
|
self.parent = LoggingMultiService()
|
|
|
|
self.parent.startService()
|
|
|
|
def tearDown(self):
|
|
|
|
log.msg("%s.tearDown" % self.__class__.__name__)
|
2009-02-23 21:43:12 +00:00
|
|
|
testutil.SignalMixin.tearDown(self)
|
2007-05-23 22:08:55 +00:00
|
|
|
d = defer.succeed(None)
|
|
|
|
d.addCallback(lambda res: self.parent.stopService())
|
|
|
|
d.addCallback(flushEventualQueue)
|
|
|
|
return d
|
|
|
|
|
2008-11-13 01:44:58 +00:00
|
|
|
def test_location(self):
|
|
|
|
basedir = "test_node/test_location"
|
2007-11-02 00:29:15 +00:00
|
|
|
fileutil.make_dirs(basedir)
|
2008-11-13 01:44:58 +00:00
|
|
|
f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
|
|
|
|
f.write("[node]\n")
|
|
|
|
f.write("tub.location = 1.2.3.4:5\n")
|
2007-11-02 00:29:15 +00:00
|
|
|
f.close()
|
2007-05-23 22:08:55 +00:00
|
|
|
|
2007-11-02 00:29:15 +00:00
|
|
|
n = TestNode(basedir)
|
2007-05-23 22:08:55 +00:00
|
|
|
n.setServiceParent(self.parent)
|
|
|
|
d = n.when_tub_ready()
|
|
|
|
|
|
|
|
def _check_addresses(ignored_result):
|
2007-11-02 00:29:15 +00:00
|
|
|
furl = n.tub.registerReference(n)
|
|
|
|
self.failUnless("1.2.3.4:5" in furl, furl)
|
2007-05-23 22:08:55 +00:00
|
|
|
|
|
|
|
d.addCallback(_check_addresses)
|
|
|
|
return d
|
|
|
|
|
2008-11-13 01:44:58 +00:00
|
|
|
def test_location2(self):
|
|
|
|
basedir = "test_node/test_location2"
|
2008-09-30 06:08:16 +00:00
|
|
|
fileutil.make_dirs(basedir)
|
2008-11-13 01:44:58 +00:00
|
|
|
f = open(os.path.join(basedir, 'tahoe.cfg'), 'wt')
|
|
|
|
f.write("[node]\n")
|
|
|
|
f.write("tub.location = 1.2.3.4:5,example.org:8091\n")
|
|
|
|
f.close()
|
2008-09-30 06:08:16 +00:00
|
|
|
|
|
|
|
n = TestNode(basedir)
|
|
|
|
n.setServiceParent(self.parent)
|
|
|
|
d = n.when_tub_ready()
|
|
|
|
|
|
|
|
def _check_addresses(ignored_result):
|
2008-11-13 01:44:58 +00:00
|
|
|
furl = n.tub.registerReference(n)
|
|
|
|
self.failUnless("1.2.3.4:5" in furl, furl)
|
|
|
|
self.failUnless("example.org:8091" in furl, furl)
|
2008-09-30 06:08:16 +00:00
|
|
|
|
|
|
|
d.addCallback(_check_addresses)
|
|
|
|
return d
|
|
|
|
|
2007-10-31 07:54:42 +00:00
|
|
|
def test_timestamp(self):
|
|
|
|
# this modified logger doesn't seem to get used during the tests,
|
|
|
|
# probably because we don't modify the LogObserver that trial
|
|
|
|
# installs (only the one that twistd installs). So manually exercise
|
|
|
|
# it a little bit.
|
|
|
|
t = formatTimeTahoeStyle("ignored", time.time())
|
|
|
|
self.failUnless("Z" in t)
|
|
|
|
t2 = formatTimeTahoeStyle("ignored", int(time.time()))
|
|
|
|
self.failUnless("Z" in t2)
|
2007-12-17 23:39:54 +00:00
|
|
|
|
|
|
|
def test_secrets_dir(self):
|
|
|
|
basedir = "test_node/test_secrets_dir"
|
|
|
|
fileutil.make_dirs(basedir)
|
|
|
|
n = TestNode(basedir)
|
|
|
|
self.failUnless(os.path.exists(os.path.join(basedir, "private")))
|
|
|
|
|
|
|
|
def test_secrets_dir_protected(self):
|
|
|
|
if "win32" in sys.platform.lower() or "cygwin" in sys.platform.lower():
|
|
|
|
# We don't know how to test that unprivileged users can't read this
|
|
|
|
# thing. (Also we don't know exactly how to set the permissions so
|
|
|
|
# that unprivileged users can't read this thing.)
|
|
|
|
raise unittest.SkipTest("We don't know how to set permissions on Windows.")
|
|
|
|
basedir = "test_node/test_secrets_dir_protected"
|
|
|
|
fileutil.make_dirs(basedir)
|
|
|
|
n = TestNode(basedir)
|
|
|
|
privdir = os.path.join(basedir, "private")
|
|
|
|
st = os.stat(privdir)
|
|
|
|
bits = stat.S_IMODE(st[stat.ST_MODE])
|
|
|
|
self.failUnless(bits & 0001 == 0, bits)
|