Switch to per-call parameter for controlling lease renewal behavior

This is closer to an implementation where you could have two frontends, say a
Foolscap frontend and an HTTP frontend or even just two different HTTP
frontends, which had different opinions about what the behaviour should be.
This commit is contained in:
Jean-Paul Calderone 2021-11-12 16:20:27 -05:00
parent 2742de6f7c
commit c3cb0ebaea
2 changed files with 61 additions and 53 deletions

View File

@ -59,21 +59,10 @@ DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
class StorageServer(service.MultiService, Referenceable):
"""
A filesystem-based implementation of ``RIStorageServer``.
:ivar bool _implicit_bucket_lease_renewal: If and only if this is ``True``
then ``allocate_buckets`` will renew leases on existing shares
associated with the storage index it operates on.
:ivar bool _implicit_slot_lease_renewal: If and only if this is ``True``
then ``slot_testv_and_readv_and_writev`` will renew leases on shares
associated with the slot it operates on.
"""
name = 'storage'
LeaseCheckerClass = LeaseCheckingCrawler
_implicit_bucket_lease_renewal = True
_implicit_slot_lease_renewal = True
def __init__(self, storedir, nodeid, reserved_space=0,
discard_storage=False, readonly_storage=False,
stats_provider=None,
@ -149,29 +138,6 @@ class StorageServer(service.MultiService, Referenceable):
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
def set_implicit_bucket_lease_renewal(self, enabled):
# type: (bool) -> None
"""
Control the behavior of implicit lease renewal by *allocate_buckets*.
:param enabled: If and only if ``True`` then future *allocate_buckets*
calls will renew leases on shares that already exist in the bucket.
"""
self._implicit_bucket_lease_renewal = enabled
def set_implicit_slot_lease_renewal(self, enabled):
# type: (bool) -> None
"""
Control the behavior of implicit lease renewal by
*slot_testv_and_readv_and_writev*.
:param enabled: If and only if ``True`` then future
*slot_testv_and_readv_and_writev* calls will renew leases on
shares that still exist in the slot after the writev is applied
and which were touched by the writev.
"""
self._implicit_slot_lease_renewal = enabled
def have_shares(self):
# quick test to decide if we need to commit to an implicit
# permutation-seed or if we should use a new one
@ -314,9 +280,12 @@ class StorageServer(service.MultiService, Referenceable):
def _allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
owner_num=0):
owner_num=0, renew_leases=True):
"""
Generic bucket allocation API.
:param bool renew_leases: If and only if this is ``True`` then
renew leases on existing shares in this bucket.
"""
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
@ -356,7 +325,7 @@ class StorageServer(service.MultiService, Referenceable):
# file, they'll want us to hold leases for this file.
for (shnum, fn) in self._get_bucket_shares(storage_index):
alreadygot.add(shnum)
if self._implicit_bucket_lease_renewal:
if renew_leases:
sf = ShareFile(fn)
sf.add_or_renew_lease(lease_info)
@ -399,7 +368,7 @@ class StorageServer(service.MultiService, Referenceable):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num,
owner_num=owner_num, renew_leases=True,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
@ -661,12 +630,17 @@ class StorageServer(service.MultiService, Referenceable):
secrets,
test_and_write_vectors,
read_vector,
renew_leases,
):
"""
Read data from shares and conditionally write some data to them.
See ``allmydata.interfaces.RIStorageServer`` for details about other
parameters and return value.
:param bool renew_leases: If and only if this is ``True`` then renew
leases on all shares mentioned in ``test_and_write_vectors` that
still exist after the changes are made.
"""
start = self._get_current_time()
self.count("writev")
@ -704,7 +678,7 @@ class StorageServer(service.MultiService, Referenceable):
test_and_write_vectors,
shares,
)
if self._implicit_slot_lease_renewal:
if renew_leases:
lease_info = self._make_lease_info(renew_secret, cancel_secret)
self._add_or_renew_leases(remaining_shares, lease_info)
@ -721,6 +695,7 @@ class StorageServer(service.MultiService, Referenceable):
secrets,
test_and_write_vectors,
read_vector,
renew_leases=True,
)
def _allocate_slot_share(self, bucketdir, secrets, sharenum,

View File

@ -468,14 +468,19 @@ class Server(unittest.TestCase):
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
self.failUnlessIn(b'available-space', sv1)
def allocate(self, ss, storage_index, sharenums, size, canary=None):
def allocate(self, ss, storage_index, sharenums, size, renew_leases=True):
"""
Call directly into the storage server's allocate_buckets implementation,
skipping the Foolscap layer.
"""
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
if not canary:
canary = FakeCanary()
return ss.remote_allocate_buckets(storage_index,
renew_secret, cancel_secret,
sharenums, size, canary)
return ss._allocate_buckets(
storage_index,
renew_secret, cancel_secret,
sharenums, size,
renew_leases=renew_leases,
)
def test_large_share(self):
syslow = platform.system().lower()
@ -611,7 +616,7 @@ class Server(unittest.TestCase):
def test_allocate_without_lease_renewal(self):
"""
``remote_allocate_buckets`` does not renew leases on existing shares if
``set_implicit_bucket_lease_renewal(False)`` is called first.
``renew_leases`` is ``False``.
"""
first_lease = 456
second_lease = 543
@ -623,10 +628,11 @@ class Server(unittest.TestCase):
"test_allocate_without_lease_renewal",
get_current_time=clock.seconds,
)
ss.set_implicit_bucket_lease_renewal(False)
# Put a share on there
already, writers = self.allocate(ss, storage_index, [0], 1)
already, writers = self.allocate(
ss, storage_index, [0], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
@ -647,7 +653,9 @@ class Server(unittest.TestCase):
clock.advance(second_lease)
# Put another share on there.
already, writers = self.allocate(ss, storage_index, [1], 1)
already, writers = self.allocate(
ss, storage_index, [1], 1, renew_leases=False,
)
(writer,) = writers.values()
writer.remote_write(0, b"x")
writer.remote_close()
@ -684,8 +692,17 @@ class Server(unittest.TestCase):
def test_disconnect(self):
# simulate a disconnection
ss = self.create("test_disconnect")
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
already,writers = self.allocate(ss, b"disconnect", [0,1,2], 75, canary)
already,writers = ss.remote_allocate_buckets(
b"disconnect",
renew_secret,
cancel_secret,
sharenums=[0,1,2],
allocated_size=75,
canary=canary,
)
self.failUnlessEqual(already, set())
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
for (f,args,kwargs) in list(canary.disconnectors.values()):
@ -717,8 +734,17 @@ class Server(unittest.TestCase):
# the size we request.
OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4
renew_secret = b"r" * 32
cancel_secret = b"c" * 32
canary = FakeCanary()
already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
already, writers = ss.remote_allocate_buckets(
b"vid1",
renew_secret,
cancel_secret,
sharenums=[0,1,2],
allocated_size=1000,
canary=canary,
)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
@ -751,7 +777,14 @@ class Server(unittest.TestCase):
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
canary3 = FakeCanary()
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
already3, writers3 = ss.remote_allocate_buckets(
b"vid3",
renew_secret,
cancel_secret,
sharenums=list(range(100)),
allocated_size=100,
canary=canary3,
)
self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._bucket_writers), 39)
@ -1463,10 +1496,9 @@ class MutableServer(unittest.TestCase):
def test_writev_without_renew_lease(self):
"""
The helper method ``slot_testv_and_readv_and_writev`` does not renew
leases if ``set_implicit_bucket_lease_renewal(False)`` is called first.
leases if ``renew_leases```` is ``False``.
"""
ss = self.create("test_writev_without_renew_lease")
ss.set_implicit_slot_lease_renewal(False)
storage_index = b"si2"
secrets = (
@ -1485,6 +1517,7 @@ class MutableServer(unittest.TestCase):
sharenum: ([], datav, None),
},
read_vector=[],
renew_leases=False,
)
leases = list(ss.get_slot_leases(storage_index))
self.assertEqual([], leases)