Merge branch '2885.magic-folder-status.0'

This commit is contained in:
meejah 2018-03-20 12:12:16 -06:00
commit 3fa74d860f
4 changed files with 175 additions and 19 deletions

View File

@ -303,6 +303,23 @@ class MagicFolder(service.MultiService):
class QueueMixin(HookMixin): class QueueMixin(HookMixin):
"""
A parent class for Uploader and Downloader that handles putting
IQueuedItem instances into a work queue and processing
them. Tracks some history of recent items processed (for the
"status" API).
Subclasses implement _scan_delay, _perform_scan and _process
:ivar _deque: IQueuedItem instances to process
:ivar _process_history: the last 20 items we processed
:ivar _in_progress: current batch of items which are currently
being processed; chunks of work are removed from _deque and
worked on. As each finishes, it is added to _process_history
(with oldest items falling off the end).
"""
def __init__(self, client, local_path_u, db, name, clock): def __init__(self, client, local_path_u, db, name, clock):
self._client = client self._client = client
@ -318,6 +335,7 @@ class QueueMixin(HookMixin):
'started': None, 'started': None,
'iteration': None, 'iteration': None,
'inotify': None, 'inotify': None,
'item_processed': None,
} }
self.started_d = self.set_hook('started') self.started_d = self.set_hook('started')
@ -329,6 +347,7 @@ class QueueMixin(HookMixin):
self._deque = deque() self._deque = deque()
# do we also want to bound on "maximum age"? # do we also want to bound on "maximum age"?
self._process_history = deque(maxlen=20) self._process_history = deque(maxlen=20)
self._in_progress = []
self._stopped = False self._stopped = False
# a Deferred to wait for the _do_processing() loop to exit # a Deferred to wait for the _do_processing() loop to exit
@ -346,6 +365,8 @@ class QueueMixin(HookMixin):
""" """
for item in self._deque: for item in self._deque:
yield item yield item
for item in self._in_progress:
yield item
for item in self._process_history: for item in self._process_history:
yield item yield item
@ -414,15 +435,27 @@ class QueueMixin(HookMixin):
self._deque.clear() self._deque.clear()
self._count('objects_queued', -len(to_process)) self._count('objects_queued', -len(to_process))
# we want to include all these in the next status request, so
# we must put them 'somewhere' before the next yield (and it's
# not in _process_history because that gets trimmed and we
# don't want anything to disappear until after it is
# completed)
self._in_progress.extend(to_process)
self._log("%d items to process" % len(to_process), ) self._log("%d items to process" % len(to_process), )
for item in to_process: for item in to_process:
self._process_history.appendleft(item) self._process_history.appendleft(item)
self._in_progress.remove(item)
try: try:
self._log(" processing '%r'" % (item,)) self._log(" processing '%r'" % (item,))
proc = yield self._process(item) proc = yield self._process(item)
self._log(" done: %r" % proc) self._log(" done: %r" % proc)
if not proc:
self._process_history.remove(item)
self._call_hook(item, 'item_processed')
except Exception as e: except Exception as e:
log.err("processing '%r' failed: %s" % (item, e)) log.err("processing '%r' failed: %s" % (item, e))
item.set_status('failed', self._clock.seconds())
proc = Failure() proc = Failure()
self._call_hook(proc, 'processed') self._call_hook(proc, 'processed')
@ -470,10 +503,11 @@ class IQueuedItem(Interface):
@implementer(IQueuedItem) @implementer(IQueuedItem)
class QueuedItem(object): class QueuedItem(object):
def __init__(self, relpath_u, progress): def __init__(self, relpath_u, progress, size):
self.relpath_u = relpath_u self.relpath_u = relpath_u
self.progress = progress self.progress = progress
self._status_history = dict() self._status_history = dict()
self.size = size
def set_status(self, status, current_time=None): def set_status(self, status, current_time=None):
if current_time is None: if current_time is None:
@ -495,6 +529,12 @@ class QueuedItem(object):
hist.sort(lambda a, b: cmp(a[1], b[1])) hist.sort(lambda a, b: cmp(a[1], b[1]))
return hist return hist
def __eq__(self, other):
return (
other.relpath_u == self.relpath_u,
other.status_history() == self.status_history(),
)
class UploadItem(QueuedItem): class UploadItem(QueuedItem):
""" """
@ -601,8 +641,11 @@ class Uploader(QueueMixin):
return return
self._pending.add(relpath_u) self._pending.add(relpath_u)
fp = self._get_filepath(relpath_u)
pathinfo = get_pathinfo(unicode_from_filepath(fp))
progress = PercentProgress() progress = PercentProgress()
item = UploadItem(relpath_u, progress) self._log(u"add pending size: {}: {}".format(relpath_u, pathinfo.size))
item = UploadItem(relpath_u, progress, pathinfo.size)
item.set_status('queued', self._clock.seconds()) item.set_status('queued', self._clock.seconds())
self._deque.append(item) self._deque.append(item)
self._count('objects_queued') self._count('objects_queued')
@ -632,6 +675,15 @@ class Uploader(QueueMixin):
return relpath_u in self._pending return relpath_u in self._pending
def _notify(self, opaque, path, events_mask): def _notify(self, opaque, path, events_mask):
# Twisted doesn't seem to do anything if our callback throws
# an error, so...
try:
return self._real_notify(opaque, path, events_mask)
except Exception as e:
self._log(u"error calling _real_notify: {}".format(e))
twlog.err(Failure(), "Error calling _real_notify")
def _real_notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
relpath_u = self._get_relpath(path) relpath_u = self._get_relpath(path)
@ -657,6 +709,10 @@ class Uploader(QueueMixin):
self._call_hook(path, 'inotify') self._call_hook(path, 'inotify')
def _process(self, item): def _process(self, item):
"""
process a single QueuedItem. If this returns False, the item is
removed from _process_history
"""
# Uploader # Uploader
relpath_u = item.relpath_u relpath_u = item.relpath_u
self._log("_process(%r)" % (relpath_u,)) self._log("_process(%r)" % (relpath_u,))
@ -944,8 +1000,8 @@ class DownloadItem(QueuedItem):
""" """
Represents a single item in the _deque of the Downloader Represents a single item in the _deque of the Downloader
""" """
def __init__(self, relpath_u, progress, filenode, metadata): def __init__(self, relpath_u, progress, filenode, metadata, size):
super(DownloadItem, self).__init__(relpath_u, progress) super(DownloadItem, self).__init__(relpath_u, progress, size)
self.file_node = filenode self.file_node = filenode
self.metadata = metadata self.metadata = metadata
@ -1133,6 +1189,7 @@ class Downloader(QueueMixin, WriteFileMixin):
PercentProgress(file_node.get_size()), PercentProgress(file_node.get_size()),
file_node, file_node,
metadata, metadata,
file_node.get_size(),
) )
to_dl.set_status('queued', self._clock.seconds()) to_dl.set_status('queued', self._clock.seconds())
self._deque.append(to_dl) self._deque.append(to_dl)

