diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index d3b6ce875..b8b18f140 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -204,17 +204,6 @@ class ShareFile(object): self.unlink() return space_freed -# TODOs -# Batch 1: -# - bucketwriter dict in the server, to persist them + TEST of persistence -# - aborting bucketwriter removes it from server persistent + TEST -# - disconnect still aborts _for Foolscap only_ -# - existing in-use buckets are not returned _for Foolscap only_ -# - this implies splitting remote_allocate_buckets into generic and Foolscap-y parts -# Batch 2: -# - scheduled events for aborting bucketwriter + TEST -# - bucketwriter writes delay cancellation + TEST - @implementer(RIBucketWriter) class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py index 92b3d2a1b..2a4b9a54d 100644 --- a/src/allmydata/storage/server.py +++ b/src/allmydata/storage/server.py @@ -121,12 +121,14 @@ class StorageServer(service.MultiService, Referenceable): self.lease_checker.setServiceParent(self) self._get_current_time = get_current_time - # Currently being-written Bucketwriters. TODO Can probably refactor so - # active_writers is unnecessary, do as second pass. - # Map BucketWriter -> (storage_index, share_num) - self._active_writers = {} # type: Dict[BucketWriter, (bytes,int)] - # Map (storage_index, share_num) -> BucketWriter: - self._bucket_writers = {} # type: Dict[(bytes, int),BucketWriter] + # Currently being-written Bucketwriters. For Foolscap, lifetime is tied + # to connection: when disconnection happens, the BucketWriters are + # removed. For HTTP, this makes no sense, so there will be + # timeout-based cleanup; see + # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807. + + # Map in-progress filesystem path -> BucketWriter: + self._bucket_writers = {} # type: Dict[str,BucketWriter] # Canaries and disconnect markers for BucketWriters created via Foolscap: self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)] @@ -247,7 +249,7 @@ class StorageServer(service.MultiService, Referenceable): def allocated_size(self): space = 0 - for bw in self._active_writers: + for bw in self._bucket_writers.values(): space += bw.allocated_size() return space @@ -275,7 +277,6 @@ class StorageServer(service.MultiService, Referenceable): def allocate_buckets(self, storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - include_in_progress, owner_num=0): """ Generic bucket allocation API. @@ -328,8 +329,7 @@ class StorageServer(service.MultiService, Referenceable): # great! we already have it. easy. pass elif os.path.exists(incominghome): - # TODO use include_in_progress - # Note that we don't create BucketWriters for shnums that + # For Foolscap we don't create BucketWriters for shnums that # have a partial share (in incoming/), so if a second upload # occurs while the first is still in progress, the second # uploader will use different storage servers. @@ -341,8 +341,7 @@ class StorageServer(service.MultiService, Referenceable): if self.no_storage: bw.throw_out_all_data = True bucketwriters[shnum] = bw - self._active_writers[bw] = (storage_index, shnum) - self._bucket_writers[(storage_index, shnum)] = bw + self._bucket_writers[incominghome] = bw if limited: remaining_space -= max_space_per_bucket else: @@ -362,7 +361,7 @@ class StorageServer(service.MultiService, Referenceable): """Foolscap-specific ``allocate_buckets()`` API.""" alreadygot, bucketwriters = self.allocate_buckets( storage_index, renew_secret, cancel_secret, sharenums, allocated_size, - include_in_progress=False, owner_num=owner_num, + owner_num=owner_num, ) # Abort BucketWriters if disconnection happens. for bw in bucketwriters.values(): @@ -413,8 +412,7 @@ class StorageServer(service.MultiService, Referenceable): def bucket_writer_closed(self, bw, consumed_size): if self.stats_provider: self.stats_provider.count('storage_server.bytes_added', consumed_size) - storage_index, shnum = self._active_writers.pop(bw) - del self._bucket_writers[(storage_index, shnum)] + del self._bucket_writers[bw.incominghome] if bw in self._bucket_writer_disconnect_markers: canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw) canary.dontNotifyOnDisconnect(disconnect_marker) diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 3b36f293f..640280bc3 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -668,19 +668,19 @@ class Server(unittest.TestCase): self.failUnlessEqual(len(writers), 3) # now the StorageServer should have 3000 bytes provisionally # allocated, allowing only 2000 more to be claimed - self.failUnlessEqual(len(ss._active_writers), 3) + self.failUnlessEqual(len(ss._bucket_writers), 3) # allocating 1001-byte shares only leaves room for one canary2 = FakeCanary() already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2) self.failUnlessEqual(len(writers2), 1) - self.failUnlessEqual(len(ss._active_writers), 4) + self.failUnlessEqual(len(ss._bucket_writers), 4) # we abandon the first set, so their provisional allocation should be # returned canary.disconnected() - self.failUnlessEqual(len(ss._active_writers), 1) + self.failUnlessEqual(len(ss._bucket_writers), 1) # now we have a provisional allocation of 1001 bytes # and we close the second set, so their provisional allocation should @@ -689,7 +689,7 @@ class Server(unittest.TestCase): for bw in writers2.values(): bw.remote_write(0, b"a"*25) bw.remote_close() - self.failUnlessEqual(len(ss._active_writers), 0) + self.failUnlessEqual(len(ss._bucket_writers), 0) # this also changes the amount reported as available by call_get_disk_stats allocated = 1001 + OVERHEAD + LEASE_SIZE @@ -699,11 +699,11 @@ class Server(unittest.TestCase): canary3 = FakeCanary() already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3) self.failUnlessEqual(len(writers3), 39) - self.failUnlessEqual(len(ss._active_writers), 39) + self.failUnlessEqual(len(ss._bucket_writers), 39) canary3.disconnected() - self.failUnlessEqual(len(ss._active_writers), 0) + self.failUnlessEqual(len(ss._bucket_writers), 0) ss.disownServiceParent() del ss