WIP - fix some bugs

This commit is contained in:
David Stainton 2015-07-24 15:12:57 -07:00
parent 3c77914519
commit bcfdcf2877
3 changed files with 41 additions and 35 deletions

View File

@ -11,6 +11,7 @@ from twisted.application import service
from allmydata.interfaces import IDirectoryNode from allmydata.interfaces import IDirectoryNode
from allmydata.util import log from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath from allmydata.util.fileutil import precondition_abspath
from allmydata.util.assertutil import precondition from allmydata.util.assertutil import precondition
from allmydata.util.encodingutil import listdir_unicode, to_filepath, \ from allmydata.util.encodingutil import listdir_unicode, to_filepath, \
unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError unicode_from_filepath, quote_local_unicode_path, FilenameEncodingError
@ -41,12 +42,12 @@ def get_inotify_module():
class MagicFolder(service.MultiService): class MagicFolder(service.MultiService):
name = 'magic-folder' name = 'magic-folder'
def __init__(self, client, upload_dircap, collective_dircap, local_dir, dbfile, inotify=None, def __init__(self, client, upload_dircap, collective_dircap, local_dir_path_u, dbfile, inotify=None,
pending_delay=1.0): pending_delay=1.0):
precondition_abspath(local_dir) precondition_abspath(local_dir_path_u)
service.MultiService.__init__(self) service.MultiService.__init__(self)
local_path = to_filepath(local_dir) local_path = to_filepath(local_dir_path_u)
db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3)) db = backupdb.get_backupdb(dbfile, create_version=(backupdb.SCHEMA_v3, 3))
if db is None: if db is None:
@ -57,13 +58,13 @@ class MagicFolder(service.MultiService):
if not local_path.exists(): if not local_path.exists():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s " raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but there is no directory at that location." "but there is no directory at that location."
% quote_local_unicode_path(local_dir)) % quote_local_unicode_path(local_dir_path_u))
if not local_path.isdir(): if not local_path.isdir():
raise AssertionError("The '[magic_folder] local.directory' parameter was %s " raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
"but the thing at that location is not a directory." "but the thing at that location is not a directory."
% quote_local_unicode_path(local_dir)) % quote_local_unicode_path(local_dir_path_u))
self.uploader = Uploader(client, local_path, db, upload_dircap, inotify, pending_delay) self.uploader = Uploader(client, local_dir_path_u, db, upload_dircap, inotify, pending_delay)
self.downloader = Downloader(client, local_path, db, collective_dircap) self.downloader = Downloader(client, local_path, db, collective_dircap)
def startService(self): def startService(self):
@ -88,7 +89,7 @@ class MagicFolder(service.MultiService):
class QueueMixin(object): class QueueMixin(object):
def __init__(self, client, counter, local_path, db): def __init__(self, client, local_path, db):
self._client = client self._client = client
self._counter = client.stats_provider.count self._counter = client.stats_provider.count
self._local_path = local_path self._local_path = local_path
@ -97,7 +98,7 @@ class QueueMixin(object):
self._deque = deque() self._deque = deque()
self._lazy_tail = defer.succeed(None) self._lazy_tail = defer.succeed(None)
self._pending = set() self._pending = set()
self._processed_callback = lambda ign: None self._callback = lambda ign: None
self._ignore_count = 0 self._ignore_count = 0
def _do_callback(self, res): def _do_callback(self, res):
@ -122,8 +123,11 @@ class QueueMixin(object):
class Uploader(QueueMixin): class Uploader(QueueMixin):
def __init__(self, client, local_path, db, upload_dircap, inotify, pending_delay): def __init__(self, client, local_dir_path_u, db, upload_dircap, inotify, pending_delay):
QueueMixin.__init__(self, client, local_path, db) QueueMixin.__init__(self, client, local_dir_path_u, db)
self.local_path = local_dir_path_u
self.is_ready = False
# TODO: allow a path rather than a cap URI. # TODO: allow a path rather than a cap URI.
self._upload_dirnode = self._client.create_node_from_uri(upload_dircap) self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
@ -150,7 +154,7 @@ class Uploader(QueueMixin):
| self._inotify.IN_ONLYDIR | self._inotify.IN_ONLYDIR
| IN_EXCL_UNLINK | IN_EXCL_UNLINK
) )
self._notifier.watch(self._local_path, mask=self.mask, callbacks=[self._notify], self._notifier.watch(to_filepath(self.local_path), mask=self.mask, callbacks=[self._notify],
recursive=True) recursive=True)
def start_monitoring(self): def start_monitoring(self):
@ -169,7 +173,8 @@ class Uploader(QueueMixin):
return d return d
def start_scanning(self): def start_scanning(self):
self._scan(self._local_dir) self.is_ready = True
self._scan(self._local_path)
self._turn_deque() self._turn_deque()
def _scan(self, localpath): def _scan(self, localpath):
@ -267,12 +272,12 @@ class Uploader(QueueMixin):
d = defer.succeed(None) d = defer.succeed(None)
def _add_file(encoded_name_u, version): def _add_file(encoded_name_u, version):
uploadable = FileName(path_u, self._convergence) uploadable = FileName(path_u, self._client.convergence)
return self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True) return self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":version}, overwrite=True)
def _add_dir(encoded_name_u): def _add_dir(encoded_name_u):
self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True) self._notifier.watch(to_filepath(path_u), mask=self.mask, callbacks=[self._notify], recursive=True)
uploadable = Data("", self._convergence) uploadable = Data("", self._client.convergence)
encoded_name_u += u"@_" encoded_name_u += u"@_"
upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True) upload_d = self._upload_dirnode.add_file(encoded_name_u, uploadable, metadata={"version":0}, overwrite=True)
def _succeeded(ign): def _succeeded(ign):
@ -286,8 +291,8 @@ class Uploader(QueueMixin):
return upload_d return upload_d
def _maybe_upload(val): def _maybe_upload(val):
self._upload_pending.remove(path_u) # FIXME make _upload_pending hold relative paths self._pending.remove(path_u) # FIXME make _upload_pending hold relative paths
relpath_u = os.path.relpath(path_u, self._local_dir) relpath_u = os.path.relpath(path_u, self.local_path)
encoded_name_u = magicpath.path2magic(relpath_u) encoded_name_u = magicpath.path2magic(relpath_u)
def get_metadata(result): def get_metadata(result):
@ -308,7 +313,7 @@ class Uploader(QueueMixin):
current_version = self._db.get_local_file_version(relpath_u) + 1 current_version = self._db.get_local_file_version(relpath_u) + 1
metadata['version'] = current_version metadata['version'] = current_version
metadata['deleted'] = True metadata['deleted'] = True
empty_uploadable = Data("", self._convergence) empty_uploadable = Data("", self._client.convergence)
return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata) return self._upload_dirnode.add_file(encoded_name_u, empty_uploadable, overwrite=True, metadata=metadata)
d2.addCallback(set_deleted) d2.addCallback(set_deleted)
d2.addCallback(lambda x: Exception("file does not exist")) d2.addCallback(lambda x: Exception("file does not exist"))
@ -438,12 +443,13 @@ class Downloader(QueueMixin):
def _scan_remote_collective(self): def _scan_remote_collective(self):
if self._collective_dirnode is None: if self._collective_dirnode is None:
return return
upload_readonly_dircap = self._upload_dirnode.get_readonly_uri()
collective_dirmap_d = self._collective_dirnode.list() collective_dirmap_d = self._collective_dirnode.list()
def do_filter(result):
others = [x for x in result.keys() if result[x][0].get_readonly_uri() != upload_readonly_dircap] def do_list(result):
others = [x for x in result.keys()]
return result, others return result, others
collective_dirmap_d.addCallback(do_filter) collective_dirmap_d.addCallback(do_list)
def scan_collective(result): def scan_collective(result):
d = defer.succeed(None) d = defer.succeed(None)
collective_dirmap, others_list = result collective_dirmap, others_list = result
@ -502,8 +508,8 @@ class Downloader(QueueMixin):
# FIXME move to QueueMixin # FIXME move to QueueMixin
def _turn_deque(self): def _turn_deque(self):
if self._stopped: #if self._stopped:
return # return
try: try:
file_path, file_node, metadata = self._deque.pop() file_path, file_node, metadata = self._deque.pop()
except IndexError: except IndexError:

