mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-01-23 12:59:55 +00:00
WIP
This commit is contained in:
parent
96b846f762
commit
e1822c0518
@ -21,7 +21,7 @@ from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
|
||||
from allmydata.immutable.upload import FileName, Data
|
||||
from allmydata import magicfolderdb, magicpath
|
||||
|
||||
|
||||
defer.setDebugging(True)
|
||||
IN_EXCL_UNLINK = 0x04000000L
|
||||
|
||||
def get_inotify_module():
|
||||
@ -156,22 +156,31 @@ class QueueMixin(HookMixin):
|
||||
#open("events", "ab+").write(msg)
|
||||
|
||||
def _turn_deque(self):
|
||||
self._log("_turn_deque")
|
||||
if self._stopped:
|
||||
self._log("stopped")
|
||||
return
|
||||
try:
|
||||
item = self._deque.pop()
|
||||
self._log("popped %r" % (item,))
|
||||
self._count('objects_queued', -1)
|
||||
except IndexError:
|
||||
self._log("deque is now empty")
|
||||
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
|
||||
else:
|
||||
self._lazy_tail.addCallback(lambda ign: self._process(item))
|
||||
self._lazy_tail.addBoth(self._call_hook, 'processed')
|
||||
self._lazy_tail.addErrback(log.err)
|
||||
self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
|
||||
self._log("_turn_deque")
|
||||
if self._stopped:
|
||||
self._log("stopped")
|
||||
return
|
||||
try:
|
||||
item = self._deque.pop()
|
||||
self._log("popped %r" % (item,))
|
||||
self._count('objects_queued', -1)
|
||||
except IndexError:
|
||||
self._log("deque is now empty")
|
||||
self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
|
||||
else:
|
||||
self._log("_turn_deque else clause")
|
||||
def whawhat(result):
|
||||
self._log("whawhat result %r" % (result,))
|
||||
return result
|
||||
self._lazy_tail.addBoth(whawhat)
|
||||
self._lazy_tail.addCallback(lambda ign: self._process(item))
|
||||
self._lazy_tail.addBoth(self._call_hook, 'processed')
|
||||
self._lazy_tail.addErrback(log.err)
|
||||
self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
|
||||
except Exception as e:
|
||||
self._log("turn deque exception %s" % (e,))
|
||||
raise
|
||||
|
||||
|
||||
class Uploader(QueueMixin):
|
||||
@ -242,11 +251,10 @@ class Uploader(QueueMixin):
|
||||
for relpath_u in all_relpaths:
|
||||
self._add_pending(relpath_u)
|
||||
|
||||
self._periodic_full_scan(ignore_pending=True)
|
||||
self._extend_queue_and_keep_going(self._pending)
|
||||
self._full_scan()
|
||||
|
||||
def _extend_queue_and_keep_going(self, relpaths_u):
|
||||
self._log("queueing %r" % (relpaths_u,))
|
||||
self._log("_extend_queue_and_keep_going %r" % (relpaths_u,))
|
||||
self._deque.extend(relpaths_u)
|
||||
self._count('objects_queued', len(relpaths_u))
|
||||
|
||||
@ -256,20 +264,15 @@ class Uploader(QueueMixin):
|
||||
else:
|
||||
self._clock.callLater(0, self._turn_deque)
|
||||
|
||||
def _periodic_full_scan(self, ignore_pending=False):
|
||||
self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._periodic_full_scan)
|
||||
if ignore_pending:
|
||||
self._full_scan()
|
||||
else:
|
||||
if len(self._pending) == 0:
|
||||
self._full_scan()
|
||||
|
||||
def _full_scan(self):
|
||||
self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._full_scan)
|
||||
print "FULL SCAN"
|
||||
self._log("_pending %r" % (self._pending))
|
||||
self._scan(u"")
|
||||
self._extend_queue_and_keep_going(self._pending)
|
||||
|
||||
def _add_pending(self, relpath_u):
|
||||
self._log("add pending %r" % (relpath_u,))
|
||||
if not magicpath.should_ignore_file(relpath_u):
|
||||
self._pending.add(relpath_u)
|
||||
|
||||
@ -334,6 +337,7 @@ class Uploader(QueueMixin):
|
||||
d = defer.succeed(None)
|
||||
|
||||
def _maybe_upload(val, now=None):
|
||||
self._log("_maybe_upload(%r, now=%r)" % (val, now))
|
||||
if now is None:
|
||||
now = time.time()
|
||||
fp = self._get_filepath(relpath_u)
|
||||
@ -385,6 +389,7 @@ class Uploader(QueueMixin):
|
||||
self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
|
||||
return None
|
||||
elif pathinfo.isdir:
|
||||
print "ISDIR "
|
||||
if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
|
||||
self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
|
||||
|
||||
|
@ -1188,7 +1188,7 @@ class MockTest(MagicFolderTestMixin, unittest.TestCase):
|
||||
# .tmp file shouldn't exist
|
||||
self.failIf(os.path.exists(local_file + u".tmp"))
|
||||
|
||||
def test_periodic_full_scan(self):
|
||||
def meowmeow_test_periodic_full_scan(self):
|
||||
self.set_up_grid()
|
||||
self.local_dir = abspath_expanduser_unicode(u"test_periodic_full_scan",base=self.basedir)
|
||||
self.mkdir_nonascii(self.local_dir)
|
||||
@ -1215,11 +1215,8 @@ class MockTest(MagicFolderTestMixin, unittest.TestCase):
|
||||
empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.basedir)
|
||||
new_empty_tree_dir = abspath_expanduser_unicode(empty_tree_name, base=self.local_dir)
|
||||
|
||||
#d = self.create_invite_join_magic_folder(u"Alice", self.local_dir)
|
||||
d.addCallback(self._restart_client)
|
||||
|
||||
def _check_move_empty_tree(res):
|
||||
print "_check_move_empty_tree"
|
||||
print "CHECK MOVE EMPTY TREE"
|
||||
uploaded_d = self.magicfolder.uploader.set_hook('processed')
|
||||
self.mkdir_nonascii(empty_tree_dir)
|
||||
os.rename(empty_tree_dir, new_empty_tree_dir)
|
||||
@ -1232,17 +1229,15 @@ class MockTest(MagicFolderTestMixin, unittest.TestCase):
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.objects_queued'), 0))
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.directories_created'), 1))
|
||||
|
||||
self.uploaded_d = None
|
||||
def _create_file_without_event(res):
|
||||
self.uploaded_d = self.magicfolder.uploader.set_hook('processed', ignore_count=0)
|
||||
print "CREATE FILE WITHOUT EMITTING EVENT"
|
||||
uploaded_d = self.magicfolder.uploader.set_hook('processed')
|
||||
what_path = abspath_expanduser_unicode(u"what", base=new_empty_tree_dir)
|
||||
fileutil.write(what_path, "say when")
|
||||
return None
|
||||
print "ADVANCE CLOCK"
|
||||
alice_clock.advance(self.magicfolder.uploader._periodic_full_scan_duration + 1)
|
||||
return uploaded_d
|
||||
d.addCallback(_create_file_without_event)
|
||||
def advance_clock(res):
|
||||
alice_clock.advance(20)
|
||||
d.addCallback(advance_clock)
|
||||
d.addCallback(lambda ign: self.uploaded_d)
|
||||
d.addCallback(lambda ign: self.failUnlessReallyEqual(self._get_count('uploader.files_uploaded'), 1))
|
||||
d.addCallback(lambda ign: self.magicfolder.finish())
|
||||
return d
|
||||
|
Loading…
Reference in New Issue
Block a user