From dcc8f93b4e4a34c76b913b1f381e0bda0b0a9d85 Mon Sep 17 00:00:00 2001 From: Jean-Paul Calderone Date: Tue, 13 Aug 2019 13:19:44 -0400 Subject: [PATCH] Refactor remote_slot_testv_and_readv_and_writev into some bite-sized pieces --- src/allmydata/storage/server.py | 183 +++++++++++++++++++++----------- 1 file changed, 122 insertions(+), 61 deletions(-) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index ac5567a30..a02dacd7f 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -1,4 +1,5 @@ import os, re, weakref, struct, time +import six from foolscap.api import Referenceable from twisted.application import service @@ -402,6 +403,102 @@ class StorageServer(service.MultiService, Referenceable): except StopIteration: return iter([]) + def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s): + """ + Gather up existing mutable shares for the given storage index. + + :param bytes bucketdir: The filesystem path containing shares for the + given storage index. + + :param bytes write_enabler: The write enabler secret for the shares. + + :param bytes si_s: The storage index in encoded (base32) form. + + :raise BadWriteEnablerError: If the write enabler is not correct for + any of the collected shares. + + :return dict[int, MutableShareFile]: The collected shares in a mapping + from integer share numbers to ``MutableShareFile`` instances. + """ + shares = {} + if os.path.isdir(bucketdir): + # shares exist if there is a file for them + for sharenum_s in os.listdir(bucketdir): + try: + sharenum = int(sharenum_s) + except ValueError: + continue + filename = os.path.join(bucketdir, sharenum_s) + msf = MutableShareFile(filename, self) + msf.check_write_enabler(write_enabler, si_s) + shares[sharenum] = msf + return shares + + def _evaluate_test_vectors(self, test_and_write_vectors, shares): + """ + Execute test vectors against share data. + + :param test_and_write_vectors: + """ + for sharenum in test_and_write_vectors: + (testv, datav, new_length) = test_and_write_vectors[sharenum] + if sharenum in shares: + if not shares[sharenum].check_testv(testv): + self.log("testv failed: [%d]: %r" % (sharenum, testv)) + return False + else: + # compare the vectors against an empty share, in which all + # reads return empty strings. + if not EmptyShare().check_testv(testv): + self.log("testv failed (empty): [%d] %r" % (sharenum, + testv)) + return False + return True + + def _evaluate_read_vectors(self, read_vector, shares): + read_data = {} + for sharenum, share in shares.items(): + read_data[sharenum] = share.readv(read_vector) + return read_data + + def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares): + remaining_shares = {} + + for sharenum in test_and_write_vectors: + (testv, datav, new_length) = test_and_write_vectors[sharenum] + if new_length == 0: + if sharenum in shares: + shares[sharenum].unlink() + else: + if sharenum not in shares: + # allocate a new share + allocated_size = 2000 # arbitrary, really + share = self._allocate_slot_share(bucketdir, secrets, + sharenum, + allocated_size, + owner_num=0) + shares[sharenum] = share + shares[sharenum].writev(datav, new_length) + remaining_shares[sharenum] = shares[sharenum] + + if new_length == 0: + # delete empty bucket directories + if not os.listdir(bucketdir): + os.rmdir(bucketdir) + return remaining_shares + + def _make_lease_info(self, renew_secret, cancel_secret): + ownerid = 1 # TODO + expire_time = time.time() + 31*24*60*60 # one month + lease_info = LeaseInfo(ownerid, + renew_secret, cancel_secret, + expire_time, self.my_nodeid) + return lease_info + + def _add_or_renew_leases(self, shares, lease_info): + for share in six.viewvalues(shares): + share.add_or_renew_lease(lease_info) + def remote_slot_testv_and_readv_and_writev(self, storage_index, secrets, test_and_write_vectors, @@ -412,75 +509,39 @@ class StorageServer(service.MultiService, Referenceable): log.msg("storage: slot_writev %s" % si_s) si_dir = storage_index_to_dir(storage_index) (write_enabler, renew_secret, cancel_secret) = secrets - # shares exist if there is a file for them bucketdir = os.path.join(self.sharedir, si_dir) - shares = {} - if os.path.isdir(bucketdir): - for sharenum_s in os.listdir(bucketdir): - try: - sharenum = int(sharenum_s) - except ValueError: - continue - filename = os.path.join(bucketdir, sharenum_s) - msf = MutableShareFile(filename, self) - msf.check_write_enabler(write_enabler, si_s) - shares[sharenum] = msf - # write_enabler is good for all existing shares. + + # If collection succeeds we know the write_enabler is good for all + # existing shares. + shares = self._collect_mutable_shares_for_storage_index( + bucketdir, + write_enabler, + si_s, + ) # Now evaluate test vectors. - testv_is_good = True - for sharenum in test_and_write_vectors: - (testv, datav, new_length) = test_and_write_vectors[sharenum] - if sharenum in shares: - if not shares[sharenum].check_testv(testv): - self.log("testv failed: [%d]: %r" % (sharenum, testv)) - testv_is_good = False - break - else: - # compare the vectors against an empty share, in which all - # reads return empty strings. - if not EmptyShare().check_testv(testv): - self.log("testv failed (empty): [%d] %r" % (sharenum, - testv)) - testv_is_good = False - break + testv_is_good = self._evaluate_test_vectors( + test_and_write_vectors, + shares, + ) # now gather the read vectors, before we do any writes - read_data = {} - for sharenum, share in shares.items(): - read_data[sharenum] = share.readv(read_vector) - - ownerid = 1 # TODO - expire_time = time.time() + 31*24*60*60 # one month - lease_info = LeaseInfo(ownerid, - renew_secret, cancel_secret, - expire_time, self.my_nodeid) + read_data = self._evaluate_read_vectors( + read_vector, + shares, + ) if testv_is_good: + lease_info = self._make_lease_info(renew_secret, cancel_secret) + # now apply the write vectors - for sharenum in test_and_write_vectors: - (testv, datav, new_length) = test_and_write_vectors[sharenum] - if new_length == 0: - if sharenum in shares: - shares[sharenum].unlink() - else: - if sharenum not in shares: - # allocate a new share - allocated_size = 2000 # arbitrary, really - share = self._allocate_slot_share(bucketdir, secrets, - sharenum, - allocated_size, - owner_num=0) - shares[sharenum] = share - shares[sharenum].writev(datav, new_length) - # and update the lease - shares[sharenum].add_or_renew_lease(lease_info) - - if new_length == 0: - # delete empty bucket directories - if not os.listdir(bucketdir): - os.rmdir(bucketdir) - + remaining_shares = self._evaluate_write_vectors( + bucketdir, + secrets, + test_and_write_vectors, + shares, + ) + self._add_or_renew_leases(remaining_shares, lease_info) # all done self.add_latency("writev", time.time() - start)