Convert various logging to use Eliot

This unfortunately also involves refactoring some inlineCallbacks-using code
to not use inlineCallbacks.
This commit is contained in:
Jean-Paul Calderone 2019-02-19 14:20:57 -05:00
parent e226956d14
commit 7e89776349
3 changed files with 228 additions and 165 deletions

View File

@ -17,7 +17,19 @@ from twisted.application import service
from zope.interface import Interface, Attribute, implementer
from allmydata.util import fileutil, configutil, yamlutil
from eliot import (
Field,
ActionType,
MessageType,
)
from eliot.twisted import DeferredContext
from allmydata.util import (
fileutil,
configutil,
yamlutil,
eliotutil,
)
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import (
@ -392,7 +404,15 @@ class MagicFolder(service.MultiService):
return self.uploader.start_monitoring()
def stopService(self):
return defer.gatherResults([service.MultiService.stopService(self), self.finish()])
with MAGIC_FOLDER_STOP(name=self.name).context():
d = DeferredContext(defer.gatherResults([
self.uploader.stop(),
self.downloader.stop(),
]))
d.addBoth(
lambda ign: service.MultiService.stopService(self)
)
return d.addActionFinish()
def ready(self):
"""ready is used to signal us to start
@ -401,18 +421,78 @@ class MagicFolder(service.MultiService):
self.uploader.start_uploading() # synchronous, returns None
return self.downloader.start_downloading()
@defer.inlineCallbacks
def finish(self):
# must stop these concurrently so that the clock.advance()s
# work correctly in the tests. Also, it's arguably
# most-correct.
d0 = self.downloader.stop()
d1 = self.uploader.stop()
yield defer.DeferredList([d0, d1])
def remove_service(self):
return service.MultiService.disownServiceParent(self)
_NICKNAME = Field.for_types(
u"nickname",
[unicode],
u"A Magic-Folder participant nickname.",
)
_DIRECTION = Field.for_types(
u"direction",
[unicode],
u"A synchronization direction: uploader or downloader.",
eliotutil.validateSetMembership({u"uploader", u"downloader"}),
)
ITERATION = ActionType(
u"magic-folder:iteration",
[_NICKNAME, _DIRECTION],
[],
u"A step towards synchronization in one direction.",
)
PERFORM_SCAN = ActionType(
u"magic-folder:perform-scan",
[],
[],
u"Remote storage is being scanned for changes which need to be synchronized.",
)
SCAN_REMOTE_COLLECTIVE = ActionType(
u"magic-folder:scan-remote-collective",
[],
[],
u"The remote collective is being scanned for peer DMDs.",
)
SCAN_REMOTE_DMD = ActionType(
u"magic-folder:scan-remote-dmd",
[_NICKNAME],
[],
u"A peer DMD is being scanned for changes.",
)
REMOTE_VERSION = Field.for_types(
u"remote_version",
[int, long],
u"The version of a path found in a peer DMD.",
)
REMOTE_URI = Field.for_types(
u"remote_uri",
[bytes],
u"The filecap of a path found in a peer DMD.",
)
REMOTE_DMD_ENTRY = MessageType(
u"magic-folder:remote-dmd-entry",
[eliotutil.RELPATH, magicfolderdb.PATHENTRY, REMOTE_VERSION, REMOTE_URI],
u"A single entry found by scanning a peer DMD.",
)
ADD_TO_DOWNLOAD_QUEUE = MessageType(
u"magic-folder:add-to-download-queue",
[eliotutil.RELPATH],
u"An entry was found to be changed and is being queued for download.",
)
MAGIC_FOLDER_STOP = ActionType(
u"magic-folder:stop",
[_NICKNAME],
[],
u"A Magic-Folder is being stopped.",
)
class QueueMixin(HookMixin):
"""
@ -423,6 +503,8 @@ class QueueMixin(HookMixin):
Subclasses implement _scan_delay, _perform_scan and _process
:ivar _name: Either "uploader" or "downloader".
:ivar _deque: IQueuedItem instances to process
:ivar _process_history: the last 20 items we processed
@ -460,12 +542,6 @@ class QueueMixin(HookMixin):
# do we also want to bound on "maximum age"?
self._process_history = deque(maxlen=20)
self._in_progress = []
self._stopped = False
# a Deferred to wait for the _do_processing() loop to exit
# (gets set to the return from _do_processing() if we get that
# far)
self._processing = defer.succeed(None)
def enable_debug_log(self, enabled=True):
twlog.msg("queue mixin enable debug logging: %s" % enabled)
@ -483,50 +559,61 @@ class QueueMixin(HookMixin):
yield item
def _get_filepath(self, relpath_u):
self._log("_get_filepath(%r)" % (relpath_u,))
return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
def _begin_processing(self, res):
self._log("starting processing loop")
self._processing = self._do_processing()
def stop(self):
d = self._processing
# Speed up shutdown
self._processing_loop.stop()
self._processing = None
self._processing_loop = None
# wait for processing loop to actually exit
return d
# if there are any errors coming out of _do_processing then
# our loop is done and we're hosed (i.e. _do_processing()
# itself has a bug in it)
def _begin_processing(self, res):
self._processing_loop = task.LoopingCall(self._processing_iteration)
self._processing_loop.clock = self._clock
self._processing = self._processing_loop.start(self._scan_delay(), now=True)
# if there are any errors coming out of _processing then our loop is
# done and we're hosed (i.e. _processing_iteration() itself has a bug
# in it)
def fatal_error(f):
self._log("internal error: %s" % (f.value,))
self._log(f)
self._processing.addErrback(fatal_error)
return res
@defer.inlineCallbacks
def _do_processing(self):
def _processing_iteration(self):
"""
This is an infinite loop that processes things out of the _deque.
One iteration runs self._process_deque which calls
_perform_scan() and then completely drains the _deque
(processing each item). After that we yield for _turn_deque
seconds.
One iteration runs self._process_deque which calls _perform_scan() and
then completely drains the _deque (processing each item).
"""
while not self._stopped:
d = task.deferLater(self._clock, self._scan_delay(), lambda: None)
action = ITERATION(
nickname=self._client.nickname,
direction=self._name,
)
with action.context():
d = DeferredContext(defer.Deferred())
# adds items to our deque
yield self._perform_scan()
d.addCallback(lambda ignored: self._perform_scan())
# process anything in our queue
yield self._process_deque()
d.addCallback(lambda ignored: self._process_deque())
# we want to have our callLater queued in the reactor
# *before* we trigger the 'iteration' hook, so that hook
# can successfully advance the Clock and bypass the delay
# if required (e.g. in the tests).
self._call_hook(None, 'iteration')
if not self._stopped:
yield d
# Let the tests know we've made it this far.
d.addCallback(lambda ignored: self._call_hook(None, 'iteration'))
self._log("stopped")
# Get it out of the Eliot context
result = d.addActionFinish()
# Kick it off
result.callback(None)
# Give it back to LoopingCall so it can wait on us.
return result
def _scan_delay(self):
raise NotImplementedError
@ -655,7 +742,7 @@ class UploadItem(QueuedItem):
class Uploader(QueueMixin):
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
QueueMixin.__init__(self, client, local_path_u, db, u'uploader', clock)
self.is_ready = False
@ -698,8 +785,6 @@ class Uploader(QueueMixin):
return d
def stop(self):
self._log("stop")
self._stopped = True
self._notifier.stopReading()
self._count('dirs_monitored', -1)
if self._periodic_callid:
@ -712,10 +797,8 @@ class Uploader(QueueMixin):
d = self._notifier.wait_until_stopped()
else:
d = defer.succeed(None)
# Speed up shutdown
self._processing.cancel()
# wait for processing loop to actually exit
d.addCallback(lambda ign: self._processing)
d.addCallback(lambda ignored: QueueMixin.stop(self))
return d
def start_uploading(self):
@ -1158,7 +1241,7 @@ class Downloader(QueueMixin, WriteFileMixin):
def __init__(self, client, local_path_u, db, collective_dirnode,
upload_readonly_dircap, clock, is_upload_pending, umask,
status_reporter, poll_interval=60):
QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
QueueMixin.__init__(self, client, local_path_u, db, u'downloader', clock)
if not IDirectoryNode.providedBy(collective_dirnode):
raise AssertionError("'collective_dircap' does not refer to a directory")
@ -1197,18 +1280,6 @@ class Downloader(QueueMixin, WriteFileMixin):
def nice_current_time(self):
return format_time(datetime.fromtimestamp(self._clock.seconds()).timetuple())
def stop(self):
self._log("stop")
self._stopped = True
# Speed up shutdown
self._processing.cancel()
d = defer.succeed(None)
# wait for processing loop to actually exit
d.addCallback(lambda ign: self._processing)
return d
def _should_download(self, relpath_u, remote_version, remote_uri):
"""
_should_download returns a bool indicating whether or not a remote object should be downloaded.
@ -1270,106 +1341,112 @@ class Downloader(QueueMixin, WriteFileMixin):
return collective_dirmap_d
def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
self._log("_scan_remote_dmd nickname %r" % (nickname,))
d = dirnode.list()
def scan_listing(listing_map):
for encoded_relpath_u in listing_map.keys():
relpath_u = magicpath.magic2path(encoded_relpath_u)
self._log("found %r" % (relpath_u,))
with SCAN_REMOTE_DMD(nickname=nickname).context():
d = DeferredContext(dirnode.list())
def scan_listing(listing_map):
for encoded_relpath_u in listing_map.keys():
relpath_u = magicpath.magic2path(encoded_relpath_u)
self._log("found %r" % (relpath_u,))
file_node, metadata = listing_map[encoded_relpath_u]
local_dbentry = self._get_local_latest(relpath_u)
file_node, metadata = listing_map[encoded_relpath_u]
local_dbentry = self._get_local_latest(relpath_u)
# XXX FIXME this is *awefully* similar to
# _should_download code in function etc -- can we
# share?
remote_version = metadata.get('version', None)
remote_uri = file_node.get_readonly_uri()
self._log("%r has local dbentry %r, remote version %r, remote uri %r"
% (relpath_u, local_dbentry, remote_version, remote_uri))
# XXX FIXME this is *awefully* similar to
# _should_download code in function etc -- can we
# share?
remote_version = metadata.get('version', None)
remote_uri = file_node.get_readonly_uri()
REMOTE_DMD_ENTRY.log(
relpath=relpath_u,
pathentry=local_dbentry,
remote_version=remote_version,
remote_uri=remote_uri,
)
if (local_dbentry is None or remote_version is None or
local_dbentry.version < remote_version or
(local_dbentry.version == remote_version and local_dbentry.last_downloaded_uri != remote_uri)):
self._log("%r added to download queue" % (relpath_u,))
if scan_batch.has_key(relpath_u):
scan_batch[relpath_u] += [(file_node, metadata)]
else:
scan_batch[relpath_u] = [(file_node, metadata)]
self._status_reporter(
True, 'Magic folder is working',
'Last scan: %s' % self.nice_current_time(),
)
if (local_dbentry is None or remote_version is None or
local_dbentry.version < remote_version or
(local_dbentry.version == remote_version and local_dbentry.last_downloaded_uri != remote_uri)):
ADD_TO_DOWNLOAD_QUEUE.log(relpath=relpath_u)
if scan_batch.has_key(relpath_u):
scan_batch[relpath_u] += [(file_node, metadata)]
else:
scan_batch[relpath_u] = [(file_node, metadata)]
self._status_reporter(
True, 'Magic folder is working',
'Last scan: %s' % self.nice_current_time(),
)
d.addCallback(scan_listing)
d.addBoth(self._logcb, "end of _scan_remote_dmd")
return d
d.addCallback(scan_listing)
d.addBoth(self._logcb, "end of _scan_remote_dmd")
return d.addActionFinish()
def _scan_remote_collective(self, scan_self=False):
self._log("_scan_remote_collective")
scan_batch = {} # path -> [(filenode, metadata)]
with SCAN_REMOTE_COLLECTIVE().context():
scan_batch = {} # path -> [(filenode, metadata)]
d = self._collective_dirnode.list()
def scan_collective(dirmap):
d2 = defer.succeed(None)
for dir_name in dirmap:
(dirnode, metadata) = dirmap[dir_name]
if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
self._scan_remote_dmd(dir_name, dirnode, scan_batch))
def _err(f, dir_name=dir_name):
self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
# XXX what should we do to make this failure more visible to users?
d2.addErrback(_err)
d = DeferredContext(self._collective_dirnode.list())
def scan_collective(dirmap):
d2 = defer.succeed(None)
for dir_name in dirmap:
(dirnode, metadata) = dirmap[dir_name]
if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
self._scan_remote_dmd(dir_name, dirnode, scan_batch))
def _err(f, dir_name=dir_name):
self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
# XXX what should we do to make this failure more visible to users?
d2.addErrback(_err)
return d2
d.addCallback(scan_collective)
return d2
d.addCallback(scan_collective)
def _filter_batch_to_deque(ign):
self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
for relpath_u in scan_batch.keys():
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
def _filter_batch_to_deque(ign):
self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
for relpath_u in scan_batch.keys():
file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])
if self._should_download(relpath_u, metadata['version'], file_node.get_readonly_uri()):
to_dl = DownloadItem(
relpath_u,
PercentProgress(file_node.get_size()),
file_node,
metadata,
file_node.get_size(),
)
to_dl.set_status('queued', self._clock.seconds())
self._deque.append(to_dl)
self._count("objects_queued")
else:
self._log("Excluding %r" % (relpath_u,))
self._call_hook(None, 'processed', async=True) # await this maybe-Deferred??
if self._should_download(relpath_u, metadata['version'], file_node.get_readonly_uri()):
to_dl = DownloadItem(
relpath_u,
PercentProgress(file_node.get_size()),
file_node,
metadata,
file_node.get_size(),
)
to_dl.set_status('queued', self._clock.seconds())
self._deque.append(to_dl)
self._count("objects_queued")
else:
self._log("Excluding %r" % (relpath_u,))
self._call_hook(None, 'processed', async=True) # await this maybe-Deferred??
self._log("deque after = %r" % (self._deque,))
d.addCallback(_filter_batch_to_deque)
return d
self._log("deque after = %r" % (self._deque,))
d.addCallback(_filter_batch_to_deque)
return d.addActionFinish()
def _scan_delay(self):
return self._poll_interval
@defer.inlineCallbacks
def _perform_scan(self):
x = None
try:
x = yield self._scan_remote_collective()
self._status_reporter(
True, 'Magic folder is working',
'Last scan: %s' % self.nice_current_time(),
)
except Exception as e:
twlog.msg("Remote scan failed: %s" % (e,))
self._log("_scan failed: %s" % (repr(e),))
self._status_reporter(
False, 'Remote scan has failed: %s' % str(e),
'Last attempted at %s' % self.nice_current_time(),
)
defer.returnValue(x)
with PERFORM_SCAN().context():
d = DeferredContext(defer.maybeDeferred(self._scan_remote_collective))
def scanned(result):
self._status_reporter(
True, 'Magic folder is working',
'Last scan: %s' % self.nice_current_time(),
)
return result
def scan_failed(reason):
twlog.msg("Remote scan failed: %s" % (reason.value,))
self._log("_scan failed: %s" % (repr(reason.value),))
self._status_reporter(
False, 'Remote scan has failed: %s' % str(reason.value),
'Last attempted at %s' % self.nice_current_time(),
)
return None
d.addCallbacks(scanned, scan_failed)
return d.addActionFinish()
def _process(self, item):
# Downloader

View File

@ -149,10 +149,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
def cleanup(self, res):
d = defer.succeed(None)
def _clean(ign):
d = self.magicfolder.finish()
self.magicfolder.uploader._clock.advance(self.magicfolder.uploader._pending_delay + 1)
self.magicfolder.downloader._clock.advance(self.magicfolder.downloader._poll_interval + 1)
return d
return self.magicfolder.disownServiceParent()
d.addCallback(_clean)
d.addCallback(lambda ign: res)

View File

@ -468,17 +468,11 @@ class MagicFolderDbTests(unittest.TestCase):
def iterate_downloader(magic):
# can do either of these:
#d = magic.downloader._process_deque()
d = magic.downloader.set_hook('iteration')
magic.downloader._clock.advance(magic.downloader._poll_interval + 1)
return d
return magic.downloader._processing_iteration()
def iterate_uploader(magic):
d = magic.uploader.set_hook('iteration')
magic.uploader._clock.advance(magic.uploader._pending_delay + 1)
return d
return magic.uploader._processing_iteration()
@defer.inlineCallbacks
def iterate(magic):
@ -705,16 +699,11 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
@defer.inlineCallbacks
def tearDown(self):
yield GridTestMixin.tearDown(self)
d0 = self.alice_magicfolder.finish()
d1 = self.bob_magicfolder.finish()
for mf in [self.alice_magicfolder, self.bob_magicfolder]:
mf.uploader._clock.advance(mf.uploader._pending_delay + 1)
mf.downloader._clock.advance(mf.downloader._poll_interval + 1)
yield d0
yield d1
@capture_logging(None)
@defer.inlineCallbacks
def test_alice_delete_bob_restore(self, logger):