mirror of
https://github.com/tahoe-lafs/tahoe-lafs.git
synced 2025-02-22 10:20:59 +00:00
Merge pull request #1160 from tahoe-lafs/3842.allocate_buckets-without-lease-renewal
Allow allocate_buckets without implicit lease renewal Fixes: ticket:3842
This commit is contained in:
commit
d3c6f58a8d
0
newsfragments/3842.minor
Normal file
0
newsfragments/3842.minor
Normal file
@ -57,6 +57,9 @@ DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
|
|||||||
|
|
||||||
@implementer(RIStorageServer, IStatsProducer)
|
@implementer(RIStorageServer, IStatsProducer)
|
||||||
class StorageServer(service.MultiService, Referenceable):
|
class StorageServer(service.MultiService, Referenceable):
|
||||||
|
"""
|
||||||
|
A filesystem-based implementation of ``RIStorageServer``.
|
||||||
|
"""
|
||||||
name = 'storage'
|
name = 'storage'
|
||||||
LeaseCheckerClass = LeaseCheckingCrawler
|
LeaseCheckerClass = LeaseCheckingCrawler
|
||||||
|
|
||||||
@ -277,9 +280,14 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
def _allocate_buckets(self, storage_index,
|
def _allocate_buckets(self, storage_index,
|
||||||
renew_secret, cancel_secret,
|
renew_secret, cancel_secret,
|
||||||
sharenums, allocated_size,
|
sharenums, allocated_size,
|
||||||
owner_num=0):
|
owner_num=0, renew_leases=True):
|
||||||
"""
|
"""
|
||||||
Generic bucket allocation API.
|
Generic bucket allocation API.
|
||||||
|
|
||||||
|
:param bool renew_leases: If and only if this is ``True`` then renew a
|
||||||
|
secret-matching lease on (or, if none match, add a new lease to)
|
||||||
|
existing shares in this bucket. Any *new* shares are given a new
|
||||||
|
lease regardless.
|
||||||
"""
|
"""
|
||||||
# owner_num is not for clients to set, but rather it should be
|
# owner_num is not for clients to set, but rather it should be
|
||||||
# curried into the PersonalStorageServer instance that is dedicated
|
# curried into the PersonalStorageServer instance that is dedicated
|
||||||
@ -319,8 +327,9 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
# file, they'll want us to hold leases for this file.
|
# file, they'll want us to hold leases for this file.
|
||||||
for (shnum, fn) in self._get_bucket_shares(storage_index):
|
for (shnum, fn) in self._get_bucket_shares(storage_index):
|
||||||
alreadygot.add(shnum)
|
alreadygot.add(shnum)
|
||||||
sf = ShareFile(fn)
|
if renew_leases:
|
||||||
sf.add_or_renew_lease(lease_info)
|
sf = ShareFile(fn)
|
||||||
|
sf.add_or_renew_lease(lease_info)
|
||||||
|
|
||||||
for shnum in sharenums:
|
for shnum in sharenums:
|
||||||
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
|
incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
|
||||||
@ -361,7 +370,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
"""Foolscap-specific ``allocate_buckets()`` API."""
|
"""Foolscap-specific ``allocate_buckets()`` API."""
|
||||||
alreadygot, bucketwriters = self._allocate_buckets(
|
alreadygot, bucketwriters = self._allocate_buckets(
|
||||||
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
|
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
|
||||||
owner_num=owner_num,
|
owner_num=owner_num, renew_leases=True,
|
||||||
)
|
)
|
||||||
# Abort BucketWriters if disconnection happens.
|
# Abort BucketWriters if disconnection happens.
|
||||||
for bw in bucketwriters.values():
|
for bw in bucketwriters.values():
|
||||||
@ -579,10 +588,8 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
else:
|
else:
|
||||||
if sharenum not in shares:
|
if sharenum not in shares:
|
||||||
# allocate a new share
|
# allocate a new share
|
||||||
allocated_size = 2000 # arbitrary, really
|
|
||||||
share = self._allocate_slot_share(bucketdir, secrets,
|
share = self._allocate_slot_share(bucketdir, secrets,
|
||||||
sharenum,
|
sharenum,
|
||||||
allocated_size,
|
|
||||||
owner_num=0)
|
owner_num=0)
|
||||||
shares[sharenum] = share
|
shares[sharenum] = share
|
||||||
shares[sharenum].writev(datav, new_length)
|
shares[sharenum].writev(datav, new_length)
|
||||||
@ -631,8 +638,10 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
Read data from shares and conditionally write some data to them.
|
Read data from shares and conditionally write some data to them.
|
||||||
|
|
||||||
:param bool renew_leases: If and only if this is ``True`` and the test
|
:param bool renew_leases: If and only if this is ``True`` and the test
|
||||||
vectors pass then shares in this slot will also have an updated
|
vectors pass then shares mentioned in ``test_and_write_vectors``
|
||||||
lease applied to them.
|
that still exist after the changes are made will also have a
|
||||||
|
secret-matching lease renewed (or, if none match, a new lease
|
||||||
|
added).
|
||||||
|
|
||||||
See ``allmydata.interfaces.RIStorageServer`` for details about other
|
See ``allmydata.interfaces.RIStorageServer`` for details about other
|
||||||
parameters and return value.
|
parameters and return value.
|
||||||
@ -694,7 +703,7 @@ class StorageServer(service.MultiService, Referenceable):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
|
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
|
||||||
allocated_size, owner_num=0):
|
owner_num=0):
|
||||||
(write_enabler, renew_secret, cancel_secret) = secrets
|
(write_enabler, renew_secret, cancel_secret) = secrets
|
||||||
my_nodeid = self.my_nodeid
|
my_nodeid = self.my_nodeid
|
||||||
fileutil.make_dirs(bucketdir)
|
fileutil.make_dirs(bucketdir)
|
||||||
|
@ -468,14 +468,19 @@ class Server(unittest.TestCase):
|
|||||||
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
|
sv1 = ver[b'http://allmydata.org/tahoe/protocols/storage/v1']
|
||||||
self.failUnlessIn(b'available-space', sv1)
|
self.failUnlessIn(b'available-space', sv1)
|
||||||
|
|
||||||
def allocate(self, ss, storage_index, sharenums, size, canary=None):
|
def allocate(self, ss, storage_index, sharenums, size, renew_leases=True):
|
||||||
|
"""
|
||||||
|
Call directly into the storage server's allocate_buckets implementation,
|
||||||
|
skipping the Foolscap layer.
|
||||||
|
"""
|
||||||
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
|
renew_secret = hashutil.my_renewal_secret_hash(b"%d" % next(self._lease_secret))
|
||||||
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
|
cancel_secret = hashutil.my_cancel_secret_hash(b"%d" % next(self._lease_secret))
|
||||||
if not canary:
|
return ss._allocate_buckets(
|
||||||
canary = FakeCanary()
|
storage_index,
|
||||||
return ss.remote_allocate_buckets(storage_index,
|
renew_secret, cancel_secret,
|
||||||
renew_secret, cancel_secret,
|
sharenums, size,
|
||||||
sharenums, size, canary)
|
renew_leases=renew_leases,
|
||||||
|
)
|
||||||
|
|
||||||
def test_large_share(self):
|
def test_large_share(self):
|
||||||
syslow = platform.system().lower()
|
syslow = platform.system().lower()
|
||||||
@ -608,6 +613,64 @@ class Server(unittest.TestCase):
|
|||||||
for i,wb in writers.items():
|
for i,wb in writers.items():
|
||||||
wb.remote_abort()
|
wb.remote_abort()
|
||||||
|
|
||||||
|
def test_allocate_without_lease_renewal(self):
|
||||||
|
"""
|
||||||
|
``StorageServer._allocate_buckets`` does not renew leases on existing
|
||||||
|
shares if ``renew_leases`` is ``False``.
|
||||||
|
"""
|
||||||
|
first_lease = 456
|
||||||
|
second_lease = 543
|
||||||
|
storage_index = b"allocate"
|
||||||
|
|
||||||
|
clock = Clock()
|
||||||
|
clock.advance(first_lease)
|
||||||
|
ss = self.create(
|
||||||
|
"test_allocate_without_lease_renewal",
|
||||||
|
get_current_time=clock.seconds,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Put a share on there
|
||||||
|
already, writers = self.allocate(
|
||||||
|
ss, storage_index, [0], 1, renew_leases=False,
|
||||||
|
)
|
||||||
|
(writer,) = writers.values()
|
||||||
|
writer.remote_write(0, b"x")
|
||||||
|
writer.remote_close()
|
||||||
|
|
||||||
|
# It should have a lease granted at the current time.
|
||||||
|
shares = dict(ss._get_bucket_shares(storage_index))
|
||||||
|
self.assertEqual(
|
||||||
|
[first_lease],
|
||||||
|
list(
|
||||||
|
lease.get_grant_renew_time_time()
|
||||||
|
for lease
|
||||||
|
in ShareFile(shares[0]).get_leases()
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Let some time pass so we can tell if the lease on share 0 is
|
||||||
|
# renewed.
|
||||||
|
clock.advance(second_lease)
|
||||||
|
|
||||||
|
# Put another share on there.
|
||||||
|
already, writers = self.allocate(
|
||||||
|
ss, storage_index, [1], 1, renew_leases=False,
|
||||||
|
)
|
||||||
|
(writer,) = writers.values()
|
||||||
|
writer.remote_write(0, b"x")
|
||||||
|
writer.remote_close()
|
||||||
|
|
||||||
|
# The first share's lease expiration time is unchanged.
|
||||||
|
shares = dict(ss._get_bucket_shares(storage_index))
|
||||||
|
self.assertEqual(
|
||||||
|
[first_lease],
|
||||||
|
list(
|
||||||
|
lease.get_grant_renew_time_time()
|
||||||
|
for lease
|
||||||
|
in ShareFile(shares[0]).get_leases()
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
def test_bad_container_version(self):
|
def test_bad_container_version(self):
|
||||||
ss = self.create("test_bad_container_version")
|
ss = self.create("test_bad_container_version")
|
||||||
a,w = self.allocate(ss, b"si1", [0], 10)
|
a,w = self.allocate(ss, b"si1", [0], 10)
|
||||||
@ -629,8 +692,17 @@ class Server(unittest.TestCase):
|
|||||||
def test_disconnect(self):
|
def test_disconnect(self):
|
||||||
# simulate a disconnection
|
# simulate a disconnection
|
||||||
ss = self.create("test_disconnect")
|
ss = self.create("test_disconnect")
|
||||||
|
renew_secret = b"r" * 32
|
||||||
|
cancel_secret = b"c" * 32
|
||||||
canary = FakeCanary()
|
canary = FakeCanary()
|
||||||
already,writers = self.allocate(ss, b"disconnect", [0,1,2], 75, canary)
|
already,writers = ss.remote_allocate_buckets(
|
||||||
|
b"disconnect",
|
||||||
|
renew_secret,
|
||||||
|
cancel_secret,
|
||||||
|
sharenums=[0,1,2],
|
||||||
|
allocated_size=75,
|
||||||
|
canary=canary,
|
||||||
|
)
|
||||||
self.failUnlessEqual(already, set())
|
self.failUnlessEqual(already, set())
|
||||||
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
|
self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
|
||||||
for (f,args,kwargs) in list(canary.disconnectors.values()):
|
for (f,args,kwargs) in list(canary.disconnectors.values()):
|
||||||
@ -662,8 +734,17 @@ class Server(unittest.TestCase):
|
|||||||
# the size we request.
|
# the size we request.
|
||||||
OVERHEAD = 3*4
|
OVERHEAD = 3*4
|
||||||
LEASE_SIZE = 4+32+32+4
|
LEASE_SIZE = 4+32+32+4
|
||||||
|
renew_secret = b"r" * 32
|
||||||
|
cancel_secret = b"c" * 32
|
||||||
canary = FakeCanary()
|
canary = FakeCanary()
|
||||||
already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
|
already, writers = ss.remote_allocate_buckets(
|
||||||
|
b"vid1",
|
||||||
|
renew_secret,
|
||||||
|
cancel_secret,
|
||||||
|
sharenums=[0,1,2],
|
||||||
|
allocated_size=1000,
|
||||||
|
canary=canary,
|
||||||
|
)
|
||||||
self.failUnlessEqual(len(writers), 3)
|
self.failUnlessEqual(len(writers), 3)
|
||||||
# now the StorageServer should have 3000 bytes provisionally
|
# now the StorageServer should have 3000 bytes provisionally
|
||||||
# allocated, allowing only 2000 more to be claimed
|
# allocated, allowing only 2000 more to be claimed
|
||||||
@ -696,7 +777,14 @@ class Server(unittest.TestCase):
|
|||||||
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
|
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
|
||||||
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
|
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
|
||||||
canary3 = FakeCanary()
|
canary3 = FakeCanary()
|
||||||
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
|
already3, writers3 = ss.remote_allocate_buckets(
|
||||||
|
b"vid3",
|
||||||
|
renew_secret,
|
||||||
|
cancel_secret,
|
||||||
|
sharenums=list(range(100)),
|
||||||
|
allocated_size=100,
|
||||||
|
canary=canary3,
|
||||||
|
)
|
||||||
self.failUnlessEqual(len(writers3), 39)
|
self.failUnlessEqual(len(writers3), 39)
|
||||||
self.failUnlessEqual(len(ss._bucket_writers), 39)
|
self.failUnlessEqual(len(ss._bucket_writers), 39)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user