From 8c8e377466bcf2659029f7d59636d5039e12abf7 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 18 Nov 2021 14:35:04 -0500 Subject: [PATCH] Implement timeout and corresponding tests. --- src/allmydata/storage/immutable.py | 28 +++++++++++++-- src/allmydata/test/test_storage.py | 58 ++++++++++++++++++++++++++---- 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py index 7cfb7a1bf..8a7519b7b 100644 --- a/src/allmydata/storage/immutable.py +++ b/src/allmydata/storage/immutable.py @@ -246,11 +246,17 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self._sharefile.add_lease(lease_info) self._already_written = RangeMap() self._clock = clock + self._timeout = clock.callLater(30 * 60, self._abort_due_to_timeout) def allocated_size(self): return self._max_size def remote_write(self, offset, data): + self.write(offset, data) + + def write(self, offset, data): + # Delay the timeout, since we received data: + self._timeout.reset(30 * 60) start = self._clock.seconds() precondition(not self.closed) if self.throw_out_all_data: @@ -273,7 +279,11 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self.ss.count("write") def remote_close(self): + self.close() + + def close(self): precondition(not self.closed) + self._timeout.cancel() start = self._clock.seconds() fileutil.make_dirs(os.path.dirname(self.finalhome)) @@ -312,15 +322,23 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 def disconnected(self): if not self.closed: - self._abort() + self.abort() + + def _abort_due_to_timeout(self): + """ + Called if we run out of time. + """ + log.msg("storage: aborting sharefile %s due to timeout" % self.incominghome, + facility="tahoe.storage", level=log.UNUSUAL) + self.abort() def remote_abort(self): log.msg("storage: aborting sharefile %s" % self.incominghome, facility="tahoe.storage", level=log.UNUSUAL) - self._abort() + self.abort() self.ss.count("abort") - def _abort(self): + def abort(self): if self.closed: return @@ -338,6 +356,10 @@ class BucketWriter(Referenceable): # type: ignore # warner/foolscap#78 self.closed = True self.ss.bucket_writer_closed(self, 0) + # Cancel timeout if it wasn't already cancelled. + if self._timeout.active(): + self._timeout.cancel() + @implementer(RIBucketReader) class BucketReader(Referenceable): # type: ignore # warner/foolscap#78 diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index 93779bb29..18dca9856 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -285,20 +285,64 @@ class Bucket(unittest.TestCase): result_of_read = br.remote_read(0, len(share_data)+1) self.failUnlessEqual(result_of_read, share_data) + def _assert_timeout_only_after_30_minutes(self, clock, bw): + """ + The ``BucketWriter`` times out and is closed after 30 minutes, but not + sooner. + """ + self.assertFalse(bw.closed) + # 29 minutes pass. Everything is fine. + for i in range(29): + clock.advance(60) + self.assertFalse(bw.closed, "Bucket closed after only %d minutes" % (i + 1,)) + # After the 30th minute, the bucket is closed due to lack of writes. + clock.advance(60) + self.assertTrue(bw.closed) + def test_bucket_expires_if_no_writes_for_30_minutes(self): - pass + """ + If a ``BucketWriter`` receives no writes for 30 minutes, it is removed. + """ + incoming, final = self.make_workdir("test_bucket_expires") + clock = Clock() + bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock) + self._assert_timeout_only_after_30_minutes(clock, bw) def test_bucket_writes_delay_timeout(self): - pass - - def test_bucket_finishing_writiing_cancels_timeout(self): - pass + """ + So long as the ``BucketWriter`` receives writes, the the removal + timeout is put off. + """ + incoming, final = self.make_workdir("test_bucket_writes_delay_timeout") + clock = Clock() + bw = BucketWriter(self, incoming, final, 200, self.make_lease(), clock) + # 20 minutes pass, getting close to the timeout... + clock.advance(29 * 60) + # .. but we receive a write! So that should delay the timeout. + bw.write(0, b"hello") + self._assert_timeout_only_after_30_minutes(clock, bw) def test_bucket_closing_cancels_timeout(self): - pass + """ + Closing cancels the ``BucketWriter`` timeout. + """ + incoming, final = self.make_workdir("test_bucket_close_timeout") + clock = Clock() + bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock) + self.assertTrue(clock.getDelayedCalls()) + bw.close() + self.assertFalse(clock.getDelayedCalls()) def test_bucket_aborting_cancels_timeout(self): - pass + """ + Closing cancels the ``BucketWriter`` timeout. + """ + incoming, final = self.make_workdir("test_bucket_abort_timeout") + clock = Clock() + bw = BucketWriter(self, incoming, final, 10, self.make_lease(), clock) + self.assertTrue(clock.getDelayedCalls()) + bw.abort() + self.assertFalse(clock.getDelayedCalls()) class RemoteBucket(object):