tahoe-lafs/src/allmydata/test/test_util.py

1358 lines
51 KiB
Python
Raw Normal View History

from __future__ import print_function
2019-05-28 16:22:31 +02:00
import binascii
import six
2019-05-28 16:22:31 +02:00
import hashlib
2017-08-12 21:21:04 -07:00
import os, time, sys
import yaml
2019-05-28 16:22:31 +02:00
from six.moves import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from allmydata.util import idlib, mathutil
2020-07-21 14:27:08 -04:00
from allmydata.util import fileutil
from allmydata.util import limiter, pollmixin
from allmydata.util import statistics, dictutil, yamlutil
from allmydata.util import log as tahoe_log
from allmydata.util.spans import Spans, overlap, DataSpans
2019-07-08 14:00:39 -06:00
from allmydata.util.fileutil import EncryptedTemporaryFile
from allmydata.test.common_util import ReallyEqualMixin
# Python 3 has no separate ``long`` builtin; alias it to ``int`` so the tests
# below can call ``long(...)`` on either major version.
try:
    long
except NameError:
    long = int
2019-05-28 16:22:31 +02:00
def sha256(data):
    """
    Return the hex-encoded SHA256 digest of ``data``.

    :param data: the data to hash; text is encoded as UTF-8 first, bytes
        are hashed unchanged
    :returns: a hex-encoded SHA256 hash of the data, as text
    """
    if not isinstance(data, bytes):
        # hashlib only accepts bytes, so text has to be encoded first.
        data = data.encode("utf8")
    return binascii.hexlify(hashlib.sha256(data).digest()).decode("utf8")
2019-05-28 16:22:31 +02:00
class IDLib(unittest.TestCase):
    """Tests for ``allmydata.util.idlib``."""

    def test_nodeid_b2a(self):
        """Twenty zero bytes are expected to encode to thirty-two ``a`` characters."""
        all_zero_nodeid = b"\x00" * 20
        encoded = idlib.nodeid_b2a(all_zero_nodeid)
        self.failUnlessEqual(encoded, "a" * 32)
class MyList(list):
    """A ``list`` subclass that adds no behavior of its own."""
class Math(unittest.TestCase):
    """Tests for ``allmydata.util.mathutil``."""

    def test_round_sigfigs(self):
        """22/3 rounded to four significant figures is expected to be 7.333."""
        rounded = mathutil.round_sigfigs(22.0 / 3, 4)
        self.failUnlessEqual(rounded, 7.3330000000000002)
class Statistics(unittest.TestCase):
def should_assert(self, msg, func, *args, **kwargs):
    """
    Fail the test with ``msg`` unless ``func(*args, **kwargs)`` raises
    ``AssertionError``.

    :param msg: failure message used when ``func`` returns normally
    :param func: callable expected to raise ``AssertionError``
    """
    try:
        func(*args, **kwargs)
    except AssertionError:
        return
    # This must stay outside the ``try``: self.fail() reports failure by
    # raising an AssertionError (or a subclass of it), which the handler
    # above would otherwise swallow, making this helper unable to fail.
    self.fail(msg)
def failUnlessListEqual(self, a, b, msg = None):
    """Assert that ``a`` and ``b`` have the same length and equal items, position by position."""
    self.failUnlessEqual(len(a), len(b))
    for index, item in enumerate(a):
        self.failUnlessEqual(item, b[index], msg)
def failUnlessListAlmostEqual(self, a, b, places = 7, msg = None):
    """
    Assert that ``a`` and ``b`` have the same length and that each pair of
    items passes ``failUnlessAlmostEqual`` with the given ``places``.
    """
    self.failUnlessEqual(len(a), len(b))
    for index, item in enumerate(a):
        self.failUnlessAlmostEqual(item, b[index], places, msg)
def test_binomial_coeff(self):
    """Check a few known binomial coefficients, symmetry, and the n < k assertion."""
    binomial = statistics.binomial_coeff
    for k, expected in ((0, 1), (1, 20), (2, 190)):
        self.failUnlessEqual(binomial(20, k), expected)
    # C(n, k) == C(n, n - k)
    self.failUnlessEqual(binomial(20, 8), binomial(20, 12))
    self.should_assert("Should assert if n < k", binomial, 2, 3)
def test_binomial_distribution_pmf(self):
    """Check a small binomial PMF, its argument validation, and ``print_pmf`` output."""
    binomial_pmf = statistics.binomial_distribution_pmf

    computed = binomial_pmf(2, .1)
    expected = [0.81, 0.18, 0.01]
    self.failUnlessListAlmostEqual(computed, expected)

    # Summing across a PMF should give the total probability 1
    self.failUnlessAlmostEqual(sum(computed), 1)
    self.should_assert("Should assert if not 0<=p<=1", binomial_pmf, 1, -1)
    self.should_assert("Should assert if n < 1", binomial_pmf, 0, .1)

    buf = StringIO()
    statistics.print_pmf(computed, out=buf)
    printed = buf.getvalue().splitlines()
    for index, text in enumerate(["i=0: 0.81", "i=1: 0.18", "i=2: 0.01"]):
        self.failUnlessEqual(printed[index], text)
def test_survival_pmf(self):
    """Compare the two survival-PMF implementations and check input validation."""
    survival_pmf = statistics.survival_pmf

    # Cross-check binomial-distribution method against convolution
    # method.
    p_list = [.9999] * 100 + [.99] * 50 + [.8] * 20
    via_conv = statistics.survival_pmf_via_conv(p_list)
    via_bd = statistics.survival_pmf_via_bd(p_list)
    self.failUnlessListAlmostEqual(via_conv, via_bd)
    self.failUnlessTrue(statistics.valid_pmf(via_conv))

    self.should_assert("Should assert if p_i > 1", survival_pmf, [1.1])
    self.should_assert("Should assert if p_i < 0", survival_pmf, [-.1])
def test_repair_count_pmf(self):
    """Check ``repair_count_pmf`` against hand-computed values for a 5-share PMF."""
    survivors = statistics.binomial_distribution_pmf(5, .9)
    repairs = statistics.repair_count_pmf(survivors, 3)
    # repair_pmf[0] == sum(survival_pmf[0,1,2,5])
    # repair_pmf[1] == survival_pmf[4]
    # repair_pmf[2] = survival_pmf[3]
    no_repair = 0.00001 + 0.00045 + 0.0081 + 0.59049
    expected = [no_repair, .32805, .0729, 0, 0, 0]
    self.failUnlessListAlmostEqual(repairs, expected)
def test_repair_cost(self):
    """Check mean and eternal repair costs against recorded values."""
    survival_pmf = statistics.binomial_distribution_pmf(5, .9)
    bwcost = statistics.bandwidth_cost_function

    for ratio, expected in ((1.0, 558.90), (8.0, 1664.55)):
        cost = statistics.mean_repair_cost(bwcost, 1000,
                                           survival_pmf, 3,
                                           ul_dl_ratio=ratio)
        self.failUnlessAlmostEqual(cost, expected)

    # I haven't manually checked the math beyond here -warner
    for rate, expected in ((0, 65292.056074766246),
                           (0.05, 9133.6097158191551)):
        cost = statistics.eternal_repair_cost(bwcost, 1000,
                                              survival_pmf, 3,
                                              discount_rate=rate,
                                              ul_dl_ratio=1.0)
        self.failUnlessAlmostEqual(cost, expected)
