tahoe-lafs/src/allmydata/test/test_filenode.py

107 lines
3.9 KiB
Python
Raw Normal View History

from twisted.trial import unittest
from allmydata import filenode, uri, download
from allmydata.mutable.node import MutableFileNode
from allmydata.util import hashutil
class NotANode:
    """Empty placeholder type that is deliberately unrelated to any filenode.

    The tests below compare filenodes against an instance of this class to
    check that equality with a foreign object is False.
    """
class Node(unittest.TestCase):
    """Checks on the accessors, equality and hashing of the filenode classes.

    Every node in these tests is constructed with a client of None.
    """

    def test_chk_filenode(self):
        """A CHK FileNode can be built from a URI object or its string form.

        Verifies equality/inequality, hashing, the URI accessors, the
        read-only flag, the reported size, and the verifier URI type.
        """
        u = uri.CHKFileURI(key="\x00"*16,
                           uri_extension_hash="\x00"*32,
                           needed_shares=3,
                           total_shares=10,
                           size=1000)
        client = None
        fn1 = filenode.FileNode(u, client)
        fn2 = filenode.FileNode(u.to_string(), client)

        # Nodes built from the same URI compare equal; anything else does not.
        self.assertEqual(fn1, fn2)
        self.assertNotEqual(fn1, "I am not a filenode")
        self.assertNotEqual(fn1, NotANode())

        self.assertEqual(fn1.get_uri(), u.to_string())
        self.assertEqual(fn1.is_readonly(), True)
        self.assertEqual(fn1.get_readonly_uri(), u.to_string())
        self.assertEqual(fn1.get_size(), 1000)

        table = {}
        table[fn1] = 1  # exercise __hash__

        v = fn1.get_verifier()
        self.assertTrue(isinstance(v, uri.CHKFileVerifierURI))

    def test_literal_filenode(self):
        """A LiteralFileNode exposes its data directly.

        Verifies equality/inequality, hashing, the URI accessors, that the
        verifier and check() are both None, and that both download paths
        deliver the original data.

        Returns the Deferred produced by the download so the test runner can
        wait for the callbacks to fire.
        """
        DATA = "I am a short file."
        u = uri.LiteralFileURI(data=DATA)
        client = None
        fn1 = filenode.LiteralFileNode(u, client)
        fn2 = filenode.LiteralFileNode(u.to_string(), client)

        self.assertEqual(fn1, fn2)
        self.assertNotEqual(fn1, "I am not a filenode")
        self.assertNotEqual(fn1, NotANode())

        self.assertEqual(fn1.get_uri(), u.to_string())
        self.assertEqual(fn1.is_readonly(), True)
        self.assertEqual(fn1.get_readonly_uri(), u.to_string())
        self.assertEqual(fn1.get_size(), len(DATA))

        # Use a separate name for the hash probe so that ``d`` below refers
        # only to the Deferred that is returned.
        table = {}
        table[fn1] = 1  # exercise __hash__

        v = fn1.get_verifier()
        self.assertEqual(v, None)
        self.assertEqual(fn1.check(), None)

        def _check(res):
            self.assertEqual(res, DATA)

        target = download.Data()
        d = fn1.download(target)
        d.addCallback(_check)
        d.addCallback(lambda res: fn1.download_to_data())
        d.addCallback(_check)
        return d

    def test_mutable_filenode(self):
        """A MutableFileNode initialised from a writeable SSK URI.

        Verifies the derived keys and storage index, that the key material
        not yet loaded is None, the URI accessors, the mutable/read-only
        flags, equality and hashing, and the read-only variant of the node.
        """
        client = None
        wk = "\x00"*16
        fp = "\x00"*32
        rk = hashutil.ssk_readkey_hash(wk)
        si = hashutil.ssk_storage_index_hash(rk)

        # Build the URI from the same values the assertions compare against,
        # rather than from duplicated literals.
        u = uri.WriteableSSKFileURI(wk, fp)
        n = MutableFileNode(client).init_from_uri(u)

        self.assertEqual(n.get_writekey(), wk)
        self.assertEqual(n.get_readkey(), rk)
        self.assertEqual(n.get_storage_index(), si)

        # these items are populated on first read (or create), so until that
        # happens they'll be None
        self.assertEqual(n.get_privkey(), None)
        self.assertEqual(n.get_encprivkey(), None)
        self.assertEqual(n.get_pubkey(), None)

        self.assertEqual(n.get_uri(), u.to_string())
        self.assertEqual(n.get_readonly_uri(),
                         u.get_readonly().to_string())
        self.assertEqual(n.is_mutable(), True)
        self.assertEqual(n.is_readonly(), False)

        n2 = MutableFileNode(client).init_from_uri(u)
        self.assertEqual(n, n2)
        self.assertNotEqual(n, "not even the right type")
        self.assertNotEqual(n, u)  # not the right class

        # Equal nodes must hash alike, so the second insert replaces the first.
        by_node = {n: "can these be used as dictionary keys?"}
        by_node[n2] = "replace the old one"
        self.assertEqual(len(by_node), 1)

        nro = n.get_readonly()
        self.assertTrue(isinstance(nro, MutableFileNode))
        self.assertEqual(nro.get_readonly(), nro)
        nro_u = nro.get_uri()
        self.assertEqual(nro_u, nro.get_readonly_uri())
        self.assertEqual(nro_u, u.get_readonly().to_string())
        self.assertEqual(nro.is_mutable(), True)
        self.assertEqual(nro.is_readonly(), True)

        v = n.get_verifier()
        self.assertTrue(isinstance(v, uri.SSKVerifierURI))