More tests passing on Python 3.

This commit is contained in:
Itamar Turner-Trauring 2021-05-04 10:27:26 -04:00
parent 75deef906d
commit deaaa8c727
2 changed files with 52 additions and 50 deletions

View File

@ -452,7 +452,7 @@ def dump_cap(options):
from allmydata import uri
from allmydata.util import base32
from base64 import b32decode
import urlparse, urllib
from urllib.parse import unquote, urlparse
out = options.stdout
cap = options.cap
@ -461,18 +461,18 @@ def dump_cap(options):
nodeid = b32decode(options['nodeid'].upper())
secret = None
if options['client-secret']:
secret = base32.a2b(options['client-secret'])
secret = base32.a2b(options['client-secret'].encode("ascii"))
elif options['client-dir']:
secretfile = os.path.join(options['client-dir'], "private", "secret")
try:
secret = base32.a2b(open(secretfile, "r").read().strip())
secret = base32.a2b(open(secretfile, "rb").read().strip())
except EnvironmentError:
pass
if cap.startswith("http"):
scheme, netloc, path, params, query, fragment = urlparse.urlparse(cap)
scheme, netloc, path, params, query, fragment = urlparse(cap)
assert path.startswith("/uri/")
cap = urllib.unquote(path[len("/uri/"):])
cap = unquote(path[len("/uri/"):])
u = uri.from_string(cap)
@ -485,19 +485,19 @@ def _dump_secrets(storage_index, secret, nodeid, out):
if secret:
crs = hashutil.my_renewal_secret_hash(secret)
print(" client renewal secret:", base32.b2a(crs), file=out)
print(" client renewal secret:", unicode(base32.b2a(crs), "ascii"), file=out)
frs = hashutil.file_renewal_secret_hash(crs, storage_index)
print(" file renewal secret:", base32.b2a(frs), file=out)
print(" file renewal secret:", unicode(base32.b2a(frs), "ascii"), file=out)
if nodeid:
renew = hashutil.bucket_renewal_secret_hash(frs, nodeid)
print(" lease renewal secret:", base32.b2a(renew), file=out)
print(" lease renewal secret:", unicode(base32.b2a(renew), "ascii"), file=out)
ccs = hashutil.my_cancel_secret_hash(secret)
print(" client cancel secret:", base32.b2a(ccs), file=out)
print(" client cancel secret:", unicode(base32.b2a(ccs), "ascii"), file=out)
fcs = hashutil.file_cancel_secret_hash(ccs, storage_index)
print(" file cancel secret:", base32.b2a(fcs), file=out)
print(" file cancel secret:", unicode(base32.b2a(fcs), "ascii"), file=out)
if nodeid:
cancel = hashutil.bucket_cancel_secret_hash(fcs, nodeid)
print(" lease cancel secret:", base32.b2a(cancel), file=out)
print(" lease cancel secret:", unicode(base32.b2a(cancel), "ascii"), file=out)
def dump_uri_instance(u, nodeid, secret, out, show_header=True):
from allmydata import uri
@ -508,19 +508,19 @@ def dump_uri_instance(u, nodeid, secret, out, show_header=True):
if isinstance(u, uri.CHKFileURI):
if show_header:
print("CHK File:", file=out)
print(" key:", base32.b2a(u.key), file=out)
print(" UEB hash:", base32.b2a(u.uri_extension_hash), file=out)
print(" key:", unicode(base32.b2a(u.key), "ascii"), file=out)
print(" UEB hash:", unicode(base32.b2a(u.uri_extension_hash), "ascii"), file=out)
print(" size:", u.size, file=out)
print(" k/N: %d/%d" % (u.needed_shares, u.total_shares), file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
_dump_secrets(u.get_storage_index(), secret, nodeid, out)
elif isinstance(u, uri.CHKFileVerifierURI):
if show_header:
print("CHK Verifier URI:", file=out)
print(" UEB hash:", base32.b2a(u.uri_extension_hash), file=out)
print(" UEB hash:", unicode(base32.b2a(u.uri_extension_hash), "ascii"), file=out)
print(" size:", u.size, file=out)
print(" k/N: %d/%d" % (u.needed_shares, u.total_shares), file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
elif isinstance(u, uri.LiteralFileURI):
if show_header:
@ -530,52 +530,52 @@ def dump_uri_instance(u, nodeid, secret, out, show_header=True):
elif isinstance(u, uri.WriteableSSKFileURI): # SDMF
if show_header:
print("SDMF Writeable URI:", file=out)
print(" writekey:", base32.b2a(u.writekey), file=out)
print(" readkey:", base32.b2a(u.readkey), file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" fingerprint:", base32.b2a(u.fingerprint), file=out)
print(" writekey:", unicode(base32.b2a(u.writekey), "ascii"), file=out)
print(" readkey:", unicode(base32.b2a(u.readkey), "ascii"), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
print(" fingerprint:", unicode(base32.b2a(u.fingerprint), "ascii"), file=out)
print(file=out)
if nodeid:
we = hashutil.ssk_write_enabler_hash(u.writekey, nodeid)
print(" write_enabler:", base32.b2a(we), file=out)
print(" write_enabler:", unicode(base32.b2a(we), "ascii"), file=out)
print(file=out)
_dump_secrets(u.get_storage_index(), secret, nodeid, out)
elif isinstance(u, uri.ReadonlySSKFileURI):
if show_header:
print("SDMF Read-only URI:", file=out)
print(" readkey:", base32.b2a(u.readkey), file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" fingerprint:", base32.b2a(u.fingerprint), file=out)
print(" readkey:", unicode(base32.b2a(u.readkey), "ascii"), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
print(" fingerprint:", unicode(base32.b2a(u.fingerprint), "ascii"), file=out)
elif isinstance(u, uri.SSKVerifierURI):
if show_header:
print("SDMF Verifier URI:", file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" fingerprint:", base32.b2a(u.fingerprint), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
print(" fingerprint:", unicode(base32.b2a(u.fingerprint), "ascii"), file=out)
elif isinstance(u, uri.WriteableMDMFFileURI): # MDMF
if show_header:
print("MDMF Writeable URI:", file=out)
print(" writekey:", base32.b2a(u.writekey), file=out)
print(" readkey:", base32.b2a(u.readkey), file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" fingerprint:", base32.b2a(u.fingerprint), file=out)
print(" writekey:", unicode(base32.b2a(u.writekey), "ascii"), file=out)
print(" readkey:", unicode(base32.b2a(u.readkey), "ascii"), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
print(" fingerprint:", unicode(base32.b2a(u.fingerprint), "ascii"), file=out)
print(file=out)
if nodeid:
we = hashutil.ssk_write_enabler_hash(u.writekey, nodeid)
print(" write_enabler:", base32.b2a(we), file=out)
print(" write_enabler:", unicode(base32.b2a(we), "ascii"), file=out)
print(file=out)
_dump_secrets(u.get_storage_index(), secret, nodeid, out)
elif isinstance(u, uri.ReadonlyMDMFFileURI):
if show_header:
print("MDMF Read-only URI:", file=out)
print(" readkey:", base32.b2a(u.readkey), file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" fingerprint:", base32.b2a(u.fingerprint), file=out)
print(" readkey:", unicode(base32.b2a(u.readkey), "ascii"), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
print(" fingerprint:", unicode(base32.b2a(u.fingerprint), "ascii"), file=out)
elif isinstance(u, uri.MDMFVerifierURI):
if show_header:
print("MDMF Verifier URI:", file=out)
print(" storage index:", si_b2a(u.get_storage_index()), file=out)
print(" fingerprint:", base32.b2a(u.fingerprint), file=out)
print(" storage index:", unicode(si_b2a(u.get_storage_index()), "ascii"), file=out)
print(" fingerprint:", unicode(base32.b2a(u.fingerprint), "ascii"), file=out)
elif isinstance(u, uri.ImmutableDirectoryURI): # CHK-based directory

View File

@ -47,6 +47,8 @@ from allmydata.util.encodingutil import listdir_unicode, get_io_encoding
class CLI(CLITestMixin, unittest.TestCase):
def _dump_cap(self, *args):
args = [(unicode(s, "ascii") if isinstance(s, bytes) else s)
for s in args]
config = debug.DumpCapOptions()
config.stdout,config.stderr = StringIO(), StringIO()
config.parseOptions(args)
@ -66,7 +68,7 @@ class CLI(CLITestMixin, unittest.TestCase):
needed_shares=needed_shares,
total_shares=total_shares,
size=size)
output = self._dump_cap(unicode(u.to_string(), "ascii"))
output = self._dump_cap(u.to_string())
self.failUnless("CHK File:" in output, output)
self.failUnless("key: aaaqeayeaudaocajbifqydiob4" in output, output)
self.failUnless("UEB hash: nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa" in output, output)
@ -78,7 +80,7 @@ class CLI(CLITestMixin, unittest.TestCase):
u.to_string())
self.failUnless("client renewal secret: znxmki5zdibb5qlt46xbdvk2t55j7hibejq3i5ijyurkr6m6jkhq" in output, output)
output = self._dump_cap(u.get_verify_cap().to_string())
output = self._dump_cap(unicode(u.get_verify_cap().to_string(), "ascii"))
self.failIf("key: " in output, output)
self.failUnless("UEB hash: nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa" in output, output)
self.failUnless("size: 1234" in output, output)
@ -95,14 +97,14 @@ class CLI(CLITestMixin, unittest.TestCase):
self.failUnless("storage index: hdis5iaveku6lnlaiccydyid7q" in output, output)
def test_dump_cap_lit(self):
u = uri.LiteralFileURI("this is some data")
u = uri.LiteralFileURI(b"this is some data")
output = self._dump_cap(u.to_string())
self.failUnless("Literal File URI:" in output, output)
self.failUnless("data: 'this is some data'" in output, output)
def test_dump_cap_sdmf(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
writekey = b"\x01" * 16
fingerprint = b"\xfe" * 32
u = uri.WriteableSSKFileURI(writekey, fingerprint)
output = self._dump_cap(u.to_string())
@ -152,8 +154,8 @@ class CLI(CLITestMixin, unittest.TestCase):
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
def test_dump_cap_mdmf(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
writekey = b"\x01" * 16
fingerprint = b"\xfe" * 32
u = uri.WriteableMDMFFileURI(writekey, fingerprint)
output = self._dump_cap(u.to_string())
@ -204,8 +206,8 @@ class CLI(CLITestMixin, unittest.TestCase):
def test_dump_cap_chk_directory(self):
key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
uri_extension_hash = hashutil.uri_extension_hash("stuff")
key = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
uri_extension_hash = hashutil.uri_extension_hash(b"stuff")
needed_shares = 25
total_shares = 100
size = 1234
@ -238,8 +240,8 @@ class CLI(CLITestMixin, unittest.TestCase):
self.failUnless("storage index: hdis5iaveku6lnlaiccydyid7q" in output, output)
def test_dump_cap_sdmf_directory(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
writekey = b"\x01" * 16
fingerprint = b"\xfe" * 32
u1 = uri.WriteableSSKFileURI(writekey, fingerprint)
u = uri.DirectoryURI(u1)
@ -282,8 +284,8 @@ class CLI(CLITestMixin, unittest.TestCase):
self.failUnless("fingerprint: 737p57x6737p57x6737p57x6737p57x6737p57x6737p57x6737a" in output, output)
def test_dump_cap_mdmf_directory(self):
writekey = "\x01" * 16
fingerprint = "\xfe" * 32
writekey = b"\x01" * 16
fingerprint = b"\xfe" * 32
u1 = uri.WriteableMDMFFileURI(writekey, fingerprint)
u = uri.MDMFDirectoryURI(u1)
@ -343,7 +345,7 @@ class CLI(CLITestMixin, unittest.TestCase):
fileutil.write("cli/test_catalog_shares/node1/storage/shares/mq/not-a-dir", "")
# write a bogus share that looks a little bit like CHK
fileutil.write(os.path.join(sharedir, "8"),
"\x00\x00\x00\x01" + "\xff" * 200) # this triggers an assert
b"\x00\x00\x00\x01" + b"\xff" * 200) # this triggers an assert
nodedir2 = "cli/test_catalog_shares/node2"
fileutil.make_dirs(nodedir2)