def test_convolve(self):
    """Check ``statistics.convolve`` against a known result and its algebraic laws."""
    convolve = statistics.convolve
    v1 = [ 1, 2, 3 ]
    v2 = [ 4, 5, 6 ]
    v3 = [ 7, 8 ]
    known_v1v2 = [ 4, 13, 28, 27, 18 ]

    # Convolution is commutative
    forward = convolve(v1, v2)
    backward = convolve(v2, v1)
    self.failUnlessListEqual(forward, backward, "Convolution should be commutative")
    self.failUnlessListEqual(forward, known_v1v2, "Didn't match known result")

    # Convolution is associative
    left_first = convolve(convolve(v1, v2), v3)
    right_first = convolve(v1, convolve(v2, v3))
    self.failUnlessListEqual(left_first, right_first, "Convolution should be associative")

    # Convolution is distributive
    summed_then_convolved = convolve(v3, [ a + b for a, b in zip(v1, v2) ])
    v3v1 = convolve(v3, v1)
    v3v2 = convolve(v3, v2)
    convolved_then_summed = [ a + b for a, b in zip(v3v1, v3v2) ]
    self.failUnlessListEqual(summed_then_convolved, convolved_then_summed,
                             "Convolution should be distributive")

    # Convolution is scalar multiplication associative
    scaled_after = [ a * 4 for a in convolve(v1, v2) ]
    scaled_before = convolve([ a * 4 for a in v1 ], v2)
    self.failUnlessListEqual(scaled_after, scaled_before,
                             "Convolution should be scalar multiplication associative")
def test_find_k(self):
    """``find_k`` is expected to return 10 here, with file-loss probability below the target."""
    plist = [.9] * 10 + [.8] * 10 # N=20
    threshold = .0001
    k = statistics.find_k(plist, threshold)
    self.failUnlessEqual(k, 10)
    loss = statistics.pr_file_loss(plist, k)
    self.failUnless(loss < threshold)
def test_pr_file_loss(self):
    """Check ``pr_file_loss`` for ten shares at p=.5 with k=3."""
    ten_coin_flips = [.5] * 10
    loss = statistics.pr_file_loss(ten_coin_flips, 3)
    self.failUnlessEqual(loss, .0546875)
def test_pr_backup_file_loss(self):
    """Check ``pr_backup_file_loss`` for ten shares at p=.5, backup p=.5, k=3."""
    ten_coin_flips = [.5] * 10
    loss = statistics.pr_backup_file_loss(ten_coin_flips, .5, 3)
    self.failUnlessEqual(loss, .02734375)
class FileUtil(ReallyEqualMixin, unittest.TestCase):
def mkdir(self, basedir, path, mode=0o777):
    """Create ``basedir/path`` by calling ``fileutil.make_dirs`` with ``mode``."""
    target = os.path.join(basedir, path)
    fileutil.make_dirs(target, mode)
2007-07-03 15:49:45 -07:00
def touch(self, basedir, path, mode=None, data="touch\n"):
    """
    Write ``data`` to ``basedir/path`` in text mode, replacing any existing
    content, then optionally chmod the file.

    :param basedir: directory prefix joined with ``path``
    :param path: file path relative to ``basedir``
    :param mode: if not None, permission bits applied with ``os.chmod``
    :param data: text written to the file
    """
    fn = os.path.join(basedir, path)
    # ``with`` closes the handle even if write() raises.
    with open(fn, "w") as f:
        f.write(data)
    if mode is not None:
        os.chmod(fn, mode)
def test_rm_dir(self):
    """``rm_dir`` removes a tree containing read-only and mode-0 entries, and can be repeated."""
    basedir = "util/FileUtil/test_rm_dir"
    fileutil.make_dirs(basedir)
    # create it again to test idempotency
    fileutil.make_dirs(basedir)
    d = os.path.join(basedir, "doomed")

    # Each subdirectory gets the same three files; two of the directories
    # then have their own permissions restricted.
    for subdir, dir_mode in (("a/b", None), ("a/c", 0o444), ("a/d", 0)):
        self.mkdir(d, subdir)
        self.touch(d, subdir + "/1.txt")
        self.touch(d, subdir + "/2.txt", 0o444)
        self.touch(d, subdir + "/3.txt", 0)
        if dir_mode is not None:
            os.chmod(os.path.join(d, subdir), dir_mode)

    fileutil.rm_dir(d)
    self.failIf(os.path.exists(d))
    # remove it again to test idempotency
    fileutil.rm_dir(d)
def test_remove_if_possible(self):
    """``remove_if_possible`` deletes a file and tolerates the file or its directory being gone."""
    basedir = "util/FileUtil/test_remove_if_possible"
    fileutil.make_dirs(basedir)
    self.touch(basedir, "here")
    victim = os.path.join(basedir, "here")

    fileutil.remove_if_possible(victim)
    self.failIf(os.path.exists(victim))

    fileutil.remove_if_possible(victim) # should be idempotent
    fileutil.rm_dir(basedir)
    fileutil.remove_if_possible(victim) # should survive errors
def test_write_atomically(self):
    """
    ``write_atomically`` stores the given content, both with its default
    mode and with ``mode=""`` (non-binary).
    """
    basedir = "util/FileUtil/test_write_atomically"
    fileutil.make_dirs(basedir)
    fn = os.path.join(basedir, "here")

    # Bytes literals match the other tests in this file, which compare
    # fileutil.read() against bytes.
    # NOTE(review): assumes write_atomically's default mode is binary -- confirm.
    fileutil.write_atomically(fn, b"one")
    self.failUnlessEqual(fileutil.read(fn), b"one")

    fileutil.write_atomically(fn, "two", mode="") # non-binary
    self.failUnlessEqual(fileutil.read(fn), b"two")
def test_rename(self):
    """After ``fileutil.rename`` the old path is gone and the new path exists."""
    basedir = "util/FileUtil/test_rename"
    fileutil.make_dirs(basedir)
    self.touch(basedir, "here")
    old_path = os.path.join(basedir, "here")
    new_path = os.path.join(basedir, "there")

    fileutil.rename(old_path, new_path)

    self.failIf(os.path.exists(old_path))
    self.failUnless(os.path.exists(new_path))
2007-07-03 15:49:45 -07:00
def test_rename_no_overwrite(self):
    """
    ``rename_no_overwrite`` raises ``OSError`` unless the source exists and
    the destination does not, and leaves existing files untouched when it
    refuses.
    """
    workdir = fileutil.abspath_expanduser_unicode(u"test_rename_no_overwrite")
    fileutil.make_dirs(workdir)
    source_path = os.path.join(workdir, "source")
    dest_path = os.path.join(workdir, "dest")

    def assert_rename_refused():
        self.failUnlessRaises(OSError, fileutil.rename_no_overwrite,
                              source_path, dest_path)

    # when neither file exists
    assert_rename_refused()

    # when only dest exists
    fileutil.write(dest_path, b"dest")
    assert_rename_refused()
    self.failUnlessEqual(fileutil.read(dest_path), b"dest")

    # when both exist
    fileutil.write(source_path, b"source")
    assert_rename_refused()
    self.failUnlessEqual(fileutil.read(source_path), b"source")
    self.failUnlessEqual(fileutil.read(dest_path), b"dest")

    # when only source exists
    os.remove(dest_path)
    fileutil.rename_no_overwrite(source_path, dest_path)
    self.failUnlessEqual(fileutil.read(dest_path), b"source")
    self.failIf(os.path.exists(source_path))
def test_replace_file(self):
    """
    ``replace_file`` raises ``ConflictError`` when the replacement is
    missing; otherwise it moves the replacement's content onto the replaced
    path.
    """
    workdir = fileutil.abspath_expanduser_unicode(u"test_replace_file")
    fileutil.make_dirs(workdir)
    replaced_path = os.path.join(workdir, "replaced")
    replacement_path = os.path.join(workdir, "replacement")

    def assert_conflict():
        self.failUnlessRaises(fileutil.ConflictError, fileutil.replace_file,
                              replaced_path, replacement_path)

    def assert_replaced_with_bar():
        self.failUnlessEqual(fileutil.read(replaced_path), b"bar")
        self.failIf(os.path.exists(replacement_path))

    # when none of the files exist
    assert_conflict()

    # when only replaced exists
    fileutil.write(replaced_path, b"foo")
    assert_conflict()
    self.failUnlessEqual(fileutil.read(replaced_path), b"foo")

    # when both replaced and replacement exist
    fileutil.write(replacement_path, b"bar")
    fileutil.replace_file(replaced_path, replacement_path)
    assert_replaced_with_bar()

    # when only replacement exists
    os.remove(replaced_path)
    fileutil.write(replacement_path, b"bar")
    fileutil.replace_file(replaced_path, replacement_path)
    assert_replaced_with_bar()
