Merge branch '3793-persistent-bucketwriter' into 3798-bucket-write-abort-tests

This commit is contained in:
Itamar Turner-Trauring 2021-10-04 10:30:41 -04:00
commit b09b7fd01b
6 changed files with 112 additions and 72 deletions

0
newsfragments/3793.minor Normal file
View File

View File

@ -208,13 +208,11 @@ class ShareFile(object):
@implementer(RIBucketWriter)
class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def __init__(self, ss, incominghome, finalhome, max_size, lease_info, canary):
def __init__(self, ss, incominghome, finalhome, max_size, lease_info):
self.ss = ss
self.incominghome = incominghome
self.finalhome = finalhome
self._max_size = max_size # don't allow the client to write more than this
self._canary = canary
self._disconnect_marker = canary.notifyOnDisconnect(self._disconnected)
self.closed = False
self.throw_out_all_data = False
self._sharefile = ShareFile(incominghome, create=True, max_size=max_size)
@ -280,22 +278,19 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
pass
self._sharefile = None
self.closed = True
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
filelen = os.stat(self.finalhome)[stat.ST_SIZE]
self.ss.bucket_writer_closed(self, filelen)
self.ss.add_latency("close", time.time() - start)
self.ss.count("close")
def _disconnected(self):
def disconnected(self):
if not self.closed:
self._abort()
def remote_abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
if not self.closed:
self._canary.dontNotifyOnDisconnect(self._disconnect_marker)
self._abort()
self.ss.count("abort")

View File

