Merge pull request #558 from tahoe-lafs/1432.watchdog-magic-folder-with-eliot

Add support for macOS to Magic-Folders.

Fixes: ticket:1432
This commit is contained in:
Jean-Paul Calderone 2019-03-14 13:02:09 -04:00 committed by GitHub
commit c1e6f08813
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1262 additions and 384 deletions

View File

@ -0,0 +1 @@
Magic-Folders are now supported on macOS.

View File

@ -255,6 +255,7 @@ setup(name="tahoe-lafs", # also set in __init__.py
install_requires=install_requires,
extras_require={
':sys_platform=="win32"': ["pypiwin32"],
':sys_platform!="win32" and sys_platform!="linux2"': ["watchdog"], # For magic-folder on "darwin" (macOS) and the BSDs
"test": [
# Pin a specific pyflakes so we don't have different folks
# disagreeing on what is or is not a lint issue. We can bump

View File

@ -7,6 +7,7 @@ from datetime import datetime
import time
import ConfigParser
from twisted.python.log import msg as twmsg
from twisted.python.filepath import FilePath
from twisted.python.monkey import MonkeyPatcher
from twisted.internet import defer, reactor, task
@ -19,6 +20,8 @@ from zope.interface import Interface, Attribute, implementer
from eliot import (
Field,
Message,
start_action,
ActionType,
MessageType,
write_failure,
@ -35,9 +38,6 @@ from allmydata.util import (
yamlutil,
eliotutil,
)
from allmydata.util.fake_inotify import (
humanReadableMask,
)
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import (
@ -75,9 +75,11 @@ def _get_inotify_module():
from allmydata.windows import inotify
elif runtime.platform.supportsINotify():
from twisted.internet import inotify
elif not sys.platform.startswith("linux"):
from allmydata.watchdog import inotify
else:
raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
"This currently requires Linux or Windows.")
"This currently requires Linux, Windows, or macOS.")
return inotify
except (ImportError, AttributeError) as e:
log.msg(e)
@ -521,10 +523,11 @@ MAYBE_UPLOAD = MessageType(
u"A decision is being made about whether to upload a file.",
)
PENDING = Field.for_types(
PENDING = Field(
u"pending",
[list],
lambda s: list(s),
u"The paths which are pending processing.",
eliotutil.validateInstanceOf(set),
)
REMOVE_FROM_PENDING = ActionType(
@ -547,6 +550,19 @@ NOTIFIED_OBJECT_DISAPPEARED = MessageType(
u"A path which generated a notification was not found on the filesystem. This is normal.",
)
PROPAGATE_DIRECTORY_DELETION = ActionType(
u"magic-folder:propagate-directory-deletion",
[],
[],
u"Children of a deleted directory are being queued for upload processing.",
)
NO_DATABASE_ENTRY = MessageType(
u"magic-folder:no-database-entry",
[],
u"There is no local database entry for a particular relative path in the magic folder.",
)
NOT_UPLOADING = MessageType(
u"magic-folder:not-uploading",
[],
@ -688,13 +704,6 @@ NOTIFIED = ActionType(
u"Magic-Folder received a notification of a local filesystem change for a certain path.",
)
_EVENTS = Field(
u"events",
humanReadableMask,
u"Details about a filesystem event generating a notification event.",
eliotutil.validateInstanceOf((int, long)),
)
_NON_DIR_CREATED = Field.for_types(
u"non_dir_created",
[bool],
@ -704,7 +713,7 @@ _NON_DIR_CREATED = Field.for_types(
REACT_TO_INOTIFY = ActionType(
u"magic-folder:react-to-inotify",
[_EVENTS],
[eliotutil.INOTIFY_EVENTS],
[_IGNORED, _NON_DIR_CREATED, _ALREADY_PENDING],
u"Magic-Folder is processing a notification from inotify(7) (or a clone) about a filesystem event.",
)
@ -1118,6 +1127,13 @@ PROCESS_ITEM = ActionType(
u"A path which was found wanting of an update is receiving an update.",
)
DOWNLOAD_BEST_VERSION = ActionType(
u"magic-folder:download-best-version",
[],
[],
u"The content of a file in the Magic Folder is being downloaded.",
)
class Uploader(QueueMixin):
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
@ -1152,14 +1168,21 @@ class Uploader(QueueMixin):
| self._inotify.IN_ONLYDIR
| IN_EXCL_UNLINK
)
self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
recursive=True)
def _add_watch(self, filepath):
self._notifier.watch(
filepath,
mask=self.mask,
callbacks=[self._notify],
recursive=True,
)
def start_monitoring(self):
action = START_MONITORING(**self._log_fields)
with action.context():
d = DeferredContext(defer.succeed(None))
d.addCallback(lambda ign: self._add_watch(self._local_filepath))
d.addCallback(lambda ign: self._notifier.startReading())
d.addCallback(lambda ign: self._count('dirs_monitored'))
d.addBoth(self._call_hook, 'started')
@ -1258,7 +1281,7 @@ class Uploader(QueueMixin):
# All can do is id() or repr() it and neither of those actually
# produces very illuminating results. We drop opaque on the
# floor, anyway.
events=events_mask,
inotify_events=events_mask,
)
success_fields = dict(non_dir_created=False, already_pending=False, ignored=False)
@ -1296,41 +1319,70 @@ class Uploader(QueueMixin):
"""
# Uploader
with PROCESS_ITEM(item=item).context():
d = DeferredContext(defer.succeed(False))
relpath_u = item.relpath_u
item.set_status('started', self._clock.seconds())
precondition(isinstance(relpath_u, unicode), relpath_u)
precondition(not relpath_u.endswith(u'/'), relpath_u)
encoded_path_u = magicpath.path2magic(relpath_u)
d = DeferredContext(defer.succeed(False))
if relpath_u is None:
item.set_status('invalid_path', self._clock.seconds())
return d.addActionFinish()
item.set_status('started', self._clock.seconds())
precondition(isinstance(relpath_u, unicode), relpath_u)
precondition(not relpath_u.endswith(u'/'), relpath_u)
try:
# Take this item out of the pending set before we do any
# I/O-based processing related to it. If a further change
# takes place after we remove it from this set, we want it to
# end up in the set again. If we haven't gotten around to
# doing the I/O-based processing yet then the worst that will
# happen is we'll do a little redundant processing.
#
# If we did it the other way around, the sequence of events
# might be something like: we do some I/O, someone else does
# some I/O, a notification gets discarded because the path is
# still in the pending set, _then_ we remove it from the
# pending set. In such a circumstance, we've missed some I/O
# that we should have responded to.
with REMOVE_FROM_PENDING(relpath=relpath_u, pending=self._pending):
self._pending.remove(relpath_u)
except KeyError:
pass
fp = self._get_filepath(relpath_u)
pathinfo = get_pathinfo(unicode_from_filepath(fp))
db_entry_is_dir = False
db_entry = self._db.get_db_entry(relpath_u)
if db_entry is None:
# Maybe it was a directory!
db_entry = self._db.get_db_entry(relpath_u + u"/")
if db_entry is None:
NO_DATABASE_ENTRY.log()
else:
db_entry_is_dir = True
def _maybe_upload(ign, now=None):
MAYBE_UPLOAD.log(relpath=relpath_u)
if now is None:
now = time.time()
fp = self._get_filepath(relpath_u)
pathinfo = get_pathinfo(unicode_from_filepath(fp))
try:
with REMOVE_FROM_PENDING(relpath=relpath_u, pending=list(self._pending)):
self._pending.remove(relpath_u)
except KeyError:
pass
encoded_path_u = magicpath.path2magic(relpath_u)
if not pathinfo.exists:
# FIXME merge this with the 'isfile' case.
NOTIFIED_OBJECT_DISAPPEARED.log(path=fp)
self._count('objects_disappeared')
db_entry = self._db.get_db_entry(relpath_u)
if db_entry is None:
# If it exists neither on the filesystem nor in the
# database, it's neither a creation nor a deletion and
# there's nothing more to do.
return False
if pathinfo.isdir or db_entry_is_dir:
with PROPAGATE_DIRECTORY_DELETION():
for localpath in self._db.get_direct_children(relpath_u):
self._add_pending(localpath.relpath_u)
last_downloaded_timestamp = now # is this correct?
if is_new_file(pathinfo, db_entry):
@ -1374,9 +1426,17 @@ class Uploader(QueueMixin):
if db_entry.last_uploaded_uri is not None:
metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri
if db_entry_is_dir:
real_encoded_path_u = encoded_path_u + magicpath.path2magic(u"/")
real_relpath_u = relpath_u + u"/"
else:
real_encoded_path_u = encoded_path_u
real_relpath_u = relpath_u
empty_uploadable = Data("", self._client.convergence)
d2 = DeferredContext(self._upload_dirnode.add_file(
encoded_path_u, empty_uploadable,
real_encoded_path_u,
empty_uploadable,
metadata=metadata,
overwrite=True,
progress=item.progress,
@ -1389,7 +1449,7 @@ class Uploader(QueueMixin):
# immediately re-download it when we start up next
last_downloaded_uri = metadata.get('last_downloaded_uri', filecap)
self._db.did_upload_version(
relpath_u,
real_relpath_u,
new_version,
filecap,
last_downloaded_uri,
@ -1405,33 +1465,38 @@ class Uploader(QueueMixin):
return False
elif pathinfo.isdir:
if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
self._add_watch(fp)
db_entry = self._db.get_db_entry(relpath_u)
DIRECTORY_PATHENTRY.log(pathentry=db_entry)
if not is_new_file(pathinfo, db_entry):
NOT_NEW_DIRECTORY.log()
return False
uploadable = Data("", self._client.convergence)
encoded_path_u += magicpath.path2magic(u"/")
with PROCESS_DIRECTORY().context() as action:
upload_d = DeferredContext(self._upload_dirnode.add_file(
encoded_path_u, uploadable,
encoded_path_u + magicpath.path2magic(u"/"),
uploadable,
metadata={"version": 0},
overwrite=True,
progress=item.progress,
))
def _dir_succeeded(ign):
def _dir_succeeded(dirnode):
action.add_success_fields(created_directory=relpath_u)
self._count('directories_created')
self._db.did_upload_version(
relpath_u + u"/",
version=0,
last_uploaded_uri=dirnode.get_uri(),
last_downloaded_uri=None,
last_downloaded_timestamp=now,
pathinfo=pathinfo,
)
upload_d.addCallback(_dir_succeeded)
upload_d.addCallback(lambda ign: self._scan(relpath_u))
upload_d.addCallback(lambda ign: True)
return upload_d.addActionFinish()
elif pathinfo.isfile:
db_entry = self._db.get_db_entry(relpath_u)
last_downloaded_timestamp = now
if db_entry is None:
@ -1654,10 +1719,11 @@ class Downloader(QueueMixin, WriteFileMixin):
while True:
try:
yield self._scan_remote_collective(scan_self=True)
# The integration tests watch for this log message to
# decide when it is safe to proceed. Clearly, we need
# better programmatic interrogation of magic-folder state.
print("Completed initial Magic Folder scan successfully ({})".format(self))
# The integration tests watch for this log message (in the
# Twisted log) to decide when it is safe to proceed.
# Clearly, we need better programmatic interrogation of
# magic-folder state.
twmsg("Completed initial Magic Folder scan successfully ({})".format(self))
self._begin_processing()
return
except Exception:
@ -1707,12 +1773,21 @@ class Downloader(QueueMixin, WriteFileMixin):
file node and metadata for the latest version of the file located in the
magic-folder collective directory.
"""
collective_dirmap_d = self._collective_dirnode.list()
action = start_action(
action_type=u"magic-folder:downloader:get-latest-file",
name=filename,
)
with action.context():
collective_dirmap_d = DeferredContext(self._collective_dirnode.list())
def scan_collective(result):
Message.log(
message_type=u"magic-folder:downloader:get-latest-file:collective-scan",
dmds=result.keys(),
)
list_of_deferreds = []
for dir_name in result.keys():
# XXX make sure it's a directory
d = defer.succeed(None)
d = DeferredContext(defer.succeed(None))
d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
list_of_deferreds.append(d)
deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
@ -1724,12 +1799,20 @@ class Downloader(QueueMixin, WriteFileMixin):
node = None
for success, result in deferredList:
if success:
Message.log(
message_type=u"magic-folder:downloader:get-latest-file:version",
version=result[1]['version'],
)
if node is None or result[1]['version'] > max_version:
node, metadata = result
max_version = result[1]['version']
else:
Message.log(
message_type="magic-folder:downloader:get-latest-file:failed",
)
return node, metadata
collective_dirmap_d.addCallback(highest_version)
return collective_dirmap_d
return collective_dirmap_d.addActionFinish()
def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
with SCAN_REMOTE_DMD(nickname=nickname).context():
@ -1959,14 +2042,17 @@ class Downloader(QueueMixin, WriteFileMixin):
if item.metadata.get('deleted', False):
d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
else:
d.addCallback(lambda ign: item.file_node.download_best_version(progress=item.progress))
d.addCallback(
lambda contents: self._write_downloaded_file(
@eliotutil.log_call_deferred(DOWNLOAD_BEST_VERSION.action_type)
def download_best_version(ignored):
d = DeferredContext(item.file_node.download_best_version(progress=item.progress))
d.addCallback(lambda contents: self._write_downloaded_file(
self._local_path_u, abspath_u, contents,
is_conflict=is_conflict,
mtime=item.metadata.get('user_mtime', item.metadata.get('tahoe', {}).get('linkmotime')),
)
)
))
return d.result
d.addCallback(download_best_version)
d.addCallback(do_update_db)
d.addErrback(failed)

View File

@ -43,11 +43,11 @@ _INSERT_OR_UPDATE = Field.for_types(
validateSetMembership({u"insert", u"update"}),
)
DID_UPLOAD_VERSION = ActionType(
u"magic-folder-db:did-upload-version",
UPDATE_ENTRY = ActionType(
u"magic-folder-db:update-entry",
[RELPATH, VERSION, LAST_UPLOADED_URI, LAST_DOWNLOADED_URI, LAST_DOWNLOADED_TIMESTAMP, PATHINFO],
[_INSERT_OR_UPDATE],
u"An file upload is being recorded in the database.",
u"Record some metadata about a relative path in the magic-folder.",
)
@ -88,6 +88,15 @@ def get_magicfolderdb(dbfile, stderr=sys.stderr,
print >>stderr, e
return None
class LocalPath(object):
@classmethod
def fromrow(self, row):
p = LocalPath()
p.relpath_u = row[0]
p.entry = PathEntry(*row[1:])
return p
class MagicFolderDB(object):
VERSION = 1
@ -121,6 +130,42 @@ class MagicFolderDB(object):
last_downloaded_uri=last_downloaded_uri,
last_downloaded_timestamp=last_downloaded_timestamp)
def get_direct_children(self, relpath_u):
"""
Given the relative path to a directory, return ``LocalPath`` instances
representing all direct children of that directory.
"""
# It would be great to not be interpolating data into query
# statements. However, query parameters are not supported in the
# position where we need them.
sqlitesafe_relpath_u = relpath_u.replace(u"'", u"''")
statement = (
"""
SELECT
path, size, mtime_ns, ctime_ns, version, last_uploaded_uri,
last_downloaded_uri, last_downloaded_timestamp
FROM
local_files
WHERE
-- The "_" used here ensures there is at least one character
-- after the /. This prevents matching the path itself.
path LIKE '{path}/_%' AND
-- The "_" used here serves a similar purpose. This allows
-- matching directory children but avoids matching their
-- children.
path NOT LIKE '{path}/_%/_%'
"""
).format(path=sqlitesafe_relpath_u)
self.cursor.execute(statement)
rows = self.cursor.fetchall()
return list(
LocalPath.fromrow(row)
for row
in rows
)
def get_all_relpaths(self):
"""
Retrieve a set of all relpaths of files that have had an entry in magic folder db
@ -131,7 +176,7 @@ class MagicFolderDB(object):
return set([r[0] for r in rows])
def did_upload_version(self, relpath_u, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp, pathinfo):
action = DID_UPLOAD_VERSION(
action = UPDATE_ENTRY(
relpath=relpath_u,
version=version,
last_uploaded_uri=last_uploaded_uri,

View File

@ -6,6 +6,10 @@ import re
import time
from datetime import datetime
from eliot import (
log_call,
start_action,
)
from eliot.twisted import (
DeferredContext,
)
@ -38,7 +42,13 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
self.bob_nickname = self.unicode_or_fallback(u"Bob\u00F8", u"Bob", io_as_well=True)
def do_create_magic_folder(self, client_num):
d = self.do_cli("magic-folder", "--debug", "create", "magic:", client_num=client_num)
with start_action(action_type=u"create-magic-folder", client_num=client_num).context():
d = DeferredContext(
self.do_cli(
"magic-folder", "--debug", "create", "magic:",
client_num=client_num,
)
)
def _done((rc,stdout,stderr)):
self.failUnlessEqual(rc, 0, stdout + stderr)
self.assertIn("Alias 'magic' created", stdout)
@ -49,16 +59,30 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
self.assertIn("magic", aliases)
self.failUnless(aliases["magic"].startswith("URI:DIR2:"))
d.addCallback(_done)
return d
return d.addActionFinish()
def do_invite(self, client_num, nickname):
nickname_arg = unicode_to_argv(nickname)
d = self.do_cli("magic-folder", "invite", "magic:", nickname_arg, client_num=client_num)
action = start_action(
action_type=u"invite-to-magic-folder",
client_num=client_num,
nickname=nickname,
)
with action.context():
d = DeferredContext(
self.do_cli(
"magic-folder",
"invite",
"magic:",
nickname_arg,
client_num=client_num,
)
)
def _done((rc, stdout, stderr)):
self.failUnlessEqual(rc, 0)
return (rc, stdout, stderr)
d.addCallback(_done)
return d
return d.addActionFinish()
def do_list(self, client_num, json=False):
args = ("magic-folder", "list",)
@ -81,18 +105,32 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
return d
def do_join(self, client_num, local_dir, invite_code):
precondition(isinstance(local_dir, unicode), local_dir=local_dir)
precondition(isinstance(invite_code, str), invite_code=invite_code)
local_dir_arg = unicode_to_argv(local_dir)
d = self.do_cli("magic-folder", "join", invite_code, local_dir_arg, client_num=client_num)
action = start_action(
action_type=u"join-magic-folder",
client_num=client_num,
local_dir=local_dir,
invite_code=invite_code,
)
with action.context():
precondition(isinstance(local_dir, unicode), local_dir=local_dir)
precondition(isinstance(invite_code, str), invite_code=invite_code)
local_dir_arg = unicode_to_argv(local_dir)
d = DeferredContext(
self.do_cli(
"magic-folder",
"join",
invite_code,
local_dir_arg,
client_num=client_num,
)
)
def _done((rc, stdout, stderr)):
self.failUnlessEqual(rc, 0)
self.failUnlessEqual(stdout, "")
self.failUnlessEqual(stderr, "")
return (rc, stdout, stderr)
d.addCallback(_done)
return d
return d.addActionFinish()
def do_leave(self, client_num):
d = self.do_cli("magic-folder", "leave", client_num=client_num)
@ -106,8 +144,16 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
"""Tests that our collective directory has the readonly cap of
our upload directory.
"""
collective_readonly_cap = self.get_caps_from_files(client_num)[0]
d = self.do_cli("ls", "--json", collective_readonly_cap, client_num=client_num)
action = start_action(action_type=u"check-joined-config")
with action.context():
collective_readonly_cap = self.get_caps_from_files(client_num)[0]
d = DeferredContext(
self.do_cli(
"ls", "--json",
collective_readonly_cap,
client_num=client_num,
)
)
def _done((rc, stdout, stderr)):
self.failUnlessEqual(rc, 0)
return (rc, stdout, stderr)
@ -118,7 +164,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
self.failUnless(s is not None)
return None
d.addCallback(test_joined_magic_folder)
return d
return d.addActionFinish()
def get_caps_from_files(self, client_num):
from allmydata.frontends.magic_folder import load_magic_folders
@ -126,6 +172,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
mf = folders["default"]
return mf['collective_dircap'], mf['upload_dircap']
@log_call
def check_config(self, client_num, local_dir):
client_config = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "tahoe.cfg"))
mf_yaml = fileutil.read(os.path.join(self.get_clientdir(i=client_num), "private", "magic_folders.yaml"))

View File

@ -2,6 +2,10 @@ __all__ = [
"SyncTestCase",
"AsyncTestCase",
"AsyncBrokenTestCase",
"flush_logged_errors",
"skip",
"skipIf",
]
import os, random, struct
@ -16,11 +20,14 @@ from zope.interface import implementer
from testtools import (
TestCase,
skip,
skipIf,
)
from testtools.twistedsupport import (
SynchronousDeferredRunTest,
AsynchronousDeferredRunTest,
AsynchronousDeferredRunTestForBrokenTwisted,
flush_logged_errors,
)
from twisted.internet import defer

View File

@ -57,22 +57,23 @@ def eliot_logged_test(f):
@wraps(f)
def run_and_republish(self, *a, **kw):
# Unfortunately the only way to get at the global/default logger...
# This import is delayed here so that we get the *current* default
# logger at the time the decorated function is run.
from eliot._output import _DEFAULT_LOGGER as default_logger
def republish():
# This is called as a cleanup function after capture_logging has
# restored the global/default logger to its original state. We
# can now emit messages that go to whatever global destinations
# are installed.
# Unfortunately the only way to get at the global/default
# logger...
from eliot._output import _DEFAULT_LOGGER as logger
# storage.logger.serialize() seems like it would make more sense
# than storage.logger.messages here. However, serialize()
# explodes, seemingly as a result of double-serializing the logged
# messages. I don't understand this.
for msg in storage.logger.messages:
logger.write(msg)
default_logger.write(msg)
# And now that we've re-published all of the test's messages, we
# can finish the test's action.

View File

@ -0,0 +1,171 @@
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Tests for the inotify-alike implementation L{allmydata.watchdog}.
"""
# Note: See https://twistedmatrix.com/trac/ticket/8915 for a proposal
# to avoid all of this duplicated code from Twisted.
from twisted.internet import defer, reactor
from twisted.python import filepath, runtime
from allmydata.frontends.magic_folder import get_inotify_module
from .common import (
AsyncTestCase,
skipIf,
)
inotify = get_inotify_module()
@skipIf(runtime.platformType == "win32", "inotify does not yet work on windows")
class INotifyTests(AsyncTestCase):
"""
Define all the tests for the basic functionality exposed by
L{inotify.INotify}.
"""
def setUp(self):
self.dirname = filepath.FilePath(self.mktemp())
self.dirname.createDirectory()
self.inotify = inotify.INotify()
self.inotify.startReading()
self.addCleanup(self.inotify.stopReading)
return super(INotifyTests, self).setUp()
def _notificationTest(self, mask, operation, expectedPath=None):
"""
Test notification from some filesystem operation.
@param mask: The event mask to use when setting up the watch.
@param operation: A function which will be called with the
name of a file in the watched directory and which should
trigger the event.
@param expectedPath: Optionally, the name of the path which is
expected to come back in the notification event; this will
also be passed to C{operation} (primarily useful when the
operation is being done to the directory itself, not a
file in it).
@return: A L{Deferred} which fires successfully when the
expected event has been received or fails otherwise.
"""
if expectedPath is None:
expectedPath = self.dirname.child("foo.bar")
notified = defer.Deferred()
def cbNotified(result):
(watch, filename, events) = result
self.assertEqual(filename.asBytesMode(), expectedPath.asBytesMode())
self.assertTrue(events & mask)
self.inotify.ignore(self.dirname)
notified.addCallback(cbNotified)
def notify_event(*args):
notified.callback(args)
self.inotify.watch(
self.dirname, mask=mask,
callbacks=[notify_event])
operation(expectedPath)
return notified
def test_modify(self):
"""
Writing to a file in a monitored directory sends an
C{inotify.IN_MODIFY} event to the callback.
"""
def operation(path):
with path.open("w") as fObj:
fObj.write(b'foo')
return self._notificationTest(inotify.IN_MODIFY, operation)
def test_attrib(self):
"""
Changing the metadata of a file in a monitored directory
sends an C{inotify.IN_ATTRIB} event to the callback.
"""
def operation(path):
# Create the file.
path.touch()
# Modify the file's attributes.
path.touch()
return self._notificationTest(inotify.IN_ATTRIB, operation)
def test_closeWrite(self):
"""
Closing a file which was open for writing in a monitored
directory sends an C{inotify.IN_CLOSE_WRITE} event to the
callback.
"""
def operation(path):
path.open("w").close()
return self._notificationTest(inotify.IN_CLOSE_WRITE, operation)
def test_delete(self):
"""
Deleting a file in a monitored directory sends an
C{inotify.IN_DELETE} event to the callback.
"""
expectedPath = self.dirname.child("foo.bar")
expectedPath.touch()
notified = defer.Deferred()
def cbNotified(result):
(watch, filename, events) = result
self.assertEqual(filename.asBytesMode(), expectedPath.asBytesMode())
self.assertTrue(events & inotify.IN_DELETE)
notified.addCallback(cbNotified)
self.inotify.watch(
self.dirname, mask=inotify.IN_DELETE,
callbacks=[lambda *args: notified.callback(args)])
expectedPath.remove()
return notified
def test_humanReadableMask(self):
"""
L{inotify.humanReadableMask} translates all the possible event masks to a
human readable string.
"""
for mask, value in inotify._FLAG_TO_HUMAN:
self.assertEqual(inotify.humanReadableMask(mask)[0], value)
checkMask = (
inotify.IN_CLOSE_WRITE | inotify.IN_ACCESS | inotify.IN_OPEN)
self.assertEqual(
set(inotify.humanReadableMask(checkMask)),
set(['close_write', 'access', 'open']))
def test_noAutoAddSubdirectory(self):
"""
L{inotify.INotify.watch} with autoAdd==False will stop inotify
from watching subdirectories created under the watched one.
"""
def _callback(wp, fp, mask):
# We are notified before we actually process new
# directories, so we need to defer this check.
def _():
try:
self.assertFalse(self.inotify._isWatched(subdir))
d.callback(None)
except Exception:
d.errback()
reactor.callLater(0, _)
checkMask = inotify.IN_ISDIR | inotify.IN_CREATE
self.inotify.watch(
self.dirname, mask=checkMask, autoAdd=False,
callbacks=[_callback])
subdir = self.dirname.child('test')
d = defer.Deferred()
subdir.createDirectory()
return d

File diff suppressed because it is too large Load Diff

View File

@ -361,6 +361,7 @@ class _EliotLogging(Service):
self.twisted_observer = _TwistedLoggerToEliotObserver()
globalLogPublisher.addObserver(self.twisted_observer)
add_destinations(*self.destinations)
return Service.startService(self)
def stopService(self):
@ -368,7 +369,7 @@ class _EliotLogging(Service):
remove_destination(dest)
globalLogPublisher.removeObserver(self.twisted_observer)
self.stdlib_cleanup()
return Service.stopService(self)
@implementer(ILogObserver)

View File

@ -74,6 +74,8 @@ def humanReadableMask(mask):
return s
from eliot import start_action
# This class is not copied from Twisted; it acts as a mock.
class INotify(object):
def startReading(self):
@ -89,8 +91,9 @@ class INotify(object):
self.callbacks = callbacks
def event(self, filepath, mask):
for cb in self.callbacks:
cb(None, filepath, mask)
with start_action(action_type=u"fake-inotify:event", path=filepath.path, mask=mask):
for cb in self.callbacks:
cb(None, filepath, mask)
__all__ = ["INotify", "humanReadableMask", "IN_WATCH_MASK", "IN_ACCESS",

View File

View File

@ -0,0 +1,16 @@
"""
Hotfix for https://github.com/gorakhargosh/watchdog/issues/541
"""
from watchdog.observers.fsevents import FSEventsEmitter
# The class object has already been bundled up in the default arguments to
# FSEventsObserver.__init__. So mutate the class object (instead of replacing
# it with a safer version).
original_on_thread_stop = FSEventsEmitter.on_thread_stop
def safe_on_thread_stop(self):
if self.is_alive():
return original_on_thread_stop(self)
def patch():
FSEventsEmitter.on_thread_stop = safe_on_thread_stop

View File

@ -0,0 +1,212 @@
"""
An implementation of an inotify-like interface on top of the ``watchdog`` library.
"""
from __future__ import (
unicode_literals,
print_function,
absolute_import,
division,
)
__all__ = [
"humanReadableMask", "INotify",
"IN_WATCH_MASK", "IN_ACCESS", "IN_MODIFY", "IN_ATTRIB", "IN_CLOSE_NOWRITE",
"IN_CLOSE_WRITE", "IN_OPEN", "IN_MOVED_FROM", "IN_MOVED_TO", "IN_CREATE",
"IN_DELETE", "IN_DELETE_SELF", "IN_MOVE_SELF", "IN_UNMOUNT", "IN_ONESHOT",
"IN_Q_OVERFLOW", "IN_IGNORED", "IN_ONLYDIR", "IN_DONT_FOLLOW", "IN_MOVED",
"IN_MASK_ADD", "IN_ISDIR", "IN_CLOSE", "IN_CHANGED", "_FLAG_TO_HUMAN",
]
from watchdog.observers import Observer
from watchdog.events import (
FileSystemEvent,
FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent,
DirDeletedEvent, FileDeletedEvent, FileModifiedEvent
)
from twisted.internet import reactor
from twisted.python.filepath import FilePath
from allmydata.util.fileutil import abspath_expanduser_unicode
from eliot import (
ActionType,
Message,
Field,
preserve_context,
start_action,
)
from allmydata.util.pollmixin import PollMixin
from allmydata.util.assertutil import _assert, precondition
from allmydata.util import encodingutil
from allmydata.util.fake_inotify import humanReadableMask, \
IN_WATCH_MASK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_NOWRITE, IN_CLOSE_WRITE, \
IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE, IN_DELETE, IN_DELETE_SELF, \
IN_MOVE_SELF, IN_UNMOUNT, IN_Q_OVERFLOW, IN_IGNORED, IN_ONLYDIR, IN_DONT_FOLLOW, \
IN_MASK_ADD, IN_ISDIR, IN_ONESHOT, IN_CLOSE, IN_MOVED, IN_CHANGED, \
_FLAG_TO_HUMAN
from ..util.eliotutil import (
MAYBE_NOTIFY,
CALLBACK,
validateInstanceOf,
)
from . import _watchdog_541
_watchdog_541.patch()
NOT_STARTED = "NOT_STARTED"
STARTED = "STARTED"
STOPPING = "STOPPING"
STOPPED = "STOPPED"
_PATH = Field.for_types(
u"path",
[bytes, unicode],
u"The path an inotify event concerns.",
)
_EVENT = Field(
u"event",
lambda e: e.__class__.__name__,
u"The watchdog event that has taken place.",
validateInstanceOf(FileSystemEvent),
)
ANY_INOTIFY_EVENT = ActionType(
u"watchdog:inotify:any-event",
[_PATH, _EVENT],
[],
u"An inotify event is being dispatched.",
)
class INotifyEventHandler(FileSystemEventHandler):
def __init__(self, path, mask, callbacks, pending_delay):
FileSystemEventHandler.__init__(self)
self._path = path
self._mask = mask
self._callbacks = callbacks
self._pending_delay = pending_delay
self._pending = set()
def _maybe_notify(self, path, event):
with MAYBE_NOTIFY():
event_mask = IN_CHANGED
if isinstance(event, FileModifiedEvent):
event_mask = event_mask | IN_CLOSE_WRITE
event_mask = event_mask | IN_MODIFY
if isinstance(event, (DirCreatedEvent, FileCreatedEvent)):
# For our purposes, IN_CREATE is irrelevant.
event_mask = event_mask | IN_CLOSE_WRITE
if isinstance(event, (DirDeletedEvent, FileDeletedEvent)):
event_mask = event_mask | IN_DELETE
if event.is_directory:
event_mask = event_mask | IN_ISDIR
if not (self._mask & event_mask):
return
for cb in self._callbacks:
try:
with CALLBACK(inotify_events=event_mask):
cb(None, FilePath(path), event_mask)
except:
# Eliot already logged the exception for us.
# There's nothing else we can do about it here.
pass
def process(self, event):
event_filepath_u = event.src_path.decode(encodingutil.get_filesystem_encoding())
event_filepath_u = abspath_expanduser_unicode(event_filepath_u, base=self._path)
if event_filepath_u == self._path:
# ignore events for parent directory
return
self._maybe_notify(event_filepath_u, event)
def on_any_event(self, event):
with ANY_INOTIFY_EVENT(path=event.src_path, event=event):
reactor.callFromThread(
preserve_context(self.process),
event,
)
class INotify(PollMixin):
"""
I am a prototype INotify, made to work on Mac OS X (Darwin)
using the Watchdog python library. This is actually a simplified subset
of the twisted Linux INotify class because we do not utilize the watch mask
and only implement the following methods:
- watch
- startReading
- stopReading
- wait_until_stopped
- set_pending_delay
"""
def __init__(self):
self._pending_delay = 1.0
self.recursive_includes_new_subdirectories = False
self._callbacks = {}
self._watches = {}
self._state = NOT_STARTED
self._observer = Observer(timeout=self._pending_delay)
def set_pending_delay(self, delay):
Message.log(message_type=u"watchdog:inotify:set-pending-delay", delay=delay)
assert self._state != STARTED
self._pending_delay = delay
def startReading(self):
with start_action(action_type=u"watchdog:inotify:start-reading"):
assert self._state != STARTED
try:
# XXX twisted.internet.inotify doesn't require watches to
# be set before startReading is called.
# _assert(len(self._callbacks) != 0, "no watch set")
self._observer.start()
self._state = STARTED
except:
self._state = STOPPED
raise
def stopReading(self):
with start_action(action_type=u"watchdog:inotify:stop-reading"):
if self._state != STOPPED:
self._state = STOPPING
self._observer.unschedule_all()
self._observer.stop()
self._observer.join()
self._state = STOPPED
def wait_until_stopped(self):
return self.poll(lambda: self._state == STOPPED)
def _isWatched(self, path_u):
return path_u in self._callbacks.keys()
def ignore(self, path):
path_u = path.path
self._observer.unschedule(self._watches[path_u])
del self._callbacks[path_u]
del self._watches[path_u]
def watch(self, path, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
precondition(isinstance(autoAdd, bool), autoAdd=autoAdd)
precondition(isinstance(recursive, bool), recursive=recursive)
assert autoAdd == False
path_u = path.path
if not isinstance(path_u, unicode):
path_u = path_u.decode('utf-8')
_assert(isinstance(path_u, unicode), path_u=path_u)
if path_u not in self._callbacks.keys():
self._callbacks[path_u] = callbacks or []
self._watches[path_u] = self._observer.schedule(
INotifyEventHandler(path_u, mask, self._callbacks[path_u], self._pending_delay),
path=path_u,
recursive=False,
)

View File

@ -154,6 +154,7 @@ class FileNotifyInformation(object):
bytes = self._read_dword(pos+8)
s = Event(self._read_dword(pos+4),
self.data[pos+12 : pos+12+bytes].decode('utf-16-le'))
Message.log(message_type="fni", info=repr(s))
next_entry_offset = self._read_dword(pos)
yield s
@ -310,7 +311,6 @@ class INotify(PollMixin):
if self._check_stop():
return
for info in fni:
# print info
path = self._path.preauthChild(info.filename) # FilePath with Unicode path
if info.action == FILE_ACTION_MODIFIED and path.isdir():
Message.log(