2007-07-03 15:49:45 -07:00
def test_du(self):
    """``fileutil.du`` reports the summed size of the files written beneath ``basedir``."""
    basedir = "util/FileUtil/test_du"
    fileutil.make_dirs(basedir)
    d = os.path.join(basedir, "space-consuming")

    self.mkdir(d, "a/b")
    self.touch(d, "a/b/1.txt", data="a"*10)
    self.touch(d, "a/b/2.txt", data="b"*11)

    self.mkdir(d, "a/c")
    self.touch(d, "a/c/1.txt", data="c"*12)
    self.touch(d, "a/c/2.txt", data="d"*13)

    expected_bytes = 10+11+12+13
    used = fileutil.du(basedir)
    self.failUnlessEqual(expected_bytes, used)
def test_abspath_expanduser_unicode(self):
    """
    Exercise ``fileutil.abspath_expanduser_unicode`` and
    ``fileutil.to_windows_long_path``: argument type checking, results for
    the current directory, the ``base`` and ``long_path`` arguments, ``~``
    expansion, and (on win32 only) drive-letter handling.
    """
    # A byte string must be rejected.  The ``b`` prefix keeps the argument
    # a byte string on Python 3, where a bare literal would be text.
    self.failUnlessRaises(AssertionError, fileutil.abspath_expanduser_unicode, b"bytestring")

    # os.getcwdu() only exists on Python 2; on Python 3 os.getcwd() already
    # returns text.
    if six.PY2:
        saved_cwd = os.path.normpath(os.getcwdu())
    else:
        saved_cwd = os.path.normpath(os.getcwd())
    abspath_cwd = fileutil.abspath_expanduser_unicode(u".")
    abspath_cwd_notlong = fileutil.abspath_expanduser_unicode(u".", long_path=False)
    # six.text_type is ``unicode`` on Python 2 and ``str`` on Python 3.
    self.failUnless(isinstance(saved_cwd, six.text_type), saved_cwd)
    self.failUnless(isinstance(abspath_cwd, six.text_type), abspath_cwd)
    if sys.platform == "win32":
        self.failUnlessReallyEqual(abspath_cwd, fileutil.to_windows_long_path(saved_cwd))
    else:
        self.failUnlessReallyEqual(abspath_cwd, saved_cwd)
    self.failUnlessReallyEqual(abspath_cwd_notlong, saved_cwd)

    self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\?\\foo"), u"\\\\?\\foo")
    self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\.\\foo"), u"\\\\.\\foo")
    self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"\\\\server\\foo"), u"\\\\?\\UNC\\server\\foo")
    self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo"), u"\\\\?\\C:\\foo")
    self.failUnlessReallyEqual(fileutil.to_windows_long_path(u"C:\\foo/bar"), u"\\\\?\\C:\\foo\\bar")

    # adapted from <http://svn.python.org/view/python/branches/release26-maint/Lib/test/test_posixpath.py?view=markup&pathrev=78279#test_abspath>

    foo = fileutil.abspath_expanduser_unicode(u"foo")
    self.failUnless(foo.endswith(u"%sfoo" % (os.path.sep,)), foo)

    foobar = fileutil.abspath_expanduser_unicode(u"bar", base=foo)
    self.failUnless(foobar.endswith(u"%sfoo%sbar" % (os.path.sep, os.path.sep)), foobar)

    if sys.platform == "win32":
        # This is checking that a drive letter is added for a path without one.
        baz = fileutil.abspath_expanduser_unicode(u"\\baz")
        self.failUnless(baz.startswith(u"\\\\?\\"), baz)
        self.failUnlessReallyEqual(baz[5 :], u":\\baz")

        bar = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz)
        self.failUnless(bar.startswith(u"\\\\?\\"), bar)
        self.failUnlessReallyEqual(bar[5 :], u":\\bar")
        # not u":\\baz\\bar", because \bar is absolute on the current drive.

        self.failUnlessReallyEqual(baz[4], bar[4])  # same drive

        baz_notlong = fileutil.abspath_expanduser_unicode(u"\\baz", long_path=False)
        self.failIf(baz_notlong.startswith(u"\\\\?\\"), baz_notlong)
        self.failUnlessReallyEqual(baz_notlong[1 :], u":\\baz")

        bar_notlong = fileutil.abspath_expanduser_unicode(u"\\bar", base=baz_notlong, long_path=False)
        self.failIf(bar_notlong.startswith(u"\\\\?\\"), bar_notlong)
        self.failUnlessReallyEqual(bar_notlong[1 :], u":\\bar")
        # not u":\\baz\\bar", because \bar is absolute on the current drive.

        self.failUnlessReallyEqual(baz_notlong[0], bar_notlong[0])  # same drive

    self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~"))
    self.failIfIn(u"~", fileutil.abspath_expanduser_unicode(u"~", long_path=False))

    cwds = ['cwd']
    try:
        cwds.append(u'\xe7w\xf0'.encode(sys.getfilesystemencoding()
                                        or 'ascii'))
    except UnicodeEncodeError:
        pass # the cwd can't be encoded -- test with ascii cwd only

    for cwd in cwds:
        try:
            os.mkdir(cwd)
            os.chdir(cwd)
            for upath in (u'', u'fuu', u'f\xf9\xf9', u'/fuu', u'U:\\', u'~'):
                uabspath = fileutil.abspath_expanduser_unicode(upath)
                self.failUnless(isinstance(uabspath, six.text_type), uabspath)

                uabspath_notlong = fileutil.abspath_expanduser_unicode(upath, long_path=False)
                self.failUnless(isinstance(uabspath_notlong, six.text_type), uabspath_notlong)
        finally:
            # always return to the starting directory, even on failure
            os.chdir(saved_cwd)
def test_make_dirs_with_absolute_mode(self):
    """
    Every directory created below ``workdir`` is expected to end up with
    mode 0o766, while ``workdir`` itself is expected not to.
    """
    if sys.platform == 'win32':
        raise unittest.SkipTest("Permissions don't work the same on windows.")

    workdir = fileutil.abspath_expanduser_unicode(u"test_make_dirs_with_absolute_mode")
    fileutil.make_dirs(workdir)
    abspath = fileutil.abspath_expanduser_unicode(u"a/b/c/d", base=workdir)
    fileutil.make_dirs_with_absolute_mode(workdir, abspath, 0o766)

    # deepest directory first, then each of its ancestors below workdir
    for parts in (("a", "b", "c", "d"), ("a", "b", "c"), ("a", "b"), ("a",)):
        new_mode = os.stat(os.path.join(workdir, *parts)).st_mode & 0o777
        self.failUnlessEqual(new_mode, 0o766)

    workdir_mode = os.stat(workdir).st_mode & 0o777
    self.failIfEqual(workdir_mode, 0o766)
def test_create_long_path(self):
    """
    Even for paths with total length greater than 260 bytes,
    ``fileutil.abspath_expanduser_unicode`` produces a path on which other
    path-related APIs can operate.

    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247(v=vs.85).aspx
    documents certain Windows-specific path length limitations this test
    is specifically intended to demonstrate can be overcome.
    """
    workdir = u"test_create_long_path"
    fileutil.make_dirs(workdir)
    base_path = fileutil.abspath_expanduser_unicode(workdir)

    # Construct a path /just/ long enough to exercise the important case.
    # It would be nice if we could just use a seemingly globally valid
    # long file name (the `x...` portion) here - for example, a name 255
    # bytes long- and a previous version of this test did just that.
    # However, aufs imposes a 242 byte length limit on file names.  Most
    # other POSIX filesystems do allow names up to 255 bytes.  It's not
    # clear there's anything we can *do* about lower limits, though, and
    # POSIX.1-2017 (and earlier) only requires that the maximum be at
    # least 14 (!!!) bytes.
    filler = u'x' * (261 - len(base_path))
    long_path = os.path.join(base_path, filler)

    def _cleanup():
        fileutil.remove(long_path)
    self.addCleanup(_cleanup)

    fileutil.write(long_path, b"test")
    self.failUnless(os.path.exists(long_path))
    self.failUnlessEqual(fileutil.read(long_path), b"test")

    _cleanup()
    self.failIf(os.path.exists(long_path))
