"""
Ported to Python3.
"""

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals

from future.utils import PY2
if PY2:
    # open is not here because we want to use native strings on Py2
    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, bytes, dict, list, object, range, str, max, min # noqa: F401

import six
import os, time, sys
import yaml
import json

from twisted.trial import unittest

from allmydata.util import idlib, mathutil
from allmydata.util import fileutil
from allmydata.util import jsonbytes
from allmydata.util import pollmixin
from allmydata.util import yamlutil
from allmydata.util.fileutil import EncryptedTemporaryFile
from allmydata.test.common_util import ReallyEqualMixin


if six.PY3:
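    # Python 3 has no built-in ``long``; alias it to ``int`` so code that
    # still uses the name keeps working.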
    long = int


class IDLib(unittest.TestCase):
    def test_nodeid_b2a(self):
        result = idlib.nodeid_b2a(b"\x00"*20)
        self.assertEqual(result, "a"*32)
        self.assertIsInstance(result, str)


class MyList(list):
    pass

class Math(unittest.TestCase):
    def test_round_sigfigs(self):
        f = mathutil.round_sigfigs
        self.failUnlessEqual(f(22.0/3, 4), 7.3330000000000002)


class FileUtil(ReallyEqualMixin, unittest.TestCase):
    def mkdir(self, basedir, path, mode=0o777):
        fn = os.path.join(basedir, path)
        fileutil.make_dirs(fn, mode)

    def touch(self, basedir, path, mode=None, data="touch\n"):
        fn = os.path.join(basedir, path)
        f = open(fn, "w")
        f.write(data)
        f.close()
        if mode is not None:
            os.chmod(fn, mode)

    def test_rm_dir(self):
        basedir = "util/FileUtil/test_rm_dir"
        fileutil.make_dirs(basedir)
        # create it again to test idempotency
        fileutil.make_dirs(basedir)
        d = os.path.join(basedir, "doomed")
        self.mkdir(d, "a/b")
        self.touch(d, "a/b/1.txt")
        self.touch(d, "a/b/2.txt", 0o444)
        self.touch(d, "a/b/3.txt", 0)
        self.mkdir(d, "a/c")
        self.touch(d, "a/c/1.txt")
        self.touch(d, "a/c/2.txt", 0o444)
        self.touch(d, "a/c/3.txt", 0)
        os.chmod(os.path.join(d, "a/c"), 0o444)
        self.mkdir(d, "a/d")
        self.touch(d, "a/d/1.txt")
        self.touch(d, "a/d/2.txt", 0o444)
        self.touch(d, "a/d/3.txt", 0)
        os.chmod(os.path.join(d, "a/d"), 0)

        fileutil.rm_dir(d)
        self.failIf(os.path.exists(d))
        # remove it again to test idempotency
        fileutil.rm_dir(d)

    def test_remove_if_possible(self):
        basedir = "util/FileUtil/test_remove_if_possible"
        fileutil.make_dirs(basedir)
        self.touch(basedir, "here")
        fn = os.path.join(basedir, "here")
        fileutil.remove_if_possible(fn)
        self.failIf(os.path.exists(fn))
        fileutil.remove_if_possible(fn) # should be idempotent
        fileutil.rm_dir(basedir)
        fileutil.remove_if_possible(fn) # should survive errors

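    # fileutil.write_atomically() uses the usual write-to-.tmp-then-rename
    # approach, so a reader never observes a partially written file.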
    def test_write_atomically(self):
        basedir = "util/FileUtil/test_write_atomically"
        fileutil.make_dirs(basedir)
        fn = os.path.join(basedir, "here")
        fileutil.write_atomically(fn, b"one", "b")
        self.failUnlessEqual(fileutil.read(fn), b"one")
        fileutil.write_atomically(fn, u"two", mode="") # non-binary
        self.failUnlessEqual(fileutil.read(fn), b"two")

    def test_rename(self):
        basedir = "util/FileUtil/test_rename"
        fileutil.make_dirs(basedir)
        self.touch(basedir, "here")
        fn = os.path.join(basedir, "here")
        fn2 = os.path.join(basedir, "there")
        fileutil.rename(fn, fn2)
        self.failIf(os.path.exists(fn))
        self.failUnless(os.path.exists(fn2))

    def test_rename_no_overwrite(self):
        workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
        fileutil.make_dirs(workdir)

        source_path = os.path.join(workdir, "source")
        dest_path = os.path.join(workdir, "dest")

        # when neither file exists
        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)

        # when only dest exists
        fileutil.write(dest_path, b"dest")
        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
        self.failUnlessEqual(fileutil.read(dest_path), b"dest")

        # when both exist
        fileutil.write(source_path, b"source")
        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite, source_path, dest_path)
        self.failUnlessEqual(fileutil.read(source_path), b"source")
        self.failUnlessEqual(fileutil.read(dest_path), b"dest")

        # when only source exists
        os.remove(dest_path)
        fileutil.rename_no_overwrite(source_path, dest_path)
        self.failUnlessEqual(fileutil.read(dest_path), b"source")
        self.failIf(os.path.exists(source_path))

    def test_replace_file(self):
        workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
        fileutil.make_dirs(workdir)

        replaced_path = os.path.join(workdir, "replaced")
        replacement_path = os.path.join(workdir, "replacement")

        # when none of the files exist
        self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)

        # when only replaced exists
        fileutil.write(replaced_path, b"foo")
        self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file, replaced_path, replacement_path)
        self.failUnlessEqual(fileutil.read(replaced_path), b"foo")

        # when both replaced and replacement exist
        fileutil.write(replacement_path, b"bar")
        fileutil.replace_file(replaced_path, replacement_path)
        self.failUnlessEqual(fileutil.read(replaced_path), b"bar")
        self.failIf(os.path.exists(replacement_path))

        # when only replacement exists
        os.remove(replaced_path)
        fileutil.write(replacement_path, b"bar")
        fileutil.replace_file(replaced_path, replacement_path)
        self.failUnlessEqual(fileutil.read(replaced_path), b"bar")
        self.failIf(os.path.exists(replacement_path))

    def test_du(self):
        basedir = "util/FileUtil/test_du"
        fileutil.make_dirs(basedir)
        d = os.path.join(basedir, "space-consuming")
        self.mkdir(d, "a/b")
        self.touch(d, "a/b/1.txt", data="a"*10)
        self.touch(d, "a/b/2.txt", data="b"*11)
        self.mkdir(d, "a/c")
        self.touch(d, "a/c/1.txt", data="c"*12)
        self.touch(d, "a/c/2.txt", data="d"*13)

        used = fileutil.du(basedir)
        self.failUnlessEqual(10+11+12+13, used)

    def test_abspath_expanduser_unicode(self):
        self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, b"bytestring")

        saved_cwd = os.path.normpath(os.getcwd())
        if PY2:
            saved_cwd = saved_cwd.decode("utf8")
        abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
        abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
        self.failUnless(isinstance(saved_cwd, str), saved_cwd)
        self.failUnless(isinstance(abspath_cwd, str), abspath_cwd)
        if sys.platform == "win32":
            self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
        else:
            self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
        self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)

        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
        self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")

        # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>

        foo = fileutil.abspath_expanduser_unicode(u"foo")
        self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)

        foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
        self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)

        if sys.platform == "win32":
            # This is checking that a drive letter is added for a path without one.
            baz = fileutil.abspath_expanduser_unicode(u"\\baz")
            self.failUnless(baz.startswith(u"\\\\?\\"), baz)
            self.failUnlessReallyEqual(baz[5 :], u":\\baz")

            bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
            self.failUnless(bar.startswith(u"\\\\?\\"), bar)
            self.failUnlessReallyEqual(bar[5 :], u":\\bar")
            # not u":\\baz\\bar", because \bar is absolute on the current drive.

            self.failUnlessReallyEqual(baz[4], bar[4]) # same drive

            baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
            self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
            self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")

            bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
            self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
            self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
            # not u":\\baz\\bar", because \bar is absolute on the current drive.

            self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0]) # same drive

        self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
        self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))

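        # Try an ASCII working directory and, if the filesystem encoding can
        # represent it, a non-ASCII one as well.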
        cwds = ['cwd']
        try:
            cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
                                            or 'ascii'))
        except UnicodeEncodeError:
            pass # the cwd can't be encoded -- test with ascii cwd only

        for cwd in cwds:
            try:
                os.mkdir(cwd)
                os.chdir(cwd)
                for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
                    uabspath = fileutil.abspath_expanduser_unicode(upath)
                    self.failUnless(isinstance(uabspath, str), uabspath)

                    uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
                    self.failUnless(isinstance(uabspath_notlong, str), uabspath_notlong)
            finally:
                os.chdir(saved_cwd)

    def test_make_dirs_with_absolute_mode(self):
        if sys.platform == 'win32':
            raise unittest.SkipTest("Permissions don't work the same on windows.")

        workdir = fileutil.abspath_expanduser_unicode(u"test_make_dirs_with_absolute_mode")
        fileutil.make_dirs(workdir)
        abspath = fileutil.abspath_expanduser_unicode(u"a/b/c/d", base=workdir)
        fileutil.make_dirs_with_absolute_mode(workdir, abspath, 0o766)
        new_mode = os.stat(os.path.join(workdir, "a", "b", "c", "d")).st_mode & 0o777
        self.failUnlessEqual(new_mode, 0o766)
        new_mode = os.stat(os.path.join(workdir, "a", "b", "c")).st_mode & 0o777
        self.failUnlessEqual(new_mode, 0o766)
        new_mode = os.stat(os.path.join(workdir, "a", "b")).st_mode & 0o777
        self.failUnlessEqual(new_mode, 0o766)
        new_mode = os.stat(os.path.join(workdir, "a")).st_mode & 0o777
        self.failUnlessEqual(new_mode, 0o766)
        new_mode = os.stat(workdir).st_mode & 0o777
        self.failIfEqual(new_mode, 0o766)

    def test_create_long_path(self):
        """
        Even for paths with total length greater than 260 bytes,
        ``fileutil.abspath_expanduser_unicode`` produces a path on which other
        path-related APIs can operate.

        https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
        documents certain Windows-specific path length limitations which this
        test is specifically intended to demonstrate can be overcome.
        """
        workdir = u"test_create_long_path"
        fileutil.make_dirs(workdir)
        base_path = fileutil.abspath_expanduser_unicode(workdir)
        base_length = len(base_path)

        # Construct a path /just/ long enough to exercise the important case.
        # It would be nice if we could just use a seemingly globally valid
        # long file name (the `x...` portion) here - for example, a name 255
        # bytes long - and a previous version of this test did just that.
        # However, aufs imposes a 242 byte length limit on file names. Most
        # other POSIX filesystems do allow names up to 255 bytes. It's not
        # clear there's anything we can *do* about lower limits, though, and
        # POSIX.1-2017 (and earlier) only requires that the maximum be at
        # least 14 (!!!) bytes.
        long_path = os.path.join(base_path, u'x' * (261 - base_length))

        def _cleanup():
            fileutil.remove(long_path)
        self.addCleanup(_cleanup)

        fileutil.write(long_path, b"test")
        self.failUnless(os.path.exists(long_path))
        self.failUnlessEqual(fileutil.read(long_path), b"test")
        _cleanup()
        self.failIf(os.path.exists(long_path))

    def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
        def call_windows_getenv(name):
            if name == u"USERPROFILE": return userprofile
            if name == u"HOMEDRIVE": return homedrive
            if name == u"HOMEPATH": return homepath
            self.fail("unexpected argument to call_windows_getenv")
        self.patch(fileutil, 'windows_getenv', call_windows_getenv)

        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"), os.path.join(u"C:", u"\\Documents and Settings\\\u0100", u"foo"))
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a"), u"a")
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a~"), u"a~")
        self.failUnlessReallyEqual(fileutil.windows_expanduser(u"a\\~\\foo"), u"a\\~\\foo")

    def test_windows_expanduser_xp(self):
        return self._test_windows_expanduser(homedrive=u"C:", homepath=u"\\Documents and Settings\\\u0100")

    def test_windows_expanduser_win7(self):
        return self._test_windows_expanduser(userprofile=os.path.join(u"C:", u"\\Documents and Settings\\\u0100"))

    def test_disk_stats(self):
        avail = fileutil.get_available_space('.', 2**14)
        if avail == 0:
            raise unittest.SkipTest("This test will spuriously fail if there is no disk space left.")

        disk = fileutil.get_disk_stats('.', 2**13)
        self.failUnless(disk['total'] > 0, disk['total'])
        # we tolerate used==0 for a Travis-CI bug, see #2290
        self.failUnless(disk['used'] >= 0, disk['used'])
        self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
        self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
        self.failUnless(disk['avail'] > 0, disk['avail'])

    def test_disk_stats_avail_nonnegative(self):
        # This test will spuriously fail if you have more than 2^128
        # bytes of available space on your filesystem.
        disk = fileutil.get_disk_stats('.', 2**128)
        self.failUnlessEqual(disk['avail'], 0)

    def test_get_pathinfo(self):
        basedir = "util/FileUtil/test_get_pathinfo"
        fileutil.make_dirs(basedir)

        # create a directory
        self.mkdir(basedir, "a")
        dirinfo = fileutil.get_pathinfo(basedir)
        self.failUnlessTrue(dirinfo.isdir)
        self.failUnlessTrue(dirinfo.exists)
        self.failUnlessFalse(dirinfo.isfile)
        self.failUnlessFalse(dirinfo.islink)

        # create a file
        f = os.path.join(basedir, "1.txt")
        fileutil.write(f, b"a"*10)
        fileinfo = fileutil.get_pathinfo(f)
        self.failUnlessTrue(fileinfo.isfile)
        self.failUnlessTrue(fileinfo.exists)
        self.failUnlessFalse(fileinfo.isdir)
        self.failUnlessFalse(fileinfo.islink)
        self.failUnlessEqual(fileinfo.size, 10)

        # path at which nothing exists
        dnename = os.path.join(basedir, "doesnotexist")
        now_ns = fileutil.seconds_to_ns(time.time())
        dneinfo = fileutil.get_pathinfo(dnename, now_ns=now_ns)
        self.failUnlessFalse(dneinfo.exists)
        self.failUnlessFalse(dneinfo.isfile)
        self.failUnlessFalse(dneinfo.isdir)
        self.failUnlessFalse(dneinfo.islink)
        self.failUnlessEqual(dneinfo.size, None)
        self.failUnlessEqual(dneinfo.mtime_ns, now_ns)
        self.failUnlessEqual(dneinfo.ctime_ns, now_ns)

    def test_get_pathinfo_symlink(self):
        if not hasattr(os, 'symlink'):
            raise unittest.SkipTest("can't create symlinks on this platform")

        basedir = "util/FileUtil/test_get_pathinfo"
        fileutil.make_dirs(basedir)

        f = os.path.join(basedir, "1.txt")
        fileutil.write(f, b"a"*10)

        # create a symlink pointing to 1.txt
        slname = os.path.join(basedir, "linkto1.txt")
        os.symlink(f, slname)
        symlinkinfo = fileutil.get_pathinfo(slname)
        self.failUnlessTrue(symlinkinfo.islink)
        self.failUnlessTrue(symlinkinfo.exists)
        self.failUnlessFalse(symlinkinfo.isfile)
        self.failUnlessFalse(symlinkinfo.isdir)

    def test_encrypted_tempfile(self):
        f = EncryptedTemporaryFile()
        f.write(b"foobar")
        f.close()

    def test_write(self):
        """fileutil.write() can write both unicode and bytes."""
        path = self.mktemp()
        fileutil.write(path, b"abc")
        with open(path, "rb") as f:
            self.assertEqual(f.read(), b"abc")
        fileutil.write(path, u"def \u1234")
        with open(path, "rb") as f:
            self.assertEqual(f.read(), u"def \u1234".encode("utf-8"))


class PollMixinTests(unittest.TestCase):
    def setUp(self):
        self.pm = pollmixin.PollMixin()

    def test_PollMixin_True(self):
        d = self.pm.poll(check_f=lambda : True,
                         pollinterval=0.1)
        return d

    def test_PollMixin_False_then_True(self):
        i = iter([False, True])
        d = self.pm.poll(check_f=lambda: next(i),
                         pollinterval=0.1)
        return d

    def test_timeout(self):
        d = self.pm.poll(check_f=lambda: False,
                         pollinterval=0.01,
                         timeout=1)
        def _suc(res):
            self.fail("poll should have failed, not returned %s" % (res,))
        def _err(f):
            f.trap(pollmixin.TimeoutError)
            return None # success
        d.addCallbacks(_suc, _err)
        return d


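# EqButNotIs instances compare (and order) like the wrapped value ``x`` but
# are distinct objects, each with its own unique hash.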
ctr = [0]
class EqButNotIs(object):
    def __init__(self, x):
        self.x = x
        self.hash = ctr[0]
        ctr[0] += 1
    def __repr__(self):
        return "<%s %s>" % (self.__class__.__name__, self.x,)
    def __hash__(self):
        return self.hash
    def __le__(self, other):
        return self.x <= other
    def __lt__(self, other):
        return self.x < other
    def __ge__(self, other):
        return self.x >= other
    def __gt__(self, other):
        return self.x > other
    def __ne__(self, other):
        return self.x != other
    def __eq__(self, other):
        return self.x == other


class YAML(unittest.TestCase):
    def test_convert(self):
        data = yaml.safe_dump(["str", u"unicode", u"\u1234nicode"])
        back = yamlutil.safe_load(data)
        self.assertIsInstance(back[0], str)
        self.assertIsInstance(back[1], str)
        self.assertIsInstance(back[2], str)


class JSONBytes(unittest.TestCase):
    """Tests for BytesJSONEncoder."""

    def test_encode_bytes(self):
        """BytesJSONEncoder can encode bytes.

        Bytes are presumed to be UTF-8 encoded.
        """
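        # A non-ASCII value, to exercise the UTF-8 encode/decode round-trip.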
        snowman = u"def\N{SNOWMAN}\uFF00"
        data = {
            b"hello": [1, b"cd", {b"abc": [123, snowman.encode("utf-8")]}],
        }
        expected = {
            u"hello": [1, u"cd", {u"abc": [123, snowman]}],
        }
        # Bytes get passed through as if they were UTF-8 Unicode:
        encoded = jsonbytes.dumps(data)
        self.assertEqual(json.loads(encoded), expected)
        self.assertEqual(jsonbytes.loads(encoded), expected)

    def test_encode_unicode(self):
        """BytesJSONEncoder encodes Unicode strings as usual."""
        expected = {
            u"hello": [1, u"cd"],
        }
        encoded = jsonbytes.dumps(expected)
        self.assertEqual(json.loads(encoded), expected)

    def test_dumps_bytes(self):
        """jsonbytes.dumps_bytes always returns bytes."""
        x = {u"def\N{SNOWMAN}\uFF00": 123}
        encoded = jsonbytes.dumps_bytes(x)
        self.assertIsInstance(encoded, bytes)
        # json.loads() no longer accepts an ``encoding`` argument on Python 3.
        self.assertEqual(json.loads(encoded), x)