WIP to unify queue processing between uploader and downloader.

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood 2015-07-30 18:52:05 +01:00
parent c7091ef6e6
commit 27fe50ff24

View File

@ -104,13 +104,16 @@ class QueueMixin(object):
self._pending = set()
self._callback = lambda ign: None
self._ignore_count = 0
self._stopped = False
self._turn_delay = 0
def _count(self, counter_name, delta=1):
self._client.stats_provider.count('magic_folder.%s.%s' % (self._name, counter_name), delta)
def _log(self, msg):
self._client.log("Magic Folder %s: %s" % (self._name, msg))
#print "_log %s" % (msg,)
s = "Magic Folder %s: %s" % (self._name, msg)
self._client.log(s)
print s
#open("events", "ab+").write(msg)
def _append_to_deque(self, path):
@ -123,14 +126,17 @@ class QueueMixin(object):
reactor.callLater(0, self._turn_deque)
def _turn_deque(self):
try:
path = self._deque.pop()
except IndexError:
self._log("magic folder upload deque is now empty")
self._lazy_tail = defer.succeed(None)
if self._stopped:
return
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path))
self._lazy_tail.addCallback(lambda ign: self._turn_deque())
try:
item = self._deque.pop()
except IndexError:
self._log("deque is now empty")
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
else:
self._lazy_tail.addCallback(lambda ign: self._process(item))
#self._lazy_tail.addErrback(lambda f: self._log("error: %s" % (f,)))
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._turn_delay, self._turn_deque))
def _do_callback(self, res):
if self._ignore_count == 0:
@ -270,6 +276,9 @@ class Uploader(QueueMixin):
path_u = unicode_from_filepath(path)
self._append_to_deque(path_u)
def _when_queue_is_empty(self):
return defer.succeed(None)
def _process(self, path_u):
precondition(isinstance(path_u, unicode), path_u)
d = defer.succeed(None)
@ -373,9 +382,8 @@ class Downloader(QueueMixin):
if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")
self._remote_scan_delay = 3 # XXX
self._turn_delay = 3 # delay between remote scans
self._download_scan_batch = {} # path -> [(filenode, metadata)]
self._stopped = False
def start_scanning(self):
self._scan_remote_collective()
@ -493,7 +501,13 @@ class Downloader(QueueMixin):
extension += [(name, file_node, metadata)]
return extension
def _download_file(self, name, file_node):
def _when_queue_is_empty(self):
d = task.deferLater(reactor, self._turn_delay, self._scan_remote_collective)
d.addCallback(lambda ign: self._turn_deque())
return d
def _process(self, item):
(name, file_node, metadata) = item
d = file_node.download_best_version()
def succeeded(res):
d.addCallback(lambda result: self._write_downloaded_file(name, result))
@ -522,18 +536,3 @@ class Downloader(QueueMixin):
self._count('download_objects_queued')
if self.is_ready:
reactor.callLater(0, self._turn_deque)
# FIXME move to QueueMixin
def _turn_deque(self):
if self._stopped:
return
try:
file_path, file_node, metadata = self._deque.pop()
except IndexError:
self._log("magic folder upload deque is now empty")
self._lazy_tail = defer.succeed(None)
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective))
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_deque))
return
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_deque))