Mirror of https://github.com/tahoe-lafs/tahoe-lafs.git

commit e867985539 (parent 0b888f8201)

SFTP: cater to clients that assume a file is created as soon as they have made an open request; also, fix some race conditions associated with closing a file at about the same time as renaming or removing it.
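The change revolves around what the diff calls a "heisenfile": a handle that has been opened with FXF_WRITE and/or FXF_CREAT but has not yet been close-notified, so its directory entry may not exist yet. Such handles are tracked in two registries, one per user handler and one global. A minimal sketch of that bookkeeping, assuming nothing beyond what the diff itself shows (names are illustrative):

    # Two registries: one per user handler keyed by UTF-8 userpath, and one
    # global keyed by a user-independent directory-entry string.
    heisenfiles_by_path = {}   # userpath -> [handles]
    all_heisenfiles = {}       # direntry -> ([handles], open_time_utc)

    def register_by_path(userpath, handle):
        heisenfiles_by_path.setdefault(userpath, []).append(handle)

    def register_by_direntry(direntry, handle, now):
        (handles, opentime) = all_heisenfiles.get(direntry, ([], now))
        all_heisenfiles[direntry] = (handles + [handle], opentime)

    register_by_path("/alice/new.txt", "h1")  # synchronous, before 'open' completes

Because registration happens synchronously in openFile, a getAttr, rename, or remove that a racy client pipelines immediately behind the open can still find the file.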
@@ -27,7 +27,7 @@ from allmydata.util import deferredutil
 from allmydata.util.consumer import download_to_data
 from allmydata.interfaces import IFileNode, IDirectoryNode, ExistingChildError, \
-     NoSuchChildError
+     NoSuchChildError, ChildOfWrongTypeError
 from allmydata.mutable.common import NotWriteableError
 from allmydata.immutable.upload import FileHandle

@@ -43,7 +43,7 @@ warnings.filterwarnings("ignore", category=DeprecationWarning,
 noisy = True
 use_foolscap_logging = True

-from allmydata.util.log import NOISY, OPERATIONAL, \
+from allmydata.util.log import NOISY, OPERATIONAL, WEIRD, \
     msg as _msg, err as _err, PrefixingLogMixin as _PrefixingLogMixin

 if use_foolscap_logging:
@@ -54,10 +54,10 @@ else: # pragma: no cover
     def logerr(s, level=None):
         print s
     class PrefixingLogMixin:
-        def __init__(self, facility=None):
-            pass
+        def __init__(self, facility=None, prefix=''):
+            self.prefix = prefix
         def log(self, s, level=None):
-            print s
+            print "%r %s" % (self.prefix, s)


 def eventually_callback(d):
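The stub PrefixingLogMixin above is only the fallback used when foolscap logging is disabled, but it mirrors the new behaviour: every log line is tagged with a per-object prefix. A tiny, hedged demo of why that matters (illustrative only; the real mixin lives in allmydata.util.log):

    class PrefixingLogMixin:
        def __init__(self, facility=None, prefix=''):
            self.prefix = prefix
        def log(self, s, level=None):
            # each handle's lines carry its own prefix, so interleaved
            # output from concurrent SFTP handles stays attributable
            print("%r %s" % (self.prefix, s))

    a = PrefixingLogMixin(facility="tahoe.sftp", prefix="/alice/a.txt")
    b = PrefixingLogMixin(facility="tahoe.sftp", prefix="/alice/b.txt")
    a.log(".writeChunk(0, 4096)")
    b.log(".close()")

Later hunks pass prefix=userpath for file handles and prefix=username for the user handler.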
@@ -105,7 +105,7 @@ def _convert_error(res, request):
     if err.check(NoSuchChildError):
         childname = _utf8(err.value.args[0])
         raise SFTPError(FX_NO_SUCH_FILE, childname)
-    if err.check(NotWriteableError):
+    if err.check(NotWriteableError) or err.check(ChildOfWrongTypeError):
         msg = _utf8(err.value.args[0])
         raise SFTPError(FX_PERMISSION_DENIED, msg)
     if err.check(ExistingChildError):
@@ -160,12 +160,7 @@ def _lsLine(name, attrs):
     perms = array.array('c', '-'*10)
     ft = stat.S_IFMT(mode)
     if stat.S_ISDIR(ft): perms[0] = 'd'
-    elif stat.S_ISCHR(ft): perms[0] = 'c'
-    elif stat.S_ISBLK(ft): perms[0] = 'b'
     elif stat.S_ISREG(ft): perms[0] = '-'
-    elif stat.S_ISFIFO(ft): perms[0] = 'f'
-    elif stat.S_ISLNK(ft): perms[0] = 'l'
-    elif stat.S_ISSOCK(ft): perms[0] = 's'
     else: perms[0] = '?'
     # user
     if mode&stat.S_IRUSR: perms[1] = 'r'
@@ -277,7 +272,7 @@ class EncryptedTemporaryFile(PrefixingLogMixin):
         self.key = os.urandom(16) # AES-128

     def _crypt(self, offset, data):
-        # FIXME: use random-access AES (pycryptopp ticket #18)
+        # TODO: use random-access AES (pycryptopp ticket #18)
         offset_big = offset // 16
         offset_small = offset % 16
         iv = binascii.unhexlify("%032x" % offset_big)
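_crypt derives the AES counter block directly from the byte offset, which is what makes the encrypted temporary file randomly accessible: block N's IV is just N as a 128-bit big-endian integer. A self-contained sketch of that mapping (the surrounding cipher setup is omitted):

    import binascii

    def ctr_iv_for_offset(offset):
        block_index = offset // 16                    # offset_big in the diff
        return binascii.unhexlify("%032x" % block_index)

    assert ctr_iv_for_offset(0)  == b"\x00" * 16
    assert ctr_iv_for_offset(16) == b"\x00" * 15 + b"\x01"
    assert ctr_iv_for_offset(37) == b"\x00" * 15 + b"\x02"  # byte 37 is in block 2

offset % 16 (offset_small in the diff) is the position within that block.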
@@ -329,7 +324,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):

     The temporary file reflects the contents of the file that I represent, except that:
     - regions that have neither been downloaded nor overwritten, if present,
-      contain zeroes.
+      contain garbage.
     - the temporary file may be shorter than the represented file (it is never longer).
     The latter's current size is stored in self.current_size.

@@ -365,7 +360,11 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
                  (size, self.current_size, self.downloaded), level=NOISY)
         if size < self.current_size or size < self.downloaded:
             self.f.truncate(size)
+        if size > self.current_size:
+            self.overwrite(self.current_size, "\x00" * (size - self.current_size))
         self.current_size = size

+        # invariant: self.download_size <= self.current_size
         if size < self.download_size:
             self.download_size = size
         if self.downloaded >= self.download_size:
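The docstring fix above ("contain zeroes" becomes "contain garbage") and the new zero-fill go together: a hole in an ordinary file reads back as zeroes, but an unwritten region of the encrypted temporary file decrypts to garbage, so extending the file must write explicit zeroes. A minimal sketch of the extension step, using an in-memory file for illustration:

    import io

    def extend(f, current_size, new_size):
        # explicit zero-fill; relying on a filesystem 'hole' would expose garbage here
        if new_size > current_size:
            f.seek(current_size)
            f.write(b"\x00" * (new_size - current_size))

    buf = io.BytesIO(b"data")
    extend(buf, 4, 8)
    assert buf.getvalue() == b"data\x00\x00\x00\x00"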
@@ -453,7 +452,7 @@ class OverwriteableFileConsumer(PrefixingLogMixin):

     def overwrite(self, offset, data):
         if noisy: self.log(".overwrite(%r, <data of length %r>)" % (offset, len(data)), level=NOISY)
-        if offset > self.download_size and offset > self.current_size:
+        if offset > self.current_size:
             # Normally writing at an offset beyond the current end-of-file
             # would leave a hole that appears filled with zeroes. However, an
             # EncryptedTemporaryFile doesn't behave like that (if there is a
@@ -463,13 +462,16 @@ class OverwriteableFileConsumer(PrefixingLogMixin):

             self.f.seek(self.current_size)
             self.f.write("\x00" * (offset - self.current_size))
+            start = self.current_size
         else:
             self.f.seek(offset)
+            start = offset

         self.f.write(data)
         end = offset + len(data)
         self.current_size = max(self.current_size, end)
         if end > self.downloaded:
-            heapq.heappush(self.overwrites, (offset, end))
+            heapq.heappush(self.overwrites, (start, end))

     def read(self, offset, length):
         """When the data has been read, callback the Deferred that we return with this data.
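The one-word change from (offset, end) to (start, end) matters: when a write lands beyond the current end of file, the zero-filled gap is also client-supplied content, so the recorded overwrite interval must begin at the old end of file; otherwise a still-running download could later clobber the zeroes. A hedged sketch of the interval bookkeeping:

    import heapq

    overwrites = []     # heap of (start, end) byte ranges written by the client

    def record_write(offset, data_len, current_size):
        start = min(offset, current_size)   # include any zero-filled gap
        heapq.heappush(overwrites, (start, offset + data_len))

    def is_overwritten(pos):
        return any(s <= pos < e for (s, e) in overwrites)

    record_write(8, 4, current_size=4)   # write at 8 past EOF 4: gap 4..8 is zero-filled
    assert is_overwritten(5)             # the gap must not be clobbered by the download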
@@ -531,7 +533,10 @@ class OverwriteableFileConsumer(PrefixingLogMixin):
     def close(self):
         self.is_closed = True
         self.finish()
-        self.f.close()
+        try:
+            self.f.close()
+        except EnvironmentError as e:
+            self.log("suppressed %r from close of temporary file %r" % (e, self.f), level=WEIRD)

     def unregisterProducer(self):
         if self.producer:
@@ -548,9 +553,9 @@ class ShortReadOnlySFTPFile(PrefixingLogMixin):
     I am used only for short immutable files opened in read-only mode.
     The file contents are downloaded to memory when I am created."""

-    def __init__(self, filenode, metadata):
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
-        if noisy: self.log(".__init__(%r, %r)" % (filenode, metadata), level=NOISY)
+    def __init__(self, userpath, filenode, metadata):
+        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
+        if noisy: self.log(".__init__(%r, %r, %r)" % (userpath, filenode, metadata), level=NOISY)

         assert IFileNode.providedBy(filenode), filenode
         self.filenode = filenode
@@ -625,33 +630,50 @@ class GeneralSFTPFile(PrefixingLogMixin):
     storing the file contents. In order to allow write requests to be satisfied
     immediately, there is effectively a FIFO queue between requests made to this
     file handle, and requests to my OverwriteableFileConsumer. This queue is
-    implemented by the callback chain of self.async."""
+    implemented by the callback chain of self.async.

-    def __init__(self, close_notify, flags, convergence, parent=None, childname=None, filenode=None, metadata=None):
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
-        if noisy: self.log(".__init__(%r, %r, <convergence censored>, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
-                           (close_notify, flags, parent, childname, filenode, metadata), level=NOISY)
+    When first constructed, I am in an 'unopened' state that causes most
+    operations to be delayed until 'open' is called."""

-        self.close_notify = close_notify
+    def __init__(self, userpath, flags, close_notify, convergence):
+        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=userpath)
+        if noisy: self.log(".__init__(%r, %r = %r, %r, <convergence censored>)" %
+                           (userpath, flags, _repr_flags(flags), close_notify), level=NOISY)
+
+        self.userpath = userpath
         self.flags = flags
+        self.close_notify = close_notify
         self.convergence = convergence
-        self.parent = parent
-        self.childname = childname
-        self.filenode = filenode
-        self.metadata = metadata
-        self.async = defer.succeed(None)
+        self.async = defer.Deferred()
         # Creating or truncating the file is a change, but if FXF_EXCL is set, a zero-length file has already been created.
         self.has_changed = (flags & (FXF_CREAT | FXF_TRUNC)) and not (flags & FXF_EXCL)
         self.closed = False
-        self.added = False
-        self.removed = False
+        self.abandoned = False
+        self.parent = None
+        self.childname = None
+        self.filenode = None
+        self.metadata = None

         # self.consumer should only be relied on in callbacks for self.async, since it might
         # not be set before then.
         self.consumer = None

+    def open(self, parent=None, childname=None, filenode=None, metadata=None):
+        self.log(".open(parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+                 (parent, childname, filenode, metadata), level=OPERATIONAL)
+
+        # If the file has been renamed, the new (parent, childname) takes precedence.
+        if self.parent is None:
+            self.parent = parent
+        if self.childname is None:
+            self.childname = childname
+        self.filenode = filenode
+        self.metadata = metadata
+
+        if not self.closed:
             tempfile_maker = EncryptedTemporaryFile

-            if (flags & FXF_TRUNC) or not filenode:
+            if (self.flags & FXF_TRUNC) or not filenode:
                 # We're either truncating or creating the file, so we don't need the old contents.
                 self.consumer = OverwriteableFileConsumer(0, tempfile_maker)
                 self.consumer.finish()
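The constructor/open split above is the heart of the workaround: GeneralSFTPFile can now be created synchronously (and registered as a heisenfile) while self.async starts as an unfired Deferred, so every queued operation waits until open() fires the chain. A minimal, hedged sketch of the pattern under Twisted (async_ stands in for the diff's self.async):

    from twisted.internet import defer

    class LazyHandle(object):
        def __init__(self):
            self.async_ = defer.Deferred()       # unfired: queues operations

        def open(self, backing):
            self.backing = backing
            self.async_.callback(None)           # release everything queued so far

        def read(self):
            d = defer.Deferred()
            def _read(ign):
                d.callback(self.backing)         # runs only after open()
                return None
            self.async_.addCallback(_read)
            return d

    h = LazyHandle()
    d = h.read()                 # queued; nothing has fired yet
    h.open("file contents")      # the queued read now completes with "file contents"

The real code additionally uses eventually_callback so the chain fires via the event loop rather than synchronously.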
@@ -676,18 +698,22 @@ class GeneralSFTPFile(PrefixingLogMixin):
                 filenode.read(self.consumer, 0, None)
             self.async.addCallback(_read)

-        if noisy: self.log("__init__ done", level=NOISY)
+        eventually_callback(self.async)(None)

-    def rename(self, new_parent, new_childname):
-        self.log(".rename(%r, %r)" % (new_parent, new_childname), level=OPERATIONAL)
+        if noisy: self.log("open done", level=NOISY)
+        return self

+    def rename(self, new_userpath, new_parent, new_childname):
+        self.log(".rename(%r, %r, %r)" % (new_userpath, new_parent, new_childname), level=OPERATIONAL)
+
+        self.userpath = new_userpath
         self.parent = new_parent
         self.childname = new_childname

-    def remove(self):
-        self.log(".remove()", level=OPERATIONAL)
+    def abandon(self):
+        self.log(".abandon()", level=OPERATIONAL)

-        self.removed = True
+        self.abandoned = True

     def sync(self):
         self.log(".sync()", level=OPERATIONAL)
@@ -712,7 +738,6 @@ class GeneralSFTPFile(PrefixingLogMixin):
         def _read(ign):
             if noisy: self.log("_read in readChunk(%r, %r)" % (offset, length), level=NOISY)
             d2 = self.consumer.read(offset, length)
-            d2.addErrback(_convert_error, request)
             d2.addCallbacks(eventually_callback(d), eventually_errback(d))
             # It is correct to drop d2 here.
             return None
@@ -768,11 +793,21 @@ class GeneralSFTPFile(PrefixingLogMixin):
         self.closed = True

         if not (self.flags & (FXF_WRITE | FXF_CREAT)):
-            return defer.execute(self.consumer.close)
+            def _readonly_close():
+                if self.consumer:
+                    self.consumer.close()
+            return defer.execute(_readonly_close)
+
+        # We must capture the abandoned, parent, and childname variables synchronously
+        # at the close call. This is needed by the correctness arguments in the comments
+        # for _abandon_any_heisenfiles and _rename_heisenfiles.
+        abandoned = self.abandoned
+        parent = self.parent
+        childname = self.childname

         def _close(ign):
             d2 = defer.succeed(None)
-            if self.has_changed and not self.removed:
+            if self.has_changed and not abandoned:
                 d2.addCallback(lambda ign: self.consumer.when_done())
                 if self.filenode and self.filenode.is_mutable():
                     self.log("update mutable file %r childname=%r" % (self.filenode, self.childname,), level=OPERATIONAL)
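Capturing abandoned, parent, and childname synchronously at the moment close() is called is a small but load-bearing detail: a rename or remove that arrives while the close is still committing must not change which directory entry this close writes to. An illustrative sketch of the snapshot idiom (not Tahoe code):

    class Handle(object):
        def __init__(self):
            self.abandoned = False
            self.childname = u"old"
            self.pending = []

        def close(self):
            abandoned, childname = self.abandoned, self.childname   # snapshot now
            self.pending.append(lambda: (abandoned, childname))

        def rename(self, new_childname):
            self.childname = new_childname

    h = Handle()
    h.close()
    h.rename(u"new")                           # races with the in-flight close
    assert h.pending[0]() == (False, u"old")   # the close commits at the old name

The correctness argument for why this is safe is spelled out in the comments for _abandon_any_heisenfiles and _rename_heisenfiles below.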
@@ -780,33 +815,28 @@ class GeneralSFTPFile(PrefixingLogMixin):
                     d2.addCallback(lambda size: self.consumer.read(0, size))
                     d2.addCallback(lambda new_contents: self.filenode.overwrite(new_contents))
                 else:
-                    self.added = True
                     def _add_file(ign):
-                        self.log("_add_file childname=%r" % (self.childname,), level=OPERATIONAL)
+                        self.log("_add_file childname=%r" % (childname,), level=OPERATIONAL)
                         u = FileHandle(self.consumer.get_file(), self.convergence)
-                        return self.parent.add_file(self.childname, u)
+                        return parent.add_file(childname, u)
                     d2.addCallback(_add_file)

-            d2.addCallback(lambda ign: self.consumer.close())
+            def _committed(res):
+                if noisy: self.log("_committed(%r)" % (res,), level=NOISY)
+
+                self.consumer.close()
+
+                # We must close_notify before re-firing self.async.
+                if self.close_notify:
+                    self.close_notify(self.userpath, self.parent, self.childname, self)
+                return res
+            d2.addBoth(_committed)
             return d2

         self.async.addCallback(_close)

         d = defer.Deferred()
         self.async.addCallbacks(eventually_callback(d), eventually_errback(d))
-
-        def _closed(res):
-            if noisy: self.log("_closed(%r)" % (res,), level=NOISY)
-            self.close_notify(self.parent, self.childname, self)
-
-            # It is possible for there to be a race between adding the file and removing it.
-            if self.added and self.removed:
-                self.log("oops, we added %r but must now remove it" % (self.childname,), level=OPERATIONAL)
-                d2 = self.parent.delete(self.childname)
-                d2.addBoth(_convert_error, request) # just for logging
-                d2.addBoth(lambda ign: res)
-                return d2
-            return res
-        d.addBoth(_closed)
         d.addBoth(_convert_error, request)
         return d

@@ -877,18 +907,22 @@ class Reason:
         self.value = value


-# For each immutable file that has been opened with write flags
-# (FXF_WRITE and/or FXF_CREAT) and is still open, this maps from
-# parent_write_uri + "/" + childname_utf8, to (list_of_ISFTPFile, open_time_utc).
+# A "heisenfile" is a file that has been opened with write flags
+# (FXF_WRITE and/or FXF_CREAT) and not yet close-notified.
+# 'all_heisenfiles' maps from a direntry string to
+# (list_of_GeneralSFTPFile, open_time_utc).
+# A direntry string is parent_write_uri + "/" + childname_utf8 for
+# an immutable file, or file_write_uri for a mutable file.
 # Updates to this dict are single-threaded.

-all_open_files = {}
+all_heisenfiles = {}


 class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     implements(ISFTPServer)
     def __init__(self, client, rootnode, username):
         ConchUser.__init__(self)
-        PrefixingLogMixin.__init__(self, facility="tahoe.sftp")
+        PrefixingLogMixin.__init__(self, facility="tahoe.sftp", prefix=username)
         if noisy: self.log(".__init__(%r, %r, %r)" % (client, rootnode, username), level=NOISY)

         self.channelLookup["session"] = session.SSHSession
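The comment block defines the two keying schemes. Since an immutable file has no write URI of its own, it is identified by where it will be linked; a mutable file is identified by its own write cap. A hedged sketch of the key construction described above (illustrative helper, not Tahoe's):

    def direntry_for(parent_write_uri, childname_utf8=None, file_write_uri=None):
        if file_write_uri is not None:
            return file_write_uri                           # mutable file
        if parent_write_uri is not None:
            return parent_write_uri + "/" + childname_utf8  # immutable child
        return None

    assert direntry_for("URI:DIR2:abc", "file.txt") == "URI:DIR2:abc/file.txt"
    assert direntry_for(None, None, "URI:SSK:xyz") == "URI:SSK:xyz"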
@@ -898,7 +932,9 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         self._root = rootnode
         self._username = username
         self._convergence = client.convergence
-        self._open_files = {} # files created by this user handler and still open
+
+        # maps from UTF-8 paths for this user, to files written and still open
+        self._heisenfiles = {}

     def gotVersion(self, otherVersion, extData):
         self.log(".gotVersion(%r, %r)" % (otherVersion, extData), level=OPERATIONAL)
@@ -910,87 +946,164 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             'fstatvfs@openssh.com': '2',
             }

-    def _add_open_files(self, direntry, files_to_add):
-        if noisy: self.log("._add_open_files(%r, %r)" % (direntry, files_to_add), level=NOISY)
+    def logout(self):
+        self.log(".logout()", level=OPERATIONAL)

+        for files in self._heisenfiles.itervalues():
+            for f in files:
+                f.abandon()
+
+    def _add_heisenfiles_by_path(self, userpath, files):
+        if noisy: self.log("._add_heisenfiles_by_path(%r, %r)" % (userpath, files), level=NOISY)
+
+        if userpath in self._heisenfiles:
+            self._heisenfiles[userpath] += files
+        else:
+            self._heisenfiles[userpath] = files
+
+    def _add_heisenfiles_by_direntry(self, direntry, files_to_add):
+        if noisy: self.log("._add_heisenfiles_by_direntry(%r, %r)" % (direntry, files_to_add), level=NOISY)
+
         if direntry:
-            if direntry in self._open_files:
-                self._open_files[direntry] += files_to_add
+            if direntry in all_heisenfiles:
+                (old_files, opentime) = all_heisenfiles[direntry]
+                all_heisenfiles[direntry] = (old_files + files_to_add, opentime)
             else:
-                self._open_files[direntry] = files_to_add
+                all_heisenfiles[direntry] = (files_to_add, time())

-            if direntry in all_open_files:
-                (old_files, opentime) = all_open_files[direntry]
-                all_open_files[direntry] = (old_files + files_to_add, opentime)
-            else:
-                all_open_files[direntry] = (files_to_add, time())
+    def _abandon_any_heisenfiles(self, userpath, direntry):
+        if noisy: self.log("._abandon_any_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)

-    def _remove_any_open_files(self, direntry):
-        if noisy: self.log("._remove_any_open_files(%r)" % (direntry,), level=NOISY)
+        # First we synchronously mark all heisenfiles matching the userpath or direntry
+        # as abandoned, and remove them from the two heisenfile dicts. Then we .sync()
+        # each file that we abandoned.
+        #
+        # For each file, the call to .abandon() occurs:
+        #   * before the file is closed, in which case it will never be committed
+        #     (uploaded+linked or published); or
+        #   * after it is closed but before it has been close_notified, in which case the
+        #     .sync() ensures that it has been committed (successfully or not) before we
+        #     return.
+        #
+        # This avoids a race that might otherwise cause the file to be committed after
+        # the remove operation has completed.
+        #
+        # We return a Deferred that fires with True if any files were abandoned (this
+        # does not mean that they were not committed; it is used to determine whether
+        # a NoSuchChildError from the attempt to delete the file should be suppressed).

-        if direntry in self._open_files:
-            del self._open_files[direntry]
+        files = []
+        if direntry in all_heisenfiles:
+            (files, opentime) = all_heisenfiles[direntry]
+            del all_heisenfiles[direntry]
+        if userpath in self._heisenfiles:
+            files += self._heisenfiles[userpath]
+            del self._heisenfiles[userpath]

-        if direntry in all_open_files:
-            (files, opentime) = all_open_files[direntry]
-            for f in files:
-                f.remove()
-            del all_open_files[direntry]
-            return True
-
-        return False
-
-    def _sync_open_files(self, direntry):
-        if noisy: self.log("._sync_open_files(%r)" % (direntry,), level=NOISY)
+        for f in files:
+            f.abandon()

         d = defer.succeed(None)
-        if direntry in all_open_files:
-            (files, opentime) = all_open_files[direntry]
-            for f in files:
-                d.addCallback(lambda ign: f.sync())
+        for f in files:
+            d.addBoth(lambda ign: f.sync())
+
+        d.addBoth(lambda ign: len(files) > 0)
         return d

-    def _close_notify(self, parent, childname, file_to_remove):
-        if noisy: self.log("._close_notify(%r, %r, %r)" % (parent, childname, file_to_remove), level=NOISY)
+    def _rename_heisenfiles(self, from_userpath, from_parent, from_childname,
+                            to_userpath, to_parent, to_childname, overwrite=True):
+        if noisy: self.log("._rename_heisenfiles(%r, %r, %r, %r, %r, %r, overwrite=%r)" %
+                           (from_userpath, from_parent, from_childname,
+                            to_userpath, to_parent, to_childname, overwrite), level=NOISY)

-        direntry = self._direntry_for(parent, childname)
-        if direntry in self._open_files:
-            old_files = self._open_files[direntry]
-            new_files = [f for f in old_files if f is not file_to_remove]
-            if len(new_files) > 0:
-                self._open_files[direntry] = new_files
-            else:
-                del self._open_files[direntry]
+        # First we synchronously rename all heisenfiles matching the userpath or direntry.
+        # Then we .sync() each file that we renamed.
+        #
+        # For each file, the call to .rename occurs:
+        #   * before the file is closed, in which case it will be committed at the
+        #     new direntry; or
+        #   * after it is closed but before it has been close_notified, in which case the
+        #     .sync() ensures that it has been committed (successfully or not) before we
+        #     return.
+        #
+        # This avoids a race that might otherwise cause the file to be committed at the
+        # old name after the rename operation has completed.
+        #
+        # Note that if overwrite is False, the caller should already have checked
+        # whether a real direntry exists at the destination. It is possible that another
+        # direntry (heisen or real) comes to exist at the destination after that check,
+        # but in that case it is correct for the rename to succeed (and for the commit
+        # of the heisenfile at the destination to possibly clobber the other entry, since
+        # that can happen anyway when we have concurrent write handles to the same direntry).
+        #
+        # We return a Deferred that fires with True if any files were renamed (this
+        # does not mean that they were not committed; it is used to determine whether
+        # a NoSuchChildError from the rename attempt should be suppressed). If overwrite
+        # is False and there were already heisenfiles at the destination userpath or
+        # direntry, we return a Deferred that fails with SFTPError(FX_PERMISSION_DENIED).

-        if direntry in all_open_files:
-            (all_old_files, opentime) = all_open_files[direntry]
-            all_new_files = [f for f in all_old_files if f is not file_to_remove]
-            if len(all_new_files) > 0:
-                all_open_files[direntry] = (all_new_files, opentime)
-            else:
-                del all_open_files[direntry]
-
-    def _rename_open_files(self, from_parent, from_childname, to_parent, to_childname):
-        """When an direntry is renamed, any open files for that direntry are also renamed.
-        Return True if there were any open files at from_direntry."""
-
-        if noisy: self.log("._rename_open_files(%r, %r, %r, %r)" %
-                           (from_parent, from_childname, to_parent, to_childname), level=NOISY)
-
         from_direntry = self._direntry_for(from_parent, from_childname)
         to_direntry = self._direntry_for(to_parent, to_childname)

-        if from_direntry in all_open_files:
-            (from_files, opentime) = all_open_files[from_direntry]
-            del self._open_files[from_direntry]
-            del all_open_files[from_direntry]
-            for file in from_files:
-                file.rename(to_parent, to_childname)
-            self._add_open_files(to_direntry, from_files)
-            return True
-        else:
-            return False
+        if not overwrite and (to_userpath in self._heisenfiles or to_direntry in all_heisenfiles):
+            def _existing(): raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)
+            return defer.execute(_existing)
+
+        from_files = []
+        if from_direntry in all_heisenfiles:
+            (from_files, opentime) = all_heisenfiles[from_direntry]
+            del all_heisenfiles[from_direntry]
+        if from_userpath in self._heisenfiles:
+            from_files += self._heisenfiles[from_userpath]
+            del self._heisenfiles[from_userpath]
+
+        self._add_heisenfiles_by_direntry(to_direntry, from_files)
+        self._add_heisenfiles_by_path(to_userpath, from_files)
+
+        for f in from_files:
+            f.rename(to_userpath, to_parent, to_childname)
+
+        d = defer.succeed(None)
+        for f in from_files:
+            d.addBoth(lambda ign: f.sync())
+
+        d.addBoth(lambda ign: len(from_files) > 0)
+        return d
+
+    def _sync_heisenfiles(self, userpath, direntry, ignore=None):
+        if noisy: self.log("._sync_heisenfiles(%r, %r)" % (userpath, direntry), level=NOISY)
+
+        files = []
+        if direntry in all_heisenfiles:
+            (files, opentime) = all_heisenfiles[direntry]
+        if userpath in self._heisenfiles:
+            files += self._heisenfiles[userpath]
+
+        d = defer.succeed(None)
+        for f in files:
+            if f is not ignore:
+                d.addCallback(lambda ign: f.sync())
+        return d
+
+    def _remove_heisenfile(self, userpath, parent, childname, file_to_remove):
+        if noisy: self.log("._remove_file(%r, %r, %r, %r)" % (userpath, parent, childname, file_to_remove), level=NOISY)
+
+        direntry = self._direntry_for(parent, childname)
+        if direntry in all_heisenfiles:
+            (all_old_files, opentime) = all_heisenfiles[direntry]
+            all_new_files = [f for f in all_old_files if f is not file_to_remove]
+            if len(all_new_files) > 0:
+                all_heisenfiles[direntry] = (all_new_files, opentime)
+            else:
+                del all_heisenfiles[direntry]
+
+        if userpath in self._heisenfiles:
+            old_files = self._heisenfiles[userpath]
+            new_files = [f for f in old_files if f is not file_to_remove]
+            if len(new_files) > 0:
+                self._heisenfiles[userpath] = new_files
+            else:
+                del self._heisenfiles[userpath]

     def _direntry_for(self, filenode_or_parent, childname=None):
         if filenode_or_parent:
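_abandon_any_heisenfiles, _rename_heisenfiles, and _sync_heisenfiles all follow the same shape: mutate the registries synchronously (so no later request can observe stale state), then chain a sync() per affected file, then report whether anything matched. A runnable sketch of that shape; note that binding f via a default argument sidesteps Python's late-binding closures, so each callback syncs its own file:

    from twisted.internet import defer

    class FakeFile(object):
        def __init__(self):
            self.synced = False
        def sync(self):
            self.synced = True
            return defer.succeed(None)

    def drain(registry, key):
        files = registry.pop(key, [])               # 1. synchronous removal
        d = defer.succeed(None)
        for f in files:
            d.addBoth(lambda ign, f=f: f.sync())    # 2. wait for in-flight commits
        d.addBoth(lambda ign: len(files) > 0)       # 3. did anything match?
        return d

    reg = {"direntry": [FakeFile(), FakeFile()]}
    d = drain(reg, "direntry")                      # fires with True; reg is now empty

The True/False result is what lets the callers suppress a NoSuchChildError from a subsequent delete or rename of a file that only ever existed as a heisenfile.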
@@ -1002,38 +1115,34 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):

         return None

-    def logout(self):
-        self.log(".logout()", level=OPERATIONAL)
-
-        for (direntry, files_at_direntry) in enumerate(self._open_files):
-            for f in files_at_direntry:
-                f.remove()
-                f.close()
-
-    def _make_file(self, flags, parent=None, childname=None, filenode=None, metadata=None):
-        if noisy: self.log("._make_file(%r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r" %
-                           (flags, _repr_flags(flags), parent, childname, filenode, metadata), level=NOISY)
+    def _make_file(self, existing_file, userpath, flags, parent=None, childname=None, filenode=None, metadata=None):
+        if noisy: self.log("._make_file(%r, %r, %r = %r, parent=%r, childname=%r, filenode=%r, metadata=%r)" %
+                           (existing_file, userpath, flags, _repr_flags(flags), parent, childname, filenode, metadata),
+                           level=NOISY)

         assert metadata is None or 'readonly' in metadata, metadata
-        writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0

+        writing = (flags & (FXF_WRITE | FXF_CREAT)) != 0
         if childname:
             direntry = self._direntry_for(parent, childname)
         else:
             direntry = self._direntry_for(filenode)

-        d = self._sync_open_files(direntry)
+        d = self._sync_heisenfiles(userpath, direntry, ignore=existing_file)

         if not writing and (flags & FXF_READ) and filenode and not filenode.is_mutable() and filenode.get_size() <= SIZE_THRESHOLD:
-            d.addCallback(lambda ign: ShortReadOnlySFTPFile(filenode, metadata))
+            d.addCallback(lambda ign: ShortReadOnlySFTPFile(userpath, filenode, metadata))
         else:
-            d.addCallback(lambda ign: GeneralSFTPFile(self._close_notify, flags, self._convergence,
-                                                      parent=parent, childname=childname, filenode=filenode, metadata=metadata))
-            def _add_to_open(file):
-                if writing:
-                    self._add_open_files(direntry, [file])
-                return file
-            d.addCallback(_add_to_open)
+            close_notify = None
+            if writing:
+                close_notify = self._remove_heisenfile
+
+            d.addCallback(lambda ign: existing_file or GeneralSFTPFile(userpath, flags, close_notify, self._convergence))
+            def _got_file(file):
+                if writing:
+                    self._add_heisenfiles_by_direntry(direntry, [file])
+                return file.open(parent=parent, childname=childname, filenode=filenode, metadata=metadata)
+            d.addCallback(_got_file)
         return d

     def openFile(self, pathstring, flags, attrs):
@@ -1041,21 +1150,43 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         self.log(request, level=OPERATIONAL)

         # This is used for both reading and writing.
-        # First exclude invalid combinations of flags.
+        # First exclude invalid combinations of flags, and empty paths.

         if not (flags & (FXF_READ | FXF_WRITE)):
-            raise SFTPError(FX_BAD_MESSAGE,
-                            "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
+            def _bad_readwrite():
+                raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: at least one of FXF_READ and FXF_WRITE must be set")
+            return defer.execute(_bad_readwrite)

         if (flags & FXF_EXCL) and not (flags & FXF_CREAT):
-            raise SFTPError(FX_BAD_MESSAGE,
-                            "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
+            def _bad_exclcreat():
+                raise SFTPError(FX_BAD_MESSAGE, "invalid file open flags: FXF_EXCL cannot be set without FXF_CREAT")
+            return defer.execute(_bad_exclcreat)

         path = self._path_from_string(pathstring)
         if not path:
-            raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
+            def _emptypath(): raise SFTPError(FX_NO_SUCH_FILE, "path cannot be empty")
+            return defer.execute(_emptypath)

-        # The combination of flags is potentially valid. Now there are two major cases:
+        # The combination of flags is potentially valid.

+        # To work around clients that have race condition bugs, a getAttr, rename, or
+        # remove request following an 'open' request with FXF_WRITE or FXF_CREAT flags,
+        # should succeed even if the 'open' request has not yet completed. So we now
+        # synchronously add a file object into the self._heisenfiles dict, indexed
+        # by its UTF-8 userpath. (We can't yet add it to the all_heisenfiles dict,
+        # because we don't yet have a user-independent path for the file.) The file
+        # object does not know its filenode, parent, or childname at this point.
+
+        userpath = self._path_to_utf8(path)
+
+        if flags & (FXF_WRITE | FXF_CREAT):
+            file = GeneralSFTPFile(userpath, flags, self._remove_heisenfile, self._convergence)
+            self._add_heisenfiles_by_path(userpath, [file])
+        else:
+            # We haven't decided which file implementation to use yet.
+            file = None
+
+        # Now there are two major cases:
         #
         # 1. The path is specified as /uri/FILECAP, with no parent directory.
         #    If the FILECAP is mutable and writeable, then we can open it in write-only
@@ -1104,10 +1235,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                     raise SFTPError(FX_FAILURE,
                                     "cannot create a file exclusively when it already exists")

-                # The file does not need to be added to all_open_files, because it is not
+                # The file does not need to be added to all_heisenfiles, because it is not
                 # associated with a directory entry that needs to be updated.

-                return self._make_file(flags, filenode=root)
+                return self._make_file(file, userpath, flags, filenode=root)
             else:
                 # case 2
                 childname = path[-1]
@@ -1170,7 +1301,8 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                                         "cannot open a file for writing when the parent directory is read-only")

                     metadata['readonly'] = _is_readonly(parent_readonly, filenode)
-                    return self._make_file(flags, parent=parent, childname=childname, filenode=filenode, metadata=metadata)
+                    return self._make_file(file, userpath, flags, parent=parent, childname=childname,
+                                           filenode=filenode, metadata=metadata)
                 def _no_child(f):
                     if noisy: self.log("_no_child(%r)" % (f,), level=NOISY)
                     f.trap(NoSuchChildError)
@@ -1182,7 +1314,7 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                         raise SFTPError(FX_PERMISSION_DENIED,
                                         "cannot create a file when the parent directory is read-only")

-                    return self._make_file(flags, parent=parent, childname=childname)
+                    return self._make_file(file, userpath, flags, parent=parent, childname=childname)
                 d3.addCallbacks(_got_child, _no_child)
                 return d3

@@ -1190,6 +1322,11 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             return d2

         d.addCallback(_got_root)
+        def _remove_on_error(err):
+            if file:
+                self._remove_heisenfile(userpath, None, None, file)
+            return err
+        d.addErrback(_remove_on_error)
         d.addBoth(_convert_error, request)
         return d

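Because openFile now registers the heisenfile before doing any asynchronous work, every failure path has to unregister it again, which is what _remove_on_error does. A hedged sketch of the speculate-then-clean-up pattern:

    from twisted.internet import defer

    def open_with_cleanup(registry, userpath, do_open):
        registry.setdefault(userpath, []).append("handle")   # speculative registration
        d = defer.maybeDeferred(do_open)
        def _remove_on_error(err):
            registry[userpath].remove("handle")
            return err        # pass the failure through; cleanup must not swallow it
        d.addErrback(_remove_on_error)
        return d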
@@ -1199,13 +1336,15 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):

         from_path = self._path_from_string(from_pathstring)
         to_path = self._path_from_string(to_pathstring)
+        from_userpath = self._path_to_utf8(from_path)
+        to_userpath = self._path_to_utf8(to_path)

         # the target directory must already exist
         d = deferredutil.gatherResults([self._get_parent_or_node(from_path),
                                         self._get_parent_or_node(to_path)])
         def _got( (from_pair, to_pair) ):
-            if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r)" %
-                               (from_pair, to_pair, from_pathstring, to_pathstring), level=NOISY)
+            if noisy: self.log("_got( (%r, %r) ) in .renameFile(%r, %r, overwrite=%r)" %
+                               (from_pair, to_pair, from_pathstring, to_pathstring, overwrite), level=NOISY)
             (from_parent, from_childname) = from_pair
             (to_parent, to_childname) = to_pair

@@ -1217,33 +1356,47 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
             # <http://tools.ietf.org/html/draft-ietf-secsh-filexfer-02#section-6.5>
             # "It is an error if there already exists a file with the name specified
             #  by newpath."
+            # OpenSSH's SFTP server returns FX_PERMISSION_DENIED for this error.
+            #
             # For the standard SSH_FXP_RENAME operation, overwrite=False.
             # We also support the posix-rename@openssh.com extension, which uses overwrite=True.

-            # FIXME: use move_child_to_path to avoid possible data loss due to #943
-            #d2 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
-
-            d2 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
-            def _check(err):
-                if noisy: self.log("_check(%r) in .renameFile(%r, %r)" %
-                                   (err, from_pathstring, to_pathstring), level=NOISY)
+            d2 = defer.fail(NoSuchChildError())
+            if not overwrite:
+                d2.addCallback(lambda ign: to_parent.get(to_childname))
+                def _expect_fail(res):
+                    if not isinstance(res, Failure):
+                        raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)

-                if not isinstance(err, Failure) or err.check(NoSuchChildError):
-                    # If there are open files to be written at the 'from' direntry, then ensure
-                    # they will now be written at the 'to' direntry instead.
-                    if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r" %
-                                       (self._open_files, all_open_files), level=NOISY)
-                    if self._rename_open_files(from_parent, from_childname, to_parent, to_childname):
-                        # suppress the NoSuchChildError if any open files were renamed
-                        if noisy: self.log("after renaming:\nself._open_files = %r\nall_open_files = %r" %
-                                           (self._open_files, all_open_files), level=NOISY)
+                    # It is OK if we fail for errors other than NoSuchChildError, since that probably
+                    # indicates some problem accessing the destination directory.
+                    res.trap(NoSuchChildError)
+                d2.addBoth(_expect_fail)
+
+            # If there are heisenfiles to be written at the 'from' direntry, then ensure
+            # they will now be written at the 'to' direntry instead.
+            d2.addCallback(lambda ign:
+                           self._rename_heisenfiles(from_userpath, from_parent, from_childname,
+                                                    to_userpath, to_parent, to_childname, overwrite=overwrite))
+
+            def _move(renamed):
+                # FIXME: use move_child_to_path to avoid possible data loss due to #943
+                #d3 = from_parent.move_child_to_path(from_childname, to_root, to_path, overwrite=overwrite)
+
+                d3 = from_parent.move_child_to(from_childname, to_parent, to_childname, overwrite=overwrite)
+                def _check(err):
+                    if noisy: self.log("_check(%r) in .renameFile(%r, %r, overwrite=%r)" %
+                                       (err, from_pathstring, to_pathstring, overwrite), level=NOISY)
+
+                    if not isinstance(err, Failure) or (renamed and err.check(NoSuchChildError)):
                         return None
-                elif err.check(ExistingChildError):
-                    # OpenSSH SFTP server returns FX_PERMISSION_DENIED
-                    raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_pathstring)
+                    if not overwrite and err.check(ExistingChildError):
+                        raise SFTPError(FX_PERMISSION_DENIED, "cannot rename to existing path " + to_userpath)

                     return err
-            d2.addBoth(_check)
+                d3.addBoth(_check)
+                return d3
+            d2.addCallback(_move)
             return d2
         d.addCallback(_got)
         d.addBoth(_convert_error, request)
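A recurring idiom in this change (_bad_readwrite, _bad_exclcreat, _emptypath, _existing) is wrapping a synchronous raise in defer.execute. Raising directly from the method would bypass the d.addBoth(_convert_error, request) at the end, so the error is instead delivered as an already-failed Deferred and flows through the same conversion path as asynchronous failures. Sketch:

    from twisted.internet import defer

    def _existing():
        raise ValueError("cannot rename to existing path")

    d = defer.execute(_existing)       # returns a failed Deferred; does not raise
    d.addErrback(lambda f: f.value)    # handled by the normal errback chain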
@@ -1297,35 +1450,20 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         return d

     def _remove_object(self, path, must_be_directory=False, must_be_file=False):
-        d = defer.maybeDeferred(self._get_parent_or_node, path)
+        userpath = self._path_to_utf8(path)
+        d = self._get_parent_or_node(path)
         def _got_parent( (parent, childname) ):
-            # FIXME (minor): there is a race condition between the 'get' and 'delete',
-            # so it is possible that the must_be_directory or must_be_file restrictions
-            # might not be enforced correctly if the type has just changed.
-
             if childname is None:
                 raise SFTPError(FX_NO_SUCH_FILE, "cannot remove an object specified by URI")

             direntry = self._direntry_for(parent, childname)
-            removed = self._remove_any_open_files(direntry)
+            d2 = defer.succeed(False)
+            if not must_be_directory:
+                d2.addCallback(lambda ign: self._abandon_any_heisenfiles(userpath, direntry))

-            d2 = parent.get(childname)
-            def _got_child(child):
-                # Unknown children can be removed by either removeFile or removeDirectory.
-                if must_be_directory and IFileNode.providedBy(child):
-                    raise SFTPError(FX_PERMISSION_DENIED, "rmdir called on a file")
-                if must_be_file and IDirectoryNode.providedBy(child):
-                    raise SFTPError(FX_PERMISSION_DENIED, "rmfile called on a directory")
-                return parent.delete(childname)
-            def _no_child(err):
-                if removed and err.check(NoSuchChildError):
-                    # suppress NoSuchChildError if an open file was removed
-                    if noisy: self.log("suppressing NoSuchChildError for %r because it was removed as an open file" %
-                                       (direntry,), level=NOISY)
-                    return None
-                else:
-                    return err
-            d2.addCallbacks(_got_child, _no_child)
+            d2.addCallback(lambda abandoned:
+                           parent.delete(childname, must_exist=not abandoned,
+                                         must_be_directory=must_be_directory, must_be_file=must_be_file))
             return d2
         d.addCallback(_got_parent)
         return d
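The rewritten _remove_object leans on the abandoned flag: if an open-but-uncommitted file was just abandoned, its directory entry may never have been linked, so the delete must tolerate absence; if nothing was abandoned, a missing entry is a genuine error. A minimal sketch of that contract (KeyError standing in for NoSuchChildError):

    def delete(entries, childname, must_exist=True):
        if childname in entries:
            del entries[childname]
        elif must_exist:
            raise KeyError(childname)

    entries = {}
    delete(entries, "ghost.txt", must_exist=False)   # abandoned heisenfile: no error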
@@ -1380,24 +1518,28 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
         # reported as the update time of the best version. But that
         # information isn't currently stored in mutable shares, I think.

-        # TODO: some clients will incorrectly try to get the attributes
+        # Some clients will incorrectly try to get the attributes
         # of a file immediately after opening it, before it has been put
-        # into the all_open_files table. This is a race condition bug in
+        # into the all_heisenfiles table. This is a race condition bug in
         # the client, but we probably need to handle it anyway.

         path = self._path_from_string(pathstring)
+        userpath = self._path_to_utf8(path)
         d = self._get_parent_or_node(path)
         def _got_parent_or_node( (parent_or_node, childname) ):
             if noisy: self.log("_got_parent_or_node( (%r, %r) )" % (parent_or_node, childname), level=NOISY)

+            direntry = self._direntry_for(parent_or_node, childname)
+            d2 = self._sync_heisenfiles(userpath, direntry)
+
             if childname is None:
                 node = parent_or_node
-                d2 = node.get_current_size()
+                d2.addCallback(lambda ign: node.get_current_size())
                 d2.addCallback(lambda size:
                     _populate_attrs(node, {'readonly': node.is_unknown() or node.is_readonly()}, size=size))
-                return d2
             else:
                 parent = parent_or_node
-                d2 = parent.get_child_and_metadata_at_path([childname])
+                d2.addCallback(lambda ign: parent.get_child_and_metadata_at_path([childname]))
                 def _got( (child, metadata) ):
                     if noisy: self.log("_got( (%r, %r) )" % (child, metadata), level=NOISY)
                     assert IDirectoryNode.providedBy(parent), parent
@@ -1409,10 +1551,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
                     if noisy: self.log("_nosuch(%r)" % (err,), level=NOISY)
                     err.trap(NoSuchChildError)
                     direntry = self._direntry_for(parent, childname)
-                    if noisy: self.log("checking open files:\nself._open_files = %r\nall_open_files = %r\ndirentry=%r" %
-                                       (self._open_files, all_open_files, direntry), level=NOISY)
-                    if direntry in all_open_files:
-                        (files, opentime) = all_open_files[direntry]
+                    if noisy: self.log("checking open files:\nself._heisenfiles = %r\nall_heisenfiles = %r\ndirentry=%r" %
+                                       (self._heisenfiles, all_heisenfiles, direntry), level=NOISY)
+                    if direntry in all_heisenfiles:
+                        (files, opentime) = all_heisenfiles[direntry]
                         sftptime = _to_sftp_time(opentime)
                         # A file that has been opened for writing necessarily has permissions rw-rw-rw-.
                         return {'permissions': S_IFREG | 0666,
@@ -1503,8 +1645,10 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):
     def realPath(self, pathstring):
         self.log(".realPath(%r)" % (pathstring,), level=OPERATIONAL)

-        path_utf8 = [p.encode('utf-8') for p in self._path_from_string(pathstring)]
-        return "/" + "/".join(path_utf8)
+        return self._path_to_utf8(self._path_from_string(pathstring))
+
+    def _path_to_utf8(self, path):
+        return (u"/" + u"/".join(path)).encode('utf-8')

     def _path_from_string(self, pathstring):
         if noisy: self.log("CONVERT %r" % (pathstring,), level=NOISY)
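The new _path_to_utf8 helper gives every request a canonical UTF-8 userpath, which is the key used by the per-user heisenfile registry. Worked example of the encoding:

    def path_to_utf8(path):
        return (u"/" + u"/".join(path)).encode('utf-8')

    assert path_to_utf8([]) == b"/"
    assert path_to_utf8([u"home", u"alice", u"caf\u00e9"]) == b"/home/alice/caf\xc3\xa9"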
@@ -1537,11 +1681,12 @@ class SFTPUserHandler(ConchUser, PrefixingLogMixin):

     def _get_root(self, path):
         # return Deferred (root, remaining_path)
+        d = defer.succeed(None)
         if path and path[0] == u"uri":
-            d = defer.maybeDeferred(self._client.create_node_from_uri, path[1].encode('utf-8'))
+            d.addCallback(lambda ign: self._client.create_node_from_uri(path[1].encode('utf-8')))
             d.addCallback(lambda root: (root, path[2:]))
         else:
-            d = defer.succeed((self._root, path))
+            d.addCallback(lambda ign: (self._root, path))
         return d

     def _get_parent_or_node(self, path):
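_get_root's new shape, starting from d = defer.succeed(None) and branching only via callbacks, makes both branches uniformly asynchronous and turns an exception inside create_node_from_uri into a Failure on the chain instead of a synchronous raise. Sketch of the same opening move:

    from twisted.internet import defer

    def get_root(path):
        d = defer.succeed(None)
        if path and path[0] == u"uri":
            # an exception here becomes a Failure on d, not a synchronous raise
            d.addCallback(lambda ign: (u"node-for-" + path[1], path[2:]))
        else:
            d.addCallback(lambda ign: (u"default-root", path))
        return d

The remaining hunk below is in the SFTP unit tests rather than the handler itself.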
@@ -34,7 +34,7 @@ from allmydata.test.common import ShouldFailMixin
 timeout = 240

 class Handler(GridTestMixin, ShouldFailMixin, unittest.TestCase):
-    """This is a no-network unit test of the SFTPHandler class."""
+    """This is a no-network unit test of the SFTPUserHandler and the abstractions it uses."""

     if not have_pycrypto:
         skip = "SFTP support requires pycrypto, which is not installed"