#590: add webish t=stream-manifest

This commit is contained in:
Brian Warner 2009-01-22 22:01:36 -07:00
parent 72adeccf2d
commit 26260374e9
5 changed files with 161 additions and 0 deletions

View File

@ -257,6 +257,13 @@ If a retain-for= argument is not used, the default handle lifetimes are:
* collected handles (i.e. the GET page has been retrieved at least once
since the operation completed) will remain valid for ten minutes.
Many "slow" operations can begin to use unacceptable amounts of memory when
operating on large directory structures. The memory usage increases when the
ophandle is polled, as the results must be copied into a JSON string, sent
over the wire, then parsed by a client. So, as an alternative, many "slow"
operations have streaming equivalents. These equivalents do not use operation
handles. Instead, they emit line-oriented status results immediately. Client
code can cancel the operation by simply closing the HTTP connection.
== Programmatic Operations ==
@ -1043,6 +1050,37 @@ POST $DIRURL?t=start-deep-stats (must add &ophandle=XYZ)
share management data (leases)
backend (ext3) minimum block size
POST $URL?t=stream-manifest
This operation performs a recursive walk of all files and directories
reachable from the given starting point. For each such unique object
(duplicates are skipped), a single line of JSON is emitted to the HTTP
response channel. When the walk is complete, a final line of JSON is emitted
which contains the accumulated file-size/count "deep-stats" data.
A CLI tool can split the response stream on newlines into "response units",
and parse each response unit as JSON. Each such parsed unit will be a
dictionary, and will contain at least the "type" key: a string, one of
"file", "directory", or "stats".
For all units that have a type of "file" or "directory", the dictionary will
contain the following keys:
"path": a list of strings, with the path that is traversed to reach the
object
"cap": a writecap for the file or directory, if available, else a readcap
"verifycap": a verifycap for the file or directory
"repaircap": the weakest cap which can still be used to repair the object
"storage-index": a base32 storage index for the object
Note that non-distributed files (i.e. LIT files) will have JSON null values
for verifycap, repaircap, and storage-index, since these files can neither
be verified nor repaired, and are not stored on the storage servers.
The last unit in the stream will have a type of "stats", and will contain
the keys described in the "start-deep-stats" operation, above.
== Other Useful Pages ==
The portion of the web namespace that begins with "/uri" (and "/named") is

View File

@ -48,6 +48,8 @@ class FakeCHKFileNode:
return self.my_uri.to_string()
# Return the verify cap for this fake node by delegating to its URI object.
def get_verify_cap(self):
return self.my_uri.get_verify_cap()
# Return the cap to use for repairing this fake CHK node. This deliberately
# hands back the same value as get_verify_cap(): the repair cap and the
# verify cap are identical for this node type.
def get_repair_cap(self):
return self.my_uri.get_verify_cap()
# Return the storage index held in this fake node's storage_index attribute.
def get_storage_index(self):
return self.storage_index

View File

@ -1937,6 +1937,13 @@ class DeepCheckBase(SystemTestMixin, ErrorMixin):
self.fail("%s: not JSON: '%s'" % (url, s))
return data
def parse_streamed_json(self, s):
    """Yield the decoded JSON value of every non-empty line in s.

    s is a newline-delimited stream of JSON documents. Empty pieces are
    skipped; the stream is expected to end with a newline, which makes
    split() produce a trailing "" that must not be fed to the decoder.
    """
    for line in s.split("\n"):
        if line:
            yield simplejson.loads(line)