View File

@ -1,6 +1,7 @@
import os, sys, time import os, sys, time
import shutil, json import shutil, json
import mock
from os.path import join, exists from os.path import join, exists
from twisted.trial import unittest from twisted.trial import unittest
@ -410,6 +411,9 @@ class FileOperationsHelper(object):
def write(self, path_u, contents): def write(self, path_u, contents):
fname = path_u fname = path_u
if not os.path.exists(fname):
self._maybe_notify(fname, self._inotify.IN_CREATE)
d = self._uploader.set_hook('inotify') d = self._uploader.set_hook('inotify')
with open(fname, "wb") as f: with open(fname, "wb") as f:
f.write(contents) f.write(contents)
@ -434,7 +438,7 @@ class FileOperationsHelper(object):
def _maybe_notify(self, fname, mask): def _maybe_notify(self, fname, mask):
if self._fake_inotify: if self._fake_inotify:
self._uploader._notifier.event(to_filepath(fname), self._inotify.IN_DELETE) self._uploader._notifier.event(to_filepath(fname), mask)
class CheckerMixin(object): class CheckerMixin(object):
@ -1428,7 +1432,13 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
what_path = abspath_expanduser_unicode(u"what", base=small_tree_dir) what_path = abspath_expanduser_unicode(u"what", base=small_tree_dir)
fileutil.write(what_path, "say when") fileutil.write(what_path, "say when")
yield self.fileops.move(small_tree_dir, new_small_tree_dir) yield self.fileops.move(small_tree_dir, new_small_tree_dir)
upstatus = list(self.magicfolder.uploader.get_status())
downstatus = list(self.magicfolder.downloader.get_status())
self.assertEqual(2, len(upstatus))
self.assertEqual(0, len(downstatus))
yield iterate(self.magicfolder) yield iterate(self.magicfolder)
# when we add the dir, we queue a scan of it; so we want # when we add the dir, we queue a scan of it; so we want
# the upload to "go" as well requiring 1 more iteration # the upload to "go" as well requiring 1 more iteration
yield iterate(self.magicfolder) yield iterate(self.magicfolder)
@ -1518,6 +1528,77 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
self.assertTrue(node is not None, "Failed to find %r in DMD" % (path,)) self.assertTrue(node is not None, "Failed to find %r in DMD" % (path,))
self.failUnlessEqual(metadata['version'], 1) self.failUnlessEqual(metadata['version'], 1)
@defer.inlineCallbacks
def test_batched_process(self):
"""
status APIs correctly function when there are 2 items queued at
once for processing
"""
# setup: get at least two items into the deque
path0 = os.path.join(self.local_dir, u'foo')
yield self.fileops.write(path0, 'foo\n')
path1 = os.path.join(self.local_dir, u'bar')
yield self.fileops.write(path1, 'bar\n')
# get the status before we've processed anything
upstatus0 = list(self.magicfolder.uploader.get_status())
upstatus1 = []
def one_item(item):
# grab status after we've processed a single item
us = list(self.magicfolder.uploader.get_status())
upstatus1.extend(us)
one_d = self.magicfolder.uploader.set_hook('item_processed')
# can't 'yield' here because the hook isn't called until
# inside iterate()
one_d.addCallbacks(one_item, self.fail)
yield iterate_uploader(self.magicfolder)
yield iterate_uploader(self.magicfolder) # req'd for windows; not sure why?
# no matter which part of the queue the items are in, we
# should see the same status from the outside
self.assertEqual(upstatus0, upstatus1)
@defer.inlineCallbacks
def test_real_notify_failure(self):
"""
Simulate an exception from the _real_notify helper in
magic-folder's uploader, confirming error-handling works.
"""
orig_notify = self.magicfolder.uploader._real_notify
class BadStuff(Exception):
pass
def bad_stuff(*args, **kw):
# call original method ..
orig_notify(*args, **kw)
# ..but then cause a special problem
raise BadStuff("the bad stuff")
patch_notify = mock.patch.object(
self.magicfolder.uploader,
'_real_notify',
mock.Mock(side_effect=bad_stuff),
)
with patch_notify:
path0 = os.path.join(self.local_dir, u'foo')
yield self.fileops.write(path0, 'foo\n')
# this actually triggers two notifies
# do a reactor turn; this is necessary because our "bad_stuff"
# method calls the hook (so the above 'yield' resumes) right
# *before* it raises the exception; thus, we ensure all the
# pending callbacks including the exception are processed
# before we flush the errors.
yield task.deferLater(reactor, 0, lambda: None)
errors = self.flushLoggedErrors(BadStuff)
# it seems on Windows the "RealTest" variant only produces 1
# notification for some reason..
self.assertTrue(len(errors) >= 1)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_delete_and_restore(self): def test_delete_and_restore(self):
# setup: create a file # setup: create a file

