Refactor BucketWriters such that disconnection handling can be limited to Foolscap.

This commit is contained in:
Itamar Turner-Trauring 2021-09-29 13:42:17 -04:00
parent a4153b7125
commit 8fb6afee1b
4 changed files with 65 additions and 42 deletions

View File

@ -208,8 +208,9 @@ class ShareFile(object):
# Batch 1: # Batch 1:
# - bucketwriter dict in the server, to persist them + TEST of persistence # - bucketwriter dict in the server, to persist them + TEST of persistence
# - aborting bucketwriter removes it from server persistent + TEST # - aborting bucketwriter removes it from server persistent + TEST
# - get rid of disconnect notification (probably no test, rely on existing?) # - disconnect still aborts _for Foolscap only_
# - add bucketwriter cancellation to remote_allocate_buckets() (probably rely on existing tests) # - existing in-use buckets are not returned _for Foolscap only_
# - this implies splitting remote_allocate_buckets into generic and Foolscap-y parts
# Batch 2: # Batch 2:
# - scheduled events for aborting bucketwriter + TEST # - scheduled events for aborting bucketwriter + TEST
# - bucketwriter writes delay cancellation + TEST # - bucketwriter writes delay cancellation + TEST
@ -218,13 +219,11 @@ class ShareFile(object):
@implementer(RIBucketWriter) @implementer(RIBucketWriter)
class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary): def __init__(self, ss, incominghome, finalhome, max_size, lease_info):
self.ss = ss self.ss = ss
self.incominghome = incominghome self.incominghome = incominghome
self.finalhome = finalhome self.finalhome = finalhome
self._max_size = max_size # don't allow the client to write more than this self._max_size = max_size # don't allow the client to write more than this
self._canary = canary
self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
self.closed = False self.closed = False
self.throw_out_all_data = False self.throw_out_all_data = False
self._sharefile = ShareFile(incominghome, create=True, max_size=max_size) self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
@ -290,22 +289,19 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
pass pass
self._sharefile = None self._sharefile = None
self.closed = True self.closed = True
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
filelen = os.stat(self.finalhome)[stat.ST_SIZE] filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen) self.ss.bucket_writer_closed(self, filelen)
self.ss.add_latency("close", time.time() - start) self.ss.add_latency("close", time.time() - start)
self.ss.count("close") self.ss.count("close")
def _disconnected(self): def disconnected(self):
if not self.closed: if not self.closed:
self._abort() self._abort()
def remote_abort(self): def remote_abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome, log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL) facility="tahoe.storage", level=log.UNUSUAL)
if not self.closed:
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
self._abort() self._abort()
self.ss.count("abort") self.ss.count("abort")

View File

