import re, os.path, urllib from zope.interface import implements from twisted.application import service from twisted.trial import unittest from twisted.internet import defer from twisted.web import client, error from twisted.python import failure, log from allmydata import webish, interfaces, dirnode, uri from allmydata.encode import NotEnoughPeersError from allmydata.util import fileutil import itertools # create a fake uploader/downloader, and a couple of fake dirnodes, then # create a webserver that works against them class MyClient(service.MultiService): nodeid = "fake_nodeid" def get_versions(self): return {'allmydata': "fake", 'foolscap': "fake", 'twisted': "fake", 'zfec': "fake", } introducer_furl = "None" def connected_to_introducer(self): return False def get_all_peerids(self): return [] class MyDownloader(service.Service): implements(interfaces.IDownloader) name = "downloader" def __init__(self, files): self.files = files def download(self, uri, target): print "DOWNLOADING", uri if uri not in self.files: e = NotEnoughPeersError() f = failure.Failure(e) target.fail(f) return defer.fail(f) data = self.files[uri] target.open(len(data)) target.write(data) target.close() return defer.maybeDeferred(target.finish) uri_counter = itertools.count() class MyUploader(service.Service): implements(interfaces.IUploader) name = "uploader" def __init__(self, files): self.files = files def upload(self, uploadable): f = uploadable.get_filehandle() data = f.read() uri = str(uri_counter.next()) self.files[uri] = data uploadable.close_filehandle(f) return defer.succeed(uri) class MyDirectoryNode(dirnode.MutableDirectoryNode): def __init__(self, nodes, files, client, uri=None): self._my_nodes = nodes self._my_files = files self._my_client = client if uri is None: uri = str(uri_counter.next()) self._uri = str(uri) self._my_nodes[self._uri] = self self.children = {} self._mutable = True def get_immutable_uri(self): return self.get_uri() + "RO" def get(self, name): def _try(): uri = self.children[name] if uri not in self._my_nodes: raise IndexError("this isn't supposed to happen") return self._my_nodes[uri] return defer.maybeDeferred(_try) def set_uri(self, name, child_uri): self.children[name] = child_uri return defer.succeed(None) def add_file(self, name, uploadable): f = uploadable.get_filehandle() data = f.read() uri = str(uri_counter.next()) self._my_files[uri] = data self._my_nodes[uri] = MyFileNode(uri, self._my_client) uploadable.close_filehandle(f) self.children[name] = uri return defer.succeed(self._my_nodes[uri]) def delete(self, name): def _try(): del self.children[name] return defer.maybeDeferred(_try) def create_empty_directory(self, name): node = MyDirectoryNode(self._my_nodes, self._my_files, self._my_client) self.children[name] = node.get_uri() return defer.succeed(node) def list(self): kids = dict([(name, self._my_nodes[uri]) for name,uri in self.children.iteritems()]) return defer.succeed(kids) class MyFileNode(dirnode.FileNode): pass class MyVirtualDrive(service.Service): name = "vdrive" public_root = None private_root = None def __init__(self, nodes): self._my_nodes = nodes def have_public_root(self): return bool(self.public_root) def have_private_root(self): return bool(self.private_root) def get_public_root(self): return defer.succeed(self.public_root) def get_private_root(self): return defer.succeed(self.private_root) def get_node(self, uri): def _try(): return self._my_nodes[uri] return defer.maybeDeferred(_try) class Web(unittest.TestCase): def setUp(self): self.s = MyClient() self.s.startService() s = webish.WebishServer("0") s.setServiceParent(self.s) port = s.listener._port.getHost().port self.webish_url = "http://localhost:%d" % port self.nodes = {} # maps URI to node self.files = {} # maps file URI to contents v = MyVirtualDrive(self.nodes) v.setServiceParent(self.s) dl = MyDownloader(self.files) dl.setServiceParent(self.s) ul = MyUploader(self.files) ul.setServiceParent(self.s) v.public_root = self.makedir() self.public_root = v.public_root v.private_root = self.makedir() foo = self.makedir() self._foo_node = foo self._foo_uri = foo.get_uri() self._foo_readonly_uri = foo.get_immutable_uri() v.public_root.children["foo"] = foo.get_uri() self._bar_txt_uri = self.makefile(0) self.BAR_CONTENTS = self.files[self._bar_txt_uri] foo.children["bar.txt"] = self._bar_txt_uri foo.children["empty"] = self.makedir().get_uri() sub_uri = foo.children["sub"] = self.makedir().get_uri() sub = self.nodes[sub_uri] blocking_uri = self.makefile(1) foo.children["blockingfile"] = blocking_uri baz_file = self.makefile(2) sub.children["baz.txt"] = baz_file # public/ # public/foo/ # public/foo/bar.txt # public/foo/blockingfile # public/foo/empty/ # public/foo/sub/ # public/foo/sub/baz.txt self.NEWFILE_CONTENTS = "newfile contents\n" def makefile(self, number): n = str(number) assert len(n) == 1 newuri = uri.pack_uri("SI" + n*30, "K" + n*15, "EH" + n*30, 25, 100, 123+number) assert newuri not in self.nodes assert newuri not in self.files node = MyFileNode(newuri, self.s) self.nodes[newuri] = node contents = "contents of file %s\n" % n self.files[newuri] = contents return newuri def makedir(self): node = MyDirectoryNode(self.nodes, self.files, self.s) return node def tearDown(self): return self.s.stopService() def failUnlessIsBarDotTxt(self, res): self.failUnlessEqual(res, self.BAR_CONTENTS) def failUnlessIsFooJSON(self, res): self.failUnless("JSONny stuff here" in res) self.failUnless("name=bar.txt, child_uri=%s" % self._bar_txt_uri in res) self.failUnless("name=blockingfile" in res) def GET(self, urlpath, followRedirect=False): url = self.webish_url + urlpath return client.getPage(url, method="GET", followRedirect=followRedirect) def PUT(self, urlpath, data): url = self.webish_url + urlpath return client.getPage(url, method="PUT", postdata=data) def DELETE(self, urlpath): url = self.webish_url + urlpath return client.getPage(url, method="DELETE") def POST(self, urlpath, **fields): url = self.webish_url + urlpath sepbase = "boogabooga" sep = "--" + sepbase form = [] form.append(sep) form.append('Content-Disposition: form-data; name="_charset"') form.append('') form.append('UTF-8') form.append(sep) for name, value in fields.iteritems(): if isinstance(value, tuple): filename, value = value form.append('Content-Disposition: form-data; name="%s"; ' 'filename="%s"' % (name, filename)) else: form.append('Content-Disposition: form-data; name="%s"' % name) form.append('') form.append(value) form.append(sep) form[-1] += "--" body = "\r\n".join(form) + "\r\n" headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase, } print "BODY", body return client.getPage(url, method="POST", postdata=body, headers=headers, followRedirect=False) def shouldFail(self, res, expected_failure, which, substring=None): print "SHOULDFAIL", res if isinstance(res, failure.Failure): res.trap(expected_failure) if substring: self.failUnless(substring in str(res), "substring '%s' not in '%s'" % (substring, str(res))) else: self.fail("%s was supposed to raise %s, not get '%s'" % (which, expected_failure, res)) def should404(self, res, which): if isinstance(res, failure.Failure): res.trap(error.Error) self.failUnlessEqual(res.value.status, "404") else: self.fail("%s was supposed to Error(404), not get '%s'" % (which, res)) def test_create(self): # YES pass def test_welcome(self): # YES d = self.GET("") return d def test_GET_FILEURL(self): # YES d = self.GET("/vdrive/global/foo/bar.txt") d.addCallback(self.failUnlessIsBarDotTxt) return d def test_GET_FILEURL_missing(self): # YES d = self.GET("/vdrive/global/foo/missing") def _oops(f): print f print dir(f) print f.value print dir(f.value) print f.value.args print f.value.response print f.value.status return f #d.addBoth(_oops) d.addBoth(self.should404, "test_GET_FILEURL_missing") return d def test_PUT_NEWFILEURL(self): # YES d = self.PUT("/vdrive/global/foo/new.txt", self.NEWFILE_CONTENTS) def _check(res): self.failUnless("new.txt" in self._foo_node.children) new_uri = self._foo_node.children["new.txt"] new_contents = self.files[new_uri] self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS) self.failUnlessEqual(res.strip(), new_uri) d.addCallback(_check) return d def test_PUT_NEWFILEURL_mkdirs(self): # YES d = self.PUT("/vdrive/global/foo/newdir/new.txt", self.NEWFILE_CONTENTS) def _check(res): self.failIf("new.txt" in self._foo_node.children) self.failUnless("newdir" in self._foo_node.children) newdir_uri = self._foo_node.children["newdir"] newdir_node = self.nodes[newdir_uri] self.failUnless("new.txt" in newdir_node.children) new_uri = newdir_node.children["new.txt"] new_contents = self.files[new_uri] self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS) self.failUnlessEqual(res.strip(), new_uri) d.addCallback(_check) return d def test_PUT_NEWFILEURL_blocked(self): # YES d = self.PUT("/vdrive/global/foo/blockingfile/new.txt", self.NEWFILE_CONTENTS) d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_blocked", "403 Forbidden") return d def test_DELETE_FILEURL(self): # YES d = self.DELETE("/vdrive/global/foo/bar.txt") def _check(res): self.failIf("bar.txt" in self._foo_node.children) d.addCallback(_check) return d def test_DELETE_FILEURL_missing(self): # YES d = self.DELETE("/vdrive/global/foo/missing") d.addBoth(self.should404, "test_DELETE_FILEURL_missing") return d def test_DELETE_FILEURL_missing2(self): # YES d = self.DELETE("/vdrive/global/missing/missing") d.addBoth(self.should404, "test_DELETE_FILEURL_missing2") return d def test_GET_FILEURL_json(self): # YES # twisted.web.http.parse_qs ignores any query args without an '=', so # I can't do "GET /path?json", I have to do "GET /path/t=json" # instead. This may make it tricky to emulate the S3 interface # completely. d = self.GET("/vdrive/global/foo/bar.txt?t=json") def _got(json): # TODO self.failUnless("JSON" in json, json) d.addCallback(_got) return d def test_GET_FILEURL_json_missing(self): # YES d = self.GET("/vdrive/global/foo/missing?json") d.addBoth(self.should404, "test_GET_FILEURL_json_missing") return d def test_GET_FILEURL_localfile(self): # YES localfile = os.path.abspath("web/GET_FILEURL_localfile") fileutil.make_dirs("web") d = self.GET("/vdrive/global/foo/bar.txt?localfile=%s" % localfile) def _done(res): self.failUnless(os.path.exists(localfile)) data = open(localfile, "rb").read() self.failUnlessEqual(data, self.BAR_CONTENTS) d.addCallback(_done) return d def test_GET_FILEURL_localfile_nonlocal(self): # YES # TODO: somehow pretend that we aren't local, and verify that the # server refuses to write to local files, probably by changing the # server's idea of what counts as "local". old_LOCALHOST = webish.LOCALHOST webish.LOCALHOST = "127.0.0.2" localfile = os.path.abspath("web/GET_FILEURL_localfile_nonlocal") fileutil.make_dirs("web") d = self.GET("/vdrive/global/foo/bar.txt?localfile=%s" % localfile) d.addBoth(self.shouldFail, error.Error, "localfile non-local", "403 Forbidden") def _check(res): self.failIf(os.path.exists(localfile)) d.addCallback(_check) def _reset(res): print "RESETTING", res webish.LOCALHOST = old_LOCALHOST return res d.addBoth(_reset) return d def test_GET_FILEURL_localfile_nonabsolute(self): localfile = "web/nonabsolute/path" fileutil.make_dirs("web/nonabsolute") d = self.GET("/vdrive/global/foo/bar.txt?localfile=%s" % localfile) d.addBoth(self.shouldFail, error.Error, "localfile non-absolute", "403 Forbidden") def _check(res): self.failIf(os.path.exists(localfile)) d.addCallback(_check) return d def test_PUT_NEWFILEURL_localfile(self): # YES localfile = os.path.abspath("web/PUT_NEWFILEURL_localfile") fileutil.make_dirs("web") f = open(localfile, "wb") f.write(self.NEWFILE_CONTENTS) f.close() d = self.PUT("/vdrive/global/foo/new.txt?localfile=%s" % localfile, "") def _check(res): self.failUnless("new.txt" in self._foo_node.children) new_uri = self._foo_node.children["new.txt"] new_contents = self.files[new_uri] self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS) self.failUnlessEqual(res.strip(), new_uri) d.addCallback(_check) return d def test_PUT_NEWFILEURL_localfile_mkdirs(self): # YES localfile = os.path.abspath("web/PUT_NEWFILEURL_localfile_mkdirs") fileutil.make_dirs("web") f = open(localfile, "wb") f.write(self.NEWFILE_CONTENTS) f.close() d = self.PUT("/vdrive/global/foo/newdir/new.txt?localfile=%s" % localfile, "") def _check(res): self.failIf("new.txt" in self._foo_node.children) self.failUnless("newdir" in self._foo_node.children) newdir_uri = self._foo_node.children["newdir"] newdir_node = self.nodes[newdir_uri] self.failUnless("new.txt" in newdir_node.children) new_uri = newdir_node.children["new.txt"] new_contents = self.files[new_uri] self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS) self.failUnlessEqual(res.strip(), new_uri) d.addCallback(_check) return d def test_GET_FILEURL_uri(self): # YES d = self.GET("/vdrive/global/foo/bar.txt?t=uri") def _check(res): self.failUnlessEqual(res, self._bar_txt_uri) d.addCallback(_check) return d def test_GET_FILEURL_uri_missing(self): # YES d = self.GET("/vdrive/global/foo/missing?t=uri") d.addBoth(self.should404, "test_GET_FILEURL_uri_missing") return d def test_GET_DIRURL(self): # YES # the addSlash means we get a redirect here d = self.GET("/vdrive/global/foo", followRedirect=True) def _check(res): self.failUnless(re.search(r'