Move the pending_delay mechanism from Windows-specific code to magic_folder.py.

This is necessary because we have insufficent information in the Windows code
about how event masks are used.

Signed-off-by: Daira Hopwood <daira@jacaranda.org>
This commit is contained in:
Daira Hopwood 2015-11-03 14:17:35 +00:00
parent a2e18bf43e
commit 182e07e151
2 changed files with 44 additions and 62 deletions

View File

@ -183,9 +183,7 @@ class Uploader(QueueMixin):
self._inotify = get_inotify_module()
self._notifier = self._inotify.INotify()
self._pending = set()
if hasattr(self._notifier, 'set_pending_delay'):
self._notifier.set_pending_delay(pending_delay)
self._pending_delay = pending_delay
# TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
#
@ -222,15 +220,19 @@ class Uploader(QueueMixin):
def start_scanning(self):
self._log("start_scanning")
self.is_ready = True
self._pending = self._db.get_all_relpaths()
self._log("all_files %r" % (self._pending))
d = self._scan(u"")
def _add_pending(ign):
# This adds all of the files that were in the db but not already processed
# (normally because they have been deleted on disk).
self._log("adding %r" % (self._pending))
self._deque.extend(self._pending)
d.addCallback(_add_pending)
# Notify ALL THE THINGS.
# XXX this does not guarantee to notify parents before children. Is that a problem?
all_relpaths = self._db.get_all_relpaths()
self._log("all_files %r" % (all_relpaths,))
for relpath_u in all_relpaths:
fp = self._get_filepath(relpath_u)
self._notify(None, fp, IN_CHANGED)
self._scan(u"")
d = defer.succeed(None)
d.addCallback(lambda ign: self._turn_deque())
return d
@ -239,31 +241,16 @@ class Uploader(QueueMixin):
fp = self._get_filepath(reldir_u)
try:
children = listdir_filepath(fp)
except EnvironmentError:
raise Exception("WARNING: magic folder: permission denied on directory %s"
% quote_filepath(fp))
except FilenameEncodingError:
raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
% quote_filepath(fp))
except (EnvironmentError, FilenameEncodingError) as e:
self._log("error listing directory %s: %s" % (quote_filepath(fp), e))
return None
d = defer.succeed(None)
for child in children:
_assert(isinstance(child, unicode), child=child)
d.addCallback(lambda ign, child=child:
("%s/%s" % (reldir_u, child) if reldir_u else child))
def _add_pending(relpath_u):
if magicpath.should_ignore_file(relpath_u):
return None
self._pending.add(relpath_u)
return relpath_u
d.addCallback(_add_pending)
# This call to _process doesn't go through the deque, and probably should.
d.addCallback(self._process)
d.addBoth(self._call_hook, 'processed')
d.addErrback(log.err)
return d
relpath_u = ("%s/%s" % (reldir_u, child)) if reldir_u else child
if not magicpath.should_ignore_file(relpath_u):
child_fp = self._get_filepath(relpath_u)
self._notify(None, child_fp, IN_CHANGED)
def _notify(self, opaque, path, events_mask):
self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
@ -288,14 +275,21 @@ class Uploader(QueueMixin):
return
self._log("appending %r to deque" % (relpath_u,))
self._deque.append(relpath_u)
self._pending.add(relpath_u)
self._count('objects_queued')
if self.is_ready:
if self._immediate: # for tests
self._turn_deque()
else:
self._clock.callLater(0, self._turn_deque)
def _do_append():
self._pending.remove(relpath_u)
self._deque.append(relpath_u)
self._count('objects_queued')
if self.is_ready:
if self._immediate: # for tests
self._turn_deque()
else:
self._clock.callLater(0, self._turn_deque)
if self._immediate:
_do_append()
else:
self._clock.callLater(self._pending_delay, _do_append)
def _when_queue_is_empty(self):
return defer.succeed(None)
@ -316,9 +310,6 @@ class Uploader(QueueMixin):
fp = self._get_filepath(relpath_u)
pathinfo = get_pathinfo(unicode_from_filepath(fp))
self._log("about to remove %r from pending set %r" %
(relpath_u, self._pending))
self._pending.remove(relpath_u)
encoded_path_u = magicpath.path2magic(relpath_u)
if not pathinfo.exists:

View File

@ -199,13 +199,8 @@ class INotify(PollMixin):
self._callbacks = None
self._hDirectory = None
self._path = None
self._pending = set()
self._pending_delay = 1.0
self.recursive_includes_new_subdirectories = True
def set_pending_delay(self, delay):
self._pending_delay = delay
def startReading(self):
deferToThread(self._thread)
return self.poll(lambda: self._state != NOT_STARTED)
@ -265,20 +260,16 @@ class INotify(PollMixin):
return
path = self._path.preauthChild(info.filename) # FilePath with Unicode path
#mask = _action_to_inotify_mask.get(info.action, IN_CHANGED)
mask = _action_to_inotify_mask.get(info.action, IN_CHANGED)
def _maybe_notify(path):
if path not in self._pending:
self._pending.add(path)
def _do_callbacks():
self._pending.remove(path)
for cb in self._callbacks:
try:
cb(None, path, IN_CHANGED)
except Exception, e:
log.err(e)
reactor.callLater(self._pending_delay, _do_callbacks)
reactor.callFromThread(_maybe_notify, path)
def _notify(path):
for cb in self._callbacks:
try:
cb(None, path, mask)
except Exception, e:
log.err(e)
reactor.callFromThread(_notify, path)
except Exception, e:
log.err(e)
self._state = STOPPED