From 86abe56d913f18a206b690d0b50925a6069556c8 Mon Sep 17 00:00:00 2001 From: meejah Date: Thu, 12 Nov 2015 16:16:28 -0700 Subject: [PATCH] Flesh out "tahoe magic-folder status" command Adds: - a JSON endpoint - CLI to display information - QueuedItem + IQueuedItem for uploader/downloader - IProgress interface + PercentProgress implementation - progress= args to many upload/download APIs --- src/allmydata/client.py | 1 + src/allmydata/frontends/magic_folder.py | 157 +++++++++++++++++---- src/allmydata/immutable/filenode.py | 2 +- src/allmydata/magicfolderdb.py | 1 + src/allmydata/scripts/magic_folder_cli.py | 160 +++++++++++++++++++++- src/allmydata/test/test_magic_folder.py | 1 + src/allmydata/web/magic_folder.py | 60 ++++++++ src/allmydata/web/root.py | 5 +- 8 files changed, 354 insertions(+), 33 deletions(-) create mode 100644 src/allmydata/web/magic_folder.py diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 04e94a01d..c7a5d1464 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -493,6 +493,7 @@ class Client(node.Node, pollmixin.PollMixin): from allmydata.frontends import magic_folder umask = self.get_config("magic_folder", "download.umask", 0077) s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask) + self._magic_folder = s s.setServiceParent(self) s.startService() diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index afe26081d..8a2b07e1b 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -15,6 +15,7 @@ from allmydata.util import log from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError from allmydata.util.assertutil import precondition, _assert from allmydata.util.deferredutil import HookMixin +from allmydata.util.progress import PercentProgress from allmydata.util.encodingutil import listdir_filepath, to_filepath, \ extend_filepath, unicode_from_filepath, unicode_segments_from, \ quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError @@ -126,10 +127,21 @@ class QueueMixin(HookMixin): % quote_local_unicode_path(self._local_path_u)) self._deque = deque() + # do we also want to bound on "maximum age"? + self._process_history = deque(maxlen=10) self._lazy_tail = defer.succeed(None) self._stopped = False self._turn_delay = 0 + def get_status(self): + """ + Returns an iterable of instances that implement IQueuedItem + """ + for item in self._deque: + yield item + for item in self._process_history: + yield item + def _get_filepath(self, relpath_u): self._log("_get_filepath(%r)" % (relpath_u,)) return extend_filepath(self._local_filepath, relpath_u.split(u"/")) @@ -162,8 +174,10 @@ class QueueMixin(HookMixin): self._log("stopped") return try: - item = self._deque.pop() - self._log("popped %r" % (item,)) + item = IQueuedItem(self._deque.pop()) + self._process_history.append(item) + + self._log("popped %r, now have %d" % (item, len(self._deque))) self._count('objects_queued', -1) except IndexError: self._log("deque is now empty") @@ -177,6 +191,7 @@ class QueueMixin(HookMixin): self._lazy_tail.addBoth(self._logcb, "got past _process") self._lazy_tail.addBoth(self._call_hook, 'processed', async=True) self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,)) + self._lazy_tail.addErrback(log.err) self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque)) self._lazy_tail.addBoth(self._logcb, "got past deferLater") except Exception as e: @@ -184,6 +199,44 @@ class QueueMixin(HookMixin): raise +from zope.interface import Interface, implementer + +class IQueuedItem(Interface): + pass + + +@implementer(IQueuedItem) +class QueuedItem(object): + def __init__(self, relpath_u, progress): + self.relpath_u = relpath_u + self.progress = progress + self._status_history = dict() + + def set_status(self, status, current_time=None): + if current_time is None: + current_time = time.time() + self._status_history[status] = current_time + + def status_time(self, state): + """ + Returns None if there's no status-update for 'state', else returns + the timestamp when that state was reached. + """ + return self._status_history.get(state, None) + + def status_history(self): + """ + Returns a list of 2-tuples of (state, timestamp) sorted by timestamp + """ + hist = self._status_history.items() + hist.sort(lambda a, b: cmp(a[1], b[1])) + return hist + + +class UploadItem(QueuedItem): + pass + + class Uploader(QueueMixin): def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate=False): @@ -256,7 +309,12 @@ class Uploader(QueueMixin): def _extend_queue_and_keep_going(self, relpaths_u): self._log("_extend_queue_and_keep_going %r" % (relpaths_u,)) - self._deque.extend(relpaths_u) + for relpath_u in relpaths_u: + progress = PercentProgress() + item = UploadItem(relpath_u, progress) + item.set_status('queued', self._clock.seconds()) + self._deque.append(item) + self._count('objects_queued', len(relpaths_u)) if self.is_ready: @@ -273,7 +331,7 @@ class Uploader(QueueMixin): self._extend_queue_and_keep_going(self._pending) def _add_pending(self, relpath_u): - self._log("add pending %r" % (relpath_u,)) + self._log("add pending %r" % (relpath_u,)) if not magicpath.should_ignore_file(relpath_u): self._pending.add(relpath_u) @@ -327,10 +385,14 @@ class Uploader(QueueMixin): def _when_queue_is_empty(self): return defer.succeed(None) - def _process(self, relpath_u): + def _process(self, item): # Uploader + relpath_u = item.relpath_u self._log("_process(%r)" % (relpath_u,)) + item.set_status('started', self._clock.seconds()) + if relpath_u is None: + item.set_status('invalid_path', self._clock.seconds()) return precondition(isinstance(relpath_u, unicode), relpath_u) precondition(not relpath_u.endswith(u'/'), relpath_u) @@ -374,8 +436,12 @@ class Uploader(QueueMixin): metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri empty_uploadable = Data("", self._client.convergence) - d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable, - metadata=metadata, overwrite=True) + d2 = self._upload_dirnode.add_file( + encoded_path_u, empty_uploadable, + metadata=metadata, + overwrite=True, + progress=item.progress, + ) def _add_db_entry(filenode): filecap = filenode.get_uri() @@ -397,7 +463,12 @@ class Uploader(QueueMixin): uploadable = Data("", self._client.convergence) encoded_path_u += magicpath.path2magic(u"/") self._log("encoded_path_u = %r" % (encoded_path_u,)) - upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True) + upload_d = self._upload_dirnode.add_file( + encoded_path_u, uploadable, + metadata={"version": 0}, + overwrite=True, + progress=item.progress, + ) def _dir_succeeded(ign): self._log("created subdirectory %r" % (relpath_u,)) self._count('directories_created') @@ -428,8 +499,12 @@ class Uploader(QueueMixin): metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri uploadable = FileName(unicode_from_filepath(fp), self._client.convergence) - d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable, - metadata=metadata, overwrite=True) + d2 = self._upload_dirnode.add_file( + encoded_path_u, uploadable, + metadata=metadata, + overwrite=True, + progress=item.progress, + ) def _add_db_entry(filenode): filecap = filenode.get_uri() @@ -448,10 +523,12 @@ class Uploader(QueueMixin): def _succeeded(res): self._count('objects_succeeded') + item.set_status('success', self._clock.seconds()) return res def _failed(f): self._count('objects_failed') self._log("%s while processing %r" % (f, relpath_u)) + item.set_status('failure', self._clock.seconds()) return f d.addCallbacks(_succeeded, _failed) return d @@ -540,6 +617,13 @@ class WriteFileMixin(object): return abspath_u +class DownloadItem(QueuedItem): + def __init__(self, relpath_u, progress, filenode, metadata): + super(DownloadItem, self).__init__(relpath_u, progress) + self.file_node = filenode + self.metadata = metadata + + class Downloader(QueueMixin, WriteFileMixin): REMOTE_SCAN_INTERVAL = 3 # facilitates tests @@ -561,6 +645,7 @@ class Downloader(QueueMixin, WriteFileMixin): def start_downloading(self): self._log("start_downloading") + self._turn_delay = self.REMOTE_SCAN_INTERVAL files = self._db.get_all_relpaths() self._log("all files %s" % files) @@ -685,7 +770,14 @@ class Downloader(QueueMixin, WriteFileMixin): file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version']) if self._should_download(relpath_u, metadata['version']): - self._deque.append( (relpath_u, file_node, metadata) ) + to_dl = DownloadItem( + relpath_u, + PercentProgress(file_node.get_size()), + file_node, + metadata, + ) + to_dl.set_status('queued', self._clock.seconds()) + self._deque.append(to_dl) else: self._log("Excluding %r" % (relpath_u,)) self._call_hook(None, 'processed', async=True) @@ -703,42 +795,49 @@ class Downloader(QueueMixin, WriteFileMixin): def _process(self, item, now=None): # Downloader self._log("_process(%r)" % (item,)) - if now is None: - now = time.time() - (relpath_u, file_node, metadata) = item - fp = self._get_filepath(relpath_u) + if now is None: # XXX why can we pass in now? + now = time.time() # self._clock.seconds() + + self._log("started! %s" % (now,)) + item.set_status('started', now) + fp = self._get_filepath(item.relpath_u) abspath_u = unicode_from_filepath(fp) conflict_path_u = self._get_conflicted_filename(abspath_u) d = defer.succeed(None) def do_update_db(written_abspath_u): - filecap = file_node.get_uri() - last_uploaded_uri = metadata.get('last_uploaded_uri', None) + filecap = item.file_node.get_uri() + last_uploaded_uri = item.metadata.get('last_uploaded_uri', None) last_downloaded_uri = filecap last_downloaded_timestamp = now written_pathinfo = get_pathinfo(written_abspath_u) - if not written_pathinfo.exists and not metadata.get('deleted', False): + if not written_pathinfo.exists and not item.metadata.get('deleted', False): raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u)) - self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri, - last_downloaded_uri, last_downloaded_timestamp, written_pathinfo) + self._db.did_upload_version( + item.relpath_u, item.metadata['version'], last_uploaded_uri, + last_downloaded_uri, last_downloaded_timestamp, written_pathinfo, + ) self._count('objects_downloaded') + item.set_status('success', self._clock.seconds()) + def failed(f): + item.set_status('failure', self._clock.seconds()) self._log("download failed: %s" % (str(f),)) self._count('objects_failed') return f if os.path.isfile(conflict_path_u): def fail(res): - raise ConflictError("download failed: already conflicted: %r" % (relpath_u,)) + raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,)) d.addCallback(fail) else: is_conflict = False - db_entry = self._db.get_db_entry(relpath_u) - dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None) - dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None) + db_entry = self._db.get_db_entry(item.relpath_u) + dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None) + dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None) if db_entry: if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None: if dmd_last_downloaded_uri != db_entry.last_downloaded_uri: @@ -747,22 +846,22 @@ class Downloader(QueueMixin, WriteFileMixin): elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri: is_conflict = True self._count('objects_conflicted') - elif self._is_upload_pending(relpath_u): + elif self._is_upload_pending(item.relpath_u): is_conflict = True self._count('objects_conflicted') - if relpath_u.endswith(u"/"): - if metadata.get('deleted', False): + if item.relpath_u.endswith(u"/"): + if item.metadata.get('deleted', False): self._log("rmdir(%r) ignored" % (abspath_u,)) else: self._log("mkdir(%r)" % (abspath_u,)) d.addCallback(lambda ign: fileutil.make_dirs(abspath_u)) d.addCallback(lambda ign: abspath_u) else: - if metadata.get('deleted', False): + if item.metadata.get('deleted', False): d.addCallback(lambda ign: self._rename_deleted_file(abspath_u)) else: - d.addCallback(lambda ign: file_node.download_best_version()) + d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress)) d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=is_conflict)) diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py index 8274da8ef..779977670 100644 --- a/src/allmydata/immutable/filenode.py +++ b/src/allmydata/immutable/filenode.py @@ -8,7 +8,7 @@ from twisted.internet import defer from allmydata import uri from twisted.internet.interfaces import IConsumer from allmydata.interfaces import IImmutableFileNode, IUploadResults -from allmydata.util import consumer +from allmydata.util import consumer, progress from allmydata.check_results import CheckResults, CheckAndRepairResults from allmydata.util.dictutil import DictOfSets from allmydata.util.happinessutil import servers_of_happiness diff --git a/src/allmydata/magicfolderdb.py b/src/allmydata/magicfolderdb.py index 982b5fe29..661c774d2 100644 --- a/src/allmydata/magicfolderdb.py +++ b/src/allmydata/magicfolderdb.py @@ -65,6 +65,7 @@ class MagicFolderDB(object): (relpath_u,)) row = self.cursor.fetchone() if not row: + print "found nothing for", relpath_u return None else: (size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row diff --git a/src/allmydata/scripts/magic_folder_cli.py b/src/allmydata/scripts/magic_folder_cli.py index 5d5c61dc5..4c9a469be 100644 --- a/src/allmydata/scripts/magic_folder_cli.py +++ b/src/allmydata/scripts/magic_folder_cli.py @@ -1,7 +1,13 @@ import os +import urllib +from sys import stderr from types import NoneType from cStringIO import StringIO +from datetime import datetime + +import humanize +import simplejson from twisted.python import usage @@ -12,10 +18,13 @@ from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions import tahoe_mv from allmydata.util.encodingutil import argv_to_abspath, argv_to_unicode, to_str, \ quote_local_unicode_path +from allmydata.scripts.common_http import do_http, format_http_success, \ + format_http_error, BadResponse from allmydata.util import fileutil from allmydata.util import configutil from allmydata import uri + INVITE_SEPARATOR = "+" class CreateOptions(BasedirOptions): @@ -200,21 +209,167 @@ class StatusOptions(BasedirOptions): nickname = None synopsis = "" stdin = StringIO("") + def parseArgs(self): BasedirOptions.parseArgs(self) node_url_file = os.path.join(self['node-directory'], u"node.url") - self['node-url'] = open(node_url_file, "r").read().strip() + with open(node_url_file, "r") as f: + self['node-url'] = f.read().strip() + + +def _get_json_for_fragment(options, fragment): + nodeurl = options['node-url'] + if nodeurl.endswith('/'): + nodeurl = nodeurl[:-1] + + url = u'%s/%s' % (nodeurl, fragment) + resp = do_http(method, url) + if isinstance(resp, BadResponse): + # specifically NOT using format_http_error() here because the + # URL is pretty sensitive (we're doing /uri/). + raise RuntimeError( + "Failed to get json from '%s': %s" % (nodeurl, resp.error) + ) + + data = resp.read() + parsed = simplejson.loads(data) + if not parsed: + raise RuntimeError("No data from '%s'" % (nodeurl,)) + return parsed + + +def _get_json_for_cap(options, cap): + return _get_json_for_fragment( + options, + 'uri/%s?t=json' % urllib.quote(cap), + ) + +def _print_item_status(item, now, longest): + paddedname = (' ' * (longest - len(item['path']))) + item['path'] + if 'failure_at' in item: + ts = datetime.fromtimestamp(item['started_at']) + prog = 'Failed %s (%s)' % (humanize.naturaltime(now - ts), ts) + elif item['percent_done'] < 100.0: + if 'started_at' not in item: + prog = 'not yet started' + else: + so_far = now - datetime.fromtimestamp(item['started_at']) + if so_far.seconds > 0.0: + rate = item['percent_done'] / so_far.seconds + if rate != 0: + time_left = (100.0 - item['percent_done']) / rate + prog = '%2.1f%% done, around %s left' % ( + item['percent_done'], + humanize.naturaldelta(time_left), + ) + else: + time_left = None + prog = '%2.1f%% done' % (item['percent_done'],) + else: + prog = 'just started' + else: + prog = '' + for verb in ['finished', 'started', 'queued']: + keyname = verb + '_at' + if keyname in item: + when = datetime.fromtimestamp(item[keyname]) + prog = '%s %s' % (verb, humanize.naturaltime(now - when)) + break + + print " %s: %s" % (paddedname, prog) def status(options): - # XXX todo: use http interface to ask about our magic-folder upload status + nodedir = options["node-directory"] + with open(os.path.join(nodedir, u"private", u"magic_folder_dircap")) as f: + dmd_cap = f.read().strip() + with open(os.path.join(nodedir, u"private", u"collective_dircap")) as f: + collective_readcap = f.read().strip() + + try: + captype, dmd = _get_json_for_cap(options, dmd_cap) + if captype != 'dirnode': + print >>stderr, "magic_folder_dircap isn't a directory capability" + return 2 + except RuntimeError as e: + print >>stderr, str(e) + return 1 + + now = datetime.now() + + print "Local files:" + for (name, child) in dmd['children'].items(): + captype, meta = child + status = 'good' + size = meta['size'] + created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime']) + version = meta['metadata']['version'] + nice_size = humanize.naturalsize(size) + nice_created = humanize.naturaltime(now - created) + if captype != 'filenode': + print "%20s: error, should be a filecap" % name + continue + print " %s (%s): %s, version=%s, created %s" % (name, nice_size, status, version, nice_created) + + captype, collective = _get_json_for_cap(options, collective_readcap) + print + print "Remote files:" + for (name, data) in collective['children'].items(): + if data[0] != 'dirnode': + print "Error: '%s': expected a dirnode, not '%s'" % (name, data[0]) + print " %s's remote:" % name + dmd = _get_json_for_cap(options, data[1]['ro_uri']) + if dmd[0] != 'dirnode': + print "Error: should be a dirnode" + continue + for (n, d) in dmd[1]['children'].items(): + if d[0] != 'filenode': + print "Error: expected '%s' to be a filenode." % (n,) + + meta = d[1] + status = 'good' + size = meta['size'] + created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime']) + version = meta['metadata']['version'] + nice_size = humanize.naturalsize(size) + nice_created = humanize.naturaltime(now - created) + print " %s (%s): %s, version=%s, created %s" % (n, nice_size, status, version, nice_created) + + magicdata = _get_json_for_fragment(options, 'magic_folder?t=json') + if len(magicdata): + uploads = [item for item in magicdata if item['kind'] == 'upload'] + downloads = [item for item in magicdata if item['kind'] == 'download'] + longest = max([len(item['path']) for item in magicdata]) + + if True: # maybe --show-completed option or something? + uploads = [item for item in uploads if item['status'] != 'success'] + downloads = [item for item in downloads if item['status'] != 'success'] + + if len(uploads): + print + print "Uploads:" + for item in uploads: + _print_item_status(item, now, longest) + + if len(downloads): + print + print "Downloads:" + for item in downloads: + _print_item_status(item, now, longest) + + for item in magicdata: + if item['status'] == 'failure': + print "Failed:", item + return 0 + class MagicFolderCommand(BaseOptions): subCommands = [ ["create", None, CreateOptions, "Create a Magic Folder."], ["invite", None, InviteOptions, "Invite someone to a Magic Folder."], ["join", None, JoinOptions, "Join a Magic Folder."], ["leave", None, LeaveOptions, "Leave a Magic Folder."], + ["status", None, StatusOptions, "Display stutus of uploads/downloads."], ] def postOptions(self): if not hasattr(self, 'subOptions'): @@ -234,6 +389,7 @@ subDispatch = { "invite": invite, "join": join, "leave": leave, + "status": status, } def do_magic_folder(options): diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index e862fcede..d974e5deb 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -1,5 +1,6 @@ import os, sys +import shutil from twisted.trial import unittest from twisted.internet import defer, task diff --git a/src/allmydata/web/magic_folder.py b/src/allmydata/web/magic_folder.py new file mode 100644 index 000000000..5d2f3e5ed --- /dev/null +++ b/src/allmydata/web/magic_folder.py @@ -0,0 +1,60 @@ +import simplejson + +from nevow import rend, url, tags as T +from nevow.inevow import IRequest + +from allmydata.web.common import getxmlfile, get_arg, WebError + + +class MagicFolderWebApi(rend.Page): + """ + I provide the web-based API for Magic Folder status etc. + """ + + def __init__(self, client): + ##rend.Page.__init__(self, storage) + super(MagicFolderWebApi, self).__init__(client) + self.client = client + + def _render_json(self, req): + req.setHeader("content-type", "application/json") + + data = [] + for item in self.client._magic_folder.uploader.get_status(): + d = dict( + path=item.relpath_u, + status=item.status_history()[-1][0], + kind='upload', + ) + for (status, ts) in item.status_history(): + d[status + '_at'] = ts + d['percent_done'] = item.progress.progress + data.append(d) + + for item in self.client._magic_folder.downloader.get_status(): + d = dict( + path=item.relpath_u, + status=item.status_history()[-1][0], + kind='download', + ) + for (status, ts) in item.status_history(): + d[status + '_at'] = ts + d['percent_done'] = item.progress.progress + data.append(d) + + return simplejson.dumps(data) + + def renderHTTP(self, ctx): + req = IRequest(ctx) + t = get_arg(req, "t", None) + + if t is None: + return rend.Page.renderHTTP(self, ctx) + + t = t.strip() + if t == 'json': + return self._render_json(req) + + raise WebError("'%s' invalid type for 't' arg" % (t,), 400) + + diff --git a/src/allmydata/web/root.py b/src/allmydata/web/root.py index d8d789cf3..6b2848105 100644 --- a/src/allmydata/web/root.py +++ b/src/allmydata/web/root.py @@ -12,7 +12,7 @@ from allmydata import get_package_versions_string from allmydata.util import log from allmydata.interfaces import IFileNode from allmydata.web import filenode, directory, unlinked, status, operations -from allmydata.web import storage +from allmydata.web import storage, magic_folder from allmydata.web.common import abbreviate_size, getxmlfile, WebError, \ get_arg, RenderMixin, get_format, get_mutable_type, render_time_delta, render_time, render_time_attr @@ -154,6 +154,9 @@ class Root(rend.Page): self.child_uri = URIHandler(client) self.child_cap = URIHandler(client) + # handler for "/magic_folder" URIs + self.child_magic_folder = magic_folder.MagicFolderWebApi(client) + self.child_file = FileHandler(client) self.child_named = FileHandler(client) self.child_status = status.Status(client.get_history())