refactor constructing URI objects from strings and add human-encoding of WriteableSSKFileURI

This commit is contained in:
Zooko O'Whielacronx 2008-01-02 22:12:15 -07:00
parent 02202c76fe
commit eae499539d

View File

@ -18,7 +18,7 @@ ZBASE32STR_256bits = "%s{51}%s" % (ZBASE32CHAR, ZBASE32CHAR_1bits)
COLON="(:|%3A)"
# Writeable SSK bits
WSSKBITS= "%s%s%s" % (ZBASE32STR_128bits, COLON, ZBASE32STR_256bits)
WSSKBITS= "(%s)%s(%s)" % (ZBASE32STR_128bits, COLON, ZBASE32STR_256bits)
# URIs (soon to be renamed "caps") are always allowed to come with a leading
# "http://127.0.0.1:8123/uri/" that will be ignored.
@ -59,7 +59,8 @@ class CHKFileURI(_BaseURI):
% name)
self.storage_index = hashutil.storage_index_chk_hash(self.key)
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:CHK:"), uri
d = {}
(header_uri, header_chk,
@ -68,22 +69,24 @@ class CHKFileURI(_BaseURI):
assert header_uri == "URI"
assert header_chk == "CHK"
self.key = idlib.a2b(key_s)
assert isinstance(self.key, str)
assert len(self.key) == 16 # AES-128
key = idlib.a2b(key_s)
assert isinstance(key, str)
assert len(key) == 16 # AES-128
self.storage_index = hashutil.storage_index_chk_hash(self.key)
assert isinstance(self.storage_index, str)
assert len(self.storage_index) == 16 # sha256 hash truncated to 128
storage_index = hashutil.storage_index_chk_hash(key)
assert isinstance(storage_index, str)
assert len(storage_index) == 16 # sha256 hash truncated to 128
self.uri_extension_hash = idlib.a2b(uri_extension_hash_s)
assert isinstance(self.uri_extension_hash, str)
assert len(self.uri_extension_hash) == 32 # sha56 hash
uri_extension_hash = idlib.a2b(uri_extension_hash_s)
assert isinstance(uri_extension_hash, str)
assert len(uri_extension_hash) == 32 # sha256 hash
self.needed_shares = int(needed_shares_s)
self.total_shares = int(total_shares_s)
self.size = int(size_s)
return self
needed_shares = int(needed_shares_s)
total_shares = int(total_shares_s)
size = int(size_s)
return cls(key=key, uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares, total_shares=total_shares,
size=size)
def to_string(self):
assert isinstance(self.needed_shares, int)
@ -131,7 +134,8 @@ class CHKFileVerifierURI(_BaseURI):
self.total_shares = total_shares
self.size = size
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:CHK-Verifier:"), uri
d = {}
(header_uri, header_chk,
@ -140,18 +144,23 @@ class CHKFileVerifierURI(_BaseURI):
assert header_uri == "URI"
assert header_chk == "CHK-Verifier"
self.storage_index = idlib.a2b(storage_index_s)
assert isinstance(self.storage_index, str)
assert len(self.storage_index) == 16 # sha256 hash truncated to 128
storage_index = idlib.a2b(storage_index_s)
assert isinstance(storage_index, str)
assert len(storage_index) == 16 # sha256 hash truncated to 128
self.uri_extension_hash = idlib.a2b(uri_extension_hash_s)
assert isinstance(self.uri_extension_hash, str)
assert len(self.uri_extension_hash) == 32 # sha56 hash
uri_extension_hash = idlib.a2b(uri_extension_hash_s)
assert isinstance(uri_extension_hash, str)
assert len(uri_extension_hash) == 32 # sha256 hash
self.needed_shares = int(needed_shares_s)
self.total_shares = int(total_shares_s)
self.size = int(size_s)
return self
needed_shares = int(needed_shares_s)
total_shares = int(total_shares_s)
size = int(size_s)
return cls(storage_index=storage_index,
uri_extension_hash=uri_extension_hash,
needed_shares=needed_shares,
total_shares=total_shares,
size=size)
def to_string(self):
assert isinstance(self.needed_shares, int)
@ -173,11 +182,11 @@ class LiteralFileURI(_BaseURI):
if data is not None:
self.data = data
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:LIT:")
data_s = uri[len("URI:LIT:"):]
self.data = idlib.a2b(data_s)
return self
return cls(idlib.a2b(data_s))
def to_string(self):
return "URI:LIT:%s" % idlib.b2a(self.data)
@ -199,22 +208,28 @@ class LiteralFileURI(_BaseURI):
class WriteableSSKFileURI(_BaseURI):
implements(IURI, IMutableFileURI)
def __init__(self, *args, **kwargs):
if not args and not kwargs:
return
self.populate(*args, **kwargs)
def populate(self, writekey, fingerprint):
def __init__(self, writekey, fingerprint):
self.writekey = writekey
self.readkey = hashutil.ssk_readkey_hash(writekey)
self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
self.fingerprint = fingerprint
def init_from_string(self, uri):
@classmethod
def init_from_human_encoding(cls, uri):
mo = WriteableSSKFileURI_RE.search(uri)
assert mo
return cls(idlib.a2b(mo.group(5)), idlib.a2b(mo.group(7)))
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:SSK:"), uri
(header_uri, header_ssk, writekey_s, fingerprint_s) = uri.split(":")
self.populate(idlib.a2b(writekey_s), idlib.a2b(fingerprint_s))
return self
return cls(idlib.a2b(writekey_s), idlib.a2b(fingerprint_s))
def to_human_encoding(self):
assert isinstance(self.writekey, str)
assert isinstance(self.fingerprint, str)
return "http://127.0.0.1:8123/uri/URI:SSK:%s:%s" % (idlib.b2a(self.writekey), idlib.b2a(self.fingerprint))
def to_string(self):
assert isinstance(self.writekey, str)
@ -240,21 +255,16 @@ class WriteableSSKFileURI(_BaseURI):
class ReadonlySSKFileURI(_BaseURI):
implements(IURI, IMutableFileURI)
def __init__(self, *args, **kwargs):
if not args and not kwargs:
return
self.populate(*args, **kwargs)
def populate(self, readkey, fingerprint):
def __init__(self, readkey, fingerprint):
self.readkey = readkey
self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
self.fingerprint = fingerprint
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:SSK-RO:"), uri
(header_uri, header_ssk, readkey_s, fingerprint_s) = uri.split(":")
self.populate(idlib.a2b(readkey_s), idlib.a2b(fingerprint_s))
return self
return cls(idlib.a2b(readkey_s), idlib.a2b(fingerprint_s))
def to_string(self):
assert isinstance(self.readkey, str)
@ -280,21 +290,16 @@ class ReadonlySSKFileURI(_BaseURI):
class SSKVerifierURI(_BaseURI):
implements(IVerifierURI)
def __init__(self, *args, **kwargs):
if not args and not kwargs:
return
self.populate(*args, **kwargs)
def populate(self, storage_index, fingerprint):
def __init__(self, storage_index, fingerprint):
self.storage_index = storage_index
self.fingerprint = fingerprint
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:SSK-Verifier:"), uri
(header_uri, header_ssk,
storage_index_s, fingerprint_s) = uri.split(":")
self.populate(idlib.a2b(storage_index_s), idlib.a2b(fingerprint_s))
return self
return cls(idlib.a2b(storage_index_s), idlib.a2b(fingerprint_s))
def to_string(self):
assert isinstance(self.storage_index, str)
@ -329,13 +334,12 @@ class NewDirectoryURI(_NewDirectoryBaseURI):
assert not filenode_uri.is_readonly()
_NewDirectoryBaseURI.__init__(self, filenode_uri)
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:DIR2:")
(header_uri, header_dir2, bits) = uri.split(":", 2)
fn = WriteableSSKFileURI()
fn.init_from_string("URI:SSK:" + bits)
self._filenode_uri = fn
return self
fn = WriteableSSKFileURI.init_from_string("URI:SSK:" + bits)
return cls(fn)
def to_string(self):
assert isinstance(self._filenode_uri, WriteableSSKFileURI)
@ -356,13 +360,12 @@ class ReadonlyNewDirectoryURI(_NewDirectoryBaseURI):
assert filenode_uri.is_readonly()
_NewDirectoryBaseURI.__init__(self, filenode_uri)
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:DIR2-RO:")
(header_uri, header_dir2, bits) = uri.split(":", 2)
fn = ReadonlySSKFileURI()
fn.init_from_string("URI:SSK-RO:" + bits)
self._filenode_uri = fn
return self
fn = ReadonlySSKFileURI.init_from_string("URI:SSK-RO:" + bits)
return cls(fn)
def to_string(self):
assert isinstance(self._filenode_uri, ReadonlySSKFileURI)
@ -384,13 +387,12 @@ class NewDirectoryURIVerifier(_BaseURI):
filenode_uri = IVerifierURI(filenode_uri)
self._filenode_uri = filenode_uri
def init_from_string(self, uri):
@classmethod
def init_from_string(cls, uri):
assert uri.startswith("URI:DIR2-Verifier:")
(header_uri, header_dir2, bits) = uri.split(":", 2)
fn = SSKVerifierURI()
fn.init_from_string("URI:SSK-Verifier:" + bits)
self._filenode_uri = fn
return self
fv = SSKVerifierURI.init_from_string("URI:SSK-Verifier:" + bits)
return cls(fv)
def to_string(self):
assert isinstance(self._filenode_uri, SSKVerifierURI)
@ -403,27 +405,25 @@ class NewDirectoryURIVerifier(_BaseURI):
def from_string(s):
if s.startswith("URI:CHK:"):
return CHKFileURI().init_from_string(s)
return CHKFileURI.init_from_string(s)
elif s.startswith("URI:CHK-Verifier:"):
return CHKFileVerifierURI().init_from_string(s)
return CHKFileVerifierURI.init_from_string(s)
elif s.startswith("URI:LIT:"):
return LiteralFileURI().init_from_string(s)
return LiteralFileURI.init_from_string(s)
elif s.startswith("URI:SSK:"):
return WriteableSSKFileURI().init_from_string(s)
return WriteableSSKFileURI.init_from_string(s)
elif s.startswith("URI:SSK-RO:"):
return ReadonlySSKFileURI().init_from_string(s)
return ReadonlySSKFileURI.init_from_string(s)
elif s.startswith("URI:SSK-Verifier:"):
return SSKVerifierURI().init_from_string(s)
return SSKVerifierURI.init_from_string(s)
elif s.startswith("URI:DIR2:"):
return NewDirectoryURI().init_from_string(s)
return NewDirectoryURI.init_from_string(s)
elif s.startswith("URI:DIR2-RO:"):
return ReadonlyNewDirectoryURI().init_from_string(s)
return ReadonlyNewDirectoryURI.init_from_string(s)
elif s.startswith("URI:DIR2-Verifier:"):
return NewDirectoryURIVerifier().init_from_string(s)
return NewDirectoryURIVerifier.init_from_string(s)
else:
raise TypeError("unknown URI type: %s.." % s[:12])