import re, urllib
import simplejson
from twisted.application import service
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.web import client, error, http
from twisted.python import failure, log
from allmydata import interfaces, provisioning, uri, webish
from allmydata.immutable import upload, download
from allmydata.web import status, common
from allmydata.util import fileutil, testutil
from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, \
FakeMutableFileNode, create_chk_filenode
from allmydata.interfaces import IURI, INewDirectoryURI, \
IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
from allmydata.mutable import servermap, publish, retrieve
# create a fake uploader/downloader, and a couple of fake dirnodes, then
# create a webserver that works against them
class FakeIntroducerClient:
    """Stand-in introducer client that reports nothing connected."""

    def get_all_connectors(self):
        """Return an empty dict: this fake has no connectors."""
        return dict()

    def get_all_connections_for(self, service_name):
        """Return an empty frozenset, whatever service_name is given."""
        nobody = frozenset()
        return nobody

    def get_all_peerids(self):
        """Return an empty frozenset: this fake has no peers."""
        nobody = frozenset()
        return nobody
class FakeClient(service.MultiService):
    """Fake client node used as the parent service of the web server under test.

    Nodes are built from the Fake*Node classes, uploads are turned into
    fake CHK file nodes, and every status list holds one pre-built entry.
    """
    nodeid = "fake_nodeid"
    nickname = "fake_nickname"
    basedir = "fake_basedir"
    introducer_furl = "None"
    introducer_client = FakeIntroducerClient()
    convergence = "some random string"
    MUTABLE_SIZELIMIT = FakeMutableFileNode.MUTABLE_SIZELIMIT

    # one canned status object per operation type
    _all_upload_status = [upload.UploadStatus()]
    _all_download_status = [download.DownloadStatus()]
    _all_mapupdate_statuses = [servermap.UpdateStatus()]
    _all_publish_statuses = [publish.PublishStatus()]
    _all_retrieve_statuses = [retrieve.RetrieveStatus()]

    def get_versions(self):
        """Return a dict mapping each reported package name to "fake"."""
        packages = ('allmydata', 'foolscap', 'twisted', 'zfec')
        return dict.fromkeys(packages, "fake")

    def connected_to_introducer(self):
        """Always report that there is no introducer connection."""
        return False

    def get_nickname_for_peerid(self, peerid):
        """Return the same fixed nickname for every peerid."""
        return u"John Doe"

    def create_node_from_uri(self, auri):
        """Parse auri and return the matching fake directory/file node.

        Anything that is neither a directory URI nor an immutable file URI
        must be a mutable file URI (enforced with an assert).
        """
        parsed = uri.from_string(auri)
        is_directory = (INewDirectoryURI.providedBy(parsed)
                        or IReadonlyNewDirectoryURI.providedBy(parsed))
        if is_directory:
            return FakeDirectoryNode(self).init_from_uri(parsed)
        if IFileURI.providedBy(parsed):
            return FakeCHKFileNode(parsed, self)
        assert IMutableFileURI.providedBy(parsed), parsed
        return FakeMutableFileNode(self).init_from_uri(parsed)

    def create_empty_dirnode(self):
        """Return a Deferred that fires with a newly created FakeDirectoryNode."""
        dirnode = FakeDirectoryNode(self)

        def _return_node(_ignored):
            return dirnode
        d = dirnode.create()
        d.addCallback(_return_node)
        return d

    def create_mutable_file(self, contents=""):
        """Create a FakeMutableFileNode holding contents; returns n.create()'s result."""
        return FakeMutableFileNode(self).create(contents)

    def upload(self, uploadable):
        """Read all of uploadable and store it as a fake CHK file node.

        Returns a Deferred firing with an UploadResults whose .uri is the
        URI of the new node.
        """
        def _read_all(size):
            return uploadable.read(size)

        def _store(chunks):
            node = create_chk_filenode(self, "".join(chunks))
            results = upload.UploadResults()
            results.uri = node.get_uri()
            return results
        d = uploadable.get_size()
        d.addCallback(_read_all)
        d.addCallback(_store)
        return d

    def list_all_upload_statuses(self):
        """Return the canned upload status list."""
        return self._all_upload_status

    def list_all_download_statuses(self):
        """Return the canned download status list."""
        return self._all_download_status

    def list_all_mapupdate_statuses(self):
        """Return the canned servermap-update status list."""
        return self._all_mapupdate_statuses

    def list_all_publish_statuses(self):
        """Return the canned publish status list."""
        return self._all_publish_statuses

    def list_all_retrieve_statuses(self):
        """Return the canned retrieve status list."""
        return self._all_retrieve_statuses

    def list_all_helper_statuses(self):
        """Return an empty list: there are no helper statuses."""
        return []
