test_storage.py: improve test coverage

Author: Brian Warner
Date:   2008-06-17 17:01:42 -07:00
parent 8e52b3d97d
commit 6b55b8b022


@@ -8,14 +8,31 @@ from allmydata import interfaces
 from allmydata.util import fileutil, hashutil
 from allmydata.storage import BucketWriter, BucketReader, \
      WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
-     storage_index_to_dir
+     storage_index_to_dir, DataTooLargeError
 from allmydata.interfaces import BadWriteEnablerError
 from allmydata.test.common import LoggingServiceParent
 
+class Marker:
+    pass
 class FakeCanary:
-    def notifyOnDisconnect(self, *args, **kwargs):
-        pass
+    def __init__(self, ignore_disconnectors=False):
+        self.ignore = ignore_disconnectors
+        self.disconnectors = {}
+    def notifyOnDisconnect(self, f, *args, **kwargs):
+        if self.ignore:
+            return
+        m = Marker()
+        self.disconnectors[m] = (f, args, kwargs)
+        return m
     def dontNotifyOnDisconnect(self, marker):
+        if self.ignore:
+            return
+        del self.disconnectors[marker]
+
+class FakeStatsProvider:
+    def count(self, name, delta=1):
+        pass
+    def register_producer(self, producer):
         pass
 
 class Bucket(unittest.TestCase):
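
The reworked FakeCanary above records each notifyOnDisconnect callback under a fresh Marker, so a test can later fire or unregister those callbacks by hand. An illustrative sketch, using only the fakes defined above, of the pattern that the new test_disconnect (further down) relies on:

    events = []
    canary = FakeCanary()
    marker = canary.notifyOnDisconnect(events.append, "client went away")

    # pretend the connection dropped: fire every recorded callback
    for (f, args, kwargs) in canary.disconnectors.values():
        f(*args, **kwargs)
    assert events == ["client went away"]

    # a handler can also be unregistered instead of fired
    canary.dontNotifyOnDisconnect(marker)

    # FakeCanary(True), as used in test_sizelimits, silently ignores
    # registrations, for tests that never simulate a disconnect.
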
@@ -158,10 +175,12 @@ class BucketProxy(unittest.TestCase):
         br = BucketReader(self, sharefname)
         rb = RemoteBucket()
         rb.target = br
-        rbp = ReadBucketProxy(rb)
+        rbp = ReadBucketProxy(rb, peerid="abc")
+        self.failUnless("to peer" in repr(rbp))
         self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
 
         d1 = rbp.startIfNecessary()
+        d1.addCallback(lambda res: rbp.startIfNecessary()) # idempotent
         d1.addCallback(lambda res: rbp.get_block(0))
         d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
         d1.addCallback(lambda res: rbp.get_block(1))
@@ -207,19 +226,22 @@ class Server(unittest.TestCase):
 
     def create(self, name, sizelimit=None):
         workdir = self.workdir(name)
-        ss = StorageServer(workdir, sizelimit)
+        ss = StorageServer(workdir, sizelimit,
+                           stats_provider=FakeStatsProvider())
         ss.setServiceParent(self.sparent)
         return ss
 
     def test_create(self):
         ss = self.create("test_create")
 
-    def allocate(self, ss, storage_index, sharenums, size):
+    def allocate(self, ss, storage_index, sharenums, size, canary=None):
         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
+        if not canary:
+            canary = FakeCanary()
         return ss.remote_allocate_buckets(storage_index,
                                           renew_secret, cancel_secret,
-                                          sharenums, size, FakeCanary())
+                                          sharenums, size, canary)
 
     def test_dont_overfill_dirs(self):
         """
@@ -259,56 +281,86 @@ class Server(unittest.TestCase):
 
     def test_allocate(self):
         ss = self.create("test_allocate")
 
-        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
+        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
 
         canary = FakeCanary()
-        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
+        already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
         self.failUnlessEqual(already, set())
         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
 
         # while the buckets are open, they should not count as readable
-        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
+        self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
 
         # close the buckets
         for i,wb in writers.items():
             wb.remote_write(0, "%25d" % i)
             wb.remote_close()
+            # aborting a bucket that was already closed is a no-op
+            wb.remote_abort()
 
         # now they should be readable
-        b = ss.remote_get_buckets("vid")
+        b = ss.remote_get_buckets("allocate")
         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
 
-        # now if we about writing again, the server should offer those three
-        # buckets as already present. It should offer them even if we don't
-        # ask about those specific ones.
-        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
+        # now if we ask about writing again, the server should offer those
+        # three buckets as already present. It should offer them even if we
+        # don't ask about those specific ones.
+        already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
         self.failUnlessEqual(already, set([0,1,2]))
         self.failUnlessEqual(set(writers.keys()), set([3,4]))
 
         # while those two buckets are open for writing, the server should
         # refuse to offer them to uploaders
-        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
-        self.failUnlessEqual(already, set([0,1,2]))
-        self.failUnlessEqual(set(writers.keys()), set([5]))
+        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+        self.failUnlessEqual(already2, set([0,1,2]))
+        self.failUnlessEqual(set(writers2.keys()), set([5]))
+
+        # aborting the writes should remove the tempfiles
+        for i,wb in writers2.items():
+            wb.remote_abort()
+        already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
+        self.failUnlessEqual(already2, set([0,1,2]))
+        self.failUnlessEqual(set(writers2.keys()), set([5]))
+
+        for i,wb in writers2.items():
+            wb.remote_abort()
+        for i,wb in writers.items():
+            wb.remote_abort()
+
+    def test_disconnect(self):
+        # simulate a disconnection
+        ss = self.create("test_disconnect")
+        canary = FakeCanary()
+        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
+        self.failUnlessEqual(already, set())
+        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
+        for (f,args,kwargs) in canary.disconnectors.values():
+            f(*args, **kwargs)
+        del already
+        del writers
+
+        # that ought to delete the incoming shares
+        already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
+        self.failUnlessEqual(already, set())
+        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
 
     def test_sizelimits(self):
         ss = self.create("test_sizelimits", 5000)
-        canary = FakeCanary()
         # a newly created and filled share incurs this much overhead, beyond
         # the size we request.
         OVERHEAD = 3*4
         LEASE_SIZE = 4+32+32+4
+        canary = FakeCanary(True)
 
-        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
+        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
         self.failUnlessEqual(len(writers), 3)
         # now the StorageServer should have 3000 bytes provisionally
         # allocated, allowing only 2000 more to be claimed
         self.failUnlessEqual(len(ss._active_writers), 3)
 
         # allocating 1001-byte shares only leaves room for one
-        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
+        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
         self.failUnlessEqual(len(writers2), 1)
         self.failUnlessEqual(len(ss._active_writers), 4)
 
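
The new abort coverage above mirrors the cleanup path an uploader takes when it gives up mid-upload. A hedged sketch of that caller-side pattern (write_or_abort and data_for_share are hypothetical names for illustration; writers is the dict of BucketWriter remotes returned by remote_allocate_buckets):

    def write_or_abort(writers, data_for_share):
        try:
            for shnum, wb in writers.items():
                wb.remote_write(0, data_for_share(shnum))
                wb.remote_close()
        except Exception:
            # give up: abort every writer so the server discards the incoming
            # tempfiles (aborting an already-closed bucket is a no-op, per the
            # test above)
            for wb in writers.values():
                wb.remote_abort()
            raise
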
@@ -333,7 +385,7 @@ class Server(unittest.TestCase):
         allocated = 1001 + OVERHEAD + LEASE_SIZE
         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
-        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
+        already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
         self.failUnlessEqual(len(writers3), 39)
         self.failUnlessEqual(len(ss._active_writers), 39)
 
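
For reference, the arithmetic behind the expected count of 39, taken directly from the test's own constants and comments (sizelimit of 5000, one open 1001-byte writer still holding its allocation):

    OVERHEAD = 3*4                             # 12-byte share-file overhead
    LEASE_SIZE = 4+32+32+4                     # 72 bytes per lease
    allocated = 1001 + OVERHEAD + LEASE_SIZE   # 1085 bytes provisionally held
    free = 5000 - allocated                    # 3915 bytes left under the limit
    assert free // 100 == 39                   # hence len(writers3) == 39
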
@@ -352,7 +404,7 @@ class Server(unittest.TestCase):
         # extra-file metadata, the allocation would be more than 'allocated'
         # and this test would need to be changed.
         ss = self.create("test_sizelimits", 5000)
-        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
+        already4,writers4 = self.allocate(ss, "vid4", range(100), 100, canary)
         self.failUnlessEqual(len(writers4), 39)
         self.failUnlessEqual(len(ss._active_writers), 39)
@@ -569,10 +621,38 @@ class MutableServer(unittest.TestCase):
         self.failUnless(isinstance(readv_data, dict))
         self.failUnlessEqual(len(readv_data), 0)
 
+    def test_container_size(self):
+        ss = self.create("test_container_size")
+        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
+                      set([0,1,2]), 100)
+        rstaraw = ss.remote_slot_testv_and_readv_and_writev
+        secrets = ( self.write_enabler("we1"),
+                    self.renew_secret("we1"),
+                    self.cancel_secret("we1") )
+        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
+        answer = rstaraw("si1", secrets,
+                         {0: ([], [(0,data)], len(data)+12)},
+                         [])
+        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
+        # trying to make the container too large will raise an exception
+        TOOBIG = MutableShareFile.MAX_SIZE + 10
+        self.failUnlessRaises(DataTooLargeError,
+                              rstaraw, "si1", secrets,
+                              {0: ([], [(0,data)], TOOBIG)},
+                              [])
+
+        # it should be possible to make the container smaller, although at
+        # the moment this doesn't actually affect the share
+        answer = rstaraw("si1", secrets,
+                         {0: ([], [(0,data)], len(data)+8)},
+                         [])
+        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
+
     def test_allocate(self):
         ss = self.create("test_allocate")
         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                       set([0,1,2]), 100)
 
         read = ss.remote_slot_readv
         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
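
The new test_container_size asserts that growing a mutable container past MutableShareFile.MAX_SIZE raises DataTooLargeError, while shrinking (or keeping) the size is accepted. A minimal sketch of that size rule as a hypothetical stand-alone helper (not code from this patch), using only names the test already imports:

    from allmydata.storage import MutableShareFile, DataTooLargeError

    def check_container_size(new_length):
        # same rule the test exercises: anything past MAX_SIZE is rejected,
        # smaller or equal sizes pass through unchanged
        if new_length > MutableShareFile.MAX_SIZE:
            raise DataTooLargeError()
        return new_length
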
@@ -843,7 +923,7 @@ class MutableServer(unittest.TestCase):
                                      cancel_secret_b, nodeid_b) )
 
     def test_leases(self):
-        ss = self.create("test_leases")
+        ss = self.create("test_leases", sizelimit=1000*1000)
         def secrets(n):
             return ( self.write_enabler("we1"),
                      self.renew_secret("we1-%d" % n),
@@ -943,11 +1023,25 @@ class MutableServer(unittest.TestCase):
         self.failUnlessEqual(len(remaining_shares), 1)
         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
 
+        # cancelling a non-existent lease should raise an IndexError
+        self.failUnlessRaises(IndexError,
+                              ss.remote_cancel_lease, "si1", "nonsecret")
+
+        # and the slot should still be there
+        remaining_shares = read("si1", [], [(0,10)])
+        self.failUnlessEqual(len(remaining_shares), 1)
+        self.failUnlessEqual(len(s0.debug_get_leases()), 1)
+
         ss.remote_cancel_lease("si1", secrets(4)[2])
 
         # now the slot should be gone
         no_shares = read("si1", [], [(0,10)])
         self.failUnlessEqual(no_shares, {})
 
+        # cancelling a lease on a non-existent share should raise an
+        # IndexError
+        self.failUnlessRaises(IndexError,
+                              ss.remote_cancel_lease, "si2", "nonsecret")
+
 
 class Stats(unittest.TestCase):
 
     def setUp(self):
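
The lease additions above pin down that remote_cancel_lease raises IndexError both for an unknown cancel secret and for a missing share. A hedged sketch of how a caller might tolerate that behaviour (cancel_lease_if_present is a hypothetical helper, not part of the patch; ss stands for a StorageServer reference):

    def cancel_lease_if_present(ss, storage_index, cancel_secret):
        try:
            ss.remote_cancel_lease(storage_index, cancel_secret)
            return True
        except IndexError:
            # no matching lease, or no such share: nothing to cancel
            return False
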