Flesh out "tahoe magic-folder status" command

Adds:

 - a JSON endpoint
 - CLI to display information
 - QueuedItem + IQueuedItem for uploader/downloader
 - IProgress interface + PercentProgress implementation
 - progress= args to many upload/download APIs
This commit is contained in:
meejah 2015-11-12 16:16:28 -07:00 committed by Brian Warner
parent 3df0a82a38
commit 86abe56d91
8 changed files with 354 additions and 33 deletions

View File

@ -493,6 +493,7 @@ class Client(node.Node, pollmixin.PollMixin):
from allmydata.frontends import magic_folder
umask = self.get_config("magic_folder", "download.umask", 0077)
s = magic_folder.MagicFolder(self, upload_dircap, collective_dircap, local_dir, dbfile, umask)
self._magic_folder = s
s.setServiceParent(self)
s.startService()

View File

@ -15,6 +15,7 @@ from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.progress import PercentProgress
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
extend_filepath, unicode_from_filepath, unicode_segments_from, \
quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
@ -126,10 +127,21 @@ class QueueMixin(HookMixin):
% quote_local_unicode_path(self._local_path_u))
self._deque = deque()
# do we also want to bound on "maximum age"?
self._process_history = deque(maxlen=10)
self._lazy_tail = defer.succeed(None)
self._stopped = False
self._turn_delay = 0
def get_status(self):
    """
    Returns an iterable of instances that implement IQueuedItem
    (everything still waiting in the deque, followed by the
    recently-processed history).
    """
    sources = (self._deque, self._process_history)
    for source in sources:
        for queued in source:
            yield queued
def _get_filepath(self, relpath_u):
self._log("_get_filepath(%r)" % (relpath_u,))
return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
@ -162,8 +174,10 @@ class QueueMixin(HookMixin):
self._log("stopped")
return
try:
item = self._deque.pop()
self._log("popped %r" % (item,))
item = IQueuedItem(self._deque.pop())
self._process_history.append(item)
self._log("popped %r, now have %d" % (item, len(self._deque)))
self._count('objects_queued', -1)
except IndexError:
self._log("deque is now empty")
@ -177,6 +191,7 @@ class QueueMixin(HookMixin):
self._lazy_tail.addBoth(self._logcb, "got past _process")
self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
self._lazy_tail.addErrback(log.err)
self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
self._lazy_tail.addBoth(self._logcb, "got past deferLater")
except Exception as e:
@ -184,6 +199,44 @@ class QueueMixin(HookMixin):
raise
from zope.interface import Interface, implementer
class IQueuedItem(Interface):
    """
    Marker interface for entries in the uploader/downloader queues;
    see QueuedItem for the concrete implementation.
    """
    pass
@implementer(IQueuedItem)
class QueuedItem(object):
    """
    A single queued upload or download: pairs a magic-folder relative
    path with an IProgress instance and a history of status changes.
    """
    def __init__(self, relpath_u, progress):
        self.relpath_u = relpath_u  # unicode path relative to the magic-folder
        self.progress = progress    # IProgress provider for this item
        self._status_history = dict()  # maps status-name -> timestamp

    def set_status(self, status, current_time=None):
        """
        Record that we reached `status` at `current_time` (defaults to
        time.time()). Re-entering a status overwrites its old timestamp.
        """
        if current_time is None:
            current_time = time.time()
        self._status_history[status] = current_time

    def status_time(self, state):
        """
        Returns None if there's no status-update for 'state', else returns
        the timestamp when that state was reached.
        """
        return self._status_history.get(state, None)

    def status_history(self):
        """
        Returns a list of 2-tuples of (state, timestamp) sorted by timestamp
        """
        # sorted(..., key=...) instead of list.sort(cmp-function): same
        # result on Python 2, but cmp-style sorting no longer exists on
        # Python 3, so this keeps the class forward-compatible.
        return sorted(self._status_history.items(), key=lambda item: item[1])