@ -11,13 +11,14 @@ if PY2:
# Omit open() to get native behavior where open("w") always accepts native # Omit open() to get native behavior where open("w") always accepts native
# strings. Omit bytes so we don't leak future's custom bytes. # strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401 from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else:
from typing import Dict
import os, re, struct, time import os, re, struct, time
import weakref
import six import six
from foolscap.api import Referenceable from foolscap.api import Referenceable
from foolscap.ipb import IRemoteReference
from twisted.application import service from twisted.application import service
from zope.interface import implementer from zope.interface import implementer
@ -89,7 +90,6 @@ class StorageServer(service.MultiService, Referenceable):
self.incomingdir = os.path.join(sharedir, 'incoming') self.incomingdir = os.path.join(sharedir, 'incoming')
self._clean_incomplete() self._clean_incomplete()
fileutil.make_dirs(self.incomingdir) fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
log.msg("StorageServer created", facility="tahoe.storage") log.msg("StorageServer created", facility="tahoe.storage")
if reserved_space: if reserved_space:
@ -121,6 +121,15 @@ class StorageServer(service.MultiService, Referenceable):
self.lease_checker.setServiceParent(self) self.lease_checker.setServiceParent(self)
self._get_current_time = get_current_time self._get_current_time = get_current_time
# Currently being-written Bucketwriters. TODO Can probably refactor so
# active_writers is unnecessary, do as second pass.
# Map BucketWriter -> (storage_index, share_num)
self._active_writers = {} # type: Dict[BucketWriter, (bytes,int)]
# Map (storage_index, share_num) -> BucketWriter:
self._bucket_writers = {} # type: Dict[(bytes, int),BucketWriter]
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
def __repr__(self): def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),) return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@ -263,10 +272,14 @@ class StorageServer(service.MultiService, Referenceable):
} }
return version return version
def remote_allocate_buckets(self, storage_index, def allocate_buckets(self, storage_index,
renew_secret, cancel_secret, renew_secret, cancel_secret,
sharenums, allocated_size, sharenums, allocated_size,
canary, owner_num=0): include_in_progress,
owner_num=0):
"""
Generic bucket allocation API.
"""
# owner_num is not for clients to set, but rather it should be # owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated # curried into the PersonalStorageServer instance that is dedicated
# to a particular owner. # to a particular owner.
@ -315,6 +328,7 @@ class StorageServer(service.MultiService, Referenceable):
# great! we already have it. easy. # great! we already have it. easy.
pass pass
elif os.path.exists(incominghome): elif os.path.exists(incominghome):
# TODO use include_in_progress
# Note that we don't create BucketWriters for shnums that # Note that we don't create BucketWriters for shnums that
# have a partial share (in incoming/), so if a second upload # have a partial share (in incoming/), so if a second upload
# occurs while the first is still in progress, the second # occurs while the first is still in progress, the second
@ -323,11 +337,12 @@ class StorageServer(service.MultiService, Referenceable):
elif (not limited) or (remaining_space >= max_space_per_bucket): elif (not limited) or (remaining_space >= max_space_per_bucket):
# ok! we need to create the new share file. # ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome, bw = BucketWriter(self, incominghome, finalhome,
max_space_per_bucket, lease_info, canary) max_space_per_bucket, lease_info)
if self.no_storage: if self.no_storage:
bw.throw_out_all_data = True bw.throw_out_all_data = True
bucketwriters[shnum] = bw bucketwriters[shnum] = bw
self._active_writers[bw] = 1 self._active_writers[bw] = (storage_index, shnum)
self._bucket_writers[(storage_index, shnum)] = bw
if limited: if limited:
remaining_space -= max_space_per_bucket remaining_space -= max_space_per_bucket
else: else:
@ -340,6 +355,21 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("allocate", self._get_current_time() - start) self.add_latency("allocate", self._get_current_time() - start)
return alreadygot, bucketwriters return alreadygot, bucketwriters
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self.allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
include_in_progress=False, owner_num=owner_num,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
return alreadygot, bucketwriters
def _iter_share_files(self, storage_index): def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index): for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f: with open(filename, 'rb') as f:
@ -383,7 +413,11 @@ class StorageServer(service.MultiService, Referenceable):
def bucket_writer_closed(self, bw, consumed_size): def bucket_writer_closed(self, bw, consumed_size):
if self.stats_provider: if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size) self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._active_writers[bw] storage_index, shnum = self._active_writers.pop(bw)
del self._bucket_writers[(storage_index, shnum)]
if bw in self._bucket_writer_disconnect_markers:
canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
canary.dontNotifyOnDisconnect(disconnect_marker)
def _get_bucket_shares(self, storage_index): def _get_bucket_shares(self, storage_index):
"""Return a list of (shnum, pathname) tuples for files that hold """Return a list of (shnum, pathname) tuples for files that hold

View File

@ -314,6 +314,11 @@ class FakeCanary(object):
def getPeer(self): def getPeer(self):
return "<fake>" return "<fake>"
# For use by tests:
def disconnected(self):
for (f, args, kwargs) in list(self.disconnectors.values()):
f(*args, **kwargs)
class ShouldFailMixin(object): class ShouldFailMixin(object):

View File

