more filetree, workqueue-boxes now hold serialized Nodes, move NodeMaker out to a separate module

This commit is contained in:
Brian Warner 2007-01-21 04:18:54 -07:00
parent 324033c9e2
commit 7b8c524d7c
7 changed files with 147 additions and 51 deletions

View File

@ -10,6 +10,7 @@ class CHKFileNode(BaseDataNode):
def new(self, uri):
self.uri = uri
return self
def get_base_data(self):
return self.uri

View File

@ -245,11 +245,11 @@ class IVirtualDrive(Interface):
path[:-1] must refer to a writable DIRECTORY node. 'uploadable' must
implement IUploadable. This returns a Deferred that fires (with
'uploadable') when the upload is complete.
'uploadable') when the upload is complete. Do not use the workqueue.
"""
def upload_later(path, filename):
"""Upload a file from disk to the given path.
"""Upload a file from disk to the given path. Use the workqueue.
"""
def delete(path):
@ -259,6 +259,10 @@ class IVirtualDrive(Interface):
complete.
"""
def add_node(path, node):
"""Add a node to the given path. Use the workqueue.
"""
# commands to manipulate subtrees
# ... detach subtree, merge subtree, etc

View File

@ -0,0 +1,41 @@
from zope.interface import implements
from allmydata.filetree import directory, file, redirect
from allmydata.filetree.interfaces import INodeMaker
# this list is used by NodeMaker to convert node specification strings (found
# inside the serialized form of subtrees) into Nodes (which live in the
# in-RAM form of subtrees).
all_node_types = [
directory.LocalFileSubTreeNode,
directory.CHKDirectorySubTreeNode,
directory.SSKDirectorySubTreeNode,
file.CHKFileNode,
file.SSKFileNode,
redirect.LocalFileRedirectionNode,
redirect.QueenRedirectionNode,
redirect.HTTPRedirectionNode,
redirect.QueenOrLocalFileRedirectionNode,
]
class NodeMaker(object):
implements(INodeMaker)
def make_node_from_serialized(self, serialized):
# this turns a string into an INode, which contains information about
# the file or directory (like a URI), but does not contain the actual
# contents. An ISubTreeMaker can be used later to retrieve the
# contents (which means downloading the file if this is an IFileNode,
# or perhaps creating a new subtree from the contents)
# maybe include parent_is_mutable?
assert isinstance(serialized, str)
prefix, body = serialized.split(":", 2)
for node_class in all_node_types:
if prefix == node_class.prefix:
node = node_class()
node.populate_node(body, self)
return node
raise RuntimeError("unable to handle node type '%s'" % prefix)

View File

@ -10,43 +10,7 @@ from allmydata.filetree.interfaces import (
)
from allmydata.upload import IUploadable
# this list is used by NodeMaker to convert node specification strings (found
# inside the serialized form of subtrees) into Nodes (which live in the
# in-RAM form of subtrees).
all_node_types = [
directory.LocalFileSubTreeNode,
directory.CHKDirectorySubTreeNode,
directory.SSKDirectorySubTreeNode,
file.CHKFileNode,
file.SSKFileNode,
redirect.LocalFileRedirectionNode,
redirect.QueenRedirectionNode,
redirect.HTTPRedirectionNode,
redirect.QueenOrLocalFileRedirectionNode,
]
class NodeMaker(object):
implements(INodeMaker)
def make_node_from_serialized(self, serialized):
# this turns a string into an INode, which contains information about
# the file or directory (like a URI), but does not contain the actual
# contents. An ISubTreeMaker can be used later to retrieve the
# contents (which means downloading the file if this is an IFileNode,
# or perhaps creating a new subtree from the contents)
# maybe include parent_is_mutable?
assert isinstance(serialized, str)
prefix, body = serialized.split(":", 2)
for node_class in all_node_types:
if prefix == node_class.prefix:
node = node_class()
node.populate_node(body, self)
return node
raise RuntimeError("unable to handle node type '%s'" % prefix)
from allmydata.filetree.nodemaker import NodeMaker
all_openable_subtree_types = [
directory.LocalFileSubTree,
@ -204,7 +168,7 @@ class VirtualDrive(object):
def _got_closest((node, remaining_path)):
prepath_len = len(path) - len(remaining_path)
prepath = path[:prepath_len]
assert path[prepath_len:] == remaining_path
assert path[prepath_len:] == remaining_path, "um, path=%s, prepath=%s, prepath_len=%d, remaining_path=%s" % (path, prepath, prepath_len, remaining_path)
return (prepath, node, remaining_path)
d.addCallback(_got_closest)
return d
@ -226,6 +190,7 @@ class VirtualDrive(object):
d = self._get_closest_node_and_prepath(parent_path)
def _got_closest((prepath, node, remaining_path)):
# now tell it to create any necessary parent directories
remaining_path = remaining_path[:]
while remaining_path:
node = node.add_subdir(remaining_path.pop(0))
# 'node' is now the directory where the child wants to go
@ -248,16 +213,19 @@ class VirtualDrive(object):
# these are user-visible
def list(self, path):
assert isinstance(path, list)
d = self._get_directory(path)
d.addCallback(lambda node: node.list())
return d
def download(self, path, target):
assert isinstance(path, list)
d = self._get_file_uri(path)
d.addCallback(lambda uri: self.downloader.download(uri, target))
return d
def upload_now(self, path, uploadable):
assert isinstance(path, list)
# note: the first few steps of this do not use the workqueue, but I
# think things should remain consistent anyways. If the node is shut
# down before the file has finished uploading, then we forget all
@ -266,17 +234,24 @@ class VirtualDrive(object):
d = self._child_should_not_exist(path)
# then we upload the file
d.addCallback(lambda ignored: self.uploader.upload(uploadable))
d.addCallback(lambda uri: self.workqueue.create_boxname(uri))
d.addCallback(lambda boxname:
self.workqueue.add_addpath(boxname, path))
def _uploaded(uri):
assert isinstance(uri, str)
new_node = file.CHKFileNode().new(uri)
boxname = self.workqueue.create_boxname(new_node)
self.workqueue.add_addpath(boxname, path)
self.workqueue.add_delete_box(boxname)
d.addCallback(_uploaded)
return d
def upload_later(self, path, filename):
assert isinstance(path, list)
boxname = self.workqueue.create_boxname()
self.workqueue.add_upload_chk(filename, boxname)
self.workqueue.add_addpath(boxname, path)
self.workqueue.add_delete_box(boxname)
def delete(self, path):
assert isinstance(path, list)
parent_path = path[:-1]
orphan_path = path[-1]
d = self._get_closest_node_and_prepath(parent_path)
@ -288,7 +263,16 @@ class VirtualDrive(object):
boxname = subtree.update(self.workqueue)
if boxname:
self.workqueue.add_addpath(boxname, prepath)
self.workqueue.add_delete_box(boxname)
return self
d.addCallback(_got_parent)
return d
def add_node(self, path, node):
assert isinstance(path, list)
assert INode(node)
assert not IDirectoryNode.providedBy(node)
boxname = self.workqueue.create_boxname(node)
self.workqueue.add_addpath(boxname, path)
self.workqueue.add_delete_box(boxname)

View File

@ -338,6 +338,11 @@ class Stuff(unittest.TestCase):
def failUnlessListsAreEqual(self, list1, list2):
self.failUnlessEqual(sorted(list1), sorted(list2))
def failUnlessContentsAreEqual(self, c1, c2):
c1a = dict([(k,v.serialize_node()) for k,v in c1.items()])
c2a = dict([(k,v.serialize_node()) for k,v in c2.items()])
self.failUnlessEqual(c1a, c2a)
def testDirectory(self):
stm = vdrive.SubTreeMaker(None, None)
@ -431,11 +436,36 @@ class Stuff(unittest.TestCase):
root = redirect.LocalFileRedirection().new("vdrive-root",
topdir.create_node_now())
root.update_now(None)
wq = self.makeVirtualDrive("vdrive", root.create_node_now())
v = self.makeVirtualDrive("vdrive", root.create_node_now())
d = wq.list([])
d = v.list([])
def _listed(contents):
self.failUnlessEqual(contents, {})
d.addCallback(_listed)
child1 = CHKFileNode().new("uri1")
d.addCallback(lambda res: v.add_node(["a"], child1))
d.addCallback(lambda res: v.workqueue.flush())
d.addCallback(lambda res: v.list([]))
def _listed2(contents):
self.failUnlessListsAreEqual(contents.keys(), ["a"])
self.failUnlessContentsAreEqual(contents, {"a": child1})
d.addCallback(_listed2)
child2 = CHKFileNode().new("uri2")
child3 = CHKFileNode().new("uri3")
d.addCallback(lambda res: v.add_node(["b","c"], child2))
d.addCallback(lambda res: v.add_node(["b","d"], child3))
d.addCallback(lambda res: v.workqueue.flush())
d.addCallback(lambda res: v.list([]))
def _listed3(contents):
self.failUnlessListsAreEqual(contents.keys(), ["a","b"])
d.addCallback(_listed3)
d.addCallback(lambda res: v.list(["b"]))
def _listed4(contents):
self.failUnlessListsAreEqual(contents.keys(), ["c","d"])
self.failUnlessContentsAreEqual(contents,
{"c": child2, "d": child3})
d.addCallback(_listed4)
return d

View File

@ -153,7 +153,7 @@ class Items(unittest.TestCase):
boxfile = os.path.join(wq.boxesdir, boxname)
self.failUnless(os.path.exists(boxfile))
d = wq.run_all_steps()
d = wq.flush()
def _check(res):
self.failUnlessEqual(len(wq.dispatched_steps), 5)
self.failUnlessEqual(wq.dispatched_steps[0][0], "upload_chk")

View File

@ -5,6 +5,8 @@ from twisted.internet import defer
from allmydata.util import bencode
from allmydata.util.idlib import b2a
from allmydata.Crypto.Cipher import AES
from allmydata.filetree.nodemaker import NodeMaker
from allmydata.filetree.interfaces import INode
class IWorkQueue(Interface):
"""Each filetable root is associated a work queue, which is persisted on
@ -22,6 +24,20 @@ class IWorkQueue(Interface):
application is started, the step can be re-started without problems. The
placement of the 'retain' commands depends upon how long we might expect
the app to be offline.
tempfiles: the workqueue has a special directory where temporary files
are stored. create_tempfile() generates these files, while steps like
add_upload_chk() use them. The add_delete_tempfile() will delete the
tempfile. All tempfiles are deleted when the workqueue becomes empty,
since at that point none of them can still be referenced.
boxes: there is another special directory where named slots (called
'boxes') hold serialized INode specifications (the strings which are
returned by INode.serialize_node()). Boxes are created by calling
create_boxname(). Boxes are filled either at the time of creation or by
steps like add_upload_chk(). Boxes are used by steps like add_addpath()
and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
well as when the workqueue becomes empty.
"""
def create_tempfile(suffix=""):
@ -31,7 +47,11 @@ class IWorkQueue(Interface):
path, rather it will be interpreted relative to some directory known
only by the workqueue."""
def create_boxname(contents=None):
"""Return a unique box name (as a string)."""
"""Return a unique box name (as a string). If 'contents' are
provided, it must be an instance that provides INode, and the
serialized form of the node will be written into the box. Otherwise
the boxname can be used by steps like add_upload_chk to hold the
generated uri."""
def add_upload_chk(source_filename, stash_uri_in_boxname):
"""This step uploads a file to the mesh and obtains a content-based
@ -88,6 +108,14 @@ class IWorkQueue(Interface):
def add_delete_box(boxname):
"""When executed, this step deletes the given box."""
# methods for use in unit tests
def flush():
"""Execute all steps in the WorkQueue right away. Return a Deferred
that fires (with self) when the queue is empty.
"""
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""
@ -132,6 +160,7 @@ class WorkQueue(object):
def __init__(self, basedir):
assert basedir.endswith("workqueue")
self.basedir = basedir
self._node_maker = NodeMaker()
self.seqnum = 0
self.tmpdir = os.path.join(basedir, "tmp")
#self.trashdir = os.path.join(basedir, "trash")
@ -174,8 +203,12 @@ class WorkQueue(object):
f = open(os.path.join(self.filesdir, filename), "wb")
return (f, filename)
def create_boxname(self):
return b2a(os.urandom(10))
def create_boxname(self, contents=None):
boxname = b2a(os.urandom(10))
if contents is not None:
assert INode(contents)
self.write_to_box(boxname, contents.serialize_node())
return boxname
def write_to_box(self, boxname, data):
f = open(os.path.join(self.boxesdir, boxname), "w")
f.write(data)
@ -309,7 +342,7 @@ class WorkQueue(object):
if not hasattr(self, handlername):
raise RuntimeError("unknown workqueue step type '%s'" % steptype)
handler = getattr(self, handlername)
d = defer.maybeDeferred(handler, *lines[1:])
d = defer.maybeDeferred(handler, *lines)
return d
def _delete_step(self, res, stepname):
@ -337,6 +370,8 @@ class WorkQueue(object):
d.addCallback(self.run_all_steps)
return d
return defer.succeed(None)
def flush(self):
return self.run_all_steps()
def open_tempfile(self, filename):
@ -353,8 +388,9 @@ class WorkQueue(object):
pass
def step_addpath(self, boxname, *path):
path = list(path)
data = self.read_from_box(boxname)
child_node = unserialize(data) # TODO: unserialize ?
child_node = self._node_maker.make_node_from_serialized(data)
return self.vdrive.add(path, child_node)
def step_retain_ssk(self, index_a, read_key_a):