class HTTPClientHEADFactory(client.HTTPClientFactory):
    """HTTPClientFactory whose deferred fires with the response headers."""

    def __init__(self, *args, **kwargs):
        client.HTTPClientFactory.__init__(self, *args, **kwargs)

        def _deliver_headers(_body):
            # swap the page body for the headers collected by the factory
            return self.response_headers
        self.deferred.addCallback(_deliver_headers)

    def noPage(self, reason):
        """Treat a PartialDownloadError on a HEAD request as an empty page.

        Twisted-2.5.0 and earlier raised an exception when the response to
        a HEAD request had no body, although such responses never have
        one. Twisted-8.0 fixed that. As a workaround the
        PartialDownloadError is swallowed here; every other failure goes to
        the base class.
        """
        partial = reason.check(client.PartialDownloadError)
        if not (partial and self.method.upper() == "HEAD"):
            return client.HTTPClientFactory.noPage(self, reason)
        self.page("")
class WebMixin(object):
def setUp(self):
    """Start a FakeClient plus web server and build the test directory tree.

    Returns a Deferred that fires after the tree is populated and the
    metadata of foo/bar.txt has been saved in self._bar_txt_metadata.
    """
    self.s = FakeClient()
    self.s.startService()
    webserver = webish.WebishServer("0")
    self.ws = webserver
    webserver.setServiceParent(self.s)
    portnum = webserver.listener._port.getHost().port
    self.webish_port = portnum
    self.webish_url = "http://localhost:%d" % portnum

    def _populate(res):
        # res is the DeferredList result: a list of (success, node) pairs
        self.public_root = res[0][1]
        assert interfaces.IDirectoryNode.providedBy(self.public_root), res
        self.public_url = "/uri/" + self.public_root.get_uri()
        self.private_root = res[1][1]

        foo_node = res[2][1]
        self._foo_node = foo_node
        self._foo_uri = foo_node.get_uri()
        self._foo_readonly_uri = foo_node.get_readonly_uri()

        # NOTE: the deferreds from set_uri() are ignored throughout,
        # because the fake nodes perform these operations synchronously
        self.public_root.set_uri(u"foo", foo_node.get_uri())

        bar_contents, _node, bar_uri = self.makefile(0)
        self.BAR_CONTENTS = bar_contents
        self._bar_txt_uri = bar_uri
        foo_node.set_uri(u"bar.txt", self._bar_txt_uri)

        foo_node.set_uri(u"empty", res[3][1].get_uri())

        self._sub_uri = res[4][1].get_uri()
        foo_node.set_uri(u"sub", self._sub_uri)
        sub_node = self.s.create_node_from_uri(self._sub_uri)

        blocking_uri = self.makefile(1)[2]
        foo_node.set_uri(u"blockingfile", blocking_uri)

        # "n", u-umlaut (LATIN SMALL LETTER U WITH DIAERESIS), ".txt"
        foo_node.set_uri(u"n\u00fc.txt", self._bar_txt_uri)

        baz_uri = self.makefile(2)[2]
        sub_node.set_uri(u"baz.txt", baz_uri)

        self._bad_file_uri = self.makefile(3)[2]
        # dropping the contents makes this URI undownloadable
        del FakeCHKFileNode.all_contents[self._bad_file_uri]

        readonly_dir = res[5][1]
        self.public_root.set_uri(u"reedownlee",
                                 readonly_dir.get_readonly_uri())
        readonly_dir.set_uri(u"nor", baz_uri)

        # resulting layout:
        #  public/
        #  public/foo/
        #  public/foo/bar.txt
        #  public/foo/blockingfile
        #  public/foo/empty/
        #  public/foo/sub/
        #  public/foo/sub/baz.txt
        #  public/reedownlee/
        #  public/reedownlee/nor

        self.NEWFILE_CONTENTS = "newfile contents\n"
        return foo_node.get_metadata_for(u"bar.txt")

    def _save_metadata(metadata):
        self._bar_txt_metadata = metadata

    d = defer.DeferredList([self.s.create_empty_dirnode()
                            for _unused in range(6)])
    d.addCallback(_populate)
    d.addCallback(_save_metadata)
    return d
def makefile(self, number):
    """Create a fake CHK file whose text embeds number.

    Returns a (contents, node, uri) tuple.
    """
    text = "contents of file %s\n" % number
    node = create_chk_filenode(self.s, text)
    return (text, node, node.get_uri())
def tearDown(self):
    """Stop the fake client service and return stopService()'s result."""
    stopped = self.s.stopService()
    return stopped
def failUnlessIsBarDotTxt(self, res):
    """Fail unless res equals the contents of bar.txt (res is the message)."""
    expected = self.BAR_CONTENTS
    self.failUnlessEqual(res, expected, res)
def failUnlessIsBarJSON(self, res):
    """Fail unless res is the JSON description of the immutable bar.txt."""
    parsed = simplejson.loads(res)
    self.failUnless(isinstance(parsed, list))
    nodetype = parsed[0]
    self.failUnlessEqual(nodetype, u"filenode")
    info = parsed[1]
    self.failUnless(isinstance(info, dict))
    self.failIf(info["mutable"])
    # an immutable file has no write cap
    self.failIf("rw_uri" in info)
    self.failUnlessEqual(info["ro_uri"], self._bar_txt_uri)
    self.failUnlessEqual(info["size"], len(self.BAR_CONTENTS))
