mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-20 11:38:52 +00:00
refactor web tests, and interfaces.IFileNode
This commit is contained in:
parent
4b8c2e93b3
commit
f190382d5e
@ -382,27 +382,6 @@ class IReadonlyNewDirectoryURI(Interface):
|
||||
|
||||
|
||||
class IFilesystemNode(Interface):
|
||||
def get_uri():
|
||||
"""Return the URI that can be used by others to get access to this
|
||||
file or directory.
|
||||
"""
|
||||
|
||||
def get_verifier():
|
||||
"""Return an IVerifierURI instance that represents the
|
||||
'verifiy/refresh capability' for this node. The holder of this
|
||||
capability will be able to renew the lease for this node, protecting
|
||||
it from garbage-collection. They will also be able to ask a server if
|
||||
it holds a share for the file or directory.
|
||||
"""
|
||||
|
||||
def check():
|
||||
"""Perform a file check. See IChecker.check for details."""
|
||||
|
||||
def is_mutable():
|
||||
"""Return True if this file or directory is mutable, False if it is read-only.
|
||||
"""
|
||||
|
||||
class IMutableFilesystemNode(IFilesystemNode):
|
||||
def get_uri():
|
||||
"""
|
||||
Return the URI that can be used by others to get access to this
|
||||
@ -423,6 +402,35 @@ class IMutableFilesystemNode(IFilesystemNode):
|
||||
get_readonly_uri() will return the same thing as get_uri().
|
||||
"""
|
||||
|
||||
def get_verifier():
|
||||
"""Return an IVerifierURI instance that represents the
|
||||
'verifiy/refresh capability' for this node. The holder of this
|
||||
capability will be able to renew the lease for this node, protecting
|
||||
it from garbage-collection. They will also be able to ask a server if
|
||||
it holds a share for the file or directory.
|
||||
"""
|
||||
|
||||
def check():
|
||||
"""Perform a file check. See IChecker.check for details."""
|
||||
|
||||
def is_readonly():
|
||||
"""Return True if this reference provides mutable access to the given
|
||||
file or directory (i.e. if you can modify it), or False if not. Note
|
||||
that even if this reference is read-only, someone else may hold a
|
||||
read-write reference to it."""
|
||||
|
||||
def is_mutable():
|
||||
"""Return True if this file or directory is mutable (by *somebody*,
|
||||
not necessarily you), False if it is is immutable. Note that a file
|
||||
might be mutable overall, but your reference to it might be
|
||||
read-only. On the other hand, all references to an immutable file
|
||||
will be read-only; there are no read-write references to an immutable
|
||||
file.
|
||||
"""
|
||||
|
||||
class IMutableFilesystemNode(IFilesystemNode):
|
||||
pass
|
||||
|
||||
class IFileNode(IFilesystemNode):
|
||||
def download(target):
|
||||
"""Download the file's contents to a given IDownloadTarget"""
|
||||
@ -488,7 +496,9 @@ class IDirectoryNode(IMutableFilesystemNode):
|
||||
|
||||
def list():
|
||||
"""I return a Deferred that fires with a dictionary mapping child
|
||||
name to an IFileNode or IDirectoryNode."""
|
||||
name to (node, metadata_dict) tuples, in which 'node' is either an
|
||||
IFileNode or IDirectoryNode, and 'metadata_dict' is a dictionary of
|
||||
metadata."""
|
||||
|
||||
def has_child(name):
|
||||
"""I return a Deferred that fires with a boolean, True if there
|
||||
|
113
src/allmydata/test/common.py
Normal file
113
src/allmydata/test/common.py
Normal file
@ -0,0 +1,113 @@
|
||||
|
||||
import os
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from twisted.python import failure
|
||||
from allmydata import uri, dirnode
|
||||
from allmydata.interfaces import IURI, IMutableFileNode, IFileNode
|
||||
from allmydata.encode import NotEnoughPeersError
|
||||
|
||||
class FakeCHKFileNode:
|
||||
"""I provide IFileNode, but all of my data is stored in a class-level
|
||||
dictionary."""
|
||||
implements(IFileNode)
|
||||
all_contents = {}
|
||||
|
||||
def __init__(self, u, client):
|
||||
self.client = client
|
||||
self.my_uri = u.to_string()
|
||||
|
||||
def get_uri(self):
|
||||
return self.my_uri
|
||||
def get_readonly_uri(self):
|
||||
return self.my_uri
|
||||
def get_verifier(self):
|
||||
return IURI(self.my_uri).get_verifier()
|
||||
def check(self):
|
||||
return defer.succeed(None)
|
||||
def is_mutable(self):
|
||||
return False
|
||||
def is_readonly(self):
|
||||
return True
|
||||
|
||||
def download(self, target):
|
||||
if self.my_uri not in self.all_contents:
|
||||
f = failure.Failure(NotEnoughPeersError())
|
||||
target.fail(f)
|
||||
return defer.fail(f)
|
||||
data = self.all_contents[self.my_uri]
|
||||
target.open(len(data))
|
||||
target.write(data)
|
||||
target.close()
|
||||
return defer.maybeDeferred(target.finish)
|
||||
def download_to_data(self):
|
||||
if self.my_uri not in self.all_contents:
|
||||
return defer.fail(NotEnoughPeersError())
|
||||
data = self.all_contents[self.my_uri]
|
||||
return defer.succeed(data)
|
||||
def get_size(self):
|
||||
data = self.all_contents[self.my_uri]
|
||||
return len(data)
|
||||
|
||||
def make_chk_file_uri(size):
|
||||
return uri.CHKFileURI(key=os.urandom(16),
|
||||
uri_extension_hash=os.urandom(32),
|
||||
needed_shares=3,
|
||||
total_shares=10,
|
||||
size=size)
|
||||
|
||||
def create_chk_filenode(client, contents):
|
||||
u = make_chk_file_uri(len(contents))
|
||||
n = FakeCHKFileNode(u, client)
|
||||
FakeCHKFileNode.all_contents[u.to_string()] = contents
|
||||
return n
|
||||
|
||||
|
||||
class FakeMutableFileNode:
|
||||
"""I provide IMutableFileNode, but all of my data is stored in a
|
||||
class-level dictionary."""
|
||||
|
||||
implements(IMutableFileNode)
|
||||
all_contents = {}
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
self.my_uri = make_mutable_file_uri()
|
||||
self.storage_index = self.my_uri.storage_index
|
||||
def create(self, initial_contents, wait_for_numpeers=None):
|
||||
self.all_contents[self.storage_index] = initial_contents
|
||||
return defer.succeed(self)
|
||||
def init_from_uri(self, myuri):
|
||||
self.my_uri = IURI(myuri)
|
||||
self.storage_index = self.my_uri.storage_index
|
||||
return self
|
||||
def get_uri(self):
|
||||
return self.my_uri
|
||||
def is_readonly(self):
|
||||
return self.my_uri.is_readonly()
|
||||
def is_mutable(self):
|
||||
return self.my_uri.is_mutable()
|
||||
def download_to_data(self):
|
||||
return defer.succeed(self.all_contents[self.storage_index])
|
||||
def get_writekey(self):
|
||||
return "\x00"*16
|
||||
|
||||
def replace(self, new_contents, wait_for_numpeers=None):
|
||||
assert not self.is_readonly()
|
||||
self.all_contents[self.storage_index] = new_contents
|
||||
return defer.succeed(None)
|
||||
|
||||
|
||||
def make_mutable_file_uri():
|
||||
return uri.WriteableSSKFileURI(writekey=os.urandom(16),
|
||||
fingerprint=os.urandom(32))
|
||||
def make_verifier_uri():
|
||||
return uri.SSKVerifierURI(storage_index=os.urandom(16),
|
||||
fingerprint=os.urandom(32))
|
||||
|
||||
class NonGridDirectoryNode(dirnode.NewDirectoryNode):
|
||||
"""This offers IDirectoryNode, but uses a FakeMutableFileNode for the
|
||||
backing store, so it doesn't go to the grid. The child data is still
|
||||
encrypted and serialized, so this isn't useful for tests that want to
|
||||
look inside the dirnodes and check their contents.
|
||||
"""
|
||||
filenode_class = FakeMutableFileNode
|
@ -1,62 +1,18 @@
|
||||
|
||||
import os
|
||||
from zope.interface import implements
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from allmydata import uri, dirnode, upload
|
||||
from allmydata.interfaces import IURI, IClient, IMutableFileNode, \
|
||||
INewDirectoryURI, IReadonlyNewDirectoryURI, IFileNode
|
||||
from allmydata.util import hashutil, testutil
|
||||
from allmydata.test.common import make_chk_file_uri, make_mutable_file_uri, \
|
||||
NonGridDirectoryNode, create_chk_filenode
|
||||
|
||||
# to test dirnode.py, we want to construct a tree of real DirectoryNodes that
|
||||
# contain pointers to fake files. We start with a fake MutableFileNode that
|
||||
# stores all of its data in a static table.
|
||||
|
||||
def make_chk_file_uri(size):
|
||||
return uri.CHKFileURI(key=os.urandom(16),
|
||||
uri_extension_hash=os.urandom(32),
|
||||
needed_shares=3,
|
||||
total_shares=10,
|
||||
size=size)
|
||||
|
||||
def make_mutable_file_uri():
|
||||
return uri.WriteableSSKFileURI(writekey=os.urandom(16),
|
||||
fingerprint=os.urandom(32))
|
||||
def make_verifier_uri():
|
||||
return uri.SSKVerifierURI(storage_index=os.urandom(16),
|
||||
fingerprint=os.urandom(32))
|
||||
|
||||
class FakeMutableFileNode:
|
||||
implements(IMutableFileNode)
|
||||
all_contents = {}
|
||||
def __init__(self, client):
|
||||
self.client = client
|
||||
self.my_uri = make_mutable_file_uri()
|
||||
self.storage_index = self.my_uri.storage_index
|
||||
def create(self, initial_contents, wait_for_numpeers=None):
|
||||
self.all_contents[self.storage_index] = initial_contents
|
||||
return defer.succeed(self)
|
||||
def init_from_uri(self, myuri):
|
||||
self.my_uri = IURI(myuri)
|
||||
self.storage_index = self.my_uri.storage_index
|
||||
return self
|
||||
def get_uri(self):
|
||||
return self.my_uri
|
||||
def is_readonly(self):
|
||||
return self.my_uri.is_readonly()
|
||||
def is_mutable(self):
|
||||
return self.my_uri.is_mutable()
|
||||
def download_to_data(self):
|
||||
return defer.succeed(self.all_contents[self.storage_index])
|
||||
def get_writekey(self):
|
||||
return "\x00"*16
|
||||
|
||||
def replace(self, new_contents, wait_for_numpeers=None):
|
||||
self.all_contents[self.storage_index] = new_contents
|
||||
return defer.succeed(None)
|
||||
|
||||
class MyDirectoryNode(dirnode.NewDirectoryNode):
|
||||
filenode_class = FakeMutableFileNode
|
||||
MyDirectoryNode = NonGridDirectoryNode
|
||||
|
||||
class Marker:
|
||||
implements(IFileNode, IMutableFileNode) # sure, why not
|
||||
@ -81,16 +37,14 @@ class Marker:
|
||||
|
||||
class FakeClient:
|
||||
implements(IClient)
|
||||
chk_contents = {}
|
||||
|
||||
def upload(self, uploadable, wait_for_numpeers):
|
||||
d = uploadable.get_size()
|
||||
d.addCallback(lambda size: uploadable.read(size))
|
||||
def _got_data(datav):
|
||||
data = "".join(datav)
|
||||
u = make_chk_file_uri(len(data))
|
||||
self.chk_contents[u] = data
|
||||
return u
|
||||
n = create_chk_filenode(self, data)
|
||||
return n.get_uri()
|
||||
d.addCallback(_got_data)
|
||||
return d
|
||||
|
||||
|
@ -11,6 +11,9 @@ from allmydata.interfaces import IURI, INewDirectoryURI, \
|
||||
from allmydata.filenode import LiteralFileNode
|
||||
import sha
|
||||
|
||||
#from allmydata.test.common import FakeMutableFileNode
|
||||
#FakeFilenode = FakeMutableFileNode
|
||||
|
||||
class FakeFilenode(mutable.MutableFileNode):
|
||||
counter = itertools.count(1)
|
||||
all_contents = {}
|
||||
|
@ -1,17 +1,14 @@
|
||||
|
||||
import re, os.path, urllib
|
||||
from zope.interface import implements
|
||||
from twisted.application import service
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from twisted.web import client, error, http
|
||||
from twisted.python import failure, log
|
||||
from allmydata import dirnode, webish, interfaces, uri, provisioning, filenode
|
||||
from allmydata.encode import NotEnoughPeersError
|
||||
from allmydata import webish, interfaces, provisioning
|
||||
from allmydata.util import fileutil
|
||||
import itertools
|
||||
|
||||
import test_mutable
|
||||
from allmydata.test.common import NonGridDirectoryNode, FakeCHKFileNode, FakeMutableFileNode, create_chk_filenode
|
||||
from allmydata.interfaces import IURI, INewDirectoryURI, IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI
|
||||
|
||||
# create a fake uploader/downloader, and a couple of fake dirnodes, then
|
||||
# create a webserver that works against them
|
||||
@ -32,73 +29,36 @@ class MyClient(service.MultiService):
|
||||
return []
|
||||
|
||||
def create_node_from_uri(self, uri):
|
||||
return self.my_nodes[uri]
|
||||
u = IURI(uri)
|
||||
if (INewDirectoryURI.providedBy(u)
|
||||
or IReadonlyNewDirectoryURI.providedBy(u)):
|
||||
return NonGridDirectoryNode(self).init_from_uri(u)
|
||||
if IFileURI.providedBy(u):
|
||||
return FakeCHKFileNode(u, self)
|
||||
assert IMutableFileURI.providedBy(u), u
|
||||
return FakeMutableFileNode(self).init_from_uri(u)
|
||||
|
||||
def create_empty_dirnode(self, wait_for_numpeers=None):
|
||||
n = FakeDirectoryNode(self)
|
||||
r = defer.succeed(n.fake_create(wait_for_numpeers=1))
|
||||
self.my_nodes[n.get_uri()] = n
|
||||
nro = FakeDirectoryNode(self)
|
||||
nro.init_from_uri(n.get_readonly_uri())
|
||||
nro._node.all_rw_friends[nro._node.get_uri()] = n._node.get_uri()
|
||||
self.my_nodes[nro.get_uri()] = nro
|
||||
return r
|
||||
n = NonGridDirectoryNode(self)
|
||||
d = n.create(wait_for_numpeers)
|
||||
d.addCallback(lambda res: n)
|
||||
return d
|
||||
|
||||
def upload(self, uploadable, wait_for_numpeers=None):
|
||||
uploader = self.getServiceNamed("uploader")
|
||||
return uploader.upload(uploadable, wait_for_numpeers=wait_for_numpeers)
|
||||
|
||||
|
||||
class MyDownloader(service.Service):
|
||||
implements(interfaces.IDownloader)
|
||||
name = "downloader"
|
||||
def __init__(self, files):
|
||||
self.files = files
|
||||
|
||||
def download(self, uri, target):
|
||||
if uri not in self.files:
|
||||
e = NotEnoughPeersError()
|
||||
f = failure.Failure(e)
|
||||
target.fail(f)
|
||||
return defer.fail(f)
|
||||
data = self.files[uri]
|
||||
target.open(len(data))
|
||||
target.write(data)
|
||||
target.close()
|
||||
return defer.maybeDeferred(target.finish)
|
||||
|
||||
uri_counter = itertools.count()
|
||||
|
||||
def make_newuri(data):
|
||||
n = uri_counter.next()
|
||||
assert len(str(n)) < 5
|
||||
newuri = uri.CHKFileURI(key="K%05d" % n + "k"*10,
|
||||
uri_extension_hash="EH" + "h"*30,
|
||||
needed_shares=25,
|
||||
total_shares=100,
|
||||
size=len(data))
|
||||
return newuri.to_string()
|
||||
|
||||
class MyUploader(service.Service):
|
||||
implements(interfaces.IUploader)
|
||||
name = "uploader"
|
||||
def __init__(self, files, nodes):
|
||||
self.files = files
|
||||
self.nodes = nodes
|
||||
def create_mutable_file(self, contents="", wait_for_numpeers=None):
|
||||
n = FakeMutableFileNode(self)
|
||||
return n.create(contents)
|
||||
|
||||
def upload(self, uploadable, wait_for_numpeers=None):
|
||||
d = uploadable.get_size()
|
||||
d.addCallback(lambda size: uploadable.read(size))
|
||||
d.addCallback(lambda data: "".join(data))
|
||||
def _got_data(data):
|
||||
newuri = make_newuri(data)
|
||||
self.files[newuri] = data
|
||||
self.nodes[newuri] = MyFileNode(newuri, self.parent)
|
||||
uploadable.close()
|
||||
return newuri
|
||||
def _got_data(datav):
|
||||
data = "".join(datav)
|
||||
n = create_chk_filenode(self, data)
|
||||
return n.get_uri()
|
||||
d.addCallback(_got_data)
|
||||
return d
|
||||
|
||||
|
||||
def syncwrap(meth):
|
||||
"""
|
||||
syncwrap invokes a method, assumes that it fired its deferred
|
||||
@ -112,8 +72,8 @@ def syncwrap(meth):
|
||||
return l[0]
|
||||
return _syncwrapped_meth
|
||||
|
||||
class FakeDirectoryNode(dirnode.NewDirectoryNode):
|
||||
filenode_class = test_mutable.FakeFilenode
|
||||
class FakeDirectoryNode: #(dirnode.NewDirectoryNode):
|
||||
#filenode_class = test_mutable.FakeFilenode
|
||||
|
||||
@syncwrap
|
||||
def fake_create(self, wait_for_numpeers=None):
|
||||
@ -135,9 +95,6 @@ class FakeDirectoryNode(dirnode.NewDirectoryNode):
|
||||
def fake_set_uri(self, name, uri):
|
||||
return self.set_uri(name, uri)
|
||||
|
||||
class MyFileNode(filenode.FileNode):
|
||||
pass
|
||||
|
||||
class WebMixin(object):
|
||||
def setUp(self):
|
||||
self.s = MyClient()
|
||||
@ -148,15 +105,6 @@ class WebMixin(object):
|
||||
port = s.listener._port.getHost().port
|
||||
self.webish_url = "http://localhost:%d" % port
|
||||
|
||||
self.nodes = {} # maps URI to node
|
||||
self.s.my_nodes = self.nodes
|
||||
self.files = {} # maps file URI to contents
|
||||
|
||||
dl = MyDownloader(self.files)
|
||||
dl.setServiceParent(self.s)
|
||||
ul = MyUploader(self.files, self.nodes)
|
||||
ul.setServiceParent(self.s)
|
||||
|
||||
l = [ self.s.create_empty_dirnode() for x in range(6) ]
|
||||
d = defer.DeferredList(l)
|
||||
def _then(res):
|
||||
@ -169,23 +117,26 @@ class WebMixin(object):
|
||||
self._foo_node = foo
|
||||
self._foo_uri = foo.get_uri()
|
||||
self._foo_readonly_uri = foo.get_readonly_uri()
|
||||
self.public_root.set_uri("foo", foo.get_uri()) # ignore the deferred because we know the fake one does this synchronously
|
||||
# NOTE: we ignore the deferred on all set_uri() calls, because we
|
||||
# know the fake nodes do these synchronously
|
||||
self.public_root.set_uri("foo", foo.get_uri())
|
||||
|
||||
self._bar_txt_uri = self.makefile(0)
|
||||
self.BAR_CONTENTS = self.files[self._bar_txt_uri]
|
||||
self.BAR_CONTENTS, n, self._bar_txt_uri = self.makefile(0)
|
||||
foo.set_uri("bar.txt", self._bar_txt_uri)
|
||||
foo.set_uri("empty", res[3][1].get_uri())
|
||||
sub_uri = res[4][1].get_uri()
|
||||
sub = foo.fake_set_uri("sub", sub_uri)
|
||||
foo.set_uri("sub", sub_uri)
|
||||
sub = self.s.create_node_from_uri(sub_uri)
|
||||
|
||||
blocking_uri = self.make_smallfile(1)
|
||||
_ign, n, blocking_uri = self.makefile(1)
|
||||
foo.set_uri("blockingfile", blocking_uri)
|
||||
|
||||
baz_file = self.makefile(2)
|
||||
_ign, n, baz_file = self.makefile(2)
|
||||
sub.set_uri("baz.txt", baz_file)
|
||||
|
||||
self._bad_file_uri = self.makefile(3)
|
||||
del self.files[self._bad_file_uri]
|
||||
_ign, n, self._bad_file_uri = self.makefile(3)
|
||||
# this uri should not be downloadable
|
||||
del FakeCHKFileNode.all_contents[self._bad_file_uri]
|
||||
|
||||
rodir = res[5][1]
|
||||
self.public_root.set_uri("reedownlee", rodir.get_readonly_uri())
|
||||
@ -205,31 +156,9 @@ class WebMixin(object):
|
||||
return d
|
||||
|
||||
def makefile(self, number):
|
||||
n = str(number)
|
||||
|
||||
assert len(n) == 1
|
||||
newuri = uri.CHKFileURI(key="K" + n*15,
|
||||
uri_extension_hash="EH" + n*30,
|
||||
needed_shares=25,
|
||||
total_shares=100,
|
||||
size=123+number).to_string()
|
||||
assert newuri not in self.files
|
||||
node = MyFileNode(newuri, self.s)
|
||||
contents = "contents of file %s\n" % n
|
||||
self.files[newuri] = contents
|
||||
self.nodes[newuri] = node
|
||||
return newuri
|
||||
|
||||
def make_smallfile(self, number):
|
||||
n = str(number)
|
||||
assert len(n) == 1
|
||||
contents = "small data %s\n" % n
|
||||
newuri = uri.LiteralFileURI(contents).to_string()
|
||||
assert newuri not in self.files
|
||||
node = MyFileNode(newuri, self.s)
|
||||
self.files[newuri] = contents
|
||||
self.nodes[newuri] = node
|
||||
return newuri
|
||||
contents = "contents of file %s\n" % number
|
||||
n = create_chk_filenode(self.s, contents)
|
||||
return contents, n, n.get_uri()
|
||||
|
||||
def tearDown(self):
|
||||
return self.s.stopService()
|
||||
@ -251,7 +180,7 @@ class WebMixin(object):
|
||||
self.failUnless(isinstance(data[1], dict))
|
||||
self.failIf("rw_uri" in data[1]) # immutable
|
||||
self.failUnlessEqual(data[1]["ro_uri"], self._bar_txt_uri)
|
||||
self.failUnlessEqual(data[1]["size"], 123)
|
||||
self.failUnlessEqual(data[1]["size"], len(self.BAR_CONTENTS))
|
||||
|
||||
def failUnlessIsFooJSON(self, res):
|
||||
data = self.worlds_cheapest_json_decoder(res)
|
||||
@ -268,7 +197,7 @@ class WebMixin(object):
|
||||
kids = data[1]["children"]
|
||||
self.failUnlessEqual(kids["sub"][0], "dirnode")
|
||||
self.failUnlessEqual(kids["bar.txt"][0], "filenode")
|
||||
self.failUnlessEqual(kids["bar.txt"][1]["size"], 123)
|
||||
self.failUnlessEqual(kids["bar.txt"][1]["size"], len(self.BAR_CONTENTS))
|
||||
self.failUnlessEqual(kids["bar.txt"][1]["ro_uri"], self._bar_txt_uri)
|
||||
|
||||
def GET(self, urlpath, followRedirect=False):
|
||||
@ -469,28 +398,22 @@ class Web(WebMixin, unittest.TestCase):
|
||||
|
||||
def test_PUT_NEWFILEURL(self):
|
||||
d = self.PUT(self.public_url + "/foo/new.txt", self.NEWFILE_CONTENTS)
|
||||
def _check(res):
|
||||
# TODO: we lose the response code, so we can't check this
|
||||
#self.failUnlessEqual(responsecode, 201)
|
||||
self.failUnless(self._foo_node.fake_has_child("new.txt"))
|
||||
new_uri = self._foo_node.fake_get("new.txt").get_uri()
|
||||
new_contents = self.files[new_uri]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_uri)
|
||||
d.addCallback(_check)
|
||||
# TODO: we lose the response code, so we can't check this
|
||||
#self.failUnlessEqual(responsecode, 201)
|
||||
d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node, "new.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_PUT_NEWFILEURL_replace(self):
|
||||
d = self.PUT(self.public_url + "/foo/bar.txt", self.NEWFILE_CONTENTS)
|
||||
def _check(res):
|
||||
# TODO: we lose the response code, so we can't check this
|
||||
#self.failUnlessEqual(responsecode, 200)
|
||||
self.failUnless(self._foo_node.fake_has_child("bar.txt"))
|
||||
new_node = self._foo_node.fake_get("bar.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
# TODO: we lose the response code, so we can't check this
|
||||
#self.failUnlessEqual(responsecode, 200)
|
||||
d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "bar.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_PUT_NEWFILEURL_no_replace(self):
|
||||
@ -504,16 +427,13 @@ class Web(WebMixin, unittest.TestCase):
|
||||
|
||||
def test_PUT_NEWFILEURL_mkdirs(self):
|
||||
d = self.PUT(self.public_url + "/foo/newdir/new.txt", self.NEWFILE_CONTENTS)
|
||||
def _check(res):
|
||||
self.failIf(self._foo_node.fake_has_child("new.txt"))
|
||||
self.failUnless(self._foo_node.fake_has_child("newdir"))
|
||||
newdir_node = self._foo_node.fake_get("newdir")
|
||||
self.failUnless(newdir_node.fake_has_child("new.txt"))
|
||||
new_node = newdir_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
fn = self._foo_node
|
||||
d.addCallback(self.failUnlessURIMatchesChild, fn, "newdir/new.txt")
|
||||
d.addCallback(lambda res: self.failIfNodeHasChild(fn, "new.txt"))
|
||||
d.addCallback(lambda res: self.failUnlessNodeHasChild(fn, "newdir"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(fn, "newdir/new.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_PUT_NEWFILEURL_blocked(self):
|
||||
@ -526,9 +446,8 @@ class Web(WebMixin, unittest.TestCase):
|
||||
|
||||
def test_DELETE_FILEURL(self):
|
||||
d = self.DELETE(self.public_url + "/foo/bar.txt")
|
||||
def _check(res):
|
||||
self.failIf(self._foo_node.fake_has_child("bar.txt"))
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self._foo_node, "bar.txt"))
|
||||
return d
|
||||
|
||||
def test_DELETE_FILEURL_missing(self):
|
||||
@ -625,13 +544,10 @@ class Web(WebMixin, unittest.TestCase):
|
||||
f.write(self.NEWFILE_CONTENTS)
|
||||
f.close()
|
||||
d = self.PUT(url, "")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("new.txt"))
|
||||
new_node = self._foo_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node, "new.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_PUT_NEWFILEURL_localfile_disabled(self):
|
||||
@ -656,16 +572,16 @@ class Web(WebMixin, unittest.TestCase):
|
||||
f.close()
|
||||
d = self.PUT(self.public_url + "/foo/newdir/new.txt?t=upload&localfile=%s"
|
||||
% localfile, "")
|
||||
def _check(res):
|
||||
self.failIf(self._foo_node.fake_has_child("new.txt"))
|
||||
self.failUnless(self._foo_node.fake_has_child("newdir"))
|
||||
newdir_node = self._foo_node.fake_get("newdir")
|
||||
self.failUnless(newdir_node.fake_has_child("new.txt"))
|
||||
new_node = newdir_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(self.failUnlessURIMatchesChild,
|
||||
self._foo_node, "newdir/new.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self._foo_node, "new.txt"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "newdir"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node,
|
||||
"newdir/new.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_GET_FILEURL_uri(self):
|
||||
@ -695,7 +611,7 @@ class Web(WebMixin, unittest.TestCase):
|
||||
'<a href="[^"]+bar.txt">bar.txt</a>'
|
||||
'</td>'
|
||||
'\s+<td>FILE</td>'
|
||||
'\s+<td>123</td>'
|
||||
'\s+<td>%d</td>' % len(self.BAR_CONTENTS)
|
||||
, res))
|
||||
self.failUnless(re.search(r'<td><a href="sub">sub</a></td>'
|
||||
'\s+<td>DIR</td>', res))
|
||||
@ -747,20 +663,18 @@ class Web(WebMixin, unittest.TestCase):
|
||||
|
||||
def test_PUT_NEWDIRURL(self):
|
||||
d = self.PUT(self.public_url + "/foo/newdir?t=mkdir", "")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("newdir"))
|
||||
newdir_node = self._foo_node.fake_get("newdir")
|
||||
self.failIf(newdir_node.fake_list())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "newdir"))
|
||||
d.addCallback(lambda res: self._foo_node.get("newdir"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, [])
|
||||
return d
|
||||
|
||||
def test_PUT_NEWDIRURL_replace(self):
|
||||
d = self.PUT(self.public_url + "/foo/sub?t=mkdir", "")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("sub"))
|
||||
newdir_node = self._foo_node.fake_get("sub")
|
||||
self.failIf(newdir_node.fake_list())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "sub"))
|
||||
d.addCallback(lambda res: self._foo_node.get("sub"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, [])
|
||||
return d
|
||||
|
||||
def test_PUT_NEWDIRURL_no_replace(self):
|
||||
@ -769,38 +683,34 @@ class Web(WebMixin, unittest.TestCase):
|
||||
"409 Conflict",
|
||||
"There was already a child by that name, and you asked me "
|
||||
"to not replace it")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("sub"))
|
||||
newdir_node = self._foo_node.fake_get("sub")
|
||||
self.failUnlessEqual(newdir_node.fake_list().keys(), ["baz.txt"])
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "sub"))
|
||||
d.addCallback(lambda res: self._foo_node.get("sub"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
|
||||
return d
|
||||
|
||||
def test_PUT_NEWDIRURL_mkdirs(self):
|
||||
d = self.PUT(self.public_url + "/foo/subdir/newdir?t=mkdir", "")
|
||||
def _check(res):
|
||||
self.failIf(self._foo_node.fake_has_child("newdir"))
|
||||
self.failUnless(self._foo_node.fake_has_child("subdir"))
|
||||
subdir_node = self._foo_node.fake_get("subdir")
|
||||
self.failUnless(subdir_node.fake_has_child("newdir"))
|
||||
newdir_node = subdir_node.fake_get("newdir")
|
||||
self.failIf(newdir_node.fake_list())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self._foo_node, "newdir"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "subdir"))
|
||||
d.addCallback(lambda res:
|
||||
self._foo_node.get_child_at_path("subdir/newdir"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, [])
|
||||
return d
|
||||
|
||||
def test_DELETE_DIRURL(self):
|
||||
d = self.DELETE(self.public_url + "/foo")
|
||||
def _check(res):
|
||||
self.failIf(self.public_root.fake_has_child("foo"))
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self.public_root, "foo"))
|
||||
return d
|
||||
|
||||
def test_DELETE_DIRURL_missing(self):
|
||||
d = self.DELETE(self.public_url + "/foo/missing")
|
||||
d.addBoth(self.should404, "test_DELETE_DIRURL_missing")
|
||||
def _check(res):
|
||||
self.failUnless(self.public_root.fake_has_child("foo"))
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self.public_root, "foo"))
|
||||
return d
|
||||
|
||||
def test_DELETE_DIRURL_missing2(self):
|
||||
@ -896,6 +806,47 @@ class Web(WebMixin, unittest.TestCase):
|
||||
for path,node in self.walk_mynodes(self.public_root):
|
||||
print path
|
||||
|
||||
def failUnlessNodeKeysAre(self, node, expected_keys):
|
||||
d = node.list()
|
||||
def _check(children):
|
||||
self.failUnlessEqual(sorted(children.keys()), sorted(expected_keys))
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
def failUnlessNodeHasChild(self, node, name):
|
||||
d = node.list()
|
||||
def _check(children):
|
||||
self.failUnless(name in children)
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
def failIfNodeHasChild(self, node, name):
|
||||
d = node.list()
|
||||
def _check(children):
|
||||
self.failIf(name in children)
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
def failUnlessChildContentsAre(self, node, name, expected_contents):
|
||||
d = node.get_child_at_path(name)
|
||||
d.addCallback(lambda node: node.download_to_data())
|
||||
def _check(contents):
|
||||
self.failUnlessEqual(contents, expected_contents)
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
def failUnlessChildURIIs(self, node, name, expected_uri):
|
||||
d = node.get_child_at_path(name)
|
||||
def _check(child):
|
||||
self.failUnlessEqual(child.get_uri(), expected_uri.strip())
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
def failUnlessURIMatchesChild(self, got_uri, node, name):
|
||||
d = node.get_child_at_path(name)
|
||||
def _check(child):
|
||||
self.failUnlessEqual(got_uri.strip(), child.get_uri())
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
def test_PUT_NEWDIRURL_localdir(self):
|
||||
localdir = os.path.abspath("web/PUT_NEWDIRURL_localdir")
|
||||
# create some files there
|
||||
@ -909,21 +860,20 @@ class Web(WebMixin, unittest.TestCase):
|
||||
|
||||
d = self.PUT(self.public_url + "/newdir?t=upload&localdir=%s"
|
||||
% localdir, "")
|
||||
def _check(res):
|
||||
self.failUnless(self.public_root.fake_has_child("newdir"))
|
||||
newnode = self.public_root.fake_get("newdir")
|
||||
self.failUnlessEqual(sorted(newnode.fake_list().keys()),
|
||||
sorted(["one", "two", "three", "zap.zip"]))
|
||||
onenode = newnode.fake_get("one")
|
||||
self.failUnlessEqual(sorted(onenode.fake_list().keys()),
|
||||
sorted(["sub"]))
|
||||
threenode = newnode.fake_get("three")
|
||||
self.failUnlessEqual(sorted(threenode.fake_list().keys()),
|
||||
sorted(["foo.txt", "bar.txt"]))
|
||||
barnode = threenode.fake_get("bar.txt")
|
||||
contents = self.files[barnode.get_uri()]
|
||||
self.failUnlessEqual(contents, "contents of three/bar.txt\n")
|
||||
d.addCallback(_check)
|
||||
pr = self.public_root
|
||||
d.addCallback(lambda res: self.failUnlessNodeHasChild(pr, "newdir"))
|
||||
d.addCallback(lambda res: pr.get("newdir"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre,
|
||||
["one", "two", "three", "zap.zip"])
|
||||
d.addCallback(lambda res: pr.get_child_at_path("newdir/one"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, ["sub"])
|
||||
d.addCallback(lambda res: pr.get_child_at_path("newdir/three"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, ["foo.txt", "bar.txt"])
|
||||
d.addCallback(lambda res: pr.get_child_at_path("newdir/three/bar.txt"))
|
||||
d.addCallback(lambda barnode: barnode.download_to_data())
|
||||
d.addCallback(lambda contents:
|
||||
self.failUnlessEqual(contents,
|
||||
"contents of three/bar.txt\n"))
|
||||
return d
|
||||
|
||||
def test_PUT_NEWDIRURL_localdir_disabled(self):
|
||||
@ -959,47 +909,41 @@ class Web(WebMixin, unittest.TestCase):
|
||||
d = self.PUT(self.public_url + "/foo/subdir/newdir?t=upload&localdir=%s"
|
||||
% localdir,
|
||||
"")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("subdir"))
|
||||
subnode = self._foo_node.fake_get("subdir")
|
||||
self.failUnless(subnode.fake_has_child("newdir"))
|
||||
newnode = subnode.fake_get("newdir")
|
||||
self.failUnlessEqual(sorted(newnode.fake_list()),
|
||||
sorted(["one", "two", "three", "zap.zip"]))
|
||||
onenode = newnode.fake_get("one")
|
||||
self.failUnlessEqual(sorted(onenode.fake_list()),
|
||||
sorted(["sub"]))
|
||||
threenode = newnode.fake_get("three")
|
||||
self.failUnlessEqual(sorted(threenode.fake_list()),
|
||||
sorted(["foo.txt", "bar.txt"]))
|
||||
barnode = threenode.fake_get("bar.txt")
|
||||
contents = self.files[barnode.get_uri()]
|
||||
self.failUnlessEqual(contents, "contents of three/bar.txt\n")
|
||||
d.addCallback(_check)
|
||||
fn = self._foo_node
|
||||
d.addCallback(lambda res: self.failUnlessNodeHasChild(fn, "subdir"))
|
||||
d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre,
|
||||
["one", "two", "three", "zap.zip"])
|
||||
d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir/one"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, ["sub"])
|
||||
d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir/three"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, ["foo.txt", "bar.txt"])
|
||||
d.addCallback(lambda res:
|
||||
fn.get_child_at_path("subdir/newdir/three/bar.txt"))
|
||||
d.addCallback(lambda barnode: barnode.download_to_data())
|
||||
d.addCallback(lambda contents:
|
||||
self.failUnlessEqual(contents,
|
||||
"contents of three/bar.txt\n"))
|
||||
return d
|
||||
|
||||
def test_POST_upload(self):
|
||||
d = self.POST(self.public_url + "/foo", t="upload",
|
||||
file=("new.txt", self.NEWFILE_CONTENTS))
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("new.txt"))
|
||||
new_node = self._foo_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
fn = self._foo_node
|
||||
d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(fn, "new.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_POST_upload_replace(self):
|
||||
d = self.POST(self.public_url + "/foo", t="upload",
|
||||
file=("bar.txt", self.NEWFILE_CONTENTS))
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("bar.txt"))
|
||||
new_node = self._foo_node.fake_get("bar.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
fn = self._foo_node
|
||||
d.addCallback(self.failUnlessURIMatchesChild, fn, "bar.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(fn, "bar.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_POST_upload_no_replace_queryarg(self):
|
||||
@ -1029,24 +973,20 @@ class Web(WebMixin, unittest.TestCase):
|
||||
d = self.POST(self.public_url + "/foo", t="upload", when_done="/THERE",
|
||||
file=("new.txt", self.NEWFILE_CONTENTS))
|
||||
d.addBoth(self.shouldRedirect, "/THERE")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("new.txt"))
|
||||
new_node = self._foo_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
d.addCallback(_check)
|
||||
fn = self._foo_node
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(fn, "new.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_POST_upload_named(self):
|
||||
fn = self._foo_node
|
||||
d = self.POST(self.public_url + "/foo", t="upload",
|
||||
name="new.txt", file=self.NEWFILE_CONTENTS)
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("new.txt"))
|
||||
new_node = self._foo_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.NEWFILE_CONTENTS)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(fn, "new.txt",
|
||||
self.NEWFILE_CONTENTS))
|
||||
return d
|
||||
|
||||
def test_POST_upload_named_badfilename(self):
|
||||
@ -1057,31 +997,23 @@ class Web(WebMixin, unittest.TestCase):
|
||||
"400 Bad Request",
|
||||
"name= may not contain a slash",
|
||||
)
|
||||
def _check(res):
|
||||
# make sure that nothing was added
|
||||
kids = sorted(self._foo_node.fake_list())
|
||||
self.failUnlessEqual(sorted(["bar.txt", "blockingfile",
|
||||
"empty", "sub"]),
|
||||
kids)
|
||||
d.addCallback(_check)
|
||||
# make sure that nothing was added
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeKeysAre(self._foo_node,
|
||||
["bar.txt", "blockingfile",
|
||||
"empty", "sub"]))
|
||||
return d
|
||||
|
||||
def test_POST_mkdir(self): # return value?
|
||||
d = self.POST(self.public_url + "/foo", t="mkdir", name="newdir")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("newdir"))
|
||||
newdir_node = self._foo_node.fake_get("newdir")
|
||||
self.failIf(newdir_node.fake_list())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self._foo_node.get("newdir"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, [])
|
||||
return d
|
||||
|
||||
def test_POST_mkdir_replace(self): # return value?
|
||||
d = self.POST(self.public_url + "/foo", t="mkdir", name="sub")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("sub"))
|
||||
newdir_node = self._foo_node.fake_get("sub")
|
||||
self.failIf(newdir_node.fake_list())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self._foo_node.get("sub"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, [])
|
||||
return d
|
||||
|
||||
def test_POST_mkdir_no_replace_queryarg(self): # return value?
|
||||
@ -1091,11 +1023,8 @@ class Web(WebMixin, unittest.TestCase):
|
||||
"409 Conflict",
|
||||
"There was already a child by that name, and you asked me "
|
||||
"to not replace it")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("sub"))
|
||||
newdir_node = self._foo_node.fake_get("sub")
|
||||
self.failUnlessEqual(newdir_node.fake_list().keys(), ["baz.txt"])
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self._foo_node.get("sub"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
|
||||
return d
|
||||
|
||||
def test_POST_mkdir_no_replace_field(self): # return value?
|
||||
@ -1105,64 +1034,46 @@ class Web(WebMixin, unittest.TestCase):
|
||||
"409 Conflict",
|
||||
"There was already a child by that name, and you asked me "
|
||||
"to not replace it")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("sub"))
|
||||
newdir_node = self._foo_node.fake_get("sub")
|
||||
self.failUnlessEqual(newdir_node.fake_list().keys(), ["baz.txt"])
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self._foo_node.get("sub"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
|
||||
return d
|
||||
|
||||
def test_POST_mkdir_whendone_field(self):
|
||||
d = self.POST(self.public_url + "/foo",
|
||||
t="mkdir", name="newdir", when_done="/THERE")
|
||||
d.addBoth(self.shouldRedirect, "/THERE")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("newdir"))
|
||||
newdir_node = self._foo_node.fake_get("newdir")
|
||||
self.failIf(newdir_node.fake_list())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self._foo_node.get("newdir"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, [])
|
||||
return d
|
||||
|
||||
def test_POST_mkdir_whendone_queryarg(self):
|
||||
d = self.POST(self.public_url + "/foo?when_done=/THERE",
|
||||
t="mkdir", name="newdir")
|
||||
d.addBoth(self.shouldRedirect, "/THERE")
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("newdir"))
|
||||
newdir_node = self._foo_node.fake_get("newdir")
|
||||
self.failIf(newdir_node.fake_list())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self._foo_node.get("newdir"))
|
||||
d.addCallback(self.failUnlessNodeKeysAre, [])
|
||||
return d
|
||||
|
||||
def test_POST_put_uri(self):
|
||||
newuri = self.makefile(8)
|
||||
contents = self.files[newuri]
|
||||
contents, n, newuri = self.makefile(8)
|
||||
d = self.POST(self.public_url + "/foo", t="uri", name="new.txt", uri=newuri)
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("new.txt"))
|
||||
new_node = self._foo_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, contents)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node, "new.txt",
|
||||
contents))
|
||||
return d
|
||||
|
||||
def test_POST_put_uri_replace(self):
|
||||
newuri = self.makefile(8)
|
||||
contents = self.files[newuri]
|
||||
contents, n, newuri = self.makefile(8)
|
||||
d = self.POST(self.public_url + "/foo", t="uri", name="bar.txt", uri=newuri)
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("bar.txt"))
|
||||
new_node = self._foo_node.fake_get("bar.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, contents)
|
||||
self.failUnlessEqual(res.strip(), new_node.get_uri())
|
||||
d.addCallback(_check)
|
||||
d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "bar.txt")
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
|
||||
contents))
|
||||
return d
|
||||
|
||||
def test_POST_put_uri_no_replace_queryarg(self):
|
||||
newuri = self.makefile(8)
|
||||
contents = self.files[newuri]
|
||||
contents, n, newuri = self.makefile(8)
|
||||
d = self.POST(self.public_url + "/foo?replace=false", t="uri",
|
||||
name="bar.txt", uri=newuri)
|
||||
d.addBoth(self.shouldFail, error.Error,
|
||||
@ -1175,8 +1086,7 @@ class Web(WebMixin, unittest.TestCase):
|
||||
return d
|
||||
|
||||
def test_POST_put_uri_no_replace_field(self):
|
||||
newuri = self.makefile(8)
|
||||
contents = self.files[newuri]
|
||||
contents, n, newuri = self.makefile(8)
|
||||
d = self.POST(self.public_url + "/foo", t="uri", replace="false",
|
||||
name="bar.txt", uri=newuri)
|
||||
d.addBoth(self.shouldFail, error.Error,
|
||||
@ -1190,18 +1100,19 @@ class Web(WebMixin, unittest.TestCase):
|
||||
|
||||
def test_POST_delete(self):
|
||||
d = self.POST(self.public_url + "/foo", t="delete", name="bar.txt")
|
||||
def _check(res):
|
||||
self.failIf(self._foo_node.fake_has_child("bar.txt"))
|
||||
d.addCallback(lambda res: self._foo_node.list())
|
||||
def _check(children):
|
||||
self.failIf("bar.txt" in children)
|
||||
d.addCallback(_check)
|
||||
return d
|
||||
|
||||
def test_POST_rename_file(self):
|
||||
d = self.POST(self.public_url + "/foo", t="rename",
|
||||
from_name="bar.txt", to_name='wibble.txt')
|
||||
def _check(res):
|
||||
self.failIf(self._foo_node.fake_has_child("bar.txt"))
|
||||
self.failUnless(self._foo_node.fake_has_child("wibble.txt"))
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self._foo_node, "bar.txt"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "wibble.txt"))
|
||||
d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt"))
|
||||
d.addCallback(self.failUnlessIsBarDotTxt)
|
||||
d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt?t=json"))
|
||||
@ -1212,10 +1123,10 @@ class Web(WebMixin, unittest.TestCase):
|
||||
# rename a file and replace a directory with it
|
||||
d = self.POST(self.public_url + "/foo", t="rename",
|
||||
from_name="bar.txt", to_name='empty')
|
||||
def _check(res):
|
||||
self.failIf(self._foo_node.fake_has_child("bar.txt"))
|
||||
self.failUnless(self._foo_node.fake_has_child("empty"))
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self._foo_node, "bar.txt"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "empty"))
|
||||
d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty"))
|
||||
d.addCallback(self.failUnlessIsBarDotTxt)
|
||||
d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
|
||||
@ -1261,9 +1172,8 @@ class Web(WebMixin, unittest.TestCase):
|
||||
"400 Bad Request",
|
||||
"to_name= may not contain a slash",
|
||||
)
|
||||
def _check1(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("bar.txt"))
|
||||
d.addCallback(_check1)
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "bar.txt"))
|
||||
d.addCallback(lambda res: self.POST(self.public_url, t="rename",
|
||||
from_name="foo/bar.txt", to_name='george.txt'))
|
||||
d.addBoth(self.shouldFail, error.Error,
|
||||
@ -1271,11 +1181,12 @@ class Web(WebMixin, unittest.TestCase):
|
||||
"400 Bad Request",
|
||||
"from_name= may not contain a slash",
|
||||
)
|
||||
def _check2(res):
|
||||
self.failUnless(self.public_root.fake_has_child("foo"))
|
||||
self.failIf(self.public_root.fake_has_child("george.txt"))
|
||||
self.failUnless(self._foo_node.fake_has_child("bar.txt"))
|
||||
d.addCallback(_check2)
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self.public_root, "foo"))
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self.public_root, "george.txt"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self._foo_node, "bar.txt"))
|
||||
d.addCallback(lambda res: self.GET(self.public_url + "/foo?t=json"))
|
||||
d.addCallback(self.failUnlessIsFooJSON)
|
||||
return d
|
||||
@ -1283,10 +1194,10 @@ class Web(WebMixin, unittest.TestCase):
|
||||
def test_POST_rename_dir(self):
|
||||
d = self.POST(self.public_url, t="rename",
|
||||
from_name="foo", to_name='plunk')
|
||||
def _check(res):
|
||||
self.failIf(self.public_root.fake_has_child("foo"))
|
||||
self.failUnless(self.public_root.fake_has_child("plunk"))
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res:
|
||||
self.failIfNodeHasChild(self.public_root, "foo"))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessNodeHasChild(self.public_root, "plunk"))
|
||||
d.addCallback(lambda res: self.GET(self.public_url + "/plunk?t=json"))
|
||||
d.addCallback(self.failUnlessIsFooJSON)
|
||||
return d
|
||||
@ -1364,31 +1275,25 @@ class Web(WebMixin, unittest.TestCase):
|
||||
return d
|
||||
|
||||
def test_PUT_NEWFILEURL_uri(self):
|
||||
new_uri = self.makefile(8)
|
||||
contents, n, new_uri = self.makefile(8)
|
||||
d = self.PUT(self.public_url + "/foo/new.txt?t=uri", new_uri)
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("new.txt"))
|
||||
new_node = self._foo_node.fake_get("new.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.files[new_uri])
|
||||
self.failUnlessEqual(res.strip(), new_uri)
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self.failUnlessEqual(res.strip(), new_uri))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node, "new.txt",
|
||||
contents))
|
||||
return d
|
||||
|
||||
def test_PUT_NEWFILEURL_uri_replace(self):
|
||||
new_uri = self.makefile(8)
|
||||
contents, n, new_uri = self.makefile(8)
|
||||
d = self.PUT(self.public_url + "/foo/bar.txt?t=uri", new_uri)
|
||||
def _check(res):
|
||||
self.failUnless(self._foo_node.fake_has_child("bar.txt"))
|
||||
new_node = self._foo_node.fake_get("bar.txt")
|
||||
new_contents = self.files[new_node.get_uri()]
|
||||
self.failUnlessEqual(new_contents, self.files[new_uri])
|
||||
self.failUnlessEqual(res.strip(), new_uri)
|
||||
d.addCallback(_check)
|
||||
d.addCallback(lambda res: self.failUnlessEqual(res.strip(), new_uri))
|
||||
d.addCallback(lambda res:
|
||||
self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
|
||||
contents))
|
||||
return d
|
||||
|
||||
def test_PUT_NEWFILEURL_uri_no_replace(self):
|
||||
new_uri = self.makefile(8)
|
||||
contents, n, new_uri = self.makefile(8)
|
||||
d = self.PUT(self.public_url + "/foo/bar.txt?t=uri&replace=false", new_uri)
|
||||
d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_uri_no_replace",
|
||||
"409 Conflict",
|
||||
@ -1400,8 +1305,9 @@ class Web(WebMixin, unittest.TestCase):
|
||||
file_contents = "New file contents here\n"
|
||||
d = self.PUT("/uri", file_contents)
|
||||
def _check(uri):
|
||||
self.failUnless(uri in self.files)
|
||||
self.failUnlessEqual(self.files[uri], file_contents)
|
||||
self.failUnless(uri in FakeCHKFileNode.all_contents)
|
||||
self.failUnlessEqual(FakeCHKFileNode.all_contents[uri],
|
||||
file_contents)
|
||||
return self.GET("/uri/%s" % uri.replace("/","!"))
|
||||
d.addCallback(_check)
|
||||
def _check2(res):
|
||||
@ -1420,9 +1326,11 @@ class Web(WebMixin, unittest.TestCase):
|
||||
def test_PUT_NEWDIR_URI(self):
|
||||
d = self.PUT("/uri?t=mkdir", "")
|
||||
def _check(uri):
|
||||
self.failUnless(uri in self.nodes)
|
||||
self.failUnless(isinstance(self.nodes[uri], FakeDirectoryNode))
|
||||
return self.GET("/uri/%s?t=json" % uri.replace("/","!"))
|
||||
n = self.s.create_node_from_uri(uri.strip())
|
||||
d2 = self.failUnlessNodeKeysAre(n, [])
|
||||
d2.addCallback(lambda res:
|
||||
self.GET("/uri/%s?t=json" % uri.replace("/","!")))
|
||||
return d2
|
||||
d.addCallback(_check)
|
||||
d.addCallback(self.failUnlessIsEmptyJSON)
|
||||
return d
|
||||
|
Loading…
Reference in New Issue
Block a user