mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-30 16:13:58 +00:00
workqueue: start adding tests
This commit is contained in:
parent
aefe54574f
commit
b9edb02820
53
src/allmydata/test/test_workqueue.py
Normal file
53
src/allmydata/test/test_workqueue.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
|
||||||
|
import os
|
||||||
|
from twisted.trial import unittest
|
||||||
|
from allmydata import workqueue
|
||||||
|
from allmydata.util import idlib
|
||||||
|
|
||||||
|
class FakeWorkQueue(workqueue.WorkQueue):
|
||||||
|
|
||||||
|
def __init__(self, basedir):
|
||||||
|
workqueue.WorkQueue.__init__(self, basedir)
|
||||||
|
self.dispatched_steps = []
|
||||||
|
|
||||||
|
def dispatch_step(self, steptype, lines):
|
||||||
|
self.dispatched_steps.append(steptype, lines)
|
||||||
|
|
||||||
|
class Items(unittest.TestCase):
|
||||||
|
def wq(self, testname):
|
||||||
|
return FakeWorkQueue("test_workqueue/Items/%s/workqueue" % testname)
|
||||||
|
def testTempfile(self):
|
||||||
|
wq = self.wq("testTempfile")
|
||||||
|
(f, filename) = wq.create_tempfile(".chkdir")
|
||||||
|
self.failUnless(filename.endswith(".chkdir"))
|
||||||
|
data = "this is some random data: %s\n" % idlib.b2a(os.urandom(15))
|
||||||
|
f.write(data)
|
||||||
|
f.close()
|
||||||
|
f2 = wq.open_tempfile(filename)
|
||||||
|
data2 = f2.read()
|
||||||
|
f2.close()
|
||||||
|
self.failUnlessEqual(data, data2)
|
||||||
|
|
||||||
|
def testCHK(self):
|
||||||
|
wq = self.wq("testCHK")
|
||||||
|
wq.add_upload_chk("source_filename", "box1")
|
||||||
|
wq.add_retain_uri_from_box("box1")
|
||||||
|
wq.add_addpath("box1", ["home", "warner", "foo.txt"])
|
||||||
|
wq.add_delete_box("box1")
|
||||||
|
wq.add_unlink_uri("olduri")
|
||||||
|
|
||||||
|
self.failUnlessEqual(wq.count_pending_steps(), 5)
|
||||||
|
stepname, steptype, lines = wq.get_next_step()
|
||||||
|
self.failUnlessEqual(steptype, "upload_chk")
|
||||||
|
steps = wq.get_all_steps()
|
||||||
|
self.failUnlessEqual(steps[0], ("upload_chk",
|
||||||
|
["source_filename", "box1"]))
|
||||||
|
self.failUnlessEqual(steps[1], ("retain_uri_from_box",
|
||||||
|
["box1"]))
|
||||||
|
self.failUnlessEqual(steps[2], ("addpath",
|
||||||
|
["box1", "home", "warner", "foo.txt"]))
|
||||||
|
self.failUnlessEqual(steps[3], ("delete_box",
|
||||||
|
["box1"]))
|
||||||
|
self.failUnlessEqual(steps[4], ("unlink_uri",
|
||||||
|
["olduri"]))
|
||||||
|
|
@ -145,7 +145,7 @@ class WorkQueue(object):
|
|||||||
# either 'first' or 'last'. These steps are to be executed in
|
# either 'first' or 'last'. These steps are to be executed in
|
||||||
# alphabetical order, with all 'step-first-NNN' steps running before
|
# alphabetical order, with all 'step-first-NNN' steps running before
|
||||||
# any 'step-last-NNN'.
|
# any 'step-last-NNN'.
|
||||||
for n in os.list(self.basedir):
|
for n in os.listdir(self.basedir):
|
||||||
if n.startswith("step-first-"):
|
if n.startswith("step-first-"):
|
||||||
sn = int(n[len("step-first-"):])
|
sn = int(n[len("step-first-"):])
|
||||||
self.seqnum = max(self.seqnum, sn)
|
self.seqnum = max(self.seqnum, sn)
|
||||||
@ -157,13 +157,13 @@ class WorkQueue(object):
|
|||||||
assert self.seqnum < 1000 # TODO: don't let this grow unboundedly
|
assert self.seqnum < 1000 # TODO: don't let this grow unboundedly
|
||||||
|
|
||||||
def create_tempfile(self, suffix=""):
|
def create_tempfile(self, suffix=""):
|
||||||
randomname = b2a(os.random(10))
|
randomname = b2a(os.urandom(10))
|
||||||
filename = randomname + suffix
|
filename = randomname + suffix
|
||||||
f = open(os.path.join(self.filesdir, filename), "wb")
|
f = open(os.path.join(self.filesdir, filename), "wb")
|
||||||
return (f, filename)
|
return (f, filename)
|
||||||
|
|
||||||
def create_boxname(self):
|
def create_boxname(self):
|
||||||
return b2a(os.random(10))
|
return b2a(os.urandom(10))
|
||||||
def write_to_box(self, boxname, data):
|
def write_to_box(self, boxname, data):
|
||||||
f = open(os.path.join(self.boxesdir, boxname), "w")
|
f = open(os.path.join(self.boxesdir, boxname), "w")
|
||||||
f.write(data)
|
f.write(data)
|
||||||
@ -192,51 +192,56 @@ class WorkQueue(object):
|
|||||||
tofile = os.path.join(self.basedir, filename)
|
tofile = os.path.join(self.basedir, filename)
|
||||||
os.rename(fromfile, tofile)
|
os.rename(fromfile, tofile)
|
||||||
|
|
||||||
|
def _create_step_first(self, lines):
|
||||||
|
self._create_step("first", lines)
|
||||||
|
def _create_step_last(self, lines):
|
||||||
|
self._create_step("last", lines)
|
||||||
|
|
||||||
# methods to add entries to the queue
|
# methods to add entries to the queue
|
||||||
def add_upload_chk(self, source_filename, stash_uri_in_boxname):
|
def add_upload_chk(self, source_filename, stash_uri_in_boxname):
|
||||||
# source_filename is absolute, and can point to things outside our
|
# source_filename is absolute, and can point to things outside our
|
||||||
# workqueue.
|
# workqueue.
|
||||||
lines = ["upload_chk", source_filename, stash_uri_in_boxname]
|
lines = ["upload_chk", source_filename, stash_uri_in_boxname]
|
||||||
self._create_step(lines)
|
self._create_step_first(lines)
|
||||||
|
|
||||||
def add_upload_ssk(self, source_filename, write_capability,
|
def add_upload_ssk(self, source_filename, write_capability,
|
||||||
previous_version):
|
previous_version):
|
||||||
lines = ["upload_ssk", source_filename,
|
lines = ["upload_ssk", source_filename,
|
||||||
b2a(write_capability.index), b2a(write_capability.key),
|
b2a(write_capability.index), b2a(write_capability.key),
|
||||||
str(previous_version)]
|
str(previous_version)]
|
||||||
self._create_step(lines)
|
self._create_step_first(lines)
|
||||||
|
|
||||||
def add_retain_ssk(self, read_capability):
|
def add_retain_ssk(self, read_capability):
|
||||||
lines = ["retain_ssk", b2a(read_capability.index),
|
lines = ["retain_ssk", b2a(read_capability.index),
|
||||||
b2a(read_capability.key)]
|
b2a(read_capability.key)]
|
||||||
self._create_step(lines)
|
self._create_step_first(lines)
|
||||||
|
|
||||||
def add_unlink_ssk(self, write_capability):
|
def add_unlink_ssk(self, write_capability):
|
||||||
lines = ["unlink_ssk", b2a(write_capability.index),
|
lines = ["unlink_ssk", b2a(write_capability.index),
|
||||||
b2a(write_capability.key)]
|
b2a(write_capability.key)]
|
||||||
self._create_step(lines)
|
self._create_step_last(lines)
|
||||||
|
|
||||||
def add_retain_uri_from_box(self, boxname):
|
def add_retain_uri_from_box(self, boxname):
|
||||||
lines = ["retain_uri_from_box", boxname]
|
lines = ["retain_uri_from_box", boxname]
|
||||||
self._create_step(lines)
|
self._create_step_first(lines)
|
||||||
|
|
||||||
def add_addpath(self, boxname, path):
|
def add_addpath(self, boxname, path):
|
||||||
assert isinstance(path, (list, tuple))
|
assert isinstance(path, (list, tuple))
|
||||||
lines = ["addpath", boxname]
|
lines = ["addpath", boxname]
|
||||||
lines.extend(path)
|
lines.extend(path)
|
||||||
self._create_step(lines)
|
self._create_step_first(lines)
|
||||||
|
|
||||||
def add_unlink_uri(self, uri):
|
def add_unlink_uri(self, uri):
|
||||||
lines = ["unlink_uri", uri]
|
lines = ["unlink_uri", uri]
|
||||||
self._create_step(lines)
|
self._create_step_last(lines)
|
||||||
|
|
||||||
def delete_tempfile(self, filename):
|
def add_delete_tempfile(self, filename):
|
||||||
lines = ["delete_tempfile", filename]
|
lines = ["delete_tempfile", filename]
|
||||||
self._create_step(lines)
|
self._create_step_first(lines)
|
||||||
|
|
||||||
def delete_box(self, boxname):
|
def add_delete_box(self, boxname):
|
||||||
lines = ["delete_box", boxname]
|
lines = ["delete_box", boxname]
|
||||||
self._create_step(lines)
|
self._create_step_first(lines)
|
||||||
|
|
||||||
|
|
||||||
# methods to perform work
|
# methods to perform work
|
||||||
@ -245,44 +250,66 @@ class WorkQueue(object):
|
|||||||
"""Run the next pending step.
|
"""Run the next pending step.
|
||||||
|
|
||||||
Returns None if there is no next step to run, or a Deferred that
|
Returns None if there is no next step to run, or a Deferred that
|
||||||
will fire when the step completes."""
|
will fire when the step completes. The step will be removed
|
||||||
|
from the queue when it completes."""
|
||||||
next_step = self.get_next_step()
|
next_step = self.get_next_step()
|
||||||
if next_step:
|
if next_step:
|
||||||
stepname, steptype, lines = self.get_next_step()
|
stepname, steptype, lines = self.get_next_step()
|
||||||
return self.dispatch_step(stepname, steptype, lines)
|
d = self.dispatch_step(steptype, lines)
|
||||||
|
d.addCallback(self._delete_step, stepname)
|
||||||
|
return d
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def get_next_step(self):
|
def get_next_step(self):
|
||||||
stepnames = [n for n in os.list(self.basedir)
|
stepnames = [n for n in os.listdir(self.basedir)
|
||||||
if n.startswith("step-")]
|
if n.startswith("step-")]
|
||||||
stepnames.sort()
|
stepnames.sort()
|
||||||
if not stepnames:
|
if not stepnames:
|
||||||
return None
|
return None
|
||||||
stepname = stepnames[0]
|
stepname = stepnames[0]
|
||||||
|
return self._get_step(stepname)
|
||||||
|
|
||||||
|
def _get_step(self, stepname):
|
||||||
f = open(os.path.join(self.basedir, stepname), "r")
|
f = open(os.path.join(self.basedir, stepname), "r")
|
||||||
lines = f.read().split("\n")
|
lines = f.read().split("\n")
|
||||||
f.close()
|
f.close()
|
||||||
assert lines[-1] == ""
|
assert lines[-1] == "" # files should end with a newline
|
||||||
lines.pop(-1)
|
lines.pop(-1) # remove the newline
|
||||||
steptype = lines[0]
|
steptype = lines.pop(0)
|
||||||
return stepname, steptype, lines
|
return stepname, steptype, lines
|
||||||
|
|
||||||
def dispatch_step(self, stepname, steptype, lines):
|
def dispatch_step(self, steptype, lines):
|
||||||
handlername = "step_" + steptype
|
handlername = "step_" + steptype
|
||||||
if not hasattr(self, handlername):
|
if not hasattr(self, handlername):
|
||||||
raise RuntimeError("unknown workqueue step type '%s'" % steptype)
|
raise RuntimeError("unknown workqueue step type '%s'" % steptype)
|
||||||
handler = getattr(self, handlername)
|
handler = getattr(self, handlername)
|
||||||
d = defer.maybeDeferred(handler, *lines[1:])
|
d = defer.maybeDeferred(handler, *lines[1:])
|
||||||
d.addCallback(self._done, stepname)
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def _done(self, res, stepname):
|
def _delete_step(self, res, stepname):
|
||||||
os.unlink(os.path.join(self.basedir, stepname))
|
os.unlink(os.path.join(self.basedir, stepname))
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
# debug/test methods
|
||||||
def count_pending_steps(self):
|
def count_pending_steps(self):
|
||||||
return len([n for n in os.list(self.basedir)
|
return len([n for n in os.listdir(self.basedir)
|
||||||
if n.startswith("step-")])
|
if n.startswith("step-")])
|
||||||
|
def get_all_steps(self):
|
||||||
|
# returns a list of (steptype, lines) for all steps
|
||||||
|
steps = []
|
||||||
|
for stepname in os.listdir(self.basedir):
|
||||||
|
if stepname.startswith("step-"):
|
||||||
|
steps.append(self._get_step(stepname)[1:])
|
||||||
|
return steps
|
||||||
|
|
||||||
|
|
||||||
|
def open_tempfile(self, filename):
|
||||||
|
f = open(os.path.join(self.filesdir, filename), "rb")
|
||||||
|
return f
|
||||||
|
|
||||||
|
# work is dispatched to these methods. To add a new step type, add a
|
||||||
|
# dispatch method here and an add_ method above.
|
||||||
|
|
||||||
|
|
||||||
def step_upload_chk(self, source_filename, index_a, write_key_a):
|
def step_upload_chk(self, source_filename, index_a, write_key_a):
|
||||||
pass
|
pass
|
||||||
|
Loading…
x
Reference in New Issue
Block a user