View File

@ -1,6 +1,7 @@
import os.path, re, urllib, time, cgi import os.path, re, urllib, time, cgi
import json import json
import treq import treq
import mock
from twisted.application import service from twisted.application import service
from twisted.trial import unittest from twisted.trial import unittest
@ -30,6 +31,7 @@ from allmydata.immutable import upload
from allmydata.immutable.downloader.status import DownloadStatus from allmydata.immutable.downloader.status import DownloadStatus
from allmydata.dirnode import DirectoryNode from allmydata.dirnode import DirectoryNode
from allmydata.nodemaker import NodeMaker from allmydata.nodemaker import NodeMaker
from allmydata.frontends.magic_folder import QueuedItem
from allmydata.web import status from allmydata.web import status
from allmydata.web.common import WebError, MultiFormatPage from allmydata.web.common import WebError, MultiFormatPage
from allmydata.util import fileutil, base32, hashutil from allmydata.util import fileutil, base32, hashutil
@ -120,17 +122,13 @@ class FakeStatus(object):
return self.status return self.status
class FakeStatusItem(object): def create_test_queued_item(relpath_u, history=[]):
def __init__(self, p, history): progress = mock.Mock()
self.relpath_u = p progress.progress = 100.0
self.history = history item = QueuedItem(relpath_u, progress, 1234)
import mock for the_status, timestamp in history:
self.progress = mock.Mock() item.set_status(the_status, current_time=timestamp)
self.progress.progress = 100.0 return item
def status_history(self):
return self.history
class FakeMagicFolder(object): class FakeMagicFolder(object):
@ -1005,7 +1003,10 @@ class Web(WebMixin, WebErrorMixin, testutil.StallMixin, testutil.ReallyEqualMixi
def test_magicfolder_status_success(self): def test_magicfolder_status_success(self):
self.s._magic_folders['default'] = mf = FakeMagicFolder() self.s._magic_folders['default'] = mf = FakeMagicFolder()
mf.uploader.status = [ mf.uploader.status = [
FakeStatusItem(u"rel/path", [('done', 12345)]) create_test_queued_item(u"rel/uppath", [('done', 12345)])
]
mf.downloader.status = [
create_test_queued_item(u"rel/downpath", [('done', 23456)])
] ]
data = yield self.POST( data = yield self.POST(
'/magic_folder?t=json', '/magic_folder?t=json',
@ -1017,7 +1018,22 @@ class Web(WebMixin, WebErrorMixin, testutil.StallMixin, testutil.ReallyEqualMixi
self.assertEqual( self.assertEqual(
data, data,
[ [
{"status": "done", "path": "rel/path", "kind": "upload", "percent_done": 100.0, "done_at": 12345}, {
"status": "done",
"path": "rel/uppath",
"kind": "upload",
"percent_done": 100.0,
"done_at": 12345,
"size": 1234,
},
{
"status": "done",
"path": "rel/downpath",
"kind": "download",
"percent_done": 100.0,
"done_at": 23456,
"size": 1234,
},
] ]
) )
@ -1025,7 +1041,7 @@ class Web(WebMixin, WebErrorMixin, testutil.StallMixin, testutil.ReallyEqualMixi
def test_magicfolder_root_success(self): def test_magicfolder_root_success(self):
self.s._magic_folders['default'] = mf = FakeMagicFolder() self.s._magic_folders['default'] = mf = FakeMagicFolder()
mf.uploader.status = [ mf.uploader.status = [
FakeStatusItem(u"rel/path", [('done', 12345)]) create_test_queued_item(u"rel/path", [('done', 12345)])
] ]
data = yield self.GET( data = yield self.GET(
'/', '/',

View File

@ -34,6 +34,7 @@ class MagicFolderWebApi(TokenOnlyWebApi):
for (status, ts) in item.status_history(): for (status, ts) in item.status_history():
d[status + '_at'] = ts d[status + '_at'] = ts
d['percent_done'] = item.progress.progress d['percent_done'] = item.progress.progress
d['size'] = item.size
data.append(d) data.append(d)
for item in magic_folder.downloader.get_status(): for item in magic_folder.downloader.get_status():
@ -45,6 +46,7 @@ class MagicFolderWebApi(TokenOnlyWebApi):
for (status, ts) in item.status_history(): for (status, ts) in item.status_history():
d[status + '_at'] = ts d[status + '_at'] = ts
d['percent_done'] = item.progress.progress d['percent_done'] = item.progress.progress
d['size'] = item.size
data.append(d) data.append(d)
return json.dumps(data) return json.dumps(data)