def failUnlessIsFooJSON(self, res):
    """Fail unless res is the JSON description of the mutable foo directory."""
    parsed = simplejson.loads(res)
    self.failUnless(isinstance(parsed, list))
    self.failUnlessEqual(parsed[0], "dirnode", res)
    info = parsed[1]
    self.failUnless(isinstance(info, dict))
    self.failUnless(info["mutable"])
    # a mutable directory exposes both caps
    self.failUnless("rw_uri" in info)
    self.failUnlessEqual(info["rw_uri"], self._foo_uri)
    self.failUnlessEqual(info["ro_uri"], self._foo_readonly_uri)

    children = info["children"]
    kidnames = [unicode(name) for name in children]
    kidnames.sort()
    self.failUnlessEqual(kidnames,
                         [u"bar.txt", u"blockingfile", u"empty",
                          u"n\u00fc.txt", u"sub"])

    kids = {}
    for name, value in children.iteritems():
        kids[unicode(name)] = value

    sub_type, sub_info = kids[u"sub"][0], kids[u"sub"][1]
    self.failUnlessEqual(sub_type, "dirnode")
    self.failUnless("metadata" in sub_info)
    self.failUnless("ctime" in sub_info["metadata"])
    self.failUnless("mtime" in sub_info["metadata"])

    bar_entry = kids[u"bar.txt"]
    self.failUnlessEqual(bar_entry[0], "filenode")
    self.failUnlessEqual(bar_entry[1]["size"], len(self.BAR_CONTENTS))
    self.failUnlessEqual(bar_entry[1]["ro_uri"], self._bar_txt_uri)
    self.failUnlessEqual(bar_entry[1]["metadata"]["ctime"],
                         self._bar_txt_metadata["ctime"])

    self.failUnlessEqual(kids[u"n\u00fc.txt"][1]["ro_uri"],
                         self._bar_txt_uri)
def GET(self, urlpath, followRedirect=False):
    """Issue a GET for urlpath against the test web server.

    urlpath must be a byte string; returns the Deferred from getPage().
    """
    assert not isinstance(urlpath, unicode)
    return client.getPage(self.webish_url + urlpath, method="GET",
                          followRedirect=followRedirect)
def HEAD(self, urlpath):
    """Issue a HEAD for urlpath and return a Deferred for the headers.

    twisted.web.client does not hand back response headers through
    getPage(), so this drives HTTPClientHEADFactory directly; that factory
    adds a callback which replaces the result with its response_headers.
    """
    # Build the absolute URL exactly as GET/PUT/DELETE/POST do, so the
    # factory knows the host it is talking to instead of seeing a bare path.
    url = self.webish_url + urlpath
    factory = HTTPClientHEADFactory(url, method="HEAD")
    reactor.connectTCP("localhost", self.webish_port, factory)
    return factory.deferred
def PUT(self, urlpath, data):
    """Issue a PUT of data to urlpath; returns the Deferred from getPage()."""
    return client.getPage(self.webish_url + urlpath,
                          method="PUT", postdata=data)
def DELETE(self, urlpath):
    """Issue a DELETE for urlpath; returns the Deferred from getPage()."""
    return client.getPage(self.webish_url + urlpath, method="DELETE")
def POST(self, urlpath, followRedirect=False, **fields):
    """POST fields to urlpath as a multipart/form-data body.

    A field value given as a (filename, contents) tuple is sent with a
    filename attribute. Unicode values are UTF-8 encoded and all other
    values are passed through str(). Returns the Deferred from getPage().
    """
    boundary = "boogabooga"
    delimiter = "--" + boundary
    # the form always starts with a _charset part announcing UTF-8
    parts = [delimiter,
             'Content-Disposition: form-data; name="_charset"',
             '',
             'UTF-8',
             delimiter]
    for name, value in fields.iteritems():
        if isinstance(value, tuple):
            filename, value = value
            disposition = ('Content-Disposition: form-data; name="%s"; '
                           'filename="%s"' % (name, filename.encode("utf-8")))
        else:
            disposition = 'Content-Disposition: form-data; name="%s"' % name
        if isinstance(value, unicode):
            value = value.encode("utf-8")
        else:
            value = str(value)
        assert isinstance(value, str)
        parts.extend([disposition, '', value, delimiter])
    # the final delimiter gets the closing "--"
    parts[-1] += "--"
    body = "\r\n".join(parts) + "\r\n"
    headers = {"content-type": "multipart/form-data; boundary=%s" % boundary}
    return client.getPage(self.webish_url + urlpath, method="POST",
                          postdata=body, headers=headers,
                          followRedirect=followRedirect)