View File

@ -94,7 +94,7 @@ class MagicFolderTestMixin(CLITestMixin, GridTestMixin):
def cleanup(self, res): def cleanup(self, res):
d = defer.succeed(None) d = defer.succeed(None)
if self.magicfolder is not None: if self.magicfolder is not None:
d.addCallback(lambda ign: self.magicfolder.finish(for_tests=True)) d.addCallback(lambda ign: self.magicfolder.finish())
d.addCallback(lambda ign: res) d.addCallback(lambda ign: res)
return d return d

View File

@ -128,7 +128,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
def _check_move_empty_tree(res): def _check_move_empty_tree(res):
self.mkdir_nonascii(empty_tree_dir) self.mkdir_nonascii(empty_tree_dir)
d2 = defer.Deferred() d2 = defer.Deferred()
self.magicfolder.set_processed_callback(d2.callback) self.magicfolder.set_callback(d2.callback)
os.rename(empty_tree_dir, new_empty_tree_dir) os.rename(empty_tree_dir, new_empty_tree_dir)
self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO) self.notify(to_filepath(new_empty_tree_dir), self.inotify.IN_MOVED_TO)
return d2 return d2
@ -142,7 +142,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
self.mkdir_nonascii(small_tree_dir) self.mkdir_nonascii(small_tree_dir)
fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when") fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
d2 = defer.Deferred() d2 = defer.Deferred()
self.magicfolder.set_processed_callback(d2.callback, ignore_count=1) self.magicfolder.set_callback(d2.callback, ignore_count=1)
os.rename(small_tree_dir, new_small_tree_dir) os.rename(small_tree_dir, new_small_tree_dir)
self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO) self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
return d2 return d2
@ -154,7 +154,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
def _check_moved_tree_is_watched(res): def _check_moved_tree_is_watched(res):
d2 = defer.Deferred() d2 = defer.Deferred()
self.magicfolder.set_processed_callback(d2.callback) self.magicfolder.set_callback(d2.callback)
fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file") fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE) self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
return d2 return d2
@ -204,7 +204,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
def create_test_file(result): def create_test_file(result):
d2 = defer.Deferred() d2 = defer.Deferred()
self.magicfolder.set_processed_callback(d2.callback) self.magicfolder.set_callback(d2.callback)
test_file = abspath_expanduser_unicode(u"what", base=self.local_dir) test_file = abspath_expanduser_unicode(u"what", base=self.local_dir)
fileutil.write(test_file, "meow") fileutil.write(test_file, "meow")
self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE) self.notify(to_filepath(test_file), self.inotify.IN_CLOSE_WRITE)
@ -279,7 +279,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
# Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
# (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that? # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
d = defer.Deferred() d = defer.Deferred()
self.magicfolder.set_processed_callback(d.callback) self.magicfolder.set_callback(d.callback)
path_u = abspath_expanduser_unicode(name_u, base=self.local_dir) path_u = abspath_expanduser_unicode(name_u, base=self.local_dir)
path = to_filepath(path_u) path = to_filepath(path_u)
@ -314,7 +314,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
def _check_version_in_dmd(self, magicfolder, relpath_u, expected_version): def _check_version_in_dmd(self, magicfolder, relpath_u, expected_version):
encoded_name_u = magicpath.path2magic(relpath_u) encoded_name_u = magicpath.path2magic(relpath_u)
d = magicfolder._upload_dirnode.get_child_and_metadata(encoded_name_u) d = magicfolder.uploader._upload_dirnode.get_child_and_metadata(encoded_name_u)
def _check((filenode, metadata)): def _check((filenode, metadata)):
self.failUnless(metadata, "no metadata for %r" % (relpath_u,)) self.failUnless(metadata, "no metadata for %r" % (relpath_u,))
self.failUnlessEqual(metadata['version'], expected_version) self.failUnlessEqual(metadata['version'], expected_version)
@ -334,7 +334,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
def Alice_write_a_file(result): def Alice_write_a_file(result):
print "Alice writes a file\n" print "Alice writes a file\n"
self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder._local_dir) self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader.local_path)
fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.") fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.")
self.magicfolder = self.alice_magicfolder self.magicfolder = self.alice_magicfolder
self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE) self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)
@ -344,7 +344,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
def Alice_wait_for_upload(result): def Alice_wait_for_upload(result):
print "Alice waits for an upload\n" print "Alice waits for an upload\n"
d2 = defer.Deferred() d2 = defer.Deferred()
self.alice_magicfolder.set_processed_callback(d2.callback) self.alice_magicfolder.uploader.set_callback(d2.callback)
return d2 return d2
d.addCallback(Alice_wait_for_upload) d.addCallback(Alice_wait_for_upload)
d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0)) d.addCallback(lambda ign: self._check_version_in_dmd(self.alice_magicfolder, u"file1", 0))
@ -393,8 +393,8 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
def cleanup_Alice_and_Bob(result): def cleanup_Alice_and_Bob(result):
d = defer.succeed(None) d = defer.succeed(None)
d.addCallback(lambda ign: self.alice_magicfolder.finish(for_tests=True)) d.addCallback(lambda ign: self.alice_magicfolder.finish())
d.addCallback(lambda ign: self.bob_magicfolder.finish(for_tests=True)) d.addCallback(lambda ign: self.bob_magicfolder.finish())
d.addCallback(lambda ign: result) d.addCallback(lambda ign: result)
return d return d
d.addCallback(cleanup_Alice_and_Bob) d.addCallback(cleanup_Alice_and_Bob)
@ -408,7 +408,7 @@ class MockTest(MagicFolderTestMixin, unittest.TestCase):
self.inotify = fake_inotify self.inotify = fake_inotify
def notify(self, path, mask): def notify(self, path, mask):
self.magicfolder._notifier.event(path, mask) self.magicfolder.uploader._notifier.event(path, mask)
def test_errors(self): def test_errors(self):
self.set_up_grid() self.set_up_grid()