def _test_windows_expanduser(self, userprofile=None, homedrive=None, homepath=None):
    """
    Patch ``fileutil.windows_getenv`` to serve the given values, then check
    how ``fileutil.windows_expanduser`` treats paths with and without a
    leading ``~``.
    """
    fake_env = {
        u"USERPROFILE": userprofile,
        u"HOMEDRIVE": homedrive,
        u"HOMEPATH": homepath,
    }

    def call_windows_getenv(name):
        if name not in fake_env:
            self.fail("unexpected argument to call_windows_getenv")
        return fake_env[name]
    self.patch(fileutil, 'windows_getenv', call_windows_getenv)

    home = u"\\Documents and Settings\\\u0100"
    self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~"),
                               os.path.join(u"C:", home))
    self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~\\foo"),
                               os.path.join(u"C:", home, u"foo"))
    self.failUnlessReallyEqual(fileutil.windows_expanduser(u"~/foo"),
                               os.path.join(u"C:", home, u"foo"))

    # paths without a leading "~" come back unchanged
    for unchanged in (u"a", u"a~", u"a\\~\\foo"):
        self.failUnlessReallyEqual(fileutil.windows_expanduser(unchanged), unchanged)
def test_windows_expanduser_xp(self):
    """Run the expanduser checks with only HOMEDRIVE and HOMEPATH provided."""
    drive = u"C:"
    path = u"\\Documents and Settings\\\u0100"
    return self._test_windows_expanduser(homedrive=drive, homepath=path)
def test_windows_expanduser_win7(self):
    """Run the expanduser checks with only USERPROFILE provided."""
    profile = os.path.join(u"C:", u"\\Documents and Settings\\\u0100")
    return self._test_windows_expanduser(userprofile=profile)
def test_disk_stats(self):
    """
    ``get_disk_stats`` for the current directory reports positive sizes,
    except ``used``, which may be zero.  Skipped when
    ``get_available_space`` reports no space.
    """
    avail = fileutil.get_available_space('.', 2**14)
    if avail == 0:
        raise unittest.SkipTest("This test will spuriously fail if there is no disk space left.")

    disk = fileutil.get_disk_stats('.', 2**13)
    self.failUnless(disk['total'] > 0, disk['total'])
    # we tolerate used==0 for a Travis-CI bug, see #2290
    self.failUnless(disk['used'] >= 0, disk['used'])
    self.failUnless(disk['free_for_root'] > 0, disk['free_for_root'])
    self.failUnless(disk['free_for_nonroot'] > 0, disk['free_for_nonroot'])
    self.failUnless(disk['avail'] > 0, disk['avail'])
def test_disk_stats_avail_nonnegative(self):
    """With a 2**128 second argument, ``avail`` is expected to be 0."""
    # This test will spuriously fail if you have more than 2^128
    # bytes of available space on your filesystem.
    huge = 2**128
    stats = fileutil.get_disk_stats('.', huge)
    self.failUnlessEqual(stats['avail'], 0)
def test_get_pathinfo(self):
    """Check ``get_pathinfo`` for a directory, a regular file, and a missing path."""
    basedir = "util/FileUtil/test_get_pathinfo"
    fileutil.make_dirs(basedir)

    # create a directory
    self.mkdir(basedir, "a")
    dirinfo = fileutil.get_pathinfo(basedir)
    self.failUnlessTrue(dirinfo.isdir)
    self.failUnlessTrue(dirinfo.exists)
    self.failUnlessFalse(dirinfo.isfile)
    self.failUnlessFalse(dirinfo.islink)

    # create a file
    file_path = os.path.join(basedir, "1.txt")
    fileutil.write(file_path, b"a"*10)
    fileinfo = fileutil.get_pathinfo(file_path)
    self.failUnlessTrue(fileinfo.isfile)
    self.failUnlessTrue(fileinfo.exists)
    self.failUnlessFalse(fileinfo.isdir)
    self.failUnlessFalse(fileinfo.islink)
    self.failUnlessEqual(fileinfo.size, 10)

    # path at which nothing exists
    missing_path = os.path.join(basedir, "doesnotexist")
    now_ns = fileutil.seconds_to_ns(time.time())
    dneinfo = fileutil.get_pathinfo(missing_path, now_ns=now_ns)
    self.failUnlessFalse(dneinfo.exists)
    self.failUnlessFalse(dneinfo.isfile)
    self.failUnlessFalse(dneinfo.isdir)
    self.failUnlessFalse(dneinfo.islink)
    self.failUnlessEqual(dneinfo.size, None)
    # the supplied now_ns is what comes back for both timestamps
    self.failUnlessEqual(dneinfo.mtime_ns, now_ns)
    self.failUnlessEqual(dneinfo.ctime_ns, now_ns)
def test_get_pathinfo_symlink(self):
    """``get_pathinfo`` on a symlink reports a link that exists and is neither file nor directory."""
    if not hasattr(os, 'symlink'):
        raise unittest.SkipTest("can't create symlinks on this platform")

    basedir = "util/FileUtil/test_get_pathinfo"
    fileutil.make_dirs(basedir)

    target = os.path.join(basedir, "1.txt")
    fileutil.write(target, b"a"*10)

    # create a symlink pointing to 1.txt
    slname = os.path.join(basedir, "linkto1.txt")
    os.symlink(target, slname)

    symlinkinfo = fileutil.get_pathinfo(slname)
    self.failUnlessTrue(symlinkinfo.islink)
    self.failUnlessTrue(symlinkinfo.exists)
    self.failUnlessFalse(symlinkinfo.isfile)
    self.failUnlessFalse(symlinkinfo.isdir)
2019-06-17 21:56:06 -06:00
def test_encrypted_tempfile(self):
    """An ``EncryptedTemporaryFile`` can be written to and closed."""
    f = EncryptedTemporaryFile()
    try:
        # Bytes literal, matching the other file-content literals in this
        # file; identical to the plain literal on Python 2.
        # NOTE(review): assumes write() expects bytes on Python 3 -- confirm.
        f.write(b"foobar")
    finally:
        # close even if write() raises
        f.close()
class PollMixinTests(unittest.TestCase):
    """Tests for ``pollmixin.PollMixin.poll``."""

    def setUp(self):
        self.pm = pollmixin.PollMixin()

    def test_PollMixin_True(self):
        """Poll with a check function that is true immediately."""
        return self.pm.poll(check_f=lambda : True,
                            pollinterval=0.1)

    def test_PollMixin_False_then_True(self):
        """Poll with a check function that is false once, then true."""
        answers = iter([False, True])
        return self.pm.poll(check_f=lambda: next(answers),
                            pollinterval=0.1)

    def test_timeout(self):
        """A check that never succeeds must fail with ``pollmixin.TimeoutError``."""
        def _unexpected_success(res):
            self.fail("poll should have failed, not returned %s" % (res,))

        def _expected_failure(f):
            f.trap(pollmixin.TimeoutError)
            return None # success

        d = self.pm.poll(check_f=lambda: False,
                         pollinterval=0.01,
                         timeout=1)
        d.addCallbacks(_unexpected_success, _expected_failure)
        return d
