SFTP: 'sync' any open files at a direntry before opening any new file at that direntry. This works around the sshfs misbehaviour of returning success to clients immediately on close.

This commit is contained in:
david-sarah 2010-05-25 16:02:57 -07:00
parent a143b1297b
commit 0b888f8201

View File

@ -568,7 +568,7 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin):
d = defer.Deferred()
def _read(data):
if noisy: self.log("_read(%r) in readChunk(%r, %r)" % (data, offset, length), level=NOISY)
if noisy: self.log("_read(<data of length %r>) in readChunk(%r, %r)" % (len(data), offset, length), level=NOISY)
# "In response to this request, the server will read as many bytes as it
# can from the file (up to 'len'), and return them in a SSH_FXP_DATA
@ -643,6 +643,7 @@ class GeneralSFTPFile(PrefixingLogMixin):
# Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
self.closed = False
self.added = False
self.removed = False
# self.consumer should only be relied on in callbacks for self.async, since it might
@ -688,6 +689,13 @@ class GeneralSFTPFile(PrefixingLogMixin):
self.removed = True
def sync(self):
self.log(".sync()", level=OPERATIONAL)
d = defer.Deferred()
self.async.addBoth(eventually_callback(d))
return d
def readChunk(self, offset, length):
request = ".readChunk(%r, %r)" % (offset, length)
self.log(request, level=OPERATIONAL)
@ -727,9 +735,9 @@ class GeneralSFTPFile(PrefixingLogMixin):
# Note that we return without waiting for the write to occur. Reads and
# close wait for prior writes, and will fail if any prior operation failed.
# This is ok because SFTP makes no guarantee that the request completes
# before the write. In fact it explicitly allows write errors to be delayed
# until close:
# This is ok because SFTP makes no guarantee that the write completes
# before the request does. In fact it explicitly allows write errors to be
# delayed until close:
# "One should note that on some server platforms even a close can fail.
# This can happen e.g. if the server operating system caches writes,
# and an error occurs while flushing cached writes during the close."
@ -767,10 +775,12 @@ class GeneralSFTPFile(PrefixingLogMixin):
if self.has_changed and not self.removed:
d2.addCallback(lambda ign: self.consumer.when_done())
if self.filenode and self.filenode.is_mutable():
self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
d2.addCallback(lambda ign: self.consumer.get_current_size())
d2.addCallback(lambda size: self.consumer.read(0, size))
d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
else:
self.added = True
def _add_file(ign):
self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
u = FileHandle(self.consumer.get_file(), self.convergence)
@ -785,12 +795,14 @@ class GeneralSFTPFile(PrefixingLogMixin):
self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
def _closed(res):
if noisy: self.log("_closed(%r)" % (res,), level=NOISY)
self.close_notify(self.parent, self.childname, self)
# It is possible for there to be a race between adding the file and removing it.
if self.removed:
if self.added and self.removed:
self.log("oops, we added %r but must now remove it" % (self.childname,), level=OPERATIONAL)
d2 = self.parent.delete(self.childname)
d2.addBoth(_convert_error, request) # just for logging
d2.addBoth(lambda ign: res)
return d2
return res
@ -899,6 +911,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
}
def _add_open_files(self, direntry, files_to_add):
if noisy: self.log("._add_open_files(%r, %r)" % (direntry, files_to_add), level=NOISY)
if direntry:
if direntry in self._open_files:
self._open_files[direntry] += files_to_add
@ -912,23 +926,34 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
all_open_files[direntry] = (files_to_add, time())
def _remove_any_open_files(self, direntry):
removed = False
if noisy: self.log("._remove_any_open_files(%r)" % (direntry,), level=NOISY)
if direntry in self._open_files:
for f in self._open_files[direntry]:
f.remove()
del self._open_files[direntry]
removed = True
if direntry in all_open_files:
(files, opentime) = all_open_files[direntry]
for f in files:
f.remove()
del all_open_files[direntry]
removed = True
return True
return removed
return False
def _sync_open_files(self, direntry):
if noisy: self.log("._sync_open_files(%r)" % (direntry,), level=NOISY)
d = defer.succeed(None)
if direntry in all_open_files:
(files, opentime) = all_open_files[direntry]
for f in files:
d.addCallback(lambda ign: f.sync())
return d
def _close_notify(self, parent, childname, file_to_remove):
if noisy: self.log("._close_notify(%r, %r, %r)" % (parent, childname, file_to_remove), level=NOISY)
direntry = self._direntry_for(parent, childname)
if direntry in self._open_files:
old_files = self._open_files[direntry]
@ -950,6 +975,9 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
"""When an direntry is renamed, any open files for that direntry are also renamed.
Return True if there were any open files at from_direntry."""
if noisy: self.log("._rename_open_files(%r, %r, %r, %r)" %
(from_parent, from_childname, to_parent, to_childname), level=NOISY)
from_direntry = self._direntry_for(from_parent, from_childname)
to_direntry = self._direntry_for(to_parent, to_childname)
@ -964,11 +992,13 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
else:
return False
def _direntry_for(self, parent, childname):
if parent and childname:
rw_uri = parent.get_write_uri()
if rw_uri:
def _direntry_for(self, filenode_or_parent, childname=None):
if filenode_or_parent:
rw_uri = filenode_or_parent.get_write_uri()
if rw_uri and childname:
return rw_uri + "/" + childname.encode('utf-8')
else:
return rw_uri
return None
@ -987,17 +1017,24 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
assert metadata is None or 'readonly' in metadata, metadata
writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
return ShortReadOnlySFTPFile(filenode, metadata)
if childname:
direntry = self._direntry_for(parent, childname)
else:
direntry = None
if writing:
direntry = self._direntry_for(parent, childname)
direntry = self._direntry_for(filenode)
file = GeneralSFTPFile(self._close_notify, flags, self._convergence,
parent=parent, childname=childname, filenode=filenode, metadata=metadata)
self._add_open_files(direntry, [file])
return file
d = self._sync_open_files(direntry)
if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
d.addCallback(lambda ign: ShortReadOnlySFTPFile(filenode, metadata))
else:
d.addCallback(lambda ign: GeneralSFTPFile(self._close_notify, flags, self._convergence,
parent=parent, childname=childname, filenode=filenode, metadata=metadata))
def _add_to_open(file):
if writing:
self._add_open_files(direntry, [file])
return file
d.addCallback(_add_to_open)
return d
def openFile(self, pathstring, flags, attrs):
request = ".openFile(%r, %r = %r, %r)" % (pathstring, flags, _repr_flags(flags), attrs)
@ -1006,10 +1043,6 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
# This is used for both reading and writing.
# First exclude invalid combinations of flags.
# /usr/bin/sftp 'get' gives us FXF_READ, while 'put' on a new file
# gives FXF_WRITE | FXF_CREAT | FXF_TRUNC. I'm guessing that 'put' on an
# existing file gives the same.
if not (flags & (FXF_READ | FXF_WRITE)):
raise SFTPError(FX_BAD_MESSAGE,
"invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
@ -1040,9 +1073,14 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
# d. the child is immutable: if we are trying to open it write-only or
# read/write, then we must be able to write to the parent directory.
#
# To reduce latency, open succeeds as soon as these conditions are met, even
# though there might be a failure in downloading the existing file or uploading
# a new one.
# To reduce latency, open normally succeeds as soon as these conditions are
# met, even though there might be a failure in downloading the existing file
# or uploading a new one. However, there is an exception: if a file has been
# written, then closed, and is now being reopened, then we have to delay the
# open until the previous upload/publish has completed. This is necessary
# because sshfs does not wait for the result of an FXF_CLOSE message before
# reporting to the client that a file has been closed. It applies both to
# mutable files, and to directory entries linked to an immutable file.
#
# Note that the permission checks below are for more precise error reporting on
# the open call; later operations would fail even if we did not make these checks.
@ -1268,7 +1306,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
if childname is None:
raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")
removed = self._remove_any_open_files(self._direntry_for(parent, childname))
direntry = self._direntry_for(parent, childname)
removed = self._remove_any_open_files(direntry)
d2 = parent.get(childname)
def _got_child(child):
@ -1281,6 +1320,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
def _no_child(err):
if removed and err.check(NoSuchChildError):
# suppress NoSuchChildError if an open file was removed
if noisy: self.log("suppressing NoSuchChildError for %r because it was removed as an open file" %
(direntry,), level=NOISY)
return None
else:
return err
@ -1339,6 +1380,11 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
# reported as the update time of the best version. But that
# information isn't currently stored in mutable shares, I think.
# TODO: some clients will incorrectly try to get the attributes
# of a file immediately after opening it, before it has been put
# into the all_open_files table. This is a race condition bug in
# the client, but we probably need to handle it anyway.
path = self._path_from_string(pathstring)
d = self._get_parent_or_node(path)
def _got_parent_or_node( (parent_or_node, childname) ):