def shouldFail(self, res, expected_failure, which,
               substring=None, response_substring=None):
    """Check that res is a Failure of type expected_failure.

    Intended for addBoth(). Optionally requires substring in str(res) and
    response_substring in res.value.response. A non-Failure res fails the
    test, naming the operation with `which`.
    """
    if not isinstance(res, failure.Failure):
        self.fail("%s was supposed to raise %s, not get '%s'" %
                  (which, expected_failure, res))
    else:
        res.trap(expected_failure)
        if substring:
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        if response_substring:
            response = res.value.response
            self.failUnless(response_substring in response,
                            "response substring '%s' not in '%s'"
                            % (response_substring, response))
def shouldFail2(self, expected_failure, which, substring,
                response_substring,
                callable, *args, **kwargs):
    """Call callable(*args, **kwargs) and require it to fail.

    The failure must be of type expected_failure. When given, substring
    must appear in str(failure) and response_substring in the failure
    value's response. Returns the Deferred carrying the check.
    """
    assert substring is None or isinstance(substring, str)
    assert response_substring is None or isinstance(response_substring, str)

    def _verify(res):
        if not isinstance(res, failure.Failure):
            self.fail("%s was supposed to raise %s, not get '%s'" %
                      (which, expected_failure, res))
        else:
            res.trap(expected_failure)
            if substring:
                self.failUnless(substring in str(res),
                                "%s: substring '%s' not in '%s'"
                                % (which, substring, str(res)))
            if response_substring:
                response = res.value.response
                self.failUnless(response_substring in response,
                                "%s: response substring '%s' not in '%s'"
                                % (which,
                                   response_substring, response))

    d = defer.maybeDeferred(callable, *args, **kwargs)
    d.addBoth(_verify)
    return d
def should404(self, res, which):
    """Check that res is an error.Error Failure whose status is "404"."""
    if not isinstance(res, failure.Failure):
        self.fail("%s was supposed to Error(404), not get '%s'" %
                  (which, res))
    else:
        res.trap(error.Error)
        status_code = res.value.status
        self.failUnlessEqual(status_code, "404")
def shouldHTTPError(self, res, which, code=None, substring=None,
                    response_substring=None):
    """Check that res is an error.Error Failure, optionally matching details.

    code is compared (as a string) with the failure's status, substring is
    looked for in str(res), and response_substring in the response body.
    """
    if not isinstance(res, failure.Failure):
        self.fail("%s was supposed to Error(%s), not get '%s'" %
                  (which, code, res))
    else:
        res.trap(error.Error)
        if code is not None:
            self.failUnlessEqual(res.value.status, str(code))
        if substring:
            self.failUnless(substring in str(res),
                            "substring '%s' not in '%s'"
                            % (substring, str(res)))
        if response_substring:
            response = res.value.response
            self.failUnless(response_substring in response,
                            "response substring '%s' not in '%s'"
                            % (response_substring, response))
def shouldHTTPError2(self, which,
                     code=None, substring=None, response_substring=None,
                     callable=None, *args, **kwargs):
    """Call callable(*args, **kwargs) and pass its outcome to shouldHTTPError.

    Returns the Deferred carrying the check.
    """
    assert substring is None or isinstance(substring, str)
    assert callable
    outcome = defer.maybeDeferred(callable, *args, **kwargs)
    outcome.addBoth(self.shouldHTTPError, which,
                    code, substring, response_substring)
    return outcome
class Web(WebMixin, testutil.StallMixin, unittest.TestCase):
def test_create(self):
    """Do nothing: passing only requires setUp and tearDown to succeed."""
    return None
def test_welcome(self):
    """Fetch the welcome page, then fetch it again with a real basedir."""
    def _check_then_refetch(res):
        for marker in ('Welcome To AllMyData', 'Tahoe'):
            self.failUnless(marker in res)
        # point the client at a basedir that has a private/ subdirectory
        self.s.basedir = 'web/test_welcome'
        for path in ("web/test_welcome", "web/test_welcome/private"):
            fileutil.make_dirs(path)
        return self.GET("/")

    d = self.GET("/")
    d.addCallback(_check_then_refetch)
    return d
def test_provisioning_math(self):
    """Spot-check provisioning.binomial(10, k) for several k."""
    expected_by_k = [(0, 1), (1, 10), (2, 45), (9, 10), (10, 1)]
    for k, expected in expected_by_k:
        self.failUnlessEqual(provisioning.binomial(10, k), expected)