class Limiter(unittest.TestCase):
    """Tests for ``limiter.ConcurrencyLimiter``."""

    def job(self, i, foo):
        """Record the call, track how many jobs are running, and finish one second later."""
        self.calls.append( (i, foo) )
        self.simultaneous += 1
        self.peak_simultaneous = max(self.simultaneous, self.peak_simultaneous)
        d = defer.Deferred()

        def _done():
            self.simultaneous -= 1
            d.callback("done %d" % i)
        reactor.callLater(1.0, _done)
        return d

    def bad_job(self, i, foo):
        """A job that raises ``ValueError`` synchronously."""
        raise ValueError("bad_job %d" % i)

    def _assert_all_jobs_ran(self):
        """Check the peak concurrency and that each of the twenty jobs was called."""
        self.failUnless(self.peak_simultaneous <= 10)
        self.failUnlessEqual(len(self.calls), 20)
        for i in range(20):
            self.failUnless( (i, str(i)) in self.calls)

    def test_limiter(self):
        """Twenty jobs all run, never more than ten at a time."""
        self.calls = []
        self.simultaneous = 0
        self.peak_simultaneous = 0
        lim = limiter.ConcurrencyLimiter()
        pending = [lim.add(self.job, i, foo=str(i)) for i in range(20)]
        d = defer.DeferredList(pending, fireOnOneErrback=True)

        def _done(res):
            self.failUnlessEqual(self.simultaneous, 0)
            self._assert_all_jobs_ran()
        d.addCallback(_done)
        return d

    def test_errors(self):
        """A failing job errbacks its own Deferred while the good jobs still complete."""
        self.calls = []
        self.simultaneous = 0
        self.peak_simultaneous = 0
        lim = limiter.ConcurrencyLimiter()
        pending = [lim.add(self.job, i, foo=str(i)) for i in range(20)]
        d2 = lim.add(self.bad_job, 21, "21")
        d = defer.DeferredList(pending, fireOnOneErrback=True)

        def _most_done(res):
            results = []
            for (success, result) in res:
                self.failUnlessEqual(success, True)
                results.append(result)
            results.sort()
            expected_results = sorted("done %d" % i for i in range(20))
            self.failUnlessEqual(results, expected_results)
            self._assert_all_jobs_ran()

            def _good(res):
                self.fail("should have failed, not got %s" % (res,))

            def _err(f):
                f.trap(ValueError)
                self.failUnless("bad_job 21" in str(f))
            d2.addCallbacks(_good, _err)
            return d2
        d.addCallback(_most_done)

        def _all_done(res):
            self.failUnlessEqual(self.simultaneous, 0)
            self._assert_all_jobs_ran()
        d.addCallback(_all_done)
        return d
# Shared counter: each EqButNotIs instance takes the next value as its hash.
ctr = [0]


class EqButNotIs(object):
    """
    Wrapper whose comparisons are all delegated to the wrapped value ``x``,
    while each instance gets its own hash from the module-level ``ctr``.
    """

    def __init__(self, x):
        self.x = x
        self.hash = ctr[0]
        ctr[0] = self.hash + 1

    def __repr__(self):
        return "<%s %s>" % (type(self).__name__, self.x)

    def __hash__(self):
        return self.hash

    def __eq__(self, other):
        return self.x == other

    def __ne__(self, other):
        return self.x != other

    def __lt__(self, other):
        return self.x < other

    def __le__(self, other):
        return self.x <= other

    def __gt__(self, other):
        return self.x > other

    def __ge__(self, other):
        return self.x >= other
class DictUtil(unittest.TestCase):
    """Tests for ``dictutil.DictOfSets`` and ``dictutil.AuxValueDict``."""

    def test_dict_of_sets(self):
        """Exercise add, discard and update on a ``DictOfSets``."""
        ds = dictutil.DictOfSets()
        ds.add(1, "a")
        ds.add(2, "b")
        ds.add(2, "b")
        ds.add(2, "c")
        self.failUnlessEqual(ds[1], {"a"})
        self.failUnlessEqual(ds[2], {"b", "c"})
        ds.discard(3, "d") # should not raise an exception
        ds.discard(2, "b")
        self.failUnlessEqual(ds[2], {"c"})
        ds.discard(2, "c")
        self.failIf(2 in ds)

        ds.add(3, "f")
        ds2 = dictutil.DictOfSets()
        ds2.add(3, "f")
        ds2.add(3, "g")
        ds2.add(4, "h")
        ds.update(ds2)
        self.failUnlessEqual(ds[1], {"a"})
        self.failUnlessEqual(ds[3], {"f", "g"})
        self.failUnlessEqual(ds[4], {"h"})

    def test_auxdict(self):
        """Exercise values, aux values, deletion and the constructors of ``AuxValueDict``."""
        d = dictutil.AuxValueDict()
        # we put the serialized form in the auxdata
        d.set_with_aux("key", ("filecap", "metadata"), "serialized")

        # keys() is wrapped in list(): on Python 3 it is a view object,
        # which never compares equal to a list.
        self.failUnlessEqual(list(d.keys()), ["key"])
        self.failUnlessEqual(d["key"], ("filecap", "metadata"))
        self.failUnlessEqual(d.get_aux("key"), "serialized")

        def _get_missing(key):
            return d[key]
        self.failUnlessRaises(KeyError, _get_missing, "nonkey")
        self.failUnlessEqual(d.get("nonkey"), None)
        self.failUnlessEqual(d.get("nonkey", "nonvalue"), "nonvalue")
        self.failUnlessEqual(d.get_aux("nonkey"), None)
        self.failUnlessEqual(d.get_aux("nonkey", "nonvalue"), "nonvalue")

        d["key"] = ("filecap2", "metadata2")
        self.failUnlessEqual(d["key"], ("filecap2", "metadata2"))
        self.failUnlessEqual(d.get_aux("key"), None)

        d.set_with_aux("key2", "value2", "aux2")
        self.failUnlessEqual(sorted(d.keys()), ["key", "key2"])
        del d["key2"]
        self.failUnlessEqual(list(d.keys()), ["key"])
        self.failIf("key2" in d)
        self.failUnlessRaises(KeyError, _get_missing, "key2")
        self.failUnlessEqual(d.get("key2"), None)
        self.failUnlessEqual(d.get_aux("key2"), None)
        d["key2"] = "newvalue2"
        self.failUnlessEqual(d.get("key2"), "newvalue2")
        self.failUnlessEqual(d.get_aux("key2"), None)

        d = dictutil.AuxValueDict({1:2,3:4})
        self.failUnlessEqual(sorted(d.keys()), [1,3])
        self.failUnlessEqual(d[1], 2)
        self.failUnlessEqual(d.get_aux(1), None)

        d = dictutil.AuxValueDict([ (1,2), (3,4) ])
        self.failUnlessEqual(sorted(d.keys()), [1,3])
        self.failUnlessEqual(d[1], 2)
        self.failUnlessEqual(d.get_aux(1), None)

        d = dictutil.AuxValueDict(one=1, two=2)
        self.failUnlessEqual(sorted(d.keys()), ["one","two"])
        self.failUnlessEqual(d["one"], 1)
        self.failUnlessEqual(d.get_aux("one"), None)
class SampleError(Exception):
    """An exception type with no behavior beyond ``Exception``."""
class Log(unittest.TestCase):
    """Tests for the ``allmydata.util.log`` module (imported here as ``tahoe_log``)."""

    def test_err(self):
        """Log a ``Failure`` wrapping ``SampleError``, then flush it from the logged errors."""
        try:
            raise SampleError("simple sample")
        except SampleError:
            # Catch only the exception raised above instead of using a bare
            # ``except:``.  Failure() with no arguments captures the
            # exception currently being handled.
            f = Failure()
        tahoe_log.err(format="intentional sample error",
                      failure=f, level=tahoe_log.OPERATIONAL, umid="wO9UoQ")
        self.flushLoggedErrors(SampleError)
