diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index da25f042b..a91142ce7 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -335,6 +335,7 @@ class QueueMixin(HookMixin): 'started': None, 'iteration': None, 'inotify': None, + 'item_processed': None, } self.started_d = self.set_hook('started') @@ -451,6 +452,7 @@ class QueueMixin(HookMixin): self._log(" done: %r" % proc) if not proc: self._process_history.remove(item) + self._call_hook(item, 'item_processed') except Exception as e: log.err("processing '%r' failed: %s" % (item, e)) item.set_status('failed', self._clock.seconds()) @@ -527,6 +529,12 @@ class QueuedItem(object): hist.sort(lambda a, b: cmp(a[1], b[1])) return hist + def __eq__(self, other): + return ( + other.relpath_u == self.relpath_u, + other.status_history() == self.status_history(), + ) + class UploadItem(QueuedItem): """ diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index 17a36cf2a..78ac22b03 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -1430,9 +1430,11 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall yield self.fileops.move(small_tree_dir, new_small_tree_dir) upstatus = list(self.magicfolder.uploader.get_status()) downstatus = list(self.magicfolder.downloader.get_status()) + self.assertEqual(2, len(upstatus)) self.assertEqual(0, len(downstatus)) yield iterate(self.magicfolder) + # when we add the dir, we queue a scan of it; so we want # the upload to "go" as well requiring 1 more iteration yield iterate(self.magicfolder) @@ -1522,6 +1524,35 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall self.assertTrue(node is not None, "Failed to find %r in DMD" % (path,)) self.failUnlessEqual(metadata['version'], 1) + @defer.inlineCallbacks + def test_batched_process(self): + # setup: get at least two items into the deque + path0 = os.path.join(self.local_dir, u'foo') + yield self.fileops.write(path0, 'foo\n') + path1 = os.path.join(self.local_dir, u'bar') + yield self.fileops.write(path1, 'bar\n') + + # get the status before we've processed anything + upstatus0 = list(self.magicfolder.uploader.get_status()) + upstatus1 = [] + + def one_item(item): + # grab status after we've processed a single item + us = list(self.magicfolder.uploader.get_status()) + upstatus1.extend(us) + one_d = self.magicfolder.uploader.set_hook('item_processed') + # can't 'yield' here because the hook isn't called until + # inside iterate() + one_d.addCallbacks(one_item, self.fail) + + yield iterate_uploader(self.magicfolder) + yield iterate_uploader(self.magicfolder) # req'd for windows; not sure why? + + # status we got each time should be the same + self.assertEqual(len(upstatus0), len(upstatus1)) + for item0, item1 in zip(upstatus0, upstatus1): + self.assertEqual(item0, item1) + @defer.inlineCallbacks def test_delete_and_restore(self): # setup: create a file