def test_provisioning(self):
    """Exercise the provisioning page with a GET and three POSTed forms."""
    title = 'Tahoe Provisioning Tool'
    modest_form = {'filled': True,
                   "num_users": int(50e3),
                   "files_per_user": 1000,
                   "space_per_user": int(1e9),
                   "sharing_ratio": 1.0,
                   "encoding_parameters": "3-of-10-5",
                   "num_servers": 30,
                   "ownership_mode": "A",
                   "download_rate": 100,
                   "upload_rate": 10,
                   "delete_rate": 10,
                   "lease_timer": 7,
                   }
    huge_form = {'filled': True,
                 "num_users": int(50e6),
                 "files_per_user": 1000,
                 "space_per_user": int(5e9),
                 "sharing_ratio": 1.0,
                 "encoding_parameters": "25-of-100-50",
                 "num_servers": 30000,
                 "ownership_mode": "E",
                 "drive_failure_model": "U",
                 "drive_size": 1000,
                 "download_rate": 1000,
                 "upload_rate": 100,
                 "delete_rate": 100,
                 "lease_timer": 7,
                 }
    minimal_form = {'filled': True}

    def _got_blank_page(res):
        self.failUnless(title in res)
        return self.POST("/provisioning/", **modest_form)

    def _got_modest_result(res):
        self.failUnless(title in res)
        self.failUnless("Share space consumed: 167.01TB" in res)
        return self.POST("/provisioning/", **huge_form)

    def _got_huge_result(res):
        self.failUnless("Share space consumed: huge!" in res)
        return self.POST("/provisioning/", **minimal_form)

    def _got_minimal_result(res):
        self.failUnless("Share space consumed:" in res)

    d = self.GET("/provisioning/")
    for step in (_got_blank_page, _got_modest_result,
                 _got_huge_result, _got_minimal_result):
        d.addCallback(step)
    return d
def test_status(self):
    """Check the status overview, its JSON form, and each detail page."""
    node = self.s
    # (marker format on the overview, detail URL format, counter, page title)
    operations = [
        ('"down-%d"', "/status/down-%d",
         node.list_all_download_statuses()[0].get_counter(),
         "File Download Status"),
        ('"up-%d"', "/status/up-%d",
         node.list_all_upload_statuses()[0].get_counter(),
         "File Upload Status"),
        ('"mapupdate-%d"', "/status/mapupdate-%d",
         node.list_all_mapupdate_statuses()[0].get_counter(),
         "Mutable File Servermap Update Status"),
        ('"publish-%d"', "/status/publish-%d",
         node.list_all_publish_statuses()[0].get_counter(),
         "Mutable File Publish Status"),
        ('"retrieve-%d"', "/status/retrieve-%d",
         node.list_all_retrieve_statuses()[0].get_counter(),
         "Mutable File Retrieve Status"),
    ]

    def _check_overview(res):
        self.failUnless('Upload and Download Status' in res, res)
        for marker_fmt, _url_fmt, counter, _title in operations:
            self.failUnless(marker_fmt % counter in res, res)

    def _check_json(res):
        data = simplejson.loads(res)
        self.failUnless(isinstance(data, dict))
        # the lookup itself is the check: a missing key raises KeyError
        data["active"]
        # TODO: test more. We need a way to fake an active operation
        # here.

    def _fetch(url):
        def _do_fetch(_ignored):
            return self.GET(url)
        return _do_fetch

    def _expect(title):
        def _check_page(res):
            self.failUnless(title in res, res)
        return _check_page

    d = self.GET("/status", followRedirect=True)
    d.addCallback(_check_overview)
    d.addCallback(_fetch("/status/?t=json"))
    d.addCallback(_check_json)
    for _marker_fmt, url_fmt, counter, title in operations:
        d.addCallback(_fetch(url_fmt % counter))
        d.addCallback(_expect(title))
    return d
def test_status_numbers(self):
    """Check render_time/render_rate formatting on both renderer mixins."""
    time_cases = [(None, ""),
                  (2.5, "2.50s"),
                  (0.25, "250ms"),
                  (0.0021, "2.1ms"),
                  (0.000123, "123us")]
    rate_cases = [(None, ""),
                  (2500000, "2.50MBps"),
                  (30100, "30.1kBps"),
                  (123, "123Bps")]
    # download renderer first, then upload, each with the same expectations
    for mixin_class in (status.DownloadResultsRendererMixin,
                        status.UploadResultsRendererMixin):
        renderer = mixin_class()
        for value, expected in time_cases:
            self.failUnlessEqual(renderer.render_time(None, value), expected)
        for value, expected in rate_cases:
            self.failUnlessEqual(renderer.render_rate(None, value), expected)
def test_GET_FILEURL(self):
    """GET of foo/bar.txt by path must return bar.txt's contents."""
    url = self.public_url + "/foo/bar.txt"
    fetched = self.GET(url)
    fetched.addCallback(self.failUnlessIsBarDotTxt)
    return fetched
def test_HEAD_FILEURL(self):
    """HEAD of foo/bar.txt must report its length and a text/plain type."""
    def _check_headers(headers):
        expected_length = str(len(self.BAR_CONTENTS))
        self.failUnlessEqual(headers["content-length"][0], expected_length)
        self.failUnlessEqual(headers["content-type"], ["text/plain"])

    d = self.HEAD(self.public_url + "/foo/bar.txt")
    d.addCallback(_check_headers)
    return d
