storage: rewrite slot API, now use testv_and_readv_and_writev or readv

This commit is contained in:
Brian Warner
2007-11-05 20:17:14 -07:00
parent 95a3da92fe
commit e08b091d9f
3 changed files with 449 additions and 352 deletions

View File

@ -84,49 +84,19 @@ class RIBucketReader(RemoteInterface):
TestVector = ListOf(TupleOf(int, int, str, str)) TestVector = ListOf(TupleOf(int, int, str, str))
# elements are (offset, length, operator, specimen) # elements are (offset, length, operator, specimen)
# operator is one of "lt, le, eq, ne, ge, gt, nop" # operator is one of "lt, le, eq, ne, ge, gt"
# nop always passes and is used to fetch data while writing. # nop always passes and is used to fetch data while writing.
# you should use length==len(specimen) for everything except nop # you should use length==len(specimen) for everything except nop
DataVector = ListOf(TupleOf(int, ShareData)) DataVector = ListOf(TupleOf(int, ShareData))
# (offset, data). This limits us to 30 writes of 1MiB each per call # (offset, data). This limits us to 30 writes of 1MiB each per call
TestAndWriteVectorsForShares = DictOf(int,
TupleOf(TestVector,
DataVector,
ChoiceOf(None, int))) # new_length
ReadVector = ListOf(TupleOf(int, int)) ReadVector = ListOf(TupleOf(int, int))
TestResults = ListOf(str) ReadData = ListOf(ShareData)
# returns data[offset:offset+length] for each element of TestVector # returns data[offset:offset+length] for each element of TestVector
class RIMutableSlot(RemoteInterface):
def testv_and_writev(write_enabler=Hash,
testv=TestVector,
datav=DataVector,
new_length=ChoiceOf(None, int)):
"""General-purpose test-and-set operation for mutable slots. Perform
the given comparisons. If they all pass, then apply the write vector.
If new_length is not None, use it to set the size of the container.
This can be used to pre-allocate space for a series of upcoming
writes, or truncate existing data. If the container is growing,
new_length will be applied before datav. If the container is
shrinking, it will be applied afterwards.
Return the old data that was used for the comparisons.
The boolean return value is True if the write vector was applied,
false if not.
If the write_enabler is wrong, this will raise BadWriteEnablerError.
To enable share migration, the exception will have the nodeid used
for the old write enabler embedded in it, in the following string::
The write enabler was recorded by nodeid '%s'.
"""
return TupleOf(bool, TestResults)
def read(offset=int, length=int):
return ShareData
def get_length():
return int
class RIStorageServer(RemoteInterface): class RIStorageServer(RemoteInterface):
def allocate_buckets(storage_index=StorageIndex, def allocate_buckets(storage_index=StorageIndex,
renew_secret=LeaseRenewSecret, renew_secret=LeaseRenewSecret,
@ -170,13 +140,27 @@ class RIStorageServer(RemoteInterface):
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS) return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
def allocate_mutable_slot(storage_index=StorageIndex,
write_enabler=Hash, def slot_readv(storage_index=StorageIndex,
renew_secret=LeaseRenewSecret, shares=ListOf(int), readv=ReadVector):
cancel_secret=LeaseCancelSecret, """Read a vector from the numbered shares associated with the given
sharenums=SetOf(int, maxLength=MAX_BUCKETS), storage index. An empty shares list means to return data from all
allocated_size=int): known shares. Returns a dictionary with one key per share."""
""" return DictOf(int, DataVector) # shnum -> results
def slot_testv_and_readv_and_writev(storage_index=StorageIndex,
secrets=TupleOf(Hash, Hash, Hash),
tw_vectors=TestAndWriteVectorsForShares,
r_vector=ReadVector,
):
"""General-purpose test-and-set operation for mutable slots. Perform
a bunch of comparisons against the existing shares. If they all pass,
then apply a bunch of write vectors to those shares. Then use the
read vectors to extract data from all the shares and return the data.
This method is, um, large. The goal is to allow clients to update all
the shares associated with a mutable file in a single round trip.
@param storage_index: the index of the bucket to be created or @param storage_index: the index of the bucket to be created or
increfed. increfed.
@param write_enabler: a secret that is stored along with the slot. @param write_enabler: a secret that is stored along with the slot.
@ -188,32 +172,51 @@ class RIStorageServer(RemoteInterface):
stored for later comparison by the server. Each stored for later comparison by the server. Each
server is given a different secret. server is given a different secret.
@param cancel_secret: Like renew_secret, but protects bucket decref. @param cancel_secret: Like renew_secret, but protects bucket decref.
@param sharenums: these are the share numbers (probably between 0 and
99) that the sender is proposing to store on this
server.
@param allocated_size: all shares will pre-allocate this many bytes.
Use this to a) confirm that you can claim as
much space as you want before you actually
send the data, and b) reduce the disk-IO cost
of doing incremental writes.
@return: dict mapping sharenum to slot. The return value may include The 'secrets' argument is a tuple of (write_enabler, renew_secret,
more sharenums than asked, if some shares already existed. cancel_secret). The first is required to perform any write. The
New leases are added for all latter two are used when allocating new shares. To simply acquire a
shares. new lease on existing shares, use an empty testv and an empty writev.
Each share can have a separate test vector (i.e. a list of
comparisons to perform). If all vectors for all shares pass, then all
writes for all shares are recorded. Each comparison is a 4-tuple of
(offset, length, operator, specimen), which effectively does a
read(offset, length) and then compares the result against the
specimen using the given equality/inequality operator. Reads from the
end of the container are truncated, and missing shares behave like
empty ones, so to assert that a share doesn't exist (for use when
creating a new share), use (0, 1, 'eq', '').
The write vector will be applied to the given share, expanding it if
necessary. A write vector applied to a share number that did not
exist previously will cause that share to be created.
Each write vector is accompanied by a 'new_length' argument. If
new_length is not None, use it to set the size of the container. This
can be used to pre-allocate space for a series of upcoming writes, or
truncate existing data. If the container is growing, new_length will
be applied before datav. If the container is shrinking, it will be
applied afterwards.
The read vector is used to extract data from all known shares,
*before* any writes have been applied. The same vector is used for
all shares. This captures the state that was tested by the test
vector.
This method returns two values: a boolean and a dict. The boolean is
True if the write vectors were applied, False if not. The dict is
keyed by share number, and each value contains a list of strings, one
for each element of the read vector.
If the write_enabler is wrong, this will raise BadWriteEnablerError.
To enable share migration, the exception will have the nodeid used
for the old write enabler embedded in it, in the following string::
The write enabler was recorded by nodeid '%s'.
""" """
return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS) return TupleOf(bool, DictOf(int, ReadData))
def get_mutable_slot(storage_index=StorageIndex):
"""This returns an empty dictionary if the server has no shares
of the slot mentioned."""
return DictOf(int, RIMutableSlot, maxKeys=MAX_BUCKETS)
def readv_slots(storage_index=StorageIndex, readv=ReadVector):
"""Read a vector from all shares associated with the given storage
index. Returns a dictionary with one key per share."""
return DictOf(int, DataVector) # shnum -> results
class IStorageBucketWriter(Interface): class IStorageBucketWriter(Interface):
def put_block(segmentnum=int, data=ShareData): def put_block(segmentnum=int, data=ShareData):