@ -11,13 +11,14 @@ if PY2:
# Omit open() to get native behavior where open("w") always accepts native
# strings. Omit bytes so we don't leak future's custom bytes.
from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min # noqa: F401
else:
from typing import Dict
import os, re, struct, time
import weakref
import six
from foolscap.api import Referenceable
from foolscap.ipb import IRemoteReference
from twisted.application import service
from zope.interface import implementer
@ -89,7 +90,6 @@ class StorageServer(service.MultiService, Referenceable):
self.incomingdir = os.path.join(sharedir, 'incoming')
self._clean_incomplete()
fileutil.make_dirs(self.incomingdir)
self._active_writers = weakref.WeakKeyDictionary()
log.msg("StorageServer created", facility="tahoe.storage")
if reserved_space:
@ -121,6 +121,17 @@ class StorageServer(service.MultiService, Referenceable):
self.lease_checker.setServiceParent(self)
self._get_current_time = get_current_time
# BucketWriters currently being written. For Foolscap, lifetime is tied
# to connection: when disconnection happens, the BucketWriters are
# removed. For HTTP, this makes no sense, so there will be
# timeout-based cleanup; see
# https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3807.
# Map in-progress filesystem path -> BucketWriter:
self._bucket_writers = {} # type: Dict[str,BucketWriter]
# Canaries and disconnect markers for BucketWriters created via Foolscap:
self._bucket_writer_disconnect_markers = {} # type: Dict[BucketWriter,(IRemoteReference, object)]
def __repr__(self):
return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@ -238,7 +249,7 @@ class StorageServer(service.MultiService, Referenceable):
def allocated_size(self):
space = 0
for bw in self._active_writers:
for bw in self._bucket_writers.values():
space += bw.allocated_size()
return space
@ -263,10 +274,13 @@ class StorageServer(service.MultiService, Referenceable):
}
return version
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
def _allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
owner_num=0):
"""
Generic bucket allocation API.
"""
# owner_num is not for clients to set, but rather it should be
# curried into the PersonalStorageServer instance that is dedicated
# to a particular owner.
@ -315,7 +329,7 @@ class StorageServer(service.MultiService, Referenceable):
# great! we already have it. easy.
pass
elif os.path.exists(incominghome):
# Note that we don't create BucketWriters for shnums that
# For Foolscap we don't create BucketWriters for shnums that
# have a partial share (in incoming/), so if a second upload
# occurs while the first is still in progress, the second
# uploader will use different storage servers.
@ -323,11 +337,11 @@ class StorageServer(service.MultiService, Referenceable):
elif (not limited) or (remaining_space >= max_space_per_bucket):
# ok! we need to create the new share file.
bw = BucketWriter(self, incominghome, finalhome,
max_space_per_bucket, lease_info, canary)
max_space_per_bucket, lease_info)
if self.no_storage:
bw.throw_out_all_data = True
bucketwriters[shnum] = bw
self._active_writers[bw] = 1
self._bucket_writers[incominghome] = bw
if limited:
remaining_space -= max_space_per_bucket
else:
@ -340,6 +354,21 @@ class StorageServer(service.MultiService, Referenceable):
self.add_latency("allocate", self._get_current_time() - start)
return alreadygot, bucketwriters
def remote_allocate_buckets(self, storage_index,
renew_secret, cancel_secret,
sharenums, allocated_size,
canary, owner_num=0):
"""Foolscap-specific ``allocate_buckets()`` API."""
alreadygot, bucketwriters = self._allocate_buckets(
storage_index, renew_secret, cancel_secret, sharenums, allocated_size,
owner_num=owner_num,
)
# Abort BucketWriters if disconnection happens.
for bw in bucketwriters.values():
disconnect_marker = canary.notifyOnDisconnect(bw.disconnected)
self._bucket_writer_disconnect_markers[bw] = (canary, disconnect_marker)
return alreadygot, bucketwriters
def _iter_share_files(self, storage_index):
for shnum, filename in self._get_bucket_shares(storage_index):
with open(filename, 'rb') as f:
@ -383,7 +412,10 @@ class StorageServer(service.MultiService, Referenceable):
def bucket_writer_closed(self, bw, consumed_size):
if self.stats_provider:
self.stats_provider.count('storage_server.bytes_added', consumed_size)
del self._active_writers[bw]
del self._bucket_writers[bw.incominghome]
if bw in self._bucket_writer_disconnect_markers:
canary, disconnect_marker = self._bucket_writer_disconnect_markers.pop(bw)
canary.dontNotifyOnDisconnect(disconnect_marker)
def _get_bucket_shares(self, storage_index):
"""Return a list of (shnum, pathname) tuples for files that hold

View File

@ -314,6 +314,11 @@ class FakeCanary(object):
def getPeer(self):
return "<fake>"
# For use by tests:
def disconnected(self):
for (f, args, kwargs) in list(self.disconnectors.values()):
f(*args, **kwargs)
class ShouldFailMixin(object):

View File

@ -20,8 +20,6 @@ if PY2:
from random import Random
from testtools import skipIf
from twisted.internet.defer import inlineCallbacks
from foolscap.api import Referenceable, RemoteException
@ -77,6 +75,10 @@ class IStorageServerImmutableAPIsTestsMixin(object):
Tests for ``IStorageServer``'s immutable APIs.
``self.storage_server`` is expected to provide ``IStorageServer``.
``self.disconnect()`` should disconnect and then reconnect, creating a new
``self.storage_server``. Some implementations may wish to skip tests using
this; HTTP has no notion of disconnection.
"""
@inlineCallbacks
@ -98,13 +100,10 @@ class IStorageServerImmutableAPIsTestsMixin(object):
# We validate the bucket objects' interface in a later test.
@inlineCallbacks
@skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793")
def test_allocate_buckets_repeat(self):
"""
allocate_buckets() with the same storage index returns the same result,
because the shares have not been written to.
This fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793
``IStorageServer.allocate_buckets()`` with the same storage index does not return
work-in-progress buckets, but will return any newly requested buckets.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
@ -115,7 +114,7 @@ class IStorageServerImmutableAPIsTestsMixin(object):
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(5)),
sharenums=set(range(4)),
allocated_size=1024,
canary=Referenceable(),
)
@ -128,40 +127,51 @@ class IStorageServerImmutableAPIsTestsMixin(object):
Referenceable(),
)
self.assertEqual(already_got, already_got2)
self.assertEqual(set(allocated.keys()), set(allocated2.keys()))
self.assertEqual(set(allocated2.keys()), {4})
@skipIf(True, "https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793")
@inlineCallbacks
def test_allocate_buckets_more_sharenums(self):
def test_disconnection(self):
"""
allocate_buckets() with the same storage index but more sharenums
acknowledges the extra shares don't exist.
If we disconnect in the middle of writing to a bucket, all data is
wiped, and it's even possible to write different data to the bucket.
Fails due to https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3793
(In the real world one shouldn't do that, but writing different data is
a good way to test that the original data really was wiped.)
The HTTP protocol should skip this test, since disconnection is a meaningless
concept there; this is more about testing that the implicit contract the Foolscap
implementation depends on doesn't change as we refactor things.
"""
storage_index, renew_secret, cancel_secret = (
new_storage_index(),
new_secret(),
new_secret(),
)
yield self.storage_server.allocate_buckets(
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(5)),
sharenums={0},
allocated_size=1024,
canary=Referenceable(),
)
(already_got2, allocated2) = yield self.storage_server.allocate_buckets(
# Bucket 0 is fully written in one go.
yield allocated[0].callRemote("write", 0, b"1" * 1024)
# Disconnect:
yield self.disconnect()
# Write different data with no complaint:
(_, allocated) = yield self.storage_server.allocate_buckets(
storage_index,
renew_secret,
cancel_secret,
sharenums=set(range(7)),
sharenums={0},
allocated_size=1024,
canary=Referenceable(),
)
self.assertEqual(already_got2, set()) # none were fully written
self.assertEqual(set(allocated2.keys()), set(range(7)))
yield allocated[0].callRemote("write", 0, b"2" * 1024)
@inlineCallbacks
def test_written_shares_are_allocated(self):
@ -684,15 +694,16 @@ class IStorageServerMutableAPIsTestsMixin(object):
class _FoolscapMixin(SystemTestMixin):
"""Run tests on Foolscap version of ``IStorageServer``."""
def _get_native_server(self):
return next(iter(self.clients[0].storage_broker.get_known_servers()))
@inlineCallbacks
def setUp(self):
AsyncTestCase.setUp(self)
self.basedir = "test_istorageserver/" + self.id()
yield SystemTestMixin.setUp(self)
yield self.set_up_nodes(1)
self.storage_server = next(
iter(self.clients[0].storage_broker.get_known_servers())
).get_storage_server()
self.storage_server = self._get_native_server().get_storage_server()
self.assertTrue(IStorageServer.providedBy(self.storage_server))
@inlineCallbacks
@ -700,6 +711,16 @@ class _FoolscapMixin(SystemTestMixin):
AsyncTestCase.tearDown(self)
yield SystemTestMixin.tearDown(self)
@inlineCallbacks
def disconnect(self):
"""
Disconnect and then reconnect with a new ``IStorageServer``.
"""
current = self.storage_server
yield self.bounce_client(0)
self.storage_server = self._get_native_server().get_storage_server()
assert self.storage_server is not current
class FoolscapSharedAPIsTests(
_FoolscapMixin, IStorageServerSharedAPIsTestsMixin, AsyncTestCase

View File

@ -19,7 +19,6 @@ import platform
import stat
import struct
import shutil
import gc
from uuid import uuid4
from twisted.trial import unittest
@ -129,8 +128,7 @@ class Bucket(unittest.TestCase):
def test_create(self):
incoming, final = self.make_workdir("test_create")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
FakeCanary())
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*25)
@ -139,8 +137,7 @@ class Bucket(unittest.TestCase):
def test_readwrite(self):
incoming, final = self.make_workdir("test_readwrite")
bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
FakeCanary())
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
bw.remote_write(0, b"a"*25)
bw.remote_write(25, b"b"*25)
bw.remote_write(50, b"c"*7) # last block may be short
@ -158,8 +155,7 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir(
"test_write_past_size_errors-{}".format(i)
)
bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
FakeCanary())
bw = BucketWriter(self, incoming, final, 200, self.make_lease())
with self.assertRaises(DataTooLargeError):
bw.remote_write(offset, b"a" * length)
@ -179,7 +175,6 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
FakeCanary()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, expected_data[10:20])
@ -218,7 +213,6 @@ class Bucket(unittest.TestCase):
incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
bw = BucketWriter(
self, incoming, final, length, self.make_lease(),
FakeCanary()
)
# Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
bw.remote_write(10, b"1" * 10)
@ -318,8 +312,7 @@ class BucketProxy(unittest.TestCase):
final = os.path.join(basedir, "bucket")
fileutil.make_dirs(basedir)
fileutil.make_dirs(os.path.join(basedir, "tmp"))
bw = BucketWriter(self, incoming, final, size, self.make_lease(),
FakeCanary())
bw = BucketWriter(self, incoming, final, size, self.make_lease())
rb = RemoteBucket(bw)
return bw, rb, final
@ -669,26 +662,24 @@ class Server(unittest.TestCase):
# the size we request.
OVERHEAD = 3*4
LEASE_SIZE = 4+32+32+4
canary = FakeCanary(True)
canary = FakeCanary()
already, writers = self.allocate(ss, b"vid1", [0,1,2], 1000, canary)
self.failUnlessEqual(len(writers), 3)
# now the StorageServer should have 3000 bytes provisionally
# allocated, allowing only 2000 more to be claimed
self.failUnlessEqual(len(ss._active_writers), 3)
self.failUnlessEqual(len(ss._bucket_writers), 3)
# allocating 1001-byte shares only leaves room for one
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary)
canary2 = FakeCanary()
already2, writers2 = self.allocate(ss, b"vid2", [0,1,2], 1001, canary2)
self.failUnlessEqual(len(writers2), 1)
self.failUnlessEqual(len(ss._active_writers), 4)
self.failUnlessEqual(len(ss._bucket_writers), 4)
# we abandon the first set, so their provisional allocation should be
# returned
canary.disconnected()
del already
del writers
gc.collect()
self.failUnlessEqual(len(ss._active_writers), 1)
self.failUnlessEqual(len(ss._bucket_writers), 1)
# now we have a provisional allocation of 1001 bytes
# and we close the second set, so their provisional allocation should
@ -697,25 +688,21 @@ class Server(unittest.TestCase):
for bw in writers2.values():
bw.remote_write(0, b"a"*25)
bw.remote_close()
del already2
del writers2
del bw
self.failUnlessEqual(len(ss._active_writers), 0)
self.failUnlessEqual(len(ss._bucket_writers), 0)
# this also changes the amount reported as available by call_get_disk_stats
allocated = 1001 + OVERHEAD + LEASE_SIZE
# now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
# 5000-1085=3915 free, therefore we can fit 39 100byte shares
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary)
canary3 = FakeCanary()
already3, writers3 = self.allocate(ss, b"vid3", list(range(100)), 100, canary3)
self.failUnlessEqual(len(writers3), 39)
self.failUnlessEqual(len(ss._active_writers), 39)
self.failUnlessEqual(len(ss._bucket_writers), 39)
del already3
del writers3
gc.collect()
canary3.disconnected()
self.failUnlessEqual(len(ss._active_writers), 0)
self.failUnlessEqual(len(ss._bucket_writers), 0)
ss.disownServiceParent()
del ss