2019-05-15 08:17:44 +02:00
class SimpleSpans(object):
    """
    A simple, inefficient reference model of ``util.spans.Spans``: the set of
    covered integer offsets is stored explicitly.  The tests compare the
    behavior of this model against the real (efficient) implementation.
    """

    def __init__(self, _span_or_start=None, length=None):
        """
        Build an empty instance, a single span, or a copy.

        :param _span_or_start: a start offset when ``length`` is given,
            otherwise an iterable of ``(start, length)`` pairs (or nothing)
        :param length: number of offsets in the single span, or None
        """
        self._have = set()
        if length is not None:
            self._have.update(range(_span_or_start, _span_or_start+length))
        elif _span_or_start:
            for (start, span_length) in _span_or_start:
                self.add(start, span_length)

    def add(self, start, length):
        """Add offsets ``start .. start+length-1``; returns ``self`` for chaining."""
        self._have.update(range(start, start+length))
        return self

    def remove(self, start, length):
        """Remove offsets ``start .. start+length-1``; returns ``self`` for chaining."""
        self._have.difference_update(range(start, start+length))
        return self

    def each(self):
        """Return the sorted list of every covered offset."""
        return sorted(self._have)

    def __iter__(self):
        """Yield ``(start, length)`` for each maximal run of consecutive offsets, in order."""
        prevstart = None
        prevend = None
        for i in sorted(self._have):
            if prevstart is None:
                prevstart = prevend = i
                continue
            if i == prevend+1:
                # still inside the current run
                prevend = i
                continue
            yield (prevstart, prevend-prevstart+1)
            prevstart = prevend = i
        if prevstart is not None:
            yield (prevstart, prevend-prevstart+1)

    def __bool__(self): # this gets us bool()
        return bool(self.len())

    # Python 2 consults __nonzero__ rather than __bool__; without this alias
    # every instance, even an empty one, would be truthy there.
    __nonzero__ = __bool__

    def len(self):
        """Return the number of covered offsets."""
        return len(self._have)

    def __add__(self, other):
        s = self.__class__(self)
        for (start, length) in other:
            s.add(start, length)
        return s

    def __sub__(self, other):
        s = self.__class__(self)
        for (start, length) in other:
            s.remove(start, length)
        return s

    def __iadd__(self, other):
        for (start, length) in other:
            self.add(start, length)
        return self

    def __isub__(self, other):
        for (start, length) in other:
            self.remove(start, length)
        return self

    def __and__(self, other):
        s = self.__class__()
        for i in other.each():
            if i in self._have:
                s.add(i, 1)
        return s

    def __contains__(self, start_and_length):
        """True if every offset of the ``(start, length)`` span is covered."""
        (start, length) = start_and_length
        return all(i in self._have for i in range(start, start+length))
class ByteSpans(unittest.TestCase):
def test_basic(self):
    """Check construction, copying, add/remove chaining and membership on ``Spans``."""
    empty = Spans()
    self.failUnlessEqual(list(empty), [])
    self.failIf(empty)
    self.failIf((0,1) in empty)
    self.failUnlessEqual(empty.len(), 0)

    s1 = Spans(3, 4) # 3,4,5,6
    self._check1(s1)

    s1 = Spans(long(3), long(4)) # 3,4,5,6
    self._check1(s1)

    s2 = Spans(s1)
    self._check1(s2)

    s2.add(10,2) # 10,11
    # the original must not change when its copy does
    self._check1(s1)
    self.failUnless((10,1) in s2)
    self.failIf((10,1) in s1)
    self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11])
    self.failUnlessEqual(s2.len(), 6)

    s2.add(15,2).add(20,2)
    self.failUnlessEqual(list(s2.each()), [3,4,5,6,10,11,15,16,20,21])
    self.failUnlessEqual(s2.len(), 10)

    s2.remove(4,3).remove(15,1)
    self.failUnlessEqual(list(s2.each()), [3,10,11,16,20,21])
    self.failUnlessEqual(s2.len(), 6)

    first = SimpleSpans(3, 4) # 3 4 5 6
    second = SimpleSpans(5, 4) # 5 6 7 8
    common = first & second
    self.failUnlessEqual(list(common.each()), [5, 6])
def _check1(self, s):
    """Assert that ``s`` holds exactly the single span (3, 4), i.e. offsets 3..6."""
    self.failUnlessEqual(list(s), [(3,4)])
    self.failUnless(s)
    self.failUnlessEqual(s.len(), 4)
    self.failIf((0,1) in s)
    for contained in ((3,4), (3,1), (5,2), (6,1)):
        self.failUnless(contained in s)
    for not_contained in ((6,2), (7,1)):
        self.failIf(not_contained in s)
    self.failUnlessEqual(list(s.each()), [3,4,5,6])
def test_large(self):
    """A ``Spans`` holding a single span of 2**65 offsets."""
    huge = 2**65
    s = Spans(4, huge) # don't do this with a SimpleSpans
    self.failUnlessEqual(list(s), [(4, huge)])
    self.failUnless(s)
    self.failUnlessEqual(s.len(), huge)
    self.failIf((0,1) in s)
    self.failUnless((4,2) in s)
    self.failUnless((huge,2) in s)
def test_math(self):
    """Check ``-``, ``&``, ``+`` and their in-place forms on overlapping ``Spans``."""
    s1 = Spans(0, 10) # 0,1,2,3,4,5,6,7,8,9
    s2 = Spans(5, 3) # 5,6,7
    s3 = Spans(8, 4) # 8,9,10,11

    def expect(spans, offsets):
        self.failUnlessEqual(list(spans.each()), offsets)

    # difference
    expect(s1 - s2, [0,1,2,3,4,8,9])
    expect(s1 - s3, [0,1,2,3,4,5,6,7])
    expect(s2 - s3, [5,6,7])

    # intersection, in both operand orders
    expect(s1 & s2, [5,6,7])
    expect(s2 & s1, [5,6,7])
    expect(s1 & s3, [8,9])
    expect(s3 & s1, [8,9])
    expect(s2 & s3, [])
    expect(s3 & s2, [])
    expect(Spans() & s3, [])
    expect(s3 & Spans(), [])

    # union
    expect(s1 + s2, [0,1,2,3,4,5,6,7,8,9])
    expect(s1 + s3, [0,1,2,3,4,5,6,7,8,9,10,11])
    expect(s2 + s3, [5,6,7,8,9,10,11])

    # in-place difference, applied to a fresh copy each time
    for base, operand, offsets in ((s1, s2, [0,1,2,3,4,8,9]),
                                   (s1, s3, [0,1,2,3,4,5,6,7]),
                                   (s2, s3, [5,6,7])):
        s = Spans(base)
        s -= operand
        expect(s, offsets)

    # in-place union, applied to a fresh copy each time
    for base, operand, offsets in ((s1, s2, [0,1,2,3,4,5,6,7,8,9]),
                                   (s1, s3, [0,1,2,3,4,5,6,7,8,9,10,11]),
                                   (s2, s3, [5,6,7,8,9,10,11])):
        s = Spans(base)
        s += operand
        expect(s, offsets)
def test_random(self):
    # attempt to increase coverage of corner cases by comparing behavior
    # of a simple-but-slow model implementation against the
    # complex-but-fast actual implementation, in a large number of random
    # operations
    S1 = SimpleSpans
    S2 = Spans
    s1 = S1()
    s2 = S2()
    seed = ""

    def _span_from(digest):
        # derive a (start, length) pair from fixed positions of a hex digest
        return int(digest[2:4], 16), max(1, int(digest[5:6], 16))

    def _create(subseed):
        # build a model/real pair holding the same ten derived spans
        ns1 = S1()
        ns2 = S2()
        for i in range(10):
            start, length = _span_from(sha256(subseed+str(i)))
            ns1.add(start, length)
            ns2.add(start, length)
        return ns1, ns2

    for i in range(1000):
        what = sha256(seed+str(i))
        op = what[0]
        subop = what[1]
        start, length = _span_from(what)

        if op in "0":
            # replace both objects: empty, single span, or a copy
            if subop in "01234":
                s1 = S1()
                s2 = S2()
            elif subop in "5678":
                s1 = S1(start, length)
                s2 = S2(start, length)
            else:
                s1 = S1(s1)
                s2 = S2(s2)
        elif op in "123":
            s1.add(start, length)
            s2.add(start, length)
        elif op in "456":
            s1.remove(start, length)
            s2.remove(start, length)
        else:
            # every remaining operation combines with a fresh derived pair
            ns1, ns2 = _create(what[7:11])
            if op in "78":
                s1 = s1 + ns1
                s2 = s2 + ns2
            elif op in "9a":
                s1 = s1 - ns1
                s2 = s2 - ns2
            elif op in "bc":
                s1 += ns1
                s2 += ns2
            elif op in "de":
                s1 -= ns1
                s2 -= ns2
            else:
                s1 = s1 & ns1
                s2 = s2 & ns2

        # model and real implementation must agree after every step
        self.failUnlessEqual(list(s1.each()), list(s2.each()))
        self.failUnlessEqual(s1.len(), s2.len())
        self.failUnlessEqual(bool(s1), bool(s2))
        self.failUnlessEqual(list(s1), list(s2))
        for j in range(10):
            what = sha256(what[12:14]+str(j))
            span = _span_from(what)
            self.failUnlessEqual(bool(span in s1), bool(span in s2))