class UploadItem(QueuedItem):
    """
    Represents a single item in the Uploader's queue; see QueuedItem.
    """
    pass
class Uploader(QueueMixin):
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
immediate=False):
@ -256,7 +309,12 @@ class Uploader(QueueMixin):
def _extend_queue_and_keep_going(self, relpaths_u):
self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
self._deque.extend(relpaths_u)
for relpath_u in relpaths_u:
progress = PercentProgress()
item = UploadItem(relpath_u, progress)
item.set_status('queued', self._clock.seconds())
self._deque.append(item)
self._count('objects_queued', len(relpaths_u))
if self.is_ready:
@ -273,7 +331,7 @@ class Uploader(QueueMixin):
self._extend_queue_and_keep_going(self._pending)
def _add_pending(self, relpath_u):
self._log("add pending %r" % (relpath_u,))
self._log("add pending %r" % (relpath_u,))
if not magicpath.should_ignore_file(relpath_u):
self._pending.add(relpath_u)
@ -327,10 +385,14 @@ class Uploader(QueueMixin):
def _when_queue_is_empty(self):
return defer.succeed(None)
def _process(self, relpath_u):
def _process(self, item):
# Uploader
relpath_u = item.relpath_u
self._log("_process(%r)" % (relpath_u,))
item.set_status('started', self._clock.seconds())
if relpath_u is None:
item.set_status('invalid_path', self._clock.seconds())
return
precondition(isinstance(relpath_u, unicode), relpath_u)
precondition(not relpath_u.endswith(u'/'), relpath_u)
@ -374,8 +436,12 @@ class Uploader(QueueMixin):
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
empty_uploadable = Data("", self._client.convergence)
d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
metadata=metadata, overwrite=True)
d2 = self._upload_dirnode.add_file(
encoded_path_u, empty_uploadable,
metadata=metadata,
overwrite=True,
progress=item.progress,
)
def _add_db_entry(filenode):
filecap = filenode.get_uri()
@ -397,7 +463,12 @@ class Uploader(QueueMixin):
uploadable = Data("", self._client.convergence)
encoded_path_u += magicpath.path2magic(u"/")
self._log("encoded_path_u = %r" % (encoded_path_u,))
upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
upload_d = self._upload_dirnode.add_file(
encoded_path_u, uploadable,
metadata={"version": 0},
overwrite=True,
progress=item.progress,
)
def _dir_succeeded(ign):
self._log("created subdirectory %r" % (relpath_u,))
self._count('directories_created')
@ -428,8 +499,12 @@ class Uploader(QueueMixin):
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
metadata=metadata, overwrite=True)
d2 = self._upload_dirnode.add_file(
encoded_path_u, uploadable,
metadata=metadata,
overwrite=True,
progress=item.progress,
)
def _add_db_entry(filenode):
filecap = filenode.get_uri()
@ -448,10 +523,12 @@ class Uploader(QueueMixin):
def _succeeded(res):
self._count('objects_succeeded')
item.set_status('success', self._clock.seconds())
return res
def _failed(f):
self._count('objects_failed')
self._log("%s while processing %r" % (f, relpath_u))
item.set_status('failure', self._clock.seconds())
return f
d.addCallbacks(_succeeded, _failed)
return d
@ -540,6 +617,13 @@ class WriteFileMixin(object):
return abspath_u
class DownloadItem(QueuedItem):
    """
    A single queued download: a QueuedItem plus the remote filenode to
    fetch and the DMD metadata that accompanied it.
    """
    def __init__(self, relpath_u, progress, filenode, metadata):
        super(DownloadItem, self).__init__(relpath_u, progress)
        self.metadata = metadata
        self.file_node = filenode
