mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-02 03:06:41 +00:00
wip
This commit is contained in:
parent
69a5df6fea
commit
3fe005def0
@ -74,8 +74,14 @@ class MagicFolder(service.MultiService):
|
||||
self.downloader.start_scanning()
|
||||
|
||||
def finish(self):
|
||||
print "finish"
|
||||
d = self.uploader.stop()
|
||||
def _print(f):
|
||||
print f
|
||||
return f
|
||||
d.addErrback(_print)
|
||||
d.addBoth(lambda ign: self.downloader.stop())
|
||||
d.addErrback(_print)
|
||||
return d
|
||||
|
||||
def remove_service(self):
|
||||
@ -167,6 +173,7 @@ class Uploader(QueueMixin):
|
||||
return d
|
||||
|
||||
def stop(self):
|
||||
print "stop: _deque = %r, _pending = %r" % (self._deque, self._pending)
|
||||
self._notifier.stopReading()
|
||||
self._counter('magic_folder.dirs_monitored', -1)
|
||||
|
||||
@ -174,6 +181,10 @@ class Uploader(QueueMixin):
|
||||
d = self._notifier.wait_until_stopped()
|
||||
else:
|
||||
d = defer.succeed(None)
|
||||
def _after(res):
|
||||
print "stop _after: res = %r, _deque = %r, _pending = %r" % (res, self._deque, self._pending)
|
||||
return res
|
||||
d.addBoth(_after)
|
||||
return d
|
||||
|
||||
def start_scanning(self):
|
||||
@ -377,13 +388,22 @@ class Downloader(QueueMixin):
|
||||
|
||||
self._remote_scan_delay = 3 # XXX
|
||||
self._download_scan_batch = {} # path -> [(filenode, metadata)]
|
||||
print "Downloader init"
|
||||
|
||||
def start_scanning(self):
|
||||
print "downloader start_scanning"
|
||||
self._scan_remote_collective()
|
||||
self._turn_deque()
|
||||
|
||||
def stop(self):
|
||||
return self._lazy_tail
|
||||
print "downloader stop"
|
||||
d = defer.succeed(None)
|
||||
d.addCallback(lambda ign: self._lazy_tail)
|
||||
def _print(res):
|
||||
print "downloader stop _after: res = %r, _deque = %r, _pending = %r" % (res, self._deque, self._pending)
|
||||
return res
|
||||
d.addBoth(_print)
|
||||
return d
|
||||
|
||||
def _should_download(self, relpath_u, remote_version):
|
||||
"""
|
||||
@ -391,6 +411,7 @@ class Downloader(QueueMixin):
|
||||
We check the remote metadata version against our magic-folder db version number;
|
||||
latest version wins.
|
||||
"""
|
||||
print "_should_download"
|
||||
v = self._db.get_local_file_version(relpath_u)
|
||||
return (v is None or v < remote_version)
|
||||
|
||||
@ -432,19 +453,29 @@ class Downloader(QueueMixin):
|
||||
return collective_dirmap_d
|
||||
|
||||
def _scan_remote(self, nickname, dirnode):
|
||||
print "_scan_remote START: nickname %s dirnode %s" % (nickname, dirnode)
|
||||
listing_d = dirnode.list()
|
||||
self._download_scan_batch = {}
|
||||
def scan_listing(listing_map):
|
||||
for name in listing_map.keys():
|
||||
print "name ", name
|
||||
file_node, metadata = listing_map[name]
|
||||
print "ALL KEYS %s" % (self._download_scan_batch.keys(),)
|
||||
if self._download_scan_batch.has_key(name):
|
||||
print "HAS KEY - %s %s" % (file_node, metadata)
|
||||
self._download_scan_batch[name] += [(file_node, metadata)]
|
||||
else:
|
||||
print "NOT HAS KEY"
|
||||
self._download_scan_batch[name] = [(file_node, metadata)]
|
||||
|
||||
print "download scan batch before filtering", repr(self._download_scan_batch)
|
||||
listing_d.addCallback(scan_listing)
|
||||
print "_scan_remote END"
|
||||
return listing_d
|
||||
|
||||
def _scan_remote_collective(self):
|
||||
self._download_scan_batch = {} # XXX
|
||||
|
||||
print "downloader _scan_remote_collective"
|
||||
if self._collective_dirnode is None:
|
||||
return
|
||||
collective_dirmap_d = self._collective_dirnode.list()
|
||||
@ -463,7 +494,12 @@ class Downloader(QueueMixin):
|
||||
return d
|
||||
collective_dirmap_d.addCallback(scan_collective)
|
||||
collective_dirmap_d.addCallback(self._filter_scan_batch)
|
||||
def _print(f):
|
||||
print f
|
||||
return f
|
||||
collective_dirmap_d.addErrback(_print)
|
||||
collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
|
||||
print "end of _scan_remote_collective"
|
||||
return collective_dirmap_d
|
||||
|
||||
def _add_batch_to_download_queue(self, result):
|
||||
@ -471,30 +507,40 @@ class Downloader(QueueMixin):
|
||||
self._pending.update(map(lambda x: x[0], result))
|
||||
|
||||
def _filter_scan_batch(self, result):
|
||||
print "FILTER START len %s" % (len(self._download_scan_batch),)
|
||||
extension = [] # consider whether this should be a dict
|
||||
for name in self._download_scan_batch.keys():
|
||||
if name in self._pending:
|
||||
print "downloader: %s found in pending; skipping" % (name,)
|
||||
continue
|
||||
file_node, metadata = max(self._download_scan_batch[name], key=lambda x: x[1]['version'])
|
||||
print "file_node %s metadata %s" % (file_node, metadata)
|
||||
if self._should_download(name, metadata['version']):
|
||||
print "should download"
|
||||
extension += [(name, file_node, metadata)]
|
||||
else:
|
||||
print "should not download"
|
||||
print "FILTER END"
|
||||
return extension
|
||||
|
||||
def _download_file(self, name, file_node):
|
||||
print "_download_file"
|
||||
d = file_node.download_best_version()
|
||||
def succeeded(res):
|
||||
d.addCallback(lambda result: self._write_downloaded_file(name, result))
|
||||
self._counter('magic_folder.objects_downloaded', 1)
|
||||
return None
|
||||
def failed(f):
|
||||
self._log("download failed")
|
||||
self._log("download failed: %s" % (str(f),))
|
||||
self._counter('magic_folder.objects_download_failed', 1)
|
||||
return f
|
||||
def remove_from_pending(result):
|
||||
self._pending = self._pending.difference(set([name]))
|
||||
def remove_from_pending(ign):
|
||||
print "REMOVE FROM PENDING _pending = %r, name = %r" % (self._pending, name)
|
||||
self._pending.remove(name)
|
||||
print "REMOVE FROM PENDING _after: _pending = %r" % (self._pending,)
|
||||
d.addCallbacks(succeeded, failed)
|
||||
d.addBoth(self._do_callback)
|
||||
d.addBoth(remove_from_pending)
|
||||
d.addCallback(remove_from_pending)
|
||||
return d
|
||||
|
||||
def _write_downloaded_file(self, name, file_contents):
|
||||
@ -502,6 +548,7 @@ class Downloader(QueueMixin):
|
||||
|
||||
# FIXME move to QueueMixin
|
||||
def _append_to_deque(self, path):
|
||||
print "downloader _append_to_deque"
|
||||
if path in self._download_scan_batch.keys():
|
||||
return
|
||||
self._deque.append(path)
|
||||
@ -512,6 +559,7 @@ class Downloader(QueueMixin):
|
||||
|
||||
# FIXME move to QueueMixin
|
||||
def _turn_deque(self):
|
||||
print "downloader _turn_deque"
|
||||
#if self._stopped:
|
||||
# return
|
||||
try:
|
||||
|
@ -142,7 +142,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||
self.mkdir_nonascii(small_tree_dir)
|
||||
fileutil.write(abspath_expanduser_unicode(u"what", base=small_tree_dir), "say when")
|
||||
d2 = defer.Deferred()
|
||||
self.magicfolder.set_callback(d2.callback, ignore_count=1)
|
||||
self.magicfolder.uploader.set_callback(d2.callback, ignore_count=1)
|
||||
os.rename(small_tree_dir, new_small_tree_dir)
|
||||
self.notify(to_filepath(new_small_tree_dir), self.inotify.IN_MOVED_TO)
|
||||
return d2
|
||||
@ -154,7 +154,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||
|
||||
def _check_moved_tree_is_watched(res):
|
||||
d2 = defer.Deferred()
|
||||
self.magicfolder.set_callback(d2.callback)
|
||||
self.magicfolder.uploader.set_callback(d2.callback)
|
||||
fileutil.write(abspath_expanduser_unicode(u"another", base=new_small_tree_dir), "file")
|
||||
self.notify(to_filepath(abspath_expanduser_unicode(u"another", base=new_small_tree_dir)), self.inotify.IN_CLOSE_WRITE)
|
||||
return d2
|
||||
@ -380,7 +380,7 @@ class MagicFolderTestMixin(MagicFolderTestMixin, ShouldFailMixin, ReallyEqualMix
|
||||
|
||||
def Alice_rewrite_file(result):
|
||||
print "Alice rewrites file\n"
|
||||
self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder._local_dir)
|
||||
self.file_path = abspath_expanduser_unicode(u"file1", base=self.alice_magicfolder.uploader._local_path_u)
|
||||
fileutil.write(self.file_path, "Alice suddenly sees the white rabbit running into the forest.")
|
||||
self.magicfolder = self.alice_magicfolder
|
||||
self.notify(to_filepath(self.file_path), self.inotify.IN_CLOSE_WRITE)
|
||||
|
Loading…
Reference in New Issue
Block a user