View File

@ -8,7 +8,7 @@ from twisted.internet import defer
from zope.interface import implements from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \ RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
BadWriteEnablerError, RIMutableSlot BadWriteEnablerError
from allmydata.util import fileutil, idlib, mathutil from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition, _assert from allmydata.util.assertutil import precondition, _assert
@ -240,13 +240,7 @@ class BucketReader(Referenceable):
assert struct.calcsize("L"), 4 assert struct.calcsize("L"), 4
assert struct.calcsize("Q"), 8 assert struct.calcsize("Q"), 8
class MutableShareFile(Referenceable): class MutableShareFile:
# note: at any given time, there should only be a single instance of this
# class per filename. More than one is likely to corrupt the container,
# because of state we cache in instance variables. This requires the
# StorageServer to use a WeakValueDictionary, indexed by filename. This
# could be improved by cacheing less and doing more IO.
implements(RIMutableSlot)
DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s") DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8 EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
@ -548,12 +542,6 @@ class MutableShareFile(Referenceable):
assert magic == self.MAGIC assert magic == self.MAGIC
return (write_enabler, write_enabler_nodeid) return (write_enabler, write_enabler_nodeid)
def remote_read(self, offset, length):
f = open(self.home, 'rb')
data = self._read_share_data(f, offset, length)
f.close()
return data
def readv(self, readv): def readv(self, readv):
datav = [] datav = []
f = open(self.home, 'rb') f = open(self.home, 'rb')
@ -562,36 +550,37 @@ class MutableShareFile(Referenceable):
f.close() f.close()
return datav return datav
def remote_get_length(self): # def remote_get_length(self):
f = open(self.home, 'rb') # f = open(self.home, 'rb')
data_length = self._read_data_length(f) # data_length = self._read_data_length(f)
f.close() # f.close()
return data_length # return data_length
def remote_testv_and_writev(self, write_enabler, testv, datav, new_length): def check_write_enabler(self, write_enabler):
f = open(self.home, 'rb+') f = open(self.home, 'rb+')
(real_write_enabler, write_enabler_nodeid) = \ (real_write_enabler, write_enabler_nodeid) = \
self._read_write_enabler_and_nodeid(f) self._read_write_enabler_and_nodeid(f)
f.close()
if write_enabler != real_write_enabler: if write_enabler != real_write_enabler:
# accomodate share migration by reporting the nodeid used for the # accomodate share migration by reporting the nodeid used for the
# old write enabler. # old write enabler.
f.close()
msg = "The write enabler was recorded by nodeid '%s'." % \ msg = "The write enabler was recorded by nodeid '%s'." % \
(idlib.b2a(write_enabler_nodeid),) (idlib.b2a(write_enabler_nodeid),)
raise BadWriteEnablerError(msg) raise BadWriteEnablerError(msg)
# check testv def check_testv(self, testv):
test_results_v = [] test_good = True
test_failed = False f = open(self.home, 'rb+')
for (offset, length, operator, specimen) in testv: for (offset, length, operator, specimen) in testv:
data = self._read_share_data(f, offset, length) data = self._read_share_data(f, offset, length)
test_results_v.append(data)
if not self.compare(data, operator, specimen): if not self.compare(data, operator, specimen):
test_failed = True test_good = False
if test_failed: break
f.close() f.close()
return (False, test_results_v) return test_good
# now apply the write vector
def writev(self, datav, new_length):
f = open(self.home, 'rb+')
for (offset, data) in datav: for (offset, data) in datav:
self._write_share_data(f, offset, data) self._write_share_data(f, offset, data)
if new_length is not None: if new_length is not None:
@ -599,12 +588,36 @@ class MutableShareFile(Referenceable):
f.seek(self.DATA_LENGTH_OFFSET) f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", new_length)) f.write(struct.pack(">Q", new_length))
f.close() f.close()
return (True, test_results_v)
def compare(self, a, op, b): def compare(self, a, op, b):
assert op in ("nop", "lt", "le", "eq", "ne", "ge", "gt") assert op in ("lt", "le", "eq", "ne", "ge", "gt")
if op == "nop": if op == "lt":
return True return a < b
if op == "le":
return a <= b
if op == "eq":
return a == b
if op == "ne":
return a != b
if op == "ge":
return a >= b
if op == "gt":
return a > b
# never reached
class EmptyShare:
def check_testv(self, testv):
test_good = True
for (offset, length, operator, specimen) in testv:
data = ""
if not self.compare(data, operator, specimen):
test_good = False
break
return test_good
def compare(self, a, op, b):
assert op in ("lt", "le", "eq", "ne", "ge", "gt")
if op == "lt": if op == "lt":
return a < b return a < b
if op == "le": if op == "le":
@ -842,51 +855,83 @@ class StorageServer(service.MultiService, Referenceable):
except StopIteration: except StopIteration:
return iter([]) return iter([])
def remote_allocate_mutable_slot(self, storage_index, def remote_slot_testv_and_readv_and_writev(self, storage_index,
write_enabler, secrets,
renew_secret, cancel_secret, test_and_write_vectors,
sharenums, read_vector):
allocated_size):
my_nodeid = self.my_nodeid
sharenums = set(sharenums)
shares = self.remote_get_mutable_slot(storage_index)
existing_shnums = set(shares.keys())
si_s = idlib.b2a(storage_index)
bucketdir = os.path.join(self.sharedir, si_s)
fileutil.make_dirs(bucketdir)
for shnum in (sharenums - existing_shnums):
filename = os.path.join(bucketdir, "%d" % shnum)
shares[shnum] = create_mutable_sharefile(filename, my_nodeid,
write_enabler)
# update the lease on everything
ownerid = 1 # TODO
expire_time = time.time() + 31*24*60*60 # one month
anid = my_nodeid
lease_info = (ownerid, expire_time, renew_secret, cancel_secret, anid)
for share in shares.values():
share.add_or_renew_lease(lease_info)
return shares
def remote_get_mutable_slot(self, storage_index):
"""This returns an empty dictionary if the server has no shares
of the slot mentioned."""
si_s = idlib.b2a(storage_index) si_s = idlib.b2a(storage_index)
(write_enabler, renew_secret, cancel_secret) = secrets
# shares exist if there is a file for them # shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_s) bucketdir = os.path.join(self.sharedir, si_s)
if not os.path.isdir(bucketdir): shares = {}
return {} if os.path.isdir(bucketdir):
slots = {}
for sharenum_s in os.listdir(bucketdir): for sharenum_s in os.listdir(bucketdir):
try: try:
sharenum = int(sharenum_s) sharenum = int(sharenum_s)
except ValueError: except ValueError:
continue continue
filename = os.path.join(bucketdir, sharenum_s) filename = os.path.join(bucketdir, sharenum_s)
slots[sharenum] = MutableShareFile(filename) msf = MutableShareFile(filename)
return slots msf.check_write_enabler(write_enabler)
shares[sharenum] = msf
# write_enabler is good for all existing shares.
def remote_readv_slots(self, storage_index, readv): # Now evaluate test vectors.
testv_is_good = True
for sharenum in test_and_write_vectors:
(testv, datav, new_length) = test_and_write_vectors[sharenum]
if sharenum in shares:
if not shares[sharenum].check_testv(testv):
testv_is_good = False
break
else:
# compare the vectors against an empty share, in which all
# reads return empty strings.
if not EmptyShare().check_testv(testv):
testv_is_good = False
break
# now gather the read vectors, before we do any writes
read_data = {}
for sharenum, share in shares.items():
read_data[sharenum] = share.readv(read_vector)
if testv_is_good:
# now apply the write vectors
for sharenum in test_and_write_vectors:
(testv, datav, new_length) = test_and_write_vectors[sharenum]
if sharenum not in shares:
# allocate a new share
allocated_size = 2000 # arbitrary, really
share = self._allocate_slot_share(bucketdir, secrets,
sharenum,
allocated_size,
owner_num=0)
shares[sharenum] = share
shares[sharenum].writev(datav, new_length)
# and update the leases on all shares
ownerid = 1 # TODO
expire_time = time.time() + 31*24*60*60 # one month
my_nodeid = self.my_nodeid
anid = my_nodeid
lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
anid)
for share in shares.values():
share.add_or_renew_lease(lease_info)
# all done
return (testv_is_good, read_data)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
allocated_size, owner_num=0):
(write_enabler, renew_secret, cancel_secret) = secrets
my_nodeid = self.my_nodeid
fileutil.make_dirs(bucketdir)
filename = os.path.join(bucketdir, "%d" % sharenum)
share = create_mutable_sharefile(filename, my_nodeid, write_enabler)
return share
def remote_slot_readv(self, storage_index, shares, readv):
si_s = idlib.b2a(storage_index) si_s = idlib.b2a(storage_index)
# shares exist if there is a file for them # shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_s) bucketdir = os.path.join(self.sharedir, si_s)
@ -898,6 +943,7 @@ class StorageServer(service.MultiService, Referenceable):
sharenum = int(sharenum_s) sharenum = int(sharenum_s)
except ValueError: except ValueError:
continue continue
if sharenum in shares or not shares:
filename = os.path.join(bucketdir, sharenum_s) filename = os.path.join(bucketdir, sharenum_s)
msf = MutableShareFile(filename) msf = MutableShareFile(filename)
datavs[sharenum] = msf.readv(readv) datavs[sharenum] = msf.readv(readv)

