SFTP: avoid blocking close on a heisenfile that has been abandoned or never changed. Also, improve the logging to help track down a case where OpenOffice hangs on opening a file with FXF_READ|FXF_WRITE.

This commit is contained in:
david-sarah 2010-05-29 19:55:44 -07:00
parent 8ed28a95e7
commit 4be24a89df

View File

@ -526,10 +526,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
def close(self): def close(self):
self.is_closed = True self.is_closed = True
self.finish() self.finish()
try: if not self.is_closed:
self.f.close() try:
except EnvironmentError as e: self.f.close()
self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD) except BaseException as e:
self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)
def unregisterProducer(self): def unregisterProducer(self):
if self.producer: if self.producer:
@ -798,37 +799,48 @@ class GeneralSFTPFile(PrefixingLogMixin):
parent = self.parent parent = self.parent
childname = self.childname childname = self.childname
# has_changed is set when writeChunk is called, not when the write occurs, so
# it is correct to optimize out the commit if it is False at the close call.
has_changed = self.has_changed
def _committed(res):
if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
self.consumer.close()
# We must close_notify before re-firing self.async.
if self.close_notify:
self.close_notify(self.userpath, self.parent, self.childname, self)
return res
def _close(ign): def _close(ign):
d2 = defer.succeed(None) d2 = self.consumer.when_done()
if self.has_changed and not abandoned: if self.filenode and self.filenode.is_mutable():
d2.addCallback(lambda ign: self.consumer.when_done()) self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
if self.filenode and self.filenode.is_mutable(): d2.addCallback(lambda ign: self.consumer.get_current_size())
self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL) d2.addCallback(lambda size: self.consumer.read(0, size))
d2.addCallback(lambda ign: self.consumer.get_current_size()) d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
d2.addCallback(lambda size: self.consumer.read(0, size)) else:
d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents)) def _add_file(ign):
else: self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
def _add_file(ign): u = FileHandle(self.consumer.get_file(), self.convergence)
self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL) return parent.add_file(childname, u)
u = FileHandle(self.consumer.get_file(), self.convergence) d2.addCallback(_add_file)
return parent.add_file(childname, u)
d2.addCallback(_add_file)
def _committed(res):
if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
self.consumer.close()
# We must close_notify before re-firing self.async.
if self.close_notify:
self.close_notify(self.userpath, self.parent, self.childname, self)
return res
d2.addBoth(_committed) d2.addBoth(_committed)
return d2 return d2
self.async.addCallback(_close)
d = defer.Deferred() d = defer.Deferred()
# If the file has been abandoned, we don't want the close operation to get "stuck",
# even if self.async fails to re-fire. Doing the close independently of self.async
# in that case ensures that dropping an ssh connection is sufficient to abandon
# any heisenfiles that were not explicitly closed in that connection.
if abandoned or not has_changed:
d.addCallback(_committed)
else:
self.async.addCallback(_close)
self.async.addCallbacks(eventually_callback(d), eventually_errback(d)) self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
d.addBoth(_convert_error, request) d.addBoth(_convert_error, request)
return d return d
@ -1064,7 +1076,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
return d return d
def _sync_heisenfiles(self, userpath, direntry, ignore=None): def _sync_heisenfiles(self, userpath, direntry, ignore=None):
if noisy: self.log("._sync_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY) request = "._sync_heisenfiles(%r, %r, ignore=%r)" % (userpath, direntry, ignore)
self.log(request, level=OPERATIONAL)
files = [] files = []
if direntry in all_heisenfiles: if direntry in all_heisenfiles:
@ -1075,7 +1088,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
d = defer.succeed(None) d = defer.succeed(None)
for f in files: for f in files:
if f is not ignore: if f is not ignore:
d.addCallback(lambda ign: f.sync()) d.addBoth(lambda ign: f.sync())
def _done(ign):
self.log("done %r" % (request,), level=OPERATIONAL)
return None
d.addBoth(_done)
return d return d
def _remove_heisenfile(self, userpath, parent, childname, file_to_remove): def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):