remove unused/obsoleted workqueue.py

This commit is contained in:
Brian Warner 2007-06-26 17:25:23 -07:00
parent b11fa20191
commit 2766b7988b
2 changed files with 0 additions and 637 deletions

View File: interfaces.py

@@ -634,153 +634,6 @@ class IUploader(Interface):
"""Like upload(), but accepts an open filehandle."""
class IWorkQueue(Interface):
"""Each filetable root is associated a work queue, which is persisted on
disk and contains idempotent actions that need to be performed. After
each action is completed, it is removed from the queue.
The queue is broken up into several sections. First are the 'upload'
steps. After this are the 'add_subpath' commands. The last section has
the 'unlink' steps. Somewhere in here are the 'retain' steps.. maybe
interspersed with 'upload', maybe after 'add_subpath' and before
'unlink'.
The general idea is that the processing of the work queue could be
interrupted at any time, in the middle of a step, and the next time the
application is started, the step can be re-started without problems. The
placement of the 'retain' commands depends upon how long we might expect
the app to be offline.
tempfiles: the workqueue has a special directory where temporary files
are stored. create_tempfile() generates these files, while steps like
add_upload_chk() use them. The add_delete_tempfile() will delete the
tempfile. All tempfiles are deleted when the workqueue becomes empty,
since at that point none of them can still be referenced.
boxes: there is another special directory where named slots (called
'boxes') hold serialized INode specifications (the strings which are
returned by INode.serialize_node()). Boxes are created by calling
create_boxname(). Boxes are filled either at the time of creation or by
steps like add_upload_chk(). Boxes are used by steps like add_addpath()
and add_retain_uri_from_box. Boxes are deleted by add_delete_box(), as
well as when the workqueue becomes empty.
"""
def create_tempfile(suffix=""):
"""Return (f, filename), where 'f' is an open filehandle, and
'filename' is a string that can be passed to other workqueue steps to
refer to that same file later. NOTE: 'filename' is not an absolute
path, rather it will be interpreted relative to some directory known
only by the workqueue."""
def create_boxname(contents=None):
"""Return a unique box name (as a string). If 'contents' are
provided, it must be an instance that provides INode, and the
serialized form of the node will be written into the box. Otherwise
the boxname can be used by steps like add_upload_chk to hold the
generated uri."""
def add_upload_chk(source_filename, stash_uri_in_boxname):
"""This step uploads a file to the grid and obtains a content-based
URI which can be used to later retrieve the same contents ('CHK'
mode). This URI includes unlink rights. It does not mark the file for
retention.
Non-absolute filenames are interpreted relative to the workqueue's
special just-for-tempfiles directory.
When the upload is complete, the resulting URI is stashed in a 'box'
with the specified name. This is basically a local variable. A later
'add_subpath' step will reference this boxname and retrieve the URI.
"""
    def add_upload_ssk(source_filename, write_capability, previous_version):
"""This step uploads a file to the grid in a way that replaces the
previous version and does not require a change to the ID referenced
by the parent.
"""
def add_vdrive_update_handle(handle, source_filename):
"""Arrange for a vdrive server to be notified that the given handle
has been updated with the contents of the given tempfile. This will
send a set_handle() message to the vdrive."""
def add_retain_ssk(read_capability):
"""Arrange for the given SSK to be kept alive."""
def add_unlink_ssk(write_capability):
"""Stop keeping the given SSK alive."""
def add_retain_uri_from_box(boxname):
"""When executed, this step retrieves the URI from the given box and
marks it for retention: this adds it to a list of all URIs that this
system cares about, which will initiate filechecking/repair for the
file."""
def add_addpath(boxname, path):
"""When executed, this step pulls a node specification from 'boxname'
and figures out which subtrees must be modified to allow that node to
live at the 'path' (which is an absolute path). This will probably
cause one or more 'add_modify_subtree' or 'add_modify_redirection'
steps to be added to the workqueue.
"""
def add_deletepath(path):
"""When executed, finds the subtree that contains the node at 'path'
and modifies it (and any necessary parent subtrees) to delete that
path. This will probably cause one or more 'add_modify_subtree' or
'add_modify_redirection' steps to be added to the workqueue.
"""
def add_modify_subtree(subtree_node, localpath, new_node_boxname,
new_subtree_boxname=None):
"""When executed, this step retrieves the subtree specified by
'subtree_node', pulls a node specification out of 'new_node_boxname',
then modifies the subtree such that a subtree-relative 'localpath'
points to the new node. If 'new_node_boxname' is None, this deletes
the given path. It then serializes the subtree in its new form, and
optionally puts a node that describes the new subtree in
'new_subtree_boxname' for use by another add_modify_subtree step.
        The idea is that 'subtree_node' will refer to a CHKDirectorySubTree, and
'new_node_boxname' will contain the CHKFileNode that points to a
newly-uploaded file. When the CHKDirectorySubTree is modified, it
acquires a new URI, which will be stuffed (in the form of a
CHKDirectorySubTreeNode) into 'new_subtree_boxname'. A subsequent
step would then read from 'new_subtree_boxname' and modify some other
subtree with the contents.
If 'subtree_node' refers to a redirection subtree like
LocalFileRedirection or VdriveRedirection, then 'localpath' is
ignored, because redirection subtrees don't consume path components
and have no internal directory structure (they just have the one
redirection target). Redirection subtrees generally retain a constant
identity, so it is unlikely that 'new_subtree_boxname' will be used.
"""
def add_unlink_uri(uri):
"""When executed, this step will unlink the data referenced by the
given URI: the unlink rights are used to tell any shareholders to
unlink the file (possibly deleting it), and the URI is removed from
the list that this system cares about, cancelling filechecking/repair
for the file.
All 'unlink' steps are pushed to the end of the queue.
"""
def add_delete_tempfile(filename):
"""This step will delete a tempfile created by create_tempfile."""
def add_delete_box(boxname):
"""When executed, this step deletes the given box."""
# methods for use in unit tests
def flush():
"""Execute all steps in the WorkQueue right away. Return a Deferred
that fires (with self) when the queue is empty.
"""
class NotCapableError(Exception):
"""You have tried to write to a read-only node."""

View File: workqueue.py

@@ -1,490 +0,0 @@
import os, shutil
from zope.interface import implements
from twisted.internet import defer
from allmydata.util import bencode
from allmydata.util.idlib import b2a
from allmydata.Crypto.Cipher import AES
from allmydata.Crypto.Hash import SHA256
from allmydata.filetree.nodemaker import NodeMaker
from allmydata.filetree.interfaces import INode
from allmydata.filetree.file import CHKFileNode
from allmydata.interfaces import IWorkQueue, NotCapableError, IUploader
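# NOTE: the Step classes below use an experimental directory-per-step
# representation; the WorkQueue class further down still stores each step
# as a flat line-oriented file (see the TODO in add_modify_subtree).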
class Step(object):
def setup(self, stepname, basedir):
self.basedir = basedir
self.stepname = stepname
self.stepbase = os.path.join(self.basedir, self.stepname)
def remove(self, _ignored=None):
trashdir = os.path.join(self.basedir, "trash", self.stepname)
os.rename(self.stepbase, trashdir)
shutil.rmtree(trashdir)
class UploadSSKStep(Step):
def start(self):
f = open(os.path.join(self.stepbase, "source_filename"), "r")
source_filename = f.read()
f.close()
f = open(os.path.join(self.stepbase, "write_capability"), "r")
write_cap = bencode.bdecode(f.read())
f.close()
f = open(os.path.join(self.stepbase, "previous_version"), "r")
previous_version = bencode.bdecode(f.read())
f.close()
n = MutableSSKTracker()
n.set_version(previous_version)
n.set_write_capability(write_cap)
f = open(source_filename, "rb")
data = f.read()
f.close()
published_data = n.write_new_version(data)
        # NB: push_ssk is not defined anywhere in this module; this code
        # path was never exercised
        d = self.push_ssk(n.ssk_index, n.version, published_data)
d.addCallback(self.remove)
return d
class WorkQueue(object):
implements(IWorkQueue)
debug = False
def __init__(self, basedir):
assert basedir.endswith("workqueue")
self.basedir = basedir
self._node_maker = NodeMaker()
self._uploader = None # filled in later
self._downloader = None # filled in later
self.seqnum = 0
self.tmpdir = os.path.join(basedir, "tmp")
#self.trashdir = os.path.join(basedir, "trash")
self.filesdir = os.path.join(basedir, "files")
self.boxesdir = os.path.join(basedir, "boxes")
if os.path.exists(self.tmpdir):
shutil.rmtree(self.tmpdir)
os.makedirs(self.tmpdir)
#if os.path.exists(self.trashdir):
# shutil.rmtree(self.trashdir)
#os.makedirs(self.trashdir)
if not os.path.exists(self.filesdir):
# filesdir is *not* cleared
os.makedirs(self.filesdir)
if not os.path.exists(self.boxesdir):
# likewise, boxesdir is not cleared
os.makedirs(self.boxesdir)
# all Steps are recorded in separate files in our basedir. All such
# files are named with the pattern 'step-END-NNN', where END is
# either 'first' or 'last'. These steps are to be executed in
# alphabetical order, with all 'step-first-NNN' steps running before
# any 'step-last-NNN'.
for n in os.listdir(self.basedir):
if n.startswith("step-first-"):
sn = int(n[len("step-first-"):])
self.seqnum = max(self.seqnum, sn)
elif n.startswith("step-last-"):
sn = int(n[len("step-last-"):])
self.seqnum = max(self.seqnum, sn)
# each of these files contains one string per line, and the first
# line specifies what kind of step it is
assert self.seqnum < 1000 # TODO: don't let this grow unboundedly
def set_vdrive(self, vdrive):
self.vdrive = vdrive
def set_uploader(self, uploader):
assert IUploader(uploader)
self._uploader = uploader
def create_tempfile(self, suffix=""):
randomname = b2a(os.urandom(10))
filename = randomname + suffix
f = open(os.path.join(self.filesdir, filename), "wb")
return (f, filename)
def create_boxname(self, contents=None):
boxname = b2a(os.urandom(10))
if contents is not None:
self.write_to_box(boxname, contents)
return boxname
def write_to_box(self, boxname, contents):
assert INode(contents)
f = open(os.path.join(self.boxesdir, boxname), "w")
f.write(contents.serialize_node())
f.flush()
os.fsync(f)
f.close()
def read_from_box(self, boxname):
f = open(os.path.join(self.boxesdir, boxname), "r")
data = f.read()
node = self._node_maker.make_node_from_serialized(data)
f.close()
return node
def _create_step(self, end, lines):
assert end in ("first", "last")
filename = "step-%s-%d" % (end, self.seqnum)
self.seqnum += 1
f = open(os.path.join(self.tmpdir, filename), "w")
for line in lines:
assert "\n" not in line, line
f.write(line)
f.write("\n")
f.flush()
os.fsync(f)
f.close()
fromfile = os.path.join(self.tmpdir, filename)
tofile = os.path.join(self.basedir, filename)
os.rename(fromfile, tofile)
def _create_step_first(self, lines):
self._create_step("first", lines)
def _create_step_last(self, lines):
self._create_step("last", lines)
# methods to add entries to the queue
def add_upload_chk(self, source_filename, stash_uri_in_boxname):
# If source_filename is absolute, it will point to something outside
# of our workqueue (this is how user files are uploaded). If it is
# relative, it points to something inside self.filesdir (this is how
# serialized directories and tempfiles are uploaded)
lines = ["upload_chk", source_filename, stash_uri_in_boxname]
self._create_step_first(lines)
def add_upload_ssk(self, source_filename, write_capability,
previous_version):
lines = ["upload_ssk", source_filename,
b2a(write_capability.index), b2a(write_capability.key),
str(previous_version)]
self._create_step_first(lines)
def add_retain_ssk(self, read_capability):
lines = ["retain_ssk", b2a(read_capability.index),
b2a(read_capability.key)]
self._create_step_first(lines)
def add_unlink_ssk(self, write_capability):
lines = ["unlink_ssk", b2a(write_capability.index),
b2a(write_capability.key)]
self._create_step_last(lines)
def add_retain_uri_from_box(self, boxname):
lines = ["retain_uri_from_box", boxname]
self._create_step_first(lines)
def add_addpath(self, boxname, path):
assert isinstance(path, (list, tuple))
lines = ["addpath", boxname]
lines.extend(path)
self._create_step_first(lines)
def add_deletepath(self, path):
assert isinstance(path, (list, tuple))
lines = ["deletepath"]
lines.extend(path)
self._create_step_first(lines)
def add_modify_subtree(self, subtree_node, localpath, new_node_boxname,
new_subtree_boxname=None):
assert isinstance(localpath, (list, tuple))
box1 = self.create_boxname(subtree_node)
self.add_delete_box(box1)
# TODO: it would probably be easier if steps were represented in
# directories, with a separate file for each argument
if new_node_boxname is None:
new_node_boxname = ""
if new_subtree_boxname is None:
new_subtree_boxname = ""
lines = ["modify_subtree",
box1, new_node_boxname, new_subtree_boxname]
lines.extend(localpath)
self._create_step_first(lines)
def add_unlink_uri(self, uri):
lines = ["unlink_uri", uri]
self._create_step_last(lines)
def add_delete_tempfile(self, filename):
lines = ["delete_tempfile", filename]
self._create_step_last(lines)
def add_delete_box(self, boxname):
lines = ["delete_box", boxname]
self._create_step_last(lines)
# methods to perform work
def run_next_step(self):
"""Run the next pending step.
Returns None if there is no next step to run, or a Deferred that
will fire when the step completes. The step will be removed
from the queue when it completes."""
        next_step = self.get_next_step()
        if next_step:
            stepname, steptype, lines = next_step
d = self.dispatch_step(steptype, lines)
d.addCallback(self._delete_step, stepname)
return d
# no steps pending, it is safe to clean out leftover files
self._clean_leftover_files()
return None
def _clean_leftover_files(self):
# there are no steps pending, therefore any leftover files in our
# filesdir are orphaned and can be deleted. This catches things like
# a tempfile being created but the application gets interrupted
# before the upload step which references it gets created, or if an
# upload step gets written but the remaining sequence (addpath,
# delete_box) does not.
for n in os.listdir(self.filesdir):
os.unlink(os.path.join(self.filesdir, n))
for n in os.listdir(self.boxesdir):
os.unlink(os.path.join(self.boxesdir, n))
def get_next_step(self):
stepnames = [n for n in os.listdir(self.basedir)
if n.startswith("step-")]
stepnames.sort()
if not stepnames:
return None
stepname = stepnames[0]
return self._get_step(stepname)
def _get_step(self, stepname):
f = open(os.path.join(self.basedir, stepname), "r")
lines = f.read().split("\n")
f.close()
assert lines[-1] == "" # files should end with a newline
lines.pop(-1) # remove the newline
steptype = lines.pop(0)
return stepname, steptype, lines
def dispatch_step(self, steptype, lines):
handlername = "step_" + steptype
if not hasattr(self, handlername):
raise RuntimeError("unknown workqueue step type '%s'" % steptype)
handler = getattr(self, handlername)
d = defer.maybeDeferred(handler, *lines)
return d
def _delete_step(self, res, stepname):
os.unlink(os.path.join(self.basedir, stepname))
return res
# debug/test methods
def count_pending_steps(self):
return len([n for n in os.listdir(self.basedir)
if n.startswith("step-")])
def get_all_steps(self):
# returns a list of (steptype, lines) for all steps
stepnames = []
for stepname in os.listdir(self.basedir):
if stepname.startswith("step-"):
stepnames.append(stepname)
stepnames.sort()
steps = []
for stepname in stepnames:
steps.append(self._get_step(stepname)[1:])
return steps
def run_all_steps(self, ignored=None):
d = self.run_next_step()
if d:
d.addCallback(self.run_all_steps)
return d
return defer.succeed(None)
    def flush(self):
        # fire with self, as promised by IWorkQueue.flush()
        d = self.run_all_steps()
        d.addCallback(lambda ignored: self)
        return d
def open_tempfile(self, filename):
f = open(os.path.join(self.filesdir, filename), "rb")
return f
# work is dispatched to these methods. To add a new step type, add a
# dispatch method here and an add_ method above.
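    # For example, a hypothetical 'touch' step would pair an add_ method:
    #
    #   def add_touch(self, filename):
    #       self._create_step_first(["touch", filename])
    #
    # with a dispatch method:
    #
    #   def step_touch(self, filename):
    #       open(os.path.join(self.filesdir, filename), "ab").close()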
def step_upload_chk(self, source_filename, stash_uri_in_boxname):
if self.debug:
print "STEP_UPLOAD_CHK(%s -> %s)" % (source_filename,
stash_uri_in_boxname)
# we use relative filenames for tempfiles created by
# workqueue.create_tempfile, and absolute filenames for everything
# that comes from the vdrive. That means using os.path.abspath() on
# user files in VirtualDrive methods.
filename = os.path.join(self.filesdir, source_filename)
d = self._uploader.upload_filename(filename)
def _uploaded(uri):
if self.debug:
print " -> %s" % uri
node = CHKFileNode().new(uri)
self.write_to_box(stash_uri_in_boxname, node)
d.addCallback(_uploaded)
return d
def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver):
pass
def step_addpath(self, boxname, *path):
if self.debug:
print "STEP_ADDPATH(%s -> %s)" % (boxname, "/".join(path))
path = list(path)
return self.vdrive.addpath(path, boxname)
def step_deletepath(self, *path):
if self.debug:
print "STEP_DELETEPATH(%s)" % "/".join(path)
path = list(path)
return self.vdrive.deletepath(path)
def step_modify_subtree(self, subtree_node_boxname, new_node_boxname,
new_subtree_boxname, *localpath):
# the weird order of arguments is a consequence of the fact that
# localpath is variable-length and new_subtree_boxname is optional.
if not new_subtree_boxname:
new_subtree_boxname = None
subtree_node = self.read_from_box(subtree_node_boxname)
new_node = None
if new_node_boxname:
new_node = self.read_from_box(new_node_boxname)
localpath = list(localpath)
return self.vdrive.modify_subtree(subtree_node, localpath,
new_node, new_subtree_boxname)
def step_retain_ssk(self, index_a, read_key_a):
pass
def step_unlink_ssk(self, index_a, write_key_a):
pass
def step_retain_uri_from_box(self, boxname):
pass
def step_unlink_uri(self, uri):
if self.debug:
print "STEP_UNLINK_URI(%s)" % uri
pass
def step_delete_tempfile(self, filename):
if self.debug:
print "STEP_DELETE_TEMPFILE(%s)" % filename
assert not filename.startswith("/")
os.unlink(os.path.join(self.filesdir, filename))
def step_delete_box(self, boxname):
if self.debug:
print "DELETE_BOX", boxname
os.unlink(os.path.join(self.boxesdir, boxname))
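# Hedged usage sketch (not part of the original module): exercise a fresh
# queue end-to-end with a single tempfile-deletion step. The function name
# and basedir value are illustrative only.
def _demo_workqueue(basedir="demo-workqueue"):
    if not os.path.exists(basedir):
        os.makedirs(basedir)
    wq = WorkQueue(basedir)          # basedir must end with "workqueue"
    (f, tmpname) = wq.create_tempfile(".txt")
    f.write("hello")
    f.close()
    wq.add_delete_tempfile(tmpname)  # queued as a 'last' step
    return wq.run_all_steps()        # Deferred fires when the queue is empty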
AES_KEY_LENGTH = 16
def make_aes_key():
    return os.urandom(AES_KEY_LENGTH)
def make_rsa_key():
raise NotImplementedError
def hash_sha(data):
return SHA256.new(data).digest()
def hash_sha_to_key(data):
return SHA256.new(data).digest()[:AES_KEY_LENGTH]
def aes_encrypt(key, plaintext):
assert isinstance(key, str)
assert len(key) == AES_KEY_LENGTH
cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
crypttext = cryptor.encrypt(plaintext)
return crypttext
def aes_decrypt(key, crypttext):
assert isinstance(key, str)
assert len(key) == AES_KEY_LENGTH
cryptor = AES.new(key=key, mode=AES.MODE_CTR, counterstart="\x00"*16)
plaintext = cryptor.decrypt(crypttext)
return plaintext
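# Round-trip property of the AES helpers above (CTR mode with a fixed
# counter start is its own inverse):
#
#   key = make_aes_key()
#   assert aes_decrypt(key, aes_encrypt(key, "plaintext")) == "plaintext"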
def serialize(objects):
return bencode.bencode(objects)
def unserialize(data):
return bencode.bdecode(data)
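# e.g. serialize() turns a list into a bencoded string and unserialize()
# inverts it:
#
#   assert unserialize(serialize(["x", 5])) == ["x", 5]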
class MutableSSKTracker(object):
"""I represent a mutable file, indexed by an SSK.
"""
def create(self):
# if you create the node this way, you will have both read and write
# capabilities
self.priv_key, self.pub_key = make_rsa_key()
self.ssk_index = hash_sha(self.pub_key.serialized())
self.write_key = make_aes_key()
self.read_key = hash_sha_to_key(self.write_key)
self.version = 0
def set_version(self, version):
self.version = version
def set_read_capability(self, read_cap):
(self.ssk_index, self.read_key) = read_cap
def set_write_capability(self, write_cap):
# TODO: add some assertions here, if someone calls both
# set_read_capability and set_write_capability, make sure the keys
# match
(self.ssk_index, self.write_key) = write_cap
self.read_key = hash_sha_to_key(self.write_key)
def extract_readwrite_from_published(self, published_data, write_key):
self.write_key = write_key
self.read_key = hash_sha_to_key(self.write_key)
self._extract(published_data)
self.priv_key = aes_decrypt(write_key, self.encrypted_privkey)
assert self.priv_key.is_this_your_pub_key(self.pub_key)
def extract_readonly_from_published(self, published_data, read_key):
self.write_key = None
self.read_key = read_key
self._extract(published_data)
self.priv_key = None
def _extract(self, published_data):
(signed_data, serialized_pub_key, sig) = unserialize(published_data)
self.pub_key = unserialize(serialized_pub_key)
self.pub_key.check_signature(sig, signed_data)
(encrypted_privkey, encrypted_data, version) = unserialize(signed_data)
self.data = aes_decrypt(self.read_key, encrypted_data)
self.encrypted_privkey = encrypted_privkey
def get_read_capability(self):
return (self.ssk_index, self.read_key)
def get_write_capability(self):
if not self.write_key:
raise NotCapableError("This MutableSSKTracker is read-only")
return (self.ssk_index, self.write_key)
def write_new_version(self, data):
if not self.write_key:
raise NotCapableError("This MutableSSKTracker is read-only")
encrypted_privkey = aes_encrypt(self.write_key,
self.priv_key.serialized())
encrypted_data = aes_encrypt(self.read_key, data)
self.version += 1
signed_data = serialize((encrypted_privkey,
encrypted_data,
self.version))
sig = self.priv_key.sign(signed_data)
serialized_pub_key = self.pub_key.serialized()
published_data = serialize((signed_data, serialized_pub_key, sig))
return published_data
def make_new_SSK_node():
n = MutableSSKTracker()
n.create()
return n
def extract_readwrite_SSK_node(published_data, write_key):
n = MutableSSKTracker()
    n.extract_readwrite_from_published(published_data, write_key)
return n
def extract_readonly_SSK_node(published_data, read_key):
n = MutableSSKTracker()
n.extract_readonly_from_published(published_data, read_key)
return n
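# Hedged end-to-end sketch (cannot actually run, since make_rsa_key() above
# was never implemented):
#
#   n = make_new_SSK_node()
#   published = n.write_new_version("contents v1")
#   reader = extract_readonly_SSK_node(published, n.read_key)
#   assert reader.data == "contents v1"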