Drop-upload frontend, rerecorded for 1.9 beta (and correcting a minor mistake). Includes some fixes for Windows but not the Windows inotify implementation. fixes #1429

This commit is contained in:
david-sarah
2011-08-08 16:40:49 -07:00
parent 05684b9251
commit 32a7717205
9 changed files with 458 additions and 0 deletions

View File

@ -19,6 +19,8 @@ install_requires = [
"zope.interface == 3.3.1, == 3.5.3, == 3.6.1", "zope.interface == 3.3.1, == 3.5.3, == 3.6.1",
# On Windows we need at least Twisted 9.0 to avoid an indirect dependency on pywin32. # On Windows we need at least Twisted 9.0 to avoid an indirect dependency on pywin32.
# On Linux we need at least Twisted 10.1.0 for inotify support used by the drop-upload
# frontend.
# We also need Twisted 10.1 for the FTP frontend in order for Twisted's FTP server to # We also need Twisted 10.1 for the FTP frontend in order for Twisted's FTP server to
# support asynchronous close. # support asynchronous close.
"Twisted >= 10.1.0", "Twisted >= 10.1.0",

View File

@ -150,6 +150,7 @@ class Client(node.Node, pollmixin.PollMixin):
# ControlServer and Helper are attached after Tub startup # ControlServer and Helper are attached after Tub startup
self.init_ftp_server() self.init_ftp_server()
self.init_sftp_server() self.init_sftp_server()
self.init_drop_uploader()
hotline_file = os.path.join(self.basedir, hotline_file = os.path.join(self.basedir,
self.SUICIDE_PREVENTION_HOTLINE_FILE) self.SUICIDE_PREVENTION_HOTLINE_FILE)
@ -421,6 +422,22 @@ class Client(node.Node, pollmixin.PollMixin):
sftp_portstr, pubkey_file, privkey_file) sftp_portstr, pubkey_file, privkey_file)
s.setServiceParent(self) s.setServiceParent(self)
def init_drop_uploader(self):
if self.get_config("drop_upload", "enabled", False, boolean=True):
upload_uri = self.get_config("drop_upload", "upload.uri", None)
local_dir_utf8 = self.get_config("drop_upload", "local.directory", None)
if upload_uri and local_dir_utf8:
try:
from allmydata.frontends import drop_upload
s = drop_upload.DropUploader(self, upload_uri, local_dir_utf8)
s.setServiceParent(self)
s.start()
except Exception, e:
self.log("couldn't start drop-uploader: %r", args=(e,))
else:
self.log("couldn't start drop-uploader: upload.uri or local.directory not specified")
def _check_hotline(self, hotline_file): def _check_hotline(self, hotline_file):
if os.path.exists(hotline_file): if os.path.exists(hotline_file):
mtime = os.stat(hotline_file)[stat.ST_MTIME] mtime = os.stat(hotline_file)[stat.ST_MTIME]

View File

