Merge pull request #648 from tahoe-lafs/3241.refactor-mutable-share-write-implementation

Refactor mutable share write implementation

Fixes: ticket:3241
This commit is contained in:
Jean-Paul Calderone 2019-08-23 08:45:48 -04:00 committed by GitHub
commit debefdc977
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 270 additions and 44 deletions

0
newsfragments/1893.minor Normal file
View File

View File

@ -1,4 +1,5 @@
import os, re, weakref, struct, time
import six
from foolscap.api import Referenceable
from twisted.application import service
@ -391,8 +392,9 @@ class StorageServer(service.MultiService, Referenceable):
bucket. Each lease is returned as a LeaseInfo instance.
This method is not for client use.
"""
:note: Only for immutable shares.
"""
# since all shares get the same lease data, we just grab the leases
# from the first share
try:
@ -402,20 +404,39 @@ class StorageServer(service.MultiService, Referenceable):
except StopIteration:
return iter([])
def remote_slot_testv_and_readv_and_writev(self, storage_index,
secrets,
test_and_write_vectors,
read_vector):
start = time.time()
self.count("writev")
si_s = si_b2a(storage_index)
log.msg("storage: slot_writev %s" % si_s)
si_dir = storage_index_to_dir(storage_index)
(write_enabler, renew_secret, cancel_secret) = secrets
# shares exist if there is a file for them
bucketdir = os.path.join(self.sharedir, si_dir)
def get_slot_leases(self, storage_index):
    """
    This method is not for client use.

    :note: Only for mutable shares.

    :return: An iterable of the leases attached to this slot.
    """
    # Every share in a slot carries identical lease data, so whichever
    # share turns up first is as good as any other.
    bucket_shares = iter(self._get_bucket_shares(storage_index))
    try:
        _, first_share_path = next(bucket_shares)
    except StopIteration:
        # No shares at all: no leases.
        return []
    return MutableShareFile(first_share_path).get_leases()
def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s):
"""
Gather up existing mutable shares for the given storage index.
:param bytes bucketdir: The filesystem path containing shares for the
given storage index.
:param bytes write_enabler: The write enabler secret for the shares.
:param bytes si_s: The storage index in encoded (base32) form.
:raise BadWriteEnablerError: If the write enabler is not correct for
any of the collected shares.
:return dict[int, MutableShareFile]: The collected shares in a mapping
from integer share numbers to ``MutableShareFile`` instances.
"""
shares = {}
if os.path.isdir(bucketdir):
# shares exist if there is a file for them
for sharenum_s in os.listdir(bucketdir):
try:
sharenum = int(sharenum_s)
@ -425,67 +446,197 @@ class StorageServer(service.MultiService, Referenceable):
msf = MutableShareFile(filename, self)
msf.check_write_enabler(write_enabler, si_s)
shares[sharenum] = msf
# write_enabler is good for all existing shares.
return shares
# Now evaluate test vectors.
testv_is_good = True
def _evaluate_test_vectors(self, test_and_write_vectors, shares):
"""
Execute test vectors against share data.
:param test_and_write_vectors: See
``allmydata.interfaces.TestAndWriteVectorsForShares``.
:param dict[int, MutableShareFile] shares: The shares against which to
execute the vectors.
:return bool: ``True`` if and only if all of the test vectors succeed
against the given shares.
"""
for sharenum in test_and_write_vectors:
(testv, datav, new_length) = test_and_write_vectors[sharenum]
if sharenum in shares:
if not shares[sharenum].check_testv(testv):
self.log("testv failed: [%d]: %r" % (sharenum, testv))
testv_is_good = False
break
return False
else:
# compare the vectors against an empty share, in which all
# reads return empty strings.
if not EmptyShare().check_testv(testv):
self.log("testv failed (empty): [%d] %r" % (sharenum,
testv))
testv_is_good = False
break
return False
return True
# now gather the read vectors, before we do any writes
def _evaluate_read_vectors(self, read_vector, shares):
"""
Execute read vectors against share data.
:param read_vector: See ``allmydata.interfaces.ReadVector``.
:param dict[int, MutableShareFile] shares: The shares against which to
execute the vector.
:return dict[int, bytes]: The data read from the shares.
"""
read_data = {}
for sharenum, share in shares.items():
read_data[sharenum] = share.readv(read_vector)
return read_data
def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares):
"""
Execute write vectors against share data.
:param bytes bucketdir: The parent directory holding the shares. This
is removed if the last share is removed from it. If shares are
created, they are created in it.
:param secrets: A tuple of ``WriteEnablerSecret``,
``LeaseRenewSecret``, and ``LeaseCancelSecret``. These secrets
are used to initialize new shares.
:param test_and_write_vectors: See
``allmydata.interfaces.TestAndWriteVectorsForShares``.
:param dict[int, MutableShareFile]: The shares against which to
execute the vectors.
:return dict[int, MutableShareFile]: The shares which still exist
after applying the vectors.
"""
remaining_shares = {}
for sharenum in test_and_write_vectors:
(testv, datav, new_length) = test_and_write_vectors[sharenum]
if new_length == 0:
if sharenum in shares:
shares[sharenum].unlink()
else:
if sharenum not in shares:
# allocate a new share
allocated_size = 2000 # arbitrary, really
share = self._allocate_slot_share(bucketdir, secrets,
sharenum,
allocated_size,
owner_num=0)
shares[sharenum] = share
shares[sharenum].writev(datav, new_length)
remaining_shares[sharenum] = shares[sharenum]
if new_length == 0:
# delete bucket directories that exist but are empty. They
# might not exist if a client showed up and asked us to
# truncate a share we weren't even holding.
if os.path.exists(bucketdir) and [] == os.listdir(bucketdir):
os.rmdir(bucketdir)
return remaining_shares
def _make_lease_info(self, renew_secret, cancel_secret):
    """
    :return LeaseInfo: Information for a new lease for a share.
    """
    ownerid = 1  # TODO
    # New leases last one month from now.
    one_month = 31 * 24 * 60 * 60
    expire_time = time.time() + one_month
    return LeaseInfo(
        ownerid,
        renew_secret,
        cancel_secret,
        expire_time,
        self.my_nodeid,
    )
