mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-05-31 23:00:53 +00:00
split test_web.py into smaller files
there's more to be done, but this is a start
This commit is contained in:
parent
421520ade4
commit
4f0e71db4a
0
src/allmydata/test/web/__init__.py
Normal file
0
src/allmydata/test/web/__init__.py
Normal file
6
src/allmydata/test/web/common.py
Normal file
6
src/allmydata/test/web/common.py
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
|
||||||
|
unknown_rwcap = u"lafs://from_the_future_rw_\u263A".encode('utf-8')
|
||||||
|
unknown_rocap = u"ro.lafs://readonly_from_the_future_ro_\u263A".encode('utf-8')
|
||||||
|
unknown_immcap = u"imm.lafs://immutable_from_the_future_imm_\u263A".encode('utf-8')
|
||||||
|
|
||||||
|
FAVICON_MARKUP = '<link href="/icon.png" rel="shortcut icon" />'
|
1375
src/allmydata/test/web/test_grid.py
Normal file
1375
src/allmydata/test/web/test_grid.py
Normal file
File diff suppressed because it is too large
Load Diff
59
src/allmydata/test/web/test_introducer.py
Normal file
59
src/allmydata/test/web/test_introducer.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
import os.path
|
||||||
|
from twisted.trial import unittest
|
||||||
|
from foolscap.api import fireEventually, flushEventualQueue
|
||||||
|
from allmydata.util import fileutil
|
||||||
|
from twisted.internet import defer, reactor
|
||||||
|
from allmydata.introducer import IntroducerNode
|
||||||
|
from .common import FAVICON_MARKUP
|
||||||
|
from ..common_web import HTTPClientGETFactory
|
||||||
|
|
||||||
|
class IntroducerWeb(unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.node = None
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
d = defer.succeed(None)
|
||||||
|
if self.node:
|
||||||
|
d.addCallback(lambda ign: self.node.stopService())
|
||||||
|
d.addCallback(flushEventualQueue)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def test_welcome(self):
|
||||||
|
basedir = "web.IntroducerWeb.test_welcome"
|
||||||
|
os.mkdir(basedir)
|
||||||
|
cfg = "\n".join(["[node]",
|
||||||
|
"tub.location = 127.0.0.1:1",
|
||||||
|
"web.port = tcp:0",
|
||||||
|
]) + "\n"
|
||||||
|
fileutil.write(os.path.join(basedir, "tahoe.cfg"), cfg)
|
||||||
|
self.node = IntroducerNode(basedir)
|
||||||
|
self.ws = self.node.getServiceNamed("webish")
|
||||||
|
|
||||||
|
d = fireEventually(None)
|
||||||
|
d.addCallback(lambda ign: self.node.startService())
|
||||||
|
|
||||||
|
d.addCallback(lambda ign: self.GET("/"))
|
||||||
|
def _check(res):
|
||||||
|
self.failUnlessIn('Welcome to the Tahoe-LAFS Introducer', res)
|
||||||
|
self.failUnlessIn(FAVICON_MARKUP, res)
|
||||||
|
self.failUnlessIn('Page rendered at', res)
|
||||||
|
self.failUnlessIn('Tahoe-LAFS code imported from:', res)
|
||||||
|
d.addCallback(_check)
|
||||||
|
return d
|
||||||
|
|
||||||
|
def GET(self, urlpath, followRedirect=False, return_response=False,
|
||||||
|
**kwargs):
|
||||||
|
# if return_response=True, this fires with (data, statuscode,
|
||||||
|
# respheaders) instead of just data.
|
||||||
|
assert not isinstance(urlpath, unicode)
|
||||||
|
url = self.ws.getURL().rstrip('/') + urlpath
|
||||||
|
factory = HTTPClientGETFactory(url, method="GET",
|
||||||
|
followRedirect=followRedirect, **kwargs)
|
||||||
|
reactor.connectTCP("localhost", self.ws.getPortnum(), factory)
|
||||||
|
d = factory.deferred
|
||||||
|
def _got_data(data):
|
||||||
|
return (data, factory.status, factory.response_headers)
|
||||||
|
if return_response:
|
||||||
|
d.addCallback(_got_data)
|
||||||
|
return factory.deferred
|
||||||
|
|
105
src/allmydata/test/web/test_token.py
Normal file
105
src/allmydata/test/web/test_token.py
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
from zope.interface import implementer
|
||||||
|
from twisted.trial import unittest
|
||||||
|
from twisted.web import server
|
||||||
|
from nevow.inevow import IRequest
|
||||||
|
from allmydata.web import common
|
||||||
|
|
||||||
|
# XXX FIXME when we introduce "mock" as a dependency, these can
|
||||||
|
# probably just be Mock instances
|
||||||
|
@implementer(IRequest)
|
||||||
|
class FakeRequest(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.method = "POST"
|
||||||
|
self.fields = dict()
|
||||||
|
self.args = dict()
|
||||||
|
|
||||||
|
|
||||||
|
class FakeField(object):
|
||||||
|
def __init__(self, *values):
|
||||||
|
if len(values) == 1:
|
||||||
|
self.value = values[0]
|
||||||
|
else:
|
||||||
|
self.value = list(values)
|
||||||
|
|
||||||
|
|
||||||
|
class FakeClientWithToken(object):
|
||||||
|
token = 'a' * 32
|
||||||
|
|
||||||
|
def get_auth_token(self):
|
||||||
|
return self.token
|
||||||
|
|
||||||
|
|
||||||
|
class TestTokenOnlyApi(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
self.client = FakeClientWithToken()
|
||||||
|
self.page = common.TokenOnlyWebApi(self.client)
|
||||||
|
|
||||||
|
def test_not_post(self):
|
||||||
|
req = FakeRequest()
|
||||||
|
req.method = "GET"
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
server.UnsupportedMethod,
|
||||||
|
self.page.render, req,
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_missing_token(self):
|
||||||
|
req = FakeRequest()
|
||||||
|
|
||||||
|
exc = self.assertRaises(
|
||||||
|
common.WebError,
|
||||||
|
self.page.render, req,
|
||||||
|
)
|
||||||
|
self.assertEquals(exc.text, "Missing token")
|
||||||
|
self.assertEquals(exc.code, 401)
|
||||||
|
|
||||||
|
def test_token_in_get_args(self):
|
||||||
|
req = FakeRequest()
|
||||||
|
req.args['token'] = 'z' * 32
|
||||||
|
|
||||||
|
exc = self.assertRaises(
|
||||||
|
common.WebError,
|
||||||
|
self.page.render, req,
|
||||||
|
)
|
||||||
|
self.assertEquals(exc.text, "Do not pass 'token' as URL argument")
|
||||||
|
self.assertEquals(exc.code, 400)
|
||||||
|
|
||||||
|
def test_invalid_token(self):
|
||||||
|
wrong_token = 'b' * 32
|
||||||
|
req = FakeRequest()
|
||||||
|
req.fields['token'] = FakeField(wrong_token)
|
||||||
|
|
||||||
|
exc = self.assertRaises(
|
||||||
|
common.WebError,
|
||||||
|
self.page.render, req,
|
||||||
|
)
|
||||||
|
self.assertEquals(exc.text, "Invalid token")
|
||||||
|
self.assertEquals(exc.code, 401)
|
||||||
|
|
||||||
|
def test_valid_token_no_t_arg(self):
|
||||||
|
req = FakeRequest()
|
||||||
|
req.fields['token'] = FakeField(self.client.token)
|
||||||
|
|
||||||
|
with self.assertRaises(common.WebError) as exc:
|
||||||
|
self.page.render(req)
|
||||||
|
self.assertEquals(exc.exception.text, "Must provide 't=' argument")
|
||||||
|
self.assertEquals(exc.exception.code, 400)
|
||||||
|
|
||||||
|
def test_valid_token_invalid_t_arg(self):
|
||||||
|
req = FakeRequest()
|
||||||
|
req.fields['token'] = FakeField(self.client.token)
|
||||||
|
req.args['t'] = 'not at all json'
|
||||||
|
|
||||||
|
with self.assertRaises(common.WebError) as exc:
|
||||||
|
self.page.render(req)
|
||||||
|
self.assertTrue("invalid type" in exc.exception.text)
|
||||||
|
self.assertEquals(exc.exception.code, 400)
|
||||||
|
|
||||||
|
def test_valid(self):
|
||||||
|
req = FakeRequest()
|
||||||
|
req.fields['token'] = FakeField(self.client.token)
|
||||||
|
req.args['t'] = ['json']
|
||||||
|
|
||||||
|
result = self.page.render(req)
|
||||||
|
self.assertTrue(result == NotImplemented)
|
65
src/allmydata/test/web/test_util.py
Normal file
65
src/allmydata/test/web/test_util.py
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
from twisted.trial import unittest
|
||||||
|
from allmydata.web import status, common
|
||||||
|
from ..common import ShouldFailMixin
|
||||||
|
from .. import common_util as testutil
|
||||||
|
|
||||||
|
class Util(ShouldFailMixin, testutil.ReallyEqualMixin, unittest.TestCase):
|
||||||
|
def test_load_file(self):
|
||||||
|
# This will raise an exception unless a well-formed XML file is found under that name.
|
||||||
|
common.getxmlfile('directory.xhtml').load()
|
||||||
|
|
||||||
|
def test_parse_replace_arg(self):
|
||||||
|
self.failUnlessReallyEqual(common.parse_replace_arg("true"), True)
|
||||||
|
self.failUnlessReallyEqual(common.parse_replace_arg("false"), False)
|
||||||
|
self.failUnlessReallyEqual(common.parse_replace_arg("only-files"),
|
||||||
|
"only-files")
|
||||||
|
self.failUnlessRaises(common.WebError, common.parse_replace_arg, "only_fles")
|
||||||
|
|
||||||
|
def test_abbreviate_time(self):
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_time(None), "")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_time(1.234), "1.23s")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_time(0.123), "123ms")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_time(0.00123), "1.2ms")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_time(0.000123), "123us")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_time(-123000), "-123000000000us")
|
||||||
|
|
||||||
|
def test_compute_rate(self):
|
||||||
|
self.failUnlessReallyEqual(common.compute_rate(None, None), None)
|
||||||
|
self.failUnlessReallyEqual(common.compute_rate(None, 1), None)
|
||||||
|
self.failUnlessReallyEqual(common.compute_rate(250000, None), None)
|
||||||
|
self.failUnlessReallyEqual(common.compute_rate(250000, 0), None)
|
||||||
|
self.failUnlessReallyEqual(common.compute_rate(250000, 10), 25000.0)
|
||||||
|
self.failUnlessReallyEqual(common.compute_rate(0, 10), 0.0)
|
||||||
|
self.shouldFail(AssertionError, "test_compute_rate", "",
|
||||||
|
common.compute_rate, -100, 10)
|
||||||
|
self.shouldFail(AssertionError, "test_compute_rate", "",
|
||||||
|
common.compute_rate, 100, -10)
|
||||||
|
|
||||||
|
# Sanity check
|
||||||
|
rate = common.compute_rate(10*1000*1000, 1)
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_rate(rate), "10.00MBps")
|
||||||
|
|
||||||
|
def test_abbreviate_rate(self):
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_rate(None), "")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_rate(1234000), "1.23MBps")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_rate(12340), "12.3kBps")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_rate(123), "123Bps")
|
||||||
|
|
||||||
|
def test_abbreviate_size(self):
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_size(None), "")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_size(1.23*1000*1000*1000), "1.23GB")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_size(1.23*1000*1000), "1.23MB")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_size(1230), "1.2kB")
|
||||||
|
self.failUnlessReallyEqual(common.abbreviate_size(123), "123B")
|
||||||
|
|
||||||
|
def test_plural(self):
|
||||||
|
def convert(s):
|
||||||
|
return "%d second%s" % (s, status.plural(s))
|
||||||
|
self.failUnlessReallyEqual(convert(0), "0 seconds")
|
||||||
|
self.failUnlessReallyEqual(convert(1), "1 second")
|
||||||
|
self.failUnlessReallyEqual(convert(2), "2 seconds")
|
||||||
|
def convert2(s):
|
||||||
|
return "has share%s: %s" % (status.plural(s), ",".join(s))
|
||||||
|
self.failUnlessReallyEqual(convert2([]), "has shares: ")
|
||||||
|
self.failUnlessReallyEqual(convert2(["1"]), "has share: 1")
|
||||||
|
self.failUnlessReallyEqual(convert2(["1","2"]), "has shares: 1,2")
|
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user