mutable slots: finish up basic coding on server-side containers, add some tests. Remove all caching from MutableShareFile.

Brian Warner 2007-10-31 00:10:40 -07:00
parent ebc44c3fcb
commit 68d3d62002
3 changed files with 416 additions and 129 deletions


@@ -111,7 +111,7 @@ class RIMutableSlot(RemoteInterface):
The boolean return value is True if the write vector was applied,
false if not.
If the write_enabler is wrong, this will raise BadWriterEnablerError.
If the write_enabler is wrong, this will raise BadWriteEnablerError.
To enable share migration, the exception will have the nodeid used
for the old write enabler embedded in it, in the following string::


@@ -8,7 +8,7 @@ from twisted.internet import defer
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, IStorageBucketWriter, IStorageBucketReader, HASH_SIZE, \
BadWriterEnablerError, RIMutableSlot
BadWriteEnablerError, RIMutableSlot
from allmydata.util import fileutil, idlib, mathutil
from allmydata.util.assertutil import precondition, _assert
@@ -224,15 +224,15 @@ class BucketReader(Referenceable):
#  1    0     32    magic verstr "tahoe mutable container v1" plus binary
#  2    32    32    write enabler's nodeid
#  3    64    32    write enabler
#  4    72    8     data size (actual share data present) (a)
#  5    80    8     offset of (8) count of extra leases (after data)
#  6    88    416   four leases, 104 bytes each
#  4    96    8     data size (actual share data present) (a)
#  5    104   8     offset of (8) count of extra leases (after data)
#  6    112   416   four leases, 104 bytes each
#        0    4     ownerid (0 means "no lease here")
#        4    4     expiration timestamp
#        8    32    renewal token
#        40   32    cancel token
#        72   32    nodeid which accepted the tokens
#  7    504   (a)   data
#  7    528   (a)   data
#  8    ??    4     count of extra leases
#  9    ??    n*104 extra leases
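As a quick check of the corrected offsets (a sketch; these are the same struct format strings the class constants below are computed from):

import struct

HEADER = ">32s32s32sQQ"   # magic, WE nodeid, write enabler, data size, extra-lease offset
LEASE = ">LL32s32s32s"    # ownerid, expiration, renew token, cancel token, nodeid
assert struct.calcsize(HEADER) == 112       # the four lease slots start here
assert struct.calcsize(LEASE) == 104
assert struct.calcsize(HEADER) + 4*struct.calcsize(LEASE) == 528   # share data starts here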
@@ -249,9 +249,12 @@ class MutableShareFile(Referenceable):
implements(RIMutableSlot)
DATA_LENGTH_OFFSET = struct.calcsize(">32s32s32s")
EXTRA_LEASE_OFFSET = DATA_LENGTH_OFFSET + 8
HEADER_SIZE = struct.calcsize(">32s32s32sQQ") # doesn't include leases
LEASE_SIZE = struct.calcsize(">LL32s32s32s")
assert LEASE_SIZE == 104
DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE
assert DATA_OFFSET == 528, DATA_OFFSET
# our sharefiles start with a recognizable string, plus some random
# binary data to reduce the chance that a regular text file will look
# like a sharefile.
@@ -262,130 +265,143 @@ class MutableShareFile(Referenceable):
def __init__(self, filename):
self.home = filename
self._base_lease_offset = self.HEADER_SIZE
if os.path.exists(self.home):
# we don't cache anything, just check the magic
f = open(self.home, 'rb')
data = f.read(88)
data = f.read(self.HEADER_SIZE)
(magic,
self._write_enabler_nodeid, self._write_enabler,
self._data_length, offset) = struct.unpack(">32s32s32sQQ", data)
write_enabler_nodeid, write_enabler,
data_length, extra_lease_offset) = \
struct.unpack(">32s32s32sQQ", data)
assert magic == self.MAGIC
self._extra_lease_offset = offset # points at (8)
f.seek(self._extra_lease_offset)
data = f.read(4)
self._num_extra_leases = struct.unpack(">L", data)
f.close()
def create(self, my_nodeid, write_enabler):
assert not os.path.exists(self.home)
self._write_enabler = write_enabler
self._data_length = 0
self._extra_lease_offset = (self.HEADER_SIZE
+ 4 * self.LEASE_SIZE
+ self._data_length)
assert self._extra_lease_offset == self.DATA_OFFSET # true at creation
self._num_extra_leases = 0
data_length = 0
extra_lease_offset = (self.HEADER_SIZE
+ 4 * self.LEASE_SIZE
+ data_length)
assert extra_lease_offset == self.DATA_OFFSET # true at creation
num_extra_leases = 0
f = open(self.home, 'wb')
header = struct.pack(">32s32s32sQQ",
self.MAGIC, my_nodeid, write_enabler,
self._data_length, self._extra_lease_offset,
data_length, extra_lease_offset,
)
f.write(header)
leases = ("\x00"*self.LEASE_SIZE) * 4
f.write(header + leases)
# data goes here, empty after creation
f.write(struct.pack(">L", self._num_extra_leases))
f.write(struct.pack(">L", num_extra_leases))
# extra leases go here, none at creation
f.close()
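A freshly created container is therefore exactly 532 bytes: the 112-byte header, four blank 104-byte lease slots, zero bytes of share data, and a 4-byte extra-lease count of zero. A minimal sketch (the scratch path is hypothetical):

import os

msf = MutableShareFile("/tmp/share0")   # hypothetical scratch path
msf.create(my_nodeid="\x00"*32, write_enabler="\x11"*32)
assert os.stat("/tmp/share0").st_size == 528 + 4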
def _read_data_length(self, f):
f.seek(self.DATA_LENGTH_OFFSET)
(data_length,) = struct.unpack(">Q", f.read(8))
return data_length
def read_share_data(self, offset, length):
def _write_data_length(self, f, data_length):
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", data_length))
def _read_share_data(self, f, offset, length):
precondition(offset >= 0)
if offset+length > self._data_length:
data_length = self._read_data_length(f)
if offset+length > data_length:
# reads beyond the end of the data are truncated. Reads that
# start beyond the end of the data return an empty string.
length = max(0, self._data_length-offset)
length = max(0, data_length-offset)
if length == 0:
return ""
precondition(offset+length <= self._data_length)
f = open(self.home, 'rb')
precondition(offset+length <= data_length)
f.seek(self.DATA_OFFSET+offset)
return f.read(length)
data = f.read(length)
return data
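These read semantics are exercised by the tests below: a read that runs past the end of the data is truncated, and one that starts past the end returns the empty string. For a share holding 100 bytes:

# with 100 bytes of share data (as in the tests below):
#   remote_read(95, 10)  -> the last 5 bytes ("99999")
#   remote_read(100, 10) -> ""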
def change_container_size(self, new_container_size):
def _read_extra_lease_offset(self, f):
f.seek(self.EXTRA_LEASE_OFFSET)
(extra_lease_offset,) = struct.unpack(">Q", f.read(8))
return extra_lease_offset
def _write_extra_lease_offset(self, f, offset):
f.seek(self.EXTRA_LEASE_OFFSET)
f.write(struct.pack(">Q", offset))
def _read_num_extra_leases(self, f):
offset = self._read_extra_lease_offset(f)
f.seek(offset)
(num_extra_leases,) = struct.unpack(">L", f.read(4))
return num_extra_leases
def _write_num_extra_leases(self, f, num_leases):
extra_lease_offset = self._read_extra_lease_offset(f)
f.seek(extra_lease_offset)
f.write(struct.pack(">L", num_leases))
def _change_container_size(self, f, new_container_size):
if new_container_size > self.MAX_SIZE:
raise DataTooLargeError()
old_extra_lease_offset = self._read_extra_lease_offset(f)
new_extra_lease_offset = self.DATA_OFFSET + new_container_size
if new_extra_lease_offset < self._extra_lease_offset:
# TODO: allow containers to shrink
if new_extra_lease_offset < old_extra_lease_offset:
# TODO: allow containers to shrink. For now they remain large.
return
f = open(self.home, 'rb+')
f.seek(self._extra_lease_offset)
extra_lease_data = f.read(4 + self._num_extra_leases * self.LEASE_SIZE)
num_extra_leases = self._read_num_extra_leases(f)
f.seek(old_extra_lease_offset)
extra_lease_data = f.read(4 + num_extra_leases * self.LEASE_SIZE)
f.seek(new_extra_lease_offset)
f.write(extra_lease_data)
self._extra_lease_offset = new_extra_lease_offset
# an interrupt here will corrupt the leases, iff the move caused the
# extra leases to overlap.
f.seek(self.DATA_LENGTH_OFFSET+8)
f.write(struct.pack(">Q", new_extra_lease_offset))
f.close()
self._write_extra_lease_offset(f, new_extra_lease_offset)
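Concretely, with hypothetical numbers: growing the data area from 100 to 1000 bytes copies the count-plus-extra-leases block from offset 628 to offset 1528, then rewrites the header pointer. An interrupt between the copy and the pointer update is harmless, since the old block is still in place:

# data area grows from 100 to 1000 bytes:
old_off = 528 + 100     # 628, where the extra-lease block sits now
new_off = 528 + 1000    # 1528, where it must move to
# bytes [628 : 628+4+n*104] are copied to 1528, then the 8-byte
# pointer at offset 104 is rewritten to 1528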
def write_share_data(self, offset, data):
def _write_share_data(self, f, offset, data):
length = len(data)
precondition(offset >= 0)
if offset+length < self._data_length:
# they are not expanding their data size
f = open(self.home, 'rb+')
f.seek(self.DATA_OFFSET+offset)
f.write(data)
f.close()
return
if self.DATA_OFFSET+offset+length <= self._extra_lease_offset:
# they are expanding their data size, but not the container size
f = open(self.home, 'rb+')
self._data_length = offset+length
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", self._data_length))
data_length = self._read_data_length(f)
extra_lease_offset = self._read_extra_lease_offset(f)
if offset+length >= data_length:
# They are expanding their data size.
if self.DATA_OFFSET+offset+length > extra_lease_offset:
# Their new data won't fit in the current container, so we
# have to move the leases. With luck, they're expanding it
# more than the size of the extra lease block, which will
# minimize the corrupt-the-share window
self._change_container_size(f, offset+length)
extra_lease_offset = self._read_extra_lease_offset(f)
# an interrupt here is ok: the container has been enlarged
# but the data remains untouched
assert self.DATA_OFFSET+offset+length <= extra_lease_offset
# Their data now fits in the current container. We must write
# their new data and modify the recorded data size.
new_data_length = offset+length
self._write_data_length(f, new_data_length)
# an interrupt here will result in a corrupted share
f.seek(self.DATA_OFFSET+offset)
f.write(data)
f.close()
return
# they are expanding the container, so we have to move the leases.
# With luck, they're expanding it more than the size of the extra
# lease block, which will minimize the corrupt-the-share window
self.change_container_size(offset+length)
# an interrupt here is ok.. the container has been enlarged but the
# data remains untouched
self._data_length = offset+length
f = open(self.home, 'rb+')
# now all that's left to do is write out their data
f.seek(self.DATA_OFFSET+offset)
f.write(data)
# an interrupt here will result in a corrupted share
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", self._data_length))
f.close()
return
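The ordering here bounds the damage an interrupt can do, as the inline comments note. A hypothetical trace for a write of 900 bytes at offset 100 into a share currently holding 100 bytes:

# 1. _change_container_size: extra-lease block moves 628 -> 1528 and the
#    header pointer is updated    (crash here: data still intact)
# 2. _write_data_length: 100 -> 1000
#                                 (crash here: share corrupt, recorded
#                                  length now exceeds the real data)
# 3. the 900 bytes are written at 528+100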
def _write_lease_record(self, f, lease_number, lease_info):
(ownerid, expiration_time,
renew_secret, cancel_secret, nodeid) = lease_info
extra_lease_offset = self._read_extra_lease_offset(f)
num_extra_leases = self._read_num_extra_leases(f)
if lease_number < 4:
offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
elif (lease_number-4) < self._num_extra_leases:
offset = (self._extra_lease_offset
elif (lease_number-4) < num_extra_leases:
offset = (extra_lease_offset
+ 4
+ (lease_number-4)*self.LEASE_SIZE)
else:
f.seek(self._extra_lease_offset)
f.write(struct.pack(">L", self._num_extra_leases+1))
self._num_extra_leases += 1
offset = (self._extra_lease_offset
# must add an extra lease record
self._write_num_extra_leases(f, num_extra_leases+1)
offset = (extra_lease_offset
+ 4
+ (lease_number-4)*self.LEASE_SIZE)
f.seek(offset)
@@ -396,10 +412,12 @@ class MutableShareFile(Referenceable):
def _read_lease_record(self, f, lease_number):
# returns a 5-tuple of lease info, or None
extra_lease_offset = self._read_extra_lease_offset(f)
num_extra_leases = self._read_num_extra_leases(f)
if lease_number < 4:
offset = self.HEADER_SIZE + lease_number * self.LEASE_SIZE
elif (lease_number-4) < self._num_extra_leases:
offset = (self._extra_lease_offset
elif (lease_number-4) < num_extra_leases:
offset = (extra_lease_offset
+ 4
+ (lease_number-4)*self.LEASE_SIZE)
else:
@@ -414,27 +432,25 @@ class MutableShareFile(Referenceable):
return None
return lease_info
def _read_num_leases(self, f):
f.seek(self.HEADER_SIZE)
leasedata = f.read(4*self.LEASE_SIZE)
num_leases = 0
for i in range(4):
base = i*self.LEASE_SIZE
(ownerid,) = struct.unpack(">L", leasedata[base:base+4])
if ownerid != 0:
num_leases += 1
return num_leases + self._num_extra_leases
def _get_num_lease_slots(self, f):
# how many places do we have allocated for leases? Not all of them
# are filled.
num_extra_leases = self._read_num_extra_leases(f)
return 4+num_extra_leases
def pack_leases(self):
pass
def _get_first_empty_lease_slot(self, f):
# return an int with the index of an empty slot, or None if we do not
# currently have an empty slot
def _truncate_leases(self, f, num_leases):
f.truncate(self._lease_offset + num_leases * self.LEASE_SIZE)
for i in range(self._get_num_lease_slots(f)):
if self._read_lease_record(f, i) is None:
return i
return None
def enumerate_leases(self, f):
"""Yields (leasenum, (ownerid, expiration_time, renew_secret,
cancel_secret, accepting_nodeid)) for all leases."""
for i in range(self._read_num_leases(f)):
for i in range(self._get_num_lease_slots(f)):
try:
data = self._read_lease_record(f, i)
if data is not None:
@@ -444,9 +460,12 @@ class MutableShareFile(Referenceable):
def add_lease(self, lease_info):
f = open(self.home, 'rb+')
num_leases = self._read_num_leases(f)
self._write_lease_record(f, num_leases, lease_info)
self._write_num_leases(f, num_leases+1)
num_lease_slots = self._get_num_lease_slots(f)
empty_slot = self._get_first_empty_lease_slot(f)
if empty_slot is not None:
self._write_lease_record(f, empty_slot, lease_info)
else:
self._write_lease_record(f, num_lease_slots, lease_info)
f.close()
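add_lease now reuses blanked slots: a record whose ownerid is 0 reads back as None, so a cancelled lease's slot is refilled before the file grows. A sketch using the 5-tuple convention from enumerate_leases (values hypothetical, msf as in the create() sketch above):

import time

lease_info = (1,                           # ownerid
              time.time() + 31*24*60*60,   # expiration, one month out
              "r"*32, "c"*32,              # renew and cancel secrets
              "\x00"*32)                   # accepting nodeid
msf.add_lease(lease_info)   # fills the first empty slot, else appends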
def renew_lease(self, renew_secret, new_expire_time):
@@ -463,10 +482,15 @@ class MutableShareFile(Referenceable):
return
accepting_nodeids.add(anid)
f.close()
# TODO: return the accepting_nodeids set, to give the client a chance
# to update the leases on a share which has been migrated from its
# Return the accepting_nodeids set, to give the client a chance to
# update the leases on a share which has been migrated from its
# original server to a new one.
raise IndexError("unable to renew non-existent lease")
msg = ("Unable to renew non-existent lease. I have leases accepted by"
" nodeids: ")
msg += ",".join([("'%s'" % idlib.b2a(anid))
for anid in accepting_nodeids])
msg += " ."
raise IndexError(msg)
def add_or_renew_lease(self, lease_info):
ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
@@ -480,12 +504,14 @@ class MutableShareFile(Referenceable):
(space_freed, num_remaining_leases). Raise IndexError if there was no
lease with the given cancel_secret."""
accepting_nodeids = set()
modified = 0
remaining = 0
blank = "\x00"*32
blank_lease = (0, 0, blank, blank, blank)
f = open(self.home, 'rb+')
for (leasenum,(oid,et,rs,cs,anid)) in self.enumerate_leases(f):
accepting_nodeids.add(anid)
if cs == cancel_secret:
self._write_lease_record(f, leasenum, blank_lease)
modified += 1
@@ -493,44 +519,72 @@ class MutableShareFile(Referenceable):
remaining += 1
if modified:
freed_space = self._pack_leases(f)
f.close()
return (freed_space, remaining)
f.close()
return (freed_space, remaining)
msg = ("Unable to cancel non-existent lease. I have leases "
"accepted by nodeids: ")
msg += ",".join([("'%s'" % idlib.b2a(anid))
for anid in accepting_nodeids])
msg += " ."
raise IndexError(msg)
def _pack_leases(self, f):
# TODO: reclaim space from cancelled leases
return 0
def _read_write_enabler_and_nodeid(self, f):
f.seek(0)
data = f.read(self.HEADER_SIZE)
(magic,
write_enabler_nodeid, write_enabler,
data_length, extra_lease_offset) = \
struct.unpack(">32s32s32sQQ", data)
assert magic == self.MAGIC
return (write_enabler, write_enabler_nodeid)
def remote_read(self, offset, length):
return self.read_share_data(offset, length)
f = open(self.home, 'rb')
data = self._read_share_data(f, offset, length)
f.close()
return data
def remote_get_length(self):
return self._data_length
f = open(self.home, 'rb')
data_length = self._read_data_length(f)
f.close()
return data_length
def remote_testv_and_writev(self, write_enabler, testv, datav, new_length):
if write_enabler != self._write_enabler:
f = open(self.home, 'rb+')
(real_write_enabler, write_enabler_nodeid) = \
self._read_write_enabler_and_nodeid(f)
if write_enabler != real_write_enabler:
# accommodate share migration by reporting the nodeid used for the
# old write enabler.
f.close()
msg = "The write enabler was recorded by nodeid '%s'." % \
(idlib.b2a(self._write_enabler_nodeid),)
raise BadWriterEnablerError(msg)
(idlib.b2a(write_enabler_nodeid),)
raise BadWriteEnablerError(msg)
# check testv
test_results_v = []
test_failed = False
for (offset, length, operator, specimen) in testv:
data = self.read_share_data(offset, length)
data = self._read_share_data(f, offset, length)
test_results_v.append(data)
if not self.compare(data, operator, specimen):
test_failed = False
test_failed = True
if test_failed:
f.close()
return (False, test_results_v)
# now apply the write vector
for (offset, data) in datav:
self.write_share_data(offset, data)
self._write_share_data(f, offset, data)
if new_length is not None:
self.change_container_size(new_length)
self._data_length = new_length
f = open(self.home, "rb+")
self._change_container_size(f, new_length)
f.seek(self.DATA_LENGTH_OFFSET)
f.write(struct.pack(">Q", self._data_length))
f.close()
f.write(struct.pack(">Q", new_length))
f.close()
return (True, test_results_v)
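Taken together, the remote call is a compare-and-swap over byte ranges: every test vector must pass before any write vector is applied, and the read-back specimens are returned either way. A sketch (write enabler as in the create() sketch above):

ok, results = msf.remote_testv_and_writev(
    "\x11"*32,                       # the write enabler from create()
    testv=[(0, 5, "eq", "hello")],   # apply only if bytes 0..5 == "hello"
    datav=[(0, "world")],            # ...then overwrite them
    new_length=None)                 # leave the container size alone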
def compare(self, a, op, b):
@@ -538,9 +592,9 @@ class MutableShareFile(Referenceable):
if op == "nop":
return True
if op == "lt":
return a <= b
if op == "le":
return a < b
if op == "le":
return a <= b
if op == "eq":
return a == b
if op == "ne":
@@ -574,9 +628,21 @@ class StorageServer(service.MultiService, Referenceable):
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
self.measure_size()
def setNodeID(self, nodeid):
# somebody must set this before any slots can be created or leases
# added
self.my_nodeid = nodeid
def startService(self):
service.MultiService.startService(self)
if self.parent:
nodeid = self.parent.nodeid # 20 bytes, binary
assert len(nodeid) == 20
self.setNodeID(nodeid + "\x00"*12) # make it 32 bytes
# TODO: review this 20-vs-32 thing, settle on one or the other
def _clean_incomplete(self):
fileutil.rm_dir(self.incomingdir)
@@ -740,6 +806,50 @@ class StorageServer(service.MultiService, Referenceable):
except StopIteration:
return iter([])
def remote_allocate_mutable_slot(self, storage_index,
write_enabler,
renew_secret, cancel_secret,
sharenums,
allocated_size):
my_nodeid = self.my_nodeid
sharenums = set(sharenums)
shares = self.remote_get_mutable_slot(storage_index)
existing_shnums = set(shares.keys())
si_s = idlib.b2a(storage_index)
bucketdir = os.path.join(self.sharedir, si_s)
fileutil.make_dirs(bucketdir)
for shnum in (sharenums - existing_shnums):
filename = os.path.join(bucketdir, "%d" % shnum)
shares[shnum] = create_mutable_sharefile(filename, my_nodeid,
write_enabler)
# update the lease on everything
ownerid = 1 # TODO
expire_time = time.time() + 31*24*60*60 # one month
anid = my_nodeid
lease_info = (ownerid, expire_time, renew_secret, cancel_secret, anid)
for share in shares.values():
share.add_or_renew_lease(lease_info)
return shares
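A sketch of the server-side call (secrets abbreviated): existing shares are left in place, missing ones are created, and every share, old or new, gets a fresh one-month lease:

shares = ss.remote_allocate_mutable_slot(
    "\x01"*32,                # storage index (hypothetical)
    write_enabler="\x11"*32,
    renew_secret="r"*32, cancel_secret="c"*32,
    sharenums=set([0, 1, 2]), allocated_size=100)
# shares maps shnum -> MutableShareFile; len(shares) == 3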
def remote_get_mutable_slot(self, storage_index):
"""This returns an empty dictionary if the server has no shares
of the slot mentioned."""
si_s = idlib.b2a(storage_index)
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_s)
if not os.path.isdir(bucketdir):
return {}
slots = {}
for sharenum_s in os.listdir(bucketdir):
try:
sharenum = int(sharenum_s)
except ValueError:
continue
filename = os.path.join(bucketdir, sharenum_s)
slots[sharenum] = MutableShareFile(filename)
return slots
# the code before here runs on the storage server, not the client
# the code beyond here runs on the client, not the storage server


@@ -10,6 +10,7 @@ from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
WriteBucketProxy, ReadBucketProxy, StorageServer
from allmydata.interfaces import BadWriteEnablerError
class Bucket(unittest.TestCase):
def make_workdir(self, name):
@@ -456,6 +457,7 @@ class MutableServer(unittest.TestCase):
workdir = self.workdir(name)
ss = StorageServer(workdir, sizelimit)
ss.setServiceParent(self.sparent)
ss.setNodeID("\x00" * 32)
return ss
def test_create(self):
@@ -478,26 +480,201 @@ class MutableServer(unittest.TestCase):
shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
self.failUnlessEqual(len(shares), 3)
self.failUnlessEqual(set(shares.keys()), set([0,1,2]))
shares2 = ss.get_mutable_slot("si1")
shares2 = ss.remote_get_mutable_slot("si1")
self.failUnlessEqual(len(shares2), 3)
self.failUnlessEqual(set(shares2.keys()), set([0,1,2]))
# the actual RIMutableSlot objects are required to be singletons (one
# per SI+shnum), so each get_mutable_slot() call should return the
# same RemoteReferences
self.failUnlessEqual(set(shares.values()), set(shares2.values()))
s0 = shares[0]
self.failUnlessEqual(s0.remote_read(0, 10), "")
self.failUnlessEqual(s0.remote_read(100, 10), "")
# try writing to one
WE = self.write_enabler("we1")
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
answer = s0.remote_testv_and_writev(self.write_enabler("we1"),
answer = s0.remote_testv_and_writev(WE,
[],
[(0, data),],
new_length=None)
self.failUnlessEqual(answer, [])
self.failUnlessEqual(answer, (True, []))
self.failUnlessEqual(s0.remote_read(0, 20), "00000000001111111111")
self.failUnlessEqual(s0.remote_read(95, 10), "99999")
self.failUnlessEqual(s0.remote_get_length(), 100)
self.failUnlessRaises(BadWriteEnablerError,
s0.remote_testv_and_writev,
"bad write enabler",
[], [], None)
# this testv should fail
answer = s0.remote_testv_and_writev(WE,
[(0, 12, "eq", "444444444444"),
(20, 5, "eq", "22222"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["000000000011",
"22222"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
# as should this one
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "lt", "11111"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
def test_operators(self):
# test operators; the data we're comparing is '11111' in all cases.
# test both fail+pass, reset data after each one.
ss = self.create("test_operators")
shares = self.allocate(ss, "si1", "we1", set([0,1,2]), 100)
s0 = shares[0]
WE = self.write_enabler("we1")
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
answer = s0.remote_testv_and_writev(WE,
[],
[(0, data),],
new_length=None)
# nop
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "nop", "11111"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "x"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
# lt
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "lt", "11110"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "lt", "11111"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "lt", "11112"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
# le
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "le", "11110"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "le", "11111"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "le", "11112"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
# eq
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "eq", "11112"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "eq", "11111"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
# ne
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "ne", "11111"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "ne", "11112"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
# ge
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "ge", "11110"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "ge", "11111"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "ge", "11112"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
# gt
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "gt", "11110"),
],
[(0, "y"*100)], None)
self.failUnlessEqual(answer, (True, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), "y"*100)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "gt", "11111"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)
answer = s0.remote_testv_and_writev(WE,
[(10, 5, "gt", "11112"),
],
[(0, "x"*100)], None)
self.failUnlessEqual(answer, (False, ["11111"]))
self.failUnlessEqual(s0.remote_read(0, 100), data)
s0.remote_testv_and_writev(WE, [], [(0,data)], None)