@ -0,0 +1,117 @@
import os, sys
from twisted.internet import defer
from twisted.python.filepath import FilePath
from twisted.application import service
from foolscap.api import eventually
from allmydata.interfaces import IDirectoryNode
from allmydata.util.encodingutil import quote_output, get_filesystem_encoding
from allmydata.immutable.upload import FileName
class DropUploader(service.MultiService):
    """
    Service that watches a single local directory through inotify and adds
    every file that is closed after writing, or moved into the directory, to
    a remote directory node.

    Activity is reported through the client's stats provider using the
    counters 'drop_upload.dirs_monitored', 'drop_upload.files_queued',
    'drop_upload.files_uploaded', 'drop_upload.files_failed' and
    'drop_upload.files_disappeared'.
    """

    def __init__(self, client, upload_uri, local_dir_utf8, inotify=None):
        """
        Validate the configuration and register (but do not yet start
        reading) the inotify watch.

        client         -- provides stats_provider, convergence,
                          create_node_from_uri() and log()
        upload_uri     -- URI of the remote directory to add files to
        local_dir_utf8 -- UTF-8 encoded byte string naming the local
                          directory to watch; os.path.expanduser is applied
        inotify        -- object providing INotify, humanReadableMask and the
                          IN_* constants; twisted.internet.inotify is
                          imported and used when this is None

        Raises AssertionError if the local path is not valid UTF-8, cannot be
        represented in the filesystem encoding, or is not an existing
        directory, or if upload_uri does not refer to a writeable directory.
        """
        service.MultiService.__init__(self)

        try:
            local_dir_u = os.path.expanduser(local_dir_utf8.decode('utf-8'))
            if sys.platform == "win32":
                # On Windows the path is kept as Unicode.
                local_dir = local_dir_u
            else:
                local_dir = local_dir_u.encode(get_filesystem_encoding())
        except (UnicodeEncodeError, UnicodeDecodeError):
            raise AssertionError("The drop-upload path %s was not valid UTF-8 or could not be represented in the filesystem encoding."
                                 % quote_output(local_dir_utf8))

        self._client = client
        self._stats_provider = client.stats_provider
        self._convergence = client.convergence
        self._local_path = FilePath(local_dir)

        # Import lazily so that a substitute implementation can be passed in
        # without twisted.internet.inotify ever being imported.
        if inotify is None:
            from twisted.internet import inotify
        self._inotify = inotify

        if not self._local_path.isdir():
            # quote_output is formatted with %s here, consistently with the
            # message above, so that the path is not quoted a second time.
            raise AssertionError("The drop-upload local path %s was not an existing directory." % quote_output(local_dir_u))

        # TODO: allow a path rather than an URI.
        self._parent = self._client.create_node_from_uri(upload_uri)
        if not IDirectoryNode.providedBy(self._parent):
            raise AssertionError("The drop-upload remote URI is not a directory URI.")
        if self._parent.is_unknown() or self._parent.is_readonly():
            raise AssertionError("The drop-upload remote URI does not refer to a writeable directory.")

        # Default: do nothing after an upload attempt (see set_uploaded_callback).
        self._uploaded_callback = lambda ign: None

        self._notifier = inotify.INotify()

        # We don't watch for IN_CREATE, because that would cause us to read and upload a
        # possibly-incomplete file before the application has closed it. There should always
        # be an IN_CLOSE_WRITE after an IN_CREATE (I think).
        # TODO: what about IN_MOVE_SELF or IN_UNMOUNT?
        # NOTE(review): an IN_MOVED_TO event for a subdirectory would presumably also be
        # queued for upload by _notify -- confirm whether that needs filtering.
        mask = inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO | inotify.IN_ONLYDIR
        self._notifier.watch(self._local_path, mask=mask, callbacks=[self._notify])

    def start(self):
        """
        Start reading inotify events and count the directory as monitored.

        Returns whatever the notifier's startReading() returns.
        """
        d = self._notifier.startReading()
        self._stats_provider.count('drop_upload.dirs_monitored', 1)
        return d

    def _notify(self, opaque, path, events_mask):
        """
        Notifier callback: log the event, count the file as queued, and
        schedule _process for it via foolscap's eventually().
        """
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        self._stats_provider.count('drop_upload.files_queued', 1)
        eventually(self._process, opaque, path, events_mask)

    def _process(self, opaque, path, events_mask):
        """
        Upload the file at 'path' (a FilePath) into the remote directory and
        update the counters.

        Returns a Deferred that fires after the uploaded-callback has been
        called with None (success, or the file had disappeared) or with the
        Failure (the file still exists but the upload failed).
        """
        d = defer.succeed(None)

        # FIXME: if this already exists as a mutable file, we replace the directory entry,
        # but we should probably modify the file (as the SFTP frontend does).
        def _add_file(ign):
            name = path.basename()
            # on Windows the name is already Unicode
            if not isinstance(name, unicode):
                name = name.decode(get_filesystem_encoding())

            u = FileName(path.path, self._convergence)
            return self._parent.add_file(name, u)
        d.addCallback(_add_file)

        def _succeeded(ign):
            self._stats_provider.count('drop_upload.files_queued', -1)
            self._stats_provider.count('drop_upload.files_uploaded', 1)

        def _failed(f):
            self._stats_provider.count('drop_upload.files_queued', -1)
            if path.exists():
                self._log("drop-upload: %r failed to upload due to %r" % (path.path, f))
                self._stats_provider.count('drop_upload.files_failed', 1)
                return f
            else:
                # The file vanished before it could be read, so this is not
                # counted as a failure and the error is swallowed.
                self._log("drop-upload: notified file %r disappeared "
                          "(this is normal for temporary files): %r" % (path.path, f))
                self._stats_provider.count('drop_upload.files_disappeared', 1)
                return None

        d.addCallbacks(_succeeded, _failed)
        d.addBoth(self._uploaded_callback)
        return d

    def set_uploaded_callback(self, callback):
        """
        Set the function called after each upload attempt.

        It is called with None after a successful upload or when the file had
        disappeared, and with the Failure when the upload failed.
        """
        self._uploaded_callback = callback

    def finish(self, for_tests=False):
        """
        Stop reading inotify events and count the directory as no longer
        monitored.

        Returns a Deferred. When for_tests is true and the notifier offers
        wait_until_stopped(), that method's result is returned instead of an
        already-fired Deferred.
        """
        self._notifier.stopReading()
        self._stats_provider.count('drop_upload.dirs_monitored', -1)
        if for_tests and hasattr(self._notifier, 'wait_until_stopped'):
            return self._notifier.wait_until_stopped()
        else:
            return defer.succeed(None)

    def _log(self, msg):
        """Send a message to the client's log."""
        self._client.log(msg)

