mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-01 00:45:52 +00:00
add human-encodings of caps, refactor encodings of caps, tighten checks, fix unit tests to use values of the right size
This commit is contained in:
parent
46db7cf1dc
commit
eb373c0001
@ -128,7 +128,9 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
|
||||
if os.path.exists(privdirfile):
|
||||
try:
|
||||
theuri = open(privdirfile, "r").read().strip()
|
||||
if not uri.is_string_newdirnode_rw(theuri):
|
||||
try:
|
||||
uri.NewDirectoryURI.init_from_human_encoding(theuri)
|
||||
except:
|
||||
raise EnvironmentError("not a well-formed mutable directory uri")
|
||||
except EnvironmentError, le:
|
||||
d = self.when_tub_ready()
|
||||
|
@ -20,7 +20,7 @@ class Marker:
|
||||
if not isinstance(nodeuri, str):
|
||||
nodeuri = nodeuri.to_string()
|
||||
self.nodeuri = nodeuri
|
||||
si = hashutil.tagged_hash("tag1", nodeuri)
|
||||
si = hashutil.tagged_hash("tag1", nodeuri)[:16]
|
||||
fp = hashutil.tagged_hash("tag2", nodeuri)
|
||||
self.verifieruri = uri.SSKVerifierURI(storage_index=si,
|
||||
fingerprint=fp).to_string()
|
||||
|
@ -45,8 +45,8 @@ class Compare(unittest.TestCase):
|
||||
def test_compare(self):
|
||||
lit1 = uri.LiteralFileURI("some data")
|
||||
fileURI = 'URI:CHK:f3mf6az85wpcai8ma4qayfmxuc:nnw518w5hu3t5oohwtp7ah9n81z9rfg6c1ywk33ia3m64o67nsgo:3:10:345834'
|
||||
chk1 = uri.CHKFileURI().init_from_string(fileURI)
|
||||
chk2 = uri.CHKFileURI().init_from_string(fileURI)
|
||||
chk1 = uri.CHKFileURI.init_from_string(fileURI)
|
||||
chk2 = uri.CHKFileURI.init_from_string(fileURI)
|
||||
self.failIfEqual(lit1, chk1)
|
||||
self.failUnlessEqual(chk1, chk2)
|
||||
self.failIfEqual(chk1, "not actually a URI")
|
||||
@ -168,12 +168,13 @@ class Invalid(unittest.TestCase):
|
||||
class Constraint(unittest.TestCase):
|
||||
def test_constraint(self):
|
||||
good="http://127.0.0.1:8123/uri/URI%3ADIR2%3Aqo8ayna47cpw3rx3kho3mu7q4h%3Abk9qbgx76gh6eyj5ps8p6buz8fffw1ofc37e9w9d6ncsfpuz7icy/"
|
||||
self.failUnless(uri.DirnodeURI_RE.search(good))
|
||||
uri.NewDirectoryURI.init_from_human_encoding(good)
|
||||
self.failUnlessRaises(AssertionError, uri.NewDirectoryURI.init_from_string, good)
|
||||
bad = good + '==='
|
||||
self.failIf(uri.DirnodeURI_RE.search(bad))
|
||||
self.failUnlessRaises(AssertionError, uri.NewDirectoryURI.init_from_human_encoding, bad)
|
||||
self.failUnlessRaises(AssertionError, uri.NewDirectoryURI.init_from_string, bad)
|
||||
fileURI = 'URI:CHK:f3mf6az85wpcai8ma4qayfmxuc:nnw518w5hu3t5oohwtp7ah9n81z9rfg6c1ywk33ia3m64o67nsgo:3:10:345834'
|
||||
self.failIf(uri.DirnodeURI_RE.search(fileURI))
|
||||
|
||||
uri.CHKFileURI.init_from_string(fileURI)
|
||||
|
||||
class Mutable(unittest.TestCase):
|
||||
def test_pack(self):
|
||||
@ -239,7 +240,7 @@ class Mutable(unittest.TestCase):
|
||||
class NewDirnode(unittest.TestCase):
|
||||
def test_pack(self):
|
||||
writekey = "\x01" * 16
|
||||
fingerprint = "\x02" * 16
|
||||
fingerprint = "\x02" * 32
|
||||
|
||||
n = uri.WriteableSSKFileURI(writekey, fingerprint)
|
||||
u1 = uri.NewDirectoryURI(n)
|
||||
@ -301,16 +302,3 @@ class NewDirnode(unittest.TestCase):
|
||||
self.failUnless(IVerifierURI.providedBy(v))
|
||||
self.failUnlessEqual(v._filenode_uri,
|
||||
u1.get_verifier()._filenode_uri)
|
||||
|
||||
def test_is_string_newdirnode_rw(self):
|
||||
writekey = "\x01" * 16
|
||||
fingerprint = "\x02" * 32
|
||||
|
||||
n = uri.WriteableSSKFileURI(writekey, fingerprint)
|
||||
u1 = uri.NewDirectoryURI(n)
|
||||
|
||||
self.failUnless(uri.is_string_newdirnode_rw(u1.to_string()), u1.to_string())
|
||||
|
||||
self.failIf(uri.is_string_newdirnode_rw("bogus"))
|
||||
self.failIf(uri.is_string_newdirnode_rw("URI:DIR2:bogus"))
|
||||
|
||||
|
@ -1184,7 +1184,7 @@ class Web(WebMixin, unittest.TestCase):
|
||||
def test_POST_mkdir_no_parentdir_noredirect(self):
|
||||
d = self.POST("/uri?t=mkdir")
|
||||
def _after_mkdir(res):
|
||||
self.failUnless(uri.is_string_newdirnode_rw(res))
|
||||
uri.NewDirectoryURI.init_from_string(res)
|
||||
d.addCallback(_after_mkdir)
|
||||
return d
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
|
||||
import re
|
||||
import re, urllib
|
||||
from zope.interface import implements
|
||||
from twisted.python.components import registerAdapter
|
||||
from allmydata.util import idlib, hashutil
|
||||
@ -10,25 +10,15 @@ from allmydata.interfaces import IURI, IDirnodeURI, IFileURI, IVerifierURI, \
|
||||
# enough information to retrieve and validate the contents. It shall be
|
||||
# expressed in a limited character set (namely [TODO]).
|
||||
|
||||
ZBASE32CHAR = "[ybndrfg8ejkmcpqxot1uwisza345h769]" # excludes l, 0, 2, and v
|
||||
ZBASE32CHAR_3bits = "[yoearcwh]"
|
||||
ZBASE32CHAR_1bits = "[yo]"
|
||||
ZBASE32STR_128bits = "%s{25}%s" % (ZBASE32CHAR, ZBASE32CHAR_3bits)
|
||||
ZBASE32STR_256bits = "%s{51}%s" % (ZBASE32CHAR, ZBASE32CHAR_1bits)
|
||||
COLON="(:|%3A)"
|
||||
ZBASE32STR_128bits = '(%s{25}%s)' % (idlib.ZBASE32CHAR, idlib.ZBASE32CHAR_3bits)
|
||||
ZBASE32STR_256bits = '(%s{51}%s)' % (idlib.ZBASE32CHAR, idlib.ZBASE32CHAR_1bits)
|
||||
|
||||
# Writeable SSK bits
|
||||
WSSKBITS= "(%s)%s(%s)" % (ZBASE32STR_128bits, COLON, ZBASE32STR_256bits)
|
||||
SEP='(?::|%3A)'
|
||||
NUMBER='([0-9]+)'
|
||||
|
||||
# URIs (soon to be renamed "caps") are always allowed to come with a leading
|
||||
# "http://127.0.0.1:8123/uri/" that will be ignored.
|
||||
OPTIONALHTTPLEAD=r'(https?://(127.0.0.1|localhost):8123/uri/)?'
|
||||
|
||||
# Writeable SSK URI
|
||||
WriteableSSKFileURI_RE=re.compile("^%sURI%sSSK%s%s$" % (OPTIONALHTTPLEAD, COLON, COLON, WSSKBITS))
|
||||
|
||||
# NewDirectory Read-Write URI
|
||||
DirnodeURI_RE=re.compile("^%sURI%sDIR2%s%s/?$" % (OPTIONALHTTPLEAD, COLON, COLON, WSSKBITS))
|
||||
# 'http://127.0.0.1:8123/uri/' that will be ignored.
|
||||
OPTIONALHTTPLEAD=r'(?:https?://(127.0.0.1|localhost):8123/uri/)?'
|
||||
|
||||
|
||||
class _BaseURI:
|
||||
@ -40,60 +30,51 @@ class _BaseURI:
|
||||
if cmp(self.__class__, them.__class__):
|
||||
return cmp(self.__class__, them.__class__)
|
||||
return cmp(self.to_string(), them.to_string())
|
||||
def to_human_encoding(self):
|
||||
return 'http://127.0.0.1:8123/uri/'+self.to_string()
|
||||
|
||||
class CHKFileURI(_BaseURI):
|
||||
implements(IURI, IFileURI)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
# construct me with kwargs, since there are so many of them
|
||||
if not kwargs:
|
||||
return
|
||||
keys = ("key", "uri_extension_hash",
|
||||
"needed_shares", "total_shares", "size")
|
||||
for name in kwargs:
|
||||
if name in keys:
|
||||
value = kwargs[name]
|
||||
setattr(self, name, value)
|
||||
else:
|
||||
raise TypeError("CHKFileURI does not accept '%s=' argument"
|
||||
% name)
|
||||
STRING_RE=re.compile('^URI:CHK:'+ZBASE32STR_128bits+':'+
|
||||
ZBASE32STR_256bits+':'+NUMBER+':'+NUMBER+':'+NUMBER+
|
||||
'$')
|
||||
HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'CHK'+SEP+
|
||||
ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+SEP+NUMBER+
|
||||
SEP+NUMBER+SEP+NUMBER+'$')
|
||||
|
||||
def __init__(self, key, uri_extension_hash, needed_shares, total_shares,
|
||||
size):
|
||||
self.key = key
|
||||
self.uri_extension_hash = uri_extension_hash
|
||||
self.needed_shares = needed_shares
|
||||
self.total_shares = total_shares
|
||||
self.size = size
|
||||
self.storage_index = hashutil.storage_index_chk_hash(self.key)
|
||||
assert len(self.storage_index) == 16
|
||||
self.storage_index = hashutil.storage_index_chk_hash(key)
|
||||
assert len(self.storage_index) == 16 # sha256 hash truncated to 128
|
||||
|
||||
@classmethod
|
||||
def init_from_human_encoding(cls, uri):
|
||||
mo = cls.HUMAN_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)),
|
||||
int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:CHK:"), uri
|
||||
d = {}
|
||||
(header_uri, header_chk,
|
||||
key_s, uri_extension_hash_s,
|
||||
needed_shares_s, total_shares_s, size_s) = uri.split(":")
|
||||
assert header_uri == "URI"
|
||||
assert header_chk == "CHK"
|
||||
|
||||
key = idlib.a2b(key_s)
|
||||
assert isinstance(key, str)
|
||||
assert len(key) == 16 # AES-128
|
||||
|
||||
storage_index = hashutil.storage_index_chk_hash(key)
|
||||
assert isinstance(storage_index, str)
|
||||
assert len(storage_index) == 16 # sha256 hash truncated to 128
|
||||
|
||||
uri_extension_hash = idlib.a2b(uri_extension_hash_s)
|
||||
assert isinstance(uri_extension_hash, str)
|
||||
assert len(uri_extension_hash) == 32 # sha256 hash
|
||||
|
||||
needed_shares = int(needed_shares_s)
|
||||
total_shares = int(total_shares_s)
|
||||
size = int(size_s)
|
||||
return cls(key=key, uri_extension_hash=uri_extension_hash,
|
||||
needed_shares=needed_shares, total_shares=total_shares,
|
||||
size=size)
|
||||
mo = cls.STRING_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)),
|
||||
int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self.needed_shares, int)
|
||||
assert isinstance(self.total_shares, int)
|
||||
assert isinstance(self.size, (int,long))
|
||||
|
||||
return ("URI:CHK:%s:%s:%d:%d:%d" %
|
||||
return ('URI:CHK:%s:%s:%d:%d:%d' %
|
||||
(idlib.b2a(self.key),
|
||||
idlib.b2a(self.uri_extension_hash),
|
||||
self.needed_shares,
|
||||
@ -120,54 +101,41 @@ class CHKFileURI(_BaseURI):
|
||||
class CHKFileVerifierURI(_BaseURI):
|
||||
implements(IVerifierURI)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
# construct me with kwargs, since there are so many of them
|
||||
if not kwargs:
|
||||
return
|
||||
self.populate(**kwargs)
|
||||
STRING_RE=re.compile('^URI:CHK-Verifier:'+ZBASE32STR_128bits+':'+
|
||||
ZBASE32STR_256bits+':'+NUMBER+':'+NUMBER+':'+NUMBER)
|
||||
HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'CHK-Verifier'+SEP+
|
||||
ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+SEP+NUMBER+
|
||||
SEP+NUMBER+SEP+NUMBER)
|
||||
|
||||
def populate(self, storage_index, uri_extension_hash,
|
||||
def __init__(self, storage_index, uri_extension_hash,
|
||||
needed_shares, total_shares, size):
|
||||
assert len(storage_index) == 16
|
||||
self.storage_index = storage_index
|
||||
self.uri_extension_hash = uri_extension_hash
|
||||
self.needed_shares = needed_shares
|
||||
self.total_shares = total_shares
|
||||
self.size = size
|
||||
|
||||
@classmethod
|
||||
def init_from_human_encoding(cls, uri):
|
||||
mo = cls.HUMAN_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)),
|
||||
int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:CHK-Verifier:"), uri
|
||||
d = {}
|
||||
(header_uri, header_chk,
|
||||
storage_index_s, uri_extension_hash_s,
|
||||
needed_shares_s, total_shares_s, size_s) = uri.split(":")
|
||||
assert header_uri == "URI"
|
||||
assert header_chk == "CHK-Verifier"
|
||||
|
||||
storage_index = idlib.a2b(storage_index_s)
|
||||
assert isinstance(storage_index, str)
|
||||
assert len(storage_index) == 16 # sha256 hash truncated to 128
|
||||
|
||||
uri_extension_hash = idlib.a2b(uri_extension_hash_s)
|
||||
assert isinstance(uri_extension_hash, str)
|
||||
assert len(uri_extension_hash) == 32 # sha256 hash
|
||||
|
||||
needed_shares = int(needed_shares_s)
|
||||
total_shares = int(total_shares_s)
|
||||
size = int(size_s)
|
||||
|
||||
return cls(storage_index=storage_index,
|
||||
uri_extension_hash=uri_extension_hash,
|
||||
needed_shares=needed_shares,
|
||||
total_shares=total_shares,
|
||||
size=size)
|
||||
mo = cls.STRING_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)),
|
||||
int(mo.group(3)), int(mo.group(4)), int(mo.group(5)))
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self.needed_shares, int)
|
||||
assert isinstance(self.total_shares, int)
|
||||
assert isinstance(self.size, (int,long))
|
||||
|
||||
return ("URI:CHK-Verifier:%s:%s:%d:%d:%d" %
|
||||
return ('URI:CHK-Verifier:%s:%s:%d:%d:%d' %
|
||||
(idlib.b2a(self.storage_index),
|
||||
idlib.b2a(self.uri_extension_hash),
|
||||
self.needed_shares,
|
||||
@ -178,18 +146,27 @@ class CHKFileVerifierURI(_BaseURI):
|
||||
class LiteralFileURI(_BaseURI):
|
||||
implements(IURI, IFileURI)
|
||||
|
||||
STRING_RE=re.compile('^URI:LIT:'+idlib.ZBASE32STR_anybytes+'$')
|
||||
HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'LIT'+SEP+idlib.ZBASE32STR_anybytes+'$')
|
||||
|
||||
def __init__(self, data=None):
|
||||
if data is not None:
|
||||
self.data = data
|
||||
|
||||
@classmethod
|
||||
def init_from_human_encoding(cls, uri):
|
||||
mo = cls.HUMAN_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)))
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:LIT:")
|
||||
data_s = uri[len("URI:LIT:"):]
|
||||
return cls(idlib.a2b(data_s))
|
||||
mo = cls.STRING_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)))
|
||||
|
||||
def to_string(self):
|
||||
return "URI:LIT:%s" % idlib.b2a(self.data)
|
||||
return 'URI:LIT:%s' % idlib.b2a(self.data)
|
||||
|
||||
def is_readonly(self):
|
||||
return True
|
||||
@ -208,33 +185,35 @@ class LiteralFileURI(_BaseURI):
|
||||
class WriteableSSKFileURI(_BaseURI):
|
||||
implements(IURI, IMutableFileURI)
|
||||
|
||||
BASE_STRING='URI:SSK:'
|
||||
STRING_RE=re.compile('^'+BASE_STRING+ZBASE32STR_128bits+':'+
|
||||
ZBASE32STR_256bits+'$')
|
||||
HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK'+SEP+
|
||||
ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+'$')
|
||||
|
||||
def __init__(self, writekey, fingerprint):
|
||||
self.writekey = writekey
|
||||
self.readkey = hashutil.ssk_readkey_hash(writekey)
|
||||
self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
|
||||
assert len(self.storage_index) == 16
|
||||
self.fingerprint = fingerprint
|
||||
|
||||
@classmethod
|
||||
def init_from_human_encoding(cls, uri):
|
||||
mo = WriteableSSKFileURI_RE.search(uri)
|
||||
assert mo
|
||||
return cls(idlib.a2b(mo.group(5)), idlib.a2b(mo.group(7)))
|
||||
mo = cls.HUMAN_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:SSK:"), uri
|
||||
(header_uri, header_ssk, writekey_s, fingerprint_s) = uri.split(":")
|
||||
return cls(idlib.a2b(writekey_s), idlib.a2b(fingerprint_s))
|
||||
|
||||
def to_human_encoding(self):
|
||||
assert isinstance(self.writekey, str)
|
||||
assert isinstance(self.fingerprint, str)
|
||||
return "http://127.0.0.1:8123/uri/URI:SSK:%s:%s" % (idlib.b2a(self.writekey), idlib.b2a(self.fingerprint))
|
||||
mo = cls.STRING_RE.search(uri)
|
||||
assert mo, (uri, cls)
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self.writekey, str)
|
||||
assert isinstance(self.fingerprint, str)
|
||||
return "URI:SSK:%s:%s" % (idlib.b2a(self.writekey),
|
||||
return 'URI:SSK:%s:%s' % (idlib.b2a(self.writekey),
|
||||
idlib.b2a(self.fingerprint))
|
||||
|
||||
def __repr__(self):
|
||||
@ -255,21 +234,32 @@ class WriteableSSKFileURI(_BaseURI):
|
||||
class ReadonlySSKFileURI(_BaseURI):
|
||||
implements(IURI, IMutableFileURI)
|
||||
|
||||
BASE_STRING='URI:SSK-RO:'
|
||||
STRING_RE=re.compile('^URI:SSK-RO:'+ZBASE32STR_128bits+':'+ZBASE32STR_256bits+'$')
|
||||
HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-RO'+SEP+ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+'$')
|
||||
|
||||
def __init__(self, readkey, fingerprint):
|
||||
self.readkey = readkey
|
||||
self.storage_index = hashutil.ssk_storage_index_hash(self.readkey)
|
||||
assert len(self.storage_index) == 16
|
||||
self.fingerprint = fingerprint
|
||||
|
||||
@classmethod
|
||||
def init_from_human_encoding(cls, uri):
|
||||
mo = cls.HUMAN_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:SSK-RO:"), uri
|
||||
(header_uri, header_ssk, readkey_s, fingerprint_s) = uri.split(":")
|
||||
return cls(idlib.a2b(readkey_s), idlib.a2b(fingerprint_s))
|
||||
mo = cls.STRING_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self.readkey, str)
|
||||
assert isinstance(self.fingerprint, str)
|
||||
return "URI:SSK-RO:%s:%s" % (idlib.b2a(self.readkey),
|
||||
return 'URI:SSK-RO:%s:%s' % (idlib.b2a(self.readkey),
|
||||
idlib.b2a(self.fingerprint))
|
||||
|
||||
def __repr__(self):
|
||||
@ -290,21 +280,31 @@ class ReadonlySSKFileURI(_BaseURI):
|
||||
class SSKVerifierURI(_BaseURI):
|
||||
implements(IVerifierURI)
|
||||
|
||||
BASE_STRING='URI:SSK-Verifier:'
|
||||
STRING_RE=re.compile('^'+BASE_STRING+ZBASE32STR_128bits+':'+ZBASE32STR_256bits+'$')
|
||||
HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'SSK-RO'+SEP+ZBASE32STR_128bits+SEP+ZBASE32STR_256bits+'$')
|
||||
|
||||
def __init__(self, storage_index, fingerprint):
|
||||
assert len(storage_index) == 16
|
||||
self.storage_index = storage_index
|
||||
self.fingerprint = fingerprint
|
||||
|
||||
@classmethod
|
||||
def init_from_human_encoding(cls, uri):
|
||||
mo = cls.HUMAN_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:SSK-Verifier:"), uri
|
||||
(header_uri, header_ssk,
|
||||
storage_index_s, fingerprint_s) = uri.split(":")
|
||||
return cls(idlib.a2b(storage_index_s), idlib.a2b(fingerprint_s))
|
||||
mo = cls.STRING_RE.search(uri)
|
||||
assert mo, uri
|
||||
return cls(idlib.a2b(mo.group(1)), idlib.a2b(mo.group(2)))
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self.storage_index, str)
|
||||
assert isinstance(self.fingerprint, str)
|
||||
return "URI:SSK-Verifier:%s:%s" % (idlib.b2a(self.storage_index),
|
||||
return 'URI:SSK-Verifier:%s:%s' % (idlib.b2a(self.storage_index),
|
||||
idlib.b2a(self.fingerprint))
|
||||
|
||||
class _NewDirectoryBaseURI(_BaseURI):
|
||||
@ -315,6 +315,33 @@ class _NewDirectoryBaseURI(_BaseURI):
|
||||
def __repr__(self):
|
||||
return "<%s %s>" % (self.__class__.__name__, self.abbrev())
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
mo = cls.BASE_STRING_RE.search(uri)
|
||||
assert mo, (uri, cls)
|
||||
bits = uri[mo.end():]
|
||||
fn = cls.INNER_URI_CLASS.init_from_string(
|
||||
cls.INNER_URI_CLASS.BASE_STRING+bits)
|
||||
return cls(fn)
|
||||
|
||||
@classmethod
|
||||
def init_from_human_encoding(cls, uri):
|
||||
mo = cls.BASE_HUMAN_RE.search(uri)
|
||||
assert mo, uri
|
||||
bits = uri[mo.end():]
|
||||
while bits and bits[-1] == '/':
|
||||
bits = bits[:-1]
|
||||
fn = cls.INNER_URI_CLASS.init_from_string(
|
||||
cls.INNER_URI_CLASS.BASE_STRING+urllib.unquote(bits))
|
||||
return cls(fn)
|
||||
|
||||
def to_string(self):
|
||||
fnuri = self._filenode_uri.to_string()
|
||||
mo = re.match(self.INNER_URI_CLASS.BASE_STRING, fnuri)
|
||||
assert mo, fnuri
|
||||
bits = fnuri[mo.end():]
|
||||
return self.BASE_STRING+bits
|
||||
|
||||
def abbrev(self):
|
||||
return self._filenode_uri.to_string().split(':')[2][:5]
|
||||
|
||||
@ -329,24 +356,17 @@ class _NewDirectoryBaseURI(_BaseURI):
|
||||
|
||||
class NewDirectoryURI(_NewDirectoryBaseURI):
|
||||
implements(INewDirectoryURI)
|
||||
|
||||
BASE_STRING='URI:DIR2:'
|
||||
BASE_STRING_RE=re.compile('^'+BASE_STRING)
|
||||
BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2'+SEP)
|
||||
INNER_URI_CLASS=WriteableSSKFileURI
|
||||
|
||||
def __init__(self, filenode_uri=None):
|
||||
if filenode_uri:
|
||||
assert not filenode_uri.is_readonly()
|
||||
_NewDirectoryBaseURI.__init__(self, filenode_uri)
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:DIR2:")
|
||||
(header_uri, header_dir2, bits) = uri.split(":", 2)
|
||||
fn = WriteableSSKFileURI.init_from_string("URI:SSK:" + bits)
|
||||
return cls(fn)
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self._filenode_uri, WriteableSSKFileURI)
|
||||
fn_u = self._filenode_uri.to_string()
|
||||
(header_uri, header_ssk, bits) = fn_u.split(":", 2)
|
||||
return "URI:DIR2:" + bits
|
||||
|
||||
def is_readonly(self):
|
||||
return False
|
||||
|
||||
@ -355,74 +375,59 @@ class NewDirectoryURI(_NewDirectoryBaseURI):
|
||||
|
||||
class ReadonlyNewDirectoryURI(_NewDirectoryBaseURI):
|
||||
implements(IReadonlyNewDirectoryURI)
|
||||
|
||||
BASE_STRING='URI:DIR2-RO:'
|
||||
BASE_STRING_RE=re.compile('^'+BASE_STRING)
|
||||
BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-RO'+SEP)
|
||||
INNER_URI_CLASS=ReadonlySSKFileURI
|
||||
|
||||
def __init__(self, filenode_uri=None):
|
||||
if filenode_uri:
|
||||
assert filenode_uri.is_readonly()
|
||||
_NewDirectoryBaseURI.__init__(self, filenode_uri)
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:DIR2-RO:")
|
||||
(header_uri, header_dir2, bits) = uri.split(":", 2)
|
||||
fn = ReadonlySSKFileURI.init_from_string("URI:SSK-RO:" + bits)
|
||||
return cls(fn)
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self._filenode_uri, ReadonlySSKFileURI)
|
||||
fn_u = self._filenode_uri.to_string()
|
||||
(header_uri, header_ssk, bits) = fn_u.split(":", 2)
|
||||
return "URI:DIR2-RO:" + bits
|
||||
|
||||
def is_readonly(self):
|
||||
return True
|
||||
|
||||
def get_readonly(self):
|
||||
return self
|
||||
|
||||
class NewDirectoryURIVerifier(_BaseURI):
|
||||
class NewDirectoryURIVerifier(_NewDirectoryBaseURI):
|
||||
implements(IVerifierURI)
|
||||
|
||||
BASE_STRING='URI:DIR2-Verifier:'
|
||||
BASE_STRING_RE=re.compile('^'+BASE_STRING)
|
||||
BASE_HUMAN_RE=re.compile('^'+OPTIONALHTTPLEAD+'URI'+SEP+'DIR2-Verifier'+SEP)
|
||||
INNER_URI_CLASS=SSKVerifierURI
|
||||
|
||||
def __init__(self, filenode_uri=None):
|
||||
if filenode_uri:
|
||||
filenode_uri = IVerifierURI(filenode_uri)
|
||||
self._filenode_uri = filenode_uri
|
||||
|
||||
@classmethod
|
||||
def init_from_string(cls, uri):
|
||||
assert uri.startswith("URI:DIR2-Verifier:")
|
||||
(header_uri, header_dir2, bits) = uri.split(":", 2)
|
||||
fv = SSKVerifierURI.init_from_string("URI:SSK-Verifier:" + bits)
|
||||
return cls(fv)
|
||||
|
||||
def to_string(self):
|
||||
assert isinstance(self._filenode_uri, SSKVerifierURI)
|
||||
fn_u = self._filenode_uri.to_string()
|
||||
(header_uri, header_ssk, bits) = fn_u.split(":", 2)
|
||||
return "URI:DIR2-Verifier:" + bits
|
||||
|
||||
def get_filenode_uri(self):
|
||||
return self._filenode_uri
|
||||
|
||||
|
||||
|
||||
def from_string(s):
|
||||
if s.startswith("URI:CHK:"):
|
||||
if s.startswith('URI:CHK:'):
|
||||
return CHKFileURI.init_from_string(s)
|
||||
elif s.startswith("URI:CHK-Verifier:"):
|
||||
elif s.startswith('URI:CHK-Verifier:'):
|
||||
return CHKFileVerifierURI.init_from_string(s)
|
||||
elif s.startswith("URI:LIT:"):
|
||||
elif s.startswith('URI:LIT:'):
|
||||
return LiteralFileURI.init_from_string(s)
|
||||
elif s.startswith("URI:SSK:"):
|
||||
elif s.startswith('URI:SSK:'):
|
||||
return WriteableSSKFileURI.init_from_string(s)
|
||||
elif s.startswith("URI:SSK-RO:"):
|
||||
elif s.startswith('URI:SSK-RO:'):
|
||||
return ReadonlySSKFileURI.init_from_string(s)
|
||||
elif s.startswith("URI:SSK-Verifier:"):
|
||||
elif s.startswith('URI:SSK-Verifier:'):
|
||||
return SSKVerifierURI.init_from_string(s)
|
||||
elif s.startswith("URI:DIR2:"):
|
||||
elif s.startswith('URI:DIR2:'):
|
||||
return NewDirectoryURI.init_from_string(s)
|
||||
elif s.startswith("URI:DIR2-RO:"):
|
||||
elif s.startswith('URI:DIR2-RO:'):
|
||||
return ReadonlyNewDirectoryURI.init_from_string(s)
|
||||
elif s.startswith("URI:DIR2-Verifier:"):
|
||||
elif s.startswith('URI:DIR2-Verifier:'):
|
||||
return NewDirectoryURIVerifier.init_from_string(s)
|
||||
else:
|
||||
raise TypeError("unknown URI type: %s.." % s[:12])
|
||||
@ -436,9 +441,6 @@ def from_string_dirnode(s):
|
||||
|
||||
registerAdapter(from_string_dirnode, str, IDirnodeURI)
|
||||
|
||||
def is_string_newdirnode_rw(s):
|
||||
return DirnodeURI_RE.search(s)
|
||||
|
||||
def from_string_filenode(s):
|
||||
u = from_string(s)
|
||||
assert IFileURI.providedBy(u)
|
||||
@ -467,31 +469,31 @@ def pack_extension(data):
|
||||
value = "%d" % value
|
||||
assert isinstance(value, str), k
|
||||
assert re.match(r'^[a-zA-Z_\-]+$', k)
|
||||
pieces.append(k + ":" + hashutil.netstring(value))
|
||||
uri_extension = "".join(pieces)
|
||||
pieces.append(k + ':' + hashutil.netstring(value))
|
||||
uri_extension = ''.join(pieces)
|
||||
return uri_extension
|
||||
|
||||
def unpack_extension(data):
|
||||
d = {}
|
||||
while data:
|
||||
colon = data.index(":")
|
||||
colon = data.index(':')
|
||||
key = data[:colon]
|
||||
data = data[colon+1:]
|
||||
|
||||
colon = data.index(":")
|
||||
colon = data.index(':')
|
||||
number = data[:colon]
|
||||
length = int(number)
|
||||
data = data[colon+1:]
|
||||
|
||||
value = data[:length]
|
||||
assert data[length] == ","
|
||||
assert data[length] == ','
|
||||
data = data[length+1:]
|
||||
|
||||
d[key] = value
|
||||
|
||||
# convert certain things to numbers
|
||||
for intkey in ("size", "segment_size", "num_segments",
|
||||
"needed_shares", "total_shares"):
|
||||
for intkey in ('size', 'segment_size', 'num_segments',
|
||||
'needed_shares', 'total_shares'):
|
||||
if intkey in d:
|
||||
d[intkey] = int(d[intkey])
|
||||
return d
|
||||
@ -500,7 +502,7 @@ def unpack_extension(data):
|
||||
def unpack_extension_readable(data):
|
||||
unpacked = unpack_extension(data)
|
||||
for k in sorted(unpacked.keys()):
|
||||
if "hash" in k:
|
||||
if 'hash' in k:
|
||||
unpacked[k] = idlib.b2a(unpacked[k])
|
||||
return unpacked
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user