# s()
# s(start,length)
# s(s0)
# s.add(start,length) : returns s
# s.remove(start,length)
# s.each() -> list of byte offsets, mostly for testing
# list(s) -> list of (start,length) tuples, one per span
# (start,length) in s -> True if (start..start+length-1) are all members
# NOT equivalent to x in list(s)
# s.len() -> number of bytes, for testing, bool(), and accounting/limiting
# bool(s) (__nonzero__ / __bool__)
# s = s1+s2, s1-s2, +=s1, -=s1
def test_overlap(self):
for a in range(20):
for b in range(10):
for c in range(20):
for d in range(10):
self._test_overlap(a,b,c,d)
def _test_overlap(self, a, b, c, d):
    """
    Check ``overlap(a, b, c, d)`` against a brute-force set intersection.

    (a, b) and (c, d) are (start, length) pairs. When the two ranges share
    no integers ``overlap`` must return None; otherwise it must return a
    (start, length) pair covering exactly the shared integers.
    """
    first = set(range(a, a + b))
    second = set(range(c, c + d))
    result = overlap(a, b, c, d)
    expected = first.intersection(second)
    if expected:
        start, length = result
        self.failUnlessEqual(set(range(start, start + length)), expected)
    else:
        self.failUnlessEqual(result, None)
def _show_overlap(self, s, c):
import sys
out = sys.stdout
if s:
for i in range(max(s)):
if i in s:
out.write(c)
else:
out.write(" ")
out.write("\n")
def extend(s, start, length, fill):
    """
    Return ``s`` padded on the right so that it is at least
    ``start+length`` long.

    :param s: the string to pad
    :param start: offset of the region that must be covered
    :param length: length of the region that must be covered
    :param fill: the single character to pad with; only checked when
        padding is actually needed
    :returns: ``s`` unchanged if already long enough, otherwise ``s``
        followed by enough copies of ``fill``
    """
    shortfall = start + length - len(s)
    if shortfall <= 0:
        return s
    assert len(fill) == 1
    return s + fill * shortfall
def replace(s, start, data):
    """
    Return a copy of ``s`` in which the ``len(data)`` characters beginning
    at offset ``start`` are overwritten with ``data``.

    ``s`` must already be long enough to hold the whole of ``data`` (see
    ``extend``); this is enforced with an assertion.
    """
    end = start + len(data)
    assert len(s) >= end
    return s[:start] + data + s[end:]
2019-05-15 08:17:44 +02:00
class SimpleDataSpans(object):
    """
    A simple string-backed model of a data-span container.

    Two parallel strings are kept: ``missing`` holds "1" at every offset
    with no data and "0" at every offset with data, and ``data`` holds the
    characters themselves (with a space wherever padding was needed).
    """

    def __init__(self, other=None):
        """
        :param other: optional object with a ``get_chunks()`` method
            yielding (start, data) pairs; its contents are copied in.
        """
        self.missing = "" # "1" where missing, "0" where found
        self.data = ""
        if other:
            for (start, data) in other.get_chunks():
                self.add(start, data)

    def __bool__(self): # this gets us bool()
        return bool(self.len())
    # Python 2 consults __nonzero__, not __bool__, for truth-testing
    __nonzero__ = __bool__

    def len(self):
        """Return the number of offsets that currently hold data."""
        return len(self.missing.replace("1", ""))

    def _dump(self):
        """Return the list of offsets that currently hold data."""
        return [i for (i, c) in enumerate(self.missing) if c == "0"]

    def _have(self, start, length):
        """Return True if all of [start, start+length) holds data."""
        m = self.missing[start:start+length]
        # need a non-empty, full-length slice with no "missing" marker
        return bool(m) and len(m) >= length and "1" not in m

    def get_chunks(self):
        """Yield (offset, character) for every offset that holds data."""
        for i in self._dump():
            yield (i, self.data[i])

    def get_spans(self):
        """Return a ``SimpleSpans`` built from the chunks held here."""
        return SimpleSpans([(start, len(data))
                            for (start, data) in self.get_chunks()])

    def get(self, start, length):
        """
        Return the data in [start, start+length), or None unless every
        offset in that range holds data.
        """
        if self._have(start, length):
            return self.data[start:start+length]
        return None

    def pop(self, start, length):
        """Like ``get``, but also remove the range when it was present."""
        data = self.get(start, length)
        if data is not None:
            self.remove(start, length)
        return data

    def remove(self, start, length):
        """Mark [start, start+length) as missing."""
        self.missing = replace(extend(self.missing, start, length, "1"),
                               start, "1"*length)

    def add(self, start, data):
        """Store ``data`` at ``start``, overwriting what was there."""
        self.missing = replace(extend(self.missing, start, len(data), "1"),
                               start, "0"*len(data))
        self.data = replace(extend(self.data, start, len(data), " "),
                            start, data)
