diff --git a/src/allmydata/frontends/magic_folder.py b/src/allmydata/frontends/magic_folder.py index 7532f2b19..68107f3f7 100644 --- a/src/allmydata/frontends/magic_folder.py +++ b/src/allmydata/frontends/magic_folder.py @@ -46,55 +46,93 @@ class MagicFolder(service.MultiService): precondition_abspath(local_dir) service.MultiService.__init__(self) - self._stopped = False - self._remote_scan_delay = 3 # XXX - self._local_dir = local_dir - self._upload_lazy_tail = defer.succeed(None) - self._upload_pending = set() - self._download_scan_batch = {} # path -> [(filenode, metadata)] - self._download_lazy_tail = defer.succeed(None) - self._download_pending = set() - self._collective_dirnode = None - self._client = client - self._stats_provider = client.stats_provider - self._convergence = client.convergence - self._local_path = to_filepath(self._local_dir) - self._dbfile = dbfile + local_path = to_filepath(local_dir) + + db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3)) + if db is None: + return Failure(Exception('ERROR: Unable to load magic folder db.')) - self._download_deque = deque() - self._upload_deque = deque() self.is_ready = False - self._inotify = inotify or get_inotify_module() - - if not self._local_path.exists(): + if not local_path.exists(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but there is no directory at that location." % quote_local_unicode_path(local_dir)) - if not self._local_path.isdir(): + if not local_path.isdir(): raise AssertionError("The '[magic_folder] local.directory' parameter was %s " "but the thing at that location is not a directory." % quote_local_unicode_path(local_dir)) + self.uploader = Uploader(client, local_path, db, upload_dircap, inotify, pending_delay) + self.downloader = Downloader(client, local_path, db, collective_dircap) + + def startService(self): + service.MultiService.startService(self) + return self.uploader.start_monitoring() + + def ready(self): + """ready is used to signal us to start + processing the upload and download items... + """ + self.is_ready = True + self.uploader.start_scanning() + self.downloader.start_scanning() + + def finish(self): + d = self.uploader.stop() + d.addBoth(lambda ign: self.downloader.stop()) + return d + + def remove_service(self): + return service.MultiService.disownServiceParent(self) + + +class QueueMixin(object): + def __init__(self, client, counter, local_path, db): + self._client = client + self._counter = client.stats_provider.count + self._local_path = local_path + self._db = db + + self._deque = deque() + self._lazy_tail = defer.succeed(None) + self._pending = set() + self._processed_callback = lambda ign: None + self._ignore_count = 0 + + def _do_callback(self, res): + if self._ignore_count == 0: + self._callback(res) + else: + self._ignore_count -= 1 + return None # intentionally suppress failures, which have already been logged + + def set_callback(self, callback, ignore_count=0): + """ + set_callback sets a function that will be called after a filesystem change + (either local or remote) has been processed, successfully or unsuccessfully. + """ + self._callback = callback + self._ignore_count = ignore_count + + def _log(self, msg): + self._client.log("Magic Folder: " + msg) + #print "_log %s" % (msg,) + #open("events", "ab+").write(msg) + + +class Uploader(QueueMixin): + def __init__(self, client, local_path, db, upload_dircap, inotify, pending_delay): + QueueMixin.__init__(self, client, local_path, db) + # TODO: allow a path rather than a cap URI. self._upload_dirnode = self._client.create_node_from_uri(upload_dircap) - self._collective_dirnode = self._client.create_node_from_uri(collective_dircap) - if not IDirectoryNode.providedBy(self._upload_dirnode): raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.") if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly(): raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.") - if not IDirectoryNode.providedBy(self._collective_dirnode): - raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.") - if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly(): - raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.") - - self._processed_callback = lambda ign: None - self._download_callback = lambda ign: None - self._upload_ignore_count = 0 - self._download_ignore_count = 0 - + self._inotify = inotify or get_inotify_module() self._notifier = self._inotify.INotify() if hasattr(self._notifier, 'set_pending_delay'): @@ -115,6 +153,229 @@ class MagicFolder(service.MultiService): self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify], recursive=True) + def start_monitoring(self): + d = self._notifier.startReading() + self._counter('magic_folder.dirs_monitored', 1) + return d + + def stop(self): + self._notifier.stopReading() + self._counter('magic_folder.dirs_monitored', -1) + + if hasattr(self._notifier, 'wait_until_stopped'): + d = self._notifier.wait_until_stopped() + else: + d = defer.succeed(None) + return d + + def start_scanning(self): + self._scan(self._local_dir) + self._turn_deque() + + def _scan(self, localpath): + if not os.path.isdir(localpath): + raise AssertionError("Programmer error: _scan() must be passed a directory path.") + quoted_path = quote_local_unicode_path(localpath) + try: + children = listdir_unicode(localpath) + except EnvironmentError: + raise(Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,))) + except FilenameEncodingError: + raise(Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,))) + + d = defer.succeed(None) + for child in children: + assert isinstance(child, unicode), child + childpath = os.path.join(localpath, child) + + def _process_child(ign, childpath=childpath): + # note: symlinks to directories are both islink() and isdir() + isdir = os.path.isdir(childpath) + isfile = os.path.isfile(childpath) + islink = os.path.islink(childpath) + + if islink: + self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(childpath)) + return None + elif isdir: + # process directories unconditionally + self._append_to_deque(childpath) + + # recurse on the child directory + return self._scan(childpath) + elif isfile: + file_version = self._db.get_local_file_version(childpath) + if file_version is None: + # XXX upload if we didn't record our version in magicfolder db? + self._append_to_deque(childpath) + return None + else: + d2 = self._get_collective_latest_file(childpath) + def _got_latest_file((file_node, metadata)): + collective_version = metadata['version'] + if collective_version is None: + return None + if file_version > collective_version: + self._append_to_upload_deque(childpath) + elif file_version < collective_version: # FIXME Daira thinks this is wrong + # if a collective version of the file is newer than ours + # we must download it and unlink the old file from our upload dirnode + self._append_to_download_deque(childpath) + # XXX where should we save the returned deferred? + return self._upload_dirnode.delete(childpath, must_be_file=True) + else: + # XXX same version. do nothing. + pass + d2.addCallback(_got_latest_file) + return d2 + else: + self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(childpath)) + return None + d.addCallback(_process_child) + d.addErrback(log.err) + + return d + + # FIXME move to QueueMixin + def _append_to_deque(self, path): + if path in self._pending: + return + self._deque.append(path) + self._pending.add(path) + self._counter('magic_folder.objects_queued', 1) + if self.is_ready: + reactor.callLater(0, self._turn_deque) + + # FIXME move to QueueMixin + def _turn_deque(self): + try: + path = self._deque.pop() + except IndexError: + self._log("magic folder upload deque is now empty") + self._lazy_tail = defer.succeed(None) + return + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path)) + self._lazy_tail.addCallback(lambda ign: self._turn_deque()) + + def _notify(self, opaque, path, events_mask): + self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) + path_u = unicode_from_filepath(path) + self._append_to_deque(path_u) + + def _process(self, path_u): + precondition(isinstance(path_u, unicode), path_u) + d = defer.succeed(None) + + def _add_file(encoded_name_u, version): + uploadable = FileName(path_u, self._convergence) + return self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True) + + def _add_dir(encoded_name_u): + self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True) + uploadable = Data("", self._convergence) + encoded_name_u += u"@_" + upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True) + def _succeeded(ign): + self._log("created subdirectory %r" % (path_u,)) + self._counter('magic_folder.directories_created', 1) + def _failed(f): + self._log("failed to create subdirectory %r" % (path_u,)) + return f + upload_d.addCallbacks(_succeeded, _failed) + upload_d.addCallback(lambda ign: self._scan(path_u)) + return upload_d + + def _maybe_upload(val): + self._upload_pending.remove(path_u) # FIXME make _upload_pending hold relative paths + relpath_u = os.path.relpath(path_u, self._local_dir) + encoded_name_u = magicpath.path2magic(relpath_u) + + def get_metadata(result): + try: + metadata_d = self._upload_dirnode.get_metadata_for(encoded_name_u) + except KeyError: + return Failure() + return metadata_d + + if not os.path.exists(path_u): + self._log("drop-upload: notified object %r disappeared " + "(this is normal for temporary objects)" % (path_u,)) + self._counter('magic_folder.objects_disappeared', 1) + d2 = defer.succeed(None) + if self._db.check_file_db_exists(relpath_u): + d2.addCallback(get_metadata) + def set_deleted(metadata): + current_version = self._db.get_local_file_version(relpath_u) + 1 + metadata['version'] = current_version + metadata['deleted'] = True + empty_uploadable = Data("", self._convergence) + return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata) + d2.addCallback(set_deleted) + d2.addCallback(lambda x: Exception("file does not exist")) + return d2 + elif os.path.islink(path_u): + raise Exception("symlink not being processed") + if os.path.isdir(path_u): + return _add_dir(encoded_name_u) + elif os.path.isfile(path_u): + version = self._db.get_local_file_version(relpath_u) + if version is None: + version = 0 + else: + version += 1 + print "NEW VERSION %d for %r" % (version, relpath_u) + d2 = _add_file(encoded_name_u, version) + def add_db_entry(filenode): + filecap = filenode.get_uri() + s = os.stat(path_u) + size = s[stat.ST_SIZE] + ctime = s[stat.ST_CTIME] + mtime = s[stat.ST_MTIME] + self._db.did_upload_file(filecap, relpath_u, version, mtime, ctime, size) + self._counter('magic_folder.files_uploaded', 1) + d2.addCallback(add_db_entry) + return d2 + else: + raise Exception("non-directory/non-regular file not being processed") + + d.addCallback(_maybe_upload) + + def _succeeded(res): + self._counter('magic_folder.objects_queued', -1) + self._counter('magic_folder.objects_succeeded', 1) + return res + def _failed(f): + self._counter('magic_folder.objects_queued', -1) + self._counter('magic_folder.objects_failed', 1) + self._log("%r while processing %r" % (f, path_u)) + return f + d.addCallbacks(_succeeded, _failed) + d.addBoth(self._do_callback) + return d + + +class Downloader(QueueMixin): + def __init__(self, client, local_path, db, collective_dircap): + QueueMixin.__init__(self, client, local_path, db) + + # TODO: allow a path rather than a cap URI. + self._collective_dirnode = self._client.create_node_from_uri(collective_dircap) + + if not IDirectoryNode.providedBy(self._collective_dirnode): + raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.") + if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly(): + raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.") + + self._remote_scan_delay = 3 # XXX + self._download_scan_batch = {} # path -> [(filenode, metadata)] + + def start_scanning(self): + self._scan_remote_collective() + self._turn_deque() + + def stop(self): + return self._lazy_tail + def _should_download(self, relpath_u, remote_version): """ _should_download returns a bool indicating whether or not a remote object should be downloaded. @@ -196,13 +457,13 @@ class MagicFolder(service.MultiService): return collective_dirmap_d def _add_batch_to_download_queue(self, result): - self._download_deque.extend(result) - self._download_pending.update(map(lambda x: x[0], result)) + self._deque.extend(result) + self._pending.update(map(lambda x: x[0], result)) def _filter_scan_batch(self, result): extension = [] # consider whether this should be a dict for name in self._download_scan_batch.keys(): - if name in self._download_pending: + if name in self._pending: continue file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version']) if self._should_download(name, metadata['version']): @@ -213,298 +474,43 @@ class MagicFolder(service.MultiService): d = file_node.download_best_version() def succeeded(res): d.addCallback(lambda result: self._write_downloaded_file(name, result)) - self._stats_provider.count('magic_folder.objects_downloaded', 1) + self._counter('magic_folder.objects_downloaded', 1) return None def failed(f): self._log("download failed") - self._stats_provider.count('magic_folder.objects_download_failed', 1) + self._counter('magic_folder.objects_download_failed', 1) return f def remove_from_pending(result): - self._download_pending = self._download_pending.difference(set([name])) + self._pending = self._pending.difference(set([name])) d.addCallbacks(succeeded, failed) - d.addBoth(self._do_download_callback) + d.addBoth(self._do_callback) d.addBoth(remove_from_pending) return d def _write_downloaded_file(self, name, file_contents): print "_write_downloaded_file: no-op." - def _db_file_is_uploaded(self, childpath): - """_db_file_is_uploaded returns true if the file was previously uploaded - """ - assert self._db != None - r = self._db.check_file(childpath) - return r.was_uploaded() + # FIXME move to QueueMixin + def _append_to_deque(self, path): + if path in self._download_scan_batch.keys(): + return + self._deque.append(path) + self._pending.add(path) + self._counter('magic_folder.download_objects_queued', 1) + if self.is_ready: + reactor.callLater(0, self._turn_deque) - def _scan(self, localpath): - if not os.path.isdir(localpath): - raise AssertionError("Programmer error: _scan() must be passed a directory path.") - quoted_path = quote_local_unicode_path(localpath) - try: - children = listdir_unicode(localpath) - except EnvironmentError: - raise(Exception("WARNING: magic folder: permission denied on directory %s" % (quoted_path,))) - except FilenameEncodingError: - raise(Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error" % (quoted_path,))) - - d = defer.succeed(None) - for child in children: - assert isinstance(child, unicode), child - childpath = os.path.join(localpath, child) - - def _process_child(ign, childpath=childpath): - # note: symlinks to directories are both islink() and isdir() - isdir = os.path.isdir(childpath) - isfile = os.path.isfile(childpath) - islink = os.path.islink(childpath) - - if islink: - self.warn("WARNING: cannot backup symlink %s" % quote_local_unicode_path(childpath)) - return None - elif isdir: - # process directories unconditionally - self._append_to_upload_deque(childpath) - - # recurse on the child directory - return self._scan(childpath) - elif isfile: - file_version = self._db.get_local_file_version(childpath) - if file_version is None: - # XXX upload if we didn't record our version in magicfolder db? - self._append_to_upload_deque(childpath) - return None - else: - d2 = self._get_collective_latest_file(childpath) - def _got_latest_file((file_node, metadata)): - collective_version = metadata['version'] - if collective_version is None: - return None - if file_version > collective_version: - self._append_to_upload_deque(childpath) - elif file_version < collective_version: - # if a collective version of the file is newer than ours - # we must download it and unlink the old file from our upload dirnode - self._append_to_download_deque(childpath) - # XXX where should we save the returned deferred? - return self._upload_dirnode.delete(childpath, must_be_file=True) - else: - # XXX same version. do nothing. - pass - d2.addCallback(_got_latest_file) - return d2 - else: - self.warn("WARNING: cannot backup special file %s" % quote_local_unicode_path(childpath)) - return None - d.addCallback(_process_child) - d.addErrback(log.err) - - return d - - def startService(self): - self._db = backupdb.get_backupdb(self._dbfile, create_version=(backupdb.SCHEMA_v3, 3)) - if self._db is None: - return Failure(Exception('ERROR: Unable to load magic folder db.')) - - service.MultiService.startService(self) - d = self._notifier.startReading() - self._stats_provider.count('magic_folder.dirs_monitored', 1) - return d - - def ready(self): - """ready is used to signal us to start - processing the upload and download items... - """ - self.is_ready = True - self._scan(self._local_dir) - self._scan_remote_collective() - self._turn_upload_deque() - self._turn_download_deque() - - def _turn_download_deque(self): + # FIXME move to QueueMixin + def _turn_deque(self): if self._stopped: return try: - file_path, file_node, metadata = self._download_deque.pop() + file_path, file_node, metadata = self._deque.pop() except IndexError: self._log("magic folder upload deque is now empty") - self._download_lazy_tail = defer.succeed(None) - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective)) - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_download_deque)) + self._lazy_tail = defer.succeed(None) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective)) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_deque)) return - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node)) - self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque)) - - def _append_to_download_deque(self, path): - if path in self._download_scan_batch.keys(): - return - self._download_deque.append(path) - self._download_pending.add(path) - self._stats_provider.count('magic_folder.download_objects_queued', 1) - if self.is_ready: - reactor.callLater(0, self._turn_download_deque) - - def _append_to_upload_deque(self, path): - if path in self._upload_pending: - return - self._upload_deque.append(path) - self._upload_pending.add(path) - self._stats_provider.count('magic_folder.objects_queued', 1) - if self.is_ready: - reactor.callLater(0, self._turn_upload_deque) - - def _turn_upload_deque(self): - try: - path = self._upload_deque.pop() - except IndexError: - self._log("magic folder upload deque is now empty") - self._upload_lazy_tail = defer.succeed(None) - return - self._upload_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._process, path)) - self._upload_lazy_tail.addCallback(lambda ign: self._turn_upload_deque()) - - def _notify(self, opaque, path, events_mask): - self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask)))) - path_u = unicode_from_filepath(path) - self._append_to_upload_deque(path_u) - - def _process(self, path_u): - precondition(isinstance(path_u, unicode), path_u) - d = defer.succeed(None) - - def _add_file(encoded_name_u, version): - uploadable = FileName(path_u, self._convergence) - return self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True) - - def _add_dir(encoded_name_u): - self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True) - uploadable = Data("", self._convergence) - encoded_name_u += u"@_" - upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True) - def _succeeded(ign): - self._log("created subdirectory %r" % (path_u,)) - self._stats_provider.count('magic_folder.directories_created', 1) - def _failed(f): - self._log("failed to create subdirectory %r" % (path_u,)) - return f - upload_d.addCallbacks(_succeeded, _failed) - upload_d.addCallback(lambda ign: self._scan(path_u)) - return upload_d - - def _maybe_upload(val): - self._upload_pending.remove(path_u) # FIXME make _upload_pending hold relative paths - relpath_u = os.path.relpath(path_u, self._local_dir) - encoded_name_u = magicpath.path2magic(relpath_u) - - def get_metadata(result): - try: - metadata_d = self._upload_dirnode.get_metadata_for(encoded_name_u) - except KeyError: - return Failure() - return metadata_d - - if not os.path.exists(path_u): - self._log("drop-upload: notified object %r disappeared " - "(this is normal for temporary objects)" % (path_u,)) - self._stats_provider.count('magic_folder.objects_disappeared', 1) - d2 = defer.succeed(None) - if self._db.check_file_db_exists(relpath_u): - d2.addCallback(get_metadata) - def set_deleted(metadata): - current_version = self._db.get_local_file_version(relpath_u) + 1 - metadata['version'] = current_version - metadata['deleted'] = True - emptyUploadable = Data("", self._convergence) - return self._upload_dirnode.add_file(encoded_name_u, emptyUploadable, overwrite=True, metadata=metadata) - d2.addCallback(set_deleted) - d2.addCallback(lambda x: Exception("file does not exist")) - return d2 - elif os.path.islink(path_u): - raise Exception("symlink not being processed") - if os.path.isdir(path_u): - return _add_dir(encoded_name_u) - elif os.path.isfile(path_u): - version = self._db.get_local_file_version(relpath_u) - if version is None: - version = 0 - else: - version += 1 - print "NEW VERSION %d for %r" % (version, relpath_u) - d2 = _add_file(encoded_name_u, version) - def add_db_entry(filenode): - filecap = filenode.get_uri() - s = os.stat(path_u) - size = s[stat.ST_SIZE] - ctime = s[stat.ST_CTIME] - mtime = s[stat.ST_MTIME] - self._db.did_upload_file(filecap, relpath_u, version, mtime, ctime, size) - self._stats_provider.count('magic_folder.files_uploaded', 1) - d2.addCallback(add_db_entry) - return d2 - else: - raise Exception("non-directory/non-regular file not being processed") - - d.addCallback(_maybe_upload) - - def _succeeded(res): - self._stats_provider.count('magic_folder.objects_queued', -1) - self._stats_provider.count('magic_folder.objects_succeeded', 1) - return res - def _failed(f): - self._stats_provider.count('magic_folder.objects_queued', -1) - self._stats_provider.count('magic_folder.objects_failed', 1) - self._log("%r while processing %r" % (f, path_u)) - return f - d.addCallbacks(_succeeded, _failed) - d.addBoth(self._do_processed_callback) - return d - - def _do_download_callback(self, res): - if self._download_ignore_count == 0: - self._download_callback(res) - else: - self._download_ignore_count -= 1 - return None # intentionally suppress failures, which have already been logged - - def _do_processed_callback(self, res): - if self._upload_ignore_count == 0: - self._processed_callback(res) - else: - self._upload_ignore_count -= 1 - return None # intentionally suppress failures, which have already been logged - - def set_download_callback(self, callback, ignore_count=0): - """ - set_download_callback sets a function that will be called after a - remote filesystem notification has been processed (successfully or unsuccessfully). - """ - self._download_callback = callback - self._download_ignore_count = ignore_count - - def set_processed_callback(self, callback, ignore_count=0): - """ - set_processed_callback sets a function that will be called after a - local filesystem notification has been processed (successfully or unsuccessfully). - """ - self._processed_callback = callback - self._upload_ignore_count = ignore_count - - def finish(self, for_tests=False): - self._stopped = True - self._notifier.stopReading() - self._stats_provider.count('magic_folder.dirs_monitored', -1) - - if for_tests and hasattr(self._notifier, 'wait_until_stopped'): - d = self._notifier.wait_until_stopped() - else: - d = defer.succeed(None) - - d.addCallback(lambda x: self._download_lazy_tail) - return d - - def remove_service(self): - return service.MultiService.disownServiceParent(self) - - def _log(self, msg): - self._client.log("drop-upload: " + msg) - #print "_log %s" % (msg,) - #open("events", "ab+").write(msg) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node)) + self._lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_deque))