View File

@ -152,6 +152,14 @@ def create_node(config, out=sys.stdout, err=sys.stderr):
c.write("enabled = false\n") c.write("enabled = false\n")
c.write("\n") c.write("\n")
c.write("[drop_upload]\n")
c.write("# Shall this node automatically upload files created or modified in a local directory?\n")
c.write("enabled = false\n")
c.write("# This must be an URI for a writeable directory.\n")
c.write("upload.uri =\n")
c.write("local.directory = ~/drop_upload\n")
c.write("\n")
c.close() c.close()
from allmydata.util import fileutil from allmydata.util import fileutil

View File

@ -0,0 +1,185 @@
import os, sys, platform
from twisted.trial import unittest
from twisted.python import filepath, runtime
from twisted.internet import defer, base
from allmydata.interfaces import IDirectoryNode, NoSuchChildError
from allmydata.util import fileutil, fake_inotify
from allmydata.util.encodingutil import get_filesystem_encoding
from allmydata.util.consumer import download_to_data
from allmydata.test.no_network import GridTestMixin
from allmydata.test.common_util import ReallyEqualMixin
from allmydata.test.common import ShouldFailMixin
from allmydata.frontends.drop_upload import DropUploader
class DropUploadTestMixin(GridTestMixin, ShouldFailMixin, ReallyEqualMixin):
    """
    Common drop-upload scenario, exercised once with a mock notifier and (on
    platforms that support it) once with the real INotify.

    The concrete test case sets self.basedir and self.inotify before calling
    _test(), and provides notify_close_write(path).
    """

    def _get_count(self, name):
        """Look up a stats counter by name, treating a missing counter as 0."""
        counters = self.stats_provider.get_stats()["counters"]
        return counters.get(name, 0)

    def _test(self):
        """
        Create a grid and a watched local directory, start a DropUploader on
        it, and run the sequence of file operations. The uploader is always
        finished afterwards.
        """
        self.uploader = None
        self.set_up_grid()

        not_windows = (sys.platform != "win32")

        # Use a non-ASCII directory name unless the filesystem encoding cannot represent it.
        dirname_u = u"loc\u0101l_dir"
        if not_windows:
            try:
                dirname_u.encode(get_filesystem_encoding())
            except UnicodeEncodeError:
                dirname_u = u"local_dir"

        # Likewise for the name of the longer file written below.
        name_u = u"l\u00F8ng"
        if not_windows:
            try:
                name_u.encode(get_filesystem_encoding())
            except UnicodeEncodeError:
                name_u = u"long"

        self.local_dir = os.path.join(self.basedir, dirname_u)
        os.mkdir(self.local_dir)

        self.client = self.g.clients[0]
        self.stats_provider = self.client.stats_provider

        def _made_upload_dir(n):
            self.failUnless(IDirectoryNode.providedBy(n))
            self.upload_dirnode = n
            self.upload_uri = n.get_uri()
            self.uploader = DropUploader(self.client, self.upload_uri, self.local_dir.encode('utf-8'),
                                         inotify=self.inotify)
            return self.uploader.start()

        def _write_short(ign):
            # Write something short enough for a LIT file.
            return self._test_file(u"short", "test")

        def _rewrite_short(ign):
            # Write to the same file again with different data.
            return self._test_file(u"short", "different")

        def _write_temporary(ign):
            # Test that temporary files are not uploaded.
            return self._test_file(u"tempfile", "test", temporary=True)

        def _make_subdirectory(ign):
            # Test that we tolerate creation of a subdirectory.
            return os.mkdir(os.path.join(self.local_dir, u"directory"))

        def _write_long(ign):
            # Write something longer, under the (possibly Unicode) name chosen above.
            return self._test_file(name_u, "test"*100)

        def _check_no_failures(ign):
            # TODO: test that causes an upload failure.
            return self.failUnlessReallyEqual(self._get_count('drop_upload.files_failed'), 0)

        # Prevent unclean reactor errors.
        def _cleanup(res):
            finished = defer.succeed(None)
            if self.uploader is not None:
                finished.addCallback(lambda ign: self.uploader.finish(for_tests=True))
            # Pass the original result (or Failure) through after finishing.
            finished.addCallback(lambda ign: res)
            return finished

        d = self.client.create_dirnode()
        d.addCallback(_made_upload_dir)
        d.addCallback(_write_short)
        d.addCallback(_rewrite_short)
        d.addCallback(_write_temporary)
        d.addCallback(_make_subdirectory)
        d.addCallback(_write_long)
        d.addCallback(_check_no_failures)
        d.addBoth(_cleanup)
        return d

    def _test_file(self, name_u, data, temporary=False):
        """
        Write 'data' to the file 'name_u' in the watched directory, wait for
        the uploader's callback, and check the outcome.

        With temporary=True the file is unlinked before the notification, and
        the checks are that no child of that name exists and that the
        'files_disappeared' counter rose by one. Otherwise the uploaded
        contents and the 'files_uploaded' counter are checked. In both cases
        'files_queued' must be back to 0.
        """
        uploaded_before = self._get_count('drop_upload.files_uploaded')
        disappeared_before = self._get_count('drop_upload.files_disappeared')

        d = defer.Deferred()

        # Note: this relies on the fact that we only get one IN_CLOSE_WRITE notification per file
        # (otherwise we would get a defer.AlreadyCalledError). Should we be relying on that?
        self.uploader.set_uploaded_callback(d.callback)

        on_windows = (sys.platform == "win32")

        path_u = os.path.join(self.local_dir, name_u)
        if on_windows:
            path = filepath.FilePath(path_u)
        else:
            path = filepath.FilePath(path_u.encode(get_filesystem_encoding()))

        f = open(path.path, "wb")
        try:
            # Except on Windows, the file is unlinked while still open.
            if temporary and not on_windows:
                os.unlink(path.path)
            f.write(data)
        finally:
            f.close()
        # On Windows it is unlinked only after it has been closed.
        if temporary and on_windows:
            os.unlink(path.path)
        self.notify_close_write(path)

        def _check_not_uploaded(ign):
            return self.shouldFail(NoSuchChildError, 'temp file not uploaded', None,
                                   self.upload_dirnode.get, name_u)

        def _check_disappeared_count(ign):
            return self.failUnlessReallyEqual(self._get_count('drop_upload.files_disappeared'),
                                              disappeared_before + 1)

        def _fetch_child(ign):
            return self.upload_dirnode.get(name_u)

        def _check_contents(actual_data):
            return self.failUnlessReallyEqual(actual_data, data)

        def _check_uploaded_count(ign):
            return self.failUnlessReallyEqual(self._get_count('drop_upload.files_uploaded'),
                                              uploaded_before + 1)

        def _check_queue_empty(ign):
            return self.failUnlessReallyEqual(self._get_count('drop_upload.files_queued'), 0)

        if temporary:
            d.addCallback(_check_not_uploaded)
            d.addCallback(_check_disappeared_count)
        else:
            d.addCallback(_fetch_child)
            d.addCallback(download_to_data)
            d.addCallback(_check_contents)
            d.addCallback(_check_uploaded_count)

        d.addCallback(_check_queue_empty)
        return d
