From 8fb6afee1bc2ff5509695c5d53b2d5dd3d72ed08 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 13:42:17 -0400 Subject: [PATCH] Refactor BucketWriters such that disconnection can be limited to Foolscap. --- src/allmydata/storage/immutable.py | 14 +++----- src/allmydata/storage/server.py | 54 ++++++++++++++++++++++++------ src/allmydata/test/common_util.py | 5 +++ src/allmydata/test/test_storage.py | 34 ++++++------------- 4 files changed, 65 insertions(+), 42 deletions(-) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index c2b190e01..d3b6ce875 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -208,8 +208,9 @@ class ShareFile(object): # Batch 1: # - bucketwriter dict in the server, to persist them + TEST of persistence # - aborting bucketwriter removes it from server persistent + TEST -# - get rid of disconnect notification (probably no test, rely on existing?) -# - add bucketwriter cancellation to remote_allocate_buckets() (probably rely on existing tests) +# - disconnect still aborts _for Foolscap only_ +# - existing in-use buckets are not returned _for Foolscap only_ +# - this implies splitting remote_allocate_buckets into generic and Foolscap-y parts # Batch 2: # - scheduled events for aborting bucketwriter + TEST # - bucketwriter writes delay cancellation + TEST @@ -218,13 +219,11 @@ class ShareFile(object): @implementer(RIBucketWriter) class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 - def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary): + def __init__(self, ss, incominghome, finalhome, max_size, lease_info): self.ss = ss self.incominghome = incominghome self.finalhome = finalhome self._max_size = max_size # don't allow the client to write more than this - self._canary = canary - self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) self.closed = False self.throw_out_all_data = False
self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) @@ -290,22 +289,19 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 pass self._sharefile = None self.closed = True - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) self.ss.add_latency("close", time.time() - start) self.ss.count("close") - def _disconnected(self): + def disconnected(self): if not self.closed: self._abort() def remote_abort(self): log.msg("storage: aborting sharefile %s" % self.incominghome, facility="tahoe.storage", level=log.UNUSUAL) - if not self.closed: - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) self._abort() self.ss.count("abort") diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index f4996756e..92b3d2a1b 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -11,13 +11,14 @@ if PY2: # Omit open() to get native behavior where open("w") always accepts native # strings. Omit bytes so we don't leak future's custom bytes. 
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 - +else: + from typing import Dict import os, re, struct, time -import weakref import six from foolscap.api import Referenceable +from foolscap.ipb import IRemoteReference from twisted.application import service from zope.interface import implementer @@ -89,7 +90,6 @@ class StorageServer(service.MultiService, Referenceable): self.incomingdir = os.path.join(sharedir, 'incoming') self._clean_incomplete() fileutil.make_dirs(self.incomingdir) - self._active_writers = weakref.WeakKeyDictionary() log.msg("StorageServer created", facility="tahoe.storage") if reserved_space: @@ -121,6 +121,15 @@ class StorageServer(service.MultiService, Referenceable): self.lease_checker.setServiceParent(self) self._get_current_time = get_current_time + # Currently being-written Bucketwriters. TODO Can probably refactor so + # active_writers is unnecessary, do as second pass. + # Map BucketWriter -> (storage_index, share_num) + self._active_writers = {} # type: Dict[BucketWriter, (bytes,int)] + # Map (storage_index, share_num) -> BucketWriter: + self._bucket_writers = {} # type: Dict[(bytes, int),BucketWriter] + # Canaries and disconnect markers for BucketWriters created via Foolscap: + self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] + def __repr__(self): return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) @@ -263,10 +272,14 @@ class StorageServer(service.MultiService, Referenceable): } return version - def remote_allocate_buckets(self, storage_index, - renew_secret, cancel_secret, - sharenums, allocated_size, - canary, owner_num=0): + def allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + include_in_progress, + owner_num=0): + """ + Generic bucket allocation API. 
+ """ # owner_num is not for clients to set, but rather it should be # curried into the PersonalStorageServer instance that is dedicated # to a particular owner. @@ -315,6 +328,7 @@ class StorageServer(service.MultiService, Referenceable): # great! we already have it. easy. pass elif os.path.exists(incominghome): + # TODO use include_in_progress # Note that we don't create BucketWriters for shnums that # have a partial share (in incoming/), so if a second upload # occurs while the first is still in progress, the second @@ -323,11 +337,12 @@ class StorageServer(service.MultiService, Referenceable): elif (not limited) or (remaining_space >= max_space_per_bucket): # ok! we need to create the new share file. bw = BucketWriter(self, incominghome, finalhome, - max_space_per_bucket, lease_info, canary) + max_space_per_bucket, lease_info) if self.no_storage: bw.throw_out_all_data = True bucketwriters[shnum] = bw - self._active_writers[bw] = 1 + self._active_writers[bw] = (storage_index, shnum) + self._bucket_writers[(storage_index, shnum)] = bw if limited: remaining_space -= max_space_per_bucket else: @@ -340,6 +355,21 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("allocate", self._get_current_time() - start) return alreadygot, bucketwriters + def remote_allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + canary, owner_num=0): + """Foolscap-specific ``allocate_buckets()`` API.""" + alreadygot, bucketwriters = self.allocate_buckets( + storage_index, renew_secret, cancel_secret, sharenums, allocated_size, + include_in_progress=False, owner_num=owner_num, + ) + # Abort BucketWriters if disconnection happens. 
+ for bw in bucketwriters.values(): + disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) + self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) + return alreadygot, bucketwriters + def _iter_share_files(self, storage_index): for shnum, filename in self._get_bucket_shares(storage_index): with open(filename, 'rb') as f: @@ -383,7 +413,11 @@ class StorageServer(service.MultiService, Referenceable): def bucket_writer_closed(self, bw, consumed_size): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) - del self._active_writers[bw] + storage_index, shnum = self._active_writers.pop(bw) + del self._bucket_writers[(storage_index, shnum)] + if bw in self._bucket_writer_disconnect_markers: + canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) + canary.dontNotifyOnDisconnect(disconnect_marker) def _get_bucket_shares(self, storage_index): """Return a list of (shnum, pathname) tuples for files that hold diff --git a/src/allmydata/test/common_util.py b/src/allmydata/test/common_util.py index b5229ca11..de8d774b3 100644 --- a/src/allmydata/test/common_util.py +++ b/src/allmydata/test/common_util.py @@ -314,6 +314,11 @@ class FakeCanary(object): def getPeer(self): return "" + # For use by tests: + def disconnected(self): + for (f, args, kwargs) in list(self.disconnectors.values()): + f(*args, **kwargs) + class ShouldFailMixin(object): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 60173cf75..3b36f293f 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -129,8 +129,7 @@ class Bucket(unittest.TestCase): def test_create(self): incoming, final = self.make_workdir("test_create") - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) bw.remote_write(0, b"a"*25) bw.remote_write(25, b"b"*25) bw.remote_write(50, 
b"c"*25) @@ -139,8 +138,7 @@ class Bucket(unittest.TestCase): def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) bw.remote_write(0, b"a"*25) bw.remote_write(25, b"b"*25) bw.remote_write(50, b"c"*7) # last block may be short @@ -158,8 +156,7 @@ class Bucket(unittest.TestCase): incoming, final = self.make_workdir( "test_write_past_size_errors-{}".format(i) ) - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) with self.assertRaises(DataTooLargeError): bw.remote_write(offset, b"a" * length) @@ -179,7 +176,6 @@ class Bucket(unittest.TestCase): incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) bw = BucketWriter( self, incoming, final, length, self.make_lease(), - FakeCanary() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. bw.remote_write(10, expected_data[10:20]) @@ -218,7 +214,6 @@ class Bucket(unittest.TestCase): incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) bw = BucketWriter( self, incoming, final, length, self.make_lease(), - FakeCanary() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. bw.remote_write(10, b"1" * 10) @@ -318,8 +313,7 @@ class BucketProxy(unittest.TestCase): final = os.path.join(basedir, "bucket") fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) - bw = BucketWriter(self, incoming, final, size, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, size, self.make_lease()) rb = RemoteBucket(bw) return bw, rb, final @@ -669,7 +663,7 @@ class Server(unittest.TestCase): # the size we request. 
OVERHEAD = 3*4 LEASE_SIZE = 4+32+32+4 - canary = FakeCanary(True) + canary = FakeCanary() already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary) self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally @@ -677,16 +671,14 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(ss._active_writers), 3) # allocating 1001-byte shares only leaves room for one - already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary) + canary2 = FakeCanary() + already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2) self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(ss._active_writers), 4) # we abandon the first set, so their provisional allocation should be # returned - - del already - del writers - gc.collect() + canary.disconnected() self.failUnlessEqual(len(ss._active_writers), 1) # now we have a provisional allocation of 1001 bytes @@ -697,9 +689,6 @@ class Server(unittest.TestCase): for bw in writers2.values(): bw.remote_write(0, b"a"*25) bw.remote_close() - del already2 - del writers2 - del bw self.failUnlessEqual(len(ss._active_writers), 0) # this also changes the amount reported as available by call_get_disk_stats @@ -707,13 +696,12 @@ class Server(unittest.TestCase): # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and # 5000-1085=3915 free, therefore we can fit 39 100byte shares - already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary) + canary3 = FakeCanary() + already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3) self.failUnlessEqual(len(writers3), 39) self.failUnlessEqual(len(ss._active_writers), 39) - del already3 - del writers3 - gc.collect() + canary3.disconnected() self.failUnlessEqual(len(ss._active_writers), 0) ss.disownServiceParent()