diff --git a/newsfragments/1432.feature b/newsfragments/1432.feature new file mode 100644 index 000000000..2bb9a8b01 --- /dev/null +++ b/newsfragments/1432.feature @@ -0,0 +1 @@ +Magic-Folders are now supported on macOS. diff --git a/setup.py b/setup.py index 3d4ccc4c0..361e2c5dc 100644 --- a/setup.py +++ b/setup.py @@ -255,6 +255,7 @@ setup(name="tahoe-lafs", # also set in __init__.py install_requires=install_requires, extras_require={ ':sys_platform=="win32"': ["pypiwin32"], + ':sys_platform!="win32" and sys_platform!="linux2"': ["watchdog"], # For magic-folder on "darwin" (macOS) and the BSDs "test": [ # Pin a specific pyflakes so we don't have different folks # disagreeing on what is or is not a lint issue. We can bump diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index e3d2d9ebf..a335c40b5 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -7,6 +7,7 @@ from datetime import datetime import time import ConfigParser +from twisted.python.log import msg as twmsg from twisted.python.filepath import FilePath from twisted.python.monkey import MonkeyPatcher from twisted.internet import defer, reactor, task @@ -19,6 +20,8 @@ from zope.interface import Interface, Attribute, implementer from eliot import ( Field, + Message, + start_action, ActionType, MessageType, write_failure, @@ -35,9 +38,6 @@ from allmydata.util import ( yamlutil, eliotutil, ) -from allmydata.util.fake_inotify import ( - humanReadableMask, -) from allmydata.interfaces import IDirectoryNode from allmydata.util import log from allmydata.util.fileutil import ( @@ -75,9 +75,11 @@ def _get_inotify_module(): from allmydata.windows import inotify elif runtime.platform.supportsINotify(): from twisted.internet import inotify + elif not sys.platform.startswith("linux"): + from allmydata.watchdog import inotify else: raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n" - "This currently requires Linux or Windows.") + "This currently requires Linux, Windows, or macOS.") return inotify except (ImportError, AttributeError) as e: log.msg(e) @@ -521,10 +523,11 @@ MAYBE_UPLOAD = MessageType( u"A decision is being made about whether to upload a file.", ) -PENDING = Field.for_types( +PENDING = Field( u"pending", - [list], + lambda s: list(s), u"The paths which are pending processing.", + eliotutil.validateInstanceOf(set), ) REMOVE_FROM_PENDING = ActionType( @@ -547,6 +550,19 @@ NOTIFIED_OBJECT_DISAPPEARED = MessageType( u"A path which generated a notification was not found on the filesystem. This is normal.", ) +PROPAGATE_DIRECTORY_DELETION = ActionType( + u"magic-folder:propagate-directory-deletion", + [], + [], + u"Children of a deleted directory are being queued for upload processing.", +) + +NO_DATABASE_ENTRY = MessageType( + u"magic-folder:no-database-entry", + [], + u"There is no local database entry for a particular relative path in the magic folder.", +) + NOT_UPLOADING = MessageType( u"magic-folder:not-uploading", [], @@ -688,13 +704,6 @@ NOTIFIED = ActionType( u"Magic-Folder received a notification of a local filesystem change for a certain path.", ) -_EVENTS = Field( - u"events", - humanReadableMask, - u"Details about a filesystem event generating a notification event.", - eliotutil.validateInstanceOf((int, long)), -) - _NON_DIR_CREATED = Field.for_types( u"non_dir_created", [bool], @@ -704,7 +713,7 @@ _NON_DIR_CREATED = Field.for_types( REACT_TO_INOTIFY = ActionType( u"magic-folder:react-to-inotify", - [_EVENTS], + [eliotutil.INOTIFY_EVENTS], [_IGNORED, _NON_DIR_CREATED, _ALREADY_PENDING], u"Magic-Folder is processing a notification from inotify(7) (or a clone) about a filesystem event.", ) @@ -1118,6 +1127,13 @@ PROCESS_ITEM = ActionType( u"A path which was found wanting of an update is receiving an update.", ) +DOWNLOAD_BEST_VERSION = ActionType( + u"magic-folder:download-best-version", + [], + [], + u"The content of a file in the Magic Folder is being downloaded.", +) + class Uploader(QueueMixin): def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock): @@ -1152,14 +1168,21 @@ class Uploader(QueueMixin): | self._inotify.IN_ONLYDIR | IN_EXCL_UNLINK ) - self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify], - recursive=True) + + def _add_watch(self, filepath): + self._notifier.watch( + filepath, + mask=self.mask, + callbacks=[self._notify], + recursive=True, + ) def start_monitoring(self): action = START_MONITORING(**self._log_fields) with action.context(): d = DeferredContext(defer.succeed(None)) + d.addCallback(lambda ign: self._add_watch(self._local_filepath)) d.addCallback(lambda ign: self._notifier.startReading()) d.addCallback(lambda ign: self._count('dirs_monitored')) d.addBoth(self._call_hook, 'started') @@ -1258,7 +1281,7 @@ class Uploader(QueueMixin): # All can do is id() or repr() it and neither of those actually # produces very illuminating results. We drop opaque on the # floor, anyway. - events=events_mask, + inotify_events=events_mask, ) success_fields = dict(non_dir_created=False, already_pending=False, ignored=False) @@ -1296,41 +1319,70 @@ class Uploader(QueueMixin): """ # Uploader with PROCESS_ITEM(item=item).context(): - d = DeferredContext(defer.succeed(False)) - relpath_u = item.relpath_u - item.set_status('started', self._clock.seconds()) + precondition(isinstance(relpath_u, unicode), relpath_u) + precondition(not relpath_u.endswith(u'/'), relpath_u) + encoded_path_u = magicpath.path2magic(relpath_u) + d = DeferredContext(defer.succeed(False)) if relpath_u is None: item.set_status('invalid_path', self._clock.seconds()) return d.addActionFinish() + item.set_status('started', self._clock.seconds()) - precondition(isinstance(relpath_u, unicode), relpath_u) - precondition(not relpath_u.endswith(u'/'), relpath_u) + try: + # Take this item out of the pending set before we do any + # I/O-based processing related to it. If a further change + # takes place after we remove it from this set, we want it to + # end up in the set again. If we haven't gotten around to + # doing the I/O-based processing yet then the worst that will + # happen is we'll do a little redundant processing. + # + # If we did it the other way around, the sequence of events + # might be something like: we do some I/O, someone else does + # some I/O, a notification gets discarded because the path is + # still in the pending set, _then_ we remove it from the + # pending set. In such a circumstance, we've missed some I/O + # that we should have responded to. + with REMOVE_FROM_PENDING(relpath=relpath_u, pending=self._pending): + self._pending.remove(relpath_u) + except KeyError: + pass + + fp = self._get_filepath(relpath_u) + pathinfo = get_pathinfo(unicode_from_filepath(fp)) + + db_entry_is_dir = False + db_entry = self._db.get_db_entry(relpath_u) + if db_entry is None: + # Maybe it was a directory! + db_entry = self._db.get_db_entry(relpath_u + u"/") + if db_entry is None: + NO_DATABASE_ENTRY.log() + else: + db_entry_is_dir = True def _maybe_upload(ign, now=None): MAYBE_UPLOAD.log(relpath=relpath_u) if now is None: now = time.time() - fp = self._get_filepath(relpath_u) - pathinfo = get_pathinfo(unicode_from_filepath(fp)) - - try: - with REMOVE_FROM_PENDING(relpath=relpath_u, pending=list(self._pending)): - self._pending.remove(relpath_u) - except KeyError: - pass - encoded_path_u = magicpath.path2magic(relpath_u) if not pathinfo.exists: # FIXME merge this with the 'isfile' case. NOTIFIED_OBJECT_DISAPPEARED.log(path=fp) self._count('objects_disappeared') - db_entry = self._db.get_db_entry(relpath_u) if db_entry is None: + # If it exists neither on the filesystem nor in the + # database, it's neither a creation nor a deletion and + # there's nothing more to do. return False + if pathinfo.isdir or db_entry_is_dir: + with PROPAGATE_DIRECTORY_DELETION(): + for localpath in self._db.get_direct_children(relpath_u): + self._add_pending(localpath.relpath_u) + last_downloaded_timestamp = now # is this correct? if is_new_file(pathinfo, db_entry): @@ -1374,9 +1426,17 @@ class Uploader(QueueMixin): if db_entry.last_uploaded_uri is not None: metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri + if db_entry_is_dir: + real_encoded_path_u = encoded_path_u + magicpath.path2magic(u"/") + real_relpath_u = relpath_u + u"/" + else: + real_encoded_path_u = encoded_path_u + real_relpath_u = relpath_u + empty_uploadable = Data("", self._client.convergence) d2 = DeferredContext(self._upload_dirnode.add_file( - encoded_path_u, empty_uploadable, + real_encoded_path_u, + empty_uploadable, metadata=metadata, overwrite=True, progress=item.progress, @@ -1389,7 +1449,7 @@ class Uploader(QueueMixin): # immediately re-download it when we start up next last_downloaded_uri = metadata.get('last_downloaded_uri', filecap) self._db.did_upload_version( - relpath_u, + real_relpath_u, new_version, filecap, last_downloaded_uri, @@ -1405,33 +1465,38 @@ class Uploader(QueueMixin): return False elif pathinfo.isdir: if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False): - self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True) + self._add_watch(fp) - db_entry = self._db.get_db_entry(relpath_u) DIRECTORY_PATHENTRY.log(pathentry=db_entry) if not is_new_file(pathinfo, db_entry): NOT_NEW_DIRECTORY.log() return False uploadable = Data("", self._client.convergence) - encoded_path_u += magicpath.path2magic(u"/") with PROCESS_DIRECTORY().context() as action: upload_d = DeferredContext(self._upload_dirnode.add_file( - encoded_path_u, uploadable, + encoded_path_u + magicpath.path2magic(u"/"), + uploadable, metadata={"version": 0}, overwrite=True, progress=item.progress, )) - def _dir_succeeded(ign): + def _dir_succeeded(dirnode): action.add_success_fields(created_directory=relpath_u) self._count('directories_created') + self._db.did_upload_version( + relpath_u + u"/", + version=0, + last_uploaded_uri=dirnode.get_uri(), + last_downloaded_uri=None, + last_downloaded_timestamp=now, + pathinfo=pathinfo, + ) upload_d.addCallback(_dir_succeeded) upload_d.addCallback(lambda ign: self._scan(relpath_u)) upload_d.addCallback(lambda ign: True) return upload_d.addActionFinish() elif pathinfo.isfile: - db_entry = self._db.get_db_entry(relpath_u) - last_downloaded_timestamp = now if db_entry is None: @@ -1654,10 +1719,11 @@ class Downloader(QueueMixin, WriteFileMixin): while True: try: yield self._scan_remote_collective(scan_self=True) - # The integration tests watch for this log message to - # decide when it is safe to proceed. Clearly, we need - # better programmatic interrogation of magic-folder state. - print("Completed initial Magic Folder scan successfully ({})".format(self)) + # The integration tests watch for this log message (in the + # Twisted log) to decide when it is safe to proceed. + # Clearly, we need better programmatic interrogation of + # magic-folder state. + twmsg("Completed initial Magic Folder scan successfully ({})".format(self)) self._begin_processing() return except Exception: @@ -1707,12 +1773,21 @@ class Downloader(QueueMixin, WriteFileMixin): file node and metadata for the latest version of the file located in the magic-folder collective directory. """ - collective_dirmap_d = self._collective_dirnode.list() + action = start_action( + action_type=u"magic-folder:downloader:get-latest-file", + name=filename, + ) + with action.context(): + collective_dirmap_d = DeferredContext(self._collective_dirnode.list()) def scan_collective(result): + Message.log( + message_type=u"magic-folder:downloader:get-latest-file:collective-scan", + dmds=result.keys(), + ) list_of_deferreds = [] for dir_name in result.keys(): # XXX make sure it's a directory - d = defer.succeed(None) + d = DeferredContext(defer.succeed(None)) d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename)) list_of_deferreds.append(d) deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True) @@ -1724,12 +1799,20 @@ class Downloader(QueueMixin, WriteFileMixin): node = None for success, result in deferredList: if success: + Message.log( + message_type=u"magic-folder:downloader:get-latest-file:version", + version=result[1]['version'], + ) if node is None or result[1]['version'] > max_version: node, metadata = result max_version = result[1]['version'] + else: + Message.log( + message_type="magic-folder:downloader:get-latest-file:failed", + ) return node, metadata collective_dirmap_d.addCallback(highest_version) - return collective_dirmap_d + return collective_dirmap_d.addActionFinish() def _scan_remote_dmd(self, nickname, dirnode, scan_batch): with SCAN_REMOTE_DMD(nickname=nickname).context(): @@ -1959,14 +2042,17 @@ class Downloader(QueueMixin, WriteFileMixin): if item.metadata.get('deleted', False): d.addCallback(lambda ign: self._rename_deleted_file(abspath_u)) else: - d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress)) - d.addCallback( - lambda contents: self._write_downloaded_file( + @eliotutil.log_call_deferred(DOWNLOAD_BEST_VERSION.action_type) + def download_best_version(ignored): + d = DeferredContext(item.file_node.download_best_version(progress=item.progress)) + d.addCallback(lambda contents: self._write_downloaded_file( self._local_path_u, abspath_u, contents, is_conflict=is_conflict, mtime=item.metadata.get('user_mtime', item.metadata.get('tahoe', {}).get('linkmotime')), - ) - ) + )) + return d.result + + d.addCallback(download_best_version) d.addCallback(do_update_db) d.addErrback(failed) diff --git a/src/allmydata/magicfolderdb.py b/src/allmydata/magicfolderdb.py index b74edc9da..171dbe4e6 100644 --- a/src/allmydata/magicfolderdb.py +++ b/src/allmydata/magicfolderdb.py @@ -43,11 +43,11 @@ _INSERT_OR_UPDATE = Field.for_types( validateSetMembership({u"insert", u"update"}), ) -DID_UPLOAD_VERSION = ActionType( - u"magic-folder-db:did-upload-version", +UPDATE_ENTRY = ActionType( + u"magic-folder-db:update-entry", [RELPATH, VERSION, LAST_UPLOADED_URI, LAST_DOWNLOADED_URI, LAST_DOWNLOADED_TIMESTAMP, PATHINFO], [_INSERT_OR_UPDATE], - u"An file upload is being recorded in the database.", + u"Record some metadata about a relative path in the magic-folder.", ) @@ -88,6 +88,15 @@ def get_magicfolderdb(dbfile, stderr=sys.stderr, print >>stderr, e return None +class LocalPath(object): + @classmethod + def fromrow(self, row): + p = LocalPath() + p.relpath_u = row[0] + p.entry = PathEntry(*row[1:]) + return p + + class MagicFolderDB(object): VERSION = 1 @@ -121,6 +130,42 @@ class MagicFolderDB(object): last_downloaded_uri=last_downloaded_uri, last_downloaded_timestamp=last_downloaded_timestamp) + def get_direct_children(self, relpath_u): + """ + Given the relative path to a directory, return ``LocalPath`` instances + representing all direct children of that directory. + """ + # It would be great to not be interpolating data into query + # statements. However, query parameters are not supported in the + # position where we need them. + sqlitesafe_relpath_u = relpath_u.replace(u"'", u"''") + statement = ( + """ + SELECT + path, size, mtime_ns, ctime_ns, version, last_uploaded_uri, + last_downloaded_uri, last_downloaded_timestamp + FROM + local_files + WHERE + -- The "_" used here ensures there is at least one character + -- after the /. This prevents matching the path itself. + path LIKE '{path}/_%' AND + + -- The "_" used here serves a similar purpose. This allows + -- matching directory children but avoids matching their + -- children. + path NOT LIKE '{path}/_%/_%' + """ + ).format(path=sqlitesafe_relpath_u) + + self.cursor.execute(statement) + rows = self.cursor.fetchall() + return list( + LocalPath.fromrow(row) + for row + in rows + ) + def get_all_relpaths(self): """ Retrieve a set of all relpaths of files that have had an entry in magic folder db @@ -131,7 +176,7 @@ class MagicFolderDB(object): return set([r[0] for r in rows]) def did_upload_version(self, relpath_u, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp, pathinfo): - action = DID_UPLOAD_VERSION( + action = UPDATE_ENTRY( relpath=relpath_u, version=version, last_uploaded_uri=last_uploaded_uri, diff --git a/src/allmydata/test/cli/test_magic_folder.py b/src/allmydata/test/cli/test_magic_folder.py index 6556b063f..7704f8a33 100644 --- a/src/allmydata/test/cli/test_magic_folder.py +++ b/src/allmydata/test/cli/test_magic_folder.py @@ -6,6 +6,10 @@ import re import time from datetime import datetime +from eliot import ( + log_call, + start_action, +) from eliot.twisted import ( DeferredContext, ) @@ -38,7 +42,13 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin): self.bob_nickname = self.unicode_or_fallback(u"Bob\u00F8", u"Bob", io_as_well=True) def do_create_magic_folder(self, client_num): - d = self.do_cli("magic-folder", "--debug", "create", "magic:", client_num=client_num) + with start_action(action_type=u"create-magic-folder", client_num=client_num).context(): + d = DeferredContext( + self.do_cli( + "magic-folder", "--debug", "create", "magic:", + client_num=client_num, + ) + ) def _done((rc,stdout,stderr)): self.failUnlessEqual(rc, 0, stdout + stderr) self.assertIn("Alias 'magic' created", stdout) @@ -49,16 +59,30 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin): self.assertIn("magic", aliases) self.failUnless(aliases["magic"].startswith("URI:DIR2:")) d.addCallback(_done) - return d + return d.addActionFinish() def do_invite(self, client_num, nickname): nickname_arg = unicode_to_argv(nickname) - d = self.do_cli("magic-folder", "invite", "magic:", nickname_arg, client_num=client_num) + action = start_action( + action_type=u"invite-to-magic-folder", + client_num=client_num, + nickname=nickname, + ) + with action.context(): + d = DeferredContext( + self.do_cli( + "magic-folder", + "invite", + "magic:", + nickname_arg, + client_num=client_num, + ) + ) def _done((rc, stdout, stderr)): self.failUnlessEqual(rc, 0) return (rc, stdout, stderr) d.addCallback(_done) - return d + return d.addActionFinish() def do_list(self, client_num, json=False): args = ("magic-folder", "list",) @@ -81,18 +105,32 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin): return d def do_join(self, client_num, local_dir, invite_code): - precondition(isinstance(local_dir, unicode), local_dir=local_dir) - precondition(isinstance(invite_code, str), invite_code=invite_code) - - local_dir_arg = unicode_to_argv(local_dir) - d = self.do_cli("magic-folder", "join", invite_code, local_dir_arg, client_num=client_num) + action = start_action( + action_type=u"join-magic-folder", + client_num=client_num, + local_dir=local_dir, + invite_code=invite_code, + ) + with action.context(): + precondition(isinstance(local_dir, unicode), local_dir=local_dir) + precondition(isinstance(invite_code, str), invite_code=invite_code) + local_dir_arg = unicode_to_argv(local_dir) + d = DeferredContext( + self.do_cli( + "magic-folder", + "join", + invite_code, + local_dir_arg, + client_num=client_num, + ) + ) def _done((rc, stdout, stderr)): self.failUnlessEqual(rc, 0) self.failUnlessEqual(stdout, "") self.failUnlessEqual(stderr, "") return (rc, stdout, stderr) d.addCallback(_done) - return d + return d.addActionFinish() def do_leave(self, client_num): d = self.do_cli("magic-folder", "leave", client_num=client_num) @@ -106,8 +144,16 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin): """Tests that our collective directory has the readonly cap of our upload directory. """ - collective_readonly_cap = self.get_caps_from_files(client_num)[0] - d = self.do_cli("ls", "--json", collective_readonly_cap, client_num=client_num) + action = start_action(action_type=u"check-joined-config") + with action.context(): + collective_readonly_cap = self.get_caps_from_files(client_num)[0] + d = DeferredContext( + self.do_cli( + "ls", "--json", + collective_readonly_cap, + client_num=client_num, + ) + ) def _done((rc, stdout, stderr)): self.failUnlessEqual(rc, 0) return (rc, stdout, stderr) @@ -118,7 +164,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin): self.failUnless(s is not None) return None d.addCallback(test_joined_magic_folder) - return d + return d.addActionFinish() def get_caps_from_files(self, client_num): from allmydata.frontends.magic_folder import load_magic_folders @@ -126,6 +172,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin): mf = folders["default"] return mf['collective_dircap'], mf['upload_dircap'] + @log_call def check_config(self, client_num, local_dir): client_config = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "tahoe.cfg")) mf_yaml = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private", "magic_folders.yaml")) diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index 5d44dd0e5..06a7161ff 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -2,6 +2,10 @@ __all__ = [ "SyncTestCase", "AsyncTestCase", "AsyncBrokenTestCase", + + "flush_logged_errors", + "skip", + "skipIf", ] import os, random, struct @@ -16,11 +20,14 @@ from zope.interface import implementer from testtools import ( TestCase, + skip, + skipIf, ) from testtools.twistedsupport import ( SynchronousDeferredRunTest, AsynchronousDeferredRunTest, AsynchronousDeferredRunTestForBrokenTwisted, + flush_logged_errors, ) from twisted.internet import defer diff --git a/src/allmydata/test/eliotutil.py b/src/allmydata/test/eliotutil.py index 90ff9179c..09d34f938 100644 --- a/src/allmydata/test/eliotutil.py +++ b/src/allmydata/test/eliotutil.py @@ -57,22 +57,23 @@ def eliot_logged_test(f): @wraps(f) def run_and_republish(self, *a, **kw): + # Unfortunately the only way to get at the global/default logger... + # This import is delayed here so that we get the *current* default + # logger at the time the decorated function is run. + from eliot._output import _DEFAULT_LOGGER as default_logger + def republish(): # This is called as a cleanup function after capture_logging has # restored the global/default logger to its original state. We # can now emit messages that go to whatever global destinations # are installed. - # Unfortunately the only way to get at the global/default - # logger... - from eliot._output import _DEFAULT_LOGGER as logger - # storage.logger.serialize() seems like it would make more sense # than storage.logger.messages here. However, serialize() # explodes, seemingly as a result of double-serializing the logged # messages. I don't understand this. for msg in storage.logger.messages: - logger.write(msg) + default_logger.write(msg) # And now that we've re-published all of the test's messages, we # can finish the test's action. diff --git a/src/allmydata/test/test_inotify.py b/src/allmydata/test/test_inotify.py new file mode 100644 index 000000000..9f618a34a --- /dev/null +++ b/src/allmydata/test/test_inotify.py @@ -0,0 +1,171 @@ +# Copyright (c) Twisted Matrix Laboratories. +# See LICENSE for details. + +""" +Tests for the inotify-alike implementation L{allmydata.watchdog}. +""" + +# Note: See https://twistedmatrix.com/trac/ticket/8915 for a proposal +# to avoid all of this duplicated code from Twisted. + +from twisted.internet import defer, reactor +from twisted.python import filepath, runtime + +from allmydata.frontends.magic_folder import get_inotify_module +from .common import ( + AsyncTestCase, + skipIf, +) +inotify = get_inotify_module() + + +@skipIf(runtime.platformType == "win32", "inotify does not yet work on windows") +class INotifyTests(AsyncTestCase): + """ + Define all the tests for the basic functionality exposed by + L{inotify.INotify}. + """ + def setUp(self): + self.dirname = filepath.FilePath(self.mktemp()) + self.dirname.createDirectory() + self.inotify = inotify.INotify() + self.inotify.startReading() + self.addCleanup(self.inotify.stopReading) + return super(INotifyTests, self).setUp() + + + def _notificationTest(self, mask, operation, expectedPath=None): + """ + Test notification from some filesystem operation. + + @param mask: The event mask to use when setting up the watch. + + @param operation: A function which will be called with the + name of a file in the watched directory and which should + trigger the event. + + @param expectedPath: Optionally, the name of the path which is + expected to come back in the notification event; this will + also be passed to C{operation} (primarily useful when the + operation is being done to the directory itself, not a + file in it). + + @return: A L{Deferred} which fires successfully when the + expected event has been received or fails otherwise. + """ + if expectedPath is None: + expectedPath = self.dirname.child("foo.bar") + notified = defer.Deferred() + def cbNotified(result): + (watch, filename, events) = result + self.assertEqual(filename.asBytesMode(), expectedPath.asBytesMode()) + self.assertTrue(events & mask) + self.inotify.ignore(self.dirname) + notified.addCallback(cbNotified) + + def notify_event(*args): + notified.callback(args) + self.inotify.watch( + self.dirname, mask=mask, + callbacks=[notify_event]) + operation(expectedPath) + return notified + + + def test_modify(self): + """ + Writing to a file in a monitored directory sends an + C{inotify.IN_MODIFY} event to the callback. + """ + def operation(path): + with path.open("w") as fObj: + fObj.write(b'foo') + + return self._notificationTest(inotify.IN_MODIFY, operation) + + + def test_attrib(self): + """ + Changing the metadata of a file in a monitored directory + sends an C{inotify.IN_ATTRIB} event to the callback. + """ + def operation(path): + # Create the file. + path.touch() + # Modify the file's attributes. + path.touch() + + return self._notificationTest(inotify.IN_ATTRIB, operation) + + + def test_closeWrite(self): + """ + Closing a file which was open for writing in a monitored + directory sends an C{inotify.IN_CLOSE_WRITE} event to the + callback. + """ + def operation(path): + path.open("w").close() + + return self._notificationTest(inotify.IN_CLOSE_WRITE, operation) + + + def test_delete(self): + """ + Deleting a file in a monitored directory sends an + C{inotify.IN_DELETE} event to the callback. + """ + expectedPath = self.dirname.child("foo.bar") + expectedPath.touch() + notified = defer.Deferred() + def cbNotified(result): + (watch, filename, events) = result + self.assertEqual(filename.asBytesMode(), expectedPath.asBytesMode()) + self.assertTrue(events & inotify.IN_DELETE) + notified.addCallback(cbNotified) + self.inotify.watch( + self.dirname, mask=inotify.IN_DELETE, + callbacks=[lambda *args: notified.callback(args)]) + expectedPath.remove() + return notified + + + def test_humanReadableMask(self): + """ + L{inotify.humanReadableMask} translates all the possible event masks to a + human readable string. + """ + for mask, value in inotify._FLAG_TO_HUMAN: + self.assertEqual(inotify.humanReadableMask(mask)[0], value) + + checkMask = ( + inotify.IN_CLOSE_WRITE | inotify.IN_ACCESS | inotify.IN_OPEN) + self.assertEqual( + set(inotify.humanReadableMask(checkMask)), + set(['close_write', 'access', 'open'])) + + + def test_noAutoAddSubdirectory(self): + """ + L{inotify.INotify.watch} with autoAdd==False will stop inotify + from watching subdirectories created under the watched one. + """ + def _callback(wp, fp, mask): + # We are notified before we actually process new + # directories, so we need to defer this check. + def _(): + try: + self.assertFalse(self.inotify._isWatched(subdir)) + d.callback(None) + except Exception: + d.errback() + reactor.callLater(0, _) + + checkMask = inotify.IN_ISDIR | inotify.IN_CREATE + self.inotify.watch( + self.dirname, mask=checkMask, autoAdd=False, + callbacks=[_callback]) + subdir = self.dirname.child('test') + d = defer.Deferred() + subdir.createDirectory() + return d diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index 06962d2c5..fa0118a96 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -3,13 +3,12 @@ import os, sys, time import stat, shutil, json import mock from os.path import join, exists, isdir +from errno import ENOENT from twisted.internet import defer, task, reactor +from twisted.python.runtime import platform from twisted.python.filepath import FilePath -from testtools import ( - skipIf, -) from testtools.matchers import ( Not, Is, @@ -20,6 +19,7 @@ from testtools.matchers import ( from eliot import ( Message, start_action, + log_call, ) from eliot.twisted import DeferredContext @@ -38,6 +38,7 @@ from .common import ( ShouldFailMixin, SyncTestCase, AsyncTestCase, + skipIf, ) from .cli.test_magic_folder import MagicFolderCLITestMixin @@ -73,6 +74,37 @@ else: support_missing = False support_message = None +if platform.isMacOSX(): + def modified_mtime_barrier(path): + """ + macOS filesystem (HFS+) has one second resolution on filesystem + modification time metadata. Make sure that code running after this + function which modifies the file will produce a changed mtime on that + file. + """ + try: + mtime = path.getModificationTime() + except OSError as e: + if e.errno == ENOENT: + # If the file does not exist yet, there is no current mtime + # value that might match a future mtime value. We have + # nothing to do. + return + # Propagate any other errors as we don't know what's going on. + raise + if int(time.time()) == int(mtime): + # The current time matches the file's modification time, to the + # resolution of the filesystem metadata. Therefore, change the + # current time. + time.sleep(1) +else: + def modified_mtime_barrier(path): + """ + non-macOS platforms have sufficiently high-resolution file modification + time metadata that nothing in particular is required to ensure a + modified mtime as a result of a future write. + """ + class NewConfigUtilTests(SyncTestCase): @@ -502,6 +534,53 @@ class MagicFolderDbTests(SyncTestCase): self.assertTrue(entry is not None) self.assertEqual(entry.last_downloaded_uri, content_uri) + def test_get_direct_children(self): + """ + ``get_direct_children`` returns a list of ``PathEntry`` representing each + local file in the database which is a direct child of the given path. + """ + def add_file(relpath_u): + self.db.did_upload_version( + relpath_u=relpath_u, + version=0, + last_uploaded_uri=None, + last_downloaded_uri=None, + last_downloaded_timestamp=1234, + pathinfo=get_pathinfo(self.temp), + ) + paths = [ + u"some_random_file", + u"the_target_directory_is_elsewhere", + u"the_target_directory_is_not_this/", + u"the_target_directory_is_not_this/and_not_in_here", + u"the_target_directory/", + u"the_target_directory/foo", + u"the_target_directory/bar", + u"the_target_directory/baz", + u"the_target_directory/quux/", + u"the_target_directory/quux/exclude_grandchildren", + u"the_target_directory/quux/and_great_grandchildren/", + u"the_target_directory/quux/and_great_grandchildren/foo", + u"the_target_directory_is_over/stuff", + u"please_ignore_this_for_sure", + ] + for relpath_u in paths: + add_file(relpath_u) + + expected_paths = [ + u"the_target_directory/foo", + u"the_target_directory/bar", + u"the_target_directory/baz", + u"the_target_directory/quux/", + ] + + actual_paths = list( + localpath.relpath_u + for localpath + in self.db.get_direct_children(u"the_target_directory") + ) + self.assertEqual(expected_paths, actual_paths) + def iterate_downloader(magic): return magic.downloader._processing_iteration() @@ -533,10 +612,8 @@ class FileOperationsHelper(object): propagate. For the mock tests this is easy, since we're sending them sychronously. For the Real tests we have to wait for the actual inotify thing. - - We could write this as a mixin instead; might fit existing style better? """ - _timeout = 5.0 + _timeout = 30.0 def __init__(self, uploader, inject_events=False): self._uploader = uploader @@ -563,6 +640,7 @@ class FileOperationsHelper(object): d = notify_when_pending(self._uploader, path_u) + modified_mtime_barrier(FilePath(fname)) with open(fname, "wb") as f: f.write(contents) @@ -581,7 +659,11 @@ class FileOperationsHelper(object): def delete(self, path_u): fname = path_u d = self._uploader.set_hook('inotify') - os.unlink(fname) + if os.path.isdir(fname): + remove = os.rmdir + else: + remove = os.unlink + remove(fname) self._maybe_notify(fname, self._inotify.IN_DELETE) return d.addTimeout(self._timeout, reactor) @@ -647,8 +729,17 @@ class CheckerMixin(object): def _check_version_in_dmd(self, magicfolder, relpath_u, expected_version): encoded_name_u = magicpath.path2magic(relpath_u) result = yield magicfolder.downloader._get_collective_latest_file(encoded_name_u) - self.assertTrue(result is not None) + self.assertIsNot( + result, + None, + "collective_latest_file({}) is None".format(encoded_name_u), + ) node, metadata = result + self.assertIsNot( + metadata, + None, + "collective_latest_file({}) metadata is None".format(encoded_name_u), + ) self.failUnlessEqual(metadata['version'], expected_version) def _check_version_in_local_db(self, magicfolder, relpath_u, expected_version): @@ -662,8 +753,17 @@ class CheckerMixin(object): self.assertTrue(not os.path.exists(path)) def _check_uploader_count(self, name, expected, magic=None): - self.failUnlessReallyEqual(self._get_count('uploader.'+name, client=(magic or self.alice_magicfolder)._client), - expected) + if magic is None: + magic = self.alice_magicfolder + self.failUnlessReallyEqual( + self._get_count( + 'uploader.'+name, + client=magic._client, + ), + expected, + "Pending: {}\n" + "Deque: {}\n".format(magic.uploader._pending, magic.uploader._deque), + ) def _check_downloader_count(self, name, expected, magic=None): self.failUnlessReallyEqual(self._get_count('downloader.'+name, client=(magic or self.bob_magicfolder)._client), @@ -683,7 +783,8 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea temp = self.mktemp() self.basedir = abspath_expanduser_unicode(temp.decode(get_filesystem_encoding())) # set_up_grid depends on self.basedir existing - self.set_up_grid(num_clients=2, oneshare=True) + with start_action(action_type=u"set_up_grid"): + self.set_up_grid(num_clients=2, oneshare=True) self.alice_clock = task.Clock() self.bob_clock = task.Clock() @@ -761,75 +862,98 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea bob_fname = os.path.join(self.bob_magic_dir, 'blam') alice_proc = self.alice_magicfolder.uploader.set_hook('processed') - yield self.alice_fileops.write(alice_fname, 'contents0\n') - yield iterate(self.alice_magicfolder) # for windows - # alice uploads - yield iterate_uploader(self.alice_magicfolder) - yield alice_proc + with start_action(action_type=u"alice:create"): + yield self.alice_fileops.write(alice_fname, 'contents0\n') + yield iterate(self.alice_magicfolder) # for windows - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) + with start_action(action_type=u"alice:upload"): + yield iterate_uploader(self.alice_magicfolder) + yield alice_proc - # bob downloads - yield iterate_downloader(self.bob_magicfolder) + with start_action(action_type=u"alice:check-upload"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) + + with start_action(action_type=u"bob:download"): + yield iterate_downloader(self.bob_magicfolder) + + with start_action(action_type=u"alice:recheck-upload"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) + + with start_action(action_type=u"bob:check-download"): + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 0) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 0) + yield self.failUnlessReallyEqual( + self._get_count('downloader.objects_failed', client=self.bob_magicfolder._client), + 0 + ) + yield self.failUnlessReallyEqual( + self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), + 1 + ) + + yield iterate(self.bob_magicfolder) # for windows - # check the state - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 0) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 0) - yield self.failUnlessReallyEqual( - self._get_count('downloader.objects_failed', client=self.bob_magicfolder._client), - 0 - ) - yield self.failUnlessReallyEqual( - self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), - 1 - ) - yield iterate(self.bob_magicfolder) # for windows - # now bob deletes it (bob should upload, alice download) bob_proc = self.bob_magicfolder.uploader.set_hook('processed') alice_proc = self.alice_magicfolder.downloader.set_hook('processed') - yield self.bob_fileops.delete(bob_fname) - yield iterate(self.bob_magicfolder) # for windows - yield iterate_uploader(self.bob_magicfolder) - yield bob_proc - yield iterate_downloader(self.alice_magicfolder) - yield alice_proc + with start_action(action_type=u"bob:delete"): + yield self.bob_fileops.delete(bob_fname) + yield iterate(self.bob_magicfolder) # for windows + + with start_action(action_type=u"bob:upload"): + yield iterate_uploader(self.bob_magicfolder) + yield bob_proc + + with start_action(action_type=u"alice:download"): + yield iterate_downloader(self.alice_magicfolder) + yield alice_proc # check versions - node, metadata = yield self.alice_magicfolder.downloader._get_collective_latest_file(u'blam') - self.assertTrue(metadata['deleted']) - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 1) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 1) - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1) + with start_action(action_type=u"bob:check-upload"): + node, metadata = yield self.alice_magicfolder.downloader._get_collective_latest_file(u'blam') + self.assertTrue(metadata['deleted']) + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 1) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 1) - # not *entirely* sure why we need to iterate Alice for the - # real test here. But, we do. - yield iterate(self.alice_magicfolder) + with start_action(action_type=u"alice:check-download"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1) + + with start_action(action_type=u"alice:mysterious-iterate"): + # not *entirely* sure why we need to iterate Alice for the + # real test here. But, we do. + yield iterate(self.alice_magicfolder) # now alice restores it (alice should upload, bob download) alice_proc = self.alice_magicfolder.uploader.set_hook('processed') bob_proc = self.bob_magicfolder.downloader.set_hook('processed') - yield self.alice_fileops.write(alice_fname, 'new contents\n') - yield iterate(self.alice_magicfolder) # for windows - yield iterate_uploader(self.alice_magicfolder) - yield alice_proc - yield iterate_downloader(self.bob_magicfolder) - yield bob_proc + with start_action(action_type=u"alice:rewrite"): + yield self.alice_fileops.write(alice_fname, 'new contents\n') + yield iterate(self.alice_magicfolder) # for windows + + with start_action(action_type=u"alice:reupload"): + yield iterate_uploader(self.alice_magicfolder) + yield alice_proc + + with start_action(action_type=u"bob:redownload"): + yield iterate_downloader(self.bob_magicfolder) + yield bob_proc # check versions - node, metadata = yield self.alice_magicfolder.downloader._get_collective_latest_file(u'blam') - self.assertTrue('deleted' not in metadata or not metadata['deleted']) - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 2) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 2) - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 2) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 2) + with start_action(action_type=u"bob:recheck-download"): + node, metadata = yield self.alice_magicfolder.downloader._get_collective_latest_file(u'blam') + self.assertTrue('deleted' not in metadata or not metadata['deleted']) + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 2) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 2) + + with start_action(action_type=u"alice:final-check-upload"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 2) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 2) @inline_callbacks def test_alice_sees_bobs_delete_with_error(self): @@ -842,52 +966,68 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea alice_proc = self.alice_magicfolder.uploader.set_hook('processed') bob_proc = self.bob_magicfolder.downloader.set_hook('processed') - yield self.alice_fileops.write(alice_fname, 'contents0\n') - yield iterate(self.alice_magicfolder) # for windows + with start_action(action_type=u"alice:create"): + yield self.alice_fileops.write(alice_fname, 'contents0\n') + yield iterate(self.alice_magicfolder) # for windows - yield iterate_uploader(self.alice_magicfolder) - yield alice_proc # alice uploads + with start_action(action_type=u"alice:upload"): + yield iterate_uploader(self.alice_magicfolder) + yield alice_proc # alice uploads - yield iterate_downloader(self.bob_magicfolder) - yield bob_proc # bob downloads + with start_action(action_type=u"bob:download"): + yield iterate_downloader(self.bob_magicfolder) + yield bob_proc # bob downloads - yield iterate(self.alice_magicfolder) # for windows - yield iterate(self.bob_magicfolder) # for windows + with start_action(action_type=u"mysterious:iterate"): + yield iterate(self.alice_magicfolder) # for windows + yield iterate(self.bob_magicfolder) # for windows # check the state (XXX I had to switch the versions to 0; is that really right? why?) - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 0) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 0) - self.failUnlessReallyEqual( - self._get_count('downloader.objects_failed', client=self.bob_magicfolder._client), - 0 - ) - self.failUnlessReallyEqual( - self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), - 1 - ) + with start_action(action_type=u"alice:check"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) + + with start_action(action_type=u"bob:check"): + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 0) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 0) + self.failUnlessReallyEqual( + self._get_count('downloader.objects_failed', client=self.bob_magicfolder._client), + 0 + ) + self.failUnlessReallyEqual( + self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), + 1 + ) - # now bob deletes it (bob should upload, alice download) bob_proc = self.bob_magicfolder.uploader.set_hook('processed') alice_proc = self.alice_magicfolder.downloader.set_hook('processed') - yield self.bob_fileops.delete(bob_fname) - # just after notifying bob, we also delete alice's, - # covering the 'except' flow in _rename_deleted_file() - yield self.alice_fileops.delete(alice_fname) - yield iterate_uploader(self.bob_magicfolder) - yield bob_proc - yield iterate_downloader(self.alice_magicfolder) - yield alice_proc + with start_action(action_type=u"bob:delete"): + yield self.bob_fileops.delete(bob_fname) + + with start_action(action_type=u"alice:delete"): + # just after notifying bob, we also delete alice's, + # covering the 'except' flow in _rename_deleted_file() + yield self.alice_fileops.delete(alice_fname) + + with start_action(action_type=u"bob:upload-delete"): + yield iterate_uploader(self.bob_magicfolder) + yield bob_proc + + with start_action(action_type=u"alice:download-delete"): + yield iterate_downloader(self.alice_magicfolder) + yield alice_proc # check versions - node, metadata = yield self.alice_magicfolder.downloader._get_collective_latest_file(u'blam') - self.assertTrue(metadata['deleted']) - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 1) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 1) - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1) + with start_action(action_type=u"bob:check"): + node, metadata = yield self.alice_magicfolder.downloader._get_collective_latest_file(u'blam') + self.assertTrue(metadata['deleted']) + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 1) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 1) + + with start_action(action_type=u"alice:check"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1) @inline_callbacks def test_alice_create_bob_update(self): @@ -1020,38 +1160,79 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea alice_fname = os.path.join(self.alice_magic_dir, 'localchange1') bob_fname = os.path.join(self.bob_magic_dir, 'localchange1') - # alice creates a file, bob downloads it alice_proc = self.alice_magicfolder.uploader.set_hook('processed') bob_proc = self.bob_magicfolder.downloader.set_hook('processed') - yield self.alice_fileops.write(alice_fname, 'contents0\n') - yield iterate(self.alice_magicfolder) # for windows + with start_action(action_type=u"alice:create"): + yield self.alice_fileops.write(alice_fname, 'contents0\n') + yield iterate(self.alice_magicfolder) # for windows - yield iterate_uploader(self.alice_magicfolder) - yield alice_proc # alice uploads + with start_action(action_type=u"alice:upload"): + yield iterate_uploader(self.alice_magicfolder) + yield alice_proc # alice uploads + self.assertEqual( + 1, + self._get_count( + 'uploader.files_uploaded', + client=self.alice_magicfolder._client, + ), + ) - yield iterate_downloader(self.bob_magicfolder) - yield bob_proc # bob downloads + with start_action(action_type=u"bob:download"): + yield iterate_downloader(self.bob_magicfolder) + yield bob_proc # bob downloads + self.assertEqual( + 1, + self._get_count( + 'downloader.objects_downloaded', + client=self.bob_magicfolder._client, + ), + ) - # alice creates a new change alice_proc = self.alice_magicfolder.uploader.set_hook('processed') bob_proc = self.bob_magicfolder.downloader.set_hook('processed') - yield self.alice_fileops.write(alice_fname, 'contents1\n') - yield iterate(self.alice_magicfolder) # for windows + with start_action(action_type=u"alice:rewrite"): + yield self.alice_fileops.write(alice_fname, 'contents1\n') + yield iterate(self.alice_magicfolder) # for windows - # before bob downloads, make a local change - with open(bob_fname, "w") as f: - f.write("bob's local change") + with start_action(action_type=u"bob:rewrite"): + # before bob downloads, make a local change + with open(bob_fname, "w") as f: + f.write("bob's local change") - yield iterate_uploader(self.alice_magicfolder) - yield alice_proc # alice uploads + with start_action(action_type=u"alice:reupload"): + yield iterate_uploader(self.alice_magicfolder) + yield alice_proc # alice uploads + self.assertEqual( + 2, + self._get_count( + 'uploader.files_uploaded', + client=self.alice_magicfolder._client, + ), + ) - yield iterate_downloader(self.bob_magicfolder) - yield bob_proc # bob downloads + with start_action(action_type=u"bob:redownload-and-conflict"): + yield iterate_downloader(self.bob_magicfolder) + yield bob_proc # bob downloads - # ...so now bob should produce a conflict - self.assertTrue(os.path.exists(bob_fname + '.conflict')) + self.assertEqual( + 2, + self._get_count( + 'downloader.objects_downloaded', + client=self.bob_magicfolder._client, + ), + ) + self.assertEqual( + 1, + self._get_count( + 'downloader.objects_conflicted', + client=self.bob_magicfolder._client, + ), + ) + + # ...so now bob should produce a conflict + self.assertTrue(os.path.exists(bob_fname + '.conflict')) @inline_callbacks def test_alice_delete_and_restore(self): @@ -1062,77 +1243,91 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea alice_proc = self.alice_magicfolder.uploader.set_hook('processed') bob_proc = self.bob_magicfolder.downloader.set_hook('processed') - yield self.alice_fileops.write(alice_fname, 'contents0\n') - yield iterate(self.alice_magicfolder) # for windows + with start_action(action_type=u"alice:create"): + yield self.alice_fileops.write(alice_fname, 'contents0\n') + yield iterate(self.alice_magicfolder) # for windows - yield iterate_uploader(self.alice_magicfolder) - yield alice_proc # alice uploads + with start_action(action_type=u"alice:upload"): + yield iterate_uploader(self.alice_magicfolder) + yield alice_proc # alice uploads - yield iterate_downloader(self.bob_magicfolder) - yield bob_proc # bob downloads + with start_action(action_type=u"bob:download"): + yield iterate_downloader(self.bob_magicfolder) + yield bob_proc # bob downloads - # check the state - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 0) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 0) - yield self.failUnlessReallyEqual( - self._get_count('downloader.objects_failed', client=self.bob_magicfolder._client), - 0 - ) - yield self.failUnlessReallyEqual( - self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), - 1 - ) - self.failUnless(os.path.exists(bob_fname)) - self.failUnless(not os.path.exists(bob_fname + '.backup')) - self.failUnless(not os.path.exists(bob_fname + '.conflict')) + with start_action(action_type=u"alice:check"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 0) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 0) + + with start_action(action_type=u"bob:check"): + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 0) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 0) + yield self.failUnlessReallyEqual( + self._get_count('downloader.objects_failed', client=self.bob_magicfolder._client), + 0 + ) + yield self.failUnlessReallyEqual( + self._get_count('downloader.objects_downloaded', client=self.bob_magicfolder._client), + 1 + ) + self.failUnless(os.path.exists(bob_fname)) + self.failUnless(not os.path.exists(bob_fname + '.backup')) + self.failUnless(not os.path.exists(bob_fname + '.conflict')) - # now alice deletes it (alice should upload, bob download) alice_proc = self.alice_magicfolder.uploader.set_hook('processed') bob_proc = self.bob_magicfolder.downloader.set_hook('processed') - yield self.alice_fileops.delete(alice_fname) - yield iterate_uploader(self.alice_magicfolder) - yield alice_proc - yield iterate_downloader(self.bob_magicfolder) - yield bob_proc + with start_action(action_type=u"alice:delete"): + yield self.alice_fileops.delete(alice_fname) + yield iterate_uploader(self.alice_magicfolder) + yield alice_proc - # check the state - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 1) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 1) - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1) - self.assertFalse(os.path.exists(bob_fname)) - self.assertTrue(os.path.exists(bob_fname + '.backup')) - self.assertFalse(os.path.exists(bob_fname + '.conflict')) + with start_action(action_type=u"bob:redownload"): + yield iterate_downloader(self.bob_magicfolder) + yield bob_proc - # now alice restores the file (with new contents) - os.unlink(bob_fname + '.backup') - alice_proc = self.alice_magicfolder.uploader.set_hook('processed') - bob_proc = self.bob_magicfolder.downloader.set_hook('processed') - yield self.alice_fileops.write(alice_fname, 'alice wuz here\n') - yield iterate(self.alice_magicfolder) # for windows + with start_action(action_type=u"bob:recheck"): + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 1) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 1) + self.assertFalse(os.path.exists(bob_fname)) + self.assertTrue(os.path.exists(bob_fname + '.backup')) + self.assertFalse(os.path.exists(bob_fname + '.conflict')) - yield iterate_uploader(self.alice_magicfolder) - yield iterate_downloader(self.alice_magicfolder) # why? - yield alice_proc - yield iterate_downloader(self.bob_magicfolder) - yield iterate_uploader(self.bob_magicfolder) - yield bob_proc + with start_action(action_type=u"alice:recheck"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1) - # check the state - yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 2) - yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 2) - yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 2) - yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 2) - self.failUnless(os.path.exists(bob_fname)) + with start_action(action_type=u"alice:restore"): + os.unlink(bob_fname + '.backup') + alice_proc = self.alice_magicfolder.uploader.set_hook('processed') + bob_proc = self.bob_magicfolder.downloader.set_hook('processed') + yield self.alice_fileops.write(alice_fname, 'alice wuz here\n') + yield iterate(self.alice_magicfolder) # for windows + + with start_action(action_type=u"alice:reupload"): + yield iterate_uploader(self.alice_magicfolder) + yield iterate_downloader(self.alice_magicfolder) # why? + yield alice_proc + + with start_action(action_type=u"bob:final-redownload"): + yield iterate_downloader(self.bob_magicfolder) + yield iterate_uploader(self.bob_magicfolder) + yield bob_proc + + with start_action(action_type=u"bob:final-check"): + yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 2) + yield self._check_version_in_local_db(self.bob_magicfolder, u"blam", 2) + self.failUnless(os.path.exists(bob_fname)) + + with start_action(action_type=u"alice:final-check"): + yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 2) + yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 2) # XXX this should be shortened -- as in, any cases not covered by # the other tests in here should get their own minimal test-case. @skipIf(sys.platform == "win32", "Still inotify problems on Windows (FIXME)") def test_alice_bob(self): - d = defer.succeed(None) + d = DeferredContext(defer.succeed(None)) # XXX FIXME just quickly porting this test via aliases -- the # "real" solution is to break out any relevant test-cases as @@ -1181,22 +1376,25 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea yield iterate(self.alice_magicfolder) d.addCallback(_wait_for, Alice_to_write_a_file) - d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0)) - d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 0)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 1)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 1)) - d.addCallback(lambda ign: self._check_uploader_count('objects_queued', 0)) - d.addCallback(lambda ign: self._check_uploader_count('directories_created', 0)) - d.addCallback(lambda ign: self._check_uploader_count('objects_conflicted', 0)) - d.addCallback(lambda ign: self._check_uploader_count('objects_conflicted', 0, magic=self.bob_magicfolder)) + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0) + self._check_version_in_local_db(self.alice_magicfolder, u"file1", 0) + self._check_uploader_count('objects_failed', 0) + self._check_uploader_count('objects_succeeded', 1) + self._check_uploader_count('files_uploaded', 1) + self._check_uploader_count('objects_queued', 0) + self._check_uploader_count('directories_created', 0) + self._check_uploader_count('objects_conflicted', 0) + self._check_uploader_count('objects_conflicted', 0, magic=self.bob_magicfolder) - d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 1)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 0, magic=self.bob_magicfolder)) -# d.addCallback(lambda ign: self._check_uploader_count('objects_not_uploaded', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 1, magic=self.bob_magicfolder)) + self._check_version_in_local_db(self.bob_magicfolder, u"file1", 0) + self._check_downloader_count('objects_failed', 0) + self._check_downloader_count('objects_downloaded', 1) + self._check_uploader_count('objects_succeeded', 0, magic=self.bob_magicfolder) + self._check_downloader_count('objects_downloaded', 1, magic=self.bob_magicfolder) + d.addCallback(check_state) @inline_callbacks def Alice_to_delete_file(): @@ -1217,19 +1415,22 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea yield iterate(self.bob_magicfolder) d.addCallback(notify_bob_moved) - d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 1)) - d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 1)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 2)) -# d.addCallback(lambda ign: self._check_uploader_count('objects_not_uploaded', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 0, magic=self.bob_magicfolder)) + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.alice_magicfolder, u"file1", 1) + self._check_version_in_local_db(self.alice_magicfolder, u"file1", 1) + self._check_uploader_count('objects_failed', 0) + self._check_uploader_count('objects_succeeded', 2) + self._check_uploader_count('objects_succeeded', 0, magic=self.bob_magicfolder) - d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1)) - d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 1)) - d.addCallback(lambda ign: self._check_file_gone(self.bob_magicfolder, u"file1")) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2, magic=self.bob_magicfolder)) + self._check_version_in_local_db(self.bob_magicfolder, u"file1", 1) + self._check_version_in_dmd(self.bob_magicfolder, u"file1", 1) + self._check_file_gone(self.bob_magicfolder, u"file1") + self._check_downloader_count('objects_failed', 0) + self._check_downloader_count('objects_downloaded', 2) + self._check_downloader_count('objects_downloaded', 2, magic=self.bob_magicfolder) + d.addCallback(check_state) @inline_callbacks def Alice_to_rewrite_file(): @@ -1241,24 +1442,27 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea ) yield iterate(self.alice_magicfolder) d.addCallback(_wait_for, Alice_to_rewrite_file) - d.addCallback(lambda ign: iterate(self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 2)) - d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 2)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 3)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 3)) - d.addCallback(lambda ign: self._check_uploader_count('objects_queued', 0)) - d.addCallback(lambda ign: self._check_uploader_count('directories_created', 0)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0)) - d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 2)) - d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 2)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 3)) -# d.addCallback(lambda ign: self._check_uploader_count('objects_not_uploaded', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 0, magic=self.bob_magicfolder)) + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.alice_magicfolder, u"file1", 2) + self._check_version_in_local_db(self.alice_magicfolder, u"file1", 2) + self._check_uploader_count('objects_failed', 0) + self._check_uploader_count('objects_succeeded', 3) + self._check_uploader_count('files_uploaded', 3) + self._check_uploader_count('objects_queued', 0) + self._check_uploader_count('directories_created', 0) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 0) + + self._check_version_in_dmd(self.bob_magicfolder, u"file1", 2) + self._check_version_in_local_db(self.bob_magicfolder, u"file1", 2) + self._check_downloader_count('objects_failed', 0) + self._check_downloader_count('objects_downloaded', 3) + self._check_uploader_count('objects_succeeded', 0, magic=self.bob_magicfolder) + d.addCallback(check_state) path_u = u"/tmp/magic_folder_test" encoded_path_u = magicpath.path2magic(u"/tmp/magic_folder_test") @@ -1279,12 +1483,14 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea return d2 d.addCallback(Alice_tries_to_p0wn_Bob) - d.addCallback(lambda ign: self.failIf(os.path.exists(path_u))) - d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, encoded_path_u, None)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 3)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0)) -# d.addCallback(lambda ign: self._check_uploader_count('objects_not_uploaded', 2, magic=self.bob_magicfolder)) + @log_call(action_type=u"check_state", include_args=[], include_result=False) + def check_state(ignored): + self.failIf(os.path.exists(path_u)) + self._check_version_in_local_db(self.bob_magicfolder, encoded_path_u, None) + self._check_downloader_count('objects_downloaded', 3) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 0) + d.addCallback(check_state) @inline_callbacks def Bob_to_rewrite_file(): @@ -1295,21 +1501,24 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea yield iterate(self.bob_magicfolder) d.addCallback(lambda ign: _wait_for(None, Bob_to_rewrite_file, alice=False)) - d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file1", 3)) - d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file1", 3)) -# d.addCallback(lambda ign: self._check_uploader_count('objects_not_uploaded', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.bob_magicfolder)) + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.bob_magicfolder, u"file1", 3) + self._check_version_in_local_db(self.bob_magicfolder, u"file1", 3) + self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder) + self._check_uploader_count('objects_succeeded', 1, magic=self.bob_magicfolder) + self._check_uploader_count('files_uploaded', 1, magic=self.bob_magicfolder) + self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder) + self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.bob_magicfolder) - d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 3)) - d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file1", 3)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 1, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) + self._check_version_in_dmd(self.alice_magicfolder, u"file1", 3) + self._check_version_in_local_db(self.alice_magicfolder, u"file1", 3) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_downloaded', 1, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + d.addCallback(check_state) def Alice_conflicts_with_Bobs_last_downloaded_uri(): if _debug: print "Alice conflicts with Bob\n" @@ -1325,17 +1534,21 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea d2.addCallback(lambda ign: downloaded_d) d2.addCallback(lambda ign: self.failUnless(alice_dmd.has_child(encoded_path_u))) return d2 - d.addCallback(lambda ign: Alice_conflicts_with_Bobs_last_downloaded_uri()) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 4, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 1, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 1, magic=self.bob_magicfolder)) + + @log_call(action_type=u"check_state", include_args=[], include_result=False) + def check_state(ignored): + self._check_downloader_count('objects_downloaded', 4, magic=self.bob_magicfolder) + self._check_downloader_count('objects_conflicted', 1, magic=self.bob_magicfolder) + self._check_downloader_count('objects_downloaded', 1, magic=self.alice_magicfolder) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_uploader_count('files_uploaded', 1, magic=self.bob_magicfolder) + self._check_uploader_count('objects_succeeded', 1, magic=self.bob_magicfolder) + d.addCallback(check_state) # prepare to perform another conflict test + @log_call_deferred(action_type=u"alice:to-write:file2") @inline_callbacks def Alice_to_write_file2(): if _debug: print "Alice writes a file2\n" @@ -1344,11 +1557,16 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea self.bob_clock.advance(4) yield d d.addCallback(_wait_for, Alice_to_write_file2) - d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file2", 0)) - d.addCallback(lambda ign: self._check_version_in_local_db(self.alice_magicfolder, u"file2", 0)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 1, magic=self.bob_magicfolder)) + + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.alice_magicfolder, u"file2", 0) + self._check_version_in_local_db(self.alice_magicfolder, u"file2", 0) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_uploader_count('files_uploaded', 1, magic=self.bob_magicfolder) + d.addCallback(check_state) def advance(ign): alice_clock.advance(4) @@ -1375,34 +1593,43 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea yield iterate(self.bob_magicfolder) if _debug: print "---- iterated bob's magicfolder" d.addCallback(lambda ign: _wait_for(None, Bob_to_rewrite_file2, alice=False)) - d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file2", 1)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 5, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 2, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder)) + + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.bob_magicfolder, u"file2", 1) + self._check_downloader_count('objects_downloaded', 5, magic=self.bob_magicfolder) + self._check_downloader_count('objects_conflicted', 1, magic=self.bob_magicfolder) + self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder) + self._check_uploader_count('objects_succeeded', 2, magic=self.bob_magicfolder) + self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder) + self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder) + self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder) + d.addCallback(check_state) # XXX here we advance the clock and then test again to make sure no values are monotonically increasing # with each queue turn ;-p alice_clock.advance(6) bob_clock.advance(6) - d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file2", 1)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 5)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 2, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder)) -## d.addCallback(lambda ign: self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder)) + + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.bob_magicfolder, u"file2", 1) + self._check_downloader_count('objects_downloaded', 5) + self._check_downloader_count('objects_conflicted', 1) + self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder) + self._check_uploader_count('objects_succeeded', 2, magic=self.bob_magicfolder) + self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder) + self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder) + self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder) + d.addCallback(check_state) def Alice_conflicts_with_Bobs_last_uploaded_uri(): if _debug: print "Alice conflicts with Bob\n" @@ -1420,17 +1647,21 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea d2.addCallback(lambda ign: self.failUnless(alice_dmd.has_child(encoded_path_u))) return d2 d.addCallback(lambda ign: Alice_conflicts_with_Bobs_last_uploaded_uri()) - d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file2", 5)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 6)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 2, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder)) -## d.addCallback(lambda ign: self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder)) + + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.bob_magicfolder, u"file2", 5) + self._check_downloader_count('objects_downloaded', 6) + self._check_downloader_count('objects_conflicted', 1) + self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder) + self._check_uploader_count('objects_succeeded', 2, magic=self.bob_magicfolder) + self._check_uploader_count('files_uploaded', 2, magic=self.bob_magicfolder) + self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder) + d.addCallback(check_state) def foo(ign): alice_clock.advance(6) @@ -1439,10 +1670,13 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea bob_clock.advance(6) d.addCallback(foo) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 6)) + @log_call(action_type=u"check_state", include_args=[], include_result=False) + def check_state(ignored): + self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 1) + self._check_downloader_count('objects_downloaded', 6) + d.addCallback(check_state) # prepare to perform another conflict test @inline_callbacks @@ -1451,13 +1685,20 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea self.file_path = abspath_expanduser_unicode(u"file3", base=self.alice_magicfolder.uploader._local_path_u) yield self.alice_fileops.write(self.file_path, "something") yield iterate(self.alice_magicfolder) + # Make sure Bob gets the file before we do anything else. + yield iterate(self.bob_magicfolder) d.addCallback(_wait_for, Alice_to_write_file3) - d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file3", 0)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 7)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) + + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.alice_magicfolder, u"file3", 0) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_downloaded', 7) + self._check_downloader_count('objects_downloaded', 2, magic=self.alice_magicfolder) + self._check_downloader_count('objects_conflicted', 1) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + d.addCallback(check_state) @inline_callbacks def Bob_to_rewrite_file3(): @@ -1468,21 +1709,24 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea yield self.bob_fileops.write(self.file_path, "roger roger") yield iterate(self.bob_magicfolder) d.addCallback(lambda ign: _wait_for(None, Bob_to_rewrite_file3, alice=False)) - d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file3", 1)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 7)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1)) - d.addCallback(lambda ign: self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 3, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('files_uploaded', 3, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder)) - d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 3, magic=self.alice_magicfolder)) - return d + @log_call_deferred(action_type=u"check_state") + @inline_callbacks + def check_state(ignored): + yield self._check_version_in_dmd(self.bob_magicfolder, u"file3", 1) + self._check_downloader_count('objects_downloaded', 7) + self._check_downloader_count('objects_conflicted', 1) + self._check_uploader_count('objects_failed', 0, magic=self.bob_magicfolder) + self._check_uploader_count('objects_succeeded', 3, magic=self.bob_magicfolder) + self._check_uploader_count('files_uploaded', 3, magic=self.bob_magicfolder) + self._check_uploader_count('objects_queued', 0, magic=self.bob_magicfolder) + self._check_uploader_count('directories_created', 0, magic=self.bob_magicfolder) + self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_failed', 0, magic=self.alice_magicfolder) + self._check_downloader_count('objects_downloaded', 3, magic=self.alice_magicfolder) + d.addCallback(check_state) - test_alice_bob.timeout = 300 + return d.addActionFinish() class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqualMixin, CheckerMixin): @@ -1527,7 +1771,6 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall def tearDown(self): d = DeferredContext(super(SingleMagicFolderTestMixin, self).tearDown()) d.addCallback(self.cleanup) - shutil.rmtree(self.basedir, ignore_errors=True) return d.result def _createdb(self): @@ -1951,6 +2194,49 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall self.assertThat(metadata['version'], Equals(1)) self.assertThat(metadata['deleted'], Equals(True)) + def test_delete_sub_directory_containing_file(self): + reldir_u = u'subdir' + relpath_u = os.path.join(reldir_u, u'some-file') + content = u'some great content' + yield self._create_directory_with_file( + relpath_u, + content, + ) + # Delete the sub-directory and the file in it. Don't wait in between + # because the case where all events are delivered before any + # processing happens is interesting. And don't use the fileops API to + # delete the contained file so that we don't necessarily generate a + # notification for that path at all. We require that the + # implementation behave correctly when receiving only the notification + # for the containing directory. + os.unlink(os.path.join(self.local_dir, relpath_u)) + yield self.fileops.delete(os.path.join(self.local_dir, reldir_u)) + + # Now allow processing. + yield iterate(self.magicfolder) + # Give it some extra time because of recursive directory processing. + yield iterate(self.magicfolder) + + # Deletion of both entities should have been uploaded. + downloader = self.magicfolder.downloader + encoded_dir_u = magicpath.path2magic(reldir_u + u"/") + encoded_path_u = magicpath.path2magic(relpath_u) + + dir_node, dir_meta = yield downloader._get_collective_latest_file(encoded_dir_u) + path_node, path_meta = yield downloader._get_collective_latest_file(encoded_path_u) + + self.expectThat(dir_node, Not(Is(None)), "dir node") + self.expectThat(dir_meta, ContainsDict({ + "version": Equals(1), + "deleted": Equals(True), + }), "dir meta") + + self.expectThat(path_node, Not(Is(None)), "path node") + self.expectThat(path_meta, ContainsDict({ + "version": Equals(1), + "deleted": Equals(True), + }), "path meta") + @skipIf(support_missing, support_message) class MockTestAliceBob(MagicFolderAliceBobTestMixin, AsyncTestCase): @@ -2127,6 +2413,7 @@ class RealTest(SingleMagicFolderTestMixin, AsyncTestCase): class RealTestAliceBob(MagicFolderAliceBobTestMixin, AsyncTestCase): """This is skipped unless both Twisted and the platform support inotify.""" inject_inotify = False + timeout = 15 def setUp(self): d = super(RealTestAliceBob, self).setUp() diff --git a/src/allmydata/util/eliotutil.py b/src/allmydata/util/eliotutil.py index 696a66afc..535f4913d 100644 --- a/src/allmydata/util/eliotutil.py +++ b/src/allmydata/util/eliotutil.py @@ -361,6 +361,7 @@ class _EliotLogging(Service): self.twisted_observer = _TwistedLoggerToEliotObserver() globalLogPublisher.addObserver(self.twisted_observer) add_destinations(*self.destinations) + return Service.startService(self) def stopService(self): @@ -368,7 +369,7 @@ class _EliotLogging(Service): remove_destination(dest) globalLogPublisher.removeObserver(self.twisted_observer) self.stdlib_cleanup() - + return Service.stopService(self) @implementer(ILogObserver) diff --git a/src/allmydata/util/fake_inotify.py b/src/allmydata/util/fake_inotify.py index 15893243a..45d360105 100644 --- a/src/allmydata/util/fake_inotify.py +++ b/src/allmydata/util/fake_inotify.py @@ -74,6 +74,8 @@ def humanReadableMask(mask): return s +from eliot import start_action + # This class is not copied from Twisted; it acts as a mock. class INotify(object): def startReading(self): @@ -89,8 +91,9 @@ class INotify(object): self.callbacks = callbacks def event(self, filepath, mask): - for cb in self.callbacks: - cb(None, filepath, mask) + with start_action(action_type=u"fake-inotify:event", path=filepath.path, mask=mask): + for cb in self.callbacks: + cb(None, filepath, mask) __all__ = ["INotify", "humanReadableMask", "IN_WATCH_MASK", "IN_ACCESS", diff --git a/src/allmydata/watchdog/__init__.py b/src/allmydata/watchdog/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/allmydata/watchdog/_watchdog_541.py b/src/allmydata/watchdog/_watchdog_541.py new file mode 100644 index 000000000..87686ce6d --- /dev/null +++ b/src/allmydata/watchdog/_watchdog_541.py @@ -0,0 +1,16 @@ +""" +Hotfix for https://github.com/gorakhargosh/watchdog/issues/541 +""" + +from watchdog.observers.fsevents import FSEventsEmitter + +# The class object has already been bundled up in the default arguments to +# FSEventsObserver.__init__. So mutate the class object (instead of replacing +# it with a safer version). +original_on_thread_stop = FSEventsEmitter.on_thread_stop +def safe_on_thread_stop(self): + if self.is_alive(): + return original_on_thread_stop(self) + +def patch(): + FSEventsEmitter.on_thread_stop = safe_on_thread_stop diff --git a/src/allmydata/watchdog/inotify.py b/src/allmydata/watchdog/inotify.py new file mode 100644 index 000000000..68a121a60 --- /dev/null +++ b/src/allmydata/watchdog/inotify.py @@ -0,0 +1,212 @@ + +""" +An implementation of an inotify-like interface on top of the ``watchdog`` library. +""" + +from __future__ import ( + unicode_literals, + print_function, + absolute_import, + division, +) + +__all__ = [ + "humanReadableMask", "INotify", + "IN_WATCH_MASK", "IN_ACCESS", "IN_MODIFY", "IN_ATTRIB", "IN_CLOSE_NOWRITE", + "IN_CLOSE_WRITE", "IN_OPEN", "IN_MOVED_FROM", "IN_MOVED_TO", "IN_CREATE", + "IN_DELETE", "IN_DELETE_SELF", "IN_MOVE_SELF", "IN_UNMOUNT", "IN_ONESHOT", + "IN_Q_OVERFLOW", "IN_IGNORED", "IN_ONLYDIR", "IN_DONT_FOLLOW", "IN_MOVED", + "IN_MASK_ADD", "IN_ISDIR", "IN_CLOSE", "IN_CHANGED", "_FLAG_TO_HUMAN", +] + +from watchdog.observers import Observer +from watchdog.events import ( + FileSystemEvent, + FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent, + DirDeletedEvent, FileDeletedEvent, FileModifiedEvent +) + +from twisted.internet import reactor +from twisted.python.filepath import FilePath +from allmydata.util.fileutil import abspath_expanduser_unicode + +from eliot import ( + ActionType, + Message, + Field, + preserve_context, + start_action, +) + +from allmydata.util.pollmixin import PollMixin +from allmydata.util.assertutil import _assert, precondition +from allmydata.util import encodingutil +from allmydata.util.fake_inotify import humanReadableMask, \ + IN_WATCH_MASK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_NOWRITE, IN_CLOSE_WRITE, \ + IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE, IN_DELETE, IN_DELETE_SELF, \ + IN_MOVE_SELF, IN_UNMOUNT, IN_Q_OVERFLOW, IN_IGNORED, IN_ONLYDIR, IN_DONT_FOLLOW, \ + IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED, \ + _FLAG_TO_HUMAN + +from ..util.eliotutil import ( + MAYBE_NOTIFY, + CALLBACK, + validateInstanceOf, +) + +from . import _watchdog_541 + +_watchdog_541.patch() + +NOT_STARTED = "NOT_STARTED" +STARTED = "STARTED" +STOPPING = "STOPPING" +STOPPED = "STOPPED" + +_PATH = Field.for_types( + u"path", + [bytes, unicode], + u"The path an inotify event concerns.", +) + +_EVENT = Field( + u"event", + lambda e: e.__class__.__name__, + u"The watchdog event that has taken place.", + validateInstanceOf(FileSystemEvent), +) + +ANY_INOTIFY_EVENT = ActionType( + u"watchdog:inotify:any-event", + [_PATH, _EVENT], + [], + u"An inotify event is being dispatched.", +) + +class INotifyEventHandler(FileSystemEventHandler): + def __init__(self, path, mask, callbacks, pending_delay): + FileSystemEventHandler.__init__(self) + self._path = path + self._mask = mask + self._callbacks = callbacks + self._pending_delay = pending_delay + self._pending = set() + + def _maybe_notify(self, path, event): + with MAYBE_NOTIFY(): + event_mask = IN_CHANGED + if isinstance(event, FileModifiedEvent): + event_mask = event_mask | IN_CLOSE_WRITE + event_mask = event_mask | IN_MODIFY + if isinstance(event, (DirCreatedEvent, FileCreatedEvent)): + # For our purposes, IN_CREATE is irrelevant. + event_mask = event_mask | IN_CLOSE_WRITE + if isinstance(event, (DirDeletedEvent, FileDeletedEvent)): + event_mask = event_mask | IN_DELETE + if event.is_directory: + event_mask = event_mask | IN_ISDIR + if not (self._mask & event_mask): + return + for cb in self._callbacks: + try: + with CALLBACK(inotify_events=event_mask): + cb(None, FilePath(path), event_mask) + except: + # Eliot already logged the exception for us. + # There's nothing else we can do about it here. + pass + + def process(self, event): + event_filepath_u = event.src_path.decode(encodingutil.get_filesystem_encoding()) + event_filepath_u = abspath_expanduser_unicode(event_filepath_u, base=self._path) + + if event_filepath_u == self._path: + # ignore events for parent directory + return + + self._maybe_notify(event_filepath_u, event) + + def on_any_event(self, event): + with ANY_INOTIFY_EVENT(path=event.src_path, event=event): + reactor.callFromThread( + preserve_context(self.process), + event, + ) + + +class INotify(PollMixin): + """ + I am a prototype INotify, made to work on Mac OS X (Darwin) + using the Watchdog python library. This is actually a simplified subset + of the twisted Linux INotify class because we do not utilize the watch mask + and only implement the following methods: + - watch + - startReading + - stopReading + - wait_until_stopped + - set_pending_delay + """ + def __init__(self): + self._pending_delay = 1.0 + self.recursive_includes_new_subdirectories = False + self._callbacks = {} + self._watches = {} + self._state = NOT_STARTED + self._observer = Observer(timeout=self._pending_delay) + + def set_pending_delay(self, delay): + Message.log(message_type=u"watchdog:inotify:set-pending-delay", delay=delay) + assert self._state != STARTED + self._pending_delay = delay + + def startReading(self): + with start_action(action_type=u"watchdog:inotify:start-reading"): + assert self._state != STARTED + try: + # XXX twisted.internet.inotify doesn't require watches to + # be set before startReading is called. + # _assert(len(self._callbacks) != 0, "no watch set") + self._observer.start() + self._state = STARTED + except: + self._state = STOPPED + raise + + def stopReading(self): + with start_action(action_type=u"watchdog:inotify:stop-reading"): + if self._state != STOPPED: + self._state = STOPPING + self._observer.unschedule_all() + self._observer.stop() + self._observer.join() + self._state = STOPPED + + def wait_until_stopped(self): + return self.poll(lambda: self._state == STOPPED) + + def _isWatched(self, path_u): + return path_u in self._callbacks.keys() + + def ignore(self, path): + path_u = path.path + self._observer.unschedule(self._watches[path_u]) + del self._callbacks[path_u] + del self._watches[path_u] + + def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False): + precondition(isinstance(autoAdd, bool), autoAdd=autoAdd) + precondition(isinstance(recursive, bool), recursive=recursive) + assert autoAdd == False + + path_u = path.path + if not isinstance(path_u, unicode): + path_u = path_u.decode('utf-8') + _assert(isinstance(path_u, unicode), path_u=path_u) + + if path_u not in self._callbacks.keys(): + self._callbacks[path_u] = callbacks or [] + self._watches[path_u] = self._observer.schedule( + INotifyEventHandler(path_u, mask, self._callbacks[path_u], self._pending_delay), + path=path_u, + recursive=False, + ) diff --git a/src/allmydata/windows/inotify.py b/src/allmydata/windows/inotify.py index b36051690..5e29a00ec 100644 --- a/src/allmydata/windows/inotify.py +++ b/src/allmydata/windows/inotify.py @@ -154,6 +154,7 @@ class FileNotifyInformation(object): bytes = self._read_dword(pos+8) s = Event(self._read_dword(pos+4), self.data[pos+12 : pos+12+bytes].decode('utf-16-le')) + Message.log(message_type="fni", info=repr(s)) next_entry_offset = self._read_dword(pos) yield s @@ -310,7 +311,6 @@ class INotify(PollMixin): if self._check_stop(): return for info in fni: - # print info path = self._path.preauthChild(info.filename) # FilePath with Unicode path if info.action == FILE_ACTION_MODIFIED and path.isdir(): Message.log(