def _add_or_renew_leases(self, shares, lease_info):
"""
Put the given lease onto the given shares.
:param dict[int, MutableShareFile] shares: The shares to put the lease
onto.
:param LeaseInfo lease_info: The lease to put on the shares.
"""
for share in six.viewvalues(shares):
share.add_or_renew_lease(lease_info)
def slot_testv_and_readv_and_writev(
        self,
        storage_index,
        secrets,
        test_and_write_vectors,
        read_vector,
        renew_leases,
):
    """
    Read data from shares and conditionally write some data to them.

    :param bool renew_leases: If and only if this is ``True`` and the test
        vectors pass then shares in this slot will also have an updated
        lease applied to them.

    See ``allmydata.interfaces.RIStorageServer`` for details about other
    parameters and return value.
    """
    start = time.time()
    self.count("writev")
    si_s = si_b2a(storage_index)
    log.msg("storage: slot_writev %s" % si_s)
    si_dir = storage_index_to_dir(storage_index)
    (write_enabler, renew_secret, cancel_secret) = secrets
    bucketdir = os.path.join(self.sharedir, si_dir)

    # If collection succeeds we know the write_enabler is good for all
    # existing shares.
    shares = self._collect_mutable_shares_for_storage_index(
        bucketdir,
        write_enabler,
        si_s,
    )

    # Now evaluate test vectors.
    testv_is_good = self._evaluate_test_vectors(
        test_and_write_vectors,
        shares,
    )

    # now gather the read vectors, before we do any writes
    read_data = self._evaluate_read_vectors(
        read_vector,
        shares,
    )

    if testv_is_good:
        # now apply the write vectors
        remaining_shares = self._evaluate_write_vectors(
            bucketdir,
            secrets,
            test_and_write_vectors,
            shares,
        )
        if renew_leases:
            lease_info = self._make_lease_info(renew_secret, cancel_secret)
            self._add_or_renew_leases(remaining_shares, lease_info)

    # all done
    self.add_latency("writev", time.time() - start)
    return (testv_is_good, read_data)
def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                           secrets,
                                           test_and_write_vectors,
                                           read_vector):
    """
    Remote-API entry point: delegate to the local helper, always renewing
    leases on shares whose test vectors pass.
    """
    return self.slot_testv_and_readv_and_writev(
        storage_index,
        secrets,
        test_and_write_vectors,
        read_vector,
        renew_leases=True,
    )
def _allocate_slot_share(self, bucketdir, secrets, sharenum,
allocated_size, owner_num=0):
(write_enabler, renew_secret, cancel_secret) = secrets

View File

@ -1369,6 +1369,81 @@ class MutableServer(unittest.TestCase):
self.failUnless(os.path.exists(prefixdir), prefixdir)
self.failIf(os.path.exists(bucketdir), bucketdir)
def test_writev_without_renew_lease(self):
    """
    The helper method ``slot_testv_and_readv_and_writev`` does not renew
    leases if ``False`` is passed for the ``renew_leases`` parameter.
    """
    server = self.create("test_writev_without_renew_lease")
    storage_index = "si2"
    all_secrets = (
        self.write_enabler(storage_index),
        self.renew_secret(storage_index),
        self.cancel_secret(storage_index),
    )
    write_vector = [(0, b"Hello, world")]
    # Write one share with renew_leases disabled ...
    server.slot_testv_and_readv_and_writev(
        storage_index=storage_index,
        secrets=all_secrets,
        test_and_write_vectors={3: ([], write_vector, None)},
        read_vector=[],
        renew_leases=False,
    )
    # ... and verify the slot ends up with no leases at all.
    self.assertEqual([], list(server.get_slot_leases(storage_index)))
def test_get_slot_leases_empty_slot(self):
    """
    When ``get_slot_leases`` is called for a slot for which the server has no
    shares, it returns an empty iterable.
    """
    server = self.create(b"test_get_slot_leases_empty_slot")
    leases = list(server.get_slot_leases(b"si1"))
    self.assertEqual(leases, [])
def test_remove_non_present(self):
    """
    A write vector which would remove a share completely is applied as a no-op
    by a server which does not have the share.
    """
    server = self.create("test_remove_non_present")
    storage_index = "si1"
    all_secrets = (
        self.write_enabler(storage_index),
        self.renew_secret(storage_index),
        self.cancel_secret(storage_index),
    )
    # We don't even need to create any shares to exercise this
    # functionality.  Just go straight to sending a truncate-to-zero
    # write (new_length of 0, empty test and data vectors).
    testv_is_good, read_data = server.remote_slot_testv_and_readv_and_writev(
        storage_index=storage_index,
        secrets=all_secrets,
        test_and_write_vectors={3: ([], [], 0)},
        read_vector=[],
    )
    self.assertTrue(testv_is_good)
    self.assertEqual({}, read_data)
class MDMFProxies(unittest.TestCase, ShouldFailMixin):
def setUp(self):