class Downloader(QueueMixin, WriteFileMixin):
REMOTE_SCAN_INTERVAL = 3 # facilitates tests
@ -561,6 +645,7 @@ class Downloader(QueueMixin, WriteFileMixin):
def start_downloading(self):
self._log("start_downloading")
self._turn_delay = self.REMOTE_SCAN_INTERVAL
files = self._db.get_all_relpaths()
self._log("all files %s" % files)
@ -685,7 +770,14 @@ class Downloader(QueueMixin, WriteFileMixin):
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
if self._should_download(relpath_u, metadata['version']):
self._deque.append( (relpath_u, file_node, metadata) )
to_dl = DownloadItem(
relpath_u,
PercentProgress(file_node.get_size()),
file_node,
metadata,
)
to_dl.set_status('queued', self._clock.seconds())
self._deque.append(to_dl)
else:
self._log("Excluding %r" % (relpath_u,))
self._call_hook(None, 'processed', async=True)
@ -703,42 +795,49 @@ class Downloader(QueueMixin, WriteFileMixin):
def _process(self, item, now=None):
# Downloader
self._log("_process(%r)" % (item,))
if now is None:
now = time.time()
(relpath_u, file_node, metadata) = item
fp = self._get_filepath(relpath_u)
if now is None: # XXX why can we pass in now?
now = time.time() # self._clock.seconds()
self._log("started! %s" % (now,))
item.set_status('started', now)
fp = self._get_filepath(item.relpath_u)
abspath_u = unicode_from_filepath(fp)
conflict_path_u = self._get_conflicted_filename(abspath_u)
d = defer.succeed(None)
def do_update_db(written_abspath_u):
filecap = file_node.get_uri()
last_uploaded_uri = metadata.get('last_uploaded_uri', None)
filecap = item.file_node.get_uri()
last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
last_downloaded_uri = filecap
last_downloaded_timestamp = now
written_pathinfo = get_pathinfo(written_abspath_u)
if not written_pathinfo.exists and not metadata.get('deleted', False):
if not written_pathinfo.exists and not item.metadata.get('deleted', False):
raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))
self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
self._db.did_upload_version(
item.relpath_u, item.metadata['version'], last_uploaded_uri,
last_downloaded_uri, last_downloaded_timestamp, written_pathinfo,
)
self._count('objects_downloaded')
item.set_status('success', self._clock.seconds())
def failed(f):
item.set_status('failure', self._clock.seconds())
self._log("download failed: %s" % (str(f),))
self._count('objects_failed')
return f
if os.path.isfile(conflict_path_u):
def fail(res):
raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
raise ConflictError("download failed: already conflicted: %r" % (item.relpath_u,))
d.addCallback(fail)
else:
is_conflict = False
db_entry = self._db.get_db_entry(relpath_u)
dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
db_entry = self._db.get_db_entry(item.relpath_u)
dmd_last_downloaded_uri = item.metadata.get('last_downloaded_uri', None)
dmd_last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
if db_entry:
if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
@ -747,22 +846,22 @@ class Downloader(QueueMixin, WriteFileMixin):
elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
is_conflict = True
self._count('objects_conflicted')
elif self._is_upload_pending(relpath_u):
elif self._is_upload_pending(item.relpath_u):
is_conflict = True
self._count('objects_conflicted')
if relpath_u.endswith(u"/"):
if metadata.get('deleted', False):
if item.relpath_u.endswith(u"/"):
if item.metadata.get('deleted', False):
self._log("rmdir(%r) ignored" % (abspath_u,))
else:
self._log("mkdir(%r)" % (abspath_u,))
d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
d.addCallback(lambda ign: abspath_u)
else:
if metadata.get('deleted', False):
if item.metadata.get('deleted', False):
d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
else:
d.addCallback(lambda ign: file_node.download_best_version())
d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
is_conflict=is_conflict))

View File