class StringSpans(unittest.TestCase):
    """
    Tests for data-span containers. The same checks are run against the
    ``SimpleDataSpans`` model and against ``DataSpans``.
    """

    def do_basic(self, klass):
        """
        Exercise add/get/pop/remove/copy on ``klass`` with small fixed
        inputs.

        :param klass: the data-span class under test
        """
        def stored(spans):
            # total number of characters reported by get_chunks()
            return sum([len(d) for (s, d) in spans.get_chunks()])

        ds = klass()
        self.failUnlessEqual(ds.len(), 0)
        self.failUnlessEqual(list(ds._dump()), [])
        self.failUnlessEqual(stored(ds), 0)
        s1 = ds.get_spans()  # result is not inspected for the empty case
        self.failUnlessEqual(ds.get(0, 4), None)
        self.failUnlessEqual(ds.pop(0, 4), None)
        ds.remove(0, 4)

        ds.add(2, "four")
        self.failUnlessEqual(ds.len(), 4)
        self.failUnlessEqual(list(ds._dump()), [2,3,4,5])
        self.failUnlessEqual(stored(ds), 4)
        s1 = ds.get_spans()
        self.failUnless((2,2) in s1)
        self.failUnlessEqual(ds.get(0, 4), None)
        self.failUnlessEqual(ds.pop(0, 4), None)
        self.failUnlessEqual(ds.get(4, 4), None)

        # popping from a copy must not disturb the original
        ds2 = klass(ds)
        self.failUnlessEqual(ds2.len(), 4)
        self.failUnlessEqual(list(ds2._dump()), [2,3,4,5])
        self.failUnlessEqual(stored(ds2), 4)
        self.failUnlessEqual(ds2.get(0, 4), None)
        self.failUnlessEqual(ds2.pop(0, 4), None)
        self.failUnlessEqual(ds2.pop(2, 3), "fou")
        self.failUnlessEqual(stored(ds2), 1)
        self.failUnlessEqual(ds2.get(2, 3), None)
        self.failUnlessEqual(ds2.get(5, 1), "r")

        self.failUnlessEqual(ds.get(2, 3), "fou")
        self.failUnlessEqual(stored(ds), 4)

        ds.add(0, "23")
        self.failUnlessEqual(ds.len(), 6)
        self.failUnlessEqual(list(ds._dump()), [0,1,2,3,4,5])
        self.failUnlessEqual(stored(ds), 6)
        self.failUnlessEqual(ds.get(0, 4), "23fo")
        self.failUnlessEqual(ds.pop(0, 4), "23fo")
        self.failUnlessEqual(stored(ds), 2)
        self.failUnlessEqual(ds.get(0, 4), None)
        self.failUnlessEqual(ds.pop(0, 4), None)

        # a later add() overwrites the overlapping part of an earlier one
        ds = klass()
        ds.add(2, "four")
        ds.add(3, "ea")
        self.failUnlessEqual(ds.get(2, 4), "fear")

        # same again, with long offsets and lengths
        ds = klass()
        ds.add(long(2), "four")
        ds.add(long(3), "ea")
        self.failUnlessEqual(ds.get(long(2), long(4)), "fear")

    def do_scan(self, klass):
        """
        Start from a fixed baseline of small chunks separated by small
        gaps, add every possible [start, end) range of the alphabet string
        to it, and check that reads and removals behave as expected.

        :param klass: the data-span class under test
        """
        # do a test with gaps and spans of size 1 and 2
        #  left=(1,11) * right=(1,11) * gapsize=(1,2)
        # 111, 112, 121, 122, 211, 212, 221, 222
        #    211
        #      121
        #         112
        #            212
        #               222
        #                   221
        #                      111
        #                        122
        #  11 1  1 11 11  11  1 1  111
        # 0123456789012345678901234567
        # abcdefghijklmnopqrstuvwxyz-=
        pieces = [(1, "bc"),
                  (4, "e"),
                  (7, "h"),
                  (9, "jk"),
                  (12, "mn"),
                  (16, "qr"),
                  (20, "u"),
                  (22, "w"),
                  (25, "z-="),
                  ]
        p_elements = set([1,2,4,7,9,10,12,13,16,17,20,22,25,26,27])
        S = "abcdefghijklmnopqrstuvwxyz-="
        # TODO: when adding data, add capital letters, to make sure we aren't
        # just leaving the old data in place
        size = len(S)

        def base():
            # a fresh container holding only the baseline pieces
            ds = klass()
            for start, data in pieces:
                ds.add(start, data)
            return ds

        def dump(s):
            # render s as a string of length `size`: the matching letter
            # of S where data is held, a space elsewhere
            p = set(s._dump())
            d = "".join([(S[i] if i in p else " ") for i in range(size)])
            assert len(d) == size
            return d

        DEBUG = False
        for start in range(0, size):
            for end in range(start+1, size):
                # add [start-end) to the baseline
                which = "%d-%d" % (start, end-1)
                p_added = set(range(start, end))
                b = base()
                if DEBUG:
                    print()
                    print(dump(b), which)
                    add = klass(); add.add(start, S[start:end])
                    print(dump(add))
                b.add(start, S[start:end])
                if DEBUG:
                    print(dump(b))
                # check that the new span is there
                d = b.get(start, end-start)
                self.failUnlessEqual(d, S[start:end], which)
                # check that all the original pieces are still there
                for t_start, t_data in pieces:
                    t_len = len(t_data)
                    self.failUnlessEqual(b.get(t_start, t_len),
                                         S[t_start:t_start+t_len],
                                         "%s %d+%d" % (which, t_start, t_len))
                # check that a lot of subspans are mostly correct
                for t_start in range(size):
                    for t_len in range(1,4):
                        d = b.get(t_start, t_len)
                        if d is not None:
                            which2 = "%s+(%d-%d)" % (which, t_start,
                                                     t_start+t_len-1)
                            self.failUnlessEqual(d, S[t_start:t_start+t_len],
                                                 which2)
                        # check that removing a subspan gives the right value
                        b2 = klass(b)
                        b2.remove(t_start, t_len)
                        removed = set(range(t_start, t_start+t_len))
                        # the label does not depend on i, so build it once
                        which2 = "%s-(%d-%d)" % (which, t_start,
                                                 t_start+t_len-1)
                        for i in range(size):
                            exp = (((i in p_elements) or (i in p_added))
                                   and (i not in removed))
                            self.failUnlessEqual(bool(b2.get(i, 1)), exp,
                                                 which2+" %d" % i)

    def test_test(self):
        """Run the shared checks against the SimpleDataSpans model."""
        self.do_basic(SimpleDataSpans)
        self.do_scan(SimpleDataSpans)

    def test_basic(self):
        """Run the shared checks against DataSpans."""
        self.do_basic(DataSpans)
        self.do_scan(DataSpans)

    def test_random(self):
        """
        Compare ``SimpleDataSpans`` against ``DataSpans`` over many
        pseudo-random operations.

        Both are driven through the same 1000 operations taken from hex
        digits of ``sha256(counter)``. After each one they must agree on
        ``len()``, ``_dump()`` and 100 ``get()`` probes.
        """
        # attempt to increase coverage of corner cases by comparing behavior
        # of a simple-but-slow model implementation against the
        # complex-but-fast actual implementation, in a large number of random
        # operations
        S1 = SimpleDataSpans
        S2 = DataSpans
        s1 = S1()
        s2 = S2()
        seed = ""

        def _randstr(length, seed):
            # deterministic string of exactly `length` characters, made by
            # concatenating sha256 outputs and truncating
            created = 0
            pieces = []
            while created < length:
                piece = sha256(seed + str(created))
                pieces.append(piece)
                created += len(piece)
            return "".join(pieces)[:length]

        def _create(subseed):
            # build a (model, real) pair that received the same ten add()
            # calls
            ns1 = S1()
            ns2 = S2()
            for i in range(10):
                what = sha256(subseed+str(i))
                start = int(what[2:4], 16)
                length = max(1, int(what[5:6], 16))
                payload = _randstr(length, what[7:9])
                ns1.add(start, payload)
                ns2.add(start, payload)
            return ns1, ns2

        for i in range(1000):
            what = sha256(seed+str(i))
            op = what[0]
            subop = what[1]
            start = int(what[2:4], 16)
            length = max(1, int(what[5:6], 16))
            if op in "0":
                if subop in "0123456":
                    s1 = S1()
                    s2 = S2()
                else:
                    s1, s2 = _create(what[7:11])
            elif op in "123456":
                payload = _randstr(length, what[7:9])
                s1.add(start, payload)
                s2.add(start, payload)
            elif op in "789abc":
                s1.remove(start, length)
                s2.remove(start, length)
            else:
                d1 = s1.pop(start, length)
                d2 = s2.pop(start, length)
                self.failUnlessEqual(d1, d2)
            self.failUnlessEqual(s1.len(), s2.len())
            self.failUnlessEqual(list(s1._dump()), list(s2._dump()))
            for j in range(100):
                # each probe is derived from the previous hash, so "what"
                # is deliberately re-bound here
                what = sha256(what[12:14]+str(j))
                start = int(what[2:4], 16)
                length = max(1, int(what[5:6], 16))
                d1 = s1.get(start, length)
                d2 = s2.get(start, length)
                self.failUnlessEqual(d1, d2, "%d+%d" % (start, length))
class YAML(unittest.TestCase):
    def test_convert(self):
        """
        Every string loaded by ``yamlutil.safe_load`` must be text,
        whether it was dumped from a native str, an ASCII unicode string
        or a non-ASCII unicode string.
        """
        data = yaml.safe_dump(["str", u"unicode", u"\u1234nicode"])
        back = yamlutil.safe_load(data)
        # six.text_type is ``unicode`` on Python 2 and ``str`` on Python 3;
        # the bare name ``unicode`` does not exist on Python 3
        for index in range(3):
            self.failUnlessEqual(type(back[index]), six.text_type)