Refactor remote_slot_testv_and_readv_and_writev into some bite-sized pieces

Jean-Paul Calderone 2019-08-13 13:19:44 -04:00
parent 0723a2f4d2
commit dcc8f93b4e


@@ -1,4 +1,5 @@
import os, re, weakref, struct, time
import six
from foolscap.api import Referenceable
from twisted.application import service
@@ -402,6 +403,102 @@ class StorageServer(service.MultiService, Referenceable):
        except StopIteration:
            return iter([])

    def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s):
        """
        Gather up existing mutable shares for the given storage index.

        :param bytes bucketdir: The filesystem path containing shares for the
            given storage index.

        :param bytes write_enabler: The write enabler secret for the shares.

        :param bytes si_s: The storage index in encoded (base32) form.

        :raise BadWriteEnablerError: If the write enabler is not correct for
            any of the collected shares.

        :return dict[int, MutableShareFile]: The collected shares in a mapping
            from integer share numbers to ``MutableShareFile`` instances.
        """
        shares = {}
        if os.path.isdir(bucketdir):
            # shares exist if there is a file for them
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        return shares
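
For context, this scan relies on Tahoe's on-disk layout for mutable slots: one directory per storage index, containing one file per share, named by its decimal share number. A standalone sketch of the same enumeration (hypothetical helper, not part of the patch):

import os

def list_share_numbers(bucketdir):
    # Every directory entry whose name parses as an integer is treated as a
    # share file; anything else is ignored, as in the loop above.
    sharenums = []
    if os.path.isdir(bucketdir):
        for name in os.listdir(bucketdir):
            try:
                sharenums.append(int(name))
            except ValueError:
                continue
    return sorted(sharenums)

# A slot holding shares 0, 3 and 7 yields [0, 3, 7].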

    def _evaluate_test_vectors(self, test_and_write_vectors, shares):
        """
        Execute test vectors against share data.

        :param test_and_write_vectors: A mapping from share numbers to
            (test vector, data vector, new length) tuples; only the test
            vectors are consulted here.

        :return bool: ``True`` if and only if every test vector passed.
        """
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    return False
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    return False
        return True
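
The test vectors themselves are not defined in this diff; in Tahoe's mutable-slot protocol each one is an (offset, length, operator, specimen) tuple compared against a read of the share, with "eq" the usual operator. A rough in-memory illustration of that check (an assumption about the format, not the MutableShareFile implementation):

import operator

_COMPARATORS = {
    "lt": operator.lt, "le": operator.le, "eq": operator.eq,
    "ne": operator.ne, "ge": operator.ge, "gt": operator.gt,
}

def check_testv(read, testv):
    # read(offset, length) -> bytes; an empty share returns b"" everywhere,
    # which is why EmptyShare() above can satisfy "is it still unwritten?"
    # style tests such as [(0, 4, "eq", b"")].
    for (offset, length, op, specimen) in testv:
        if not _COMPARATORS[op](read(offset, length), specimen):
            return False
    return True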

    def _evaluate_read_vectors(self, read_vector, shares):
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)
        return read_data
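
Here read_vector is, per Tahoe's mutable-slot interface, a list of (offset, length) pairs applied identically to every collected share, and readv returns the corresponding byte strings. A small in-memory illustration (not the on-disk implementation):

def readv(share_data, read_vector):
    # Reads past the end of the data simply return whatever bytes exist,
    # possibly an empty string, mirroring reads from short shares.
    return [share_data[offset:offset + length]
            for (offset, length) in read_vector]

# readv(b"abcdef", [(0, 2), (4, 10)]) == [b"ab", b"ef"]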

    def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares):
        remaining_shares = {}
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if new_length == 0:
                if sharenum in shares:
                    shares[sharenum].unlink()
            else:
                if sharenum not in shares:
                    # allocate a new share
                    allocated_size = 2000 # arbitrary, really
                    share = self._allocate_slot_share(bucketdir, secrets,
                                                      sharenum,
                                                      allocated_size,
                                                      owner_num=0)
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
                remaining_shares[sharenum] = shares[sharenum]

        if new_length == 0:
            # delete empty bucket directories
            if not os.listdir(bucketdir):
                os.rmdir(bucketdir)
        return remaining_shares
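
In this protocol datav is a list of (offset, data) pairs, and new_length, when smaller than the current size, caps the resulting share (new_length == 0 deletes the share entirely, as handled above). A rough in-memory picture of applying one write vector (an illustration of the assumed semantics, not MutableShareFile.writev itself):

def apply_writev(share_data, datav, new_length=None):
    buf = bytearray(share_data)
    for (offset, data) in datav:
        end = offset + len(data)
        if end > len(buf):
            # writes may extend the share; pad any gap with zero bytes
            buf.extend(b"\x00" * (end - len(buf)))
        buf[offset:end] = data
    if new_length is not None and new_length < len(buf):
        # a shorter new_length truncates the share
        del buf[new_length:]
    return bytes(buf)

# apply_writev(b"abcdef", [(2, b"XY")]) == b"abXYef"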

    def _make_lease_info(self, renew_secret, cancel_secret):
        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60 # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)
        return lease_info

    def _add_or_renew_leases(self, shares, lease_info):
        for share in six.viewvalues(shares):
            share.add_or_renew_lease(lease_info)

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
@@ -412,75 +509,39 @@ class StorageServer(service.MultiService, Referenceable):
        log.msg("storage: slot_writev %s" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        shares = {}
        if os.path.isdir(bucketdir):
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        # write_enabler is good for all existing shares.

        # If collection succeeds we know the write_enabler is good for all
        # existing shares.
        shares = self._collect_mutable_shares_for_storage_index(
            bucketdir,
            write_enabler,
            si_s,
        )

        # Now evaluate test vectors.
        testv_is_good = True
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    testv_is_good = False
                    break
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    testv_is_good = False
                    break
        testv_is_good = self._evaluate_test_vectors(
            test_and_write_vectors,
            shares,
        )

        # now gather the read vectors, before we do any writes
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)

        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60 # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)
        read_data = self._evaluate_read_vectors(
            read_vector,
            shares,
        )

        if testv_is_good:
            lease_info = self._make_lease_info(renew_secret, cancel_secret)
            # now apply the write vectors
            for sharenum in test_and_write_vectors:
                (testv, datav, new_length) = test_and_write_vectors[sharenum]
                if new_length == 0:
                    if sharenum in shares:
                        shares[sharenum].unlink()
                else:
                    if sharenum not in shares:
                        # allocate a new share
                        allocated_size = 2000 # arbitrary, really
                        share = self._allocate_slot_share(bucketdir, secrets,
                                                          sharenum,
                                                          allocated_size,
                                                          owner_num=0)
                        shares[sharenum] = share
                    shares[sharenum].writev(datav, new_length)
                    # and update the lease
                    shares[sharenum].add_or_renew_lease(lease_info)

            if new_length == 0:
                # delete empty bucket directories
                if not os.listdir(bucketdir):
                    os.rmdir(bucketdir)
            remaining_shares = self._evaluate_write_vectors(
                bucketdir,
                secrets,
                test_and_write_vectors,
                shares,
            )
            self._add_or_renew_leases(remaining_shares, lease_info)

        # all done
        self.add_latency("writev", time.time() - start)
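
Taken together, the refactored method remains a test-and-set primitive: under the write enabler it reads the requested ranges, applies the writes only if every test vector passes, and refreshes leases on the surviving shares. As a usage illustration, a client-side call might look like the sketch below; the remote method name, vector shapes, and (success, reads) return value follow Tahoe's RIStorageServer interface rather than anything shown in this hunk, and the helper itself is hypothetical:

from twisted.internet import defer

@defer.inlineCallbacks
def update_share_if_unchanged(storage_rref, storage_index, secrets,
                              expected, new_data):
    # storage_rref is assumed to be a foolscap RemoteReference to the storage
    # server; secrets is the usual (write_enabler, renew_secret,
    # cancel_secret) tuple.
    wrote, reads = yield storage_rref.callRemote(
        "slot_testv_and_readv_and_writev",
        storage_index,
        secrets,
        # share 0: overwrite from offset 0 only if the current prefix still
        # equals `expected`; new_length=None leaves the share length alone.
        {0: ([(0, len(expected), "eq", expected)], [(0, new_data)], None)},
        # read back the first 100 bytes of every share in the slot
        [(0, 100)],
    )
    # If wrote is False, another writer got there first; `reads` holds the
    # share contents observed before any writes were applied.
    defer.returnValue((wrote, reads))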