mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-15 22:56:41 +00:00
filetree: add vdrive upload/download test, change workqueue relative-filename semantics
This commit is contained in:
parent
5453e0f022
commit
9dc1c0cfc0
src/allmydata
@ -1,4 +1,5 @@
|
||||
|
||||
import os.path
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from allmydata.filetree import directory, file, redirect
|
||||
@ -82,6 +83,7 @@ class VirtualDrive(object):
|
||||
self.workqueue = workqueue
|
||||
workqueue.set_vdrive(self)
|
||||
workqueue.set_uploader(uploader)
|
||||
self._downloader = downloader
|
||||
# TODO: queen?
|
||||
self.queen = None
|
||||
self.root_node = root_node
|
||||
@ -225,11 +227,21 @@ class VirtualDrive(object):
|
||||
return d
|
||||
|
||||
def download(self, path, target):
|
||||
# TODO: does this mean download it right now? or schedule it in the
|
||||
# workqueue for eventual download? should we add download steps to
|
||||
# the workqueue?
|
||||
assert isinstance(path, list)
|
||||
d = self._get_file_uri(path)
|
||||
d.addCallback(lambda uri: self.downloader.download(uri, target))
|
||||
d.addCallback(lambda uri: self._downloader.download(uri, target))
|
||||
return d
|
||||
|
||||
def download_as_data(self, path):
|
||||
# TODO: this is kind of goofy.. think of a better download API that
|
||||
# is appropriate for this class
|
||||
from allmydata import download
|
||||
target = download.Data()
|
||||
return self.download(path, target)
|
||||
|
||||
def upload_now(self, path, uploadable):
|
||||
assert isinstance(path, list)
|
||||
# note: the first few steps of this do not use the workqueue, but I
|
||||
@ -251,6 +263,7 @@ class VirtualDrive(object):
|
||||
|
||||
def upload_later(self, path, filename):
|
||||
assert isinstance(path, list)
|
||||
filename = os.path.abspath(filename)
|
||||
boxname = self.workqueue.create_boxname()
|
||||
self.workqueue.add_upload_chk(filename, boxname)
|
||||
self.workqueue.add_addpath(boxname, path)
|
||||
|
@ -276,6 +276,9 @@ class IWorkQueue(Interface):
|
||||
mode). This URI includes unlink rights. It does not mark the file for
|
||||
retention.
|
||||
|
||||
Non-absolute filenames are interpreted relative to the workqueue's
|
||||
special just-for-tempfiles directory.
|
||||
|
||||
When the upload is complete, the resulting URI is stashed in a 'box'
|
||||
with the specified name. This is basically a local variable. A later
|
||||
'add_subpath' step will reference this boxname and retrieve the URI.
|
||||
|
@ -332,24 +332,46 @@ class InPairs(unittest.TestCase):
|
||||
pairs = list(directory.in_pairs(l))
|
||||
self.failUnlessEqual(pairs, [(0,1), (2,3), (4,5), (6,7)])
|
||||
|
||||
class StubDownloader(object):
|
||||
implements(IDownloader)
|
||||
class FakeMesh(object):
|
||||
implements(IDownloader, IUploader)
|
||||
|
||||
class StubUploader(object):
|
||||
implements(IUploader)
|
||||
def __init__(self):
|
||||
self.files = {}
|
||||
def upload_filename(self, filename):
|
||||
uri = "stub-uri-%d" % len(self.files)
|
||||
data = open(filename,"r").read()
|
||||
self.files[uri] = data
|
||||
return defer.succeed(uri)
|
||||
def download(self, uri, target):
|
||||
target.open()
|
||||
target.write(self.files[uri])
|
||||
target.close()
|
||||
return defer.maybeDeferred(target.finish)
|
||||
|
||||
class Stuff(unittest.TestCase):
|
||||
|
||||
class VDrive(unittest.TestCase):
|
||||
|
||||
def makeVirtualDrive(self, basedir, root_node=None):
|
||||
wq = workqueue.WorkQueue(os.path.join(basedir, "1.workqueue"))
|
||||
dl = StubDownloader()
|
||||
ul = StubUploader()
|
||||
dl = ul = FakeMesh()
|
||||
if not root_node:
|
||||
root_node = directory.LocalFileSubTreeNode()
|
||||
root_node.new("rootdirtree.save")
|
||||
v = vdrive.VirtualDrive(wq, dl, ul, root_node)
|
||||
return v
|
||||
|
||||
def makeLocalTree(self, basename):
|
||||
# create a LocalFileRedirection pointing at a LocalFileSubTree.
|
||||
# Returns a VirtualDrive instance.
|
||||
topdir = directory.LocalFileSubTree().new("%s-dirtree.save" % basename)
|
||||
topdir.update_now(None)
|
||||
root = redirect.LocalFileRedirection().new("%s-root" % basename,
|
||||
topdir.create_node_now())
|
||||
root.update_now(None)
|
||||
v = self.makeVirtualDrive("%s-vdrive" % basename,
|
||||
root.create_node_now())
|
||||
return v
|
||||
|
||||
def failUnlessListsAreEqual(self, list1, list2):
|
||||
self.failUnlessEqual(sorted(list1), sorted(list2))
|
||||
|
||||
@ -359,7 +381,7 @@ class Stuff(unittest.TestCase):
|
||||
self.failUnlessEqual(c1a, c2a)
|
||||
|
||||
def testDirectory(self):
|
||||
stm = vdrive.SubTreeMaker(None, StubDownloader())
|
||||
stm = vdrive.SubTreeMaker(None, FakeMesh())
|
||||
|
||||
# create an empty directory (stored locally)
|
||||
subtree = directory.LocalFileSubTree()
|
||||
@ -453,12 +475,7 @@ class Stuff(unittest.TestCase):
|
||||
(which, expected_failure, res))
|
||||
|
||||
def testVdrive(self):
|
||||
topdir = directory.LocalFileSubTree().new("vdrive-dirtree.save")
|
||||
topdir.update_now(None)
|
||||
root = redirect.LocalFileRedirection().new("vdrive-root",
|
||||
topdir.create_node_now())
|
||||
root.update_now(None)
|
||||
v = self.makeVirtualDrive("vdrive", root.create_node_now())
|
||||
v = self.makeLocalTree("vdrive")
|
||||
|
||||
d = v.list([])
|
||||
def _listed(contents):
|
||||
@ -500,3 +517,31 @@ class Stuff(unittest.TestCase):
|
||||
|
||||
return d
|
||||
|
||||
def testUpload(self):
|
||||
v = self.makeLocalTree("upload")
|
||||
filename = "upload1"
|
||||
DATA = "here is some data\n"
|
||||
f = open(filename, "w")
|
||||
f.write(DATA)
|
||||
f.close()
|
||||
|
||||
rc = v.upload_later(["a","b","upload1"], filename)
|
||||
self.failUnlessIdentical(rc, None)
|
||||
|
||||
d = v.workqueue.flush()
|
||||
|
||||
d.addCallback(lambda res: v.list([]))
|
||||
d.addCallback(lambda contents:
|
||||
self.failUnlessListsAreEqual(contents.keys(), ["a"]))
|
||||
d.addCallback(lambda res: v.list(["a"]))
|
||||
d.addCallback(lambda contents:
|
||||
self.failUnlessListsAreEqual(contents.keys(), ["b"]))
|
||||
d.addCallback(lambda res: v.list(["a","b"]))
|
||||
d.addCallback(lambda contents:
|
||||
self.failUnlessListsAreEqual(contents.keys(),
|
||||
["upload1"]))
|
||||
d.addCallback(lambda res: v.download_as_data(["a","b","upload1"]))
|
||||
d.addCallback(self.failUnlessEqual, DATA)
|
||||
|
||||
return d
|
||||
|
||||
|
@ -4,6 +4,7 @@ from twisted.trial import unittest
|
||||
from twisted.internet import defer
|
||||
from allmydata import workqueue
|
||||
from allmydata.util import idlib
|
||||
from allmydata.filetree.file import CHKFileNode
|
||||
|
||||
class FakeWorkQueue(workqueue.WorkQueue):
|
||||
|
||||
@ -65,8 +66,10 @@ class Items(unittest.TestCase):
|
||||
def testBox(self):
|
||||
wq = self.wq("testBox")
|
||||
boxname = wq.create_boxname()
|
||||
wq.write_to_box(boxname, "contents of box")
|
||||
self.failUnlessEqual(wq.read_from_box(boxname), "contents of box")
|
||||
wq.write_to_box(boxname, CHKFileNode().new("uri goes here"))
|
||||
out = wq.read_from_box(boxname)
|
||||
self.failUnless(isinstance(out, CHKFileNode))
|
||||
self.failUnlessEqual(out.get_uri(), "uri goes here")
|
||||
|
||||
def testCHK(self):
|
||||
wq = self.wq("testCHK")
|
||||
@ -149,7 +152,7 @@ class Items(unittest.TestCase):
|
||||
self.failUnless(os.path.exists(tmpfilename))
|
||||
# likewise this unreferenced box should get deleted
|
||||
boxname = wq.create_boxname()
|
||||
wq.write_to_box(boxname, "contents of box")
|
||||
wq.write_to_box(boxname, CHKFileNode().new("uri here"))
|
||||
boxfile = os.path.join(wq.boxesdir, boxname)
|
||||
self.failUnless(os.path.exists(boxfile))
|
||||
|
||||
|
@ -1,12 +1,13 @@
|
||||
|
||||
import os, shutil, sha
|
||||
from zope.interface import Interface, implements
|
||||
from zope.interface import implements
|
||||
from twisted.internet import defer
|
||||
from allmydata.util import bencode
|
||||
from allmydata.util.idlib import b2a
|
||||
from allmydata.Crypto.Cipher import AES
|
||||
from allmydata.filetree.nodemaker import NodeMaker
|
||||
from allmydata.filetree.interfaces import INode
|
||||
from allmydata.filetree.file import CHKFileNode
|
||||
from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader
|
||||
|
||||
|
||||
@ -101,20 +102,21 @@ class WorkQueue(object):
|
||||
def create_boxname(self, contents=None):
|
||||
boxname = b2a(os.urandom(10))
|
||||
if contents is not None:
|
||||
assert INode(contents)
|
||||
self.write_to_box(boxname, contents.serialize_node())
|
||||
self.write_to_box(boxname, contents)
|
||||
return boxname
|
||||
def write_to_box(self, boxname, data):
|
||||
def write_to_box(self, boxname, contents):
|
||||
assert INode(contents)
|
||||
f = open(os.path.join(self.boxesdir, boxname), "w")
|
||||
f.write(data)
|
||||
f.write(contents.serialize_node())
|
||||
f.flush()
|
||||
os.fsync(f)
|
||||
f.close()
|
||||
def read_from_box(self, boxname):
|
||||
f = open(os.path.join(self.boxesdir, boxname), "r")
|
||||
data = f.read()
|
||||
node = self._node_maker.make_node_from_serialized(data)
|
||||
f.close()
|
||||
return data
|
||||
return node
|
||||
|
||||
def _create_step(self, end, lines):
|
||||
assert end in ("first", "last")
|
||||
@ -139,8 +141,10 @@ class WorkQueue(object):
|
||||
|
||||
# methods to add entries to the queue
|
||||
def add_upload_chk(self, source_filename, stash_uri_in_boxname):
|
||||
# source_filename is absolute, and can point to things outside our
|
||||
# workqueue.
|
||||
# If source_filename is absolute, it will point to something outside
|
||||
# of our workqueue (this is how user files are uploaded). If it is
|
||||
# relative, it points to something inside self.filesdir (this is how
|
||||
# serialized directories and tempfiles are uploaded)
|
||||
lines = ["upload_chk", source_filename, stash_uri_in_boxname]
|
||||
self._create_step_first(lines)
|
||||
|
||||
@ -277,15 +281,25 @@ class WorkQueue(object):
|
||||
# dispatch method here and an add_ method above.
|
||||
|
||||
|
||||
def step_upload_chk(self, source_filename, index_a, write_key_a):
|
||||
pass
|
||||
def step_upload_chk(self, source_filename, stash_uri_in_boxname):
|
||||
# we use relative filenames for tempfiles created by
|
||||
# workqueue.create_tempfile, and absolute filenames for everything
|
||||
# that comes from the vdrive. That means using os.path.abspath() on
|
||||
# user files in VirtualDrive methods.
|
||||
filename = os.path.join(self.filesdir, source_filename)
|
||||
d = self._uploader.upload_filename(filename)
|
||||
def _uploaded(uri):
|
||||
node = CHKFileNode().new(uri)
|
||||
self.write_to_box(stash_uri_in_boxname, node)
|
||||
d.addCallback(_uploaded)
|
||||
return d
|
||||
|
||||
def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver):
|
||||
pass
|
||||
|
||||
def step_addpath(self, boxname, *path):
|
||||
path = list(path)
|
||||
data = self.read_from_box(boxname)
|
||||
child_node = self._node_maker.make_node_from_serialized(data)
|
||||
child_node = self.read_from_box(boxname)
|
||||
return self.vdrive.add(path, child_node)
|
||||
|
||||
def step_retain_ssk(self, index_a, read_key_a):
|
||||
|
Loading…
x
Reference in New Issue
Block a user