class MockTest(DropUploadTestMixin, unittest.TestCase):
    """This can run on any platform, and even if twisted.internet.inotify can't be imported."""

    def test_errors(self):
        """
        Check that DropUploader raises AssertionError, with the expected
        message, for each kind of invalid local directory or upload URI.
        """
        self.basedir = "drop_upload.MockTest.test_errors"
        self.set_up_grid()
        errors_dir = os.path.join(self.basedir, "errors_dir")
        os.mkdir(errors_dir)

        client = self.g.clients[0]
        d = client.create_dirnode()
        def _made_upload_dir(n):
            self.failUnless(IDirectoryNode.providedBy(n))
            upload_uri = n.get_uri()
            readonly_uri = n.get_readonly_uri()

            # Each entry: (description, expected message substring, upload URI, local directory)
            cases = [
                ('invalid local dir', 'could not be represented',
                 upload_uri, '\xFF'),
                ('non-existant local dir', 'not an existing directory',
                 upload_uri, os.path.join(self.basedir, "Laputa")),
                ('bad URI', 'not a directory URI',
                 'bad', errors_dir),
                ('non-directory URI', 'not a directory URI',
                 'URI:LIT:foo', errors_dir),
                ('readonly directory URI', 'does not refer to a writeable directory',
                 readonly_uri, errors_dir),
            ]

            def _expect_error(ign, which, substring, uri, local_dir):
                return self.shouldFail(AssertionError, which, substring,
                                       DropUploader, client, uri, local_dir, inotify=fake_inotify)

            # shouldFail's result is chained and returned, so that an
            # expectation that is not met fails this test instead of being
            # dropped with the discarded Deferred.
            checks = defer.succeed(None)
            for (which, substring, uri, local_dir) in cases:
                checks.addCallback(_expect_error, which, substring, uri, local_dir)
            return checks
        d.addCallback(_made_upload_dir)
        return d

    def test_drop_upload(self):
        """Run the shared drop-upload scenario against the fake inotify module."""
        self.inotify = fake_inotify
        self.basedir = "drop_upload.MockTest.test_drop_upload"
        return self._test()

    def notify_close_write(self, path):
        """Deliver an IN_CLOSE_WRITE event for 'path' to the uploader's mock notifier."""
        self.uploader._notifier.event(path, self.inotify.IN_CLOSE_WRITE)
