diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 474dc5fe2..f1e145d0e 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -460,6 +460,19 @@ PERFORM_SCAN = ActionType( u"Remote storage is being scanned for changes which need to be synchronized.", ) +_COUNT = Field.for_types( + u"count", + [int, long], + u"The number of items in the processing queue.", +) + +PROCESS_QUEUE = ActionType( + u"magic-folder:process-queue", + [_COUNT], + [], + u"A Magic-Folder is working through an item queue.", +) + SCAN_REMOTE_COLLECTIVE = ActionType( u"magic-folder:scan-remote-collective", [], @@ -713,7 +726,7 @@ class QueueMixin(HookMixin): def _perform_scan(self): return - @defer.inlineCallbacks + @eliotutil.inline_callbacks def _process_deque(self): # process everything currently in the queue. we're turning it # into a list so that if any new items get added while we're @@ -729,24 +742,21 @@ class QueueMixin(HookMixin): # completed) self._in_progress.extend(to_process) - if to_process: - self._log("%d items to process" % len(to_process), ) - for item in to_process: - self._process_history.appendleft(item) - self._in_progress.remove(item) - try: - self._log(" processing '%r'" % (item,)) - proc = yield self._process(item) - self._log(" done: %r" % proc) - if not proc: - self._process_history.remove(item) - self._call_hook(item, 'item_processed') - except Exception as e: - log.err("processing '%r' failed: %s" % (item, e)) - item.set_status('failed', self._clock.seconds()) - proc = Failure() + with PROCESS_QUEUE(count=len(to_process)): + for item in to_process: + self._process_history.appendleft(item) + self._in_progress.remove(item) + try: + proc = yield self._process(item) + if not proc: + self._process_history.remove(item) + self._call_hook(item, 'item_processed') + except: + write_traceback() + item.set_status('failed', self._clock.seconds()) + proc = Failure() - self._call_hook(proc, 'processed') + self._call_hook(proc, 'processed') def _get_relpath(self, filepath): self._log("_get_relpath(%r)" % (filepath,)) @@ -1009,8 +1019,8 @@ class Uploader(QueueMixin): def _process(self, item): """ - process a single QueuedItem. If this returns False, the item is - removed from _process_history + Possibly upload a single QueuedItem. If this returns False, the item is + removed from _process_history. """ with PROCESS_ITEM(item=item).context(): d = DeferredContext(defer.succeed(False)) @@ -1542,6 +1552,10 @@ class Downloader(QueueMixin, WriteFileMixin): ) def _process(self, item): + """ + Possibly upload a single QueuedItem. If this returns False, the item is + removed from _process_history. + """ # Downloader now = self._clock.seconds()