Implement timeout and corresponding tests.

This commit is contained in:
Itamar Turner-Trauring 2021-11-18 14:35:04 -05:00
parent 5e341ad43a
commit 8c8e377466
2 changed files with 76 additions and 10 deletions

View File

@ -246,11 +246,17 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self._sharefile.add_lease(lease_info) self._sharefile.add_lease(lease_info)
self._already_written = RangeMap() self._already_written = RangeMap()
self._clock = clock self._clock = clock
self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout)
def allocated_size(self): def allocated_size(self):
return self._max_size return self._max_size
def remote_write(self, offset, data): def remote_write(self, offset, data):
self.write(offset, data)
def write(self, offset, data):
# Delay the timeout, since we received data:
self._timeout.reset(30 * 60)
start = self._clock.seconds() start = self._clock.seconds()
precondition(not self.closed) precondition(not self.closed)
if self.throw_out_all_data: if self.throw_out_all_data:
@ -273,7 +279,11 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self.ss.count("write") self.ss.count("write")
def remote_close(self): def remote_close(self):
self.close()
def close(self):
precondition(not self.closed) precondition(not self.closed)
self._timeout.cancel()
start = self._clock.seconds() start = self._clock.seconds()
fileutil.make_dirs(os.path.dirname(self.finalhome)) fileutil.make_dirs(os.path.dirname(self.finalhome))
@ -312,15 +322,23 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
def disconnected(self): def disconnected(self):
if not self.closed: if not self.closed:
self._abort() self.abort()
def _abort_due_to_timeout(self):
"""
Called if we run out of time.
"""
log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL)
self.abort()
def remote_abort(self): def remote_abort(self):
log.msg("storage: aborting sharefile %s" % self.incominghome, log.msg("storage: aborting sharefile %s" % self.incominghome,
facility="tahoe.storage", level=log.UNUSUAL) facility="tahoe.storage", level=log.UNUSUAL)
self._abort() self.abort()
self.ss.count("abort") self.ss.count("abort")
def _abort(self): def abort(self):
if self.closed: if self.closed:
return return
@ -338,6 +356,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78
self.closed = True self.closed = True
self.ss.bucket_writer_closed(self, 0) self.ss.bucket_writer_closed(self, 0)
# Cancel timeout if it wasn't already cancelled.
if self._timeout.active():
self._timeout.cancel()
@implementer(RIBucketReader) @implementer(RIBucketReader)
class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 class BucketReader(Referenceable): # type: ignore # warner/foolscap#78

View File

@ -285,20 +285,64 @@ class Bucket(unittest.TestCase):
result_of_read = br.remote_read(0, len(share_data)+1) result_of_read = br.remote_read(0, len(share_data)+1)
self.failUnlessEqual(result_of_read, share_data) self.failUnlessEqual(result_of_read, share_data)
def _assert_timeout_only_after_30_minutes(self, clock, bw):
"""
The ``BucketWriter`` times out and is closed after 30 minutes, but not
sooner.
"""
self.assertFalse(bw.closed)
# 29 minutes pass. Everything is fine.
for i in range(29):
clock.advance(60)
self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,))
# After the 30th minute, the bucket is closed due to lack of writes.
clock.advance(60)
self.assertTrue(bw.closed)
def test_bucket_expires_if_no_writes_for_30_minutes(self): def test_bucket_expires_if_no_writes_for_30_minutes(self):
pass """
If a ``BucketWriter`` receives no writes for 30 minutes, it is removed.
"""
incoming, final = self.make_workdir("test_bucket_expires")
clock = Clock()
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_writes_delay_timeout(self): def test_bucket_writes_delay_timeout(self):
pass """
So long as the ``BucketWriter`` receives writes, the the removal
def test_bucket_finishing_writiing_cancels_timeout(self): timeout is put off.
pass """
incoming, final = self.make_workdir("test_bucket_writes_delay_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock)
# 20 minutes pass, getting close to the timeout...
clock.advance(29 * 60)
# .. but we receive a write! So that should delay the timeout.
bw.write(0, b"hello")
self._assert_timeout_only_after_30_minutes(clock, bw)
def test_bucket_closing_cancels_timeout(self): def test_bucket_closing_cancels_timeout(self):
pass """
Closing cancels the ``BucketWriter`` timeout.
"""
incoming, final = self.make_workdir("test_bucket_close_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
self.assertTrue(clock.getDelayedCalls())
bw.close()
self.assertFalse(clock.getDelayedCalls())
def test_bucket_aborting_cancels_timeout(self): def test_bucket_aborting_cancels_timeout(self):
pass """
Closing cancels the ``BucketWriter`` timeout.
"""
incoming, final = self.make_workdir("test_bucket_abort_timeout")
clock = Clock()
bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock)
self.assertTrue(clock.getDelayedCalls())
bw.abort()
self.assertFalse(clock.getDelayedCalls())
class RemoteBucket(object): class RemoteBucket(object):