mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2024-12-20 13:33:09 +00:00
storage: add add_lease/update_write_enabler to remote API, revamp lease handling
This commit is contained in:
parent
de8ce0cb5b
commit
60725ed065
@ -21,6 +21,7 @@ URIExtensionData = StringConstraint(1000)
|
||||
Number = IntegerConstraint(8) # 2**(8*8) == 16EiB ~= 18e18 ~= 18 exabytes
|
||||
Offset = Number
|
||||
ReadSize = int # the 'int' constraint is 2**31 == 2Gib -- large files are processed in not-so-large increments
|
||||
WriteEnablerSecret = Hash # used to protect mutable bucket modifications
|
||||
LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
|
||||
LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
|
||||
|
||||
@ -110,11 +111,21 @@ class RIStorageServer(RemoteInterface):
|
||||
return TupleOf(SetOf(int, maxLength=MAX_BUCKETS),
|
||||
DictOf(int, RIBucketWriter, maxKeys=MAX_BUCKETS))
|
||||
|
||||
def add_lease(storage_index=StorageIndex,
|
||||
renew_secret=LeaseRenewSecret,
|
||||
cancel_secret=LeaseCancelSecret):
|
||||
"""
|
||||
Add a new lease on the given bucket. If the renew_secret matches an
|
||||
existing lease, that lease will be renewed instead.
|
||||
"""
|
||||
return None
|
||||
|
||||
def renew_lease(storage_index=StorageIndex, renew_secret=LeaseRenewSecret):
|
||||
"""
|
||||
Renew the lease on a given bucket. Some networks will use this, some
|
||||
will not.
|
||||
"""
|
||||
return None
|
||||
|
||||
def cancel_lease(storage_index=StorageIndex,
|
||||
cancel_secret=LeaseCancelSecret):
|
||||
@ -122,6 +133,7 @@ class RIStorageServer(RemoteInterface):
|
||||
Cancel the lease on a given bucket. If this was the last lease on the
|
||||
bucket, the bucket will be deleted.
|
||||
"""
|
||||
return None
|
||||
|
||||
def get_buckets(storage_index=StorageIndex):
|
||||
return DictOf(int, RIBucketReader, maxKeys=MAX_BUCKETS)
|
||||
@ -136,7 +148,9 @@ class RIStorageServer(RemoteInterface):
|
||||
return DictOf(int, ReadData) # shnum -> results
|
||||
|
||||
def slot_testv_and_readv_and_writev(storage_index=StorageIndex,
|
||||
secrets=TupleOf(Hash, Hash, Hash),
|
||||
secrets=TupleOf(WriteEnablerSecret,
|
||||
LeaseRenewSecret,
|
||||
LeaseCancelSecret),
|
||||
tw_vectors=TestAndWriteVectorsForShares,
|
||||
r_vector=ReadVector,
|
||||
):
|
||||
@ -200,8 +214,9 @@ class RIStorageServer(RemoteInterface):
|
||||
for each element of the read vector.
|
||||
|
||||
If the write_enabler is wrong, this will raise BadWriteEnablerError.
|
||||
To enable share migration, the exception will have the nodeid used
|
||||
for the old write enabler embedded in it, in the following string::
|
||||
To enable share migration (using update_write_enabler), the exception
|
||||
will have the nodeid used for the old write enabler embedded in it,
|
||||
in the following string::
|
||||
|
||||
The write enabler was recorded by nodeid '%s'.
|
||||
|
||||
@ -211,6 +226,24 @@ class RIStorageServer(RemoteInterface):
|
||||
"""
|
||||
return TupleOf(bool, DictOf(int, ReadData))
|
||||
|
||||
def update_write_enabler(storage_index=StorageIndex,
|
||||
old_write_enabler=WriteEnablerSecret,
|
||||
new_write_enabler=WriteEnablerSecret):
|
||||
"""
|
||||
Replace the write-enabler on a given bucket. This is used when a
|
||||
share has been moved from one server to another, causing the secret
|
||||
(which is scoped to a given server's nodeid) to become invalid. The
|
||||
client discovers this when it gets a BadWriteEnablerError, and the
|
||||
string body of the exception will contain a message that includes the
|
||||
nodeid that was used for the old secret.
|
||||
|
||||
The client should compute the old write-enabler secret, and send it
|
||||
in conjunction with the new one. The server will then update the
|
||||
share to record the new write-enabler instead of the old one. The
|
||||
client can then retry its writev call.
|
||||
"""
|
||||
return None
|
||||
|
||||
class IStorageBucketWriter(Interface):
|
||||
def put_block(segmentnum=int, data=ShareData):
|
||||
"""@param data: For most segments, this data will be 'blocksize'
|
||||
|
@ -88,10 +88,9 @@ def dump_share(config, out=sys.stdout, err=sys.stderr):
|
||||
leases = list(f.iter_leases())
|
||||
if leases:
|
||||
for i,lease in enumerate(leases):
|
||||
(owner_num, renew_secret, cancel_secret, expiration_time) = lease
|
||||
when = format_expiration_time(expiration_time)
|
||||
print >>out, " Lease #%d: owner=%d, expire in %s" % (i, owner_num,
|
||||
when)
|
||||
when = format_expiration_time(lease.expiration_time)
|
||||
print >>out, " Lease #%d: owner=%d, expire in %s" \
|
||||
% (i, lease.owner_num, when)
|
||||
else:
|
||||
print >>out, " No leases."
|
||||
|
||||
@ -137,15 +136,15 @@ def dump_mutable_share(config, out, err):
|
||||
print >>out, " container_size: %d" % container_size
|
||||
print >>out, " data_length: %d" % data_length
|
||||
if leases:
|
||||
for (leasenum, (oid,et,rs,cs,anid)) in leases:
|
||||
for (leasenum, lease) in leases:
|
||||
print >>out
|
||||
print >>out, " Lease #%d:" % leasenum
|
||||
print >>out, " ownerid: %d" % oid
|
||||
when = format_expiration_time(et)
|
||||
print >>out, " ownerid: %d" % lease.owner_num
|
||||
when = format_expiration_time(lease.expiration_time)
|
||||
print >>out, " expires in %s" % when
|
||||
print >>out, " renew_secret: %s" % base32.b2a(rs)
|
||||
print >>out, " cancel_secret: %s" % base32.b2a(cs)
|
||||
print >>out, " secrets are for nodeid: %s" % idlib.nodeid_b2a(anid)
|
||||
print >>out, " renew_secret: %s" % base32.b2a(lease.renew_secret)
|
||||
print >>out, " cancel_secret: %s" % base32.b2a(lease.cancel_secret)
|
||||
print >>out, " secrets are for nodeid: %s" % idlib.nodeid_b2a(lease.nodeid)
|
||||
else:
|
||||
print >>out, "No leases."
|
||||
print >>out
|
||||
@ -402,10 +401,8 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out, err):
|
||||
extra_lease_offset = m._read_extra_lease_offset(f)
|
||||
container_size = extra_lease_offset - m.DATA_OFFSET
|
||||
leases = list(m._enumerate_leases(f))
|
||||
expiration_time = min( [expiration_time
|
||||
for (leasenum,
|
||||
(ownerid, expiration_time, rs, cs, nodeid))
|
||||
in leases] )
|
||||
expiration_time = min( [lease[1].expiration_time
|
||||
for lease in leases] )
|
||||
expiration = max(0, expiration_time - now)
|
||||
|
||||
share_type = "unknown"
|
||||
@ -448,9 +445,8 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out, err):
|
||||
length = struct.unpack(">L", sf.read_share_data(seek, 4))[0]
|
||||
seek += 4
|
||||
UEB_data = sf.read_share_data(seek, length)
|
||||
expiration_time = min( [expiration_time
|
||||
for (ownerid, rs, cs, expiration_time)
|
||||
in sf.iter_leases()] )
|
||||
expiration_time = min( [lease.expiration_time
|
||||
for lease in sf.iter_leases()] )
|
||||
expiration = max(0, expiration_time - now)
|
||||
|
||||
unpacked = uri.unpack_extension_readable(UEB_data)
|
||||
|
@ -56,6 +56,45 @@ def storage_index_to_dir(storageindex):
|
||||
sia = si_b2a(storageindex)
|
||||
return os.path.join(sia[:2], sia)
|
||||
|
||||
class LeaseInfo:
|
||||
def __init__(self, owner_num=None, renew_secret=None, cancel_secret=None,
|
||||
expiration_time=None, nodeid=None):
|
||||
self.owner_num = owner_num
|
||||
self.renew_secret = renew_secret
|
||||
self.cancel_secret = cancel_secret
|
||||
self.expiration_time = expiration_time
|
||||
if nodeid is not None:
|
||||
assert isinstance(nodeid, str)
|
||||
assert len(nodeid) == 20
|
||||
self.nodeid = nodeid
|
||||
|
||||
def from_immutable_data(self, data):
|
||||
(self.owner_num,
|
||||
self.renew_secret,
|
||||
self.cancel_secret,
|
||||
self.expiration_time) = struct.unpack(">L32s32sL", data)
|
||||
self.nodeid = None
|
||||
return self
|
||||
def to_immutable_data(self):
|
||||
return struct.pack(">L32s32sL",
|
||||
self.owner_num,
|
||||
self.renew_secret, self.cancel_secret,
|
||||
int(self.expiration_time))
|
||||
|
||||
def to_mutable_data(self):
|
||||
return struct.pack(">LL32s32s20s",
|
||||
self.owner_num,
|
||||
int(self.expiration_time),
|
||||
self.renew_secret, self.cancel_secret,
|
||||
self.nodeid)
|
||||
def from_mutable_data(self, data):
|
||||
(self.owner_num,
|
||||
self.expiration_time,
|
||||
self.renew_secret, self.cancel_secret,
|
||||
self.nodeid) = struct.unpack(">LL32s32s20s", data)
|
||||
return self
|
||||
|
||||
|
||||
class ShareFile:
|
||||
LEASE_SIZE = struct.calcsize(">L32s32sL")
|
||||
|
||||
@ -69,6 +108,9 @@ class ShareFile:
|
||||
self._data_offset = 0xc
|
||||
self._lease_offset = 0xc + self._size
|
||||
|
||||
def unlink(self):
|
||||
os.unlink(self.home)
|
||||
|
||||
def read_share_data(self, offset, length):
|
||||
precondition(offset >= 0)
|
||||
precondition(offset+length <= self._size)
|
||||
@ -88,13 +130,10 @@ class ShareFile:
|
||||
f.close()
|
||||
|
||||
def _write_lease_record(self, f, lease_number, lease_info):
|
||||
(owner_num, renew_secret, cancel_secret, expiration_time) = lease_info
|
||||
offset = self._lease_offset + lease_number * self.LEASE_SIZE
|
||||
f.seek(offset)
|
||||
assert f.tell() == offset
|
||||
f.write(struct.pack(">L32s32sL",
|
||||
owner_num, renew_secret, cancel_secret,
|
||||
int(expiration_time)))
|
||||
f.write(lease_info.to_immutable_data())
|
||||
|
||||
def _read_num_leases(self, f):
|
||||
f.seek(0x08)
|
||||
@ -117,7 +156,7 @@ class ShareFile:
|
||||
for i in range(num_leases):
|
||||
data = f.read(self.LEASE_SIZE)
|
||||
if data:
|
||||
yield struct.unpack(">L32s32sL", data)
|
||||
yield LeaseInfo().from_immutable_data(data)
|
||||
|
||||
def add_lease(self, lease_info):
|
||||
f = open(self.home, 'rb+')
|
||||
@ -127,36 +166,39 @@ class ShareFile:
|
||||
f.close()
|
||||
|
||||
def renew_lease(self, renew_secret, new_expire_time):
|
||||
for i,(on,rs,cs,et) in enumerate(self.iter_leases()):
|
||||
if rs == renew_secret:
|
||||
for i,lease in enumerate(self.iter_leases()):
|
||||
if lease.renew_secret == renew_secret:
|
||||
# yup. See if we need to update the owner time.
|
||||
if new_expire_time > et:
|
||||
if new_expire_time > lease.expiration_time:
|
||||
# yes
|
||||
new_lease = (on,rs,cs,new_expire_time)
|
||||
lease.expiration_time = new_expire_time
|
||||
f = open(self.home, 'rb+')
|
||||
self._write_lease_record(f, i, new_lease)
|
||||
self._write_lease_record(f, i, lease)
|
||||
f.close()
|
||||
return
|
||||
raise IndexError("unable to renew non-existent lease")
|
||||
|
||||
def add_or_renew_lease(self, lease_info):
|
||||
owner_num, renew_secret, cancel_secret, expire_time = lease_info
|
||||
try:
|
||||
self.renew_lease(renew_secret, expire_time)
|
||||
self.renew_lease(lease_info.renew_secret,
|
||||
lease_info.expiration_time)
|
||||
except IndexError:
|
||||
self.add_lease(lease_info)
|
||||
|
||||
|
||||
def cancel_lease(self, cancel_secret):
|
||||
"""Remove a lease with the given cancel_secret. Return
|
||||
(num_remaining_leases, space_freed). Raise IndexError if there was no
|
||||
lease with the given cancel_secret."""
|
||||
"""Remove a lease with the given cancel_secret. If the last lease is
|
||||
cancelled, the file will be removed. Return the number of bytes that
|
||||
were freed (by truncating the list of leases, and possibly by
|
||||
deleting the file. Raise IndexError if there was no lease with the
|
||||
given cancel_secret.
|
||||
"""
|
||||
|
||||
leases = list(self.iter_leases())
|
||||
num_leases = len(leases)
|
||||
num_leases_removed = 0
|
||||
for i,lease_info in enumerate(leases[:]):
|
||||
(on,rs,cs,et) = lease_info
|
||||
if cs == cancel_secret:
|
||||
for i,lease in enumerate(leases[:]):
|
||||
if lease.cancel_secret == cancel_secret:
|
||||
leases[i] = None
|
||||
num_leases_removed += 1
|
||||
if not num_leases_removed:
|
||||
@ -172,7 +214,11 @@ class ShareFile:
|
||||
self._write_num_leases(f, len(leases))
|
||||
self._truncate_leases(f, len(leases))
|
||||
f.close()
|
||||
return len(leases), self.LEASE_SIZE * num_leases_removed
|
||||
space_freed = self.LEASE_SIZE * num_leases_removed
|
||||
if not len(leases):
|
||||
space_freed += os.stat(self.home)[stat.ST_SIZE]
|
||||
self.unlink()
|
||||
return space_freed
|
||||
|
||||
|
||||
class BucketWriter(Referenceable):
|
||||
@ -365,6 +411,9 @@ class MutableShareFile:
|
||||
# extra leases go here, none at creation
|
||||
f.close()
|
||||
|
||||
def unlink(self):
|
||||
os.unlink(self.home)
|
||||
|
||||
def _read_data_length(self, f):
|
||||
f.seek(self.DATA_LENGTH_OFFSET)
|
||||
(data_length,) = struct.unpack(">Q", f.read(8))
|
||||
@ -457,8 +506,6 @@ class MutableShareFile:
|
||||
return
|
||||
|
||||
def _write_lease_record(self, f, lease_number, lease_info):
|
||||
(ownerid, expiration_time,
|
||||
renew_secret, cancel_secret, nodeid) = lease_info
|
||||
extra_lease_offset = self._read_extra_lease_offset(f)
|
||||
num_extra_leases = self._read_num_extra_leases(f)
|
||||
if lease_number < 4:
|
||||
@ -475,12 +522,10 @@ class MutableShareFile:
|
||||
+ (lease_number-4)*self.LEASE_SIZE)
|
||||
f.seek(offset)
|
||||
assert f.tell() == offset
|
||||
f.write(struct.pack(">LL32s32s20s",
|
||||
ownerid, int(expiration_time),
|
||||
renew_secret, cancel_secret, nodeid))
|
||||
f.write(lease_info.to_mutable_data())
|
||||
|
||||
def _read_lease_record(self, f, lease_number):
|
||||
# returns a 5-tuple of lease info, or None
|
||||
# returns a LeaseInfo instance, or None
|
||||
extra_lease_offset = self._read_extra_lease_offset(f)
|
||||
num_extra_leases = self._read_num_extra_leases(f)
|
||||
if lease_number < 4:
|
||||
@ -494,10 +539,8 @@ class MutableShareFile:
|
||||
f.seek(offset)
|
||||
assert f.tell() == offset
|
||||
data = f.read(self.LEASE_SIZE)
|
||||
lease_info = struct.unpack(">LL32s32s20s", data)
|
||||
(ownerid, expiration_time,
|
||||
renew_secret, cancel_secret, nodeid) = lease_info
|
||||
if ownerid == 0:
|
||||
lease_info = LeaseInfo().from_mutable_data(data)
|
||||
if lease_info.owner_num == 0:
|
||||
return None
|
||||
return lease_info
|
||||
|
||||
@ -546,16 +589,16 @@ class MutableShareFile:
|
||||
def renew_lease(self, renew_secret, new_expire_time):
|
||||
accepting_nodeids = set()
|
||||
f = open(self.home, 'rb+')
|
||||
for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
|
||||
if rs == renew_secret:
|
||||
for (leasenum,lease) in self._enumerate_leases(f):
|
||||
if lease.renew_secret == renew_secret:
|
||||
# yup. See if we need to update the owner time.
|
||||
if new_expire_time > et:
|
||||
if new_expire_time > lease.expiration_time:
|
||||
# yes
|
||||
new_lease = (oid,new_expire_time,rs,cs,anid)
|
||||
self._write_lease_record(f, leasenum, new_lease)
|
||||
lease.expiration_time = new_expire_time
|
||||
self._write_lease_record(f, leasenum, lease)
|
||||
f.close()
|
||||
return
|
||||
accepting_nodeids.add(anid)
|
||||
accepting_nodeids.add(lease.nodeid)
|
||||
f.close()
|
||||
# Return the accepting_nodeids set, to give the client a chance to
|
||||
# update the leases on a share which has been migrated from its
|
||||
@ -568,25 +611,31 @@ class MutableShareFile:
|
||||
raise IndexError(msg)
|
||||
|
||||
def add_or_renew_lease(self, lease_info):
|
||||
ownerid, expire_time, renew_secret, cancel_secret, anid = lease_info
|
||||
try:
|
||||
self.renew_lease(renew_secret, expire_time)
|
||||
self.renew_lease(lease_info.renew_secret,
|
||||
lease_info.expiration_time)
|
||||
except IndexError:
|
||||
self.add_lease(lease_info)
|
||||
|
||||
def cancel_lease(self, cancel_secret):
|
||||
"""Remove any leases with the given cancel_secret. Return
|
||||
(num_remaining_leases, space_freed). Raise IndexError if there was no
|
||||
lease with the given cancel_secret."""
|
||||
"""Remove any leases with the given cancel_secret. If the last lease
|
||||
is cancelled, the file will be removed. Return the number of bytes
|
||||
that were freed (by truncating the list of leases, and possibly by
|
||||
deleting the file. Raise IndexError if there was no lease with the
|
||||
given cancel_secret."""
|
||||
|
||||
accepting_nodeids = set()
|
||||
modified = 0
|
||||
remaining = 0
|
||||
blank_lease = (0, 0, "\x00"*32, "\x00"*32, "\x00"*20)
|
||||
blank_lease = LeaseInfo(owner_num=0,
|
||||
renew_secret="\x00"*32,
|
||||
cancel_secret="\x00"*32,
|
||||
expiration_time=0,
|
||||
nodeid="\x00"*20)
|
||||
f = open(self.home, 'rb+')
|
||||
for (leasenum,(oid,et,rs,cs,anid)) in self._enumerate_leases(f):
|
||||
accepting_nodeids.add(anid)
|
||||
if cs == cancel_secret:
|
||||
for (leasenum,lease) in self._enumerate_leases(f):
|
||||
accepting_nodeids.add(lease.nodeid)
|
||||
if lease.cancel_secret == cancel_secret:
|
||||
self._write_lease_record(f, leasenum, blank_lease)
|
||||
modified += 1
|
||||
else:
|
||||
@ -594,7 +643,11 @@ class MutableShareFile:
|
||||
if modified:
|
||||
freed_space = self._pack_leases(f)
|
||||
f.close()
|
||||
return (remaining, freed_space)
|
||||
if not remaining:
|
||||
freed_space += os.stat(self.home)[stat.ST_SIZE]
|
||||
self.unlink()
|
||||
return freed_space
|
||||
|
||||
msg = ("Unable to cancel non-existent lease. I have leases "
|
||||
"accepted by nodeids: ")
|
||||
msg += ",".join([("'%s'" % idlib.nodeid_b2a(anid))
|
||||
@ -647,6 +700,16 @@ class MutableShareFile:
|
||||
(idlib.nodeid_b2a(write_enabler_nodeid),)
|
||||
raise BadWriteEnablerError(msg)
|
||||
|
||||
def update_write_enabler(self, old_write_enabler, new_write_enabler,
|
||||
my_nodeid, si_s):
|
||||
self.check_write_enabler(old_write_enabler, si_s)
|
||||
f = open(self.home, 'rb+')
|
||||
f.seek(0)
|
||||
header = struct.pack(">32s20s32s",
|
||||
self.MAGIC, my_nodeid, new_write_enabler)
|
||||
f.write(header)
|
||||
f.close()
|
||||
|
||||
def check_testv(self, testv):
|
||||
test_good = True
|
||||
f = open(self.home, 'rb+')
|
||||
@ -839,7 +902,9 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
# separate database. Note that the lease should not be added until
|
||||
# the BucketWrite has been closed.
|
||||
expire_time = time.time() + 31*24*60*60
|
||||
lease_info = (owner_num, renew_secret, cancel_secret, expire_time)
|
||||
lease_info = LeaseInfo(owner_num,
|
||||
renew_secret, cancel_secret,
|
||||
expire_time, self.my_nodeid)
|
||||
|
||||
space_per_bucket = allocated_size
|
||||
no_limits = self.sizelimit is None
|
||||
@ -893,13 +958,8 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
self.add_latency("allocate", time.time() - start)
|
||||
return alreadygot, bucketwriters
|
||||
|
||||
def remote_renew_lease(self, storage_index, renew_secret):
|
||||
start = time.time()
|
||||
self.count("renew")
|
||||
new_expire_time = time.time() + 31*24*60*60
|
||||
found_buckets = False
|
||||
def _iter_share_files(self, storage_index):
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
found_buckets = True
|
||||
f = open(filename, 'rb')
|
||||
header = f.read(32)
|
||||
f.close()
|
||||
@ -911,7 +971,36 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
elif header[:4] == struct.pack(">L", 1):
|
||||
sf = ShareFile(filename)
|
||||
else:
|
||||
pass # non-sharefile
|
||||
continue # non-sharefile
|
||||
yield sf
|
||||
|
||||
def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
|
||||
owner_num=0):
|
||||
start = time.time()
|
||||
self.count("add-lease")
|
||||
new_expire_time = time.time() + 31*24*60*60
|
||||
lease_info = LeaseInfo(owner_num,
|
||||
renew_secret, cancel_secret,
|
||||
new_expire_time, self.my_nodeid)
|
||||
found_buckets = False
|
||||
for sf in self._iter_share_files(storage_index):
|
||||
found_buckets = True
|
||||
# note: if the share has been migrated, the renew_lease()
|
||||
# call will throw an exception, with information to help the
|
||||
# client update the lease.
|
||||
sf.add_or_renew_lease(lease_info)
|
||||
self.add_latency("add-lease", time.time() - start)
|
||||
if not found_buckets:
|
||||
raise IndexError("no such storage index to do add-lease")
|
||||
|
||||
|
||||
def remote_renew_lease(self, storage_index, renew_secret):
|
||||
start = time.time()
|
||||
self.count("renew")
|
||||
new_expire_time = time.time() + 31*24*60*60
|
||||
found_buckets = False
|
||||
for sf in self._iter_share_files(storage_index):
|
||||
found_buckets = True
|
||||
sf.renew_lease(renew_secret, new_expire_time)
|
||||
self.add_latency("renew", time.time() - start)
|
||||
if not found_buckets:
|
||||
@ -920,49 +1009,32 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
def remote_cancel_lease(self, storage_index, cancel_secret):
|
||||
start = time.time()
|
||||
self.count("cancel")
|
||||
storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
|
||||
|
||||
remaining_files = 0
|
||||
total_space_freed = 0
|
||||
found_buckets = False
|
||||
for shnum, filename in self._get_bucket_shares(storage_index):
|
||||
for sf in self._iter_share_files(storage_index):
|
||||
# note: if we can't find a lease on one share, we won't bother
|
||||
# looking in the others. Unless something broke internally
|
||||
# (perhaps we ran out of disk space while adding a lease), the
|
||||
# leases on all shares will be identical.
|
||||
found_buckets = True
|
||||
f = open(filename, 'rb')
|
||||
header = f.read(32)
|
||||
f.close()
|
||||
if header[:32] == MutableShareFile.MAGIC:
|
||||
sf = MutableShareFile(filename, self)
|
||||
# note: if the share has been migrated, the renew_lease()
|
||||
# call will throw an exception, with information to help the
|
||||
# client update the lease.
|
||||
elif header[:4] == struct.pack(">L", 1):
|
||||
sf = ShareFile(filename)
|
||||
else:
|
||||
pass # non-sharefile
|
||||
# this raises IndexError if the lease wasn't present
|
||||
remaining_leases, space_freed = sf.cancel_lease(cancel_secret)
|
||||
total_space_freed += space_freed
|
||||
if remaining_leases:
|
||||
remaining_files += 1
|
||||
else:
|
||||
# now remove the sharefile. We'll almost certainly be
|
||||
# removing the entire directory soon.
|
||||
filelen = os.stat(filename)[stat.ST_SIZE]
|
||||
os.unlink(filename)
|
||||
total_space_freed += filelen
|
||||
if not remaining_files:
|
||||
fileutil.rm_dir(storagedir)
|
||||
# this raises IndexError if the lease wasn't present XXXX
|
||||
total_space_freed += sf.cancel_lease(cancel_secret)
|
||||
|
||||
if found_buckets:
|
||||
storagedir = os.path.join(self.sharedir,
|
||||
storage_index_to_dir(storage_index))
|
||||
if not os.listdir(storagedir):
|
||||
os.rmdir(storagedir)
|
||||
|
||||
if self.consumed is not None:
|
||||
self.consumed -= total_space_freed
|
||||
if self.stats_provider:
|
||||
self.stats_provider.count('storage_server.bytes_freed', total_space_freed)
|
||||
self.stats_provider.count('storage_server.bytes_freed',
|
||||
total_space_freed)
|
||||
self.add_latency("cancel", time.time() - start)
|
||||
if not found_buckets:
|
||||
raise IndexError("no such lease to cancel")
|
||||
raise IndexError("no such storage index")
|
||||
|
||||
def bucket_writer_closed(self, bw, consumed_size):
|
||||
if self.consumed is not None:
|
||||
@ -1077,10 +1149,9 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
# and update the leases on all shares
|
||||
ownerid = 1 # TODO
|
||||
expire_time = time.time() + 31*24*60*60 # one month
|
||||
my_nodeid = self.my_nodeid
|
||||
anid = my_nodeid
|
||||
lease_info = (ownerid, expire_time, renew_secret, cancel_secret,
|
||||
anid)
|
||||
lease_info = LeaseInfo(ownerid,
|
||||
renew_secret, cancel_secret,
|
||||
expire_time, self.my_nodeid)
|
||||
for share in shares.values():
|
||||
share.add_or_renew_lease(lease_info)
|
||||
|
||||
@ -1125,6 +1196,14 @@ class StorageServer(service.MultiService, Referenceable):
|
||||
self.add_latency("readv", time.time() - start)
|
||||
return datavs
|
||||
|
||||
def remote_update_write_enabler(self, storage_index,
|
||||
old_write_enabler, new_write_enabler):
|
||||
si_s = si_b2a(storage_index)
|
||||
for sf in self._iter_share_files(storage_index):
|
||||
if not isinstance(sf, MutableShareFile):
|
||||
continue
|
||||
sf.update_write_enabler(old_write_enabler, new_write_enabler,
|
||||
self.my_nodeid, si_s)
|
||||
|
||||
|
||||
# the code before here runs on the storage server, not the client
|
||||
|
@ -8,7 +8,7 @@ from allmydata import interfaces
|
||||
from allmydata.util import fileutil, hashutil
|
||||
from allmydata.storage import BucketWriter, BucketReader, \
|
||||
WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
|
||||
storage_index_to_dir, DataTooLargeError
|
||||
storage_index_to_dir, DataTooLargeError, LeaseInfo
|
||||
from allmydata.interfaces import BadWriteEnablerError
|
||||
from allmydata.test.common import LoggingServiceParent
|
||||
|
||||
@ -56,7 +56,8 @@ class Bucket(unittest.TestCase):
|
||||
renew_secret = os.urandom(32)
|
||||
cancel_secret = os.urandom(32)
|
||||
expiration_time = time.time() + 5000
|
||||
return (owner_num, renew_secret, cancel_secret, expiration_time)
|
||||
return LeaseInfo(owner_num, renew_secret, cancel_secret,
|
||||
expiration_time, "\x00" * 20)
|
||||
|
||||
def test_create(self):
|
||||
incoming, final = self.make_workdir("test_create")
|
||||
@ -109,7 +110,8 @@ class BucketProxy(unittest.TestCase):
|
||||
renew_secret = os.urandom(32)
|
||||
cancel_secret = os.urandom(32)
|
||||
expiration_time = time.time() + 5000
|
||||
return (owner_num, renew_secret, cancel_secret, expiration_time)
|
||||
return LeaseInfo(owner_num, renew_secret, cancel_secret,
|
||||
expiration_time, "\x00" * 20)
|
||||
|
||||
def bucket_writer_closed(self, bw, consumed):
|
||||
pass
|
||||
@ -228,6 +230,7 @@ class Server(unittest.TestCase):
|
||||
workdir = self.workdir(name)
|
||||
ss = StorageServer(workdir, sizelimit,
|
||||
stats_provider=FakeStatsProvider())
|
||||
ss.setNodeID("\x00" * 20)
|
||||
ss.setServiceParent(self.sparent)
|
||||
return ss
|
||||
|
||||
@ -450,7 +453,7 @@ class Server(unittest.TestCase):
|
||||
|
||||
leases = list(ss.get_leases("si0"))
|
||||
self.failUnlessEqual(len(leases), 1)
|
||||
self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
|
||||
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
|
||||
|
||||
rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
|
||||
hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
|
||||
@ -469,7 +472,7 @@ class Server(unittest.TestCase):
|
||||
|
||||
leases = list(ss.get_leases("si1"))
|
||||
self.failUnlessEqual(len(leases), 2)
|
||||
self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
|
||||
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
|
||||
|
||||
# check that si0 is readable
|
||||
readers = ss.remote_get_buckets("si0")
|
||||
@ -505,7 +508,7 @@ class Server(unittest.TestCase):
|
||||
|
||||
leases = list(ss.get_leases("si1"))
|
||||
self.failUnlessEqual(len(leases), 1)
|
||||
self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
|
||||
self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2]))
|
||||
|
||||
ss.remote_renew_lease("si1", rs2)
|
||||
# cancelling the second should make it go away
|
||||
@ -549,6 +552,7 @@ class Server(unittest.TestCase):
|
||||
def test_readonly(self):
|
||||
workdir = self.workdir("test_readonly")
|
||||
ss = StorageServer(workdir, readonly_storage=True)
|
||||
ss.setNodeID("\x00" * 20)
|
||||
ss.setServiceParent(self.sparent)
|
||||
|
||||
canary = FakeCanary()
|
||||
@ -560,6 +564,7 @@ class Server(unittest.TestCase):
|
||||
# discard is really only used for other tests, but we test it anyways
|
||||
workdir = self.workdir("test_discard")
|
||||
ss = StorageServer(workdir, discard_storage=True)
|
||||
ss.setNodeID("\x00" * 20)
|
||||
ss.setServiceParent(self.sparent)
|
||||
|
||||
canary = FakeCanary()
|
||||
@ -594,7 +599,7 @@ class MutableServer(unittest.TestCase):
|
||||
workdir = self.workdir(name)
|
||||
ss = StorageServer(workdir, sizelimit)
|
||||
ss.setServiceParent(self.sparent)
|
||||
ss.setNodeID("\x00" * 32)
|
||||
ss.setNodeID("\x00" * 20)
|
||||
return ss
|
||||
|
||||
def test_create(self):
|
||||
@ -925,17 +930,28 @@ class MutableServer(unittest.TestCase):
|
||||
1: ["1"*10],
|
||||
2: ["2"*10]})
|
||||
|
||||
def compare_leases_without_timestamps(self, a, b):
|
||||
self.failUnlessEqual(len(a), len(b))
|
||||
for i in range(len(a)):
|
||||
(num_a, (ownerid_a, expiration_time_a,
|
||||
renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
|
||||
(num_b, (ownerid_b, expiration_time_b,
|
||||
renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
|
||||
self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
|
||||
cancel_secret_a, nodeid_a),
|
||||
(num_b, ownerid_b, renew_secret_b,
|
||||
cancel_secret_b, nodeid_b) )
|
||||
def compare_leases_without_timestamps(self, leases_a, leases_b):
|
||||
self.failUnlessEqual(len(leases_a), len(leases_b))
|
||||
for i in range(len(leases_a)):
|
||||
num_a, a = leases_a[i]
|
||||
num_b, b = leases_b[i]
|
||||
self.failUnlessEqual(num_a, num_b)
|
||||
self.failUnlessEqual(a.owner_num, b.owner_num)
|
||||
self.failUnlessEqual(a.renew_secret, b.renew_secret)
|
||||
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
|
||||
self.failUnlessEqual(a.nodeid, b.nodeid)
|
||||
|
||||
def compare_leases(self, leases_a, leases_b):
|
||||
self.failUnlessEqual(len(leases_a), len(leases_b))
|
||||
for i in range(len(leases_a)):
|
||||
num_a, a = leases_a[i]
|
||||
num_b, b = leases_b[i]
|
||||
self.failUnlessEqual(num_a, num_b)
|
||||
self.failUnlessEqual(a.owner_num, b.owner_num)
|
||||
self.failUnlessEqual(a.renew_secret, b.renew_secret)
|
||||
self.failUnlessEqual(a.cancel_secret, b.cancel_secret)
|
||||
self.failUnlessEqual(a.nodeid, b.nodeid)
|
||||
self.failUnlessEqual(a.expiration_time, b.expiration_time)
|
||||
|
||||
def test_leases(self):
|
||||
ss = self.create("test_leases", sizelimit=1000*1000)
|
||||
@ -1002,20 +1018,25 @@ class MutableServer(unittest.TestCase):
|
||||
all_leases = s0.debug_get_leases()
|
||||
# renewing with a bogus token should prompt an error message
|
||||
|
||||
# TODO: examine the exception thus raised, make sure the old nodeid
|
||||
# is present, to provide for share migration
|
||||
self.failUnlessRaises(IndexError,
|
||||
ss.remote_renew_lease, "si1",
|
||||
secrets(20)[1])
|
||||
# examine the exception thus raised, make sure the old nodeid is
|
||||
# present, to provide for share migration
|
||||
e = self.failUnlessRaises(IndexError,
|
||||
ss.remote_renew_lease, "si1",
|
||||
secrets(20)[1])
|
||||
e_s = str(e)
|
||||
self.failUnless("Unable to renew non-existent lease" in e_s)
|
||||
self.failUnless("I have leases accepted by nodeids:" in e_s)
|
||||
self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
|
||||
|
||||
# same for cancelling
|
||||
self.failUnlessRaises(IndexError,
|
||||
ss.remote_cancel_lease, "si1",
|
||||
secrets(20)[2])
|
||||
self.failUnlessEqual(all_leases, s0.debug_get_leases())
|
||||
self.compare_leases(all_leases, s0.debug_get_leases())
|
||||
|
||||
# reading shares should not modify the timestamp
|
||||
read("si1", [], [(0,200)])
|
||||
self.failUnlessEqual(all_leases, s0.debug_get_leases())
|
||||
self.compare_leases(all_leases, s0.debug_get_leases())
|
||||
|
||||
write("si1", secrets(0),
|
||||
{0: ([], [(200, "make me bigger")], None)}, [])
|
||||
@ -1056,6 +1077,43 @@ class MutableServer(unittest.TestCase):
|
||||
self.failUnlessRaises(IndexError,
|
||||
ss.remote_cancel_lease, "si2", "nonsecret")
|
||||
|
||||
def test_update_write_enabler(self):
|
||||
ss = self.create("test_update_write_enabler", sizelimit=1000*1000)
|
||||
secrets = ( self.write_enabler("we1"),
|
||||
self.renew_secret("we1-0"),
|
||||
self.cancel_secret("we1-0") )
|
||||
old_write_enabler = secrets[0]
|
||||
new_write_enabler = self.write_enabler("we2")
|
||||
new_secrets = (new_write_enabler, secrets[1], secrets[2])
|
||||
|
||||
data = "".join([ ("%d" % i) * 10 for i in range(10) ])
|
||||
write = ss.remote_slot_testv_and_readv_and_writev
|
||||
read = ss.remote_slot_readv
|
||||
update_write_enabler = ss.remote_update_write_enabler
|
||||
|
||||
rc = write("si1", secrets, {0: ([], [(0,data)], None)}, [])
|
||||
self.failUnlessEqual(rc, (True, {}))
|
||||
|
||||
rc = write("si1", secrets, {0: ([], [(1,data)], None)}, [])
|
||||
self.failUnlessEqual(rc[0], True)
|
||||
|
||||
f = self.failUnlessRaises(BadWriteEnablerError,
|
||||
write, "si1", new_secrets,
|
||||
{}, [])
|
||||
self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
|
||||
ss.setNodeID("\xff" * 20)
|
||||
|
||||
rc = update_write_enabler("si1", old_write_enabler, new_write_enabler)
|
||||
self.failUnlessEqual(rc, None)
|
||||
|
||||
f = self.failUnlessRaises(BadWriteEnablerError,
|
||||
write, "si1", secrets,
|
||||
{}, [])
|
||||
self.failUnless("The write enabler was recorded by nodeid '77777777777777777777777777777777'." in f, f)
|
||||
|
||||
rc = write("si1", new_secrets, {0: ([], [(2,data)], None)}, [])
|
||||
self.failUnlessEqual(rc[0], True)
|
||||
|
||||
|
||||
class Stats(unittest.TestCase):
|
||||
|
||||
@ -1072,6 +1130,7 @@ class Stats(unittest.TestCase):
|
||||
def create(self, name, sizelimit=None):
|
||||
workdir = self.workdir(name)
|
||||
ss = StorageServer(workdir, sizelimit)
|
||||
ss.setNodeID("\x00" * 20)
|
||||
ss.setServiceParent(self.sparent)
|
||||
return ss
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user