diff --git a/integration/conftest.py b/integration/conftest.py index 68fbea5d7..98091c7ce 100644 --- a/integration/conftest.py +++ b/integration/conftest.py @@ -8,6 +8,12 @@ from os.path import join, exists from tempfile import mkdtemp, mktemp from functools import partial +from eliot import ( + to_file, + log_call, + start_action, +) + from twisted.python.procutils import which from twisted.internet.error import ( ProcessExitedAlready, @@ -36,14 +42,21 @@ def pytest_addoption(parser): help="Keep the tmpdir with the client directories (introducer, etc)", ) +@pytest.fixture(autouse=True, scope='session') +def eliot_logging(): + with open("integration.eliot.json", "w") as f: + to_file(f) + yield + + # I've mostly defined these fixtures from "easiest" to "most # complicated", and the dependencies basically go "down the # page". They're all session-scoped which has the "pro" that we only # set up the grid once, but the "con" that each test has to be a # little careful they're not stepping on toes etc :/ - @pytest.fixture(scope='session') +@log_call(action_type=u"integration:reactor", include_result=False) def reactor(): # this is a fixture in case we might want to try different # reactors for some reason. @@ -52,6 +65,7 @@ def reactor(): @pytest.fixture(scope='session') +@log_call(action_type=u"integration:temp_dir", include_args=[]) def temp_dir(request): """ Invoke like 'py.test --keep-tempdir ...' to avoid deleting the temp-dir @@ -76,11 +90,13 @@ def temp_dir(request): @pytest.fixture(scope='session') +@log_call(action_type=u"integration:flog_binary", include_args=[]) def flog_binary(): return which('flogtool')[0] @pytest.fixture(scope='session') +@log_call(action_type=u"integration:flog_gatherer", include_args=[]) def flog_gatherer(reactor, temp_dir, flog_binary, request): out_protocol = _CollectOutputProtocol() gather_dir = join(temp_dir, 'flog_gather') @@ -139,6 +155,11 @@ def flog_gatherer(reactor, temp_dir, flog_binary, request): @pytest.fixture(scope='session') +@log_call( + action_type=u"integration:introducer", + include_args=["temp_dir", "flog_gatherer"], + include_result=False, +) def introducer(reactor, temp_dir, flog_gatherer, request): config = ''' [node] @@ -190,6 +211,7 @@ log_gatherer.furl = {log_furl} @pytest.fixture(scope='session') +@log_call(action_type=u"integration:introducer:furl", include_args=["temp_dir"]) def introducer_furl(introducer, temp_dir): furl_fname = join(temp_dir, 'introducer', 'private', 'introducer.furl') while not exists(furl_fname): @@ -200,6 +222,11 @@ def introducer_furl(introducer, temp_dir): @pytest.fixture(scope='session') +@log_call( + action_type=u"integration:tor:introducer", + include_args=["temp_dir", "flog_gatherer"], + include_result=False, +) def tor_introducer(reactor, temp_dir, flog_gatherer, request): config = ''' [node] @@ -268,6 +295,11 @@ def tor_introducer_furl(tor_introducer, temp_dir): @pytest.fixture(scope='session') +@log_call( + action_type=u"integration:storage_nodes", + include_args=["temp_dir", "introducer_furl", "flog_gatherer"], + include_result=False, +) def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, request): nodes = [] # start all 5 nodes in parallel @@ -287,6 +319,7 @@ def storage_nodes(reactor, temp_dir, introducer, introducer_furl, flog_gatherer, @pytest.fixture(scope='session') +@log_call(action_type=u"integration:alice", include_args=[], include_result=False) def alice(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, request): try: mkdir(join(temp_dir, 
'magic-alice')) @@ -304,6 +337,7 @@ def alice(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, requ @pytest.fixture(scope='session') +@log_call(action_type=u"integration:bob", include_args=[], include_result=False) def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, request): try: mkdir(join(temp_dir, 'magic-bob')) @@ -321,54 +355,63 @@ def bob(reactor, temp_dir, introducer_furl, flog_gatherer, storage_nodes, reques @pytest.fixture(scope='session') +@log_call(action_type=u"integration:alice:invite", include_args=["temp_dir"]) def alice_invite(reactor, alice, temp_dir, request): node_dir = join(temp_dir, 'alice') - # FIXME XXX by the time we see "client running" in the logs, the - # storage servers aren't "really" ready to roll yet (uploads - # fairly consistently fail if we don't hack in this pause...) - import time ; time.sleep(5) - proto = _CollectOutputProtocol() - reactor.spawnProcess( - proto, - sys.executable, - [ - sys.executable, '-m', 'allmydata.scripts.runner', - 'magic-folder', 'create', - '--poll-interval', '2', - '--basedir', node_dir, 'magik:', 'alice', - join(temp_dir, 'magic-alice'), - ] - ) - pytest_twisted.blockon(proto.done) + with start_action(action_type=u"integration:alice:magic_folder:create"): + # FIXME XXX by the time we see "client running" in the logs, the + # storage servers aren't "really" ready to roll yet (uploads fairly + # consistently fail if we don't hack in this pause...) + import time ; time.sleep(5) + proto = _CollectOutputProtocol() + reactor.spawnProcess( + proto, + sys.executable, + [ + sys.executable, '-m', 'allmydata.scripts.runner', + 'magic-folder', 'create', + '--poll-interval', '2', + '--basedir', node_dir, 'magik:', 'alice', + join(temp_dir, 'magic-alice'), + ] + ) + pytest_twisted.blockon(proto.done) - proto = _CollectOutputProtocol() - reactor.spawnProcess( - proto, - sys.executable, - [ - sys.executable, '-m', 'allmydata.scripts.runner', - 'magic-folder', 'invite', - '--basedir', node_dir, 'magik:', 'bob', - ] - ) - pytest_twisted.blockon(proto.done) - invite = proto.output.getvalue() - print("invite from alice", invite) + with start_action(action_type=u"integration:alice:magic_folder:invite") as a: + proto = _CollectOutputProtocol() + reactor.spawnProcess( + proto, + sys.executable, + [ + sys.executable, '-m', 'allmydata.scripts.runner', + 'magic-folder', 'invite', + '--basedir', node_dir, 'magik:', 'bob', + ] + ) + pytest_twisted.blockon(proto.done) + invite = proto.output.getvalue() + a.add_success_fields(invite=invite) - # before magic-folder works, we have to stop and restart (this is - # crappy for the tests -- can we fix it in magic-folder?) - try: - alice.signalProcess('TERM') - pytest_twisted.blockon(alice.exited) - except ProcessExitedAlready: - pass - magic_text = 'Completed initial Magic Folder scan successfully' - pytest_twisted.blockon(_run_node(reactor, node_dir, request, magic_text)) + with start_action(action_type=u"integration:alice:magic_folder:restart"): + # before magic-folder works, we have to stop and restart (this is + # crappy for the tests -- can we fix it in magic-folder?) 
+ try: + alice.signalProcess('TERM') + pytest_twisted.blockon(alice.exited) + except ProcessExitedAlready: + pass + with start_action(action_type=u"integration:alice:magic_folder:magic-text"): + magic_text = 'Completed initial Magic Folder scan successfully' + pytest_twisted.blockon(_run_node(reactor, node_dir, request, magic_text)) return invite @pytest.fixture(scope='session') +@log_call( + action_type=u"integration:magic_folder", + include_args=["alice_invite", "temp_dir"], +) def magic_folder(reactor, alice_invite, alice, bob, temp_dir, request): print("pairing magic-folder") bob_dir = join(temp_dir, 'bob') diff --git a/integration/util.py b/integration/util.py index 8674740d2..302166d19 100644 --- a/integration/util.py +++ b/integration/util.py @@ -139,6 +139,7 @@ def _run_node(reactor, node_dir, request, magic_text): sys.executable, ( sys.executable, '-m', 'allmydata.scripts.runner', + '--eliot-destination', 'file:{}/logs/eliot.json'.format(node_dir), 'run', node_dir, ), diff --git a/newsfragments/2977.other b/newsfragments/2977.other new file mode 100644 index 000000000..a30aba7a6 --- /dev/null +++ b/newsfragments/2977.other @@ -0,0 +1 @@ +The Magic-Folder frontend has had additional logging improvements. \ No newline at end of file diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 0b3633efb..851e271c7 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -13,7 +13,6 @@ from twisted.internet import defer, reactor, task from twisted.internet.error import AlreadyCancelled from twisted.python.failure import Failure from twisted.python import runtime -from twisted.python import log as twlog from twisted.application import service from zope.interface import Interface, Attribute, implementer @@ -23,6 +22,8 @@ from eliot import ( ActionType, MessageType, write_failure, + write_traceback, + log_call, ) from eliot.twisted import ( DeferredContext, @@ -34,6 +35,9 @@ from allmydata.util import ( yamlutil, eliotutil, ) +from allmydata.util.fake_inotify import ( + humanReadableMask, +) from allmydata.interfaces import IDirectoryNode from allmydata.util import log from allmydata.util.fileutil import ( @@ -47,7 +51,7 @@ from allmydata.util.deferredutil import HookMixin from allmydata.util.progress import PercentProgress from allmydata.util.encodingutil import listdir_filepath, to_filepath, \ extend_filepath, unicode_from_filepath, unicode_segments_from, \ - quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError + quote_filepath, quote_local_unicode_path, FilenameEncodingError from allmydata.util.time_format import format_time from allmydata.immutable.upload import FileName, Data from allmydata import magicfolderdb, magicpath @@ -386,11 +390,6 @@ class MagicFolder(service.MultiService): self.set_public_status, poll_interval=downloader_delay) self._public_status = (False, ['Magic folder has not yet started']) - def enable_debug_log(self, enabled=True): - twlog.msg("enable debug log: %s" % enabled) - self.uploader.enable_debug_log(enabled) - self.downloader.enable_debug_log(enabled) - def get_public_status(self): """ For the web UI, basically. 
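As an aside for readers new to eliot: the hunks that follow replace free-form string logging with typed declarations. A minimal sketch of how the Field/ActionType/MessageType pieces fit together, using illustrative names rather than anything defined in this patch:

    from eliot import Field, ActionType, MessageType

    # A field declares the name, allowed types, and meaning of one key
    # that may appear in a log message.
    _COUNT = Field.for_types(u"count", [int], u"How many items are queued.")

    # An action has a beginning and an end; the two lists name the fields
    # required at the start and the extra fields recorded on success.
    PROCESS_QUEUE = ActionType(
        u"example:process-queue",
        [_COUNT],
        [],
        u"A queue of items is being processed.",
    )

    # A message is a single point-in-time event with no duration.
    ITEM_DONE = MessageType(u"example:item-done", [], u"One item finished.")

    def drain(queue):
        # Everything logged inside the "with" block, including the
        # ITEM_DONE messages, becomes a child of this action in the
        # resulting log tree.
        with PROCESS_QUEUE(count=len(queue)):
            while queue:
                queue.pop()
                ITEM_DONE.log()

This is why the diff can drop self._log() and the _debug_log machinery entirely: the typed messages carry the same information in a machine-parseable action tree.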
@@ -444,6 +443,13 @@ _DIRECTION = Field.for_types( eliotutil.validateSetMembership({u"uploader", u"downloader"}), ) +PROCESSING_LOOP = ActionType( + u"magic-folder:processing-loop", + [_NICKNAME, _DIRECTION], + [], + u"A Magic-Folder is processing uploads or downloads.", +) + ITERATION = ActionType( u"magic-folder:iteration", [_NICKNAME, _DIRECTION], @@ -451,11 +457,17 @@ ITERATION = ActionType( u"A step towards synchronization in one direction.", ) -PERFORM_SCAN = ActionType( - u"magic-folder:perform-scan", +_COUNT = Field.for_types( + u"count", + [int, long], + u"The number of items in the processing queue.", +) + +PROCESS_QUEUE = ActionType( + u"magic-folder:process-queue", + [_COUNT], [], - [], - u"Remote storage is being scanned for changes which need to be synchronized.", + u"A Magic-Folder is working through an item queue.", ) SCAN_REMOTE_COLLECTIVE = ActionType( @@ -560,6 +572,12 @@ PROCESS_DIRECTORY = ActionType( u"An item being processed was a directory.", ) +DIRECTORY_PATHENTRY = MessageType( + u"magic-folder:directory-dbentry", + [magicfolderdb.PATHENTRY], + u"Local database state relating to an item possibly being uploaded.", +) + NOT_NEW_DIRECTORY = MessageType( u"magic-folder:not-new-directory", [], @@ -578,6 +596,267 @@ SPECIAL_FILE = MessageType( u"An item being processed was found to be of a special type which is not supported.", ) +_COUNTER_NAME = Field.for_types( + u"counter_name", + # Should really only be unicode + [unicode, bytes], + u"The name of a counter.", +) + +_DELTA = Field.for_types( + u"delta", + [int, long], + u"The amount by which a counter changed.", +) + +_VALUE = Field.for_types( + u"value", + [int, long], + u"The new value of a counter after a change.", +) + +COUNT_CHANGED = MessageType( + u"magic-folder:count", + [_COUNTER_NAME, _DELTA, _VALUE], + u"The value of a counter has changed.", +) + +START_MONITORING = ActionType( + u"magic-folder:start-monitoring", + [_NICKNAME, _DIRECTION], + [], + u"Uploader is beginning to monitor the filesystem for uploadable changes.", +) + +STOP_MONITORING = ActionType( + u"magic-folder:stop-monitoring", + [_NICKNAME, _DIRECTION], + [], + u"Uploader is terminating the filesystem monitoring operation.", +) + +START_UPLOADING = ActionType( + u"magic-folder:start-uploading", + [_NICKNAME, _DIRECTION], + [], + u"Uploader is performing startup-time inspection of known files.", +) + +_IGNORED = Field.for_types( + u"ignored", + [bool], + u"A file proposed for queueing for processing is instead being ignored by policy.", +) + +_ALREADY_PENDING = Field.for_types( + u"already_pending", + [bool], + u"A file proposed for queueing for processing is already in the queue.", +) + +_SIZE = Field.for_types( + u"size", + [int, long, type(None)], + u"The size of a file accepted into the processing queue.", +) + +ADD_PENDING = ActionType( + u"magic-folder:add-pending", + [eliotutil.RELPATH], + [_IGNORED, _ALREADY_PENDING, _SIZE], + u"Uploader is adding a path to the processing queue.", +) + +FULL_SCAN = ActionType( + u"magic-folder:full-scan", + [_NICKNAME, _DIRECTION], + [], + u"A complete brute-force scan of the local directory is being performed.", +) + +SCAN = ActionType( + u"magic-folder:scan", + [eliotutil.RELPATH], + [], + u"A brute-force scan of a subset of the local directory is being performed.", +) + +NOTIFIED = ActionType( + u"magic-folder:notified", + [PATH, _NICKNAME, _DIRECTION], + [], + u"Magic-Folder received a notification of a local filesystem change for a certain path.", +) + +_EVENTS = Field( + u"events",
humanReadableMask, + u"Details about a filesystem event generating a notification event.", + eliotutil.validateInstanceOf((int, long)), +) + +_NON_DIR_CREATED = Field.for_types( + u"non_dir_created", + [bool], + u"A creation event was for a non-directory and requires no further inspection.", +) + + +REACT_TO_INOTIFY = ActionType( + u"magic-folder:react-to-inotify", + [_EVENTS], + [_IGNORED, _NON_DIR_CREATED, _ALREADY_PENDING], + u"Magic-Folder is processing a notification from inotify(7) (or a clone) about a filesystem event.", +) + +_ABSPATH = Field.for_types( + u"abspath", + [unicode], + u"The absolute path of a file being written in a local directory.", +) + +_IS_CONFLICT = Field.for_types( + u"is_conflict", + [bool], + u"An indication of whether a file being written in a local directory is in a conflicted state.", +) + +_NOW = Field.for_types( + u"now", + [int, long, float], + u"The time at which a file is being written in a local directory.", +) + +_MTIME = Field.for_types( + u"mtime", + [int, long, float, type(None)], + u"A modification time to put into the metadata of a file being written in a local directory.", +) + +WRITE_DOWNLOADED_FILE = ActionType( + u"magic-folder:write-downloaded-file", + [_ABSPATH, _SIZE, _IS_CONFLICT, _NOW, _MTIME], + [], + u"A downloaded file is being written to the filesystem.", +) + +ALREADY_GONE = MessageType( + u"magic-folder:rename:already-gone", + [], + u"A deleted file could not be rewritten to a backup path because it no longer exists.", +) + +_REASON = Field( + u"reason", + lambda e: str(e), + u"An exception which may describe the form of the conflict.", + eliotutil.validateInstanceOf(Exception), +) + +OVERWRITE_BECOMES_CONFLICT = MessageType( + u"magic-folder:overwrite-becomes-conflict", + [_REASON], + u"An attempt to overwrite an existing file failed because that file is now conflicted.", +) + +_FILES = Field( + u"files", + lambda file_set: list(file_set), + u"All of the relative paths belonging to a Magic-Folder that are locally known.", +) + +ALL_FILES = MessageType( + u"magic-folder:all-files", + [_FILES], + u"A record of the rough state of the local database at the time of downloader start up.", +) + +_ITEMS = Field( + u"items", + lambda deque: list(deque), + u"Items in a processing queue.", +) + +ITEM_QUEUE = MessageType( + u"magic-folder:item-queue", + [_ITEMS], + u"A report of the items in the processing queue at this point.", +) + +_BATCH = Field( + u"batch", + # Just report the paths for now. Perhaps something from the values would + # also be useful, though? Consider it. + lambda batch: batch.keys(), + u"A batch of scanned items.", + eliotutil.validateInstanceOf(dict), +) + +SCAN_BATCH = MessageType( + u"magic-folder:scan-batch", + [_BATCH], + u"Items in a batch of files which were scanned from the DMD.", +) + +START_DOWNLOADING = ActionType( + u"magic-folder:start-downloading", + [_NICKNAME, _DIRECTION], + [], + u"A Magic-Folder downloader is initializing and beginning to manage downloads.", +) + +PERFORM_SCAN = ActionType( + u"magic-folder:perform-scan", + [], + [], + u"Remote storage is being scanned for changes which need to be synchronized.", +) + +_STATUS = Field.for_types( + u"status", + # Should just be unicode... 
+ [unicode, bytes], + u"The status of an item in a processing queue.", +) + +QUEUED_ITEM_STATUS_CHANGE = MessageType( + u"magic-folder:item:status-change", + [eliotutil.RELPATH, _STATUS], + u"A queued item changed status.", +) + +_CONFLICT_REASON = Field.for_types( + u"conflict_reason", + [unicode, type(None)], + u"A human-readable explanation of why a file was in conflict.", + eliotutil.validateSetMembership({ + u"dbentry mismatch metadata", + u"dbentry newer version", + u"last_downloaded_uri mismatch", + u"file appeared", + None, + }), +) + +CHECKING_CONFLICTS = ActionType( + u"magic-folder:item:checking-conflicts", + [], + [_IS_CONFLICT, _CONFLICT_REASON], + u"A potential download item is being checked to determine if it is in a conflicted state.", +) + +REMOTE_DIRECTORY_CREATED = MessageType( + u"magic-folder:remote-directory-created", + [], + u"The downloader found a new directory in the DMD.", +) + +REMOTE_DIRECTORY_DELETED = MessageType( + u"magic-folder:remote-directory-deleted", + [], + u"The downloader found a directory has been deleted from the DMD.", +) + class QueueMixin(HookMixin): """ A parent class for Uploader and Downloader that handles putting @@ -606,8 +885,10 @@ class QueueMixin(HookMixin): self._db = db self._name = name self._clock = clock - self._debug_log = False - self._logger = None + self._log_fields = dict( + nickname=self._client.nickname, + direction=self._name, + ) self._hooks = { 'processed': None, 'started': None, @@ -627,10 +908,6 @@ class QueueMixin(HookMixin): self._process_history = deque(maxlen=20) self._in_progress = [] - def enable_debug_log(self, enabled=True): - twlog.msg("queue mixin enable debug logging: %s" % enabled) - self._debug_log = enabled - def get_status(self): """ Returns an iterable of instances that implement IQueuedItem @@ -662,27 +939,28 @@ class QueueMixin(HookMixin): """ Start a loop that looks for work to do and then does it. """ + action = PROCESSING_LOOP(**self._log_fields) + + # Note that we don't put the processing iterations into the logging + # action because we expect this loop to run for the whole lifetime of + # the process. The tooling for dealing with incomplete action trees + # is still somewhat lacking. Putting the iterations into the overall + # loop action would hamper reading those logs for now. self._processing_loop = task.LoopingCall(self._processing_iteration) self._processing_loop.clock = self._clock self._processing = self._processing_loop.start(self._scan_delay(), now=True) - # if there are any errors coming out of _processing then our loop is - # done and we're hosed (i.e. _processing_iteration() itself has a bug - # in it) - def fatal_error(f): - self._log("internal error: %s" % (f.value,)) - self._log(f) - self._processing.addErrback(fatal_error) + with action.context(): + # We do make sure errors appear in the loop action though. + d = DeferredContext(self._processing) + d.addActionFinish() def _processing_iteration(self): """ One iteration runs self._process_deque which calls _perform_scan() and then completely drains the _deque (processing each item). """ - action = ITERATION( - nickname=self._client.nickname, - direction=self._name, - ) + action = ITERATION(**self._log_fields) with action.context(): d = DeferredContext(defer.Deferred()) @@ -710,7 +988,7 @@ class QueueMixin(HookMixin): def _perform_scan(self): return - @defer.inlineCallbacks + @eliotutil.inline_callbacks def _process_deque(self): # process everything currently in the queue.
we're turning it # into a list so that if any new items get added while we're @@ -726,45 +1004,34 @@ class QueueMixin(HookMixin): # completed) self._in_progress.extend(to_process) - if to_process: - self._log("%d items to process" % len(to_process), ) - for item in to_process: - self._process_history.appendleft(item) - self._in_progress.remove(item) - try: - self._log(" processing '%r'" % (item,)) - proc = yield self._process(item) - self._log(" done: %r" % proc) - if not proc: - self._process_history.remove(item) - self._call_hook(item, 'item_processed') - except Exception as e: - log.err("processing '%r' failed: %s" % (item, e)) - item.set_status('failed', self._clock.seconds()) - proc = Failure() + with PROCESS_QUEUE(count=len(to_process)): + for item in to_process: + self._process_history.appendleft(item) + self._in_progress.remove(item) + try: + proc = yield self._process(item) + if not proc: + self._process_history.remove(item) + self._call_hook(item, 'item_processed') + except: + write_traceback() + item.set_status('failed', self._clock.seconds()) + proc = Failure() - self._call_hook(proc, 'processed') + self._call_hook(proc, 'processed') def _get_relpath(self, filepath): - self._log("_get_relpath(%r)" % (filepath,)) segments = unicode_segments_from(filepath, self._local_filepath) - self._log("segments = %r" % (segments,)) return u"/".join(segments) def _count(self, counter_name, delta=1): ctr = 'magic_folder.%s.%s' % (self._name, counter_name) self._client.stats_provider.count(ctr, delta) - self._log("%s += %r (now %r)" % (counter_name, delta, self._client.stats_provider.counters[ctr])) - - def _logcb(self, res, msg): - self._log("%s: %r" % (msg, res)) - return res - - def _log(self, msg): - s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg) - self._client.log(s) - if self._debug_log: - twlog.msg(s) + COUNT_CHANGED.log( + counter_name=counter_name, + delta=delta, + value=self._client.stats_provider.counters[ctr], + ) # this isn't in interfaces.py because it's very specific to QueueMixin class IQueuedItem(Interface): @@ -798,6 +1065,10 @@ class QueuedItem(object): if current_time is None: current_time = time.time() self._status_history[status] = current_time + QUEUED_ITEM_STATUS_CHANGE.log( + relpath=self.relpath_u, + status=status, + ) def status_time(self, state): """ @@ -883,147 +1154,152 @@ class Uploader(QueueMixin): recursive=False)#True) def start_monitoring(self): - self._log("start_monitoring") - d = defer.succeed(None) + action = START_MONITORING(**self._log_fields) + with action.context(): + d = DeferredContext(defer.succeed(None)) + d.addCallback(lambda ign: self._notifier.startReading()) d.addCallback(lambda ign: self._count('dirs_monitored')) d.addBoth(self._call_hook, 'started') - return d + return d.addActionFinish() def stop(self): - self._notifier.stopReading() - self._count('dirs_monitored', -1) - if self._periodic_callid: - try: - self._periodic_callid.cancel() - except AlreadyCancelled: - pass + action = STOP_MONITORING(**self._log_fields) + with action.context(): + self._notifier.stopReading() + self._count('dirs_monitored', -1) + if self._periodic_callid: + try: + self._periodic_callid.cancel() + except AlreadyCancelled: + pass - if hasattr(self._notifier, 'wait_until_stopped'): - d = self._notifier.wait_until_stopped() - else: - d = defer.succeed(None) + if hasattr(self._notifier, 'wait_until_stopped'): + d = DeferredContext(self._notifier.wait_until_stopped()) + else: + d = DeferredContext(defer.succeed(None)) - 
d.addCallback(lambda ignored: QueueMixin.stop(self)) - return d + d.addCallback(lambda ignored: QueueMixin.stop(self)) + return d.addActionFinish() def start_uploading(self): - self._log("start_uploading") - self.is_ready = True + action = START_UPLOADING(**self._log_fields) + with action: + self.is_ready = True - all_relpaths = self._db.get_all_relpaths() - self._log("all relpaths: %r" % (all_relpaths,)) + all_relpaths = self._db.get_all_relpaths() - for relpath_u in all_relpaths: - self._add_pending(relpath_u) + for relpath_u in all_relpaths: + self._add_pending(relpath_u) - self._full_scan() - return self._begin_processing() + self._full_scan() + self._begin_processing() def _scan_delay(self): return self._pending_delay def _full_scan(self): - self._periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan) - self._log("FULL SCAN") - self._log("_pending %r" % (self._pending)) - self._scan(u"") + with FULL_SCAN(**self._log_fields): + self._periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan) + self._scan(u"") def _add_pending(self, relpath_u): - self._log("add pending %r" % (relpath_u,)) - if magicpath.should_ignore_file(relpath_u): - self._log("_add_pending %r but should_ignore()==True" % (relpath_u,)) - return - if relpath_u in self._pending: - self._log("_add_pending %r but already pending" % (relpath_u,)) - return + with ADD_PENDING(relpath=relpath_u) as action: + if magicpath.should_ignore_file(relpath_u): + action.add_success_fields(ignored=True, already_pending=False, size=None) + return + if self.is_pending(relpath_u): + action.add_success_fields(ignored=False, already_pending=True, size=None) + return - self._pending.add(relpath_u) - fp = self._get_filepath(relpath_u) - pathinfo = get_pathinfo(unicode_from_filepath(fp)) - progress = PercentProgress() - self._log(u"add pending size: {}: {}".format(relpath_u, pathinfo.size)) - item = UploadItem(relpath_u, progress, pathinfo.size) - item.set_status('queued', self._clock.seconds()) - self._deque.append(item) - self._count('objects_queued') - self._log("_add_pending(%r) queued item" % (relpath_u,)) + self._pending.add(relpath_u) + fp = self._get_filepath(relpath_u) + pathinfo = get_pathinfo(unicode_from_filepath(fp)) + progress = PercentProgress() + action.add_success_fields(ignored=False, already_pending=False, size=pathinfo.size) + item = UploadItem(relpath_u, progress, pathinfo.size) + item.set_status('queued', self._clock.seconds()) + self._deque.append(item) + self._count('objects_queued') def _scan(self, reldir_u): # Scan a directory by (synchronously) adding the paths of all its children to self._pending. 
# Note that this doesn't add them to the deque -- that will + with SCAN(relpath=reldir_u): + fp = self._get_filepath(reldir_u) + try: + children = listdir_filepath(fp) + except EnvironmentError: + raise Exception("WARNING: magic folder: permission denied on directory %s" + % quote_filepath(fp)) + except FilenameEncodingError: + raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" + % quote_filepath(fp)) - self._log("SCAN '%r'" % (reldir_u,)) - fp = self._get_filepath(reldir_u) - try: - children = listdir_filepath(fp) - except EnvironmentError: - raise Exception("WARNING: magic folder: permission denied on directory %s" - % quote_filepath(fp)) - except FilenameEncodingError: - raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" - % quote_filepath(fp)) - - for child in children: - self._log(" scan; child %r" % (child,)) - _assert(isinstance(child, unicode), child=child) - self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child) + for child in children: + _assert(isinstance(child, unicode), child=child) + self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child) def is_pending(self, relpath_u): return relpath_u in self._pending def _notify(self, opaque, path, events_mask): - # Twisted doesn't seem to do anything if our callback throws - # an error, so... - try: - return self._real_notify(opaque, path, events_mask) - except Exception as e: - self._log(u"error calling _real_notify: {}".format(e)) - twlog.err(Failure(), "Error calling _real_notify") + with NOTIFIED(path=path, **self._log_fields): + try: + return self._real_notify(opaque, path, events_mask) + except Exception: + write_traceback() def _real_notify(self, opaque, path, events_mask): - self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) - relpath_u = self._get_relpath(path) + action = REACT_TO_INOTIFY( + # We could think about logging opaque here but ... it's opaque. + # All we can do is id() or repr() it and neither of those actually + # produces very illuminating results. We drop opaque on the + # floor, anyway. + events=events_mask, + ) + success_fields = dict(non_dir_created=False, already_pending=False, ignored=False) - # We filter out IN_CREATE events not associated with a directory. - # Acting on IN_CREATE for files could cause us to read and upload - # a possibly-incomplete file before the application has closed it. - # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think. - # It isn't possible to avoid watching for IN_CREATE at all, because - # it is the only event notified for a directory creation. + with action: + relpath_u = self._get_relpath(path) - if ((events_mask & self._inotify.IN_CREATE) != 0 and - (events_mask & self._inotify.IN_ISDIR) == 0): - self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,)) - return - if relpath_u in self._pending: - self._log("not queueing %r because it is already pending" % (relpath_u,)) - return - if magicpath.should_ignore_file(relpath_u): - self._log("ignoring event for %r (ignorable path)" % (relpath_u,)) - return + # We filter out IN_CREATE events not associated with a directory. + # Acting on IN_CREATE for files could cause us to read and upload + # a possibly-incomplete file before the application has closed it. + # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
+ # It isn't possible to avoid watching for IN_CREATE at all, because + # it is the only event notified for a directory creation. - self._add_pending(relpath_u) - self._call_hook(path, 'inotify') + if ((events_mask & self._inotify.IN_CREATE) != 0 and + (events_mask & self._inotify.IN_ISDIR) == 0): + success_fields[u"non_dir_created"] = True + elif relpath_u in self._pending: + success_fields[u"already_pending"] = True + elif magicpath.should_ignore_file(relpath_u): + success_fields[u"ignored"] = True + else: + self._add_pending(relpath_u) + self._call_hook(path, 'inotify') + action.add_success_fields(**success_fields) def _process(self, item): """ - process a single QueuedItem. If this returns False, the item is - removed from _process_history + Possibly upload a single QueuedItem. If this returns False, the item is + removed from _process_history. """ + # Uploader with PROCESS_ITEM(item=item).context(): d = DeferredContext(defer.succeed(False)) - # Uploader - relpath_u = item.relpath_u - item.set_status('started', self._clock.seconds()) + relpath_u = item.relpath_u + item.set_status('started', self._clock.seconds()) - if relpath_u is None: - item.set_status('invalid_path', self._clock.seconds()) - return d.addActionFinish() + if relpath_u is None: + item.set_status('invalid_path', self._clock.seconds()) + return d.addActionFinish() - precondition(isinstance(relpath_u, unicode), relpath_u) - precondition(not relpath_u.endswith(u'/'), relpath_u) + precondition(isinstance(relpath_u, unicode), relpath_u) + precondition(not relpath_u.endswith(u'/'), relpath_u) def _maybe_upload(ign, now=None): MAYBE_UPLOAD.log(relpath=relpath_u) @@ -1125,7 +1401,7 @@ class Uploader(QueueMixin): self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True) db_entry = self._db.get_db_entry(relpath_u) - self._log("isdir dbentry %r" % (db_entry,)) + DIRECTORY_PATHENTRY.log(pathentry=db_entry) if not is_new_file(pathinfo, db_entry): NOT_NEW_DIRECTORY.log() return False @@ -1239,17 +1515,27 @@ class WriteFileMixin(object): def _write_downloaded_file(self, local_path_u, abspath_u, file_contents, is_conflict=False, now=None, mtime=None): - self._log( - ("_write_downloaded_file({abspath}, <{file_size} bytes>," - " is_conflict={is_conflict}, now={now}, mtime={mtime})").format( - abspath=abspath_u, - file_size=len(file_contents), - is_conflict=is_conflict, - now=now, - mtime=mtime, - ) + if now is None: + now = time.time() + action = WRITE_DOWNLOADED_FILE( + abspath=abspath_u, + size=len(file_contents), + is_conflict=is_conflict, + now=now, + mtime=mtime, ) + with action: + return self._write_downloaded_file_logged( + local_path_u, + abspath_u, + file_contents, + is_conflict, + now, + mtime, + ) + def _write_downloaded_file_logged(self, local_path_u, abspath_u, + file_contents, is_conflict, now, mtime): # 1. Write a temporary file, say .foo.tmp. # 2. is_conflict determines whether this is an overwrite or a conflict. # 3. Set the mtime of the replacement file to be T seconds before the @@ -1259,11 +1545,8 @@ class WriteFileMixin(object): # this operation fails, reclassify as a conflict and stop. # # Returns the path of the destination file. 
- precondition_abspath(abspath_u) replacement_path_u = abspath_u + u".tmp" # FIXME more unique - if now is None: - now = time.time() initial_path_u = os.path.dirname(abspath_u) fileutil.make_dirs_with_absolute_mode(local_path_u, initial_path_u, (~ self._umask) & 0777) @@ -1285,27 +1568,27 @@ class WriteFileMixin(object): fileutil.replace_file(abspath_u, replacement_path_u) return abspath_u except fileutil.ConflictError as e: - self._log("overwrite becomes _conflict: {}".format(e)) + OVERWRITE_BECOMES_CONFLICT.log(reason=e) return self._rename_conflicted_file(abspath_u, replacement_path_u) + @log_call( + action_type=u"magic-folder:rename-conflicted", + include_args=["abspath_u", "replacement_path_u"], + ) def _rename_conflicted_file(self, abspath_u, replacement_path_u): - self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u)) - conflict_path_u = self._get_conflicted_filename(abspath_u) - #if os.path.isfile(replacement_path_u): - # print "%r exists" % (replacement_path_u,) - #if os.path.isfile(conflict_path_u): - # print "%r exists" % (conflict_path_u,) - fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u) return conflict_path_u + @log_call( + action_type=u"magic-folder:rename-deleted", + include_args=["abspath_u"], + ) def _rename_deleted_file(self, abspath_u): - self._log('renaming deleted file to backup: %s' % (abspath_u,)) try: fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup') except OSError: - self._log("Already gone: '%s'" % (abspath_u,)) + ALREADY_GONE.log() return abspath_u @@ -1353,27 +1636,28 @@ class Downloader(QueueMixin, WriteFileMixin): self._status_reporter = status_reporter self._poll_interval = poll_interval - @defer.inlineCallbacks + @eliotutil.inline_callbacks def start_downloading(self): - self._log("start_downloading") - files = self._db.get_all_relpaths() - self._log("all files %s" % files) + action = START_DOWNLOADING(**self._log_fields) + with action: + ALL_FILES.log(files=self._db.get_all_relpaths()) - while True: - try: - data = yield self._scan_remote_collective(scan_self=True) - twlog.msg("Completed initial Magic Folder scan successfully ({})".format(self)) - self._begin_processing() - defer.returnValue(data) - break - - except Exception as e: - self._status_reporter( - False, "Initial scan has failed", - "Last tried at %s" % self.nice_current_time(), - ) - twlog.msg("Magic Folder failed initial scan: %s" % (e,)) - yield task.deferLater(self._clock, self._poll_interval, lambda: None) + while True: + try: + yield self._scan_remote_collective(scan_self=True) + # The integration tests watch for this log message to + # decide when it is safe to proceed. Clearly, we need + # better programmatic interrogation of magic-folder state. 
+ print("Completed initial Magic Folder scan successfully ({})".format(self)) + self._begin_processing() + return + except Exception: + self._status_reporter( + False, "Initial scan has failed", + "Last tried at %s" % self.nice_current_time(), + ) + write_traceback() + yield task.deferLater(self._clock, self._poll_interval, lambda: None) def nice_current_time(self): return format_time(datetime.fromtimestamp(self._clock.seconds()).timetuple()) @@ -1444,7 +1728,6 @@ class Downloader(QueueMixin, WriteFileMixin): def scan_listing(listing_map): for encoded_relpath_u in listing_map.keys(): relpath_u = magicpath.magic2path(encoded_relpath_u) - self._log("found %r" % (relpath_u,)) file_node, metadata = listing_map[encoded_relpath_u] local_dbentry = self._get_local_latest(relpath_u) @@ -1488,16 +1771,19 @@ class Downloader(QueueMixin, WriteFileMixin): if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap: d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode: self._scan_remote_dmd(dir_name, dirnode, scan_batch)) - def _err(f, dir_name=dir_name): - self._log("failed to scan DMD for client %r: %s" % (dir_name, f)) - # XXX what should we do to make this failure more visible to users? - d2.addErrback(_err) - + # XXX what should we do to make this failure more visible to users? + d2.addErrback(write_traceback) return d2.result d.addCallback(scan_collective) + @log_call( + action_type=u"magic-folder:filter-batch-to-deque", + include_args=[], + include_result=False, + ) def _filter_batch_to_deque(ign): - self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch)) + ITEM_QUEUE.log(items=self._deque) + SCAN_BATCH.log(batch=scan_batch) for relpath_u in scan_batch.keys(): file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version']) @@ -1513,10 +1799,8 @@ class Downloader(QueueMixin, WriteFileMixin): self._deque.append(to_dl) self._count("objects_queued") else: - self._log("Excluding %r" % (relpath_u,)) self._call_hook(None, 'processed', async=True) # await this maybe-Deferred?? - self._log("deque after = %r" % (self._deque,)) d.addCallback(_filter_batch_to_deque) return d.addActionFinish() @@ -1533,18 +1817,20 @@ class Downloader(QueueMixin, WriteFileMixin): 'Last scan: %s' % self.nice_current_time(), ) except Exception as e: - twlog.msg("Remote scan failed: %s" % (e,)) - self._log("_scan failed: %s" % (repr(e),)) + write_traceback() self._status_reporter( False, 'Remote scan has failed: %s' % str(e), 'Last attempted at %s' % self.nice_current_time(), ) def _process(self, item): + """ + Possibly download a single QueuedItem. If this returns False, the item is + removed from _process_history. + """ # Downloader now = self._clock.seconds() - self._log("started! %s" % (now,)) item.set_status('started', now) fp = self._get_filepath(item.relpath_u) abspath_u = unicode_from_filepath(fp) @@ -1579,7 +1865,6 @@ class Downloader(QueueMixin, WriteFileMixin): def failed(f): item.set_status('failure', self._clock.seconds()) - self._log("download failed: %s" % (str(f),)) self._count('objects_failed') return f @@ -1612,47 +1897,53 @@ class Downloader(QueueMixin, WriteFileMixin): # for this file, which is the URI under which the file was last # uploaded. - if db_entry: - # * 2c. If any of the following are true, then classify as a conflict: - # * i. there are pending notifications of changes to ``foo``; - # * ii. the last-seen statinfo is either absent (i.e.
there is - # no entry in the database for this path), or different from the - # current statinfo; + with CHECKING_CONFLICTS() as action: + conflict_reason = None + if db_entry: + # * 2c. If any of the following are true, then classify as a conflict: + # * i. there are pending notifications of changes to ``foo``; + # * ii. the last-seen statinfo is either absent (i.e. there is + # no entry in the database for this path), or different from the + # current statinfo; - if current_statinfo.exists: - self._log("checking conflicts {}".format(item.relpath_u)) - if (db_entry.mtime_ns != current_statinfo.mtime_ns or \ - db_entry.ctime_ns != current_statinfo.ctime_ns or \ - db_entry.size != current_statinfo.size): - is_conflict = True - self._log("conflict because local change0") - - if db_entry.last_downloaded_uri is None \ - or db_entry.last_uploaded_uri is None \ - or dmd_last_downloaded_uri is None: - # we've never downloaded anything before for this - # file, but the other side might have created a new - # file "at the same time" - if db_entry.version >= item.metadata['version']: - self._log("conflict because my version >= remote version") + if current_statinfo.exists: + if (db_entry.mtime_ns != current_statinfo.mtime_ns or \ + db_entry.ctime_ns != current_statinfo.ctime_ns or \ + db_entry.size != current_statinfo.size): is_conflict = True - elif dmd_last_downloaded_uri != db_entry.last_downloaded_uri: - is_conflict = True - self._log("conflict because dmd_last_downloaded_uri != db_entry.last_downloaded_uri") + conflict_reason = u"dbentry mismatch metadata" - else: # no local db_entry .. but has the file appeared locally meantime? - if current_statinfo.exists: - is_conflict = True - self._log("conflict because local change1") + if db_entry.last_downloaded_uri is None \ + or db_entry.last_uploaded_uri is None \ + or dmd_last_downloaded_uri is None: + # we've never downloaded anything before for this + # file, but the other side might have created a new + # file "at the same time" + if db_entry.version >= item.metadata['version']: + is_conflict = True + conflict_reason = u"dbentry newer version" + elif dmd_last_downloaded_uri != db_entry.last_downloaded_uri: + is_conflict = True + conflict_reason = u"last_downloaded_uri mismatch" + + else: # no local db_entry .. but has the file appeared locally meantime? 
+ if current_statinfo.exists: + is_conflict = True + conflict_reason = u"file appeared" + + action.add_success_fields( + is_conflict=is_conflict, + conflict_reason=conflict_reason, + ) if is_conflict: self._count('objects_conflicted') if item.relpath_u.endswith(u"/"): if item.metadata.get('deleted', False): - self._log("rmdir(%r) ignored" % (abspath_u,)) + REMOTE_DIRECTORY_DELETED.log() else: - self._log("mkdir(%r)" % (abspath_u,)) + REMOTE_DIRECTORY_CREATED.log() d.addCallback(lambda ign: fileutil.make_dirs(abspath_u)) d.addCallback(lambda ign: abspath_u) else: @@ -1673,7 +1964,6 @@ class Downloader(QueueMixin, WriteFileMixin): def trap_conflicts(f): f.trap(ConflictError) - self._log("IGNORE CONFLICT ERROR %r" % f) return False d.addErrback(trap_conflicts) return d.addActionFinish() diff --git a/src/allmydata/test/__init__.py b/src/allmydata/test/__init__.py index 03bd56f5d..4ee3e8060 100644 --- a/src/allmydata/test/__init__.py +++ b/src/allmydata/test/__init__.py @@ -57,3 +57,6 @@ import sys if sys.platform == "win32": from allmydata.windows.fixups import initialize initialize() + +from eliot import to_file +to_file(open("eliot.log", "w")) diff --git a/src/allmydata/test/eliotutil.py b/src/allmydata/test/eliotutil.py index 891af1934..abf292a09 100644 --- a/src/allmydata/test/eliotutil.py +++ b/src/allmydata/test/eliotutil.py @@ -71,6 +71,10 @@ def eliot_logged_test(f): def run(self, logger): # Record the MemoryLogger for later message extraction. storage.logger = logger + # Give the test access to the logger as well. It would be just + # fine to pass this as a keyword argument to `f` but implementing + # that now will give me conflict headaches so I'm not doing it. + self.eliot_logger = logger return f(self, *a, **kw) # Arrange for all messages written to the memory logger that diff --git a/src/allmydata/test/test_magic_folder.py b/src/allmydata/test/test_magic_folder.py index 800570b51..21ec2668d 100644 --- a/src/allmydata/test/test_magic_folder.py +++ b/src/allmydata/test/test_magic_folder.py @@ -9,7 +9,10 @@ from twisted.internet import defer, task, reactor from eliot.twisted import DeferredContext -from allmydata.interfaces import IDirectoryNode +from allmydata.interfaces import ( + IDirectoryNode, + NoSharesError, +) from allmydata.util.assertutil import precondition from allmydata.util import fake_inotify, fileutil, configutil, yamlutil @@ -29,6 +32,9 @@ from allmydata import magicfolderdb, magicpath from allmydata.util.fileutil import get_pathinfo from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.immutable.upload import Data +from allmydata.mutable.common import ( + UnrecoverableFileError, +) from .eliotutil import ( eliot_logged_test, @@ -928,6 +934,14 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea # now let bob try to do the download yield iterate(self.bob_magicfolder) + self.eliot_logger.flushTracebacks(UnrecoverableFileError) + logged = self.eliot_logger.flushTracebacks(NoSharesError) + self.assertEqual( + 1, + len(logged), + "Got other than expected single NoSharesError: {}".format(logged), + ) + # ...however Bob shouldn't have downloaded anything self._check_version_in_local_db(self.bob_magicfolder, u"blam", 0) # bob should *not* have downloaded anything, as we failed all the servers @@ -1482,11 +1496,6 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall def tearDown(self): d = super(SingleMagicFolderTestMixin, self).tearDown() - def _disable_debugging(res): - if 
self.magicfolder: - self.magicfolder.enable_debug_log(False) - return res - d.addBoth(_disable_debugging) d.addCallback(self.cleanup) shutil.rmtree(self.basedir, ignore_errors=True) return d @@ -1593,7 +1602,6 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall into the magic folder, so we upload the file and record the directory. (XXX split to separate test) """ - self.magicfolder.enable_debug_log() empty_tree_name = self.unicode_or_fallback(u"empty_tr\u00EAe", u"empty_tree") empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.basedir) new_empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.local_dir) @@ -1785,7 +1793,8 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall # pending callbacks including the exception are processed # before we flush the errors. yield task.deferLater(reactor, 0, lambda: None) - errors = self.flushLoggedErrors(BadStuff) + + errors = self.eliot_logger.flushTracebacks(BadStuff) # it seems on Windows the "RealTest" variant only produces 1 # notification for some reason.. self.assertTrue(len(errors) >= 1) @@ -1963,8 +1972,8 @@ class MockTest(SingleMagicFolderTestMixin, TestCase): self.magicfolder.uploader._clock.advance(self.magicfolder.uploader._periodic_full_scan_duration + 1) # this will have now done the full scan, so we have to do # an iteration to process anything from it - iterate_uploader(self.magicfolder) - return processed_d + iterate_d = iterate_uploader(self.magicfolder) + return processed_d.addCallback(lambda ignored: iterate_d) d.addCallback(_create_file_without_event) def _advance_clock(res): processed_d = self.magicfolder.uploader.set_hook('processed')
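A final note on consuming the output: to_file() serializes one JSON object per line, so the integration.eliot.json stream written by the eliot_logging fixture above can be filtered with a few lines of Python. This is only a sketch; eliot's bundled eliot-prettyprint tool is an alternative for casual reading.

    import json

    def messages_of_type(path, action_type):
        # Each line is a complete JSON-encoded Eliot message.
        with open(path) as f:
            for line in f:
                message = json.loads(line)
                if message.get("action_type") == action_type:
                    yield message

    # For example, trace the invite step recorded by the alice_invite fixture:
    for m in messages_of_type("integration.eliot.json",
                              "integration:alice:magic_folder:invite"):
        print(m["task_uuid"], m.get("action_status"))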