From a02d5f4c9cb81e46c461dda417be327b2de604a9 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 Sep 2021 13:02:01 -0400 Subject: [PATCH 01/10] Just stick to current behavior. --- src/allmydata/test/test_istorageserver.py | 44 +++-------------------- 1 file changed, 4 insertions(+), 40 deletions(-) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 7c6090980..be327770e 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -98,13 +98,10 @@ class IStorageServerImmutableAPIsTestsMixin(object): # We validate the bucket objects' interface in a later test. @inlineCallbacks - @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793") def test_allocate_buckets_repeat(self): """ - allocate_buckets() with the same storage index returns the same result, - because the shares have not been written to. - - This fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793 + allocate_buckets() with the same storage index does not return + work-in-progress buckets, but will add any newly added buckets. """ storage_index, renew_secret, cancel_secret = ( new_storage_index(), @@ -115,7 +112,7 @@ class IStorageServerImmutableAPIsTestsMixin(object): storage_index, renew_secret, cancel_secret, - sharenums=set(range(5)), + sharenums=set(range(4)), allocated_size=1024, canary=Referenceable(), ) @@ -128,40 +125,7 @@ class IStorageServerImmutableAPIsTestsMixin(object): Referenceable(), ) self.assertEqual(already_got, already_got2) - self.assertEqual(set(allocated.keys()), set(allocated2.keys())) - - @skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793") - @inlineCallbacks - def test_allocate_buckets_more_sharenums(self): - """ - allocate_buckets() with the same storage index but more sharenums - acknowledges the extra shares don't exist. - - Fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793 - """ - storage_index, renew_secret, cancel_secret = ( - new_storage_index(), - new_secret(), - new_secret(), - ) - yield self.storage_server.allocate_buckets( - storage_index, - renew_secret, - cancel_secret, - sharenums=set(range(5)), - allocated_size=1024, - canary=Referenceable(), - ) - (already_got2, allocated2) = yield self.storage_server.allocate_buckets( - storage_index, - renew_secret, - cancel_secret, - sharenums=set(range(7)), - allocated_size=1024, - canary=Referenceable(), - ) - self.assertEqual(already_got2, set()) # none were fully written - self.assertEqual(set(allocated2.keys()), set(range(7))) + self.assertEqual(set(allocated2.keys()), {4}) @inlineCallbacks def test_written_shares_are_allocated(self): From 2b1502eff601a5c43b2d7f2fb9ad6de4959a4fc5 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 Sep 2021 13:02:24 -0400 Subject: [PATCH 02/10] News file. --- newsfragments/3793.minor | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 newsfragments/3793.minor diff --git a/newsfragments/3793.minor b/newsfragments/3793.minor new file mode 100644 index 000000000..e69de29bb From e64c397fc5d22052c6268e14704041ed0c85f7f6 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 Sep 2021 13:51:31 -0400 Subject: [PATCH 03/10] WIP disconnection test. --- src/allmydata/test/test_istorageserver.py | 75 +++++++++++++++++++++-- 1 file changed, 69 insertions(+), 6 deletions(-) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index be327770e..daf264286 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -20,9 +20,9 @@ if PY2: from random import Random -from testtools import skipIf - from twisted.internet.defer import inlineCallbacks +from twisted.internet import reactor +from twisted.internet.task import deferLater from foolscap.api import Referenceable, RemoteException @@ -77,6 +77,10 @@ class IStorageServerImmutableAPIsTestsMixin(object): Tests for ``IStorageServer``'s immutable APIs. ``self.storage_server`` is expected to provide ``IStorageServer``. + + ``self.disconnect()`` should disconnect and then reconnect, creating a new + ``self.storage_server``. Some implementations may wish to skip tests using + this; HTTP has no notion of disconnection. """ @inlineCallbacks @@ -100,7 +104,7 @@ class IStorageServerImmutableAPIsTestsMixin(object): @inlineCallbacks def test_allocate_buckets_repeat(self): """ - allocate_buckets() with the same storage index does not return + ``IStorageServer.allocate_buckets()`` with the same storage index does not return work-in-progress buckets, but will add any newly added buckets. """ storage_index, renew_secret, cancel_secret = ( @@ -127,6 +131,45 @@ class IStorageServerImmutableAPIsTestsMixin(object): self.assertEqual(already_got, already_got2) self.assertEqual(set(allocated2.keys()), {4}) + @inlineCallbacks + def test_disconnection(self): + """ + If we disconnect in the middle of writing to a bucket, all data is + wiped, and it's even possible to write different data to the bucket + (don't do that though, mostly it's just a good way to test that the + data really was wiped). + """ + storage_index, renew_secret, cancel_secret = ( + new_storage_index(), + new_secret(), + new_secret(), + ) + (_, allocated) = yield self.storage_server.allocate_buckets( + storage_index, + renew_secret, + cancel_secret, + sharenums={0}, + allocated_size=1024, + canary=Referenceable(), + ) + + # Bucket 1 is fully written in one go. + yield allocated[0].callRemote("write", 0, b"1" * 1024) + + # Disconnect: + yield self.disconnect() + + # Write different data with no complaint: + (_, allocated) = yield self.storage_server.allocate_buckets( + storage_index, + renew_secret, + cancel_secret, + sharenums={0}, + allocated_size=1024, + canary=Referenceable(), + ) + yield allocated[0].callRemote("write", 0, b"2" * 1024) + @inlineCallbacks def test_written_shares_are_allocated(self): """ @@ -359,15 +402,16 @@ class IStorageServerImmutableAPIsTestsMixin(object): class _FoolscapMixin(SystemTestMixin): """Run tests on Foolscap version of ``IStorageServer.""" + def _get_native_server(self): + return next(iter(self.clients[0].storage_broker.get_known_servers())) + @inlineCallbacks def setUp(self): AsyncTestCase.setUp(self) self.basedir = "test_istorageserver/" + self.id() yield SystemTestMixin.setUp(self) yield self.set_up_nodes(1) - self.storage_server = next( - iter(self.clients[0].storage_broker.get_known_servers()) - ).get_storage_server() + self.storage_server = self._get_native_server().get_storage_server() self.assertTrue(IStorageServer.providedBy(self.storage_server)) @inlineCallbacks @@ -375,6 +419,25 @@ class _FoolscapMixin(SystemTestMixin): AsyncTestCase.tearDown(self) yield SystemTestMixin.tearDown(self) + @inlineCallbacks + def disconnect(self): + """ + Disconnect and then reconnect with a new ``IStorageServer``. + """ + current = self.storage_server + self._get_native_server()._rref.tracker.broker.transport.loseConnection() + for i in range(100000): + yield deferLater(reactor, 0.001) + import pdb + + pdb.set_trace() + new = self._get_native_server().get_storage_server() + if new is not None and new is not current: + self.storage_server = new + return + + raise RuntimeError("Failed to reconnect") + class FoolscapSharedAPIsTests( _FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase From 51e8b5e197e07ec2247c75212e0df342bc3a091d Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 11:17:33 -0400 Subject: [PATCH 04/10] Disconnection test works now. --- src/allmydata/test/test_istorageserver.py | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index daf264286..25b814237 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -135,9 +135,10 @@ class IStorageServerImmutableAPIsTestsMixin(object): def test_disconnection(self): """ If we disconnect in the middle of writing to a bucket, all data is - wiped, and it's even possible to write different data to the bucket - (don't do that though, mostly it's just a good way to test that the - data really was wiped). + wiped, and it's even possible to write different data to the bucket. + + (In the real world one shouldn't do that, but writing different data is + a good way to test that the original data really was wiped.) """ storage_index, renew_secret, cancel_secret = ( new_storage_index(), @@ -425,18 +426,9 @@ class _FoolscapMixin(SystemTestMixin): Disconnect and then reconnect with a new ``IStorageServer``. """ current = self.storage_server - self._get_native_server()._rref.tracker.broker.transport.loseConnection() - for i in range(100000): - yield deferLater(reactor, 0.001) - import pdb - - pdb.set_trace() - new = self._get_native_server().get_storage_server() - if new is not None and new is not current: - self.storage_server = new - return - - raise RuntimeError("Failed to reconnect") + yield self.bounce_client(0) + self.storage_server = self._get_native_server().get_storage_server() + assert self.storage_server is not current class FoolscapSharedAPIsTests( From a4153b71256db2dd4094e2e21324dd2b8c9cfcab Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 11:56:04 -0400 Subject: [PATCH 05/10] Implementation plan. --- src/allmydata/storage/immutable.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 9e3a9622a..c2b190e01 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -204,6 +204,16 @@ class ShareFile(object): self.unlink() return space_freed +# TODOs +# Batch 1: +# - bucketwriter dict in the server, to persist them + TEST of persistence +# - aborting bucketwriter removes it from server persistent + TEST +# - get rid of disconnect notification (probably no test, rely on existing?) +# - add bucketwriter cancellation to remote_allocate_buckets() (probably rely on existing tests) +# Batch 2: +# - scheduled events for aborting bucketwriter + TEST +# - bucketwriter writes delay cancellation + TEST + @implementer(RIBucketWriter) class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 From 8fb6afee1bc2ff5509695c5d53b2d5dd3d72ed08 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 13:42:17 -0400 Subject: [PATCH 06/10] Refactor BucketWriters such that disconnection can be limited Foolscap. --- src/allmydata/storage/immutable.py | 14 +++----- src/allmydata/storage/server.py | 54 ++++++++++++++++++++++++------ src/allmydata/test/common_util.py | 5 +++ src/allmydata/test/test_storage.py | 34 ++++++------------- 4 files changed, 65 insertions(+), 42 deletions(-) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index c2b190e01..d3b6ce875 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -208,8 +208,9 @@ class ShareFile(object): # Batch 1: # - bucketwriter dict in the server, to persist them + TEST of persistence # - aborting bucketwriter removes it from server persistent + TEST -# - get rid of disconnect notification (probably no test, rely on existing?) -# - add bucketwriter cancellation to remote_allocate_buckets() (probably rely on existing tests) +# - disconnect still aborts _for Foolscap only_ +# - existing in-use buckets are not returned _for Foolscap only_ +# - this implies splitting remote_allocate_buckets into generic and Foolscap-y parts # Batch 2: # - scheduled events for aborting bucketwriter + TEST # - bucketwriter writes delay cancellation + TEST @@ -218,13 +219,11 @@ class ShareFile(object): @implementer(RIBucketWriter) class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 - def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary): + def __init__(self, ss, incominghome, finalhome, max_size, lease_info): self.ss = ss self.incominghome = incominghome self.finalhome = finalhome self._max_size = max_size # don't allow the client to write more than this - self._canary = canary - self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected) self.closed = False self.throw_out_all_data = False self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) @@ -290,22 +289,19 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 pass self._sharefile = None self.closed = True - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) filelen = os.stat(self.finalhome)[stat.ST_SIZE] self.ss.bucket_writer_closed(self, filelen) self.ss.add_latency("close", time.time() - start) self.ss.count("close") - def _disconnected(self): + def disconnected(self): if not self.closed: self._abort() def remote_abort(self): log.msg("storage: aborting sharefile %s" % self.incominghome, facility="tahoe.storage", level=log.UNUSUAL) - if not self.closed: - self._canary.dontNotifyOnDisconnect(self._disconnect_marker) self._abort() self.ss.count("abort") diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index f4996756e..92b3d2a1b 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -11,13 +11,14 @@ if PY2: # Omit open() to get native behavior where open("w") always accepts native # strings. Omit bytes so we don't leak future's custom bytes. from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 - +else: + from typing import Dict import os, re, struct, time -import weakref import six from foolscap.api import Referenceable +from foolscap.ipb import IRemoteReference from twisted.application import service from zope.interface import implementer @@ -89,7 +90,6 @@ class StorageServer(service.MultiService, Referenceable): self.incomingdir = os.path.join(sharedir, 'incoming') self._clean_incomplete() fileutil.make_dirs(self.incomingdir) - self._active_writers = weakref.WeakKeyDictionary() log.msg("StorageServer created", facility="tahoe.storage") if reserved_space: @@ -121,6 +121,15 @@ class StorageServer(service.MultiService, Referenceable): self.lease_checker.setServiceParent(self) self._get_current_time = get_current_time + # Currently being-written Bucketwriters. TODO Can probably refactor so + # active_writers is unnecessary, do as second pass. + # Map BucketWriter -> (storage_index, share_num) + self._active_writers = {} # type: Dict[BucketWriter, (bytes,int)] + # Map (storage_index, share_num) -> BucketWriter: + self._bucket_writers = {} # type: Dict[(bytes, int),BucketWriter] + # Canaries and disconnect markers for BucketWriters created via Foolscap: + self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] + def __repr__(self): return "" % (idlib.shortnodeid_b2a(self.my_nodeid),) @@ -263,10 +272,14 @@ class StorageServer(service.MultiService, Referenceable): } return version - def remote_allocate_buckets(self, storage_index, - renew_secret, cancel_secret, - sharenums, allocated_size, - canary, owner_num=0): + def allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + include_in_progress, + owner_num=0): + """ + Generic bucket allocation API. + """ # owner_num is not for clients to set, but rather it should be # curried into the PersonalStorageServer instance that is dedicated # to a particular owner. @@ -315,6 +328,7 @@ class StorageServer(service.MultiService, Referenceable): # great! we already have it. easy. pass elif os.path.exists(incominghome): + # TODO use include_in_progress # Note that we don't create BucketWriters for shnums that # have a partial share (in incoming/), so if a second upload # occurs while the first is still in progress, the second @@ -323,11 +337,12 @@ class StorageServer(service.MultiService, Referenceable): elif (not limited) or (remaining_space >= max_space_per_bucket): # ok! we need to create the new share file. bw = BucketWriter(self, incominghome, finalhome, - max_space_per_bucket, lease_info, canary) + max_space_per_bucket, lease_info) if self.no_storage: bw.throw_out_all_data = True bucketwriters[shnum] = bw - self._active_writers[bw] = 1 + self._active_writers[bw] = (storage_index, shnum) + self._bucket_writers[(storage_index, shnum)] = bw if limited: remaining_space -= max_space_per_bucket else: @@ -340,6 +355,21 @@ class StorageServer(service.MultiService, Referenceable): self.add_latency("allocate", self._get_current_time() - start) return alreadygot, bucketwriters + def remote_allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + canary, owner_num=0): + """Foolscap-specific ``allocate_buckets()`` API.""" + alreadygot, bucketwriters = self.allocate_buckets( + storage_index, renew_secret, cancel_secret, sharenums, allocated_size, + include_in_progress=False, owner_num=owner_num, + ) + # Abort BucketWriters if disconnection happens. + for bw in bucketwriters.values(): + disconnect_marker = canary.notifyOnDisconnect(bw.disconnected) + self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker) + return alreadygot, bucketwriters + def _iter_share_files(self, storage_index): for shnum, filename in self._get_bucket_shares(storage_index): with open(filename, 'rb') as f: @@ -383,7 +413,11 @@ class StorageServer(service.MultiService, Referenceable): def bucket_writer_closed(self, bw, consumed_size): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) - del self._active_writers[bw] + storage_index, shnum = self._active_writers.pop(bw) + del self._bucket_writers[(storage_index, shnum)] + if bw in self._bucket_writer_disconnect_markers: + canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) + canary.dontNotifyOnDisconnect(disconnect_marker) def _get_bucket_shares(self, storage_index): """Return a list of (shnum, pathname) tuples for files that hold diff --git a/src/allmydata/test/common_util.py b/src/allmydata/test/common_util.py index b5229ca11..de8d774b3 100644 --- a/src/allmydata/test/common_util.py +++ b/src/allmydata/test/common_util.py @@ -314,6 +314,11 @@ class FakeCanary(object): def getPeer(self): return "" + # For use by tests: + def disconnected(self): + for (f, args, kwargs) in list(self.disconnectors.values()): + f(*args, **kwargs) + class ShouldFailMixin(object): diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 60173cf75..3b36f293f 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -129,8 +129,7 @@ class Bucket(unittest.TestCase): def test_create(self): incoming, final = self.make_workdir("test_create") - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) bw.remote_write(0, b"a"*25) bw.remote_write(25, b"b"*25) bw.remote_write(50, b"c"*25) @@ -139,8 +138,7 @@ class Bucket(unittest.TestCase): def test_readwrite(self): incoming, final = self.make_workdir("test_readwrite") - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) bw.remote_write(0, b"a"*25) bw.remote_write(25, b"b"*25) bw.remote_write(50, b"c"*7) # last block may be short @@ -158,8 +156,7 @@ class Bucket(unittest.TestCase): incoming, final = self.make_workdir( "test_write_past_size_errors-{}".format(i) ) - bw = BucketWriter(self, incoming, final, 200, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, 200, self.make_lease()) with self.assertRaises(DataTooLargeError): bw.remote_write(offset, b"a" * length) @@ -179,7 +176,6 @@ class Bucket(unittest.TestCase): incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) bw = BucketWriter( self, incoming, final, length, self.make_lease(), - FakeCanary() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. bw.remote_write(10, expected_data[10:20]) @@ -218,7 +214,6 @@ class Bucket(unittest.TestCase): incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) bw = BucketWriter( self, incoming, final, length, self.make_lease(), - FakeCanary() ) # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. bw.remote_write(10, b"1" * 10) @@ -318,8 +313,7 @@ class BucketProxy(unittest.TestCase): final = os.path.join(basedir, "bucket") fileutil.make_dirs(basedir) fileutil.make_dirs(os.path.join(basedir, "tmp")) - bw = BucketWriter(self, incoming, final, size, self.make_lease(), - FakeCanary()) + bw = BucketWriter(self, incoming, final, size, self.make_lease()) rb = RemoteBucket(bw) return bw, rb, final @@ -669,7 +663,7 @@ class Server(unittest.TestCase): # the size we request. OVERHEAD = 3*4 LEASE_SIZE = 4+32+32+4 - canary = FakeCanary(True) + canary = FakeCanary() already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary) self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally @@ -677,16 +671,14 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(ss._active_writers), 3) # allocating 1001-byte shares only leaves room for one - already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary) + canary2 = FakeCanary() + already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2) self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(ss._active_writers), 4) # we abandon the first set, so their provisional allocation should be # returned - - del already - del writers - gc.collect() + canary.disconnected() self.failUnlessEqual(len(ss._active_writers), 1) # now we have a provisional allocation of 1001 bytes @@ -697,9 +689,6 @@ class Server(unittest.TestCase): for bw in writers2.values(): bw.remote_write(0, b"a"*25) bw.remote_close() - del already2 - del writers2 - del bw self.failUnlessEqual(len(ss._active_writers), 0) # this also changes the amount reported as available by call_get_disk_stats @@ -707,13 +696,12 @@ class Server(unittest.TestCase): # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and # 5000-1085=3915 free, therefore we can fit 39 100byte shares - already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary) + canary3 = FakeCanary() + already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3) self.failUnlessEqual(len(writers3), 39) self.failUnlessEqual(len(ss._active_writers), 39) - del already3 - del writers3 - gc.collect() + canary3.disconnected() self.failUnlessEqual(len(ss._active_writers), 0) ss.disownServiceParent() From 58d7e2f62785eaa698107528a06469e86f1cbf05 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 13:58:53 -0400 Subject: [PATCH 07/10] Simplify implementation. --- src/allmydata/storage/immutable.py | 11 ----------- src/allmydata/storage/server.py | 28 +++++++++++++--------------- src/allmydata/test/test_storage.py | 12 ++++++------ 3 files changed, 19 insertions(+), 32 deletions(-) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index d3b6ce875..b8b18f140 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -204,17 +204,6 @@ class ShareFile(object): self.unlink() return space_freed -# TODOs -# Batch 1: -# - bucketwriter dict in the server, to persist them + TEST of persistence -# - aborting bucketwriter removes it from server persistent + TEST -# - disconnect still aborts _for Foolscap only_ -# - existing in-use buckets are not returned _for Foolscap only_ -# - this implies splitting remote_allocate_buckets into generic and Foolscap-y parts -# Batch 2: -# - scheduled events for aborting bucketwriter + TEST -# - bucketwriter writes delay cancellation + TEST - @implementer(RIBucketWriter) class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 92b3d2a1b..2a4b9a54d 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -121,12 +121,14 @@ class StorageServer(service.MultiService, Referenceable): self.lease_checker.setServiceParent(self) self._get_current_time = get_current_time - # Currently being-written Bucketwriters. TODO Can probably refactor so - # active_writers is unnecessary, do as second pass. - # Map BucketWriter -> (storage_index, share_num) - self._active_writers = {} # type: Dict[BucketWriter, (bytes,int)] - # Map (storage_index, share_num) -> BucketWriter: - self._bucket_writers = {} # type: Dict[(bytes, int),BucketWriter] + # Currently being-written Bucketwriters. For Foolscap, lifetime is tied + # to connection: when disconnection happens, the BucketWriters are + # removed. For HTTP, this makes no sense, so there will be + # timeout-based cleanup; see + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807. + + # Map in-progress filesystem path -> BucketWriter: + self._bucket_writers = {} # type: Dict[str,BucketWriter] # Canaries and disconnect markers for BucketWriters created via Foolscap: self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] @@ -247,7 +249,7 @@ class StorageServer(service.MultiService, Referenceable): def allocated_size(self): space = 0 - for bw in self._active_writers: + for bw in self._bucket_writers.values(): space += bw.allocated_size() return space @@ -275,7 +277,6 @@ class StorageServer(service.MultiService, Referenceable): def allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - include_in_progress, owner_num=0): """ Generic bucket allocation API. @@ -328,8 +329,7 @@ class StorageServer(service.MultiService, Referenceable): # great! we already have it. easy. pass elif os.path.exists(incominghome): - # TODO use include_in_progress - # Note that we don't create BucketWriters for shnums that + # For Foolscap we don't create BucketWriters for shnums that # have a partial share (in incoming/), so if a second upload # occurs while the first is still in progress, the second # uploader will use different storage servers. @@ -341,8 +341,7 @@ class StorageServer(service.MultiService, Referenceable): if self.no_storage: bw.throw_out_all_data = True bucketwriters[shnum] = bw - self._active_writers[bw] = (storage_index, shnum) - self._bucket_writers[(storage_index, shnum)] = bw + self._bucket_writers[incominghome] = bw if limited: remaining_space -= max_space_per_bucket else: @@ -362,7 +361,7 @@ class StorageServer(service.MultiService, Referenceable): """Foolscap-specific ``allocate_buckets()`` API.""" alreadygot, bucketwriters = self.allocate_buckets( storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - include_in_progress=False, owner_num=owner_num, + owner_num=owner_num, ) # Abort BucketWriters if disconnection happens. for bw in bucketwriters.values(): @@ -413,8 +412,7 @@ class StorageServer(service.MultiService, Referenceable): def bucket_writer_closed(self, bw, consumed_size): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) - storage_index, shnum = self._active_writers.pop(bw) - del self._bucket_writers[(storage_index, shnum)] + del self._bucket_writers[bw.incominghome] if bw in self._bucket_writer_disconnect_markers: canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) canary.dontNotifyOnDisconnect(disconnect_marker) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 3b36f293f..640280bc3 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -668,19 +668,19 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally # allocated, allowing only 2000 more to be claimed - self.failUnlessEqual(len(ss._active_writers), 3) + self.failUnlessEqual(len(ss._bucket_writers), 3) # allocating 1001-byte shares only leaves room for one canary2 = FakeCanary() already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2) self.failUnlessEqual(len(writers2), 1) - self.failUnlessEqual(len(ss._active_writers), 4) + self.failUnlessEqual(len(ss._bucket_writers), 4) # we abandon the first set, so their provisional allocation should be # returned canary.disconnected() - self.failUnlessEqual(len(ss._active_writers), 1) + self.failUnlessEqual(len(ss._bucket_writers), 1) # now we have a provisional allocation of 1001 bytes # and we close the second set, so their provisional allocation should @@ -689,7 +689,7 @@ class Server(unittest.TestCase): for bw in writers2.values(): bw.remote_write(0, b"a"*25) bw.remote_close() - self.failUnlessEqual(len(ss._active_writers), 0) + self.failUnlessEqual(len(ss._bucket_writers), 0) # this also changes the amount reported as available by call_get_disk_stats allocated = 1001 + OVERHEAD + LEASE_SIZE @@ -699,11 +699,11 @@ class Server(unittest.TestCase): canary3 = FakeCanary() already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3) self.failUnlessEqual(len(writers3), 39) - self.failUnlessEqual(len(ss._active_writers), 39) + self.failUnlessEqual(len(ss._bucket_writers), 39) canary3.disconnected() - self.failUnlessEqual(len(ss._active_writers), 0) + self.failUnlessEqual(len(ss._bucket_writers), 0) ss.disownServiceParent() del ss From f8604e239406278dc7082ddfd730524b035f3800 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 14:00:11 -0400 Subject: [PATCH 08/10] Fix flakes. --- src/allmydata/test/test_istorageserver.py | 2 -- src/allmydata/test/test_storage.py | 1 - 2 files changed, 3 deletions(-) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 25b814237..9ad6a8224 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -21,8 +21,6 @@ if PY2: from random import Random from twisted.internet.defer import inlineCallbacks -from twisted.internet import reactor -from twisted.internet.task import deferLater from foolscap.api import Referenceable, RemoteException diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 640280bc3..d18960a1e 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -19,7 +19,6 @@ import platform import stat import struct import shutil -import gc from uuid import uuid4 from twisted.trial import unittest From 016d6b4530af011c5ff470846d82d85a941848c3 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 14:10:14 -0400 Subject: [PATCH 09/10] Fix spurious type checking error. --- src/allmydata/storage/server.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 2a4b9a54d..041783a4e 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -274,10 +274,10 @@ class StorageServer(service.MultiService, Referenceable): } return version - def allocate_buckets(self, storage_index, - renew_secret, cancel_secret, - sharenums, allocated_size, - owner_num=0): + def _allocate_buckets(self, storage_index, + renew_secret, cancel_secret, + sharenums, allocated_size, + owner_num=0): """ Generic bucket allocation API. """ @@ -359,7 +359,7 @@ class StorageServer(service.MultiService, Referenceable): sharenums, allocated_size, canary, owner_num=0): """Foolscap-specific ``allocate_buckets()`` API.""" - alreadygot, bucketwriters = self.allocate_buckets( + alreadygot, bucketwriters = self._allocate_buckets( storage_index, renew_secret, cancel_secret, sharenums, allocated_size, owner_num=owner_num, ) From 23fd11be43b2978e07d9be2c0685be0469f47149 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 29 Sep 2021 14:13:18 -0400 Subject: [PATCH 10/10] Expand explanation. --- src/allmydata/test/test_istorageserver.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/allmydata/test/test_istorageserver.py b/src/allmydata/test/test_istorageserver.py index 9ad6a8224..29ce272e2 100644 --- a/src/allmydata/test/test_istorageserver.py +++ b/src/allmydata/test/test_istorageserver.py @@ -137,6 +137,10 @@ class IStorageServerImmutableAPIsTestsMixin(object): (In the real world one shouldn't do that, but writing different data is a good way to test that the original data really was wiped.) + + HTTP protocol should skip this test, since disconnection is meaningless + concept; this is more about testing implicit contract the Foolscap + implementation depends on doesn't change as we refactor things. """ storage_index, renew_secret, cancel_secret = ( new_storage_index(),