tahoe-lafs/src/allmydata/test/test_uri.py

915 lines
40 KiB
Python
Raw Normal View History

import re
from twisted.trial import unittest
from allmydata import uri
from allmydata.util import hashutil, base32
from allmydata.interfaces import IURI, IFileURI, IDirnodeURI, IMutableFileURI, \
IVerifierURI, CapConstraintError
import allmydata.test.common_util as testutil
class Literal(testutil.ReallyEqualMixin, unittest.TestCase):
def _help_test(self, data):
u = uri.LiteralFileURI(data)
self.failUnless(IURI.providedBy(u))
self.failUnless(IFileURI.providedBy(u))
self.failIf(IDirnodeURI.providedBy(u))
self.failUnlessReallyEqual(u.data, data)
self.failUnlessReallyEqual(u.get_size(), len(data))
self.failUnless(u.is_readonly())
self.failIf(u.is_mutable())
u2 = uri.from_string(u.to_string())
self.failUnless(IURI.providedBy(u2))
self.failUnless(IFileURI.providedBy(u2))
self.failIf(IDirnodeURI.providedBy(u2))
self.failUnlessReallyEqual(u2.data, data)
self.failUnlessReallyEqual(u2.get_size(), len(data))
self.failUnless(u2.is_readonly())
self.failIf(u2.is_mutable())
u2i = uri.from_string(u.to_string(), deep_immutable=True)
self.failUnless(IFileURI.providedBy(u2i))
self.failIf(IDirnodeURI.providedBy(u2i))
self.failUnlessReallyEqual(u2i.data, data)
self.failUnlessReallyEqual(u2i.get_size(), len(data))
self.failUnless(u2i.is_readonly())
self.failIf(u2i.is_mutable())
2007-12-05 00:38:31 +00:00
u3 = u.get_readonly()
self.failUnlessIdentical(u, u3)
self.failUnlessReallyEqual(u.get_verify_cap(), None)
2007-12-05 00:38:31 +00:00
he = u.to_human_encoding()
u_h = uri.LiteralFileURI.init_from_human_encoding(he)
self.failUnlessReallyEqual(u, u_h)
def test_empty(self):
data = "" # This data is some *very* small data!
return self._help_test(data)
def test_pack(self):
data = "This is some small data"
return self._help_test(data)
def test_nonascii(self):
data = "This contains \x00 and URI:LIT: and \n, oh my."
return self._help_test(data)
class Compare(testutil.ReallyEqualMixin, unittest.TestCase):
2007-09-17 08:09:47 +00:00
def test_compare(self):
lit1 = uri.LiteralFileURI("some data")
fileURI = 'URI:CHK:f5ahxa25t4qkktywz6teyfvcx4:opuioq7tj2y6idzfp6cazehtmgs5fdcebcz3cygrxyydvcozrmeq:3:10:345834'
chk1 = uri.CHKFileURI.init_from_string(fileURI)
chk2 = uri.CHKFileURI.init_from_string(fileURI)
unk = uri.UnknownURI("lafs://from_the_future")
2007-09-17 08:09:47 +00:00
self.failIfEqual(lit1, chk1)
self.failUnlessReallyEqual(chk1, chk2)
2007-09-17 08:09:47 +00:00
self.failIfEqual(chk1, "not actually a URI")
# these should be hashable too
s = set([lit1, chk1, chk2, unk])
self.failUnlessReallyEqual(len(s), 3) # since chk1==chk2
2007-09-17 08:09:47 +00:00
def test_is_uri(self):
lit1 = uri.LiteralFileURI("some data").to_string()
self.failUnless(uri.is_uri(lit1))
self.failIf(uri.is_uri(None))
def test_is_literal_file_uri(self):
lit1 = uri.LiteralFileURI("some data").to_string()
self.failUnless(uri.is_literal_file_uri(lit1))
self.failIf(uri.is_literal_file_uri(None))
self.failIf(uri.is_literal_file_uri("foo"))
self.failIf(uri.is_literal_file_uri("ro.foo"))
self.failIf(uri.is_literal_file_uri("URI:LITfoo"))
self.failUnless(uri.is_literal_file_uri("ro.URI:LIT:foo"))
self.failUnless(uri.is_literal_file_uri("imm.URI:LIT:foo"))
def test_has_uri_prefix(self):
self.failUnless(uri.has_uri_prefix("URI:foo"))
self.failUnless(uri.has_uri_prefix("ro.URI:foo"))
self.failUnless(uri.has_uri_prefix("imm.URI:foo"))
self.failIf(uri.has_uri_prefix(None))
self.failIf(uri.has_uri_prefix("foo"))
class CHKFile(testutil.ReallyEqualMixin, unittest.TestCase):
def test_pack(self):
key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
storage_index = hashutil.storage_index_hash(key)
uri_extension_hash = hashutil.uri_extension_hash("stuff")
needed_shares = 25
total_shares = 100
size = 1234
u = uri.CHKFileURI(key=key,
uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares,
total_shares=total_shares,
size=size)
self.failUnlessReallyEqual(u.get_storage_index(), storage_index)
self.failUnlessReallyEqual(u.key, key)
self.failUnlessReallyEqual(u.uri_extension_hash, uri_extension_hash)
self.failUnlessReallyEqual(u.needed_shares, needed_shares)
self.failUnlessReallyEqual(u.total_shares, total_shares)
self.failUnlessReallyEqual(u.size, size)
self.failUnless(u.is_readonly())
self.failIf(u.is_mutable())
self.failUnless(IURI.providedBy(u))
self.failUnless(IFileURI.providedBy(u))
self.failIf(IDirnodeURI.providedBy(u))
self.failUnlessReallyEqual(u.get_size(), 1234)
2007-12-05 00:38:31 +00:00
u_ro = u.get_readonly()
self.failUnlessIdentical(u, u_ro)
he = u.to_human_encoding()
self.failUnlessReallyEqual(he, "http://127.0.0.1:3456/uri/" + u.to_string())
self.failUnlessReallyEqual(uri.CHKFileURI.init_from_human_encoding(he), u)
u2 = uri.from_string(u.to_string())
self.failUnlessReallyEqual(u2.get_storage_index(), storage_index)
self.failUnlessReallyEqual(u2.key, key)
self.failUnlessReallyEqual(u2.uri_extension_hash, uri_extension_hash)
self.failUnlessReallyEqual(u2.needed_shares, needed_shares)
self.failUnlessReallyEqual(u2.total_shares, total_shares)
self.failUnlessReallyEqual(u2.size, size)
self.failUnless(u2.is_readonly())
self.failIf(u2.is_mutable())
self.failUnless(IURI.providedBy(u2))
self.failUnless(IFileURI.providedBy(u2))
self.failIf(IDirnodeURI.providedBy(u2))
self.failUnlessReallyEqual(u2.get_size(), 1234)
u2i = uri.from_string(u.to_string(), deep_immutable=True)
self.failUnlessReallyEqual(u.to_string(), u2i.to_string())
u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
self.failUnlessReallyEqual(u.to_string(), u2ro.to_string())
u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
self.failUnlessReallyEqual(u.to_string(), u2imm.to_string())
v = u.get_verify_cap()
2007-12-05 00:38:31 +00:00
self.failUnless(isinstance(v.to_string(), str))
self.failUnless(v.is_readonly())
self.failIf(v.is_mutable())
2007-12-05 00:38:31 +00:00
v2 = uri.from_string(v.to_string())
self.failUnlessReallyEqual(v, v2)
he = v.to_human_encoding()
v2_h = uri.CHKFileVerifierURI.init_from_human_encoding(he)
self.failUnlessReallyEqual(v2, v2_h)
2007-12-05 00:38:31 +00:00
v3 = uri.CHKFileVerifierURI(storage_index="\x00"*16,
uri_extension_hash="\x00"*32,
needed_shares=3,
total_shares=10,
size=1234)
self.failUnless(isinstance(v3.to_string(), str))
self.failUnless(v3.is_readonly())
self.failIf(v3.is_mutable())
2007-12-05 00:38:31 +00:00
2007-09-17 08:09:47 +00:00
def test_pack_badly(self):
key = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f"
storage_index = hashutil.storage_index_hash(key)
2007-09-17 08:09:47 +00:00
uri_extension_hash = hashutil.uri_extension_hash("stuff")
needed_shares = 25
total_shares = 100
size = 1234
self.failUnlessRaises(TypeError,
uri.CHKFileURI,
key=key,
uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares,
total_shares=total_shares,
size=size,
bogus_extra_argument="reject me",
)
2007-12-05 00:38:31 +00:00
self.failUnlessRaises(TypeError,
uri.CHKFileVerifierURI,
bogus="bogus")
self.failUnlessRaises(TypeError,
uri.CHKFileVerifierURI,
storage_index=storage_index,
uri_extension_hash=uri_extension_hash,
needed_shares=3,
total_shares=10,
# leave size= missing
)
2007-09-17 08:09:47 +00:00
class Extension(testutil.ReallyEqualMixin, unittest.TestCase):
def test_pack(self):
data = {"stuff": "value",
"size": 12,
"needed_shares": 3,
"big_hash": hashutil.tagged_hash("foo", "bar"),
}
ext = uri.pack_extension(data)
d = uri.unpack_extension(ext)
self.failUnlessReallyEqual(d["stuff"], "value")
self.failUnlessReallyEqual(d["size"], 12)
self.failUnlessReallyEqual(d["big_hash"], hashutil.tagged_hash("foo", "bar"))
readable = uri.unpack_extension_readable(ext)
self.failUnlessReallyEqual(readable["needed_shares"], 3)
self.failUnlessReallyEqual(readable["stuff"], "value")
self.failUnlessReallyEqual(readable["size"], 12)
self.failUnlessReallyEqual(readable["big_hash"],
base32.b2a(hashutil.tagged_hash("foo", "bar")))
self.failUnlessReallyEqual(readable["UEB_hash"],
base32.b2a(hashutil.uri_extension_hash(ext)))
class Unknown(testutil.ReallyEqualMixin, unittest.TestCase):
def test_from_future(self):
# any URI type that we don't recognize should be treated as unknown
future_uri = "I am a URI from the future. Whatever you do, don't "
u = uri.from_string(future_uri)
self.failUnless(isinstance(u, uri.UnknownURI))
self.failUnlessReallyEqual(u.to_string(), future_uri)
self.failUnless(u.get_readonly() is None)
self.failUnless(u.get_error() is None)
u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
self.failUnlessReallyEqual(u.to_string(), future_uri)
self.failUnless(u2.get_readonly() is None)
self.failUnless(isinstance(u2.get_error(), CapConstraintError))
2007-08-29 06:28:26 +00:00
# Future caps might have non-ASCII chars in them. (Or maybe not, who can tell about the future?)
future_uri = u"I am a cap from the \u263A future. Whatever you ".encode('utf-8')
u = uri.from_string(future_uri)
self.failUnless(isinstance(u, uri.UnknownURI))
self.failUnlessReallyEqual(u.to_string(), future_uri)
self.failUnless(u.get_readonly() is None)
self.failUnless(u.get_error() is None)
u2 = uri.UnknownURI(future_uri, error=CapConstraintError("..."))
self.failUnlessReallyEqual(u.to_string(), future_uri)
self.failUnless(u2.get_readonly() is None)
self.failUnless(isinstance(u2.get_error(), CapConstraintError))
class Constraint(testutil.ReallyEqualMixin, unittest.TestCase):
2007-08-29 06:28:26 +00:00
def test_constraint(self):
good="http://127.0.0.1:3456/uri/URI%3ADIR2%3Agh3l5rbvnv2333mrfvalmjfr4i%3Alz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma/"
uri.DirectoryURI.init_from_human_encoding(good)
self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, good)
bad = good + '==='
self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_human_encoding, bad)
self.failUnlessRaises(uri.BadURIError, uri.DirectoryURI.init_from_string, bad)
fileURI = 'URI:CHK:gh3l5rbvnv2333mrfvalmjfr4i:lz6l7u3z3b7g37s4zkdmfpx5ly4ib4m6thrpbusi6ys62qtc6mma:3:10:345834'
uri.CHKFileURI.init_from_string(fileURI)
class Mutable(testutil.ReallyEqualMixin, unittest.TestCase):
def setUp(self):
self.writekey = "\x01" * 16
self.fingerprint = "\x02" * 32
self.readkey = hashutil.ssk_readkey_hash(self.writekey)
self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
def test_pack(self):
u = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
self.failUnlessReallyEqual(u.writekey, self.writekey)
self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
self.failIf(u.is_readonly())
self.failUnless(u.is_mutable())
self.failUnless(IURI.providedBy(u))
self.failUnless(IMutableFileURI.providedBy(u))
self.failIf(IDirnodeURI.providedBy(u))
2007-12-05 00:38:31 +00:00
self.failUnless("WriteableSSKFileURI" in str(u))
he = u.to_human_encoding()
u_h = uri.WriteableSSKFileURI.init_from_human_encoding(he)
self.failUnlessReallyEqual(u, u_h)
u2 = uri.from_string(u.to_string())
self.failUnlessReallyEqual(u2.writekey, self.writekey)
self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
self.failIf(u2.is_readonly())
self.failUnless(u2.is_mutable())
self.failUnless(IURI.providedBy(u2))
self.failUnless(IMutableFileURI.providedBy(u2))
self.failIf(IDirnodeURI.providedBy(u2))
u2i = uri.from_string(u.to_string(), deep_immutable=True)
self.failUnless(isinstance(u2i, uri.UnknownURI), u2i)
u2ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u.to_string())
self.failUnless(isinstance(u2ro, uri.UnknownURI), u2ro)
u2imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u.to_string())
self.failUnless(isinstance(u2imm, uri.UnknownURI), u2imm)
u3 = u2.get_readonly()
readkey = hashutil.ssk_readkey_hash(self.writekey)
self.failUnlessReallyEqual(u3.fingerprint, self.fingerprint)
self.failUnlessReallyEqual(u3.readkey, readkey)
self.failUnless(u3.is_readonly())
self.failUnless(u3.is_mutable())
self.failUnless(IURI.providedBy(u3))
self.failUnless(IMutableFileURI.providedBy(u3))
self.failIf(IDirnodeURI.providedBy(u3))
u3i = uri.from_string(u3.to_string(), deep_immutable=True)
self.failUnless(isinstance(u3i, uri.UnknownURI), u3i)
u3ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u3.to_string())
self.failUnlessReallyEqual(u3.to_string(), u3ro.to_string())
u3imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u3.to_string())
self.failUnless(isinstance(u3imm, uri.UnknownURI), u3imm)
he = u3.to_human_encoding()
u3_h = uri.ReadonlySSKFileURI.init_from_human_encoding(he)
self.failUnlessReallyEqual(u3, u3_h)
u4 = uri.ReadonlySSKFileURI(readkey, self.fingerprint)
self.failUnlessReallyEqual(u4.fingerprint, self.fingerprint)
self.failUnlessReallyEqual(u4.readkey, readkey)
self.failUnless(u4.is_readonly())
self.failUnless(u4.is_mutable())
self.failUnless(IURI.providedBy(u4))
self.failUnless(IMutableFileURI.providedBy(u4))
self.failIf(IDirnodeURI.providedBy(u4))
u4i = uri.from_string(u4.to_string(), deep_immutable=True)
self.failUnless(isinstance(u4i, uri.UnknownURI), u4i)
u4ro = uri.from_string(uri.ALLEGED_READONLY_PREFIX + u4.to_string())
self.failUnlessReallyEqual(u4.to_string(), u4ro.to_string())
u4imm = uri.from_string(uri.ALLEGED_IMMUTABLE_PREFIX + u4.to_string())
self.failUnless(isinstance(u4imm, uri.UnknownURI), u4imm)
2007-12-05 00:38:31 +00:00
u4a = uri.from_string(u4.to_string())
self.failUnlessReallyEqual(u4a, u4)
2007-12-05 00:38:31 +00:00
self.failUnless("ReadonlySSKFileURI" in str(u4a))
self.failUnlessIdentical(u4a.get_readonly(), u4a)
u5 = u4.get_verify_cap()
self.failUnless(IVerifierURI.providedBy(u5))
self.failUnlessReallyEqual(u5.get_storage_index(), u.get_storage_index())
u7 = u.get_verify_cap()
self.failUnless(IVerifierURI.providedBy(u7))
self.failUnlessReallyEqual(u7.get_storage_index(), u.get_storage_index())
he = u5.to_human_encoding()
u5_h = uri.SSKVerifierURI.init_from_human_encoding(he)
self.failUnlessReallyEqual(u5, u5_h)
def test_writable_mdmf_cap(self):
u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
cap = u1.to_string()
u = uri.WritableMDMFFileURI.init_from_string(cap)
self.failUnless(IMutableFileURI.providedBy(u))
self.failUnlessReallyEqual(u.fingerprint, self.fingerprint)
self.failUnlessReallyEqual(u.writekey, self.writekey)
self.failUnless(u.is_mutable())
self.failIf(u.is_readonly())
self.failUnlessEqual(cap, u.to_string())
# Now get a readonly cap from the writable cap, and test that it
# degrades gracefully.
ru = u.get_readonly()
self.failUnlessReallyEqual(self.readkey, ru.readkey)
self.failUnlessReallyEqual(self.fingerprint, ru.fingerprint)
self.failUnless(ru.is_mutable())
self.failUnless(ru.is_readonly())
# Now get a verifier cap.
vu = ru.get_verify_cap()
self.failUnlessReallyEqual(self.storage_index, vu.storage_index)
self.failUnlessReallyEqual(self.fingerprint, vu.fingerprint)
self.failUnless(IVerifierURI.providedBy(vu))
def test_readonly_mdmf_cap(self):
u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
cap = u1.to_string()
u2 = uri.ReadonlyMDMFFileURI.init_from_string(cap)
self.failUnlessReallyEqual(u2.fingerprint, self.fingerprint)
self.failUnlessReallyEqual(u2.readkey, self.readkey)
self.failUnless(u2.is_readonly())
self.failUnless(u2.is_mutable())
vu = u2.get_verify_cap()
self.failUnlessEqual(vu.storage_index, self.storage_index)
self.failUnlessEqual(vu.fingerprint, self.fingerprint)
def test_create_writable_mdmf_cap_from_readcap(self):
# we shouldn't be able to create a writable MDMF cap given only a
# readcap.
u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
cap = u1.to_string()
self.failUnlessRaises(uri.BadURIError,
uri.WritableMDMFFileURI.init_from_string,
cap)
def test_create_writable_mdmf_cap_from_verifycap(self):
u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
cap = u1.to_string()
self.failUnlessRaises(uri.BadURIError,
uri.WritableMDMFFileURI.init_from_string,
cap)
def test_create_readonly_mdmf_cap_from_verifycap(self):
u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
cap = u1.to_string()
self.failUnlessRaises(uri.BadURIError,
uri.ReadonlyMDMFFileURI.init_from_string,
cap)
def test_mdmf_verifier_cap(self):
u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
self.failUnless(u1.is_readonly())
self.failIf(u1.is_mutable())
self.failUnlessReallyEqual(self.storage_index, u1.storage_index)
self.failUnlessReallyEqual(self.fingerprint, u1.fingerprint)
cap = u1.to_string()
u2 = uri.MDMFVerifierURI.init_from_string(cap)
self.failUnless(u2.is_readonly())
self.failIf(u2.is_mutable())
self.failUnlessReallyEqual(self.storage_index, u2.storage_index)
self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
u3 = u2.get_readonly()
self.failUnlessReallyEqual(u3, u2)
u4 = u2.get_verify_cap()
self.failUnlessReallyEqual(u4, u2)
def test_mdmf_cap_extra_information(self):
# MDMF caps can be arbitrarily extended after the fingerprint
# and key/storage index fields.
u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
self.failUnlessEqual([], u1.get_extension_params())
cap = u1.to_string()
# Now let's append some fields. Say, 131073 (the segment size)
# and 3 (the "k" encoding parameter).
expected_extensions = []
for e in ('131073', '3'):
cap += (":%s" % e)
expected_extensions.append(e)
u2 = uri.WritableMDMFFileURI.init_from_string(cap)
self.failUnlessReallyEqual(self.writekey, u2.writekey)
self.failUnlessReallyEqual(self.fingerprint, u2.fingerprint)
self.failIf(u2.is_readonly())
self.failUnless(u2.is_mutable())
c2 = u2.to_string()
u2n = uri.WritableMDMFFileURI.init_from_string(c2)
self.failUnlessReallyEqual(u2, u2n)
# We should get the extra back when we ask for it.
self.failUnlessEqual(expected_extensions, u2.get_extension_params())
# These should be preserved through cap attenuation, too.
u3 = u2.get_readonly()
self.failUnlessReallyEqual(self.readkey, u3.readkey)
self.failUnlessReallyEqual(self.fingerprint, u3.fingerprint)
self.failUnless(u3.is_readonly())
self.failUnless(u3.is_mutable())
self.failUnlessEqual(expected_extensions, u3.get_extension_params())
c3 = u3.to_string()
u3n = uri.ReadonlyMDMFFileURI.init_from_string(c3)
self.failUnlessReallyEqual(u3, u3n)
u4 = u3.get_verify_cap()
self.failUnlessReallyEqual(self.storage_index, u4.storage_index)
self.failUnlessReallyEqual(self.fingerprint, u4.fingerprint)
self.failUnless(u4.is_readonly())
self.failIf(u4.is_mutable())
c4 = u4.to_string()
u4n = uri.MDMFVerifierURI.init_from_string(c4)
self.failUnlessReallyEqual(u4n, u4)
self.failUnlessEqual(expected_extensions, u4.get_extension_params())
def test_sdmf_cap_extra_information(self):
# For interface consistency, we define a method to get
# extensions for SDMF files as well. This method must always
# return no extensions, since SDMF files were not created with
# extensions and cannot be modified to include extensions
# without breaking older clients.
u1 = uri.WriteableSSKFileURI(self.writekey, self.fingerprint)
cap = u1.to_string()
u2 = uri.WriteableSSKFileURI.init_from_string(cap)
self.failUnlessEqual([], u2.get_extension_params())
def test_extension_character_range(self):
# As written now, we shouldn't put things other than numbers in
# the extension fields.
writecap = uri.WritableMDMFFileURI(self.writekey, self.fingerprint).to_string()
readcap = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint).to_string()
vcap = uri.MDMFVerifierURI(self.storage_index, self.fingerprint).to_string()
self.failUnlessRaises(uri.BadURIError,
uri.WritableMDMFFileURI.init_from_string,
("%s:invalid" % writecap))
self.failUnlessRaises(uri.BadURIError,
uri.ReadonlyMDMFFileURI.init_from_string,
("%s:invalid" % readcap))
self.failUnlessRaises(uri.BadURIError,
uri.MDMFVerifierURI.init_from_string,
("%s:invalid" % vcap))
def test_mdmf_valid_human_encoding(self):
# What's a human encoding? Well, it's of the form:
base = "https://127.0.0.1:3456/uri/"
# With a cap on the end. For each of the cap types, we need to
# test that a valid cap (with and without the traditional
# separators) is recognized and accepted by the classes.
w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
['131073', '3'])
r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
['131073', '3'])
v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
['131073', '3'])
# These will yield six different caps.
for o in (w1, w2, r1 , r2, v1, v2):
url = base + o.to_string()
o1 = o.__class__.init_from_human_encoding(url)
self.failUnlessReallyEqual(o1, o)
# Note that our cap will, by default, have : as separators.
# But it's expected that users from, e.g., the WUI, will
# have %3A as a separator. We need to make sure that the
# initialization routine handles that, too.
cap = o.to_string()
cap = re.sub(":", "%3A", cap)
url = base + cap
o2 = o.__class__.init_from_human_encoding(url)
self.failUnlessReallyEqual(o2, o)
def test_mdmf_human_encoding_invalid_base(self):
# What's a human encoding? Well, it's of the form:
base = "https://127.0.0.1:3456/foo/bar/bazuri/"
# With a cap on the end. For each of the cap types, we need to
# test that a valid cap (with and without the traditional
# separators) is recognized and accepted by the classes.
w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
['131073', '3'])
r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
['131073', '3'])
v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
['131073', '3'])
# These will yield six different caps.
for o in (w1, w2, r1 , r2, v1, v2):
url = base + o.to_string()
self.failUnlessRaises(uri.BadURIError,
o.__class__.init_from_human_encoding,
url)
def test_mdmf_human_encoding_invalid_cap(self):
base = "https://127.0.0.1:3456/uri/"
# With a cap on the end. For each of the cap types, we need to
# test that a valid cap (with and without the traditional
# separators) is recognized and accepted by the classes.
w1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
w2 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
['131073', '3'])
r1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
r2 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
['131073', '3'])
v1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
v2 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
['131073', '3'])
# These will yield six different caps.
for o in (w1, w2, r1 , r2, v1, v2):
# not exhaustive, obviously...
url = base + o.to_string() + "foobarbaz"
url2 = base + "foobarbaz" + o.to_string()
url3 = base + o.to_string()[:25] + "foo" + o.to_string()[:25]
for u in (url, url2, url3):
self.failUnlessRaises(uri.BadURIError,
o.__class__.init_from_human_encoding,
u)
def test_mdmf_from_string(self):
# Make sure that the from_string utility function works with
# MDMF caps.
u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint)
cap = u1.to_string()
self.failUnless(uri.is_uri(cap))
u2 = uri.from_string(cap)
self.failUnlessReallyEqual(u1, u2)
u3 = uri.from_string_mutable_filenode(cap)
self.failUnlessEqual(u3, u1)
# XXX: We should refactor the extension field into setUp
u1 = uri.WritableMDMFFileURI(self.writekey, self.fingerprint,
['131073', '3'])
cap = u1.to_string()
self.failUnless(uri.is_uri(cap))
u2 = uri.from_string(cap)
self.failUnlessReallyEqual(u1, u2)
u3 = uri.from_string_mutable_filenode(cap)
self.failUnlessEqual(u3, u1)
u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint)
cap = u1.to_string()
self.failUnless(uri.is_uri(cap))
u2 = uri.from_string(cap)
self.failUnlessReallyEqual(u1, u2)
u3 = uri.from_string_mutable_filenode(cap)
self.failUnlessEqual(u3, u1)
u1 = uri.ReadonlyMDMFFileURI(self.readkey, self.fingerprint,
['131073', '3'])
cap = u1.to_string()
self.failUnless(uri.is_uri(cap))
u2 = uri.from_string(cap)
self.failUnlessReallyEqual(u1, u2)
u3 = uri.from_string_mutable_filenode(cap)
self.failUnlessEqual(u3, u1)
u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint)
cap = u1.to_string()
self.failUnless(uri.is_uri(cap))
u2 = uri.from_string(cap)
self.failUnlessReallyEqual(u1, u2)
u3 = uri.from_string_verifier(cap)
self.failUnlessEqual(u3, u1)
u1 = uri.MDMFVerifierURI(self.storage_index, self.fingerprint,
['131073', '3'])
cap = u1.to_string()
self.failUnless(uri.is_uri(cap))
u2 = uri.from_string(cap)
self.failUnlessReallyEqual(u1, u2)
u3 = uri.from_string_verifier(cap)
self.failUnlessEqual(u3, u1)
class Dirnode(testutil.ReallyEqualMixin, unittest.TestCase):
def test_pack(self):
writekey = "\x01" * 16
fingerprint = "\x02" * 32
n = uri.WriteableSSKFileURI(writekey, fingerprint)
u1 = uri.DirectoryURI(n)
self.failIf(u1.is_readonly())
self.failUnless(u1.is_mutable())
self.failUnless(IURI.providedBy(u1))
self.failIf(IFileURI.providedBy(u1))
self.failUnless(IDirnodeURI.providedBy(u1))
self.failUnless("DirectoryURI" in str(u1))
u1_filenode = u1.get_filenode_cap()
2007-12-05 00:38:31 +00:00
self.failUnless(u1_filenode.is_mutable())
self.failIf(u1_filenode.is_readonly())
u2 = uri.from_string(u1.to_string())
self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
self.failIf(u2.is_readonly())
self.failUnless(u2.is_mutable())
self.failUnless(IURI.providedBy(u2))
self.failIf(IFileURI.providedBy(u2))
self.failUnless(IDirnodeURI.providedBy(u2))
u2i = uri.from_string(u1.to_string(), deep_immutable=True)
self.failUnless(isinstance(u2i, uri.UnknownURI))
u3 = u2.get_readonly()
self.failUnless(u3.is_readonly())
self.failUnless(u3.is_mutable())
self.failUnless(IURI.providedBy(u3))
self.failIf(IFileURI.providedBy(u3))
self.failUnless(IDirnodeURI.providedBy(u3))
u3i = uri.from_string(u2.to_string(), deep_immutable=True)
self.failUnless(isinstance(u3i, uri.UnknownURI))
u3n = u3._filenode_uri
self.failUnless(u3n.is_readonly())
self.failUnless(u3n.is_mutable())
u3_filenode = u3.get_filenode_cap()
2007-12-05 00:38:31 +00:00
self.failUnless(u3_filenode.is_mutable())
self.failUnless(u3_filenode.is_readonly())
u3a = uri.from_string(u3.to_string())
self.failUnlessIdentical(u3a, u3a.get_readonly())
u4 = uri.ReadonlyDirectoryURI(u2._filenode_uri.get_readonly())
self.failUnlessReallyEqual(u4.to_string(), u3.to_string())
self.failUnless(u4.is_readonly())
self.failUnless(u4.is_mutable())
self.failUnless(IURI.providedBy(u4))
self.failIf(IFileURI.providedBy(u4))
self.failUnless(IDirnodeURI.providedBy(u4))
u4_verifier = u4.get_verify_cap()
u4_verifier_filenode = u4_verifier.get_filenode_cap()
2007-12-05 00:38:31 +00:00
self.failUnless(isinstance(u4_verifier_filenode, uri.SSKVerifierURI))
verifiers = [u1.get_verify_cap(), u2.get_verify_cap(),
u3.get_verify_cap(), u4.get_verify_cap(),
uri.DirectoryURIVerifier(n.get_verify_cap()),
]
for v in verifiers:
self.failUnless(IVerifierURI.providedBy(v))
self.failUnlessReallyEqual(v._filenode_uri,
u1.get_verify_cap()._filenode_uri)
def test_immutable(self):
readkey = "\x01" * 16
uri_extension_hash = hashutil.uri_extension_hash("stuff")
needed_shares = 3
total_shares = 10
size = 1234
fnuri = uri.CHKFileURI(key=readkey,
uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares,
total_shares=total_shares,
size=size)
fncap = fnuri.to_string()
self.failUnlessReallyEqual(fncap, "URI:CHK:aeaqcaibaeaqcaibaeaqcaibae:nf3nimquen7aeqm36ekgxomalstenpkvsdmf6fplj7swdatbv5oa:3:10:1234")
u1 = uri.ImmutableDirectoryURI(fnuri)
self.failUnless(u1.is_readonly())
self.failIf(u1.is_mutable())
self.failUnless(IURI.providedBy(u1))
self.failIf(IFileURI.providedBy(u1))
self.failUnless(IDirnodeURI.providedBy(u1))
self.failUnless("DirectoryURI" in str(u1))
u1_filenode = u1.get_filenode_cap()
self.failIf(u1_filenode.is_mutable())
self.failUnless(u1_filenode.is_readonly())
self.failUnlessReallyEqual(u1_filenode.to_string(), fncap)
self.failUnless(str(u1))
u2 = uri.from_string(u1.to_string())
self.failUnlessReallyEqual(u1.to_string(), u2.to_string())
self.failUnless(u2.is_readonly())
self.failIf(u2.is_mutable())
self.failUnless(IURI.providedBy(u2))
self.failIf(IFileURI.providedBy(u2))
self.failUnless(IDirnodeURI.providedBy(u2))
u2i = uri.from_string(u1.to_string(), deep_immutable=True)
self.failUnlessReallyEqual(u1.to_string(), u2i.to_string())
u3 = u2.get_readonly()
self.failUnlessReallyEqual(u3.to_string(), u2.to_string())
self.failUnless(str(u3))
u3i = uri.from_string(u2.to_string(), deep_immutable=True)
self.failUnlessReallyEqual(u2.to_string(), u3i.to_string())
u2_verifier = u2.get_verify_cap()
self.failUnless(isinstance(u2_verifier,
uri.ImmutableDirectoryURIVerifier),
u2_verifier)
self.failUnless(IVerifierURI.providedBy(u2_verifier))
u2vs = u2_verifier.to_string()
# URI:DIR2-CHK-Verifier:$key:$ueb:$k:$n:$size
self.failUnless(u2vs.startswith("URI:DIR2-CHK-Verifier:"), u2vs)
u2_verifier_fileuri = u2_verifier.get_filenode_cap()
self.failUnless(IVerifierURI.providedBy(u2_verifier_fileuri))
u2vfs = u2_verifier_fileuri.to_string()
# URI:CHK-Verifier:$key:$ueb:$k:$n:$size
self.failUnlessReallyEqual(u2vfs, fnuri.get_verify_cap().to_string())
self.failUnlessReallyEqual(u2vs[len("URI:DIR2-"):], u2vfs[len("URI:"):])
self.failUnless(str(u2_verifier))
def test_literal(self):
u0 = uri.LiteralFileURI("data")
u1 = uri.LiteralDirectoryURI(u0)
self.failUnless(str(u1))
self.failUnlessReallyEqual(u1.to_string(), "URI:DIR2-LIT:mrqxiyi")
self.failUnless(u1.is_readonly())
self.failIf(u1.is_mutable())
self.failUnless(IURI.providedBy(u1))
self.failIf(IFileURI.providedBy(u1))
self.failUnless(IDirnodeURI.providedBy(u1))
self.failUnlessReallyEqual(u1.get_verify_cap(), None)
self.failUnlessReallyEqual(u1.get_storage_index(), None)
self.failUnlessReallyEqual(u1.abbrev_si(), "<LIT>")
def test_mdmf(self):
writekey = "\x01" * 16
fingerprint = "\x02" * 32
uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
d1 = uri.MDMFDirectoryURI(uri1)
self.failIf(d1.is_readonly())
self.failUnless(d1.is_mutable())
self.failUnless(IURI.providedBy(d1))
self.failUnless(IDirnodeURI.providedBy(d1))
d1_uri = d1.to_string()
d2 = uri.from_string(d1_uri)
self.failUnlessIsInstance(d2, uri.MDMFDirectoryURI)
self.failIf(d2.is_readonly())
self.failUnless(d2.is_mutable())
self.failUnless(IURI.providedBy(d2))
self.failUnless(IDirnodeURI.providedBy(d2))
# It doesn't make sense to ask for a deep immutable URI for a
# mutable directory, and we should get back a result to that
# effect.
d3 = uri.from_string(d2.to_string(), deep_immutable=True)
self.failUnlessIsInstance(d3, uri.UnknownURI)
def test_mdmf_with_extensions(self):
writekey = "\x01" * 16
fingerprint = "\x02" * 32
uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
d1 = uri.MDMFDirectoryURI(uri1)
d1_uri = d1.to_string()
# Add some extensions, verify that the URI is interpreted
# correctly.
d1_uri += ":3:131073"
uri2 = uri.from_string(d1_uri)
self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
self.failUnless(IURI.providedBy(uri2))
self.failUnless(IDirnodeURI.providedBy(uri2))
self.failUnless(uri1.is_mutable())
self.failIf(uri1.is_readonly())
d2_uri = uri2.to_string()
self.failUnlessIn(":3:131073", d2_uri)
# Now attenuate, verify that the extensions persist
ro_uri = uri2.get_readonly()
self.failUnlessIsInstance(ro_uri, uri.ReadonlyMDMFDirectoryURI)
self.failUnless(ro_uri.is_mutable())
self.failUnless(ro_uri.is_readonly())
self.failUnless(IURI.providedBy(ro_uri))
self.failUnless(IDirnodeURI.providedBy(ro_uri))
ro_uri_str = ro_uri.to_string()
self.failUnlessIn(":3:131073", ro_uri_str)
def test_mdmf_attenuation(self):
writekey = "\x01" * 16
fingerprint = "\x02" * 32
uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
d1 = uri.MDMFDirectoryURI(uri1)
self.failUnless(d1.is_mutable())
self.failIf(d1.is_readonly())
self.failUnless(IURI.providedBy(d1))
self.failUnless(IDirnodeURI.providedBy(d1))
d1_uri = d1.to_string()
d1_uri_from_fn = uri.MDMFDirectoryURI(d1.get_filenode_cap()).to_string()
self.failUnlessEqual(d1_uri_from_fn, d1_uri)
uri2 = uri.from_string(d1_uri)
self.failUnlessIsInstance(uri2, uri.MDMFDirectoryURI)
self.failUnless(IURI.providedBy(uri2))
self.failUnless(IDirnodeURI.providedBy(uri2))
self.failUnless(uri2.is_mutable())
self.failIf(uri2.is_readonly())
ro = uri2.get_readonly()
self.failUnlessIsInstance(ro, uri.ReadonlyMDMFDirectoryURI)
self.failUnless(ro.is_mutable())
self.failUnless(ro.is_readonly())
self.failUnless(IURI.providedBy(ro))
self.failUnless(IDirnodeURI.providedBy(ro))
ro_uri = ro.to_string()
n = uri.from_string(ro_uri, deep_immutable=True)
self.failUnlessIsInstance(n, uri.UnknownURI)
fn_cap = ro.get_filenode_cap()
fn_ro_cap = fn_cap.get_readonly()
d3 = uri.ReadonlyMDMFDirectoryURI(fn_ro_cap)
self.failUnlessEqual(ro.to_string(), d3.to_string())
self.failUnless(ro.is_mutable())
self.failUnless(ro.is_readonly())
def test_mdmf_verifier(self):
# I'm not sure what I want to write here yet.
writekey = "\x01" * 16
fingerprint = "\x02" * 32
uri1 = uri.WritableMDMFFileURI(writekey, fingerprint)
d1 = uri.MDMFDirectoryURI(uri1)
v1 = d1.get_verify_cap()
self.failUnlessIsInstance(v1, uri.MDMFDirectoryURIVerifier)
self.failIf(v1.is_mutable())
d2 = uri.from_string(d1.to_string())
v2 = d2.get_verify_cap()
self.failUnlessIsInstance(v2, uri.MDMFDirectoryURIVerifier)
self.failIf(v2.is_mutable())
self.failUnlessEqual(v2.to_string(), v1.to_string())
# Now attenuate and make sure that works correctly.
r3 = d2.get_readonly()
v3 = r3.get_verify_cap()
self.failUnlessIsInstance(v3, uri.MDMFDirectoryURIVerifier)
self.failIf(v3.is_mutable())
self.failUnlessEqual(v3.to_string(), v1.to_string())
r4 = uri.from_string(r3.to_string())
v4 = r4.get_verify_cap()
self.failUnlessIsInstance(v4, uri.MDMFDirectoryURIVerifier)
self.failIf(v4.is_mutable())
self.failUnlessEqual(v4.to_string(), v3.to_string())