mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-22 12:28:05 +00:00
Fix version deletion propagation and test
This commit is contained in:
parent
5fe1e13120
commit
8a5c01cdab
@ -417,3 +417,24 @@ class BackupDB_v3(BackupDB_v2):
|
||||
return None
|
||||
else:
|
||||
return row[0]
|
||||
|
||||
def did_upload_file(self, filecap, path, version, mtime, ctime, size):
|
||||
now = time.time()
|
||||
fileid = self.get_or_allocate_fileid_for_cap(filecap)
|
||||
try:
|
||||
self.cursor.execute("INSERT INTO last_upload VALUES (?,?,?)",
|
||||
(fileid, now, now))
|
||||
except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
|
||||
self.cursor.execute("UPDATE last_upload"
|
||||
" SET last_uploaded=?, last_checked=?"
|
||||
" WHERE fileid=?",
|
||||
(now, now, fileid))
|
||||
try:
|
||||
self.cursor.execute("INSERT INTO local_files VALUES (?,?,?,?,?,?)",
|
||||
(path, size, mtime, ctime, fileid, version))
|
||||
except (self.sqlite_module.IntegrityError, self.sqlite_module.OperationalError):
|
||||
self.cursor.execute("UPDATE local_files"
|
||||
" SET size=?, mtime=?, ctime=?, fileid=?, version=?"
|
||||
" WHERE path=?",
|
||||
(size, mtime, ctime, fileid, path, version))
|
||||
self.connection.commit()
|
||||
|
@ -46,7 +46,7 @@ class MagicFolder(service.MultiService):
|
||||
|
||||
service.MultiService.__init__(self)
|
||||
self._stopped = False
|
||||
self._remote_scan_delay = 3 # XXX
|
||||
self._remote_scan_delay = 10 # XXX
|
||||
self._local_dir = abspath_expanduser_unicode(local_dir)
|
||||
self._upload_lazy_tail = defer.succeed(None)
|
||||
self._upload_pending = set()
|
||||
@ -122,11 +122,18 @@ class MagicFolder(service.MultiService):
|
||||
We check the remote metadata version against our magic-folder db version number;
|
||||
latest version wins.
|
||||
"""
|
||||
# XXX todo
|
||||
v = self._db.get_local_file_version(path)
|
||||
if v is None:
|
||||
return True
|
||||
else:
|
||||
if v < remote_version:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def _scan_remote(self, nickname, dirnode):
|
||||
listing_d = dirnode.list()
|
||||
self._download_scan_batch = {}
|
||||
def scan_listing(listing_map):
|
||||
for name in listing_map.keys():
|
||||
file_node, metadata = listing_map[name]
|
||||
@ -150,11 +157,12 @@ class MagicFolder(service.MultiService):
|
||||
d = defer.succeed(None)
|
||||
collective_dirmap, others_list = result
|
||||
for dir_name in others_list:
|
||||
# XXX this is broken
|
||||
d.addCallback(lambda x: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
|
||||
return d
|
||||
collective_dirmap_d.addCallback(scan_collective)
|
||||
collective_dirmap_d.addCallback(self._filter_scan_batch)
|
||||
collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
|
||||
return d
|
||||
collective_dirmap_d.addCallback(scan_collective)
|
||||
return collective_dirmap_d
|
||||
|
||||
def _add_batch_to_download_queue(self, result):
|
||||
@ -165,29 +173,26 @@ class MagicFolder(service.MultiService):
|
||||
extension = []
|
||||
for name in self._download_scan_batch.keys():
|
||||
if name in self._download_pending:
|
||||
# XXX
|
||||
continue
|
||||
if len(self._download_scan_batch[name]) == 1:
|
||||
filename, file_node, metadata = self._download_scan_batch[name][0]
|
||||
if self._should_download(name, metadata['version']):
|
||||
extension += [(name, file_node, metadata)]
|
||||
else:
|
||||
for item in self._download_scan_batch:
|
||||
nickname, file_node, metadata = item
|
||||
for item in self._download_scan_batch[name]:
|
||||
(nickname, file_node, metadata) = item
|
||||
if self._should_download(name, metadata['version']):
|
||||
extension += [(name, file_node, metadata)]
|
||||
return extension
|
||||
|
||||
def _download_file(self, name, file_node):
|
||||
print "_download_file"
|
||||
d = file_node.download_best_version()
|
||||
def succeeded(res):
|
||||
d.addCallback(lambda result: self._write_downloaded_file(name, result))
|
||||
self._stats_provider.count('magic_folder.objects_downloaded', +1)
|
||||
return None
|
||||
def failed(f):
|
||||
pass
|
||||
return failure.Failure("download failed")
|
||||
def remove_from_pending(result):
|
||||
self._download_pending = self._download_pending.difference(set([name]))
|
||||
d.addCallbacks(succeeded, failed)
|
||||
d.addBoth(self._do_download_callback)
|
||||
d.addBoth(remove_from_pending)
|
||||
return d
|
||||
|
||||
def _write_downloaded_file(self, name, file_contents):
|
||||
@ -256,18 +261,8 @@ class MagicFolder(service.MultiService):
|
||||
self.is_ready = True
|
||||
self._turn_upload_deque()
|
||||
self._turn_download_deque()
|
||||
self._scan_remote_collective()
|
||||
|
||||
def _append_to_download_deque(self, name, file_node):
|
||||
if name in self._download_pending:
|
||||
return
|
||||
self._download_deque.append(file_node) # XXX
|
||||
self._download_pending.add(name)
|
||||
self._stats_provider.count('magic_folder.objects_queued_for_download', 1)
|
||||
reactor.callLater(0, self._turn_download_deque)
|
||||
|
||||
def _turn_download_deque(self):
|
||||
print "_turn_download_deque"
|
||||
if self._stopped:
|
||||
return
|
||||
try:
|
||||
@ -275,7 +270,8 @@ class MagicFolder(service.MultiService):
|
||||
except IndexError:
|
||||
self._log("magic folder upload deque is now empty")
|
||||
self._download_lazy_tail = defer.succeed(None)
|
||||
self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
|
||||
self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._scan_remote_collective))
|
||||
self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._turn_download_deque))
|
||||
return
|
||||
self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, 0, self._download_file, file_path, file_node))
|
||||
self._download_lazy_tail.addCallback(lambda ign: task.deferLater(reactor, self._remote_scan_delay, self._turn_download_deque))
|
||||
@ -333,18 +329,11 @@ class MagicFolder(service.MultiService):
|
||||
|
||||
def get_metadata(result):
|
||||
try:
|
||||
metadata_d = self._parent.get_metadata_for(name)
|
||||
metadata_d = self._upload_dirnode.get_metadata_for(name)
|
||||
except KeyError:
|
||||
return failure.Failure()
|
||||
return metadata_d
|
||||
|
||||
def get_local_version(path):
|
||||
v = self._db.get_local_file_version(path)
|
||||
if v is None:
|
||||
return 1
|
||||
else:
|
||||
return v
|
||||
|
||||
if not os.path.exists(path):
|
||||
self._log("drop-upload: notified object %r disappeared "
|
||||
"(this is normal for temporary objects)" % (path,))
|
||||
@ -353,10 +342,12 @@ class MagicFolder(service.MultiService):
|
||||
if self._db.check_file_db_exists(path):
|
||||
d2.addCallback(get_metadata)
|
||||
def set_deleted(metadata):
|
||||
metadata['version'] = get_local_version(path) + 1
|
||||
current_version = self._db.get_local_file_version(path) + 1
|
||||
print "current version ", current_version
|
||||
metadata['version'] = current_version
|
||||
metadata['deleted'] = True
|
||||
emptyUploadable = Data("", self._convergence)
|
||||
return self._parent.add_file(name, emptyUploadable, overwrite=True, metadata=metadata)
|
||||
return self._upload_dirnode.add_file(name, emptyUploadable, overwrite=True, metadata=metadata)
|
||||
d2.addCallback(set_deleted)
|
||||
d2.addCallback(lambda x: Exception("file does not exist"))
|
||||
return d2
|
||||
@ -365,7 +356,11 @@ class MagicFolder(service.MultiService):
|
||||
if os.path.isdir(path):
|
||||
return _add_dir(name)
|
||||
elif os.path.isfile(path):
|
||||
version = get_local_version(path)
|
||||
version = self._db.get_local_file_version(path)
|
||||
if version is None:
|
||||
version = 1
|
||||
else:
|
||||
version += 1
|
||||
d2 = _add_file(name, version)
|
||||
def add_db_entry(filenode):
|
||||
filecap = filenode.get_uri()
|
||||
@ -373,7 +368,7 @@ class MagicFolder(service.MultiService):
|
||||
size = s[stat.ST_SIZE]
|
||||
ctime = s[stat.ST_CTIME]
|
||||
mtime = s[stat.ST_MTIME]
|
||||
self._db.did_upload_file(filecap, path, mtime, ctime, size)
|
||||
self._db.did_upload_file(filecap, path, version, mtime, ctime, size)
|
||||
self._stats_provider.count('magic_folder.files_uploaded', 1)
|
||||
d2.addCallback(add_db_entry)
|
||||
return d2
|
||||
|
@ -322,35 +322,52 @@ class MagicFolderTestMixin(MagicFolderCLITestMixin, ShouldFailMixin, ReallyEqual
|
||||
self.alice_collective_dir, self.alice_upload_dircap, self.alice_magicfolder, self.bob_collective_dircap, self.bob_upload_dircap, self.bob_magicfolder = result
|
||||
d.addCallback(get_results)
|
||||
|
||||
def write_a_file(result):
|
||||
file_path = os.path.join(self.alice_magicfolder._local_dir, "file1")
|
||||
fileutil.write(file_path, "meow, meow meow. meow? meow meow! meow.")
|
||||
def Alice_write_a_file(result):
|
||||
print "Alice writes a file\n"
|
||||
self.file_path = os.path.join(self.alice_magicfolder._local_dir, "file1")
|
||||
fileutil.write(self.file_path, "meow, meow meow. meow? meow meow! meow.")
|
||||
# XXX fix me --> self.notify(file_path, self.inotify.IN_CLOSE_WRITE)
|
||||
d.addCallback(write_a_file)
|
||||
d.addCallback(Alice_write_a_file)
|
||||
|
||||
def wait_for_upload(result):
|
||||
def Alice_wait_for_upload(result):
|
||||
print "Alice waits for an upload\n"
|
||||
d2 = defer.Deferred()
|
||||
self.alice_magicfolder.set_processed_callback(d2.callback, ignore_count=0)
|
||||
return d2
|
||||
d.addCallback(wait_for_upload)
|
||||
def prepare_for_alice_stats(result):
|
||||
d.addCallback(Alice_wait_for_upload)
|
||||
def Alice_prepare_for_alice_stats(result):
|
||||
self.stats_provider = self.alice_magicfolder._client.stats_provider
|
||||
d.addCallback(prepare_for_alice_stats)
|
||||
d.addCallback(Alice_prepare_for_alice_stats)
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_succeeded'), 1))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.files_uploaded'), 1))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_queued'), 0))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.directories_created'), 0))
|
||||
|
||||
def wait_for_download(result):
|
||||
def Bob_wait_for_download(result):
|
||||
print "Bob waits for a download\n"
|
||||
d2 = defer.Deferred()
|
||||
self.bob_magicfolder.set_download_callback(d2.callback, ignore_count=0)
|
||||
return d2
|
||||
d.addCallback(wait_for_download)
|
||||
def prepare_for_bob_stats(result):
|
||||
d.addCallback(Bob_wait_for_download)
|
||||
def Bob_prepare_for_stats(result):
|
||||
self.stats_provider = self.bob_magicfolder._client.stats_provider
|
||||
d.addCallback(prepare_for_bob_stats)
|
||||
d.addCallback(Bob_prepare_for_stats)
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_downloaded'), 1))
|
||||
|
||||
# test deletion of file behavior
|
||||
def Alice_delete_file(result):
|
||||
print "Alice deletes the file!\n"
|
||||
os.unlink(self.file_path)
|
||||
self.notify(self.file_path, self.inotify.IN_DELETE)
|
||||
return None
|
||||
d.addCallback(Alice_delete_file)
|
||||
d.addCallback(Alice_wait_for_upload)
|
||||
d.addCallback(Alice_prepare_for_alice_stats)
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_succeeded'), 2)) # XXX ?
|
||||
d.addCallback(Bob_wait_for_download)
|
||||
d.addCallback(Bob_prepare_for_stats)
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('magic_folder.objects_downloaded'), 2)) # XXX ?
|
||||
|
||||
def cleanup_Alice_and_Bob(result):
|
||||
d = defer.succeed(None)
|
||||
d.addCallback(lambda ign: self.alice_magicfolder.finish(for_tests=True))
|
||||
|
Loading…
Reference in New Issue
Block a user