View File

@ -9,7 +9,7 @@ import itertools
from allmydata import interfaces from allmydata import interfaces
from allmydata.util import fileutil, hashutil, idlib from allmydata.util import fileutil, hashutil, idlib
from allmydata.storage import BucketWriter, BucketReader, \ from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile
from allmydata.interfaces import BadWriteEnablerError from allmydata.interfaces import BadWriteEnablerError
class Bucket(unittest.TestCase): class Bucket(unittest.TestCase):
@ -476,228 +476,275 @@ class MutableServer(unittest.TestCase):
write_enabler = self.write_enabler(we_tag) write_enabler = self.write_enabler(we_tag)
renew_secret = self.renew_secret(lease_tag) renew_secret = self.renew_secret(lease_tag)
cancel_secret = self.cancel_secret(lease_tag) cancel_secret = self.cancel_secret(lease_tag)
return ss.remote_allocate_mutable_slot(storage_index, rstaraw = ss.remote_slot_testv_and_readv_and_writev
write_enabler, testandwritev = dict( [ (shnum, ([], [], None) )
renew_secret, cancel_secret, for shnum in sharenums ] )
sharenums, size) readv = []
rc = rstaraw(storage_index,
(write_enabler, renew_secret, cancel_secret),
testandwritev,
readv)
(did_write, readv_data) = rc
self.failUnless(did_write)
self.failUnless(isinstance(readv_data, dict))
self.failUnlessEqual(len(readv_data), 0)
def test_allocate(self): def test_allocate(self):
ss = self.create("test_allocate") ss = self.create("test_allocate")
shares = self.allocate(ss, "si1", "we1", self._secret.next(), self.allocate(ss, "si1", "we1", self._secret.next(),
set([0,1,2]), 100) set([0,1,2]), 100)
self.failUnlessEqual(len(shares), 3)
self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
shares2 = ss.remote_get_mutable_slot("si1")
self.failUnlessEqual(len(shares2), 3)
self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
s0 = shares[0] read = ss.remote_slot_readv
self.failUnlessEqual(s0.remote_read(0, 10), "") self.failUnlessEqual(read("si1", [0], [(0, 10)]),
self.failUnlessEqual(s0.remote_read(100, 10), "") {0: [""]})
self.failUnlessEqual(read("si1", [], [(0, 10)]),
{0: [""], 1: [""], 2: [""]})
self.failUnlessEqual(read("si1", [0], [(100, 10)]),
{0: [""]})
# try writing to one # try writing to one
WE = self.write_enabler("we1") secrets = ( self.write_enabler("we1"),
self.renew_secret("we1"),
self.cancel_secret("we1") )
data = "".join([ ("%d" % i) * 10 for i in range(10) ]) data = "".join([ ("%d" % i) * 10 for i in range(10) ])
answer = s0.remote_testv_and_writev(WE, write = ss.remote_slot_testv_and_readv_and_writev
[], answer = write("si1", secrets,
[(0, data),], {0: ([], [(0,data)], None)},
new_length=None) [])
self.failUnlessEqual(answer, (True, [])) self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111") self.failUnlessEqual(read("si1", [0], [(0,20)]),
self.failUnlessEqual(s0.remote_read(95, 10), "99999") {0: ["00000000001111111111"]})
self.failUnlessEqual(s0.remote_get_length(), 100) self.failUnlessEqual(read("si1", [0], [(95,10)]),
{0: ["99999"]})
#self.failUnlessEqual(s0.remote_get_length(), 100)
bad_secrets = ("bad write enabler", secrets[1], secrets[2])
self.failUnlessRaises(BadWriteEnablerError, self.failUnlessRaises(BadWriteEnablerError,
s0.remote_testv_and_writev, write, "si1", bad_secrets,
"bad write enabler", {}, [])
[], [], None)
# this testv should fail # this testv should fail
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets,
[(0, 12, "eq", "444444444444"), {0: ([(0, 12, "eq", "444444444444"),
(20, 5, "eq", "22222"), (20, 5, "eq", "22222"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["000000000011", None),
"22222"])) },
self.failUnlessEqual(s0.remote_read(0, 100), data) [(0,12), (20,5)],
)
self.failUnlessEqual(answer, (False,
{0: ["000000000011", "22222"],
1: ["", ""],
2: ["", ""],
}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# as should this one # as should this one
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets,
[(10, 5, "lt", "11111"), {0: ([(10, 5, "lt", "11111"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None),
self.failUnlessEqual(s0.remote_read(0, 100), data) },
[(10,5)],
)
self.failUnlessEqual(answer, (False,
{0: ["11111"],
1: [""],
2: [""]},
))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
def test_operators(self): def test_operators(self):
# test operators, the data we're comparing is '11111' in all cases. # test operators, the data we're comparing is '11111' in all cases.
# test both fail+pass, reset data after each one. # test both fail+pass, reset data after each one.
ss = self.create("test_operators") ss = self.create("test_operators")
shares = self.allocate(ss, "si1", "we1", self._secret.next(),
set([0,1,2]), 100)
s0 = shares[0]
WE = self.write_enabler("we1")
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
answer = s0.remote_testv_and_writev(WE,
[],
[(0, data),],
new_length=None)
# nop secrets = ( self.write_enabler("we1"),
answer = s0.remote_testv_and_writev(WE, self.renew_secret("we1"),
[(10, 5, "nop", "11111"), self.cancel_secret("we1") )
], data = "".join([ ("%d" % i) * 10 for i in range(10) ])
[(0, "x"*100)], None) write = ss.remote_slot_testv_and_readv_and_writev
self.failUnlessEqual(answer, (True, ["11111"])) read = ss.remote_slot_readv
self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None) def reset():
write("si1", secrets,
{0: ([], [(0,data)], None)},
[])
reset()
# lt # lt
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
[(10, 5, "lt", "11110"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
[(10, 5, "lt", "11111"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
[(10, 5, "lt", "11112"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
# le # le
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
[(10, 5, "le", "11110"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
[(10, 5, "le", "11111"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
[(10, 5, "le", "11112"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
# eq # eq
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
[(10, 5, "eq", "11112"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
[(10, 5, "eq", "11111"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
# ne # ne
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
[(10, 5, "ne", "11111"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
[(10, 5, "ne", "11112"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
# ge # ge
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
[(10, 5, "ge", "11110"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
[(10, 5, "ge", "11111"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
[(10, 5, "ge", "11112"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
reset()
# gt # gt
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
[(10, 5, "gt", "11110"),
], ],
[(0, "y"*100)], None) [(0, "y"*100)],
self.failUnlessEqual(answer, (True, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (True, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
[(10, 5, "gt", "11111"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
reset()
answer = s0.remote_testv_and_writev(WE, answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
[(10, 5, "gt", "11112"),
], ],
[(0, "x"*100)], None) [(0, "x"*100)],
self.failUnlessEqual(answer, (False, ["11111"])) None,
self.failUnlessEqual(s0.remote_read(0, 100), data) )}, [(10,5)])
s0.remote_testv_and_writev(WE, [], [(0,data)], None) self.failUnlessEqual(answer, (False, {0: ["11111"]}))
self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
reset()
def test_readv(self): def test_readv(self):
ss = self.create("test_allocate") ss = self.create("test_readv")
shares = self.allocate(ss, "si1", "we1", self._secret.next(), secrets = ( self.write_enabler("we1"),
set([0,1,2]), 100) self.renew_secret("we1"),
WE = self.write_enabler("we1") self.cancel_secret("we1") )
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
write = ss.remote_slot_testv_and_readv_and_writev
read = ss.remote_slot_readv
data = [("%d" % i) * 100 for i in range(3)] data = [("%d" % i) * 100 for i in range(3)]
for i in range(3): rc = write("si1", secrets,
rc = shares[i].remote_testv_and_writev(WE, [], [(0, data[i])], {0: ([], [(0,data[0])], None),
new_length=None) 1: ([], [(0,data[1])], None),
self.failUnlessEqual(rc, (True, [])) 2: ([], [(0,data[2])], None),
answer = ss.remote_readv_slots("si1", [(0, 10)]) }, [])
self.failUnlessEqual(rc, (True, {}))
answer = read("si1", [], [(0, 10)])
self.failUnlessEqual(answer, {0: ["0"*10], self.failUnlessEqual(answer, {0: ["0"*10],
1: ["1"*10], 1: ["1"*10],
2: ["2"*10]}) 2: ["2"*10]})
@ -716,15 +763,15 @@ class MutableServer(unittest.TestCase):
def test_leases(self): def test_leases(self):
ss = self.create("test_leases") ss = self.create("test_leases")
secret = 14 def secrets(n):
shares = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100) return ( self.write_enabler("we1"),
s0 = shares[0] self.renew_secret("we1-%d" % n),
WE = self.write_enabler("we1") self.cancel_secret("we1-%d" % n) )
data = "".join([ ("%d" % i) * 10 for i in range(10) ]) data = "".join([ ("%d" % i) * 10 for i in range(10) ])
answer = s0.remote_testv_and_writev(WE, write = ss.remote_slot_testv_and_readv_and_writev
[], read = ss.remote_slot_readv
[(0, data),], rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
new_length=None) self.failUnlessEqual(rc, (True, {}))
# create a random non-numeric file in the bucket directory, to # create a random non-numeric file in the bucket directory, to
# exercise the code that's supposed to ignore those. # exercise the code that's supposed to ignore those.
@ -736,40 +783,41 @@ class MutableServer(unittest.TestCase):
# re-allocate the slots and use the same secrets, that should update # re-allocate the slots and use the same secrets, that should update
# the lease # the lease
shares2 = self.allocate(ss, "si1", "we1", secret, set([0,1,2]), 100) write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
# renew it directly # renew it directly
ss.remote_renew_lease("si1", self.renew_secret(secret)) ss.remote_renew_lease("si1", secrets(0)[1])
# now allocate them with a bunch of different secrets, to trigger the # now allocate them with a bunch of different secrets, to trigger the
# extended lease code # extended lease code
shares2 = self.allocate(ss, "si1", "we1", secret+1, set([0,1,2]), 100) write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
shares2 = self.allocate(ss, "si1", "we1", secret+2, set([0,1,2]), 100) write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
shares2 = self.allocate(ss, "si1", "we1", secret+3, set([0,1,2]), 100) write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
shares2 = self.allocate(ss, "si1", "we1", secret+4, set([0,1,2]), 100) write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
shares2 = self.allocate(ss, "si1", "we1", secret+5, set([0,1,2]), 100) write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
# cancel one of them
ss.remote_cancel_lease("si1", self.cancel_secret(secret+5))
# cancel one of them
ss.remote_cancel_lease("si1", secrets(5)[2])
s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
all_leases = s0.debug_get_leases() all_leases = s0.debug_get_leases()
self.failUnlessEqual(len(all_leases), 5) self.failUnlessEqual(len(all_leases), 5)
# and write enough data to expand the container, forcing the server # and write enough data to expand the container, forcing the server
# to move the leases # to move the leases
answer = s0.remote_testv_and_writev(WE, write("si1", secrets(0),
[], {0: ([], [(0,data)], 200), },
[(0, data),], [])
new_length=200)
# read back the leases, make sure they're still intact. # read back the leases, make sure they're still intact.
self.compare_leases_without_timestamps(all_leases, self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases()) s0.debug_get_leases())
ss.remote_renew_lease("si1", self.renew_secret(secret)) ss.remote_renew_lease("si1", secrets(0)[1])
ss.remote_renew_lease("si1", self.renew_secret(secret+1)) ss.remote_renew_lease("si1", secrets(1)[1])
ss.remote_renew_lease("si1", self.renew_secret(secret+2)) ss.remote_renew_lease("si1", secrets(2)[1])
ss.remote_renew_lease("si1", self.renew_secret(secret+3)) ss.remote_renew_lease("si1", secrets(3)[1])
ss.remote_renew_lease("si1", self.renew_secret(secret+4)) ss.remote_renew_lease("si1", secrets(4)[1])
self.compare_leases_without_timestamps(all_leases, self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases()) s0.debug_get_leases())
# get a new copy of the leases, with the current timestamps. Reading # get a new copy of the leases, with the current timestamps. Reading
@ -782,40 +830,40 @@ class MutableServer(unittest.TestCase):
# is present, to provide for share migration # is present, to provide for share migration
self.failUnlessRaises(IndexError, self.failUnlessRaises(IndexError,
ss.remote_renew_lease, "si1", ss.remote_renew_lease, "si1",
self.renew_secret(secret+20)) secrets(20)[1])
# same for cancelling # same for cancelling
self.failUnlessRaises(IndexError, self.failUnlessRaises(IndexError,
ss.remote_cancel_lease, "si1", ss.remote_cancel_lease, "si1",
self.cancel_secret(secret+20)) secrets(20)[2])
self.failUnlessEqual(all_leases, s0.debug_get_leases())
s0.remote_read(0, 200)
self.failUnlessEqual(all_leases, s0.debug_get_leases()) self.failUnlessEqual(all_leases, s0.debug_get_leases())
answer = s0.remote_testv_and_writev(WE, # reading shares should not modify the timestamp
[], read("si1", [], [(0,200)])
[(200, "make me bigger"),], self.failUnlessEqual(all_leases, s0.debug_get_leases())
new_length=None)
write("si1", secrets(0),
{0: ([], [(200, "make me bigger")], None)}, [])
self.compare_leases_without_timestamps(all_leases, self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases()) s0.debug_get_leases())
answer = s0.remote_testv_and_writev(WE, write("si1", secrets(0),
[], {0: ([], [(500, "make me really bigger")], None)}, [])
[(500, "make me really bigger"),],
new_length=None)
self.compare_leases_without_timestamps(all_leases, self.compare_leases_without_timestamps(all_leases,
s0.debug_get_leases()) s0.debug_get_leases())
# now cancel them all # now cancel them all
ss.remote_cancel_lease("si1", self.cancel_secret(secret)) ss.remote_cancel_lease("si1", secrets(0)[2])
ss.remote_cancel_lease("si1", self.cancel_secret(secret+1)) ss.remote_cancel_lease("si1", secrets(1)[2])
ss.remote_cancel_lease("si1", self.cancel_secret(secret+2)) ss.remote_cancel_lease("si1", secrets(2)[2])
ss.remote_cancel_lease("si1", self.cancel_secret(secret+3)) ss.remote_cancel_lease("si1", secrets(3)[2])
# the slot should still be there # the slot should still be there
shares3 = ss.remote_get_mutable_slot("si1") remaining_shares = read("si1", [], [(0,10)])
self.failUnlessEqual(len(shares3), 3) self.failUnlessEqual(len(remaining_shares), 1)
self.failUnlessEqual(len(s0.debug_get_leases()), 1) self.failUnlessEqual(len(s0.debug_get_leases()), 1)
ss.remote_cancel_lease("si1", self.cancel_secret(secret+4)) ss.remote_cancel_lease("si1", secrets(4)[2])
# now the slot should be gone # now the slot should be gone
self.failUnlessEqual(ss.remote_get_mutable_slot("si1"), {}) no_shares = read("si1", [], [(0,10)])
self.failUnlessEqual(no_shares, {})