mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-01 08:48:01 +00:00
checkpoint work-in-progress for WorkQueue, a disk-persistent list of work to be done
This commit is contained in:
parent
a6678b9a37
commit
e600571f82
@ -413,7 +413,7 @@ class MutableCHKDirectorySubTree(_MutableDirectorySubTree):
|
||||
|
||||
def upload_my_serialized_form(self, work_queue):
|
||||
# this is the CHK form
|
||||
f, filename = work_queue.create_tempfile()
|
||||
f, filename = work_queue.create_tempfile(".chkdir")
|
||||
self.serialize_to_file(f)
|
||||
f.close()
|
||||
boxname = work_queue.create_boxname()
|
||||
@ -441,7 +441,7 @@ class MutableSSKDirectorySubTree(_MutableDirectorySubTree):
|
||||
|
||||
def upload_my_serialized_form(self, work_queue):
|
||||
# this is the SSK form
|
||||
f, filename = work_queue.create_tempfile()
|
||||
f, filename = work_queue.create_tempfile(".sskdir")
|
||||
self.serialize_to_file(f)
|
||||
f.close()
|
||||
work_queue.add_upload_ssk(filename, self.get_write_capability(),
|
||||
|
@ -28,7 +28,7 @@ class FakeWorkQueue(object):
|
||||
self.first_commands = []
|
||||
self.last_commands = []
|
||||
|
||||
def create_tempfile(self):
|
||||
def create_tempfile(self, suffix=""):
|
||||
self.tempfile_number += 1
|
||||
self.first_commands.append("create_tempfile-%d" % self.tempfile_number)
|
||||
return (StringIO(), "dummy_filename-%d" % self.tempfile_number)
|
||||
|
@ -2,6 +2,7 @@
|
||||
import os, shutil
|
||||
from zope.interface import Interface, implements
|
||||
from allmydata.util import bencode
|
||||
from allmydata.util.idlib import b2a
|
||||
|
||||
class IWorkQueue(Interface):
|
||||
"""Each filetable root is associated a work queue, which is persisted on
|
||||
@ -21,7 +22,7 @@ class IWorkQueue(Interface):
|
||||
the app to be offline.
|
||||
"""
|
||||
|
||||
def create_tempfile():
|
||||
def create_tempfile(suffix=""):
|
||||
"""Return (f, filename)."""
|
||||
def create_boxname():
|
||||
"""Return a unique box name (as a string)."""
|
||||
@ -115,26 +116,193 @@ class UploadSSKStep(Step):
|
||||
class WorkQueue(object):
|
||||
implements(IWorkQueue)
|
||||
def __init__(self, basedir):
|
||||
assert basedir.endswith("workqueue")
|
||||
self.basedir = basedir
|
||||
self.seqnum = 0
|
||||
self.tmpdir = os.path.join(basedir, "tmp")
|
||||
#self.trashdir = os.path.join(basedir, "trash")
|
||||
self.filesdir = os.path.join(basedir, "files")
|
||||
self.boxesdir = os.path.join(basedir, "boxes")
|
||||
if os.path.exists(self.tmpdir):
|
||||
shutil.rmtree(self.tmpdir)
|
||||
os.makedirs(self.tmpdir)
|
||||
#if os.path.exists(self.trashdir):
|
||||
# shutil.rmtree(self.trashdir)
|
||||
#os.makedirs(self.trashdir)
|
||||
if not os.path.exists(self.filesdir):
|
||||
# filesdir is *not* cleared
|
||||
os.makedirs(self.filesdir)
|
||||
if not os.path.exists(self.boxesdir):
|
||||
# likewise, boxesdir is not cleared
|
||||
os.makedirs(self.boxesdir)
|
||||
# all Steps are recorded in separate files in our basedir. All such
|
||||
# files are named with the pattern 'step-END-NNN', where END is
|
||||
# either 'first' or 'last'. These steps are to be executed in
|
||||
# alphabetical order, with all 'step-first-NNN' steps running before
|
||||
# any 'step-last-NNN'.
|
||||
for n in os.list(self.basedir):
|
||||
if n.startswith("step-first-"):
|
||||
sn = int(n[len("step-first-"):])
|
||||
self.seqnum = max(self.seqnum, sn)
|
||||
elif n.startswith("step-last-"):
|
||||
sn = int(n[len("step-last-"):])
|
||||
self.seqnum = max(self.seqnum, sn)
|
||||
# each of these files contains one string per line, and the first
|
||||
# line specifies what kind of step it is
|
||||
assert seqnum < 1000 # TODO: don't let this grow unboundedly
|
||||
|
||||
def create_tempfile(self, suffix=""):
|
||||
randomname = b2a(os.random(10))
|
||||
filename = randomname + suffix
|
||||
f = open(os.path.join(self.filesdir, filename), "wb")
|
||||
return (f, filename)
|
||||
|
||||
def create_boxname(self):
|
||||
return b2a(os.random(10))
|
||||
def write_to_box(self, boxname, data):
|
||||
f = open(os.path.join(self.boxesdir, boxname), "w")
|
||||
f.write(data)
|
||||
f.flush()
|
||||
os.fsync(f)
|
||||
f.close()
|
||||
def read_from_box(self, boxname):
|
||||
f = open(os.path.join(self.boxesdir, boxname), "r")
|
||||
data = f.read()
|
||||
f.close()
|
||||
return data
|
||||
|
||||
def _create_step(self, end, lines):
|
||||
assert end in ("first", "last")
|
||||
filename = "step-%s-%d" % (end, self.seqnum)
|
||||
self.seqnum += 1
|
||||
f = open(os.path.join(self.tmpdir, filename), "w")
|
||||
for line in lines:
|
||||
assert "\n" not in line, line
|
||||
f.write(line)
|
||||
f.write("\n")
|
||||
f.flush()
|
||||
os.fsync(f)
|
||||
f.close()
|
||||
fromfile = os.path.join(self.tmpdir, filename)
|
||||
tofile = os.path.join(self.basedir, filename)
|
||||
os.rename(fromfile, tofile)
|
||||
|
||||
# methods to add entries to the queue
|
||||
def add_upload_chk(self, source_filename, stash_uri_in_boxname):
|
||||
# source_filename is absolute, and can point to things outside our
|
||||
# workqueue.
|
||||
lines = ["upload_chk", source_filename, stash_uri_in_boxname]
|
||||
self._create_step(lines)
|
||||
|
||||
def add_upload_ssk(self, source_filename, write_capability,
|
||||
previous_version):
|
||||
lines = ["upload_ssk", source_filename,
|
||||
b2a(write_capability.index), b2a(write_capability.key),
|
||||
str(previous_version)]
|
||||
self._create_step(lines)
|
||||
|
||||
def add_retain_ssk(self, read_capability):
|
||||
lines = ["retain_ssk", b2a(read_capability.index),
|
||||
b2a(read_capability.key)]
|
||||
self._create_step(lines)
|
||||
|
||||
def add_unlink_ssk(self, write_capability):
|
||||
lines = ["unlink_ssk", b2a(write_capability.index),
|
||||
b2a(write_capability.key)]
|
||||
self._create_step(lines)
|
||||
|
||||
def add_retain_uri_from_box(self, boxname):
|
||||
lines = ["retain_uri_from_box", boxname]
|
||||
self._create_step(lines)
|
||||
|
||||
def add_addpath(self, boxname, path):
|
||||
assert isinstance(path, (list, tuple))
|
||||
lines = ["addpath", boxname]
|
||||
lines.extend(path)
|
||||
self._create_step(lines)
|
||||
|
||||
def add_unlink_uri(self, uri):
|
||||
lines = ["unlink_uri", uri]
|
||||
self._create_step(lines)
|
||||
|
||||
def delete_tempfile(self, filename):
|
||||
lines = ["delete_tempfile", filename]
|
||||
self._create_step(lines)
|
||||
|
||||
def delete_box(self, boxname):
|
||||
lines = ["delete_box", boxname]
|
||||
self._create_step(lines)
|
||||
|
||||
|
||||
# methods to perform work
|
||||
|
||||
def run_next_step(self):
|
||||
"""Run the next pending step.
|
||||
|
||||
Returns None if there is no next step to run, or a Deferred that
|
||||
will fire when the step completes."""
|
||||
next_step = self.get_next_step()
|
||||
if next_step:
|
||||
steptype, lines = self.get_next_step()
|
||||
return self.dispatch_step(steptype, lines)
|
||||
return None
|
||||
|
||||
def get_next_step(self):
|
||||
stepname = self._find_first_step()
|
||||
stepbase = os.path.join(self.basedir, stepname)
|
||||
f = open(os.path.join(stepbase, "type"), "r")
|
||||
stype = f.read().strip()
|
||||
stepnames = [n for n in os.list(self.basedir)
|
||||
if n.startswith("step-")]
|
||||
stepnames.sort()
|
||||
if not stepnames:
|
||||
return None
|
||||
stepname = stepnames[0]
|
||||
f = open(os.path.join(self.basedir, stepname), "r")
|
||||
lines = f.read().split("\n")
|
||||
f.close()
|
||||
if stype == "upload_ssk":
|
||||
s = UploadSSKStep()
|
||||
# ...
|
||||
else:
|
||||
raise RuntimeError("unknown step type '%s'" % stype)
|
||||
s.setup(stepname, self.basedir)
|
||||
d = s.start()
|
||||
assert lines[-1] == ""
|
||||
lines.pop(-1)
|
||||
steptype = lines[0]
|
||||
return steptype, lines
|
||||
|
||||
def dispatch_step(self, steptype, lines):
|
||||
handlername = "step_" + steptype
|
||||
if not hasattr(self, handlername):
|
||||
raise RuntimeError("unknown workqueue step type '%s'" % steptype)
|
||||
handler = getattr(self, handlername)
|
||||
d = defer.maybeDeferred(handler, *lines[1:])
|
||||
d.addCallback(self._done, stepname)
|
||||
return d
|
||||
|
||||
def _done(self, res, stepname):
|
||||
os.unlink(os.path.join(self.basedir, stepname))
|
||||
return res
|
||||
|
||||
def count_pending_steps(self):
|
||||
return len([n for n in os.list(self.basedir)
|
||||
if n.startswith("step-")])
|
||||
|
||||
def step_upload_chk(self, source_filename, index_a, write_key_a):
|
||||
pass
|
||||
def step_upload_ssk(self, source_filename, index_a, write_key_a, prev_ver):
|
||||
pass
|
||||
|
||||
def step_addpath(self, boxname, *path):
|
||||
data = self.read_from_box(boxname)
|
||||
child_spec = something.unserialize(data)
|
||||
return self.root.add_subpath(path, child_spec, self)
|
||||
|
||||
def step_retain_ssk(self, index_a, read_key_a):
|
||||
pass
|
||||
def step_unlink_ssk(self, index_a, write_key_a):
|
||||
pass
|
||||
def step_retain_uri_from_box(self, boxname):
|
||||
pass
|
||||
def step_unlink_uri(self, uri):
|
||||
pass
|
||||
|
||||
def step_delete_tempfile(self, filename):
|
||||
os.unlink(os.path.join(self.filesdir, filename))
|
||||
def step_delete_box(self, boxname):
|
||||
os.unlink(os.path.join(self.boxesdir, boxname))
|
||||
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user