@ -129,8 +129,7 @@ class Bucket(unittest.TestCase):
def test_create(self): def test_create(self):
incoming, final = self.make_workdir("test_create") incoming, final = self.make_workdir("test_create")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), bw = BucketWriter(self, incoming, final, 200, self.make_lease())
FakeCanary())
bw.remote_write(0, b"a"*25) bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25) bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*25) bw.remote_write(50, b"c"*25)
@ -139,8 +138,7 @@ class Bucket(unittest.TestCase):
def test_readwrite(self): def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite") incoming, final = self.make_workdir("test_readwrite")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), bw = BucketWriter(self, incoming, final, 200, self.make_lease())
FakeCanary())
bw.remote_write(0, b"a"*25) bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25) bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*7) # last block may be short bw.remote_write(50, b"c"*7) # last block may be short
@ -158,8 +156,7 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir( incoming, final = self.make_workdir(
"test_write_past_size_errors-{}".format(i) "test_write_past_size_errors-{}".format(i)
) )
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), bw = BucketWriter(self, incoming, final, 200, self.make_lease())
FakeCanary())
with self.assertRaises(DataTooLargeError): with self.assertRaises(DataTooLargeError):
bw.remote_write(offset, b"a" * length) bw.remote_write(offset, b"a" * length)
@ -179,7 +176,6 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter( bw = BucketWriter(
self, incoming, final, length, self.make_lease(), self, incoming, final, length, self.make_lease(),
FakeCanary()
) )
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, expected_data[10:20]) bw.remote_write(10, expected_data[10:20])
@ -218,7 +214,6 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4())) incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter( bw = BucketWriter(
self, incoming, final, length, self.make_lease(), self, incoming, final, length, self.make_lease(),
FakeCanary()
) )
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes. # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, b"1" * 10) bw.remote_write(10, b"1" * 10)
@ -318,8 +313,7 @@ class BucketProxy(unittest.TestCase):
final = os.path.join(basedir, "bucket") final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir) fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp")) fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease(), bw = BucketWriter(self, incoming, final, size, self.make_lease())
FakeCanary())
rb = RemoteBucket(bw) rb = RemoteBucket(bw)
return bw, rb, final return bw, rb, final
@ -669,7 +663,7 @@ class Server(unittest.TestCase):
# the size we request. # the size we request.
OVERHEAD = 3*4 OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4 LEASE_SIZE = 4+32+32+4
canary = FakeCanary(True) canary = FakeCanary()
already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary) already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
self.failUnlessEqual(len(writers), 3) self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally # now the StorageServer should have 3000 bytes provisionally
@ -677,16 +671,14 @@ class Server(unittest.TestCase):
self.failUnlessEqual(len(ss._active_writers), 3) self.failUnlessEqual(len(ss._active_writers), 3)
# allocating 1001-byte shares only leaves room for one # allocating 1001-byte shares only leaves room for one
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary) canary2 = FakeCanary()
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
self.failUnlessEqual(len(writers2), 1) self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4) self.failUnlessEqual(len(ss._active_writers), 4)
# we abandon the first set, so their provisional allocation should be # we abandon the first set, so their provisional allocation should be
# returned # returned
canary.disconnected()
del already
del writers
gc.collect()
self.failUnlessEqual(len(ss._active_writers), 1) self.failUnlessEqual(len(ss._active_writers), 1)
# now we have a provisional allocation of 1001 bytes # now we have a provisional allocation of 1001 bytes
@ -697,9 +689,6 @@ class Server(unittest.TestCase):
for bw in writers2.values(): for bw in writers2.values():
bw.remote_write(0, b"a"*25) bw.remote_write(0, b"a"*25)
bw.remote_close() bw.remote_close()
del already2
del writers2
del bw
self.failUnlessEqual(len(ss._active_writers), 0) self.failUnlessEqual(len(ss._active_writers), 0)
# this also changes the amount reported as available by call_get_disk_stats # this also changes the amount reported as available by call_get_disk_stats
@ -707,13 +696,12 @@ class Server(unittest.TestCase):
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares # 5000-1085=3915 free, therefore we can fit 39 100byte shares
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary) canary3 = FakeCanary()
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
self.failUnlessEqual(len(writers3), 39) self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._active_writers), 39) self.failUnlessEqual(len(ss._active_writers), 39)
del already3 canary3.disconnected()
del writers3
gc.collect()
self.failUnlessEqual(len(ss._active_writers), 0) self.failUnlessEqual(len(ss._active_writers), 0)
ss.disownServiceParent() ss.disownServiceParent()