def test_GET_FILEURL_named(self):
    """Every /file and /named URL form for bar.txt must return its contents."""
    quoted_cap = urllib.quote(self._bar_txt_uri)
    base = "/file/%s" % quoted_cap
    base2 = "/named/%s" % quoted_cap
    # "n", e-acute, "wer.txt", UTF-8 encoded and then URL-quoted
    u_fn_e = urllib.quote(u"n\u00e9wer.txt".encode("utf-8"))
    urls = [
        base + "/@@name=/blah.txt",
        base + "/blah.txt",
        base + "/ignore/lots/blah.txt",
        base2 + "/@@name=/blah.txt",
        base + "?save=true&filename=blah.txt",    # TODO: check headers
        base + "?save=true&filename=" + u_fn_e,   # TODO: check headers
    ]

    def _fetch(url):
        def _do_fetch(_ignored):
            return self.GET(url)
        return _do_fetch

    d = self.GET(urls[0])
    d.addCallback(self.failUnlessIsBarDotTxt)
    for url in urls[1:]:
        d.addCallback(_fetch(url))
        d.addCallback(self.failUnlessIsBarDotTxt)
    return d
def test_PUT_FILEURL_named_bad(self):
    """PUT to a /file URL must be rejected with 400 Bad Request."""
    target = "/file/%s" % urllib.quote(self._bar_txt_uri) + "/@@name=/blah.txt"
    return self.shouldFail2(error.Error, "test_PUT_FILEURL_named_bad",
                            "400 Bad Request",
                            "/file can only be used with GET or HEAD",
                            self.PUT, target, "")
def test_GET_DIRURL_named_bad(self):
    """GET of /file/<dir-cap>/... must be rejected with 400 Bad Request.

    /file is given the directory cap of foo, which is not a file-cap.
    """
    base = "/file/%s" % urllib.quote(self._foo_uri)
    # the label names this test, so failure messages point at the right place
    d = self.shouldFail2(error.Error, "test_GET_DIRURL_named_bad",
                         "400 Bad Request",
                         "is not a file-cap",
                         self.GET, base + "/@@name=/blah.txt")
    return d
def test_GET_slash_file_bad(self):
    """GET of bare /file must fail with 404 Not Found."""
    expected_body = "/file must be followed by a file-cap and a name"
    return self.shouldFail2(error.Error, "test_GET_slash_file_bad",
                            "404 Not Found",
                            expected_body,
                            self.GET, "/file")
def test_GET_unhandled_URI_named(self):
    """GET of /file/<verify-cap> must be rejected with 400 Bad Request."""
    _contents, node, _newuri = self.makefile(12)
    # client.create_node_from_uri() can't handle verify-caps
    url = "/file/%s" % urllib.quote(node.get_verifier().to_string())
    return self.shouldFail2(error.Error, "GET_unhandled_URI_named",
                            "400 Bad Request",
                            "is not a valid file- or directory- cap",
                            self.GET, url)
def test_GET_unhandled_URI(self):
    """GET of /uri/<verify-cap> must be rejected with 400 Bad Request."""
    _contents, node, _newuri = self.makefile(12)
    # client.create_node_from_uri() can't handle verify-caps
    url = "/uri/%s" % urllib.quote(node.get_verifier().to_string())
    return self.shouldFail2(error.Error, "test_GET_unhandled_URI",
                            "400 Bad Request",
                            "is not a valid file- or directory- cap",
                            self.GET, url)
def test_GET_FILE_URI(self):
    """GET of /uri/<file-cap> must return bar.txt's contents."""
    fetched = self.GET("/uri/%s" % urllib.quote(self._bar_txt_uri))
    fetched.addCallback(self.failUnlessIsBarDotTxt)
    return fetched
def test_GET_FILE_URI_badchild(self):
    """GET of a child beneath a file-cap must fail with 400 Bad Request."""
    url = "/uri/%s/boguschild" % urllib.quote(self._bar_txt_uri)
    expected_body = "Files have no children, certainly not named 'boguschild'"
    return self.shouldFail2(error.Error, "test_GET_FILE_URI_badchild",
                            "400 Bad Request", expected_body,
                            self.GET, url)
def test_PUT_FILE_URI_badchild(self):
    """PUT to a child beneath a file-cap must fail with 400 Bad Request."""
    base = "/uri/%s/boguschild" % urllib.quote(self._bar_txt_uri)
    errmsg = "Cannot create directory 'boguschild', because its parent is a file, not a directory"
    # the label names this test, not its GET sibling, so failure messages
    # are attributed correctly
    d = self.shouldFail2(error.Error, "test_PUT_FILE_URI_badchild",
                         "400 Bad Request", errmsg,
                         self.PUT, base, "")
    return d
def test_GET_FILEURL_save(self):
    """GET with save=true and a filename must still return bar.txt's contents."""
    url = self.public_url + "/foo/bar.txt?filename=bar.txt&save=true"
    fetched = self.GET(url)
    # TODO: look at the headers, expect a Content-Disposition: attachment
    # header.
    fetched.addCallback(self.failUnlessIsBarDotTxt)
    return fetched