class RealTest(DropUploadTestMixin, unittest.TestCase):
    """This is skipped unless both Twisted and the platform support inotify."""

    def test_drop_upload(self):
        """Run the shared drop-upload scenario against the platform's real inotify."""
        # We should always have runtime.platform.supportsINotify, because we're using
        # Twisted >= 10.1.
        inotify_supported = runtime.platform.supportsINotify()
        if not inotify_supported:
            raise unittest.SkipTest("Drop-upload support can only be tested for-real on an OS that supports inotify or equivalent.")

        self.basedir = "drop_upload.RealTest.test_drop_upload"
        # None makes DropUploader use the appropriate inotify for the platform.
        self.inotify = None
        return self._test()

    def notify_close_write(self, path):
        """Nothing to do: writing to the file causes the notification."""
        return None

View File

@ -253,6 +253,8 @@ class CreateNode(unittest.TestCase):
self.failUnless(re.search(r"\n\[storage\]\n#.*\nenabled = true\n", content), content) self.failUnless(re.search(r"\n\[storage\]\n#.*\nenabled = true\n", content), content)
self.failUnless("\nreserved_space = 1G\n" in content) self.failUnless("\nreserved_space = 1G\n" in content)
self.failUnless(re.search(r"\n\[drop_upload\]\n#.*\nenabled = false\n", content), content)
# creating the node a second time should be rejected # creating the node a second time should be rejected
rc, out, err = self.run_tahoe(argv) rc, out, err = self.run_tahoe(argv)
self.failIfEqual(rc, 0, str((out, err, rc))) self.failIfEqual(rc, 0, str((out, err, rc)))

