mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-19 13:07:56 +00:00
ftp: change the twisted hack necessary for async-write-close, to one more agreeable to the twisted-dev folks, add a copy of the necessary patch to docs/ftp.txt
This commit is contained in:
parent
70ae1bfc8e
commit
05a8360177
65
docs/ftp.txt
65
docs/ftp.txt
@ -46,3 +46,68 @@ add the following lines to BASEDIR/tahoe.cfg:
|
||||
The FTP server requires Twisted's "vfs" component, which is not included in
|
||||
the Twisted-8.1.0 release. If there is no newer release available, it may be
|
||||
necessary to run Twisted from the SVN trunk to obtain this component.
|
||||
|
||||
In addition, the following patch must be applied (as of r24943) to enable
|
||||
asynchronous closing of file-upload operations:
|
||||
|
||||
Index: twisted/protocols/ftp.py
|
||||
===================================================================
|
||||
--- twisted/protocols/ftp.py (revision 24956)
|
||||
+++ twisted/protocols/ftp.py (working copy)
|
||||
@@ -1049,7 +1049,6 @@
|
||||
cons = ASCIIConsumerWrapper(cons)
|
||||
|
||||
d = self.dtpInstance.registerConsumer(cons)
|
||||
- d.addCallbacks(cbSent, ebSent)
|
||||
|
||||
# Tell them what to doooo
|
||||
if self.dtpInstance.isConnected:
|
||||
@@ -1062,6 +1061,8 @@
|
||||
def cbOpened(file):
|
||||
d = file.receive()
|
||||
d.addCallback(cbConsumer)
|
||||
+ d.addCallback(lambda ignored: file.close())
|
||||
+ d.addCallbacks(cbSent, ebSent)
|
||||
return d
|
||||
|
||||
def ebOpened(err):
|
||||
@@ -1434,7 +1435,14 @@
|
||||
@rtype: C{Deferred} of C{IConsumer}
|
||||
"""
|
||||
|
||||
+ def close():
|
||||
+ """
|
||||
+ Perform any post-write work that needs to be done. This method may
|
||||
+ only be invoked once on each provider, and will always be invoked
|
||||
+ after receive().
|
||||
|
||||
+ @rtype: C{Deferred} of anything: the value is ignored
|
||||
+ """
|
||||
|
||||
def _getgroups(uid):
|
||||
"""Return the primary and supplementary groups for the given UID.
|
||||
@@ -1795,6 +1803,8 @@
|
||||
# FileConsumer will close the file object
|
||||
return defer.succeed(FileConsumer(self.fObj))
|
||||
|
||||
+ def close(self):
|
||||
+ return defer.succeed(None)
|
||||
|
||||
|
||||
class FTPRealm:
|
||||
Index: twisted/vfs/adapters/ftp.py
|
||||
===================================================================
|
||||
--- twisted/vfs/adapters/ftp.py (revision 24956)
|
||||
+++ twisted/vfs/adapters/ftp.py (working copy)
|
||||
@@ -295,6 +295,11 @@
|
||||
"""
|
||||
return defer.succeed(IConsumer(self.node))
|
||||
|
||||
+ def close(self):
|
||||
+ """
|
||||
+ Perform post-write actions.
|
||||
+ """
|
||||
+ return defer.succeed(None)
|
||||
|
||||
|
||||
class _FileToConsumerAdapter(object):
|
||||
|
@ -255,15 +255,14 @@ class Client(node.Node, testutil.PollMixin):
|
||||
self.add_service(ws)
|
||||
|
||||
def init_ftp_server(self):
|
||||
if not self.get_config("ftpd", "enabled", False, boolean=True):
|
||||
return
|
||||
portstr = self.get_config("ftpd", "ftp.port", "8021")
|
||||
accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
|
||||
accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
|
||||
if self.get_config("ftpd", "enabled", False, boolean=True):
|
||||
accountfile = self.get_config("ftpd", "ftp.accounts.file", None)
|
||||
accounturl = self.get_config("ftpd", "ftp.accounts.url", None)
|
||||
ftp_portstr = self.get_config("ftpd", "ftp.port", "8021")
|
||||
|
||||
from allmydata import ftpd
|
||||
s = ftpd.FTPServer(self, portstr, accountfile, accounturl)
|
||||
s.setServiceParent(self)
|
||||
from allmydata import ftpd
|
||||
s = ftpd.FTPServer(self, accountfile, accounturl, ftp_portstr)
|
||||
s.setServiceParent(self)
|
||||
|
||||
def _check_hotline(self, hotline_file):
|
||||
if os.path.exists(hotline_file):
|
||||
|
@ -26,62 +26,39 @@ class ReadFile:
|
||||
|
||||
class FileWriter:
|
||||
implements(IConsumer)
|
||||
def __init__(self, parent, childname, convergence):
|
||||
self.parent = parent
|
||||
self.childname = childname
|
||||
self.convergence = convergence
|
||||
|
||||
def registerProducer(self, producer, streaming):
|
||||
self.producer = producer
|
||||
if not streaming:
|
||||
raise NotImplementedError("Non-streaming producer not supported.")
|
||||
# we write the data to a temporary file, since Tahoe can't do
|
||||
# streaming upload yet.
|
||||
self.f = tempfile.TemporaryFile()
|
||||
return None
|
||||
def unregisterProducer(self):
|
||||
# now we do the upload.
|
||||
|
||||
# bummer: unregisterProducer is run synchronously. twisted's FTP
|
||||
# server (twisted.protocols.ftp.DTP._unregConsumer:454) ignores our
|
||||
# return value, and sends the 226 Transfer Complete right after
|
||||
# unregisterProducer returns. The client will believe that the
|
||||
# transfer is indeed complete, whereas for us it is just starting.
|
||||
# Some clients will do an immediate LIST to see if the file was
|
||||
# really uploaded.
|
||||
u = FileHandle(self.f, self.convergence)
|
||||
d = self.parent.add_file(self.childname, u)
|
||||
# by patching twisted.protocols.ftp.DTP._unregConsumer to pass this
|
||||
# Deferred back, we can obtain the async-upload that we desire.
|
||||
return d
|
||||
def unregisterProducer(self):
|
||||
# the upload actually happens in WriteFile.close()
|
||||
pass
|
||||
|
||||
def write(self, data):
|
||||
# if streaming==True, then the sender/producer is in charge. We are
|
||||
# allowed to call self.producer.pauseProducing() when we have too
|
||||
# much data and want them to stop, but they might not listen to us.
|
||||
# We must then call self.producer.resumeProducing() when we can
|
||||
# accomodate more.
|
||||
#
|
||||
# if streaming==False, then we (the consumer) are in charge. We must
|
||||
# keep calling p.resumeProducing() (which will prompt them to call
|
||||
# our .write again) until they finish.
|
||||
#
|
||||
# If we experience an error, call p.stopProducing().
|
||||
self.f.write(data)
|
||||
|
||||
class WriteFile:
|
||||
implements(ftp.IWriteFile)
|
||||
|
||||
def __init__(self, parent, childname, convergence):
|
||||
self.parent = parent
|
||||
self.childname = childname
|
||||
self.convergence = convergence
|
||||
|
||||
def receive(self):
|
||||
try:
|
||||
c = FileWriter(self.parent, self.childname, self.convergence)
|
||||
except:
|
||||
log.err()
|
||||
raise
|
||||
return defer.succeed(c)
|
||||
self.c = FileWriter()
|
||||
return defer.succeed(self.c)
|
||||
|
||||
def close(self):
|
||||
u = FileHandle(self.c.f, self.convergence)
|
||||
d = self.parent.add_file(self.childname, u)
|
||||
return d
|
||||
|
||||
|
||||
class NoParentError(Exception):
|
||||
pass
|
||||
@ -392,7 +369,7 @@ class Dispatcher:
|
||||
|
||||
|
||||
class FTPServer(service.MultiService):
|
||||
def __init__(self, client, portstr, accountfile, accounturl):
|
||||
def __init__(self, client, accountfile, accounturl, ftp_portstr):
|
||||
service.MultiService.__init__(self)
|
||||
|
||||
if accountfile:
|
||||
@ -408,5 +385,5 @@ class FTPServer(service.MultiService):
|
||||
p.registerChecker(c)
|
||||
f = ftp.FTPFactory(p)
|
||||
|
||||
s = strports.service(portstr, f)
|
||||
s = strports.service(ftp_portstr, f)
|
||||
s.setServiceParent(self)
|
||||
|
Loading…
Reference in New Issue
Block a user