@ -8,7 +8,7 @@ from twisted.internet import defer
from allmydata import uri
from twisted.internet.interfaces import IConsumer
from allmydata.interfaces import IImmutableFileNode, IUploadResults
from allmydata.util import consumer
from allmydata.util import consumer, progress
from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.util.dictutil import DictOfSets
from allmydata.util.happinessutil import servers_of_happiness

View File

@ -65,6 +65,7 @@ class MagicFolderDB(object):
(relpath_u,))
row = self.cursor.fetchone()
if not row:
print "found nothing for", relpath_u
return None
else:
(size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row

View File

@ -1,7 +1,13 @@
import os
import urllib
from sys import stderr
from types import NoneType
from cStringIO import StringIO
from datetime import datetime
import humanize
import simplejson
from twisted.python import usage
@ -12,10 +18,13 @@ from .cli import MakeDirectoryOptions, LnOptions, CreateAliasOptions
import tahoe_mv
from allmydata.util.encodingutil import argv_to_abspath, argv_to_unicode, to_str, \
quote_local_unicode_path
from allmydata.scripts.common_http import do_http, format_http_success, \
format_http_error, BadResponse
from allmydata.util import fileutil
from allmydata.util import configutil
from allmydata import uri
INVITE_SEPARATOR = "+"
class CreateOptions(BasedirOptions):
@ -200,21 +209,167 @@ class StatusOptions(BasedirOptions):
nickname = None
synopsis = ""
stdin = StringIO("")
def parseArgs(self):
BasedirOptions.parseArgs(self)
node_url_file = os.path.join(self['node-directory'], u"node.url")
self['node-url'] = open(node_url_file, "r").read().strip()
with open(node_url_file, "r") as f:
self['node-url'] = f.read().strip()
def _get_json_for_fragment(options, fragment):
    """
    Fetch `fragment` (a URL path + query, relative to our node's web
    API root) and return the parsed JSON.

    :param options: has 'node-url' giving the node's API root URL
    :raises RuntimeError: on a bad HTTP response or empty JSON data
    """
    nodeurl = options['node-url']
    if nodeurl.endswith('/'):
        nodeurl = nodeurl[:-1]

    url = u'%s/%s' % (nodeurl, fragment)
    # bug fix: `method` was never defined here (NameError); every
    # fragment we fetch is a read, so this is a GET.
    resp = do_http('GET', url)
    if isinstance(resp, BadResponse):
        # specifically NOT using format_http_error() here because the
        # URL is pretty sensitive (we're doing /uri/<key>).
        raise RuntimeError(
            "Failed to get json from '%s': %s" % (nodeurl, resp.error)
        )

    data = resp.read()
    parsed = simplejson.loads(data)
    if not parsed:
        raise RuntimeError("No data from '%s'" % (nodeurl,))
    return parsed
def _get_json_for_cap(options, cap):
    """
    Return the parsed JSON description of `cap` via the node's
    /uri/<cap>?t=json endpoint.
    """
    fragment = 'uri/%s?t=json' % urllib.quote(cap)
    return _get_json_for_fragment(options, fragment)
def _print_item_status(item, now, longest):
paddedname = (' ' * (longest - len(item['path']))) + item['path']
if 'failure_at' in item:
ts = datetime.fromtimestamp(item['started_at'])
prog = 'Failed %s (%s)' % (humanize.naturaltime(now - ts), ts)
elif item['percent_done'] < 100.0:
if 'started_at' not in item:
prog = 'not yet started'
else:
so_far = now - datetime.fromtimestamp(item['started_at'])
if so_far.seconds > 0.0:
rate = item['percent_done'] / so_far.seconds
if rate != 0:
time_left = (100.0 - item['percent_done']) / rate
prog = '%2.1f%% done, around %s left' % (
item['percent_done'],
humanize.naturaldelta(time_left),
)
else:
time_left = None
prog = '%2.1f%% done' % (item['percent_done'],)
else:
prog = 'just started'
else:
prog = ''
for verb in ['finished', 'started', 'queued']:
keyname = verb + '_at'
if keyname in item:
when = datetime.fromtimestamp(item[keyname])
prog = '%s %s' % (verb, humanize.naturaltime(now - when))
break
print " %s: %s" % (paddedname, prog)
def status(options):
    """
    Implement 'tahoe magic-folder status': print our local files, every
    collective member's remote files, and any queued/active transfers.
    Returns 0 on success, 1/2 on error (process exit codes).
    """
    # XXX todo: use http interface to ask about our magic-folder upload status

    # read our own DMD write-cap and the collective read-cap from the
    # node's private directory
    nodedir = options["node-directory"]
    with open(os.path.join(nodedir, u"private", u"magic_folder_dircap")) as f:
        dmd_cap = f.read().strip()
    with open(os.path.join(nodedir, u"private", u"collective_dircap")) as f:
        collective_readcap = f.read().strip()

    try:
        captype, dmd = _get_json_for_cap(options, dmd_cap)
        if captype != 'dirnode':
            print >>stderr, "magic_folder_dircap isn't a directory capability"
            return 2
    except RuntimeError as e:
        print >>stderr, str(e)
        return 1

    now = datetime.now()

    print "Local files:"
    for (name, child) in dmd['children'].items():
        captype, meta = child
        status = 'good'
        size = meta['size']
        # linkcrtime is a POSIX timestamp set by tahoe when the link was made
        created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
        version = meta['metadata']['version']
        nice_size = humanize.naturalsize(size)
        nice_created = humanize.naturaltime(now - created)
        if captype != 'filenode':
            print "%20s: error, should be a filecap" % name
            continue
        print " %s (%s): %s, version=%s, created %s" % (name, nice_size, status, version, nice_created)

    # the collective dirnode maps member nickname -> that member's read-only DMD
    captype, collective = _get_json_for_cap(options, collective_readcap)

    print
    print "Remote files:"
    for (name, data) in collective['children'].items():
        if data[0] != 'dirnode':
            # NOTE(review): no 'continue' here, so a non-dirnode child is
            # still listed below after the error message — confirm intended
            print "Error: '%s': expected a dirnode, not '%s'" % (name, data[0])
        print " %s's remote:" % name
        dmd = _get_json_for_cap(options, data[1]['ro_uri'])
        if dmd[0] != 'dirnode':
            print "Error: should be a dirnode"
            continue
        for (n, d) in dmd[1]['children'].items():
            if d[0] != 'filenode':
                # NOTE(review): also falls through without 'continue';
                # the meta lookups below may then KeyError — verify
                print "Error: expected '%s' to be a filenode." % (n,)

            meta = d[1]
            status = 'good'
            size = meta['size']
            created = datetime.fromtimestamp(meta['metadata']['tahoe']['linkcrtime'])
            version = meta['metadata']['version']
            nice_size = humanize.naturalsize(size)
            nice_created = humanize.naturaltime(now - created)
            print " %s (%s): %s, version=%s, created %s" % (n, nice_size, status, version, nice_created)

    # in-progress and recent transfers, from the node's JSON status API
    magicdata = _get_json_for_fragment(options, 'magic_folder?t=json')
    if len(magicdata):
        uploads = [item for item in magicdata if item['kind'] == 'upload']
        downloads = [item for item in magicdata if item['kind'] == 'download']
        longest = max([len(item['path']) for item in magicdata])

        if True: # maybe --show-completed option or something?
            # hide already-successful items by default
            uploads = [item for item in uploads if item['status'] != 'success']
            downloads = [item for item in downloads if item['status'] != 'success']

        if len(uploads):
            print
            print "Uploads:"
            for item in uploads:
                _print_item_status(item, now, longest)

        if len(downloads):
            print
            print "Downloads:"
            for item in downloads:
                _print_item_status(item, now, longest)

        for item in magicdata:
            if item['status'] == 'failure':
                print "Failed:", item

    return 0
class MagicFolderCommand(BaseOptions):
    """
    Top-level options for the 'tahoe magic-folder' command.
    """
    subCommands = [
        ["create", None, CreateOptions, "Create a Magic Folder."],
        ["invite", None, InviteOptions, "Invite someone to a Magic Folder."],
        ["join", None, JoinOptions, "Join a Magic Folder."],
        ["leave", None, LeaveOptions, "Leave a Magic Folder."],
        # typo fix in user-facing help text: "stutus" -> "status"
        ["status", None, StatusOptions, "Display status of uploads/downloads."],
    ]
def postOptions(self):
if not hasattr(self, 'subOptions'):
@ -234,6 +389,7 @@ subDispatch = {
"invite": invite,
"join": join,
"leave": leave,
"status": status,
}
def do_magic_folder(options):

View File

@ -1,5 +1,6 @@
import os, sys
import shutil
from twisted.trial import unittest
from twisted.internet import defer, task

View File

@ -0,0 +1,60 @@
import simplejson
from nevow import rend, url, tags as T
from nevow.inevow import IRequest
from allmydata.web.common import getxmlfile, get_arg, WebError
class MagicFolderWebApi(rend.Page):
    """
    I provide the web-based API for Magic Folder status etc.
    """

    def __init__(self, client):
        super(MagicFolderWebApi, self).__init__(client)
        self.client = client

    def _item_to_json(self, item, kind):
        """
        Serialize one IQueuedItem to a JSON-able dict, tagging it with
        `kind` ('upload' or 'download').
        """
        # NOTE(review): assumes every item has at least one recorded
        # status (set_status is called when queued) — otherwise the
        # [-1] below raises IndexError; confirm against the queues.
        data = dict(
            path=item.relpath_u,
            status=item.status_history()[-1][0],
            kind=kind,
        )
        # one '<status>_at' timestamp per recorded state transition
        for (status, ts) in item.status_history():
            data[status + '_at'] = ts
        data['percent_done'] = item.progress.progress
        return data

    def _render_json(self, req):
        """
        Return a JSON list describing every queued/recent upload and
        download tracked by the magic-folder.
        """
        req.setHeader("content-type", "application/json")

        # identical serialization for both queues, via _item_to_json
        data = []
        for item in self.client._magic_folder.uploader.get_status():
            data.append(self._item_to_json(item, 'upload'))
        for item in self.client._magic_folder.downloader.get_status():
            data.append(self._item_to_json(item, 'download'))
        return simplejson.dumps(data)

    def renderHTTP(self, ctx):
        """
        Dispatch on the 't' query argument: 't=json' returns our JSON
        status; no 't' falls back to the default page rendering; any
        other value is a 400 error.
        """
        req = IRequest(ctx)
        t = get_arg(req, "t", None)
        if t is None:
            return rend.Page.renderHTTP(self, ctx)

        t = t.strip()
        if t == 'json':
            return self._render_json(req)

        raise WebError("'%s' invalid type for 't' arg" % (t,), 400)

View File

@ -12,7 +12,7 @@ from allmydata import get_package_versions_string
from allmydata.util import log
from allmydata.interfaces import IFileNode
from allmydata.web import filenode, directory, unlinked, status, operations
from allmydata.web import storage
from allmydata.web import storage, magic_folder
from allmydata.web.common import abbreviate_size, getxmlfile, WebError, \
get_arg, RenderMixin, get_format, get_mutable_type, render_time_delta, render_time, render_time_attr
@ -154,6 +154,9 @@ class Root(rend.Page):
self.child_uri = URIHandler(client)
self.child_cap = URIHandler(client)
# handler for "/magic_folder" URIs
self.child_magic_folder = magic_folder.MagicFolderWebApi(client)
self.child_file = FileHandler(client)
self.child_named = FileHandler(client)
self.child_status = status.Status(client.get_history())