View File

@ -0,0 +1,99 @@
# Most of this is copied from Twisted 11.0. The reason for this hack is that
# twisted.internet.inotify can't be imported when the platform does not support inotify.
# from /usr/src/linux/include/linux/inotify.h
# inotify event and flag bits, with the values used by the Linux header named above.
IN_ACCESS = 0x00000001L # File was accessed
IN_MODIFY = 0x00000002L # File was modified
IN_ATTRIB = 0x00000004L # Metadata changed
IN_CLOSE_WRITE = 0x00000008L # Writeable file was closed
IN_CLOSE_NOWRITE = 0x00000010L # Unwriteable file closed
IN_OPEN = 0x00000020L # File was opened
IN_MOVED_FROM = 0x00000040L # File was moved from X
IN_MOVED_TO = 0x00000080L # File was moved to Y
IN_CREATE = 0x00000100L # Subfile was created
IN_DELETE = 0x00000200L # Subfile was deleted
IN_DELETE_SELF = 0x00000400L # Self was deleted
IN_MOVE_SELF = 0x00000800L # Self was moved
IN_UNMOUNT = 0x00002000L # Backing fs was unmounted
IN_Q_OVERFLOW = 0x00004000L # Event queue overflowed
IN_IGNORED = 0x00008000L # File was ignored
IN_ONLYDIR = 0x01000000 # only watch the path if it is a directory
IN_DONT_FOLLOW = 0x02000000 # don't follow a sym link
IN_MASK_ADD = 0x20000000 # add to the mask of an already existing watch
IN_ISDIR = 0x40000000 # event occurred against dir
IN_ONESHOT = 0x80000000 # only send event once
# Convenience combinations of the bits above.
IN_CLOSE = IN_CLOSE_WRITE | IN_CLOSE_NOWRITE # closes
IN_MOVED = IN_MOVED_FROM | IN_MOVED_TO # moves
IN_CHANGED = IN_MODIFY | IN_ATTRIB # changes
# Default value of the 'mask' argument of INotify.watch().
IN_WATCH_MASK = (IN_MODIFY | IN_ATTRIB |
IN_CREATE | IN_DELETE |
IN_DELETE_SELF | IN_MOVE_SELF |
IN_UNMOUNT | IN_MOVED_FROM | IN_MOVED_TO)
# (flag bit, short name) pairs consulted by humanReadableMask(); names are
# reported in the order listed here.
_FLAG_TO_HUMAN = [
(IN_ACCESS, 'access'),
(IN_MODIFY, 'modify'),
(IN_ATTRIB, 'attrib'),
(IN_CLOSE_WRITE, 'close_write'),
(IN_CLOSE_NOWRITE, 'close_nowrite'),
(IN_OPEN, 'open'),
(IN_MOVED_FROM, 'moved_from'),
(IN_MOVED_TO, 'moved_to'),
(IN_CREATE, 'create'),
(IN_DELETE, 'delete'),
(IN_DELETE_SELF, 'delete_self'),
(IN_MOVE_SELF, 'move_self'),
(IN_UNMOUNT, 'unmount'),
(IN_Q_OVERFLOW, 'queue_overflow'),
(IN_IGNORED, 'ignored'),
(IN_ONLYDIR, 'only_dir'),
(IN_DONT_FOLLOW, 'dont_follow'),
(IN_MASK_ADD, 'mask_add'),
(IN_ISDIR, 'is_dir'),
(IN_ONESHOT, 'one_shot')
]
def humanReadableMask(mask):
    """
    Return the list of human-readable names of the flags that are set in
    'mask', in the order they are listed in _FLAG_TO_HUMAN.
    """
    return [name for (flag, name) in _FLAG_TO_HUMAN if flag & mask]
