mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-21 02:01:31 +00:00
Merge pull request #550 from tahoe-lafs/2972.magic-folder-eliot-logs
Convert Magic-Folder logging to use Eliot Fixes: ticket:2972
This commit is contained in:
commit
9cf0096603
1
newsfragments/2972.feature
Normal file
1
newsfragments/2972.feature
Normal file
@ -0,0 +1 @@
|
||||
The Magic-Folder frontend now emits structured, causal logs. This makes it easier for developers to make sense of its behavior and for users to submit useful debugging information alongside problem reports.
|
@ -7,6 +7,7 @@ from datetime import datetime
|
||||
import time
|
||||
import ConfigParser
|
||||
|
||||
from twisted.python.filepath import FilePath
|
||||
from twisted.python.monkey import MonkeyPatcher
|
||||
from twisted.internet import defer, reactor, task
|
||||
from twisted.internet.error import AlreadyCancelled
|
||||
@ -17,7 +18,22 @@ from twisted.application import service
|
||||
|
||||
from zope.interface import Interface, Attribute, implementer
|
||||
|
||||
from allmydata.util import fileutil, configutil, yamlutil
|
||||
from eliot import (
|
||||
Field,
|
||||
ActionType,
|
||||
MessageType,
|
||||
write_failure,
|
||||
)
|
||||
from eliot.twisted import (
|
||||
DeferredContext,
|
||||
)
|
||||
|
||||
from allmydata.util import (
|
||||
fileutil,
|
||||
configutil,
|
||||
yamlutil,
|
||||
eliotutil,
|
||||
)
|
||||
from allmydata.interfaces import IDirectoryNode
|
||||
from allmydata.util import log
|
||||
from allmydata.util.fileutil import (
|
||||
@ -392,7 +408,12 @@ class MagicFolder(service.MultiService):
|
||||
return self.uploader.start_monitoring()
|
||||
|
||||
def stopService(self):
|
||||
return defer.gatherResults([service.MultiService.stopService(self), self.finish()])
|
||||
with MAGIC_FOLDER_STOP(nickname=self.name).context():
|
||||
d = DeferredContext(self._finish())
|
||||
d.addBoth(
|
||||
lambda ign: service.MultiService.stopService(self)
|
||||
)
|
||||
return d.addActionFinish()
|
||||
|
||||
def ready(self):
|
||||
"""ready is used to signal us to start
|
||||
@ -401,18 +422,161 @@ class MagicFolder(service.MultiService):
|
||||
self.uploader.start_uploading() # synchronous, returns None
|
||||
return self.downloader.start_downloading()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def finish(self):
|
||||
# must stop these concurrently so that the clock.advance()s
|
||||
# work correctly in the tests. Also, it's arguably
|
||||
# most-correct.
|
||||
def _finish(self):
|
||||
d0 = self.downloader.stop()
|
||||
d1 = self.uploader.stop()
|
||||
yield defer.DeferredList([d0, d1])
|
||||
return defer.DeferredList(list(
|
||||
DeferredContext(d).addErrback(write_failure).result
|
||||
for d in [d0, d1]
|
||||
))
|
||||
|
||||
def remove_service(self):
|
||||
return service.MultiService.disownServiceParent(self)
|
||||
|
||||
_NICKNAME = Field.for_types(
|
||||
u"nickname",
|
||||
[unicode, bytes],
|
||||
u"A Magic-Folder participant nickname.",
|
||||
)
|
||||
|
||||
_DIRECTION = Field.for_types(
|
||||
u"direction",
|
||||
[unicode],
|
||||
u"A synchronization direction: uploader or downloader.",
|
||||
eliotutil.validateSetMembership({u"uploader", u"downloader"}),
|
||||
)
|
||||
|
||||
ITERATION = ActionType(
|
||||
u"magic-folder:iteration",
|
||||
[_NICKNAME, _DIRECTION],
|
||||
[],
|
||||
u"A step towards synchronization in one direction.",
|
||||
)
|
||||
|
||||
PERFORM_SCAN = ActionType(
|
||||
u"magic-folder:perform-scan",
|
||||
[],
|
||||
[],
|
||||
u"Remote storage is being scanned for changes which need to be synchronized.",
|
||||
)
|
||||
|
||||
SCAN_REMOTE_COLLECTIVE = ActionType(
|
||||
u"magic-folder:scan-remote-collective",
|
||||
[],
|
||||
[],
|
||||
u"The remote collective is being scanned for peer DMDs.",
|
||||
)
|
||||
|
||||
SCAN_REMOTE_DMD = ActionType(
|
||||
u"magic-folder:scan-remote-dmd",
|
||||
[_NICKNAME],
|
||||
[],
|
||||
u"A peer DMD is being scanned for changes.",
|
||||
)
|
||||
|
||||
REMOTE_VERSION = Field.for_types(
|
||||
u"remote_version",
|
||||
[int, long],
|
||||
u"The version of a path found in a peer DMD.",
|
||||
)
|
||||
|
||||
REMOTE_URI = Field.for_types(
|
||||
u"remote_uri",
|
||||
[bytes],
|
||||
u"The filecap of a path found in a peer DMD.",
|
||||
)
|
||||
|
||||
REMOTE_DMD_ENTRY = MessageType(
|
||||
u"magic-folder:remote-dmd-entry",
|
||||
[eliotutil.RELPATH, magicfolderdb.PATHENTRY, REMOTE_VERSION, REMOTE_URI],
|
||||
u"A single entry found by scanning a peer DMD.",
|
||||
)
|
||||
|
||||
ADD_TO_DOWNLOAD_QUEUE = MessageType(
|
||||
u"magic-folder:add-to-download-queue",
|
||||
[eliotutil.RELPATH],
|
||||
u"An entry was found to be changed and is being queued for download.",
|
||||
)
|
||||
|
||||
MAGIC_FOLDER_STOP = ActionType(
|
||||
u"magic-folder:stop",
|
||||
[_NICKNAME],
|
||||
[],
|
||||
u"A Magic-Folder is being stopped.",
|
||||
)
|
||||
|
||||
MAYBE_UPLOAD = MessageType(
|
||||
u"magic-folder:maybe-upload",
|
||||
[eliotutil.RELPATH],
|
||||
u"A decision is being made about whether to upload a file.",
|
||||
)
|
||||
|
||||
PENDING = Field.for_types(
|
||||
u"pending",
|
||||
[list],
|
||||
u"The paths which are pending processing.",
|
||||
)
|
||||
|
||||
REMOVE_FROM_PENDING = ActionType(
|
||||
u"magic-folder:remove-from-pending",
|
||||
[eliotutil.RELPATH, PENDING],
|
||||
[],
|
||||
u"An item being processed is being removed from the pending set.",
|
||||
)
|
||||
|
||||
PATH = Field(
|
||||
u"path",
|
||||
lambda fp: quote_filepath(fp),
|
||||
u"A local filesystem path.",
|
||||
eliotutil.validateInstanceOf(FilePath),
|
||||
)
|
||||
|
||||
NOTIFIED_OBJECT_DISAPPEARED = MessageType(
|
||||
u"magic-folder:notified-object-disappeared",
|
||||
[PATH],
|
||||
u"A path which generated a notification was not found on the filesystem. This is normal.",
|
||||
)
|
||||
|
||||
NOT_UPLOADING = MessageType(
|
||||
u"magic-folder:not-uploading",
|
||||
[],
|
||||
u"An item being processed is not going to be uploaded.",
|
||||
)
|
||||
|
||||
SYMLINK = MessageType(
|
||||
u"magic-folder:symlink",
|
||||
[PATH],
|
||||
u"An item being processed was a symlink and is being skipped",
|
||||
)
|
||||
|
||||
CREATED_DIRECTORY = Field.for_types(
|
||||
u"created_directory",
|
||||
[unicode],
|
||||
u"The relative path of a newly created directory in a magic-folder.",
|
||||
)
|
||||
|
||||
PROCESS_DIRECTORY = ActionType(
|
||||
u"magic-folder:process-directory",
|
||||
[],
|
||||
[CREATED_DIRECTORY],
|
||||
u"An item being processed was a directory.",
|
||||
)
|
||||
|
||||
NOT_NEW_DIRECTORY = MessageType(
|
||||
u"magic-folder:not-new-directory",
|
||||
[],
|
||||
u"A directory item being processed was found to not be new.",
|
||||
)
|
||||
|
||||
NOT_NEW_FILE = MessageType(
|
||||
u"magic-folder:not-new-file",
|
||||
[],
|
||||
u"A file item being processed was found to not be new (or changed).",
|
||||
)
|
||||
|
||||
SPECIAL_FILE = MessageType(
|
||||
u"magic-folder:special-file",
|
||||
[],
|
||||
u"An item being processed was found to be of a special type which is not supported.",
|
||||
)
|
||||
|
||||
class QueueMixin(HookMixin):
|
||||
"""
|
||||
@ -423,6 +587,8 @@ class QueueMixin(HookMixin):
|
||||
|
||||
Subclasses implement _scan_delay, _perform_scan and _process
|
||||
|
||||
:ivar unicode _name: Either "uploader" or "downloader".
|
||||
|
||||
:ivar _deque: IQueuedItem instances to process
|
||||
|
||||
:ivar _process_history: the last 20 items we processed
|
||||
@ -460,12 +626,6 @@ class QueueMixin(HookMixin):
|
||||
# do we also want to bound on "maximum age"?
|
||||
self._process_history = deque(maxlen=20)
|
||||
self._in_progress = []
|
||||
self._stopped = False
|
||||
|
||||
# a Deferred to wait for the _do_processing() loop to exit
|
||||
# (gets set to the return from _do_processing() if we get that
|
||||
# far)
|
||||
self._processing = defer.succeed(None)
|
||||
|
||||
def enable_debug_log(self, enabled=True):
|
||||
twlog.msg("queue mixin enable debug logging: %s" % enabled)
|
||||
@ -483,50 +643,66 @@ class QueueMixin(HookMixin):
|
||||
yield item
|
||||
|
||||
def _get_filepath(self, relpath_u):
|
||||
self._log("_get_filepath(%r)" % (relpath_u,))
|
||||
return extend_filepath(self._local_filepath, relpath_u.split(u"/"))
|
||||
|
||||
def _begin_processing(self, res):
|
||||
self._log("starting processing loop")
|
||||
self._processing = self._do_processing()
|
||||
def stop(self):
|
||||
"""
|
||||
Don't process queued items anymore.
|
||||
|
||||
# if there are any errors coming out of _do_processing then
|
||||
# our loop is done and we're hosed (i.e. _do_processing()
|
||||
# itself has a bug in it)
|
||||
:return Deferred: A ``Deferred`` that fires when processing has
|
||||
completely stopped.
|
||||
"""
|
||||
d = self._processing
|
||||
self._processing_loop.stop()
|
||||
self._processing = None
|
||||
self._processing_loop = None
|
||||
return d
|
||||
|
||||
def _begin_processing(self):
|
||||
"""
|
||||
Start a loop that looks for work to do and then does it.
|
||||
"""
|
||||
self._processing_loop = task.LoopingCall(self._processing_iteration)
|
||||
self._processing_loop.clock = self._clock
|
||||
self._processing = self._processing_loop.start(self._scan_delay(), now=True)
|
||||
|
||||
# if there are any errors coming out of _processing then our loop is
|
||||
# done and we're hosed (i.e. _processing_iteration() itself has a bug
|
||||
# in it)
|
||||
def fatal_error(f):
|
||||
self._log("internal error: %s" % (f.value,))
|
||||
self._log(f)
|
||||
self._processing.addErrback(fatal_error)
|
||||
return res
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_processing(self):
|
||||
def _processing_iteration(self):
|
||||
"""
|
||||
This is an infinite loop that processes things out of the _deque.
|
||||
|
||||
One iteration runs self._process_deque which calls
|
||||
_perform_scan() and then completely drains the _deque
|
||||
(processing each item). After that we yield for _turn_deque
|
||||
seconds.
|
||||
One iteration runs self._process_deque which calls _perform_scan() and
|
||||
then completely drains the _deque (processing each item).
|
||||
"""
|
||||
while not self._stopped:
|
||||
d = task.deferLater(self._clock, self._scan_delay(), lambda: None)
|
||||
action = ITERATION(
|
||||
nickname=self._client.nickname,
|
||||
direction=self._name,
|
||||
)
|
||||
with action.context():
|
||||
d = DeferredContext(defer.Deferred())
|
||||
|
||||
# adds items to our deque
|
||||
yield self._perform_scan()
|
||||
d.addCallback(lambda ignored: self._perform_scan())
|
||||
|
||||
# process anything in our queue
|
||||
yield self._process_deque()
|
||||
d.addCallback(lambda ignored: self._process_deque())
|
||||
|
||||
# we want to have our callLater queued in the reactor
|
||||
# *before* we trigger the 'iteration' hook, so that hook
|
||||
# can successfully advance the Clock and bypass the delay
|
||||
# if required (e.g. in the tests).
|
||||
self._call_hook(None, 'iteration')
|
||||
if not self._stopped:
|
||||
yield d
|
||||
# Let the tests know we've made it this far.
|
||||
d.addCallback(lambda ignored: self._call_hook(None, 'iteration'))
|
||||
|
||||
self._log("stopped")
|
||||
# Get it out of the Eliot context
|
||||
result = d.addActionFinish()
|
||||
|
||||
# Kick it off
|
||||
result.callback(None)
|
||||
|
||||
# Give it back to LoopingCall so it can wait on us.
|
||||
return result
|
||||
|
||||
def _scan_delay(self):
|
||||
raise NotImplementedError
|
||||
@ -652,10 +828,27 @@ class UploadItem(QueuedItem):
|
||||
pass
|
||||
|
||||
|
||||
_ITEM = Field(
|
||||
u"item",
|
||||
lambda i: {
|
||||
u"relpath": i.relpath_u,
|
||||
u"size": i.size,
|
||||
},
|
||||
u"An item to be uploaded or downloaded.",
|
||||
eliotutil.validateInstanceOf(QueuedItem),
|
||||
)
|
||||
|
||||
PROCESS_ITEM = ActionType(
|
||||
u"magic-folder:process-item",
|
||||
[_ITEM],
|
||||
[],
|
||||
u"A path which was found wanting of an update is receiving an update.",
|
||||
)
|
||||
|
||||
class Uploader(QueueMixin):
|
||||
|
||||
def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock):
|
||||
QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)
|
||||
QueueMixin.__init__(self, client, local_path_u, db, u'uploader', clock)
|
||||
|
||||
self.is_ready = False
|
||||
|
||||
@ -698,8 +891,6 @@ class Uploader(QueueMixin):
|
||||
return d
|
||||
|
||||
def stop(self):
|
||||
self._log("stop")
|
||||
self._stopped = True
|
||||
self._notifier.stopReading()
|
||||
self._count('dirs_monitored', -1)
|
||||
if self._periodic_callid:
|
||||
@ -712,10 +903,8 @@ class Uploader(QueueMixin):
|
||||
d = self._notifier.wait_until_stopped()
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
# Speed up shutdown
|
||||
self._processing.cancel()
|
||||
# wait for processing loop to actually exit
|
||||
d.addCallback(lambda ign: self._processing)
|
||||
|
||||
d.addCallback(lambda ignored: QueueMixin.stop(self))
|
||||
return d
|
||||
|
||||
def start_uploading(self):
|
||||
@ -729,9 +918,7 @@ class Uploader(QueueMixin):
|
||||
self._add_pending(relpath_u)
|
||||
|
||||
self._full_scan()
|
||||
# XXX changed this while re-basing; double check we can
|
||||
# *really* just call this synchronously.
|
||||
return self._begin_processing(None)
|
||||
return self._begin_processing()
|
||||
|
||||
def _scan_delay(self):
|
||||
return self._pending_delay
|
||||
@ -824,37 +1011,37 @@ class Uploader(QueueMixin):
|
||||
process a single QueuedItem. If this returns False, the item is
|
||||
removed from _process_history
|
||||
"""
|
||||
with PROCESS_ITEM(item=item).context():
|
||||
d = DeferredContext(defer.succeed(False))
|
||||
|
||||
# Uploader
|
||||
relpath_u = item.relpath_u
|
||||
self._log("_process(%r)" % (relpath_u,))
|
||||
item.set_status('started', self._clock.seconds())
|
||||
|
||||
if relpath_u is None:
|
||||
item.set_status('invalid_path', self._clock.seconds())
|
||||
return defer.succeed(False)
|
||||
return d.addActionFinish()
|
||||
|
||||
precondition(isinstance(relpath_u, unicode), relpath_u)
|
||||
precondition(not relpath_u.endswith(u'/'), relpath_u)
|
||||
|
||||
d = defer.succeed(False)
|
||||
|
||||
def _maybe_upload(ign, now=None):
|
||||
self._log("_maybe_upload: relpath_u=%r, now=%r" % (relpath_u, now))
|
||||
MAYBE_UPLOAD.log(relpath=relpath_u)
|
||||
if now is None:
|
||||
now = time.time()
|
||||
fp = self._get_filepath(relpath_u)
|
||||
pathinfo = get_pathinfo(unicode_from_filepath(fp))
|
||||
|
||||
self._log("about to remove %r from pending set %r" %
|
||||
(relpath_u, self._pending))
|
||||
try:
|
||||
self._pending.remove(relpath_u)
|
||||
with REMOVE_FROM_PENDING(relpath=relpath_u, pending=list(self._pending)):
|
||||
self._pending.remove(relpath_u)
|
||||
except KeyError:
|
||||
self._log("WRONG that %r wasn't in pending" % (relpath_u,))
|
||||
pass
|
||||
encoded_path_u = magicpath.path2magic(relpath_u)
|
||||
|
||||
if not pathinfo.exists:
|
||||
# FIXME merge this with the 'isfile' case.
|
||||
self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
|
||||
NOTIFIED_OBJECT_DISAPPEARED.log(path=fp)
|
||||
self._count('objects_disappeared')
|
||||
|
||||
db_entry = self._db.get_db_entry(relpath_u)
|
||||
@ -866,7 +1053,7 @@ class Uploader(QueueMixin):
|
||||
if is_new_file(pathinfo, db_entry):
|
||||
new_version = db_entry.version + 1
|
||||
else:
|
||||
self._log("Not uploading %r" % (relpath_u,))
|
||||
NOT_UPLOADING.log()
|
||||
self._count('objects_not_uploaded')
|
||||
return False
|
||||
|
||||
@ -905,12 +1092,12 @@ class Uploader(QueueMixin):
|
||||
metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri
|
||||
|
||||
empty_uploadable = Data("", self._client.convergence)
|
||||
d2 = self._upload_dirnode.add_file(
|
||||
d2 = DeferredContext(self._upload_dirnode.add_file(
|
||||
encoded_path_u, empty_uploadable,
|
||||
metadata=metadata,
|
||||
overwrite=True,
|
||||
progress=item.progress,
|
||||
)
|
||||
))
|
||||
|
||||
def _add_db_entry(filenode):
|
||||
filecap = filenode.get_uri()
|
||||
@ -929,40 +1116,36 @@ class Uploader(QueueMixin):
|
||||
self._count('files_uploaded')
|
||||
d2.addCallback(_add_db_entry)
|
||||
d2.addCallback(lambda ign: True)
|
||||
return d2
|
||||
return d2.result
|
||||
elif pathinfo.islink:
|
||||
self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
|
||||
SYMLINK.log(path=fp)
|
||||
return False
|
||||
elif pathinfo.isdir:
|
||||
self._log("ISDIR")
|
||||
if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
|
||||
self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
|
||||
|
||||
db_entry = self._db.get_db_entry(relpath_u)
|
||||
self._log("isdir dbentry %r" % (db_entry,))
|
||||
if not is_new_file(pathinfo, db_entry):
|
||||
self._log("NOT A NEW FILE")
|
||||
NOT_NEW_DIRECTORY.log()
|
||||
return False
|
||||
|
||||
uploadable = Data("", self._client.convergence)
|
||||
encoded_path_u += magicpath.path2magic(u"/")
|
||||
self._log("encoded_path_u = %r" % (encoded_path_u,))
|
||||
upload_d = self._upload_dirnode.add_file(
|
||||
encoded_path_u, uploadable,
|
||||
metadata={"version": 0},
|
||||
overwrite=True,
|
||||
progress=item.progress,
|
||||
)
|
||||
with PROCESS_DIRECTORY().context() as action:
|
||||
upload_d = DeferredContext(self._upload_dirnode.add_file(
|
||||
encoded_path_u, uploadable,
|
||||
metadata={"version": 0},
|
||||
overwrite=True,
|
||||
progress=item.progress,
|
||||
))
|
||||
def _dir_succeeded(ign):
|
||||
self._log("created subdirectory %r" % (relpath_u,))
|
||||
action.add_success_fields(created_directory=relpath_u)
|
||||
self._count('directories_created')
|
||||
def _dir_failed(f):
|
||||
self._log("failed to create subdirectory %r" % (relpath_u,))
|
||||
return f
|
||||
upload_d.addCallbacks(_dir_succeeded, _dir_failed)
|
||||
upload_d.addCallback(_dir_succeeded)
|
||||
upload_d.addCallback(lambda ign: self._scan(relpath_u))
|
||||
upload_d.addCallback(lambda ign: True)
|
||||
return upload_d
|
||||
return upload_d.addActionFinish()
|
||||
elif pathinfo.isfile:
|
||||
db_entry = self._db.get_db_entry(relpath_u)
|
||||
|
||||
@ -973,7 +1156,7 @@ class Uploader(QueueMixin):
|
||||
elif is_new_file(pathinfo, db_entry):
|
||||
new_version = db_entry.version + 1
|
||||
else:
|
||||
self._log("Not uploading %r" % (relpath_u,))
|
||||
NOT_NEW_FILE.log()
|
||||
self._count('objects_not_uploaded')
|
||||
return False
|
||||
|
||||
@ -989,12 +1172,12 @@ class Uploader(QueueMixin):
|
||||
metadata['last_uploaded_uri'] = db_entry.last_uploaded_uri
|
||||
|
||||
uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
|
||||
d2 = self._upload_dirnode.add_file(
|
||||
d2 = DeferredContext(self._upload_dirnode.add_file(
|
||||
encoded_path_u, uploadable,
|
||||
metadata=metadata,
|
||||
overwrite=True,
|
||||
progress=item.progress,
|
||||
)
|
||||
))
|
||||
|
||||
def _add_db_entry(filenode):
|
||||
filecap = filenode.get_uri()
|
||||
@ -1013,15 +1196,14 @@ class Uploader(QueueMixin):
|
||||
self._count('files_uploaded')
|
||||
return True
|
||||
d2.addCallback(_add_db_entry)
|
||||
return d2
|
||||
return d2.result
|
||||
else:
|
||||
self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
|
||||
SPECIAL_FILE.log()
|
||||
return False
|
||||
|
||||
d.addCallback(_maybe_upload)
|
||||
|
||||
def _succeeded(res):
|
||||
self._log("_succeeded(%r)" % (res,))
|
||||
if res:
|
||||
self._count('objects_succeeded')
|
||||
# TODO: maybe we want the status to be 'ignored' if res is False
|
||||
@ -1029,11 +1211,10 @@ class Uploader(QueueMixin):
|
||||
return res
|
||||
def _failed(f):
|
||||
self._count('objects_failed')
|
||||
self._log("%s while processing %r" % (f, relpath_u))
|
||||
item.set_status('failure', self._clock.seconds())
|
||||
return f
|
||||
d.addCallbacks(_succeeded, _failed)
|
||||
return d
|
||||
return d.addActionFinish()
|
||||
|
||||
def _get_metadata(self, encoded_path_u):
|
||||
try:
|
||||
@ -1158,7 +1339,7 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
def __init__(self, client, local_path_u, db, collective_dirnode,
|
||||
upload_readonly_dircap, clock, is_upload_pending, umask,
|
||||
status_reporter, poll_interval=60):
|
||||
QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)
|
||||
QueueMixin.__init__(self, client, local_path_u, db, u'downloader', clock)
|
||||
|
||||
if not IDirectoryNode.providedBy(collective_dirnode):
|
||||
raise AssertionError("'collective_dircap' does not refer to a directory")
|
||||
@ -1182,8 +1363,8 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
try:
|
||||
data = yield self._scan_remote_collective(scan_self=True)
|
||||
twlog.msg("Completed initial Magic Folder scan successfully ({})".format(self))
|
||||
x = yield self._begin_processing(data)
|
||||
defer.returnValue(x)
|
||||
self._begin_processing()
|
||||
defer.returnValue(data)
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
@ -1197,18 +1378,6 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
def nice_current_time(self):
|
||||
return format_time(datetime.fromtimestamp(self._clock.seconds()).timetuple())
|
||||
|
||||
def stop(self):
|
||||
self._log("stop")
|
||||
self._stopped = True
|
||||
|
||||
# Speed up shutdown
|
||||
self._processing.cancel()
|
||||
|
||||
d = defer.succeed(None)
|
||||
# wait for processing loop to actually exit
|
||||
d.addCallback(lambda ign: self._processing)
|
||||
return d
|
||||
|
||||
def _should_download(self, relpath_u, remote_version, remote_uri):
|
||||
"""
|
||||
_should_download returns a bool indicating whether or not a remote object should be downloaded.
|
||||
@ -1270,8 +1439,8 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
return collective_dirmap_d
|
||||
|
||||
def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
|
||||
self._log("_scan_remote_dmd nickname %r" % (nickname,))
|
||||
d = dirnode.list()
|
||||
with SCAN_REMOTE_DMD(nickname=nickname).context():
|
||||
d = DeferredContext(dirnode.list())
|
||||
def scan_listing(listing_map):
|
||||
for encoded_relpath_u in listing_map.keys():
|
||||
relpath_u = magicpath.magic2path(encoded_relpath_u)
|
||||
@ -1285,13 +1454,17 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
# share?
|
||||
remote_version = metadata.get('version', None)
|
||||
remote_uri = file_node.get_readonly_uri()
|
||||
self._log("%r has local dbentry %r, remote version %r, remote uri %r"
|
||||
% (relpath_u, local_dbentry, remote_version, remote_uri))
|
||||
REMOTE_DMD_ENTRY.log(
|
||||
relpath=relpath_u,
|
||||
pathentry=local_dbentry,
|
||||
remote_version=remote_version,
|
||||
remote_uri=remote_uri,
|
||||
)
|
||||
|
||||
if (local_dbentry is None or remote_version is None or
|
||||
local_dbentry.version < remote_version or
|
||||
(local_dbentry.version == remote_version and local_dbentry.last_downloaded_uri != remote_uri)):
|
||||
self._log("%r added to download queue" % (relpath_u,))
|
||||
ADD_TO_DOWNLOAD_QUEUE.log(relpath=relpath_u)
|
||||
if scan_batch.has_key(relpath_u):
|
||||
scan_batch[relpath_u] += [(file_node, metadata)]
|
||||
else:
|
||||
@ -1302,16 +1475,14 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
)
|
||||
|
||||
d.addCallback(scan_listing)
|
||||
d.addBoth(self._logcb, "end of _scan_remote_dmd")
|
||||
return d
|
||||
return d.addActionFinish()
|
||||
|
||||
def _scan_remote_collective(self, scan_self=False):
|
||||
self._log("_scan_remote_collective")
|
||||
scan_batch = {} # path -> [(filenode, metadata)]
|
||||
|
||||
d = self._collective_dirnode.list()
|
||||
with SCAN_REMOTE_COLLECTIVE().context():
|
||||
d = DeferredContext(self._collective_dirnode.list())
|
||||
def scan_collective(dirmap):
|
||||
d2 = defer.succeed(None)
|
||||
d2 = DeferredContext(defer.succeed(None))
|
||||
for dir_name in dirmap:
|
||||
(dirnode, metadata) = dirmap[dir_name]
|
||||
if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
|
||||
@ -1322,7 +1493,7 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
# XXX what should we do to make this failure more visible to users?
|
||||
d2.addErrback(_err)
|
||||
|
||||
return d2
|
||||
return d2.result
|
||||
d.addCallback(scan_collective)
|
||||
|
||||
def _filter_batch_to_deque(ign):
|
||||
@ -1347,33 +1518,30 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
|
||||
self._log("deque after = %r" % (self._deque,))
|
||||
d.addCallback(_filter_batch_to_deque)
|
||||
return d
|
||||
|
||||
return d.addActionFinish()
|
||||
|
||||
def _scan_delay(self):
|
||||
return self._poll_interval
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@eliotutil.inline_callbacks
|
||||
def _perform_scan(self):
|
||||
x = None
|
||||
try:
|
||||
x = yield self._scan_remote_collective()
|
||||
self._status_reporter(
|
||||
True, 'Magic folder is working',
|
||||
'Last scan: %s' % self.nice_current_time(),
|
||||
)
|
||||
except Exception as e:
|
||||
twlog.msg("Remote scan failed: %s" % (e,))
|
||||
self._log("_scan failed: %s" % (repr(e),))
|
||||
self._status_reporter(
|
||||
False, 'Remote scan has failed: %s' % str(e),
|
||||
'Last attempted at %s' % self.nice_current_time(),
|
||||
)
|
||||
defer.returnValue(x)
|
||||
with PERFORM_SCAN():
|
||||
try:
|
||||
yield self._scan_remote_collective()
|
||||
self._status_reporter(
|
||||
True, 'Magic folder is working',
|
||||
'Last scan: %s' % self.nice_current_time(),
|
||||
)
|
||||
except Exception as e:
|
||||
twlog.msg("Remote scan failed: %s" % (e,))
|
||||
self._log("_scan failed: %s" % (repr(e),))
|
||||
self._status_reporter(
|
||||
False, 'Remote scan has failed: %s' % str(e),
|
||||
'Last attempted at %s' % self.nice_current_time(),
|
||||
)
|
||||
|
||||
def _process(self, item):
|
||||
# Downloader
|
||||
self._log("_process(%r)" % (item,))
|
||||
now = self._clock.seconds()
|
||||
|
||||
self._log("started! %s" % (now,))
|
||||
@ -1383,7 +1551,8 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
conflict_path_u = self._get_conflicted_filename(abspath_u)
|
||||
last_uploaded_uri = item.metadata.get('last_uploaded_uri', None)
|
||||
|
||||
d = defer.succeed(False)
|
||||
with PROCESS_ITEM(item=item):
|
||||
d = DeferredContext(defer.succeed(False))
|
||||
|
||||
def do_update_db(written_abspath_u):
|
||||
filecap = item.file_node.get_uri()
|
||||
@ -1499,7 +1668,7 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
)
|
||||
)
|
||||
|
||||
d.addCallbacks(do_update_db)
|
||||
d.addCallback(do_update_db)
|
||||
d.addErrback(failed)
|
||||
|
||||
def trap_conflicts(f):
|
||||
@ -1507,4 +1676,4 @@ class Downloader(QueueMixin, WriteFileMixin):
|
||||
self._log("IGNORE CONFLICT ERROR %r" % f)
|
||||
return False
|
||||
d.addErrback(trap_conflicts)
|
||||
return d
|
||||
return d.addActionFinish()
|
||||
|
@ -3,6 +3,52 @@ import sys
|
||||
from collections import namedtuple
|
||||
|
||||
from allmydata.util.dbutil import get_db, DBError
|
||||
from allmydata.util.eliotutil import (
|
||||
RELPATH,
|
||||
VERSION,
|
||||
LAST_UPLOADED_URI,
|
||||
LAST_DOWNLOADED_URI,
|
||||
LAST_DOWNLOADED_TIMESTAMP,
|
||||
PATHINFO,
|
||||
validateSetMembership,
|
||||
validateInstanceOf,
|
||||
)
|
||||
from eliot import (
|
||||
Field,
|
||||
ActionType,
|
||||
)
|
||||
|
||||
PathEntry = namedtuple('PathEntry', 'size mtime_ns ctime_ns version last_uploaded_uri '
|
||||
'last_downloaded_uri last_downloaded_timestamp')
|
||||
|
||||
PATHENTRY = Field(
|
||||
u"pathentry",
|
||||
lambda v: None if v is None else {
|
||||
"size": v.size,
|
||||
"mtime_ns": v.mtime_ns,
|
||||
"ctime_ns": v.ctime_ns,
|
||||
"version": v.version,
|
||||
"last_uploaded_uri": v.last_uploaded_uri,
|
||||
"last_downloaded_uri": v.last_downloaded_uri,
|
||||
"last_downloaded_timestamp": v.last_downloaded_timestamp,
|
||||
},
|
||||
u"The local database state of a file.",
|
||||
validateInstanceOf((type(None), PathEntry)),
|
||||
)
|
||||
|
||||
_INSERT_OR_UPDATE = Field.for_types(
|
||||
u"insert_or_update",
|
||||
[unicode],
|
||||
u"An indication of whether the record for this upload was new or an update to a previous entry.",
|
||||
validateSetMembership({u"insert", u"update"}),
|
||||
)
|
||||
|
||||
DID_UPLOAD_VERSION = ActionType(
|
||||
u"magic-folder-db:did-upload-version",
|
||||
[RELPATH, VERSION, LAST_UPLOADED_URI, LAST_DOWNLOADED_URI, LAST_DOWNLOADED_TIMESTAMP, PATHINFO],
|
||||
[_INSERT_OR_UPDATE],
|
||||
u"An file upload is being recorded in the database.",
|
||||
)
|
||||
|
||||
|
||||
# magic-folder db schema version 1
|
||||
@ -42,9 +88,6 @@ def get_magicfolderdb(dbfile, stderr=sys.stderr,
|
||||
print >>stderr, e
|
||||
return None
|
||||
|
||||
PathEntry = namedtuple('PathEntry', 'size mtime_ns ctime_ns version last_uploaded_uri '
|
||||
'last_downloaded_uri last_downloaded_timestamp')
|
||||
|
||||
class MagicFolderDB(object):
|
||||
VERSION = 1
|
||||
|
||||
@ -88,17 +131,28 @@ class MagicFolderDB(object):
|
||||
return set([r[0] for r in rows])
|
||||
|
||||
def did_upload_version(self, relpath_u, version, last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp, pathinfo):
|
||||
try:
|
||||
self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?,?,?)",
|
||||
(relpath_u, pathinfo.size, pathinfo.mtime_ns, pathinfo.ctime_ns,
|
||||
version, last_uploaded_uri, last_downloaded_uri,
|
||||
last_downloaded_timestamp))
|
||||
except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
|
||||
self.cursor.execute("UPDATE local_files"
|
||||
" SET size=?, mtime_ns=?, ctime_ns=?, version=?, last_uploaded_uri=?,"
|
||||
" last_downloaded_uri=?, last_downloaded_timestamp=?"
|
||||
" WHERE path=?",
|
||||
(pathinfo.size, pathinfo.mtime_ns, pathinfo.ctime_ns, version,
|
||||
last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp,
|
||||
relpath_u))
|
||||
self.connection.commit()
|
||||
action = DID_UPLOAD_VERSION(
|
||||
relpath=relpath_u,
|
||||
version=version,
|
||||
last_uploaded_uri=last_uploaded_uri,
|
||||
last_downloaded_uri=last_downloaded_uri,
|
||||
last_downloaded_timestamp=last_downloaded_timestamp,
|
||||
pathinfo=pathinfo,
|
||||
)
|
||||
with action:
|
||||
try:
|
||||
self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?,?,?)",
|
||||
(relpath_u, pathinfo.size, pathinfo.mtime_ns, pathinfo.ctime_ns,
|
||||
version, last_uploaded_uri, last_downloaded_uri,
|
||||
last_downloaded_timestamp))
|
||||
action.add_success_fields(insert_or_update=u"insert")
|
||||
except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
|
||||
self.cursor.execute("UPDATE local_files"
|
||||
" SET size=?, mtime_ns=?, ctime_ns=?, version=?, last_uploaded_uri=?,"
|
||||
" last_downloaded_uri=?, last_downloaded_timestamp=?"
|
||||
" WHERE path=?",
|
||||
(pathinfo.size, pathinfo.mtime_ns, pathinfo.ctime_ns, version,
|
||||
last_uploaded_uri, last_downloaded_uri, last_downloaded_timestamp,
|
||||
relpath_u))
|
||||
action.add_success_fields(insert_or_update=u"update")
|
||||
self.connection.commit()
|
||||
|
@ -149,10 +149,7 @@ class MagicFolderCLITestMixin(CLITestMixin, GridTestMixin, NonASCIIPathMixin):
|
||||
def cleanup(self, res):
|
||||
d = defer.succeed(None)
|
||||
def _clean(ign):
|
||||
d = self.magicfolder.finish()
|
||||
self.magicfolder.uploader._clock.advance(self.magicfolder.uploader._pending_delay + 1)
|
||||
self.magicfolder.downloader._clock.advance(self.magicfolder.downloader._poll_interval + 1)
|
||||
return d
|
||||
return self.magicfolder.disownServiceParent()
|
||||
|
||||
d.addCallback(_clean)
|
||||
d.addCallback(lambda ign: res)
|
||||
|
@ -7,6 +7,8 @@ from os.path import join, exists, isdir
|
||||
from twisted.trial import unittest
|
||||
from twisted.internet import defer, task, reactor
|
||||
|
||||
from eliot.twisted import DeferredContext
|
||||
|
||||
from allmydata.interfaces import IDirectoryNode
|
||||
from allmydata.util.assertutil import precondition
|
||||
|
||||
@ -28,10 +30,32 @@ from allmydata.util.fileutil import get_pathinfo
|
||||
from allmydata.util.fileutil import abspath_expanduser_unicode
|
||||
from allmydata.immutable.upload import Data
|
||||
|
||||
from .eliotutil import (
|
||||
eliot_logged_test,
|
||||
)
|
||||
from ..util.eliotutil import (
|
||||
inline_callbacks,
|
||||
)
|
||||
|
||||
_debug = False
|
||||
|
||||
|
||||
class NewConfigUtilTests(unittest.TestCase):
|
||||
class TestCase(unittest.TestCase):
|
||||
"""
|
||||
A ``TestCase`` which collects helpful behaviors for subclasses.
|
||||
|
||||
Those behaviors are:
|
||||
|
||||
* Each test method will be run in a unique Eliot action context which
|
||||
identifies the test and collects all Eliot log messages emitted by that
|
||||
test (including setUp and tearDown messages).
|
||||
"""
|
||||
@eliot_logged_test
|
||||
def run(self, result):
|
||||
return super(TestCase, self).run(result)
|
||||
|
||||
|
||||
class NewConfigUtilTests(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# some tests look at the umask of created directories or files
|
||||
@ -147,7 +171,6 @@ class NewConfigUtilTests(unittest.TestCase):
|
||||
str(ctx.exception),
|
||||
)
|
||||
|
||||
|
||||
def test_both_styles_of_config(self):
|
||||
os.unlink(join(self.basedir, u"private", u"magic_folders.yaml"))
|
||||
with self.assertRaises(Exception) as ctx:
|
||||
@ -240,7 +263,7 @@ class NewConfigUtilTests(unittest.TestCase):
|
||||
)
|
||||
|
||||
|
||||
class LegacyConfigUtilTests(unittest.TestCase):
|
||||
class LegacyConfigUtilTests(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# create a valid 'old style' magic-folder configuration
|
||||
@ -383,7 +406,7 @@ class LegacyConfigUtilTests(unittest.TestCase):
|
||||
|
||||
|
||||
|
||||
class MagicFolderDbTests(unittest.TestCase):
|
||||
class MagicFolderDbTests(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.temp = abspath_expanduser_unicode(unicode(self.mktemp()))
|
||||
@ -465,19 +488,13 @@ class MagicFolderDbTests(unittest.TestCase):
|
||||
|
||||
|
||||
def iterate_downloader(magic):
|
||||
# can do either of these:
|
||||
#d = magic.downloader._process_deque()
|
||||
d = magic.downloader.set_hook('iteration')
|
||||
magic.downloader._clock.advance(magic.downloader._poll_interval + 1)
|
||||
return d
|
||||
return magic.downloader._processing_iteration()
|
||||
|
||||
|
||||
def iterate_uploader(magic):
|
||||
d = magic.uploader.set_hook('iteration')
|
||||
magic.uploader._clock.advance(magic.uploader._pending_delay + 1)
|
||||
return d
|
||||
return magic.uploader._processing_iteration()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def iterate(magic):
|
||||
yield iterate_uploader(magic)
|
||||
yield iterate_downloader(magic)
|
||||
@ -650,7 +667,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
self.mkdir_nonascii(self.bob_magic_dir)
|
||||
|
||||
# Alice creates a Magic Folder, invites herself and joins.
|
||||
d = self.do_create_magic_folder(0)
|
||||
d = DeferredContext(self.do_create_magic_folder(0))
|
||||
d.addCallback(lambda ign: self.do_invite(0, self.alice_nickname))
|
||||
def get_invite_code(result):
|
||||
self.invite_code = result[1].strip()
|
||||
@ -697,22 +714,17 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d0.addCallback(lambda ign: result)
|
||||
return d0
|
||||
d.addCallback(get_Bob_magicfolder)
|
||||
return d
|
||||
return d.result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def tearDown(self):
|
||||
yield GridTestMixin.tearDown(self)
|
||||
d0 = self.alice_magicfolder.finish()
|
||||
d1 = self.bob_magicfolder.finish()
|
||||
|
||||
for mf in [self.alice_magicfolder, self.bob_magicfolder]:
|
||||
mf.uploader._clock.advance(mf.uploader._pending_delay + 1)
|
||||
mf.downloader._clock.advance(mf.downloader._poll_interval + 1)
|
||||
|
||||
yield d0
|
||||
yield d1
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_alice_delete_bob_restore(self):
|
||||
alice_fname = os.path.join(self.alice_magic_dir, 'blam')
|
||||
bob_fname = os.path.join(self.bob_magic_dir, 'blam')
|
||||
@ -788,7 +800,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 2)
|
||||
yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 2)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_alice_sees_bobs_delete_with_error(self):
|
||||
# alice creates a file, bob deletes it -- and we also arrange
|
||||
# for Alice's file to have "gone missing" as well.
|
||||
@ -846,7 +858,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1)
|
||||
yield self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_alice_create_bob_update(self):
|
||||
alice_fname = os.path.join(self.alice_magic_dir, 'blam')
|
||||
bob_fname = os.path.join(self.bob_magic_dir, 'blam')
|
||||
@ -886,7 +898,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
yield self._check_version_in_dmd(self.alice_magicfolder, u"blam", 1)
|
||||
self._check_version_in_local_db(self.alice_magicfolder, u"blam", 1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_download_retry(self):
|
||||
alice_fname = os.path.join(self.alice_magic_dir, 'blam')
|
||||
# bob_fname = os.path.join(self.bob_magic_dir, 'blam')
|
||||
@ -938,7 +950,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
)
|
||||
yield self._check_version_in_dmd(self.bob_magicfolder, u"blam", 0)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_conflict_local_change_fresh(self):
|
||||
alice_fname = os.path.join(self.alice_magic_dir, 'localchange0')
|
||||
bob_fname = os.path.join(self.bob_magic_dir, 'localchange0')
|
||||
@ -964,7 +976,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
# ...so now bob should produce a conflict
|
||||
self.assertTrue(os.path.exists(bob_fname + '.conflict'))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_conflict_local_change_existing(self):
|
||||
alice_fname = os.path.join(self.alice_magic_dir, 'localchange1')
|
||||
bob_fname = os.path.join(self.bob_magic_dir, 'localchange1')
|
||||
@ -1002,7 +1014,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
# ...so now bob should produce a conflict
|
||||
self.assertTrue(os.path.exists(bob_fname + '.conflict'))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_alice_delete_and_restore(self):
|
||||
alice_fname = os.path.join(self.alice_magic_dir, 'blam')
|
||||
bob_fname = os.path.join(self.bob_magic_dir, 'blam')
|
||||
@ -1124,7 +1136,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d.addCallback(advance)
|
||||
return d
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Alice_to_write_a_file():
|
||||
if _debug: print "Alice writes a file\n\n\n\n\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u)
|
||||
@ -1149,7 +1161,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
# d.addCallback(lambda ign: self._check_uploader_count('objects_not_uploaded', 0, magic=self.bob_magicfolder))
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 1, magic=self.bob_magicfolder))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Alice_to_delete_file():
|
||||
if _debug: print "Alice deletes the file!\n\n\n\n"
|
||||
yield self.alice_fileops.delete(self.file_path)
|
||||
@ -1157,7 +1169,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
yield iterate(self.bob_magicfolder)
|
||||
d.addCallback(_wait_for, Alice_to_delete_file)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def notify_bob_moved(ign):
|
||||
# WARNING: this is just directly notifying for the mock
|
||||
# tests, because in the Real* tests the .backup file will
|
||||
@ -1182,7 +1194,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2))
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 2, magic=self.bob_magicfolder))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Alice_to_rewrite_file():
|
||||
if _debug: print "Alice rewrites file\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u)
|
||||
@ -1237,7 +1249,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0))
|
||||
# d.addCallback(lambda ign: self._check_uploader_count('objects_not_uploaded', 2, magic=self.bob_magicfolder))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Bob_to_rewrite_file():
|
||||
if _debug: print "Bob rewrites file\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file1", base=self.bob_magicfolder.uploader._local_path_u)
|
||||
@ -1287,7 +1299,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d.addCallback(lambda ign: self._check_uploader_count('objects_succeeded', 1, magic=self.bob_magicfolder))
|
||||
|
||||
# prepare to perform another conflict test
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Alice_to_write_file2():
|
||||
if _debug: print "Alice writes a file2\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file2", base=self.alice_magicfolder.uploader._local_path_u)
|
||||
@ -1315,13 +1327,16 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d.addCallback(advance)
|
||||
d.addCallback(lambda ign: self._check_version_in_local_db(self.bob_magicfolder, u"file2", 0))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Bob_to_rewrite_file2():
|
||||
if _debug: print "Bob rewrites file2\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file2", base=self.bob_magicfolder.uploader._local_path_u)
|
||||
if _debug: print "---- bob's file is %r" % (self.file_path,)
|
||||
yield self.bob_fileops.write(self.file_path, "roger roger. what vector?")
|
||||
yield iterate(self.bob_magicfolder)
|
||||
yield self.bob_fileops.write(self.file_path, "roger roger. what vector?")
|
||||
if _debug: print "---- bob rewrote file2"
|
||||
yield iterate(self.bob_magicfolder)
|
||||
if _debug: print "---- iterated bob's magicfolder"
|
||||
d.addCallback(lambda ign: _wait_for(None, Bob_to_rewrite_file2, alice=False))
|
||||
d.addCallback(lambda ign: self._check_version_in_dmd(self.bob_magicfolder, u"file2", 1))
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 5, magic=self.bob_magicfolder))
|
||||
@ -1393,7 +1408,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_downloaded', 6))
|
||||
|
||||
# prepare to perform another conflict test
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Alice_to_write_file3():
|
||||
if _debug: print "Alice writes a file\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file3", base=self.alice_magicfolder.uploader._local_path_u)
|
||||
@ -1407,7 +1422,7 @@ class MagicFolderAliceBobTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Rea
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 1))
|
||||
d.addCallback(lambda ign: self._check_downloader_count('objects_conflicted', 0, magic=self.alice_magicfolder))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def Bob_to_rewrite_file3():
|
||||
if _debug: print "Bob rewrites file3\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file3", base=self.bob_magicfolder.uploader._local_path_u)
|
||||
@ -1510,7 +1525,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
fileutil.make_dirs(self.basedir)
|
||||
self._createdb()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_scan_once_on_startup(self):
|
||||
# What is this test? Maybe it is just a stub and needs finishing.
|
||||
self.magicfolder.uploader._clock.advance(99)
|
||||
@ -1587,9 +1602,9 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.basedir)
|
||||
new_small_tree_dir = abspath_expanduser_unicode(small_tree_name, base=self.local_dir)
|
||||
|
||||
d = defer.succeed(None)
|
||||
d = DeferredContext(defer.succeed(None))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def _check_move_empty_tree(res):
|
||||
self.mkdir_nonascii(empty_tree_dir)
|
||||
yield self.fileops.move(empty_tree_dir, new_empty_tree_dir)
|
||||
@ -1602,7 +1617,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 1))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def _check_move_small_tree(res):
|
||||
self.mkdir_nonascii(small_tree_dir)
|
||||
what_path = abspath_expanduser_unicode(u"what", base=small_tree_dir)
|
||||
@ -1626,7 +1641,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def _check_moved_tree_is_watched(res):
|
||||
another_path = abspath_expanduser_unicode(u"another", base=new_small_tree_dir)
|
||||
yield self.fileops.write(another_path, "file")
|
||||
@ -1640,7 +1655,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 2))
|
||||
|
||||
return d
|
||||
return d.result
|
||||
test_move_tree.todo = "fails on certain linux flavors: see ticket #2834"
|
||||
|
||||
def test_persistence(self):
|
||||
@ -1652,9 +1667,9 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
"""
|
||||
self.collective_dircap = "" # XXX hmmm?
|
||||
|
||||
d = defer.succeed(None)
|
||||
d = DeferredContext(defer.succeed(None))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def create_test_file(filename):
|
||||
test_file = abspath_expanduser_unicode(filename, base=self.local_dir)
|
||||
yield self.fileops.write(test_file, "meow %s" % filename)
|
||||
@ -1675,13 +1690,13 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_succeeded'), 1))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
|
||||
return d
|
||||
return d.result
|
||||
|
||||
# all this "self.*" state via 9000 mix-ins is really really
|
||||
# hard to read, keep track of, etc. Very hard to understand
|
||||
# what each test uses for setup, etc. :(
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_delete(self):
|
||||
# setup: create a file 'foo'
|
||||
path = os.path.join(self.local_dir, u'foo')
|
||||
@ -1704,7 +1719,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
self.assertTrue(node is not None, "Failed to find %r in DMD" % (path,))
|
||||
self.failUnlessEqual(metadata['version'], 1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_batched_process(self):
|
||||
"""
|
||||
status APIs correctly function when there are 2 items queued at
|
||||
@ -1736,7 +1751,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
# should see the same status from the outside
|
||||
self.assertEqual(upstatus0, upstatus1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_real_notify_failure(self):
|
||||
"""
|
||||
Simulate an exception from the _real_notify helper in
|
||||
@ -1775,7 +1790,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
# notification for some reason..
|
||||
self.assertTrue(len(errors) >= 1)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@inline_callbacks
|
||||
def test_delete_and_restore(self):
|
||||
# setup: create a file
|
||||
path = os.path.join(self.local_dir, u'foo')
|
||||
@ -1805,7 +1820,7 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
self.failUnlessEqual(metadata['version'], 2)
|
||||
|
||||
def test_magic_folder(self):
|
||||
d = defer.succeed(None)
|
||||
d = DeferredContext(defer.succeed(None))
|
||||
# Write something short enough for a LIT file.
|
||||
d.addCallback(lambda ign: self._check_file(u"short", "test"))
|
||||
|
||||
@ -1825,10 +1840,10 @@ class SingleMagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, Reall
|
||||
# TODO: test that causes an upload failure.
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_failed'), 0))
|
||||
|
||||
return d
|
||||
return d.result
|
||||
|
||||
|
||||
class MockTestAliceBob(MagicFolderAliceBobTestMixin, unittest.TestCase):
|
||||
class MockTestAliceBob(MagicFolderAliceBobTestMixin, TestCase):
|
||||
inject_inotify = True
|
||||
|
||||
def setUp(self):
|
||||
@ -1838,7 +1853,7 @@ class MockTestAliceBob(MagicFolderAliceBobTestMixin, unittest.TestCase):
|
||||
return d
|
||||
|
||||
|
||||
class MockTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
class MockTest(SingleMagicFolderTestMixin, TestCase):
|
||||
"""This can run on any platform, and even if twisted.internet.inotify can't be imported."""
|
||||
inject_inotify = True
|
||||
|
||||
@ -1859,7 +1874,7 @@ class MockTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
doesnotexist = abspath_expanduser_unicode(u"doesnotexist", base=self.basedir)
|
||||
|
||||
client = self.g.clients[0]
|
||||
d = client.create_dirnode()
|
||||
d = DeferredContext(client.create_dirnode())
|
||||
def _check_errors(n):
|
||||
self.failUnless(IDirectoryNode.providedBy(n))
|
||||
upload_dircap = n.get_uri()
|
||||
@ -1884,7 +1899,7 @@ class MockTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
self.shouldFail(NotImplementedError, 'unsupported', 'blah',
|
||||
MagicFolder, client, upload_dircap, '', errors_dir, magicfolderdb, 0077, 'default')
|
||||
d.addCallback(_check_errors)
|
||||
return d
|
||||
return d.result
|
||||
|
||||
def test_write_downloaded_file(self):
|
||||
workdir = fileutil.abspath_expanduser_unicode(u"cli/MagicFolder/write-downloaded-file")
|
||||
@ -1939,7 +1954,7 @@ class MockTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
sub_dir = abspath_expanduser_unicode(u"subdir", base=self.local_dir)
|
||||
self.mkdir_nonascii(sub_dir)
|
||||
|
||||
d = defer.succeed(None)
|
||||
d = DeferredContext(defer.succeed(None))
|
||||
|
||||
def _create_file_without_event(res):
|
||||
processed_d = self.magicfolder.uploader.set_hook('processed')
|
||||
@ -1957,10 +1972,10 @@ class MockTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
return processed_d
|
||||
d.addCallback(_advance_clock)
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 1))
|
||||
return d
|
||||
return d.result
|
||||
|
||||
def test_statistics(self):
|
||||
d = defer.succeed(None)
|
||||
d = DeferredContext(defer.succeed(None))
|
||||
# Write something short enough for a LIT file.
|
||||
d.addCallback(lambda ign: self._check_file(u"short", "test"))
|
||||
|
||||
@ -1985,10 +2000,10 @@ class MockTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
self.failUnlessEqual(data["counters"]["magic_folder.uploader.files_uploaded"], 1)
|
||||
self.failUnlessEqual(data["counters"]["magic_folder.uploader.objects_queued"], 0)
|
||||
d.addCallback(_got_stats_json)
|
||||
return d
|
||||
return d.result
|
||||
|
||||
|
||||
class RealTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
class RealTest(SingleMagicFolderTestMixin, TestCase):
|
||||
"""This is skipped unless both Twisted and the platform support inotify."""
|
||||
inject_inotify = False
|
||||
|
||||
@ -1998,7 +2013,7 @@ class RealTest(SingleMagicFolderTestMixin, unittest.TestCase):
|
||||
return d
|
||||
|
||||
|
||||
class RealTestAliceBob(MagicFolderAliceBobTestMixin, unittest.TestCase):
|
||||
class RealTestAliceBob(MagicFolderAliceBobTestMixin, TestCase):
|
||||
"""This is skipped unless both Twisted and the platform support inotify."""
|
||||
inject_inotify = False
|
||||
|
||||
|
@ -16,6 +16,14 @@ __all__ = [
|
||||
"eliot_logging_service",
|
||||
"opt_eliot_destination",
|
||||
"opt_help_eliot_destinations",
|
||||
"validateInstanceOf",
|
||||
"validateSetMembership",
|
||||
"RELPATH",
|
||||
"VERSION",
|
||||
"LAST_UPLOADED_URI",
|
||||
"LAST_DOWNLOADED_URI",
|
||||
"LAST_DOWNLOADED_TIMESTAMP",
|
||||
"PATHINFO",
|
||||
]
|
||||
|
||||
from sys import (
|
||||
@ -50,6 +58,12 @@ from eliot import (
|
||||
remove_destination,
|
||||
write_traceback,
|
||||
)
|
||||
from eliot import (
|
||||
Field,
|
||||
)
|
||||
from eliot._validation import (
|
||||
ValidationError,
|
||||
)
|
||||
|
||||
from twisted.python.filepath import (
|
||||
FilePath,
|
||||
@ -68,6 +82,10 @@ from twisted.internet.defer import (
|
||||
from twisted.application.service import Service
|
||||
|
||||
|
||||
from .fileutil import (
|
||||
PathInfo,
|
||||
)
|
||||
|
||||
class _GeneratorContext(object):
|
||||
def __init__(self, execution_context):
|
||||
self._execution_context = execution_context
|
||||
@ -185,6 +203,68 @@ def inline_callbacks(original):
|
||||
eliot_friendly_generator_function(original)
|
||||
)
|
||||
|
||||
def validateInstanceOf(t):
|
||||
"""
|
||||
Return an Eliot validator that requires values to be instances of ``t``.
|
||||
"""
|
||||
def validator(v):
|
||||
if not isinstance(v, t):
|
||||
raise ValidationError("{} not an instance of {}".format(v, t))
|
||||
return validator
|
||||
|
||||
def validateSetMembership(s):
|
||||
"""
|
||||
Return an Eliot validator that requires values to be elements of ``s``.
|
||||
"""
|
||||
def validator(v):
|
||||
if v not in s:
|
||||
raise ValidationError("{} not in {}".format(v, s))
|
||||
return validator
|
||||
|
||||
RELPATH = Field.for_types(
|
||||
u"relpath",
|
||||
[unicode],
|
||||
u"The relative path of a file in a magic-folder.",
|
||||
)
|
||||
|
||||
VERSION = Field.for_types(
|
||||
u"version",
|
||||
[int, long],
|
||||
u"The version of the file.",
|
||||
)
|
||||
|
||||
LAST_UPLOADED_URI = Field.for_types(
|
||||
u"last_uploaded_uri",
|
||||
[unicode, bytes, None],
|
||||
u"The filecap to which this version of this file was uploaded.",
|
||||
)
|
||||
|
||||
LAST_DOWNLOADED_URI = Field.for_types(
|
||||
u"last_downloaded_uri",
|
||||
[unicode, bytes, None],
|
||||
u"The filecap from which the previous version of this file was downloaded.",
|
||||
)
|
||||
|
||||
LAST_DOWNLOADED_TIMESTAMP = Field.for_types(
|
||||
u"last_downloaded_timestamp",
|
||||
[float, int, long],
|
||||
u"(XXX probably not really, don't trust this) The timestamp of the last download of this file.",
|
||||
)
|
||||
|
||||
PATHINFO = Field(
|
||||
u"pathinfo",
|
||||
lambda v: None if v is None else {
|
||||
"isdir": v.isdir,
|
||||
"isfile": v.isfile,
|
||||
"islink": v.islink,
|
||||
"exists": v.exists,
|
||||
"size": v.size,
|
||||
"mtime_ns": v.mtime_ns,
|
||||
"ctime_ns": v.ctime_ns,
|
||||
},
|
||||
u"The metadata for this version of this file.",
|
||||
validateInstanceOf((type(None), PathInfo)),
|
||||
)
|
||||
|
||||
def eliot_logging_service(reactor, destinations):
|
||||
"""
|
||||
|
@ -1,7 +1,9 @@
|
||||
import collections, itertools
|
||||
import collections, itertools, functools
|
||||
|
||||
objnums = collections.defaultdict(itertools.count)
|
||||
|
||||
|
||||
@functools.total_ordering
|
||||
class NummedObj(object):
|
||||
"""
|
||||
This is useful for nicer debug printouts. Instead of objects of the same class being
|
||||
@ -27,22 +29,14 @@ class NummedObj(object):
|
||||
return "<%s #%d>" % (self._classname, self._objid,)
|
||||
|
||||
def __lt__(self, other):
|
||||
return (self._objid, self._classname,) < (other._objid, other._classname,)
|
||||
|
||||
def __le__(self, other):
|
||||
return (self._objid, self._classname,) <= (other._objid, other._classname,)
|
||||
if isinstance(other, NummedObj):
|
||||
return (self._objid, self._classname,) < (other._objid, other._classname,)
|
||||
return NotImplemented
|
||||
|
||||
def __eq__(self, other):
|
||||
return (self._objid, self._classname,) == (other._objid, other._classname,)
|
||||
|
||||
def __ne__(self, other):
|
||||
return (self._objid, self._classname,) != (other._objid, other._classname,)
|
||||
|
||||
def __gt__(self, other):
|
||||
return (self._objid, self._classname,) > (other._objid, other._classname,)
|
||||
|
||||
def __ge__(self, other):
|
||||
return (self._objid, self._classname,) >= (other._objid, other._classname,)
|
||||
if isinstance(other, NummedObj):
|
||||
return (self._objid, self._classname,) == (other._objid, other._classname,)
|
||||
return NotImplemented
|
||||
|
||||
def __hash__(self):
|
||||
return id(self)
|
||||
|
Loading…
x
Reference in New Issue
Block a user