from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
     storage_index_to_dir
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent

class FakeCanary:
    def notifyOnDisconnect(self, *args, **kwargs):
        pass
    def dontNotifyOnDisconnect(self, marker):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
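
# A minimal usage sketch of the RemoteBucket stub above (hypothetical
# target object, not part of the tests): callRemote("read", 0, 25)
# dispatches to target.remote_read(0, 25) and wraps the result in a
# Deferred, standing in for a Foolscap RemoteReference.
#
#   rb = RemoteBucket()
#   rb.target = some_bucket          # any object with remote_* methods
#   d = rb.callRemote("read", 0, 25) # fires with remote_read's result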

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there
        # are 4 segments (25 bytes each) and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, of which we
        # need 3. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
        # (the test below actually writes only 95 data bytes, leaving the
        # last block short, but the container is still allocated for 1414)
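
        # A quick sanity check of that layout arithmetic (a sketch added
        # for illustration, not part of the original test): a 0x24-byte
        # header, 100 bytes of data, three 7-node trees of 32-byte hashes,
        # three (index, hash) share-hash pairs, and a 4-byte length prefix
        # plus the 500-byte URI extension.
        assert 0x24 + 100 + 3*(7*32) + 3*(2+32) + (4+500) == 1414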

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d


class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, FakeCanary())

    def test_dont_overfill_dirs(self):
        """
        This test asserts that adding a second share whose storage index
        shares lots of leading bits with an extant share (but is not the
        exact same storage index) does not add a new entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
        self.failIf(os.path.exists(incomingdir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # tell new uploaders that they already exist (so that we don't try
        # to upload into them a second time)

        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2,3,4]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        canary = FakeCanary()
        # a newly created and filled share incurs this much overhead,
        # beyond the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
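        # sanity-check those constants (a sketch, not part of the original
        # test): the overhead is three 4-byte header fields, and each
        # lease record is ownerid(4) + renew_secret(32) + cancel_secret(32)
        # + expiration(4) bytes.
        assert OVERHEAD == 12 and LEASE_SIZE == 72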

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should
        # be returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation
        # should become a real, long-term allocation, which grows to
        # include the overhead.
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated,
        # and 5000-1085=3915 free, therefore we can fit 39 100-byte shares
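        # sanity-check that capacity arithmetic (a sketch, not part of the
        # original test):
        assert (5000 - allocated) // 100 == 39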
        already3,writers3 = self.allocate(ss, "vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share
        # close, as well as at startup. metadata that goes into other files
        # will not be counted until the next startup, so if we were creating
        # any extra-file metadata, the allocation would be more than
        # 'allocated' and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates
        # pre-existing files. mode="a" preserves previous contents but does
        # not allow seeking-to-create-holes. mode="r+" allows both.
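        # For example (a sketch of those mode semantics, not part of the
        # original test): open(filename, "wb") here would have discarded
        # "start", and with open(filename, "ab") the write below would land
        # at EOF (offset 5) regardless of the seek.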
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")
        f2.close()

    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # now cancel it
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second lease should make the slot go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 5)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)


class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 32)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)
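
    # For reference (a sketch of the slot API as exercised above, derived
    # from the calls in this file): remote_slot_testv_and_readv_and_writev
    # takes (storage_index, (write_enabler, renew_secret, cancel_secret),
    # {shnum: (testv, writev, new_length)}, readv) and returns a
    # (testv_passed, {shnum: read_data}) tuple.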

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)],
                       )
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

    def test_operators(self):
        # test the comparison operators; the data being compared is
        # '11111' in all cases. test both the failing and passing cases,
        # and reset the data after each one.
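        # Each test vector below is an (offset, length, operator, specimen)
        # tuple: the server reads `length` bytes at `offset` and compares
        # them against `specimen` with the named operator, and the write
        # vectors are applied only if every test vector passes. For example
        # (a sketch): against data "11111", (10, 5, "lt", "11110") fails,
        # while (10, 5, "lt", "11112") passes.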
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        # lt
        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        # le
        answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        # eq
        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        # ne
        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        # ge
        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        # gt
        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
                                             ],
                                            [(0, "y"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (True, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

        answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
                                             ],
                                            [(0, "x"*100)],
                                            None,
                                            )}, [(10,5)])
        self.failUnlessEqual(answer, (False, {0: ["11111"]}))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
        reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, a, b):
        self.failUnlessEqual(len(a), len(b))
        for i in range(len(a)):
            (num_a, (ownerid_a, expiration_time_a,
                     renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
            (num_b, (ownerid_b, expiration_time_b,
                     renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
            self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
                                   cancel_secret_a, nodeid_a),
                                  (num_b, ownerid_b, renew_secret_b,
                                   cancel_secret_b, nodeid_b) )

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots with the same secrets; that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger
        # the extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the
        # server to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the
        # timestamps alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})