def test_GET_FILEURL_missing(self):
    """GET of a nonexistent child of foo must yield a 404."""
    attempt = self.GET(self.public_url + "/foo/missing")
    attempt.addBoth(self.should404, "test_GET_FILEURL_missing")
    return attempt
def test_PUT_NEWFILEURL(self):
    """PUT to a new name in foo must create a child with the sent contents."""
    def _check_contents(_ignored):
        return self.failUnlessChildContentsAre(self._foo_node, u"new.txt",
                                               self.NEWFILE_CONTENTS)

    d = self.PUT(self.public_url + "/foo/new.txt", self.NEWFILE_CONTENTS)
    # TODO: we lose the response code, so we can't check this
    #self.failUnlessEqual(responsecode, 201)
    d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, u"new.txt")
    d.addCallback(_check_contents)
    return d
def test_PUT_NEWFILEURL_mutable(self):
    """PUT with mutable=true must create a writeable mutable child."""
    def _check_cap(res):
        cap = uri.from_string_mutable_filenode(res)
        self.failUnless(cap.is_mutable())
        self.failIf(cap.is_readonly())
        # hand the URI string on to the next check
        return res

    def _check_contents(_ignored):
        return self.failUnlessMutableChildContentsAre(self._foo_node,
                                                      u"new.txt",
                                                      self.NEWFILE_CONTENTS)

    d = self.PUT(self.public_url + "/foo/new.txt?mutable=true",
                 self.NEWFILE_CONTENTS)
    # TODO: we lose the response code, so we can't check this
    #self.failUnlessEqual(responsecode, 201)
    d.addCallback(_check_cap)
    d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, u"new.txt")
    d.addCallback(_check_contents)
    return d
def test_PUT_NEWFILEURL_mutable_toobig(self):
    """A mutable PUT one byte over MUTABLE_SIZELIMIT must fail with 413."""
    oversized = "b" * (self.s.MUTABLE_SIZELIMIT+1)
    url = self.public_url + "/foo/new.txt?mutable=true"
    return self.shouldFail2(error.Error, "test_PUT_NEWFILEURL_mutable_toobig",
                            "413 Request Entity Too Large",
                            "SDMF is limited to one segment, and 10001 > 10000",
                            self.PUT, url, oversized)
def test_PUT_NEWFILEURL_replace(self):
    """PUT to the existing foo/bar.txt must replace its contents."""
    def _check_contents(_ignored):
        return self.failUnlessChildContentsAre(self._foo_node, u"bar.txt",
                                               self.NEWFILE_CONTENTS)

    d = self.PUT(self.public_url + "/foo/bar.txt", self.NEWFILE_CONTENTS)
    # TODO: we lose the response code, so we can't check this
    #self.failUnlessEqual(responsecode, 200)
    d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, u"bar.txt")
    d.addCallback(_check_contents)
    return d
def test_PUT_NEWFILEURL_bad_t(self):
    """PUT with an unknown t= argument must fail with 400 Bad Request."""
    url = self.public_url + "/foo/bar.txt?t=bogus"
    return self.shouldFail2(error.Error, "PUT_bad_t", "400 Bad Request",
                            "PUT to a file: bad t=bogus",
                            self.PUT, url, "contents")
def test_PUT_NEWFILEURL_no_replace(self):
    """PUT with replace=false onto an existing child must fail with 409."""
    expected_body = ("There was already a child by that name, and you asked me "
                     "to not replace it")
    attempt = self.PUT(self.public_url + "/foo/bar.txt?replace=false",
                       self.NEWFILE_CONTENTS)
    attempt.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_no_replace",
                    "409 Conflict", expected_body)
    return attempt
def test_PUT_NEWFILEURL_mkdirs(self):
    """PUT beneath a missing directory must create that directory too."""
    foo = self._foo_node

    def _no_file_at_top(_ignored):
        return self.failIfNodeHasChild(foo, u"new.txt")

    def _newdir_exists(_ignored):
        return self.failUnlessNodeHasChild(foo, u"newdir")

    def _contents_match(_ignored):
        return self.failUnlessChildContentsAre(foo, u"newdir/new.txt",
                                               self.NEWFILE_CONTENTS)

    d = self.PUT(self.public_url + "/foo/newdir/new.txt", self.NEWFILE_CONTENTS)
    d.addCallback(self.failUnlessURIMatchesChild, foo, u"newdir/new.txt")
    for check in (_no_file_at_top, _newdir_exists, _contents_match):
        d.addCallback(check)
    return d
def test_PUT_NEWFILEURL_blocked(self):
    """PUT beneath a path component that is a file must fail with 409."""
    expected_body = "Unable to create directory 'blockingfile': a file was in the way"
    attempt = self.PUT(self.public_url + "/foo/blockingfile/new.txt",
                       self.NEWFILE_CONTENTS)
    attempt.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_blocked",
                    "409 Conflict", expected_body)
    return attempt