def web(self, n, method="GET", **kwargs):
# returns (data, url)
url = (self.webish_url + "uri/%s" % urllib.quote(n.get_uri())
@ -2100,6 +2107,7 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
d = self.set_up_nodes()
d.addCallback(self.set_up_tree)
d.addCallback(self.do_stats)
d.addCallback(self.do_web_stream_manifest)
d.addCallback(self.do_test_check_good)
d.addCallback(self.do_test_web_good)
d.addCallback(self.do_test_cli_good)
@ -2136,6 +2144,45 @@ class DeepCheckWebGood(DeepCheckBase, unittest.TestCase):
self.failUnlessEqual(s["size-immutable-files"], 13000)
self.failUnlessEqual(s["size-literal-files"], 48)
def do_web_stream_manifest(self, ignored):
    """POST t=stream-manifest to the root and validate the streamed units.

    Checks that the stream ends with a "stats" unit, that five
    file/directory units are emitted, that exactly three of them carry
    verifycap/repaircap/storage-index values (the other two have None),
    and that the cap relationships hold for the mutable and the large
    immutable file. The trailing stats are passed to check_stats_good().

    Returns the Deferred produced by self.web() with the checks attached.
    """
    d = self.web(self.root, method="POST", t="stream-manifest")

    def _check(result):
        # self.web() fires with a (data, url) pair; only the body is needed.
        res, _url = result
        units = list(self.parse_streamed_json(res))
        files = [u for u in units if u["type"] in ("file", "directory")]
        # Use the test-case assertion instead of a bare assert: it is not
        # stripped under -O and it reports the offending value on failure.
        self.failUnlessEqual(units[-1]["type"], "stats")
        stats = units[-1]["stats"]
        self.failUnlessEqual(len(files), 5)
        # [root,mutable,large] are distributed, [small,small2] are not
        for key in ("verifycap", "repaircap", "storage-index"):
            present = [f for f in files if f[key] is not None]
            absent = [f for f in files if f[key] is None]
            self.failUnlessEqual(len(present), 3, key)
            self.failUnlessEqual(len(absent), 2, key)
        # make sure that a mutable file has filecap==repaircap!=verifycap
        mutable = [f for f in files
                   if f["cap"] is not None
                   and f["cap"].startswith("URI:SSK:")][0]
        self.failUnlessEqual(mutable["cap"], self.mutable_uri)
        self.failIfEqual(mutable["cap"], mutable["verifycap"])
        self.failUnlessEqual(mutable["cap"], mutable["repaircap"])
        # for immutable file, verifycap==repaircap!=filecap
        large = [f for f in files
                 if f["cap"] is not None
                 and f["cap"].startswith("URI:CHK:")][0]
        self.failUnlessEqual(large["cap"], self.large_uri)
        self.failIfEqual(large["cap"], large["verifycap"])
        self.failUnlessEqual(large["verifycap"], large["repaircap"])
        self.check_stats_good(stats)

    d.addCallback(_check)
    return d
def do_test_check_good(self, ignored):
d = defer.succeed(None)
# check the individual items

View File

@ -168,6 +168,7 @@ class WebMixin(object):
foo.set_uri(unicode_filename, self._bar_txt_uri)
_ign, n, baz_file = self.makefile(2)
self._baz_file_uri = baz_file
sub.set_uri(u"baz.txt", baz_file)
_ign, n, self._bad_file_uri = self.makefile(3)
@ -1050,6 +1051,26 @@ class Web(WebMixin, testutil.StallMixin, unittest.TestCase):
d.addCallback(_got_json)
return d
def test_POST_DIRURL_stream_manifest(self):
    """POST t=stream-manifest to /foo/ and inspect the streamed JSON lines.

    The response must end with a newline and decode into seven units: the
    first describes the starting directory (empty path), the last is the
    "stats" unit, and the unit for sub/baz.txt must carry non-None
    storage-index, verifycap and repaircap values.
    """
    def _check(res):
        self.failUnless(res.endswith("\n"))
        # drop the trailing newline so split() yields no empty tail
        lines = res[:-1].split("\n")
        units = [simplejson.loads(line) for line in lines]
        self.failUnlessEqual(len(units), 7)
        self.failUnlessEqual(units[-1]["type"], "stats")

        root = units[0]
        self.failUnlessEqual(root["path"], [])
        self.failUnlessEqual(root["cap"], self._foo_uri)
        self.failUnlessEqual(root["type"], "directory")

        # the final "stats" unit has no "cap" key, so leave it out
        matches = [u for u in units[:-1] if u["cap"] == self._baz_file_uri]
        baz = matches[0]
        self.failUnlessEqual(baz["path"], ["sub", "baz.txt"])
        for key in ("storage-index", "verifycap", "repaircap"):
            self.failIfEqual(baz[key], None)

    d = self.POST(self.public_url + "/foo/?t=stream-manifest")
    d.addCallback(_check)
    return d
def test_GET_DIRURL_uri(self):
d = self.GET(self.public_url + "/foo?t=uri")
def _check(res):

View File

@ -16,6 +16,7 @@ from allmydata.uri import from_string_dirnode
from allmydata.interfaces import IDirectoryNode, IFileNode, IMutableFileNode, \
ExistingChildError, NoSuchChildError
from allmydata.monitor import Monitor
from allmydata import dirnode
from allmydata.web.common import text_plain, WebError, \
IClient, IOpHandleTable, NeedOperationHandleError, \
boolean_of_arg, get_arg, get_root, \
@ -192,6 +193,8 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
d = self._POST_start_deep_size(ctx)
elif t == "start-deep-stats":
d = self._POST_start_deep_stats(ctx)
elif t == "stream-manifest":
d = self._POST_stream_manifest(ctx)
elif t == "set_children":
# TODO: docs
d = self._POST_set_children(req)
@ -394,6 +397,11 @@ class DirectoryNodeHandler(RenderMixin, rend.Page, ReplaceMeMixin):
renderer = DeepStatsResults(monitor)
return self._start_operation(monitor, renderer, ctx)
def _POST_stream_manifest(self, ctx):
    """Handle POST ?t=stream-manifest for this directory node.

    Builds a ManifestStreamer for the request context, hands it to
    self.node.deep_traverse() as the walker, and returns whatever the
    resulting monitor's when_done() provides (presumably a Deferred that
    fires when the traversal completes -- confirm against Monitor).
    """
    streamer = ManifestStreamer(ctx, self.node)
    return self.node.deep_traverse(streamer).when_done()
def _POST_set_children(self, req):
replace = boolean_of_arg(get_arg(req, "replace", "true"))
req.content.seek(0)
@ -812,3 +820,48 @@ class DeepStatsResults(rend.Page):
s = self.monitor.get_status().copy()
s["finished"] = self.monitor.is_finished()
return simplejson.dumps(s, indent=1)
class ManifestStreamer(dirnode.DeepStats):
    """DeepStats walker that streams one JSON line per visited node.

    Each node passed to add_node() is written to the HTTP request as a
    single line of JSON as soon as it is seen. finish() writes one last
    line, of type "stats", holding the accumulated DeepStats results.
    """

    def __init__(self, ctx, origin):
        dirnode.DeepStats.__init__(self, origin)
        # the request object that receives the streamed lines
        self.req = IRequest(ctx)

    def _write_unit(self, unit):
        # Serialize one response unit and send it as a single line.
        line = simplejson.dumps(unit, ensure_ascii=True)
        # clients split the stream on newlines, so a unit must not hold one
        assert "\n" not in line
        self.req.write(line+"\n")

    def add_node(self, node, path):
        """Record node in the stats, then emit its manifest line."""
        dirnode.DeepStats.add_node(self, node, path)
        unit = {"path": path,
                "cap": node.get_uri()}
        if IDirectoryNode.providedBy(node):
            unit["type"] = "directory"
        else:
            unit["type"] = "file"

        # Each of the following values is passed through unchanged when it
        # is false-ish (e.g. None), and converted to a string otherwise.
        verifycap = node.get_verify_cap()
        if verifycap:
            verifycap = verifycap.to_string()
        unit["verifycap"] = verifycap

        repaircap = node.get_repair_cap()
        if repaircap:
            repaircap = repaircap.to_string()
        unit["repaircap"] = repaircap

        storage_index = node.get_storage_index()
        if storage_index:
            storage_index = base32.b2a(storage_index)
        unit["storage-index"] = storage_index

        self._write_unit(unit)

    def finish(self):
        """Emit the closing "stats" line and return an empty string."""
        stats = dirnode.DeepStats.get_results(self)
        self._write_unit({"type": "stats",
                          "stats": stats,
                          })
        # everything has already been written to the request above
        return ""