# This class is not copied from Twisted; it acts as a mock.
class INotify(object):
    """
    Mock notifier: it never reads real events. Tests deliver events by hand
    through event(), which calls the callbacks given to the most recent
    watch() call.
    """

    def __init__(self):
        # Start with no callbacks, so that event() is a harmless no-op if it
        # is called before watch().
        self.callbacks = []

    def startReading(self):
        """Do nothing; there is no event source to read from."""
        pass

    def stopReading(self):
        """Do nothing; there is no event source to stop."""
        pass

    def watch(self, filepath, mask=IN_WATCH_MASK, autoAdd=False, callbacks=None, recursive=False):
        """
        Remember 'callbacks' for later event() calls, replacing any set
        previously. All other arguments are accepted and ignored.
        """
        if callbacks is None:
            # Treat "no callbacks" as an empty list so event() can iterate it.
            callbacks = []
        self.callbacks = callbacks

    def event(self, filepath, mask):
        """Call each remembered callback as cb(None, filepath, mask)."""
        for cb in self.callbacks:
            cb(None, filepath, mask)
# Public names of this module: the mock INotify class, humanReadableMask and
# the IN_* constants.
__all__ = ["INotify", "humanReadableMask", "IN_WATCH_MASK", "IN_ACCESS",
"IN_MODIFY", "IN_ATTRIB", "IN_CLOSE_NOWRITE", "IN_CLOSE_WRITE",
"IN_OPEN", "IN_MOVED_FROM", "IN_MOVED_TO", "IN_CREATE",
"IN_DELETE", "IN_DELETE_SELF", "IN_MOVE_SELF", "IN_UNMOUNT",
"IN_Q_OVERFLOW", "IN_IGNORED", "IN_ONLYDIR", "IN_DONT_FOLLOW",
"IN_MASK_ADD", "IN_ISDIR", "IN_ONESHOT", "IN_CLOSE",
"IN_MOVED", "IN_CHANGED"]

View File

@ -9,6 +9,8 @@
<h1>Node Statistics</h1> <h1>Node Statistics</h1>
<h2>General</h2>
<ul> <ul>
<li>Load Average: <span n:render="load_average" /></li> <li>Load Average: <span n:render="load_average" /></li>
<li>Peak Load: <span n:render="peak_load" /></li> <li>Peak Load: <span n:render="peak_load" /></li>
@ -18,6 +20,15 @@
<li>Files Retrieved (mutable): <span n:render="retrieves" /></li> <li>Files Retrieved (mutable): <span n:render="retrieves" /></li>
</ul> </ul>
<h2>Drop-Uploader</h2>
<ul>
<li>Local Directories Monitored: <span n:render="drop_monitored" /></li>
<li>Files Uploaded: <span n:render="drop_uploads" /></li>
<li>File Changes Queued: <span n:render="drop_queued" /></li>
<li>Failed Uploads: <span n:render="drop_failed" /></li>
</ul>
<h2>Raw Stats:</h2> <h2>Raw Stats:</h2>
<pre n:render="raw" /> <pre n:render="raw" />

View File

@ -1290,6 +1290,23 @@ class Statistics(rend.Page):
return "%s files / %s bytes (%s)" % (files, bytes, return "%s files / %s bytes (%s)" % (files, bytes,
abbreviate_size(bytes)) abbreviate_size(bytes))
def render_drop_monitored(self, ctx, data):
    """Render the 'drop_upload.dirs_monitored' counter (0 if absent) as text."""
    counters = data["counters"]
    monitored = counters.get("drop_upload.dirs_monitored", 0)
    return "%s directories" % (monitored,)
def render_drop_uploads(self, ctx, data):
    """Render the 'drop_upload.files_uploaded' counter (0 if absent) as text."""
    # TODO: bytes uploaded
    counters = data["counters"]
    uploaded = counters.get("drop_upload.files_uploaded", 0)
    return "%s files" % (uploaded,)
def render_drop_queued(self, ctx, data):
    """Render the 'drop_upload.files_queued' counter (0 if absent) as text."""
    counters = data["counters"]
    queued = counters.get("drop_upload.files_queued", 0)
    return "%s files" % (queued,)
def render_drop_failed(self, ctx, data):
    """Render the 'drop_upload.files_failed' counter (0 if absent) as text."""
    counters = data["counters"]
    failed = counters.get("drop_upload.files_failed", 0)
    return "%s files" % (failed,)
def render_raw(self, ctx, data): def render_raw(self, ctx, data):
raw = pprint.pformat(data) raw = pprint.pformat(data)
return ctx.tag[raw] return ctx.tag[raw]