def test_DELETE_FILEURL(self):
    """DELETE of foo/bar.txt must remove that child from foo."""
    def _check_gone(_ignored):
        return self.failIfNodeHasChild(self._foo_node, u"bar.txt")

    d = self.DELETE(self.public_url + "/foo/bar.txt")
    d.addCallback(_check_gone)
    return d
def test_DELETE_FILEURL_missing(self):
    """DELETE of a nonexistent child of foo must yield a 404."""
    attempt = self.DELETE(self.public_url + "/foo/missing")
    attempt.addBoth(self.should404, "test_DELETE_FILEURL_missing")
    return attempt
def test_DELETE_FILEURL_missing2(self):
    """DELETE beneath a nonexistent directory must yield a 404."""
    attempt = self.DELETE(self.public_url + "/missing/missing")
    attempt.addBoth(self.should404, "test_DELETE_FILEURL_missing2")
    return attempt
def test_GET_FILEURL_json(self):
    """GET of foo/bar.txt?t=json must return bar.txt's JSON description."""
    # twisted.web.http.parse_qs drops query args that lack an '=', so
    # "GET /path?json" cannot be used; "GET /path?t=json" is needed
    # instead. This may make it tricky to emulate the S3 interface
    # completely.
    fetched = self.GET(self.public_url + "/foo/bar.txt?t=json")
    fetched.addCallback(self.failUnlessIsBarJSON)
    return fetched
def test_GET_FILEURL_json_missing(self):
    """GET of a nonexistent child with a ?json query must yield a 404."""
    attempt = self.GET(self.public_url + "/foo/missing?json")
    attempt.addBoth(self.should404, "test_GET_FILEURL_json_missing")
    return attempt
def test_GET_FILEURL_uri(self):
    """t=uri and t=readonly-uri on bar.txt must both return its URI."""
    bar_url = self.public_url + "/foo/bar.txt"

    def _check_uri(res):
        self.failUnlessEqual(res, self._bar_txt_uri)

    def _fetch_readonly(_ignored):
        return self.GET(bar_url + "?t=readonly-uri")

    def _check_readonly_uri(res):
        # for now, for files, uris and readonly-uris are the same
        self.failUnlessEqual(res, self._bar_txt_uri)

    d = self.GET(bar_url + "?t=uri")
    d.addCallback(_check_uri)
    d.addCallback(_fetch_readonly)
    d.addCallback(_check_readonly_uri)
    return d
def test_GET_FILEURL_badtype(self):
    """GET with an unknown t= argument must fail with HTTP 400."""
    url = self.public_url + "/foo/bar.txt?t=bogus"
    return self.shouldHTTPError2("GET t=bogus", 400, "Bad Request",
                                 "bad t=bogus",
                                 self.GET, url)
def test_GET_FILEURL_uri_missing(self):
    """GET of a nonexistent child with t=uri must yield a 404."""
    attempt = self.GET(self.public_url + "/foo/missing?t=uri")
    attempt.addBoth(self.should404, "test_GET_FILEURL_uri_missing")
    return attempt
def test_GET_DIRURL(self):
# the addSlash means we get a redirect here
# from /uri/$URI/foo/ , we need ../../../ to get back to the root
ROOT = "../../.."
d = self.GET(self.public_url + "/foo", followRedirect=True)
def _check(res):
self.failUnless(('Return to Welcome page' % ROOT)
in res, res)
# the FILE reference points to a URI, but it should end in bar.txt
bar_url = ("%s/file/%s/@@named=/bar.txt" %
(ROOT, urllib.quote(self._bar_txt_uri)))
get_bar = "".join([r'
' % len(self.BAR_CONTENTS),
])
self.failUnless(re.search(get_bar, res), res)
for line in res.split("\n"):
# find the line that contains the delete button for bar.txt
if ("form action" in line and
'value="delete"' in line and
'value="bar.txt"' in line):
# the form target should use a relative URL
foo_url = urllib.quote("%s/uri/%s/" % (ROOT, self._foo_uri))
self.failUnless(('action="%s"' % foo_url) in line, line)
# and the when_done= should too
#done_url = urllib.quote(???)
#self.failUnless(('name="when_done" value="%s"' % done_url)
# in line, line)
break
else:
self.fail("unable to find delete-bar.txt line", res)
# the DIR reference just points to a URI
sub_url = ("%s/uri/%s/" % (ROOT, urllib.quote(self._sub_uri)))
get_sub = ((r'
')
self.failUnless(re.search(get_sub, res), res)
d.addCallback(_check)
# look at a directory which is readonly
d.addCallback(lambda res:
self.GET(self.public_url + "/reedownlee", followRedirect=True))
def _check2(res):
self.failUnless("(readonly)" in res, res)
self.failIf("Upload a file" in res, res)
d.addCallback(_check2)
# and at a directory that contains a readonly directory
d.addCallback(lambda res:
self.GET(self.public_url, followRedirect=True))
def _check3(res):
self.failUnless(re.search(r'