Merge pull request #1162 from tahoe-lafs/3807.immutable-upload-timeout

Immutable uploads time out if no writes are done for 30 minutes

Fixes ticket:3807
Itamar Turner-Trauring
2021-11-23 11:01:26 -05:00
committed by GitHub
6 changed files with 143 additions and 48 deletions
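
The mechanism, visible in the BucketWriter changes below, is a single Twisted delayed call: schedule an abort 30 minutes out, push it back on every write, and cancel it on close or abort. A minimal sketch of that pattern, with a hypothetical UploadSession class standing in for BucketWriter (not the PR's actual code):

from twisted.internet.task import Clock

THIRTY_MINUTES = 30 * 60  # idle timeout, in seconds

class UploadSession(object):
    """Hypothetical stand-in for BucketWriter, for illustration only."""

    def __init__(self, clock):
        self.aborted = False
        # Schedule an abort; every write pushes it back another 30 minutes.
        self._timeout = clock.callLater(THIRTY_MINUTES, self.abort)

    def write(self, data):
        # Receiving data proves the client is still alive, so delay the abort.
        self._timeout.reset(THIRTY_MINUTES)

    def abort(self):
        self.aborted = True

    def close(self):
        # A finished upload must cancel the pending abort.
        if self._timeout.active():
            self._timeout.cancel()

# Driving it with a fake clock shows the behavior deterministically:
clock = Clock()
session = UploadSession(clock)
clock.advance(29 * 60)
session.write(b"some data")  # resets the 30-minute window
clock.advance(29 * 60)
assert not session.aborted   # still inside the new window
clock.advance(60)
assert session.aborted       # 30 idle minutes have passed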

.gitignore

@@ -29,7 +29,7 @@ zope.interface-*.egg
 .pc
 /src/allmydata/test/plugins/dropin.cache
-/_trial_temp*
+**/_trial_temp*
 /tmp*
 /*.patch
 /dist/

(new newsfragment file)

@@ -0,0 +1 @@
+If uploading an immutable hasn't had a write for 30 minutes, the storage server will abort the upload.

src/allmydata/storage/immutable.py

@@ -233,7 +233,7 @@ class ShareFile(object):

 @implementer(RIBucketWriter)
 class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78

-    def __init__(self, ss, incominghome, finalhome, max_size, lease_info):
+    def __init__(self, ss, incominghome, finalhome, max_size, lease_info, clock):
         self.ss = ss
         self.incominghome = incominghome
         self.finalhome = finalhome
@@ -245,12 +245,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
         # added by simultaneous uploaders
         self._sharefile.add_lease(lease_info)
         self._already_written = RangeMap()
+        self._clock = clock
+        self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout)

     def allocated_size(self):
         return self._max_size

     def remote_write(self, offset, data):
-        start = time.time()
+        # Delay the timeout, since we received data:
+        self._timeout.reset(30 * 60)
+        start = self._clock.seconds()
         precondition(not self.closed)
         if self.throw_out_all_data:
             return
@@ -268,12 +272,16 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
         self._sharefile.write_share_data(offset, data)
         self._already_written.set(True, offset, end)
-        self.ss.add_latency("write", time.time() - start)
+        self.ss.add_latency("write", self._clock.seconds() - start)
         self.ss.count("write")

     def remote_close(self):
         self.close()

     def close(self):
         precondition(not self.closed)
-        start = time.time()
+        self._timeout.cancel()
+        start = self._clock.seconds()
         fileutil.make_dirs(os.path.dirname(self.finalhome))
         fileutil.rename(self.incominghome, self.finalhome)
@@ -306,20 +314,28 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
         filelen = os.stat(self.finalhome)[stat.ST_SIZE]
         self.ss.bucket_writer_closed(self, filelen)
-        self.ss.add_latency("close", time.time() - start)
+        self.ss.add_latency("close", self._clock.seconds() - start)
         self.ss.count("close")

     def disconnected(self):
         if not self.closed:
-            self._abort()
+            self.abort()

+    def _abort_due_to_timeout(self):
+        """
+        Called if we run out of time.
+        """
+        log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome,
+                facility="tahoe.storage", level=log.UNUSUAL)
+        self.abort()
+
     def remote_abort(self):
         log.msg("storage: aborting sharefile %s" % self.incominghome,
                 facility="tahoe.storage", level=log.UNUSUAL)
-        self._abort()
+        self.abort()
         self.ss.count("abort")

-    def _abort(self):
+    def abort(self):
         if self.closed:
             return
@@ -337,6 +353,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
         self.closed = True
         self.ss.bucket_writer_closed(self, 0)
+        # Cancel timeout if it wasn't already cancelled.
+        if self._timeout.active():
+            self._timeout.cancel()


 @implementer(RIBucketReader)
 class BucketReader(Referenceable): # type: ignore # warner/foolscap#78

src/allmydata/storage/server.py

@@ -14,12 +14,13 @@ if PY2:
 else:
     from typing import Dict

-import os, re, time
+import os, re
 import six

 from foolscap.api import Referenceable
 from foolscap.ipb import IRemoteReference
 from twisted.application import service
+from twisted.internet import reactor
 from zope.interface import implementer
 from allmydata.interfaces import RIStorageServer, IStatsProducer
@@ -71,7 +72,7 @@ class StorageServer(service.MultiService, Referenceable):
                  expiration_override_lease_duration=None,
                  expiration_cutoff_date=None,
                  expiration_sharetypes=("mutable", "immutable"),
-                 get_current_time=time.time):
+                 clock=reactor):
         service.MultiService.__init__(self)
         assert isinstance(nodeid, bytes)
         assert len(nodeid) == 20
@@ -122,7 +123,7 @@ class StorageServer(service.MultiService, Referenceable):
                                    expiration_cutoff_date,
                                    expiration_sharetypes)
         self.lease_checker.setServiceParent(self)
-        self._get_current_time = get_current_time
+        self._clock = clock

         # Currently being-written Bucketwriters. For Foolscap, lifetime is tied
         # to connection: when disconnection happens, the BucketWriters are
@@ -135,6 +136,12 @@ class StorageServer(service.MultiService, Referenceable):
         # Canaries and disconnect markers for BucketWriters created via Foolscap:
         self._bucket_writer_disconnect_markers = {}  # type: Dict[BucketWriter,(IRemoteReference, object)]

+    def stopService(self):
+        # Cancel any in-progress uploads:
+        for bw in list(self._bucket_writers.values()):
+            bw.disconnected()
+        return service.MultiService.stopService(self)
+
     def __repr__(self):
         return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
@@ -292,7 +299,7 @@ class StorageServer(service.MultiService, Referenceable):
         # owner_num is not for clients to set, but rather it should be
         # curried into the PersonalStorageServer instance that is dedicated
         # to a particular owner.
-        start = self._get_current_time()
+        start = self._clock.seconds()
         self.count("allocate")
         alreadygot = set()
         bucketwriters = {} # k: shnum, v: BucketWriter
@@ -305,7 +312,7 @@ class StorageServer(service.MultiService, Referenceable):
         # goes into the share files themselves. It could also be put into a
         # separate database. Note that the lease should not be added until
         # the BucketWriter has been closed.
-        expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
+        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
         lease_info = LeaseInfo(owner_num,
                                renew_secret, cancel_secret,
                                expire_time, self.my_nodeid)
@@ -346,7 +353,8 @@ class StorageServer(service.MultiService, Referenceable):
             elif (not limited) or (remaining_space >= max_space_per_bucket):
                 # ok! we need to create the new share file.
                 bw = BucketWriter(self, incominghome, finalhome,
-                                  max_space_per_bucket, lease_info)
+                                  max_space_per_bucket, lease_info,
+                                  clock=self._clock)
                 if self.no_storage:
                     bw.throw_out_all_data = True
                 bucketwriters[shnum] = bw
@@ -360,7 +368,7 @@ class StorageServer(service.MultiService, Referenceable):
         if bucketwriters:
             fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

-        self.add_latency("allocate", self._get_current_time() - start)
+        self.add_latency("allocate", self._clock.seconds() - start)
         return alreadygot, bucketwriters

     def remote_allocate_buckets(self, storage_index,
@@ -395,26 +403,26 @@ class StorageServer(service.MultiService, Referenceable):

     def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                          owner_num=1):
-        start = self._get_current_time()
+        start = self._clock.seconds()
         self.count("add-lease")
-        new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
+        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
         lease_info = LeaseInfo(owner_num,
                                renew_secret, cancel_secret,
                                new_expire_time, self.my_nodeid)
         for sf in self._iter_share_files(storage_index):
             sf.add_or_renew_lease(lease_info)
-        self.add_latency("add-lease", self._get_current_time() - start)
+        self.add_latency("add-lease", self._clock.seconds() - start)
         return None

     def remote_renew_lease(self, storage_index, renew_secret):
-        start = self._get_current_time()
+        start = self._clock.seconds()
         self.count("renew")
-        new_expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
+        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
         found_buckets = False
         for sf in self._iter_share_files(storage_index):
             found_buckets = True
             sf.renew_lease(renew_secret, new_expire_time)
-        self.add_latency("renew", self._get_current_time() - start)
+        self.add_latency("renew", self._clock.seconds() - start)
         if not found_buckets:
             raise IndexError("no such lease to renew")
@@ -441,7 +449,7 @@ class StorageServer(service.MultiService, Referenceable):
             pass

     def remote_get_buckets(self, storage_index):
-        start = self._get_current_time()
+        start = self._clock.seconds()
         self.count("get")
         si_s = si_b2a(storage_index)
         log.msg("storage: get_buckets %r" % si_s)
@@ -449,7 +457,7 @@ class StorageServer(service.MultiService, Referenceable):
         for shnum, filename in self._get_bucket_shares(storage_index):
             bucketreaders[shnum] = BucketReader(self, filename,
                                                 storage_index, shnum)
-        self.add_latency("get", self._get_current_time() - start)
+        self.add_latency("get", self._clock.seconds() - start)
         return bucketreaders

     def get_leases(self, storage_index):
@@ -608,7 +616,7 @@ class StorageServer(service.MultiService, Referenceable):
         :return LeaseInfo: Information for a new lease for a share.
         """
         ownerid = 1 # TODO
-        expire_time = self._get_current_time() + DEFAULT_RENEWAL_TIME
+        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
         lease_info = LeaseInfo(ownerid,
                                renew_secret, cancel_secret,
                                expire_time, self.my_nodeid)
@@ -646,7 +654,7 @@ class StorageServer(service.MultiService, Referenceable):
         See ``allmydata.interfaces.RIStorageServer`` for details about other
         parameters and return value.
         """
-        start = self._get_current_time()
+        start = self._clock.seconds()
         self.count("writev")
         si_s = si_b2a(storage_index)
         log.msg("storage: slot_writev %r" % si_s)
@@ -687,7 +695,7 @@ class StorageServer(service.MultiService, Referenceable):
             self._add_or_renew_leases(remaining_shares, lease_info)

         # all done
-        self.add_latency("writev", self._get_current_time() - start)
+        self.add_latency("writev", self._clock.seconds() - start)
         return (testv_is_good, read_data)

     def remote_slot_testv_and_readv_and_writev(self, storage_index,
@@ -713,7 +721,7 @@ class StorageServer(service.MultiService, Referenceable):
         return share

     def remote_slot_readv(self, storage_index, shares, readv):
-        start = self._get_current_time()
+        start = self._clock.seconds()
         self.count("readv")
         si_s = si_b2a(storage_index)
         lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
@@ -722,7 +730,7 @@ class StorageServer(service.MultiService, Referenceable):
         # shares exist if there is a file for them
         bucketdir = os.path.join(self.sharedir, si_dir)
         if not os.path.isdir(bucketdir):
-            self.add_latency("readv", self._get_current_time() - start)
+            self.add_latency("readv", self._clock.seconds() - start)
             return {}
         datavs = {}
         for sharenum_s in os.listdir(bucketdir):
@@ -736,7 +744,7 @@ class StorageServer(service.MultiService, Referenceable):
                 datavs[sharenum] = msf.readv(readv)
         log.msg("returning shares %s" % (list(datavs.keys()),),
                 facility="tahoe.storage", level=log.NOISY, parent=lp)
-        self.add_latency("readv", self._get_current_time() - start)
+        self.add_latency("readv", self._clock.seconds() - start)
         return datavs

     def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
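
Two details of the StorageServer changes are worth calling out. The clock=reactor default keeps production behavior unchanged while letting tests inject a fake time source. And the new stopService matters because an orphaned callLater would otherwise fire after shutdown; in trial-based tests, a delayed call left pending at the end of a test shows up as a dirty-reactor error. A sketch of the shutdown pattern, using a hypothetical UploadTracker service (not the PR's actual code):

from twisted.application import service

class UploadTracker(service.Service):
    """Hypothetical service that owns in-progress uploads."""

    def __init__(self):
        self.uploads = []

    def stopService(self):
        # Iterate over a copy: aborting an upload may mutate the list.
        for upload in list(self.uploads):
            upload.disconnected()
        return service.Service.stopService(self)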

src/allmydata/test/test_istorageserver.py

@@ -21,6 +21,7 @@ if PY2:
 from random import Random

 from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.task import Clock

 from foolscap.api import Referenceable, RemoteException
@@ -1017,16 +1018,17 @@ class _FoolscapMixin(SystemTestMixin):
                 self.server = s
                 break
         assert self.server is not None, "Couldn't find StorageServer"
-        self._current_time = 123456
-        self.server._get_current_time = self.fake_time
+        self._clock = Clock()
+        self._clock.advance(123456)
+        self.server._clock = self._clock

     def fake_time(self):
         """Return the current fake, test-controlled, time."""
-        return self._current_time
+        return self._clock.seconds()

     def fake_sleep(self, seconds):
         """Advance the fake time by the given number of seconds."""
-        self._current_time += seconds
+        self._clock.advance(seconds)

     @inlineCallbacks
     def tearDown(self):
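
The change above swaps the server's time source for twisted.internet.task.Clock, which provides the same callLater/seconds interface as the reactor but only advances when advance() is called, so a 30-minute timeout can be exercised in microseconds of real time. A minimal demonstration of the injection pattern, with a hypothetical Heartbeat class:

from twisted.internet import reactor
from twisted.internet.task import Clock

class Heartbeat(object):
    """Hypothetical: fires a callback after 60 seconds."""

    def __init__(self, on_timeout, clock=reactor):
        # Production uses the real reactor; tests inject a Clock.
        self._call = clock.callLater(60, on_timeout)

fired = []
clock = Clock()
Heartbeat(lambda: fired.append(True), clock=clock)
clock.advance(59)
assert not fired          # not due yet
clock.advance(1)
assert fired == [True]    # fired deterministically, no real waiting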

src/allmydata/test/test_storage.py

@@ -128,7 +128,7 @@ class Bucket(unittest.TestCase):

     def test_create(self):
         incoming, final = self.make_workdir("test_create")
-        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
         bw.remote_write(0, b"a"*25)
         bw.remote_write(25, b"b"*25)
         bw.remote_write(50, b"c"*25)
@@ -137,7 +137,7 @@ class Bucket(unittest.TestCase):

     def test_readwrite(self):
         incoming, final = self.make_workdir("test_readwrite")
-        bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
         bw.remote_write(0, b"a"*25)
         bw.remote_write(25, b"b"*25)
         bw.remote_write(50, b"c"*7) # last block may be short
@@ -155,7 +155,7 @@ class Bucket(unittest.TestCase):
             incoming, final = self.make_workdir(
                 "test_write_past_size_errors-{}".format(i)
             )
-            bw = BucketWriter(self, incoming, final, 200, self.make_lease())
+            bw = BucketWriter(self, incoming, final, 200, self.make_lease(), Clock())
             with self.assertRaises(DataTooLargeError):
                 bw.remote_write(offset, b"a" * length)
@@ -174,7 +174,7 @@ class Bucket(unittest.TestCase):
         expected_data = b"".join(bchr(i) for i in range(100))
         incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
         bw = BucketWriter(
-            self, incoming, final, length, self.make_lease(),
+            self, incoming, final, length, self.make_lease(), Clock()
         )
         # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
         bw.remote_write(10, expected_data[10:20])
@@ -212,7 +212,7 @@ class Bucket(unittest.TestCase):
         length = 100
         incoming, final = self.make_workdir("overlapping_writes_{}".format(uuid4()))
         bw = BucketWriter(
-            self, incoming, final, length, self.make_lease(),
+            self, incoming, final, length, self.make_lease(), Clock()
         )
         # Three writes: 10-19, 30-39, 50-59. This allows for a bunch of holes.
         bw.remote_write(10, b"1" * 10)
@@ -285,6 +285,67 @@ class Bucket(unittest.TestCase):
         result_of_read = br.remote_read(0, len(share_data)+1)
         self.failUnlessEqual(result_of_read, share_data)

+    def _assert_timeout_only_after_30_minutes(self, clock, bw):
+        """
+        The ``BucketWriter`` times out and is closed after 30 minutes, but not
+        sooner.
+        """
+        self.assertFalse(bw.closed)
+        # 29 minutes pass. Everything is fine.
+        for i in range(29):
+            clock.advance(60)
+            self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,))
+        # After the 30th minute, the bucket is closed due to lack of writes.
+        clock.advance(60)
+        self.assertTrue(bw.closed)
+
+    def test_bucket_expires_if_no_writes_for_30_minutes(self):
+        """
+        If a ``BucketWriter`` receives no writes for 30 minutes, it is removed.
+        """
+        incoming, final = self.make_workdir("test_bucket_expires")
+        clock = Clock()
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
+        self._assert_timeout_only_after_30_minutes(clock, bw)
+
+    def test_bucket_writes_delay_timeout(self):
+        """
+        So long as the ``BucketWriter`` receives writes, the removal timeout
+        is put off.
+        """
+        incoming, final = self.make_workdir("test_bucket_writes_delay_timeout")
+        clock = Clock()
+        bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
+        # 29 minutes pass, getting close to the timeout...
+        clock.advance(29 * 60)
+        # ... but we receive a write! So that should delay the timeout again to
+        # another 30 minutes.
+        bw.remote_write(0, b"hello")
+        self._assert_timeout_only_after_30_minutes(clock, bw)
+
+    def test_bucket_closing_cancels_timeout(self):
+        """
+        Closing cancels the ``BucketWriter`` timeout.
+        """
+        incoming, final = self.make_workdir("test_bucket_close_timeout")
+        clock = Clock()
+        bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
+        self.assertTrue(clock.getDelayedCalls())
+        bw.close()
+        self.assertFalse(clock.getDelayedCalls())
+
+    def test_bucket_aborting_cancels_timeout(self):
+        """
+        Aborting cancels the ``BucketWriter`` timeout.
+        """
+        incoming, final = self.make_workdir("test_bucket_abort_timeout")
+        clock = Clock()
+        bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
+        self.assertTrue(clock.getDelayedCalls())
+        bw.abort()
+        self.assertFalse(clock.getDelayedCalls())
+

 class RemoteBucket(object):

     def __init__(self, target):
@@ -312,7 +373,7 @@ class BucketProxy(unittest.TestCase):
         final = os.path.join(basedir, "bucket")
         fileutil.make_dirs(basedir)
         fileutil.make_dirs(os.path.join(basedir, "tmp"))
-        bw = BucketWriter(self, incoming, final, size, self.make_lease())
+        bw = BucketWriter(self, incoming, final, size, self.make_lease(), Clock())
         rb = RemoteBucket(bw)
         return bw, rb, final
@@ -438,11 +499,13 @@ class Server(unittest.TestCase):
         basedir = os.path.join("storage", "Server", name)
         return basedir

-    def create(self, name, reserved_space=0, klass=StorageServer, get_current_time=time.time):
+    def create(self, name, reserved_space=0, klass=StorageServer, clock=None):
+        if clock is None:
+            clock = Clock()
         workdir = self.workdir(name)
         ss = klass(workdir, b"\x00" * 20, reserved_space=reserved_space,
                    stats_provider=FakeStatsProvider(),
-                   get_current_time=get_current_time)
+                   clock=clock)
         ss.setServiceParent(self.sparent)
         return ss
@@ -559,7 +622,6 @@ class Server(unittest.TestCase):
         writer.remote_abort()
         self.failUnlessEqual(ss.allocated_size(), 0)
-
     def test_allocate(self):
         ss = self.create("test_allocate")
@@ -626,7 +688,7 @@ class Server(unittest.TestCase):
         clock.advance(first_lease)
         ss = self.create(
             "test_allocate_without_lease_renewal",
-            get_current_time=clock.seconds,
+            clock=clock,
         )

         # Put a share on there
@@ -918,7 +980,7 @@ class Server(unittest.TestCase):
         """
         clock = Clock()
         clock.advance(123)
-        ss = self.create("test_immutable_add_lease_renews", get_current_time=clock.seconds)
+        ss = self.create("test_immutable_add_lease_renews", clock=clock)

         # Start out with single lease created with bucket:
         renewal_secret, cancel_secret = self.create_bucket_5_shares(ss, b"si0")
@@ -1032,10 +1094,12 @@ class MutableServer(unittest.TestCase):
         basedir = os.path.join("storage", "MutableServer", name)
         return basedir

-    def create(self, name, get_current_time=time.time):
+    def create(self, name, clock=None):
         workdir = self.workdir(name)
+        if clock is None:
+            clock = Clock()
         ss = StorageServer(workdir, b"\x00" * 20,
-                           get_current_time=get_current_time)
+                           clock=clock)
         ss.setServiceParent(self.sparent)
         return ss
@@ -1420,7 +1484,7 @@ class MutableServer(unittest.TestCase):
         clock = Clock()
         clock.advance(235)
         ss = self.create("test_mutable_add_lease_renews",
-                         get_current_time=clock.seconds)
+                         clock=clock)

         def secrets(n):
             return ( self.write_enabler(b"we1"),
                      self.renew_secret(b"we1-%d" % n),