mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-21 02:01:31 +00:00
Merge pull request #265 from meejah/fileutil-improvements
This commit is contained in:
commit
c715e0d839
@ -497,12 +497,14 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase):
|
||||
|
||||
saved_cwd = os.path.normpath(os.getcwdu())
|
||||
abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
|
||||
abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
|
||||
self.failUnless(isinstance(saved_cwd, unicode), saved_cwd)
|
||||
self.failUnless(isinstance(abspath_cwd, unicode), abspath_cwd)
|
||||
if sys.platform == "win32":
|
||||
self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
|
||||
else:
|
||||
self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
|
||||
self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)
|
||||
|
||||
self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
|
||||
self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
|
||||
@ -531,7 +533,19 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase):
|
||||
|
||||
self.failUnlessReallyEqual(baz[4], bar[4]) # same drive
|
||||
|
||||
baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
|
||||
self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
|
||||
self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")
|
||||
|
||||
bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
|
||||
self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
|
||||
self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
|
||||
# not u":\\baz\\bar", because \bar is absolute on the current drive.
|
||||
|
||||
self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0]) # same drive
|
||||
|
||||
self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
|
||||
self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))
|
||||
|
||||
cwds = ['cwd']
|
||||
try:
|
||||
@ -547,9 +561,31 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase):
|
||||
for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
|
||||
uabspath = fileutil.abspath_expanduser_unicode(upath)
|
||||
self.failUnless(isinstance(uabspath, unicode), uabspath)
|
||||
|
||||
uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
|
||||
self.failUnless(isinstance(uabspath_notlong, unicode), uabspath_notlong)
|
||||
finally:
|
||||
os.chdir(saved_cwd)
|
||||
|
||||
def test_make_dirs_with_absolute_mode(self):
|
||||
if sys.platform == 'win32':
|
||||
raise unittest.SkipTest("Permissions don't work the same on windows.")
|
||||
|
||||
workdir = fileutil.abspath_expanduser_unicode(u"test_make_dirs_with_absolute_mode")
|
||||
fileutil.make_dirs(workdir)
|
||||
abspath = fileutil.abspath_expanduser_unicode(u"a/b/c/d", base=workdir)
|
||||
fileutil.make_dirs_with_absolute_mode(workdir, abspath, 0766)
|
||||
new_mode = os.stat(os.path.join(workdir, "a", "b", "c", "d")).st_mode & 0777
|
||||
self.failUnlessEqual(new_mode, 0766)
|
||||
new_mode = os.stat(os.path.join(workdir, "a", "b", "c")).st_mode & 0777
|
||||
self.failUnlessEqual(new_mode, 0766)
|
||||
new_mode = os.stat(os.path.join(workdir, "a", "b")).st_mode & 0777
|
||||
self.failUnlessEqual(new_mode, 0766)
|
||||
new_mode = os.stat(os.path.join(workdir,"a")).st_mode & 0777
|
||||
self.failUnlessEqual(new_mode, 0766)
|
||||
new_mode = os.stat(workdir).st_mode & 0777
|
||||
self.failIfEqual(new_mode, 0766)
|
||||
|
||||
def test_create_long_path(self):
|
||||
workdir = u"test_create_long_path"
|
||||
fileutil.make_dirs(workdir)
|
||||
@ -604,6 +640,60 @@ class FileUtil(ReallyEqualMixin, unittest.TestCase):
|
||||
disk = fileutil.get_disk_stats('.', 2**128)
|
||||
self.failUnlessEqual(disk['avail'], 0)
|
||||
|
||||
def test_get_pathinfo(self):
|
||||
basedir = "util/FileUtil/test_get_pathinfo"
|
||||
fileutil.make_dirs(basedir)
|
||||
|
||||
# create a directory
|
||||
self.mkdir(basedir, "a")
|
||||
dirinfo = fileutil.get_pathinfo(basedir)
|
||||
self.failUnlessTrue(dirinfo.isdir)
|
||||
self.failUnlessTrue(dirinfo.exists)
|
||||
self.failUnlessFalse(dirinfo.isfile)
|
||||
self.failUnlessFalse(dirinfo.islink)
|
||||
|
||||
# create a file
|
||||
f = os.path.join(basedir, "1.txt")
|
||||
fileutil.write(f, "a"*10)
|
||||
fileinfo = fileutil.get_pathinfo(f)
|
||||
self.failUnlessTrue(fileinfo.isfile)
|
||||
self.failUnlessTrue(fileinfo.exists)
|
||||
self.failUnlessFalse(fileinfo.isdir)
|
||||
self.failUnlessFalse(fileinfo.islink)
|
||||
self.failUnlessEqual(fileinfo.size, 10)
|
||||
|
||||
# path at which nothing exists
|
||||
dnename = os.path.join(basedir, "doesnotexist")
|
||||
now = time.time()
|
||||
dneinfo = fileutil.get_pathinfo(dnename, now=now)
|
||||
self.failUnlessFalse(dneinfo.exists)
|
||||
self.failUnlessFalse(dneinfo.isfile)
|
||||
self.failUnlessFalse(dneinfo.isdir)
|
||||
self.failUnlessFalse(dneinfo.islink)
|
||||
self.failUnlessEqual(dneinfo.size, None)
|
||||
self.failUnlessEqual(dneinfo.mtime, now)
|
||||
self.failUnlessEqual(dneinfo.ctime, now)
|
||||
|
||||
def test_get_pathinfo_symlink(self):
|
||||
if not hasattr(os, 'symlink'):
|
||||
raise unittest.SkipTest("can't create symlinks on this platform")
|
||||
|
||||
basedir = "util/FileUtil/test_get_pathinfo"
|
||||
fileutil.make_dirs(basedir)
|
||||
|
||||
f = os.path.join(basedir, "1.txt")
|
||||
fileutil.write(f, "a"*10)
|
||||
|
||||
# create a symlink pointing to 1.txt
|
||||
slname = os.path.join(basedir, "linkto1.txt")
|
||||
os.symlink(f, slname)
|
||||
symlinkinfo = fileutil.get_pathinfo(slname)
|
||||
self.failUnlessTrue(symlinkinfo.islink)
|
||||
self.failUnlessTrue(symlinkinfo.exists)
|
||||
self.failUnlessFalse(symlinkinfo.isfile)
|
||||
self.failUnlessFalse(symlinkinfo.isdir)
|
||||
|
||||
|
||||
class PollMixinTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.pm = pollmixin.PollMixin()
|
||||
|
@ -3,16 +3,20 @@ Futz with files like a pro.
|
||||
"""
|
||||
|
||||
import sys, exceptions, os, stat, tempfile, time, binascii
|
||||
from collections import namedtuple
|
||||
from errno import ENOENT
|
||||
|
||||
if sys.platform == "win32":
|
||||
from ctypes import WINFUNCTYPE, WinError, windll, POINTER, byref, c_ulonglong, \
|
||||
create_unicode_buffer, get_last_error
|
||||
from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPWSTR
|
||||
from ctypes.wintypes import BOOL, DWORD, LPCWSTR, LPWSTR, LPVOID, HANDLE
|
||||
|
||||
from twisted.python import log
|
||||
|
||||
from pycryptopp.cipher.aes import AES
|
||||
|
||||
from allmydata.util.assertutil import _assert
|
||||
|
||||
|
||||
def rename(src, dst, tries=4, basedelay=0.1):
|
||||
""" Here is a superkludge to workaround the fact that occasionally on
|
||||
@ -140,6 +144,31 @@ class EncryptedTemporaryFile:
|
||||
old end-of-file are unspecified. The file position after this operation is unspecified."""
|
||||
self.file.truncate(newsize)
|
||||
|
||||
def make_dirs_with_absolute_mode(parent, dirname, mode):
|
||||
"""
|
||||
Make directory `dirname` and chmod it to `mode` afterwards.
|
||||
We chmod all parent directories of `dirname` until we reach
|
||||
`parent`.
|
||||
"""
|
||||
precondition_abspath(parent)
|
||||
precondition_abspath(dirname)
|
||||
if not is_ancestor_path(parent, dirname):
|
||||
raise AssertionError("dirname must be a descendant of parent")
|
||||
|
||||
make_dirs(dirname)
|
||||
while dirname != parent:
|
||||
os.chmod(dirname, mode)
|
||||
# FIXME: doesn't seem to work on Windows for long paths
|
||||
old_dirname, dirname = dirname, os.path.dirname(dirname)
|
||||
_assert(len(dirname) < len(old_dirname), dirname=dirname, old_dirname=old_dirname)
|
||||
|
||||
def is_ancestor_path(parent, dirname):
|
||||
while dirname != parent:
|
||||
# FIXME: doesn't seem to work on Windows for long paths
|
||||
old_dirname, dirname = dirname, os.path.dirname(dirname)
|
||||
if len(dirname) >= len(old_dirname):
|
||||
return False
|
||||
return True
|
||||
|
||||
def make_dirs(dirname, mode=0777):
|
||||
"""
|
||||
@ -279,17 +308,18 @@ try:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def abspath_expanduser_unicode(path, base=None):
|
||||
def abspath_expanduser_unicode(path, base=None, long_path=True):
|
||||
"""
|
||||
Return the absolute version of a path. If 'base' is given and 'path' is relative,
|
||||
the path will be expanded relative to 'base'.
|
||||
'path' must be a Unicode string. 'base', if given, must be a Unicode string
|
||||
corresponding to an absolute path as returned by a previous call to
|
||||
abspath_expanduser_unicode.
|
||||
On Windows, the result will be a long path unless long_path is given as False.
|
||||
"""
|
||||
if not isinstance(path, unicode):
|
||||
raise AssertionError("paths must be Unicode strings")
|
||||
if base is not None:
|
||||
if base is not None and long_path:
|
||||
precondition_abspath(base)
|
||||
|
||||
path = expanduser(path)
|
||||
@ -316,7 +346,7 @@ def abspath_expanduser_unicode(path, base=None):
|
||||
# there is always at least one Unicode path component.
|
||||
path = os.path.normpath(path)
|
||||
|
||||
if sys.platform == "win32":
|
||||
if sys.platform == "win32" and long_path:
|
||||
path = to_windows_long_path(path)
|
||||
|
||||
return path
|
||||
@ -514,3 +544,157 @@ def get_available_space(whichdir, reserved_space):
|
||||
except EnvironmentError:
|
||||
log.msg("OS call to get disk statistics failed")
|
||||
return 0
|
||||
|
||||
|
||||
if sys.platform == "win32":
|
||||
# <http://msdn.microsoft.com/en-us/library/aa363858%28v=vs.85%29.aspx>
|
||||
CreateFileW = WINFUNCTYPE(
|
||||
HANDLE, LPCWSTR, DWORD, DWORD, LPVOID, DWORD, DWORD, HANDLE,
|
||||
use_last_error=True
|
||||
)(("CreateFileW", windll.kernel32))
|
||||
|
||||
GENERIC_WRITE = 0x40000000
|
||||
FILE_SHARE_READ = 0x00000001
|
||||
FILE_SHARE_WRITE = 0x00000002
|
||||
OPEN_EXISTING = 3
|
||||
INVALID_HANDLE_VALUE = 0xFFFFFFFF
|
||||
|
||||
# <http://msdn.microsoft.com/en-us/library/aa364439%28v=vs.85%29.aspx>
|
||||
FlushFileBuffers = WINFUNCTYPE(
|
||||
BOOL, HANDLE,
|
||||
use_last_error=True
|
||||
)(("FlushFileBuffers", windll.kernel32))
|
||||
|
||||
# <http://msdn.microsoft.com/en-us/library/ms724211%28v=vs.85%29.aspx>
|
||||
CloseHandle = WINFUNCTYPE(
|
||||
BOOL, HANDLE,
|
||||
use_last_error=True
|
||||
)(("CloseHandle", windll.kernel32))
|
||||
|
||||
# <http://social.msdn.microsoft.com/forums/en-US/netfxbcl/thread/4465cafb-f4ed-434f-89d8-c85ced6ffaa8/>
|
||||
def flush_volume(path):
|
||||
abspath = os.path.realpath(path)
|
||||
if abspath.startswith("\\\\?\\"):
|
||||
abspath = abspath[4 :]
|
||||
drive = os.path.splitdrive(abspath)[0]
|
||||
|
||||
print "flushing %r" % (drive,)
|
||||
hVolume = CreateFileW(u"\\\\.\\" + drive,
|
||||
GENERIC_WRITE,
|
||||
FILE_SHARE_READ | FILE_SHARE_WRITE,
|
||||
None,
|
||||
OPEN_EXISTING,
|
||||
0,
|
||||
None
|
||||
)
|
||||
if hVolume == INVALID_HANDLE_VALUE:
|
||||
raise WinError(get_last_error())
|
||||
|
||||
if FlushFileBuffers(hVolume) == 0:
|
||||
raise WinError(get_last_error())
|
||||
|
||||
CloseHandle(hVolume)
|
||||
else:
|
||||
def flush_volume(path):
|
||||
# use sync()?
|
||||
pass
|
||||
|
||||
|
||||
class ConflictError(Exception):
|
||||
pass
|
||||
|
||||
class UnableToUnlinkReplacementError(Exception):
|
||||
pass
|
||||
|
||||
def reraise(wrapper):
|
||||
_, exc, tb = sys.exc_info()
|
||||
wrapper_exc = wrapper("%s: %s" % (exc.__class__.__name__, exc))
|
||||
raise wrapper_exc.__class__, wrapper_exc, tb
|
||||
|
||||
if sys.platform == "win32":
|
||||
# <https://msdn.microsoft.com/en-us/library/windows/desktop/aa365512%28v=vs.85%29.aspx>
|
||||
ReplaceFileW = WINFUNCTYPE(
|
||||
BOOL, LPCWSTR, LPCWSTR, LPCWSTR, DWORD, LPVOID, LPVOID,
|
||||
use_last_error=True
|
||||
)(("ReplaceFileW", windll.kernel32))
|
||||
|
||||
REPLACEFILE_IGNORE_MERGE_ERRORS = 0x00000002
|
||||
|
||||
# <https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382%28v=vs.85%29.aspx>
|
||||
ERROR_FILE_NOT_FOUND = 2
|
||||
|
||||
def rename_no_overwrite(source_path, dest_path):
|
||||
os.rename(source_path, dest_path)
|
||||
|
||||
def replace_file(replaced_path, replacement_path, backup_path):
|
||||
precondition_abspath(replaced_path)
|
||||
precondition_abspath(replacement_path)
|
||||
precondition_abspath(backup_path)
|
||||
|
||||
r = ReplaceFileW(replaced_path, replacement_path, backup_path,
|
||||
REPLACEFILE_IGNORE_MERGE_ERRORS, None, None)
|
||||
if r == 0:
|
||||
# The UnableToUnlinkReplacementError case does not happen on Windows;
|
||||
# all errors should be treated as signalling a conflict.
|
||||
err = get_last_error()
|
||||
if err != ERROR_FILE_NOT_FOUND:
|
||||
raise ConflictError("WinError: %s" % (WinError(err),))
|
||||
|
||||
try:
|
||||
rename_no_overwrite(replacement_path, replaced_path)
|
||||
except EnvironmentError:
|
||||
reraise(ConflictError)
|
||||
else:
|
||||
def rename_no_overwrite(source_path, dest_path):
|
||||
# link will fail with EEXIST if there is already something at dest_path.
|
||||
os.link(source_path, dest_path)
|
||||
try:
|
||||
os.unlink(source_path)
|
||||
except EnvironmentError:
|
||||
reraise(UnableToUnlinkReplacementError)
|
||||
|
||||
def replace_file(replaced_path, replacement_path, backup_path):
|
||||
precondition_abspath(replaced_path)
|
||||
precondition_abspath(replacement_path)
|
||||
precondition_abspath(backup_path)
|
||||
|
||||
if not os.path.exists(replacement_path):
|
||||
raise ConflictError("Replacement file not found: %r" % (replacement_path,))
|
||||
|
||||
try:
|
||||
os.rename(replaced_path, backup_path)
|
||||
except OSError as e:
|
||||
if e.errno != ENOENT:
|
||||
raise
|
||||
try:
|
||||
rename_no_overwrite(replacement_path, replaced_path)
|
||||
except EnvironmentError:
|
||||
reraise(ConflictError)
|
||||
|
||||
PathInfo = namedtuple('PathInfo', 'isdir isfile islink exists size mtime ctime')
|
||||
|
||||
def get_pathinfo(path_u, now=None):
|
||||
try:
|
||||
statinfo = os.lstat(path_u)
|
||||
mode = statinfo.st_mode
|
||||
return PathInfo(isdir =stat.S_ISDIR(mode),
|
||||
isfile=stat.S_ISREG(mode),
|
||||
islink=stat.S_ISLNK(mode),
|
||||
exists=True,
|
||||
size =statinfo.st_size,
|
||||
mtime =statinfo.st_mtime,
|
||||
ctime =statinfo.st_ctime,
|
||||
)
|
||||
except OSError as e:
|
||||
if e.errno == ENOENT:
|
||||
if now is None:
|
||||
now = time.time()
|
||||
return PathInfo(isdir =False,
|
||||
isfile=False,
|
||||
islink=False,
|
||||
exists=False,
|
||||
size =None,
|
||||
mtime =now,
|
||||
ctime =now,
|
||||
)
|
||||
raise
|
||||
|
Loading…
x
Reference in New Issue
Block a user