Simplify implementation.
commit 58d7e2f627 (parent 8fb6afee1b)
@@ -204,17 +204,6 @@ class ShareFile(object):
             self.unlink()
         return space_freed

-# TODOs
-# Batch 1:
-# - bucketwriter dict in the server, to persist them + TEST of persistence
-# - aborting bucketwriter removes it from server persistent + TEST
-# - disconnect still aborts _for Foolscap only_
-# - existing in-use buckets are not returned _for Foolscap only_
-# - this implies splitting remote_allocate_buckets into generic and Foolscap-y parts
-# Batch 2:
-# - scheduled events for aborting bucketwriter + TEST
-# - bucketwriter writes delay cancellation + TEST
-

 @implementer(RIBucketWriter)
 class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
@@ -121,12 +121,14 @@ class StorageServer(service.MultiService, Referenceable):
         self.lease_checker.setServiceParent(self)
         self._get_current_time = get_current_time

-        # Currently being-written Bucketwriters. TODO Can probably refactor so
-        # active_writers is unnecessary, do as second pass.
-        # Map BucketWriter -> (storage_index, share_num)
-        self._active_writers = {} # type: Dict[BucketWriter, (bytes,int)]
-        # Map (storage_index, share_num) -> BucketWriter:
-        self._bucket_writers = {} # type: Dict[(bytes, int),BucketWriter]
+        # Currently being-written Bucketwriters. For Foolscap, lifetime is tied
+        # to connection: when disconnection happens, the BucketWriters are
+        # removed. For HTTP, this makes no sense, so there will be
+        # timeout-based cleanup; see
+        # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.
+
+        # Map in-progress filesystem path -> BucketWriter:
+        self._bucket_writers = {} # type: Dict[str,BucketWriter]
         # Canaries and disconnect markers for BucketWriters created via Foolscap:
         self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
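[Reviewer note, not part of the diff: the new comment defers the HTTP
cleanup to ticket 3807. As a rough sketch only, that cleanup could be a
periodic sweep over the path-keyed dict; the last_activity attribute, the
abort() helper, the timeout, and the sweep interval below are all
hypothetical, not from this commit.]

    from twisted.internet import reactor

    STALE_SECONDS = 30 * 60  # hypothetical expiry window for idle uploads

    def remove_stale_bucket_writers(server):
        # Sweep the path-keyed dict, aborting writers idle past the window.
        now = server._get_current_time()
        for path, bw in list(server._bucket_writers.items()):
            if now - bw.last_activity > STALE_SECONDS:  # hypothetical attribute
                bw.abort()  # assumed to unregister the writer from the dict
        reactor.callLater(60, remove_stale_bucket_writers, server)  # re-schedule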
@@ -247,7 +249,7 @@ class StorageServer(service.MultiService, Referenceable):

     def allocated_size(self):
         space = 0
-        for bw in self._active_writers:
+        for bw in self._bucket_writers.values():
             space += bw.allocated_size()
         return space

@@ -275,7 +277,6 @@ class StorageServer(service.MultiService, Referenceable):
     def allocate_buckets(self, storage_index,
                          renew_secret, cancel_secret,
                          sharenums, allocated_size,
-                         include_in_progress,
                          owner_num=0):
         """
         Generic bucket allocation API.
@@ -328,8 +329,7 @@ class StorageServer(service.MultiService, Referenceable):
                 # great! we already have it. easy.
                 pass
             elif os.path.exists(incominghome):
-                # TODO use include_in_progress
-                # Note that we don't create BucketWriters for shnums that
+                # For Foolscap we don't create BucketWriters for shnums that
                 # have a partial share (in incoming/), so if a second upload
                 # occurs while the first is still in progress, the second
                 # uploader will use different storage servers.
@@ -341,8 +341,7 @@ class StorageServer(service.MultiService, Referenceable):
                 if self.no_storage:
                     bw.throw_out_all_data = True
                 bucketwriters[shnum] = bw
-                self._active_writers[bw] = (storage_index, shnum)
-                self._bucket_writers[(storage_index, shnum)] = bw
+                self._bucket_writers[incominghome] = bw
                 if limited:
                     remaining_space -= max_space_per_bucket
             else:
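[Reviewer note, not part of the diff: keying the dict by incominghome works
because each in-progress share lives at a distinct path that already encodes
the storage index and the share number. A small illustration of the assumed
layout:]

    import os

    def incoming_share_path(incomingdir, si_dir, shnum):
        # Assumed layout: <storedir>/shares/incoming/<si_dir>/<shnum>
        return os.path.join(incomingdir, si_dir, "%d" % shnum)

    # One upload per (storage_index, shnum) means one path, so this single
    # path-keyed dict can replace the two parallel maps removed above.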
@@ -362,7 +361,7 @@ class StorageServer(service.MultiService, Referenceable):
         """Foolscap-specific ``allocate_buckets()`` API."""
         alreadygot, bucketwriters = self.allocate_buckets(
             storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
-            include_in_progress=False, owner_num=owner_num,
+            owner_num=owner_num,
         )
         # Abort BucketWriters if disconnection happens.
         for bw in bucketwriters.values():
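[Reviewer note, not part of the diff: the loop cut off above is presumably
where the disconnect markers consumed in bucket_writer_closed (next hunk)
get registered. Foolscap's notifyOnDisconnect() returns a marker that
dontNotifyOnDisconnect() later cancels; a sketch of that pairing, with
bw.disconnected as an assumed callback name:]

    for bw in bucketwriters.values():
        disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
        self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)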
@@ -413,8 +412,7 @@ class StorageServer(service.MultiService, Referenceable):
     def bucket_writer_closed(self, bw, consumed_size):
         if self.stats_provider:
             self.stats_provider.count('storage_server.bytes_added', consumed_size)
-        storage_index, shnum = self._active_writers.pop(bw)
-        del self._bucket_writers[(storage_index, shnum)]
+        del self._bucket_writers[bw.incominghome]
         if bw in self._bucket_writer_disconnect_markers:
             canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
             canary.dontNotifyOnDisconnect(disconnect_marker)
@@ -668,19 +668,19 @@ class Server(unittest.TestCase):
         self.failUnlessEqual(len(writers), 3)
         # now the StorageServer should have 3000 bytes provisionally
         # allocated, allowing only 2000 more to be claimed
-        self.failUnlessEqual(len(ss._active_writers), 3)
+        self.failUnlessEqual(len(ss._bucket_writers), 3)

         # allocating 1001-byte shares only leaves room for one
         canary2 = FakeCanary()
         already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
         self.failUnlessEqual(len(writers2), 1)
-        self.failUnlessEqual(len(ss._active_writers), 4)
+        self.failUnlessEqual(len(ss._bucket_writers), 4)

         # we abandon the first set, so their provisional allocation should be
         # returned
         canary.disconnected()

-        self.failUnlessEqual(len(ss._active_writers), 1)
+        self.failUnlessEqual(len(ss._bucket_writers), 1)
         # now we have a provisional allocation of 1001 bytes

         # and we close the second set, so their provisional allocation should
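[Reviewer note, not part of the diff: the counts asserted above follow from
the 5000-byte capacity implied by the test setup, e.g.:]

    assert 5000 - 3 * 1000 == 2000  # three 1000-byte shares leave 2000 bytes
    assert 2000 // 1001 == 1        # so only one 1001-byte share still fits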
@@ -689,7 +689,7 @@ class Server(unittest.TestCase):
         for bw in writers2.values():
             bw.remote_write(0, b"a"*25)
             bw.remote_close()
-        self.failUnlessEqual(len(ss._active_writers), 0)
+        self.failUnlessEqual(len(ss._bucket_writers), 0)

         # this also changes the amount reported as available by call_get_disk_stats
         allocated = 1001 + OVERHEAD + LEASE_SIZE
@@ -699,11 +699,11 @@ class Server(unittest.TestCase):
         canary3 = FakeCanary()
         already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
         self.failUnlessEqual(len(writers3), 39)
-        self.failUnlessEqual(len(ss._active_writers), 39)
+        self.failUnlessEqual(len(ss._bucket_writers), 39)

         canary3.disconnected()

-        self.failUnlessEqual(len(ss._active_writers), 0)
+        self.failUnlessEqual(len(ss._bucket_writers), 0)
         ss.disownServiceParent()
         del ss