From 60725ed0656a3195bcd094dd18d7468d29a5bee6 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Wed, 9 Jul 2008 18:06:55 -0700 Subject: [PATCH] storage: add add_lease/update_write_enabler to remote API, revamp lease handling --- src/allmydata/interfaces.py | 39 ++++- src/allmydata/scripts/debug.py | 30 ++-- src/allmydata/storage.py | 253 +++++++++++++++++++---------- src/allmydata/test/test_storage.py | 109 ++++++++++--- 4 files changed, 299 insertions(+), 132 deletions(-) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 5ef7c38aa..ff6b6b5b5 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -21,6 +21,7 @@ URIExtensionData = StringConstraint(1000) Number = IntegerConstraint(8) # 2**(8*8) == 16EiB ~= 18e18 ~= 18 exabytes Offset = Number ReadSize = int # the 'int' constraint is 2**31 == 2Gib -- large files are processed in not-so-large increments +WriteEnablerSecret = Hash # used to protect mutable bucket modifications LeaseRenewSecret = Hash # used to protect bucket lease renewal requests LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests @@ -110,11 +111,21 @@ class RIStorageServer(RemoteInterface): return TupleOf(SetOf(int, maxLength=MAX_BUCKETS), DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS)) + def add_lease(storage_index=StorageIndex, + renew_secret=LeaseRenewSecret, + cancel_secret=LeaseCancelSecret): + """ + Add a new lease on the given bucket. If the renew_secret matches an + existing lease, that lease will be renewed instead. + """ + return None + def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret): """ Renew the lease on a given bucket. Some networks will use this, some will not. """ + return None def cancel_lease(storage_index=StorageIndex, cancel_secret=LeaseCancelSecret): @@ -122,6 +133,7 @@ class RIStorageServer(RemoteInterface): Cancel the lease on a given bucket. If this was the last lease on the bucket, the bucket will be deleted. """ + return None def get_buckets(storage_index=StorageIndex): return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS) @@ -136,7 +148,9 @@ class RIStorageServer(RemoteInterface): return DictOf(int, ReadData) # shnum -> results def slot_testv_and_readv_and_writev(storage_index=StorageIndex, - secrets=TupleOf(Hash, Hash, Hash), + secrets=TupleOf(WriteEnablerSecret, + LeaseRenewSecret, + LeaseCancelSecret), tw_vectors=TestAndWriteVectorsForShares, r_vector=ReadVector, ): @@ -200,8 +214,9 @@ class RIStorageServer(RemoteInterface): for each element of the read vector. If the write_enabler is wrong, this will raise BadWriteEnablerError. - To enable share migration, the exception will have the nodeid used - for the old write enabler embedded in it, in the following string:: + To enable share migration (using update_write_enabler), the exception + will have the nodeid used for the old write enabler embedded in it, + in the following string:: The write enabler was recorded by nodeid '%s'. @@ -211,6 +226,24 @@ class RIStorageServer(RemoteInterface): """ return TupleOf(bool, DictOf(int, ReadData)) + def update_write_enabler(storage_index=StorageIndex, + old_write_enabler=WriteEnablerSecret, + new_write_enabler=WriteEnablerSecret): + """ + Replace the write-enabler on a given bucket. This is used when a + share has been moved from one server to another, causing the secret + (which is scoped to a given server's nodeid) to become invalid. The + client discovers this when it gets a BadWriteEnablerError, and the + string body of the exception will contain a message that includes the + nodeid that was used for the old secret. + + The client should compute the old write-enabler secret, and send it + in conjunction with the new one. The server will then update the + share to record the new write-enabler instead of the old one. The + client can then retry its writev call. + """ + return None + class IStorageBucketWriter(Interface): def put_block(segmentnum=int, data=ShareData): """@param data: For most segments, this data will be 'blocksize' diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index 19edc7773..f930ff94f 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -88,10 +88,9 @@ def dump_share(config, out=sys.stdout, err=sys.stderr): leases = list(f.iter_leases()) if leases: for i,lease in enumerate(leases): - (owner_num, renew_secret, cancel_secret, expiration_time) = lease - when = format_expiration_time(expiration_time) - print >>out, " Lease #%d: owner=%d, expire in %s" % (i, owner_num, - when) + when = format_expiration_time(lease.expiration_time) + print >>out, " Lease #%d: owner=%d, expire in %s" \ + % (i, lease.owner_num, when) else: print >>out, " No leases." @@ -137,15 +136,15 @@ def dump_mutable_share(config, out, err): print >>out, " container_size: %d" % container_size print >>out, " data_length: %d" % data_length if leases: - for (leasenum, (oid,et,rs,cs,anid)) in leases: + for (leasenum, lease) in leases: print >>out print >>out, " Lease #%d:" % leasenum - print >>out, " ownerid: %d" % oid - when = format_expiration_time(et) + print >>out, " ownerid: %d" % lease.owner_num + when = format_expiration_time(lease.expiration_time) print >>out, " expires in %s" % when - print >>out, " renew_secret: %s" % base32.b2a(rs) - print >>out, " cancel_secret: %s" % base32.b2a(cs) - print >>out, " secrets are for nodeid: %s" % idlib.nodeid_b2a(anid) + print >>out, " renew_secret: %s" % base32.b2a(lease.renew_secret) + print >>out, " cancel_secret: %s" % base32.b2a(lease.cancel_secret) + print >>out, " secrets are for nodeid: %s" % idlib.nodeid_b2a(lease.nodeid) else: print >>out, "No leases." print >>out @@ -402,10 +401,8 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out, err): extra_lease_offset = m._read_extra_lease_offset(f) container_size = extra_lease_offset - m.DATA_OFFSET leases = list(m._enumerate_leases(f)) - expiration_time = min( [expiration_time - for (leasenum, - (ownerid, expiration_time, rs, cs, nodeid)) - in leases] ) + expiration_time = min( [lease[1].expiration_time + for lease in leases] ) expiration = max(0, expiration_time - now) share_type = "unknown" @@ -448,9 +445,8 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out, err): length = struct.unpack(">L", sf.read_share_data(seek, 4))[0] seek += 4 UEB_data = sf.read_share_data(seek, length) - expiration_time = min( [expiration_time - for (ownerid, rs, cs, expiration_time) - in sf.iter_leases()] ) + expiration_time = min( [lease.expiration_time + for lease in sf.iter_leases()] ) expiration = max(0, expiration_time - now) unpacked = uri.unpack_extension_readable(UEB_data) diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index b31195e8d..513b486f6 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -56,6 +56,45 @@ def storage_index_to_dir(storageindex): sia = si_b2a(storageindex) return os.path.join(sia[:2], sia) +class LeaseInfo: + def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None, + expiration_time=None, nodeid=None): + self.owner_num = owner_num + self.renew_secret = renew_secret + self.cancel_secret = cancel_secret + self.expiration_time = expiration_time + if nodeid is not None: + assert isinstance(nodeid, str) + assert len(nodeid) == 20 + self.nodeid = nodeid + + def from_immutable_data(self, data): + (self.owner_num, + self.renew_secret, + self.cancel_secret, + self.expiration_time) = struct.unpack(">L32s32sL", data) + self.nodeid = None + return self + def to_immutable_data(self): + return struct.pack(">L32s32sL", + self.owner_num, + self.renew_secret, self.cancel_secret, + int(self.expiration_time)) + + def to_mutable_data(self): + return struct.pack(">LL32s32s20s", + self.owner_num, + int(self.expiration_time), + self.renew_secret, self.cancel_secret, + self.nodeid) + def from_mutable_data(self, data): + (self.owner_num, + self.expiration_time, + self.renew_secret, self.cancel_secret, + self.nodeid) = struct.unpack(">LL32s32s20s", data) + return self + + class ShareFile: LEASE_SIZE = struct.calcsize(">L32s32sL") @@ -69,6 +108,9 @@ class ShareFile: self._data_offset = 0xc self._lease_offset = 0xc + self._size + def unlink(self): + os.unlink(self.home) + def read_share_data(self, offset, length): precondition(offset >= 0) precondition(offset+length <= self._size) @@ -88,13 +130,10 @@ class ShareFile: f.close() def _write_lease_record(self, f, lease_number, lease_info): - (owner_num, renew_secret, cancel_secret, expiration_time) = lease_info offset = self._lease_offset + lease_number * self.LEASE_SIZE f.seek(offset) assert f.tell() == offset - f.write(struct.pack(">L32s32sL", - owner_num, renew_secret, cancel_secret, - int(expiration_time))) + f.write(lease_info.to_immutable_data()) def _read_num_leases(self, f): f.seek(0x08) @@ -117,7 +156,7 @@ class ShareFile: for i in range(num_leases): data = f.read(self.LEASE_SIZE) if data: - yield struct.unpack(">L32s32sL", data) + yield LeaseInfo().from_immutable_data(data) def add_lease(self, lease_info): f = open(self.home, 'rb+') @@ -127,36 +166,39 @@ class ShareFile: f.close() def renew_lease(self, renew_secret, new_expire_time): - for i,(on,rs,cs,et) in enumerate(self.iter_leases()): - if rs == renew_secret: + for i,lease in enumerate(self.iter_leases()): + if lease.renew_secret == renew_secret: # yup. See if we need to update the owner time. - if new_expire_time > et: + if new_expire_time > lease.expiration_time: # yes - new_lease = (on,rs,cs,new_expire_time) + lease.expiration_time = new_expire_time f = open(self.home, 'rb+') - self._write_lease_record(f, i, new_lease) + self._write_lease_record(f, i, lease) f.close() return raise IndexError("unable to renew non-existent lease") def add_or_renew_lease(self, lease_info): - owner_num, renew_secret, cancel_secret, expire_time = lease_info try: - self.renew_lease(renew_secret, expire_time) + self.renew_lease(lease_info.renew_secret, + lease_info.expiration_time) except IndexError: self.add_lease(lease_info) + def cancel_lease(self, cancel_secret): - """Remove a lease with the given cancel_secret. Return - (num_remaining_leases, space_freed). Raise IndexError if there was no - lease with the given cancel_secret.""" + """Remove a lease with the given cancel_secret. If the last lease is + cancelled, the file will be removed. Return the number of bytes that + were freed (by truncating the list of leases, and possibly by + deleting the file. Raise IndexError if there was no lease with the + given cancel_secret. + """ leases = list(self.iter_leases()) num_leases = len(leases) num_leases_removed = 0 - for i,lease_info in enumerate(leases[:]): - (on,rs,cs,et) = lease_info - if cs == cancel_secret: + for i,lease in enumerate(leases[:]): + if lease.cancel_secret == cancel_secret: leases[i] = None num_leases_removed += 1 if not num_leases_removed: @@ -172,7 +214,11 @@ class ShareFile: self._write_num_leases(f, len(leases)) self._truncate_leases(f, len(leases)) f.close() - return len(leases), self.LEASE_SIZE * num_leases_removed + space_freed = self.LEASE_SIZE * num_leases_removed + if not len(leases): + space_freed += os.stat(self.home)[stat.ST_SIZE] + self.unlink() + return space_freed class BucketWriter(Referenceable): @@ -365,6 +411,9 @@ class MutableShareFile: # extra leases go here, none at creation f.close() + def unlink(self): + os.unlink(self.home) + def _read_data_length(self, f): f.seek(self.DATA_LENGTH_OFFSET) (data_length,) = struct.unpack(">Q", f.read(8)) @@ -457,8 +506,6 @@ class MutableShareFile: return def _write_lease_record(self, f, lease_number, lease_info): - (ownerid, expiration_time, - renew_secret, cancel_secret, nodeid) = lease_info extra_lease_offset = self._read_extra_lease_offset(f) num_extra_leases = self._read_num_extra_leases(f) if lease_number < 4: @@ -475,12 +522,10 @@ class MutableShareFile: + (lease_number-4)*self.LEASE_SIZE) f.seek(offset) assert f.tell() == offset - f.write(struct.pack(">LL32s32s20s", - ownerid, int(expiration_time), - renew_secret, cancel_secret, nodeid)) + f.write(lease_info.to_mutable_data()) def _read_lease_record(self, f, lease_number): - # returns a 5-tuple of lease info, or None + # returns a LeaseInfo instance, or None extra_lease_offset = self._read_extra_lease_offset(f) num_extra_leases = self._read_num_extra_leases(f) if lease_number < 4: @@ -494,10 +539,8 @@ class MutableShareFile: f.seek(offset) assert f.tell() == offset data = f.read(self.LEASE_SIZE) - lease_info = struct.unpack(">LL32s32s20s", data) - (ownerid, expiration_time, - renew_secret, cancel_secret, nodeid) = lease_info - if ownerid == 0: + lease_info = LeaseInfo().from_mutable_data(data) + if lease_info.owner_num == 0: return None return lease_info @@ -546,16 +589,16 @@ class MutableShareFile: def renew_lease(self, renew_secret, new_expire_time): accepting_nodeids = set() f = open(self.home, 'rb+') - for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f): - if rs == renew_secret: + for (leasenum,lease) in self._enumerate_leases(f): + if lease.renew_secret == renew_secret: # yup. See if we need to update the owner time. - if new_expire_time > et: + if new_expire_time > lease.expiration_time: # yes - new_lease = (oid,new_expire_time,rs,cs,anid) - self._write_lease_record(f, leasenum, new_lease) + lease.expiration_time = new_expire_time + self._write_lease_record(f, leasenum, lease) f.close() return - accepting_nodeids.add(anid) + accepting_nodeids.add(lease.nodeid) f.close() # Return the accepting_nodeids set, to give the client a chance to # update the leases on a share which has been migrated from its @@ -568,25 +611,31 @@ class MutableShareFile: raise IndexError(msg) def add_or_renew_lease(self, lease_info): - ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info try: - self.renew_lease(renew_secret, expire_time) + self.renew_lease(lease_info.renew_secret, + lease_info.expiration_time) except IndexError: self.add_lease(lease_info) def cancel_lease(self, cancel_secret): - """Remove any leases with the given cancel_secret. Return - (num_remaining_leases, space_freed). Raise IndexError if there was no - lease with the given cancel_secret.""" + """Remove any leases with the given cancel_secret. If the last lease + is cancelled, the file will be removed. Return the number of bytes + that were freed (by truncating the list of leases, and possibly by + deleting the file. Raise IndexError if there was no lease with the + given cancel_secret.""" accepting_nodeids = set() modified = 0 remaining = 0 - blank_lease = (0, 0, "\x00"*32, "\x00"*32, "\x00"*20) + blank_lease = LeaseInfo(owner_num=0, + renew_secret="\x00"*32, + cancel_secret="\x00"*32, + expiration_time=0, + nodeid="\x00"*20) f = open(self.home, 'rb+') - for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f): - accepting_nodeids.add(anid) - if cs == cancel_secret: + for (leasenum,lease) in self._enumerate_leases(f): + accepting_nodeids.add(lease.nodeid) + if lease.cancel_secret == cancel_secret: self._write_lease_record(f, leasenum, blank_lease) modified += 1 else: @@ -594,7 +643,11 @@ class MutableShareFile: if modified: freed_space = self._pack_leases(f) f.close() - return (remaining, freed_space) + if not remaining: + freed_space += os.stat(self.home)[stat.ST_SIZE] + self.unlink() + return freed_space + msg = ("Unable to cancel non-existent lease. I have leases " "accepted by nodeids: ") msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid)) @@ -647,6 +700,16 @@ class MutableShareFile: (idlib.nodeid_b2a(write_enabler_nodeid),) raise BadWriteEnablerError(msg) + def update_write_enabler(self, old_write_enabler, new_write_enabler, + my_nodeid, si_s): + self.check_write_enabler(old_write_enabler, si_s) + f = open(self.home, 'rb+') + f.seek(0) + header = struct.pack(">32s20s32s", + self.MAGIC, my_nodeid, new_write_enabler) + f.write(header) + f.close() + def check_testv(self, testv): test_good = True f = open(self.home, 'rb+') @@ -839,7 +902,9 @@ class StorageServer(service.MultiService, Referenceable): # separate database. Note that the lease should not be added until # the BucketWrite has been closed. expire_time = time.time() + 31*24*60*60 - lease_info = (owner_num, renew_secret, cancel_secret, expire_time) + lease_info = LeaseInfo(owner_num, + renew_secret, cancel_secret, + expire_time, self.my_nodeid) space_per_bucket = allocated_size no_limits = self.sizelimit is None @@ -893,13 +958,8 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("allocate", time.time() - start) return alreadygot, bucketwriters - def remote_renew_lease(self, storage_index, renew_secret): - start = time.time() - self.count("renew") - new_expire_time = time.time() + 31*24*60*60 - found_buckets = False + def _iter_share_files(self, storage_index): for shnum, filename in self._get_bucket_shares(storage_index): - found_buckets = True f = open(filename, 'rb') header = f.read(32) f.close() @@ -911,7 +971,36 @@ class StorageServer(service.MultiService, Referenceable): elif header[:4] == struct.pack(">L", 1): sf = ShareFile(filename) else: - pass # non-sharefile + continue # non-sharefile + yield sf + + def remote_add_lease(self, storage_index, renew_secret, cancel_secret, + owner_num=0): + start = time.time() + self.count("add-lease") + new_expire_time = time.time() + 31*24*60*60 + lease_info = LeaseInfo(owner_num, + renew_secret, cancel_secret, + new_expire_time, self.my_nodeid) + found_buckets = False + for sf in self._iter_share_files(storage_index): + found_buckets = True + # note: if the share has been migrated, the renew_lease() + # call will throw an exception, with information to help the + # client update the lease. + sf.add_or_renew_lease(lease_info) + self.add_latency("add-lease", time.time() - start) + if not found_buckets: + raise IndexError("no such storage index to do add-lease") + + + def remote_renew_lease(self, storage_index, renew_secret): + start = time.time() + self.count("renew") + new_expire_time = time.time() + 31*24*60*60 + found_buckets = False + for sf in self._iter_share_files(storage_index): + found_buckets = True sf.renew_lease(renew_secret, new_expire_time) self.add_latency("renew", time.time() - start) if not found_buckets: @@ -920,49 +1009,32 @@ class StorageServer(service.MultiService, Referenceable): def remote_cancel_lease(self, storage_index, cancel_secret): start = time.time() self.count("cancel") - storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index)) - remaining_files = 0 total_space_freed = 0 found_buckets = False - for shnum, filename in self._get_bucket_shares(storage_index): + for sf in self._iter_share_files(storage_index): # note: if we can't find a lease on one share, we won't bother # looking in the others. Unless something broke internally # (perhaps we ran out of disk space while adding a lease), the # leases on all shares will be identical. found_buckets = True - f = open(filename, 'rb') - header = f.read(32) - f.close() - if header[:32] == MutableShareFile.MAGIC: - sf = MutableShareFile(filename, self) - # note: if the share has been migrated, the renew_lease() - # call will throw an exception, with information to help the - # client update the lease. - elif header[:4] == struct.pack(">L", 1): - sf = ShareFile(filename) - else: - pass # non-sharefile - # this raises IndexError if the lease wasn't present - remaining_leases, space_freed = sf.cancel_lease(cancel_secret) - total_space_freed += space_freed - if remaining_leases: - remaining_files += 1 - else: - # now remove the sharefile. We'll almost certainly be - # removing the entire directory soon. - filelen = os.stat(filename)[stat.ST_SIZE] - os.unlink(filename) - total_space_freed += filelen - if not remaining_files: - fileutil.rm_dir(storagedir) + # this raises IndexError if the lease wasn't present XXXX + total_space_freed += sf.cancel_lease(cancel_secret) + + if found_buckets: + storagedir = os.path.join(self.sharedir, + storage_index_to_dir(storage_index)) + if not os.listdir(storagedir): + os.rmdir(storagedir) + if self.consumed is not None: self.consumed -= total_space_freed if self.stats_provider: - self.stats_provider.count('storage_server.bytes_freed', total_space_freed) + self.stats_provider.count('storage_server.bytes_freed', + total_space_freed) self.add_latency("cancel", time.time() - start) if not found_buckets: - raise IndexError("no such lease to cancel") + raise IndexError("no such storage index") def bucket_writer_closed(self, bw, consumed_size): if self.consumed is not None: @@ -1077,10 +1149,9 @@ class StorageServer(service.MultiService, Referenceable): # and update the leases on all shares ownerid = 1 # TODO expire_time = time.time() + 31*24*60*60 # one month - my_nodeid = self.my_nodeid - anid = my_nodeid - lease_info = (ownerid, expire_time, renew_secret, cancel_secret, - anid) + lease_info = LeaseInfo(ownerid, + renew_secret, cancel_secret, + expire_time, self.my_nodeid) for share in shares.values(): share.add_or_renew_lease(lease_info) @@ -1125,6 +1196,14 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("readv", time.time() - start) return datavs + def remote_update_write_enabler(self, storage_index, + old_write_enabler, new_write_enabler): + si_s = si_b2a(storage_index) + for sf in self._iter_share_files(storage_index): + if not isinstance(sf, MutableShareFile): + continue + sf.update_write_enabler(old_write_enabler, new_write_enabler, + self.my_nodeid, si_s) # the code before here runs on the storage server, not the client diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index fca443a65..d660caa75 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -8,7 +8,7 @@ from allmydata import interfaces from allmydata.util import fileutil, hashutil from allmydata.storage import BucketWriter, BucketReader, \ WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \ - storage_index_to_dir, DataTooLargeError + storage_index_to_dir, DataTooLargeError, LeaseInfo from allmydata.interfaces import BadWriteEnablerError from allmydata.test.common import LoggingServiceParent @@ -56,7 +56,8 @@ class Bucket(unittest.TestCase): renew_secret = os.urandom(32) cancel_secret = os.urandom(32) expiration_time = time.time() + 5000 - return (owner_num, renew_secret, cancel_secret, expiration_time) + return LeaseInfo(owner_num, renew_secret, cancel_secret, + expiration_time, "\x00" * 20) def test_create(self): incoming, final = self.make_workdir("test_create") @@ -109,7 +110,8 @@ class BucketProxy(unittest.TestCase): renew_secret = os.urandom(32) cancel_secret = os.urandom(32) expiration_time = time.time() + 5000 - return (owner_num, renew_secret, cancel_secret, expiration_time) + return LeaseInfo(owner_num, renew_secret, cancel_secret, + expiration_time, "\x00" * 20) def bucket_writer_closed(self, bw, consumed): pass @@ -228,6 +230,7 @@ class Server(unittest.TestCase): workdir = self.workdir(name) ss = StorageServer(workdir, sizelimit, stats_provider=FakeStatsProvider()) + ss.setNodeID("\x00" * 20) ss.setServiceParent(self.sparent) return ss @@ -450,7 +453,7 @@ class Server(unittest.TestCase): leases = list(ss.get_leases("si0")) self.failUnlessEqual(len(leases), 1) - self.failUnlessEqual(set([l[1] for l in leases]), set([rs0])) + self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0])) rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()), hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())) @@ -469,7 +472,7 @@ class Server(unittest.TestCase): leases = list(ss.get_leases("si1")) self.failUnlessEqual(len(leases), 2) - self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2])) + self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2])) # check that si0 is readable readers = ss.remote_get_buckets("si0") @@ -505,7 +508,7 @@ class Server(unittest.TestCase): leases = list(ss.get_leases("si1")) self.failUnlessEqual(len(leases), 1) - self.failUnlessEqual(set([l[1] for l in leases]), set([rs2])) + self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2])) ss.remote_renew_lease("si1", rs2) # cancelling the second should make it go away @@ -549,6 +552,7 @@ class Server(unittest.TestCase): def test_readonly(self): workdir = self.workdir("test_readonly") ss = StorageServer(workdir, readonly_storage=True) + ss.setNodeID("\x00" * 20) ss.setServiceParent(self.sparent) canary = FakeCanary() @@ -560,6 +564,7 @@ class Server(unittest.TestCase): # discard is really only used for other tests, but we test it anyways workdir = self.workdir("test_discard") ss = StorageServer(workdir, discard_storage=True) + ss.setNodeID("\x00" * 20) ss.setServiceParent(self.sparent) canary = FakeCanary() @@ -594,7 +599,7 @@ class MutableServer(unittest.TestCase): workdir = self.workdir(name) ss = StorageServer(workdir, sizelimit) ss.setServiceParent(self.sparent) - ss.setNodeID("\x00" * 32) + ss.setNodeID("\x00" * 20) return ss def test_create(self): @@ -925,17 +930,28 @@ class MutableServer(unittest.TestCase): 1: ["1"*10], 2: ["2"*10]}) - def compare_leases_without_timestamps(self, a, b): - self.failUnlessEqual(len(a), len(b)) - for i in range(len(a)): - (num_a, (ownerid_a, expiration_time_a, - renew_secret_a, cancel_secret_a, nodeid_a)) = a[i] - (num_b, (ownerid_b, expiration_time_b, - renew_secret_b, cancel_secret_b, nodeid_b)) = b[i] - self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a, - cancel_secret_a, nodeid_a), - (num_b, ownerid_b, renew_secret_b, - cancel_secret_b, nodeid_b) ) + def compare_leases_without_timestamps(self, leases_a, leases_b): + self.failUnlessEqual(len(leases_a), len(leases_b)) + for i in range(len(leases_a)): + num_a, a = leases_a[i] + num_b, b = leases_b[i] + self.failUnlessEqual(num_a, num_b) + self.failUnlessEqual(a.owner_num, b.owner_num) + self.failUnlessEqual(a.renew_secret, b.renew_secret) + self.failUnlessEqual(a.cancel_secret, b.cancel_secret) + self.failUnlessEqual(a.nodeid, b.nodeid) + + def compare_leases(self, leases_a, leases_b): + self.failUnlessEqual(len(leases_a), len(leases_b)) + for i in range(len(leases_a)): + num_a, a = leases_a[i] + num_b, b = leases_b[i] + self.failUnlessEqual(num_a, num_b) + self.failUnlessEqual(a.owner_num, b.owner_num) + self.failUnlessEqual(a.renew_secret, b.renew_secret) + self.failUnlessEqual(a.cancel_secret, b.cancel_secret) + self.failUnlessEqual(a.nodeid, b.nodeid) + self.failUnlessEqual(a.expiration_time, b.expiration_time) def test_leases(self): ss = self.create("test_leases", sizelimit=1000*1000) @@ -1002,20 +1018,25 @@ class MutableServer(unittest.TestCase): all_leases = s0.debug_get_leases() # renewing with a bogus token should prompt an error message - # TODO: examine the exception thus raised, make sure the old nodeid - # is present, to provide for share migration - self.failUnlessRaises(IndexError, - ss.remote_renew_lease, "si1", - secrets(20)[1]) + # examine the exception thus raised, make sure the old nodeid is + # present, to provide for share migration + e = self.failUnlessRaises(IndexError, + ss.remote_renew_lease, "si1", + secrets(20)[1]) + e_s = str(e) + self.failUnless("Unable to renew non-existent lease" in e_s) + self.failUnless("I have leases accepted by nodeids:" in e_s) + self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s) + # same for cancelling self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si1", secrets(20)[2]) - self.failUnlessEqual(all_leases, s0.debug_get_leases()) + self.compare_leases(all_leases, s0.debug_get_leases()) # reading shares should not modify the timestamp read("si1", [], [(0,200)]) - self.failUnlessEqual(all_leases, s0.debug_get_leases()) + self.compare_leases(all_leases, s0.debug_get_leases()) write("si1", secrets(0), {0: ([], [(200, "make me bigger")], None)}, []) @@ -1056,6 +1077,43 @@ class MutableServer(unittest.TestCase): self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si2", "nonsecret") + def test_update_write_enabler(self): + ss = self.create("test_update_write_enabler", sizelimit=1000*1000) + secrets = ( self.write_enabler("we1"), + self.renew_secret("we1-0"), + self.cancel_secret("we1-0") ) + old_write_enabler = secrets[0] + new_write_enabler = self.write_enabler("we2") + new_secrets = (new_write_enabler, secrets[1], secrets[2]) + + data = "".join([ ("%d" % i) * 10 for i in range(10) ]) + write = ss.remote_slot_testv_and_readv_and_writev + read = ss.remote_slot_readv + update_write_enabler = ss.remote_update_write_enabler + + rc = write("si1", secrets, {0: ([], [(0,data)], None)}, []) + self.failUnlessEqual(rc, (True, {})) + + rc = write("si1", secrets, {0: ([], [(1,data)], None)}, []) + self.failUnlessEqual(rc[0], True) + + f = self.failUnlessRaises(BadWriteEnablerError, + write, "si1", new_secrets, + {}, []) + self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f) + ss.setNodeID("\xff" * 20) + + rc = update_write_enabler("si1", old_write_enabler, new_write_enabler) + self.failUnlessEqual(rc, None) + + f = self.failUnlessRaises(BadWriteEnablerError, + write, "si1", secrets, + {}, []) + self.failUnless("The write enabler was recorded by nodeid '77777777777777777777777777777777'." in f, f) + + rc = write("si1", new_secrets, {0: ([], [(2,data)], None)}, []) + self.failUnlessEqual(rc[0], True) + class Stats(unittest.TestCase): @@ -1072,6 +1130,7 @@ class Stats(unittest.TestCase): def create(self, name, sizelimit=None): workdir = self.workdir(name) ss = StorageServer(workdir, sizelimit) + ss.setNodeID("\x00" * 20) ss.setServiceParent(self.sparent) return ss