Replace _lazy_tail with iterative loop, cleanup tests (#2412)

* uses @inlineCallbacks to turn the _lazy_tail recursion into
   a "real" looking loop;
 * remove the need for "immediate" vs delayed iteration of said loop;
 * make it easier for the unit-tests to control the behavior of the
   uploader/downloader;
 * consolidates (some) setup/teardown code into the setUp and tearDown
   hooks provided by unittest so unit-tests aren't doing that themselves
 * re-factors some of the unit-tests to use an @inlineCallbacks style
   so they're easier to follow and debug

This doesn't tackle the "how to know when our inotify events have arrived"
problem the unit-tests still have, nor does it eliminate the myriad bits
of state that get added to tests via all the MixIns.
This commit is contained in:
meejah 2016-01-28 13:33:09 -07:00 committed by Brian Warner
parent e03e243c67
commit e6104cd1a2
5 changed files with 986 additions and 935 deletions

View File

@ -4,7 +4,7 @@ import os.path
from collections import deque
import time
from twisted.internet import defer, reactor, task
from twisted.internet import defer, reactor, task, error
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service
@ -65,7 +65,6 @@ class MagicFolder(service.MultiService):
service.MultiService.__init__(self)
immediate = clock is not None
clock = clock or reactor
db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
if db is None:
@ -78,7 +77,7 @@ class MagicFolder(service.MultiService):
upload_dirnode = self._client.create_node_from_uri(upload_dircap)
collective_dirnode = self._client.create_node_from_uri(collective_dircap)
self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock)
self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
@ -109,14 +108,20 @@ class MagicFolder(service.MultiService):
class QueueMixin(HookMixin):
def __init__(self, client, local_path_u, db, name, clock):
scan_interval = 0
def __init__(self, client, local_path_u, db, name, clock, delay=0):
self._client = client
self._local_path_u = local_path_u
self._local_filepath = to_filepath(local_path_u)
self._db = db
self._name = name
self._clock = clock
self._hooks = {'processed': None, 'started': None}
self._hooks = {
'processed': None,
'started': None,
'iteration': None,
}
self.started_d = self.set_hook('started')
if not self._local_filepath.exists():
@ -131,9 +136,15 @@ class QueueMixin(HookMixin):
self._deque = deque()
# do we also want to bound on "maximum age"?
self._process_history = deque(maxlen=20)
self._lazy_tail = defer.succeed(None)
self._stopped = False
self._turn_delay = 0
# XXX pass in an initial value for this; it seems like .10 broke this and it's always 0
self._turn_delay = delay
self._log('delay is %f' % self._turn_delay)
# a Deferred to wait for the _do_processing() loop to exit
# (gets set to the return from _do_processing() if we get that
# far)
self._processing = defer.succeed(None)
def get_status(self):
"""
@ -148,6 +159,83 @@ class QueueMixin(HookMixin):
self._log("_get_filepath(%r)" % (relpath_u,))
return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
def _begin_processing(self, res):
self._log("starting processing loop")
self._processing = self._do_processing()
# if there are any errors coming out of _do_processing then
# our loop is done and we're hosed (i.e. _do_processing()
# itself has a bug in it)
def fatal_error(f):
self._log("internal error: %s" % (f.value,))
self._log(f)
self._processing.addErrback(fatal_error)
return res
@defer.inlineCallbacks
def _do_processing(self):
"""
This is an infinite loop that processes things out of the _deque.
One iteration runs self._process_deque which calls
_when_queue_is_empty() and then completely drains the _deque
(processing each item). After that we yield for _turn_deque
seconds.
"""
# we subtract here so there's a scan on the very first iteration
last_scan = self._clock.seconds() - self.scan_interval
while not self._stopped:
self._log("doing iteration")
d = task.deferLater(self._clock, self._turn_delay, lambda: None)
# ">=" is important here if scan scan_interval is 0
if self._clock.seconds() - last_scan >= self.scan_interval:
# XXX can't we unify the "_full_scan" vs what
# Downloader does...
last_scan = self._clock.seconds()
yield self._when_queue_is_empty() # (this no-op for us, only Downloader uses it...)
self._log("did scan; now %d" % last_scan)
else:
self._log("skipped scan")
# process anything in our queue
yield self._process_deque()
self._log("one loop; call_hook iteration %r" % self)
self._call_hook(None, 'iteration')
# we want to have our callLater queued in the reactor
# *before* we trigger the 'iteration' hook, so that hook
# can successfully advance the Clock and bypass the delay
# if required (e.g. in the tests).
if not self._stopped:
self._log("waiting... %r" % d)
yield d
self._log("stopped")
def _when_queue_is_empty(self):
return
@defer.inlineCallbacks
def _process_deque(self):
self._log("_process_deque %r" % (self._deque,))
# process everything currently in the queue. we're turning it
# into a list so that if any new items get added while we're
# processing, they'll not run until next time)
to_process = list(self._deque)
self._deque.clear()
self._count('objects_queued', -len(to_process))
self._log("%d items to process" % len(to_process), )
for item in to_process:
try:
self._log(" processing '%r'" % (item,))
proc = yield self._process(item)
self._log(" done: %r" % proc)
except Exception as e:
log.err("processing '%r' failed: %s" % (item, e))
proc = None # actually in old _lazy_tail way, proc would be Failure
# XXX can we just get rid of the hooks now?
yield self._call_hook(proc, 'processed')
def _get_relpath(self, filepath):
self._log("_get_relpath(%r)" % (filepath,))
segments = unicode_segments_from(filepath, self._local_filepath)
@ -156,8 +244,8 @@ class QueueMixin(HookMixin):
def _count(self, counter_name, delta=1):
ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
self._log("%s += %r" % (counter_name, delta))
self._client.stats_provider.count(ctr, delta)
self._log("%s += %r (now %r)" % (counter_name, delta, self._client.stats_provider.counters[ctr]))
def _logcb(self, res, msg):
self._log("%s: %r" % (msg, res))
@ -169,37 +257,6 @@ class QueueMixin(HookMixin):
print s
#open("events", "ab+").write(msg)
def _turn_deque(self):
try:
self._log("_turn_deque")
if self._stopped:
self._log("stopped")
return
try:
item = IQueuedItem(self._deque.pop())
self._process_history.append(item)
self._log("popped %r, now have %d" % (item, len(self._deque)))
self._count('objects_queued', -1)
except IndexError:
self._log("deque is now empty")
self._lazy_tail.addBoth(self._logcb, "whawhat empty")
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
self._lazy_tail.addBoth(self._logcb, "got past _when_queue_is_empty")
else:
self._log("_turn_deque else clause")
self._lazy_tail.addBoth(self._logcb, "whawhat else %r" % (item,))
self._lazy_tail.addCallback(lambda ign: self._process(item))
self._lazy_tail.addBoth(self._logcb, "got past _process")
self._lazy_tail.addBoth(self._call_hook, 'processed', async=True)
self._lazy_tail.addBoth(self._logcb, "got past _call_hook (turn_delay = %r)" % (self._turn_delay,))
self._lazy_tail.addErrback(log.err)
self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
self._lazy_tail.addBoth(self._logcb, "got past deferLater")
except Exception as e:
self._log("---- turn deque exception %s" % (e,))
raise
# this isn't in interfaces.py because it's very specific to QueueMixin
class IQueuedItem(Interface):
@ -257,12 +314,11 @@ class UploadItem(QueuedItem):
class Uploader(QueueMixin):
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
immediate=False):
QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock, delay=pending_delay)
self.is_ready = False
self._immediate = immediate
if not IDirectoryNode.providedBy(upload_dirnode):
raise AssertionError("The URI in '%s' does not refer to a directory."
@ -311,7 +367,9 @@ class Uploader(QueueMixin):
d = self._notifier.wait_until_stopped()
else:
d = defer.succeed(None)
d.addCallback(lambda ign: self._lazy_tail)
self._stopped = True
# wait for processing loop to actually exit
d.addCallback(lambda ign: self._processing)
return d
def start_uploading(self):
@ -325,40 +383,38 @@ class Uploader(QueueMixin):
self._add_pending(relpath_u)
self._full_scan()
# XXX changed this while re-basing; double check we can
# *really* just call this synchronously.
return self._begin_processing(None)
def _extend_queue_and_keep_going(self, relpaths_u):
self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
for relpath_u in relpaths_u:
def _full_scan(self):
self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
self._log("FULL SCAN")
self._log("_pending %r" % (self._pending))
self._scan(u"")
def _add_pending(self, relpath_u):
self._log("add pending %r" % (relpath_u,))
if magicpath.should_ignore_file(relpath_u):
self._log("_add_pending %r but should_ignore()==True" % (relpath_u,))
return
if relpath_u in self._pending:
self._log("_add_pending %r but already pending" % (relpath_u,))
return
self._pending.add(relpath_u)
progress = PercentProgress()
item = UploadItem(relpath_u, progress)
item.set_status('queued', self._clock.seconds())
self._deque.append(item)
self._count('objects_queued', len(relpaths_u))
if self.is_ready:
if self._immediate: # for tests
self._turn_deque()
else:
self._clock.callLater(0, self._turn_deque)
def _full_scan(self):
self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
print "FULL SCAN"
self._log("_pending %r" % (self._pending))
self._scan(u"")
self._extend_queue_and_keep_going(self._pending)
def _add_pending(self, relpath_u):
self._log("add pending %r" % (relpath_u,))
if not magicpath.should_ignore_file(relpath_u):
self._pending.add(relpath_u)
self._count('objects_queued')
self._log("_add_pending(%r) queued item" % (relpath_u,))
def _scan(self, reldir_u):
# Scan a directory by (synchronously) adding the paths of all its children to self._pending.
# Note that this doesn't add them to the deque -- that will
self._log("scan %r" % (reldir_u,))
self._log("SCAN '%r'" % (reldir_u,))
fp = self._get_filepath(reldir_u)
try:
children = listdir_filepath(fp)
@ -370,6 +426,7 @@ class Uploader(QueueMixin):
% quote_filepath(fp))
for child in children:
self._log(" scan; child %r" % (child,))
_assert(isinstance(child, unicode), child=child)
self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)
@ -398,11 +455,7 @@ class Uploader(QueueMixin):
self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
return
self._pending.add(relpath_u)
self._extend_queue_and_keep_going([relpath_u])
def _when_queue_is_empty(self):
return defer.succeed(None)
self._add_pending(relpath_u)
def _process(self, item):
# Uploader
@ -412,7 +465,7 @@ class Uploader(QueueMixin):
if relpath_u is None:
item.set_status('invalid_path', self._clock.seconds())
return
return defer.succeed(None)
precondition(isinstance(relpath_u, unicode), relpath_u)
precondition(not relpath_u.endswith(u'/'), relpath_u)
@ -427,7 +480,10 @@ class Uploader(QueueMixin):
self._log("about to remove %r from pending set %r" %
(relpath_u, self._pending))
try:
self._pending.remove(relpath_u)
except KeyError:
self._log("WRONG that %r wasn't in pending" % (relpath_u,))
encoded_path_u = magicpath.path2magic(relpath_u)
if not pathinfo.exists:
@ -448,9 +504,11 @@ class Uploader(QueueMixin):
self._count('objects_not_uploaded')
return
metadata = { 'version': new_version,
metadata = {
'version': new_version,
'deleted': True,
'last_downloaded_timestamp': last_downloaded_timestamp }
'last_downloaded_timestamp': last_downloaded_timestamp,
}
if db_entry.last_downloaded_uri is not None:
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
@ -475,10 +533,16 @@ class Uploader(QueueMixin):
self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
return None
elif pathinfo.isdir:
print "ISDIR "
self._log("ISDIR")
if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
db_entry = self._db.get_db_entry(relpath_u)
self._log("isdir dbentry %r" % (db_entry,))
if not is_new_file(pathinfo, db_entry):
self._log("NOT A NEW FILE")
return defer.succeed(None)
uploadable = Data("", self._client.convergence)
encoded_path_u += magicpath.path2magic(u"/")
self._log("encoded_path_u = %r" % (encoded_path_u,))
@ -496,7 +560,6 @@ class Uploader(QueueMixin):
return f
upload_d.addCallbacks(_dir_succeeded, _dir_failed)
upload_d.addCallback(lambda ign: self._scan(relpath_u))
upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
return upload_d
elif pathinfo.isfile:
db_entry = self._db.get_db_entry(relpath_u)
@ -512,8 +575,10 @@ class Uploader(QueueMixin):
self._count('objects_not_uploaded')
return None
metadata = { 'version': new_version,
'last_downloaded_timestamp': last_downloaded_timestamp }
metadata = {
'version': new_version,
'last_downloaded_timestamp': last_downloaded_timestamp,
}
if db_entry is not None and db_entry.last_downloaded_uri is not None:
metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri
@ -646,11 +711,11 @@ class DownloadItem(QueuedItem):
class Downloader(QueueMixin, WriteFileMixin):
REMOTE_SCAN_INTERVAL = 3 # facilitates tests
scan_interval = 3
def __init__(self, client, local_path_u, db, collective_dirnode,
upload_readonly_dircap, clock, is_upload_pending, umask):
QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock, delay=self.scan_interval)
if not IDirectoryNode.providedBy(collective_dirnode):
raise AssertionError("The URI in '%s' does not refer to a directory."
@ -666,20 +731,21 @@ class Downloader(QueueMixin, WriteFileMixin):
def start_downloading(self):
self._log("start_downloading")
self._turn_delay = self.REMOTE_SCAN_INTERVAL
self._turn_delay = self.scan_interval
files = self._db.get_all_relpaths()
self._log("all files %s" % files)
d = self._scan_remote_collective(scan_self=True)
d.addBoth(self._logcb, "after _scan_remote_collective 0")
self._turn_deque()
d.addCallback(self._begin_processing)
return d
def stop(self):
self._log("stop")
self._stopped = True
d = defer.succeed(None)
d.addCallback(lambda ign: self._lazy_tail)
# wait for processing loop to actually exit
d.addCallback(lambda ign: self._processing)
return d
def _should_download(self, relpath_u, remote_version):
@ -734,7 +800,7 @@ class Downloader(QueueMixin, WriteFileMixin):
node = None
for success, result in deferredList:
if success:
if result[1]['version'] > max_version:
if node is None or result[1]['version'] > max_version:
node, metadata = result
max_version = result[1]['version']
return node, metadata
@ -801,23 +867,23 @@ class Downloader(QueueMixin, WriteFileMixin):
self._deque.append(to_dl)
else:
self._log("Excluding %r" % (relpath_u,))
self._call_hook(None, 'processed', async=True)
self._call_hook(None, 'processed', async=True) # await this maybe-Deferred??
self._log("deque after = %r" % (self._deque,))
d.addCallback(_filter_batch_to_deque)
return d
# XXX fixme
def _when_queue_is_empty(self):
d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
d.addBoth(self._logcb, "after _scan_remote_collective 1")
d.addCallback(lambda ign: self._turn_deque())
return d
return self._scan(None)
def _process(self, item, now=None):
def _scan(self, ign):
return self._scan_remote_collective()
def _process(self, item):
# Downloader
self._log("_process(%r)" % (item,))
if now is None: # XXX why can we pass in now?
now = time.time() # self._clock.seconds()
now = self._clock.seconds()
self._log("started! %s" % (now,))
item.set_status('started', now)
@ -830,6 +896,7 @@ class Downloader(QueueMixin, WriteFileMixin):
def do_update_db(written_abspath_u):
filecap = item.file_node.get_uri()
last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
self._log("DOUPDATEDB %r" % written_abspath_u)
last_downloaded_uri = filecap
last_downloaded_timestamp = now
written_pathinfo = get_pathinfo(written_abspath_u)
@ -886,10 +953,12 @@ class Downloader(QueueMixin, WriteFileMixin):
d.addCallback(lambda contents: self._write_downloaded_file(self._local_path_u, abspath_u, contents,
is_conflict=is_conflict))
d.addCallbacks(do_update_db, failed)
d.addCallbacks(do_update_db)
d.addErrback(failed)
def trap_conflicts(f):
f.trap(ConflictError)
self._log("IGNORE CONFLICT ERROR %r" % f)
return None
d.addErrback(trap_conflicts)
return d

View File

@ -65,7 +65,7 @@ class MagicFolderDB(object):
(relpath_u,))
row = self.cursor.fetchone()
if not row:
print "found nothing for", relpath_u
print "no dbentry for %r" % (relpath_u,)
return None
else:
(size, mtime, ctime, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp) = row

View File

@ -22,7 +22,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin):
def do_create_magic_folder(self, client_num):
d = self.do_cli("magic-folder", "create", "magic:", client_num=client_num)
def _done((rc,stdout,stderr)):
self.failUnlessEqual(rc, 0)
self.failUnlessEqual(rc, 0, stdout + stderr)
self.failUnlessIn("Alias 'magic' created", stdout)
self.failUnlessEqual(stderr, "")
aliases = get_aliases(self.get_clientdir(i=client_num))
@ -100,7 +100,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin):
local_dir_arg = unicode_to_argv(local_dir)
d = self.do_cli("magic-folder", "create", "magic:", nickname_arg, local_dir_arg)
def _done((rc, stdout, stderr)):
self.failUnlessEqual(rc, 0)
self.failUnlessEqual(rc, 0, stdout + stderr)
client = self.get_client()
self.collective_dircap, self.upload_dircap = self.get_caps_from_files(0)
@ -111,10 +111,16 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin):
d.addCallback(lambda ign: self.check_config(0, local_dir))
return d
# XXX should probably just be "tearDown"...
def cleanup(self, res):
d = defer.succeed(None)
if self.magicfolder is not None:
d.addCallback(lambda ign: self.magicfolder.finish())
def _clean(ign):
d = self.magicfolder.finish()
self.magicfolder.uploader._clock.advance(self.magicfolder.uploader.scan_interval + 1)
self.magicfolder.downloader._clock.advance(self.magicfolder.downloader.scan_interval + 1)
return d
d.addCallback(_clean)
d.addCallback(lambda ign: res)
return d

File diff suppressed because it is too large Load Diff

View File

@ -282,11 +282,8 @@ def write_atomically(target, contents, mode="b"):
move_into_place(target+".tmp", target)
def write(path, data, mode="wb"):
wf = open(path, mode)
try:
wf.write(data)
finally:
wf.close()
with open(path, mode) as f:
f.write(data)
def read(path):
rf = open(path, "rb")