mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-04-10 04:09:58 +00:00
SFTP: fixed bugs that caused hangs during write (#1037).
This commit is contained in:
parent
5f9c10901b
commit
57699fd1eb
@ -318,6 +318,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
|
||||
|
||||
def __init__(self, check_abort, download_size, tempfile_maker):
|
||||
PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
|
||||
if noisy: self.log(".__init__(%r, %r, %r)" % (check_abort, download_size, tempfile_maker), level=NOISY)
|
||||
self.check_abort = check_abort
|
||||
self.download_size = download_size
|
||||
self.current_size = download_size
|
||||
@ -326,6 +327,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
|
||||
self.milestones = [] # empty heap of (offset, d)
|
||||
self.overwrites = [] # empty heap of (start, end)
|
||||
self.done = self.when_reached(download_size) # adds a milestone
|
||||
self.is_done = False
|
||||
def _signal_done(ign):
|
||||
if noisy: self.log("DONE", level=NOISY)
|
||||
self.is_done = True
|
||||
self.done.addCallback(_signal_done)
|
||||
self.producer = None
|
||||
|
||||
def get_file(self):
|
||||
@ -346,13 +352,17 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
|
||||
self.finish()
|
||||
|
||||
def registerProducer(self, p, streaming):
|
||||
if noisy: self.log(".registerProducer(%r, streaming=%r)" % (p, streaming), level=NOISY)
|
||||
self.producer = p
|
||||
if streaming:
|
||||
# call resumeProducing once to start things off
|
||||
p.resumeProducing()
|
||||
else:
|
||||
while not self.done:
|
||||
p.resumeProducing()
|
||||
def _iterate():
|
||||
if not self.is_done:
|
||||
p.resumeProducing()
|
||||
eventually(_iterate)
|
||||
_iterate()
|
||||
|
||||
def write(self, data):
|
||||
if noisy: self.log(".write(<data of length %r>)" % (len(data),), level=NOISY)
|
||||
@ -454,6 +464,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
|
||||
|
||||
if offset + length > self.current_size:
|
||||
length = self.current_size - offset
|
||||
if noisy: self.log("truncating read to %r bytes" % (length,), level=NOISY)
|
||||
|
||||
needed = min(offset + length, self.download_size)
|
||||
d = self.when_reached(needed)
|
||||
@ -511,8 +522,8 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
|
||||
SIZE_THRESHOLD = 1000
|
||||
|
||||
def _make_sftp_file(check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
|
||||
if noisy: logmsg("_make_sftp_file(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
|
||||
(check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY)
|
||||
if noisy: logmsg("_make_sftp_file(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r" %
|
||||
(check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
|
||||
|
||||
if not (flags & (FXF_WRITE | FXF_CREAT)) and (flags & FXF_READ) and filenode and \
|
||||
not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
|
||||
@ -604,8 +615,8 @@ class GeneralSFTPFile(PrefixingLogMixin):
|
||||
|
||||
def __init__(self, check_abort, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
|
||||
PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
|
||||
if noisy: self.log(".__init__(%r, %r, %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
|
||||
(check_abort, flags, convergence, parent, childname, filenode, metadata), level=NOISY)
|
||||
if noisy: self.log(".__init__(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
|
||||
(check_abort, flags, parent, childname, filenode, metadata), level=NOISY)
|
||||
|
||||
self.check_abort = check_abort
|
||||
self.flags = flags
|
||||
@ -643,7 +654,10 @@ class GeneralSFTPFile(PrefixingLogMixin):
|
||||
download_size = filenode.get_size()
|
||||
assert download_size is not None
|
||||
self.consumer = OverwriteableFileConsumer(self.check_abort, download_size, tempfile_maker)
|
||||
self.async.addCallback(lambda ign: filenode.read(self.consumer, 0, None))
|
||||
def _read(ign):
|
||||
if noisy: self.log("_read immutable", level=NOISY)
|
||||
filenode.read(self.consumer, 0, None)
|
||||
self.async.addCallback(_read)
|
||||
|
||||
def readChunk(self, offset, length):
|
||||
self.log(".readChunk(%r, %r)" % (offset, length), level=OPERATIONAL)
|
||||
@ -688,6 +702,8 @@ class GeneralSFTPFile(PrefixingLogMixin):
|
||||
# and an error occurs while flushing cached writes during the close."
|
||||
|
||||
def _write(ign):
|
||||
if noisy: self.log("_write in .writeChunk(%r, <data of length %r>), current_size = %r" %
|
||||
(offset, len(data), self.consumer.get_current_size()), level=NOISY)
|
||||
# FXF_APPEND means that we should always write at the current end of file.
|
||||
write_offset = offset
|
||||
if self.flags & FXF_APPEND:
|
||||
|
@ -23,10 +23,6 @@ if have_pycrypto:
|
||||
from twisted.conch.ssh import filetransfer as sftp
|
||||
from allmydata.frontends import sftpd
|
||||
|
||||
# FIXME remove this
|
||||
#from twisted.internet.base import DelayedCall
|
||||
#DelayedCall.debug = True
|
||||
|
||||
import traceback
|
||||
|
||||
"""
|
||||
@ -649,7 +645,6 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
|
||||
d.addCallback(lambda node: download_to_data(node))
|
||||
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "0123456789"))
|
||||
|
||||
"""
|
||||
# test that writing a zero-length file with EXCL only updates the directory once
|
||||
d.addCallback(lambda ign:
|
||||
self.handler.openFile("zerolength", sftp.FXF_WRITE | sftp.FXF_CREAT |
|
||||
@ -681,15 +676,14 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
|
||||
d2.addCallback(lambda node: download_to_data(node))
|
||||
d2.addCallback(lambda data: self.failUnlessReallyEqual(data, ""))
|
||||
|
||||
d2 = wf.writeChunk(10, "0123456789")
|
||||
d2.addCallback(wf.writeChunk(5, "01234"))
|
||||
d2.addCallback(lambda ign: wf.writeChunk(10, "0123456789"))
|
||||
d2.addCallback(lambda ign: wf.writeChunk(5, "01234"))
|
||||
d2.addCallback(lambda ign: wf.close())
|
||||
return d2
|
||||
d.addCallback(_write_excl_append)
|
||||
d.addCallback(lambda ign: self.root.get(u"exclappend"))
|
||||
d.addCallback(lambda node: download_to_data(node))
|
||||
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345678901234"))
|
||||
"""
|
||||
|
||||
# test WRITE | CREAT without TRUNC
|
||||
d.addCallback(lambda ign:
|
||||
@ -719,12 +713,11 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
|
||||
d.addCallback(_check_same_file)
|
||||
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "mutable new! contents"))
|
||||
|
||||
"""
|
||||
# test READ | WRITE without CREAT or TRUNC
|
||||
d.addCallback(lambda ign:
|
||||
self.handler.openFile("small", sftp.FXF_READ | sftp.FXF_WRITE, {}))
|
||||
def _read_write(rwf):
|
||||
d2 = rwf.writeChunk(8, "0123")
|
||||
d2 = rwf.writeChunk(8, "0123")
|
||||
d2.addCallback(lambda ign: rwf.readChunk(0, 100))
|
||||
d2.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123"))
|
||||
d2.addCallback(lambda ign: rwf.close())
|
||||
@ -733,7 +726,7 @@ class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
|
||||
d.addCallback(lambda ign: self.root.get(u"small"))
|
||||
d.addCallback(lambda node: download_to_data(node))
|
||||
d.addCallback(lambda data: self.failUnlessReallyEqual(data, "012345670123"))
|
||||
"""
|
||||
|
||||
return d
|
||||
|
||||
def test_removeFile(self):
|
||||
|
Loading…
x
Reference in New Issue
Block a user