From fc7000c15219a28209c5fbfdc8adca4972acc274 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 5 Sep 2016 17:41:33 +0100 Subject: [PATCH 1/3] node: Fix race in Clock.doInterruptibly by not relying on version counter --- .../kotlin/com/r3corda/node/utilities/ClockUtils.kt | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt index 09a2d69846..01a4cbb981 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt @@ -7,6 +7,8 @@ import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.SuspendableRunnable import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.then +import com.r3corda.node.log +import org.slf4j.LoggerFactory import rx.Observable import rx.Subscriber import rx.Subscription @@ -82,19 +84,20 @@ abstract class MutableClock : Clock() { @Suppress("UNUSED_VALUE") // This is here due to the compiler thinking version is not used @Suspendable private fun Clock.doInterruptibly(runnable: SuspendableRunnable) { - var version = 0L var subscription: Subscription? = null + var interruptedByMutation = false try { if (this is MutableClock) { - version = this.mutationCount val strand = Strand.currentStrand() - subscription = this.mutations.subscribe { strand.interrupt() } + subscription = this.mutations.subscribe { + interruptedByMutation = true + strand.interrupt() + } } runnable.run() } catch(e: InterruptedException) { // If clock has not mutated, then re-throw - val newVersion = if (this is MutableClock) this.mutationCount else version - if (newVersion == version) { + if (!interruptedByMutation) { throw e } } finally { From 737fc0589c4d6eb4ada3624a34886244fda2d8e4 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 5 Sep 2016 17:42:07 +0100 Subject: [PATCH 2/3] node: Add clock test for external interrupt on waiting strand --- .../r3corda/node/utilities/ClockUtilsTest.kt | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/node/src/test/kotlin/com/r3corda/node/utilities/ClockUtilsTest.kt b/node/src/test/kotlin/com/r3corda/node/utilities/ClockUtilsTest.kt index be64fb16ca..508b061b12 100644 --- a/node/src/test/kotlin/com/r3corda/node/utilities/ClockUtilsTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/utilities/ClockUtilsTest.kt @@ -16,6 +16,7 @@ import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import kotlin.test.assertFalse import kotlin.test.assertTrue +import kotlin.test.fail class ClockUtilsTest { @@ -112,6 +113,27 @@ class ClockUtilsTest { assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline") } + @Test + fun `test external interrupt of a clock future`() { + val mainStrand = Strand.currentStrand() + executor.execute @Suspendable { + // Wait until main thread is waiting + while (mainStrand.state != Strand.State.TIMED_WAITING) { + Strand.sleep(1) + } + mainStrand.interrupt() + } + + val testClock = TestClock(stoppedClock) + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(10)) + + try { + testClock.awaitWithDeadline(advancedClock.instant(), SettableFuture.create()) + fail("Expected InterruptedException") + } catch (exception: InterruptedException) { + } + } + /** * If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been * run on core module (in IntelliJ, open gradle side tab and run: From 4923e33a7d6f17e910c24366f71d52d1b44015f3 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Tue, 6 Sep 2016 09:49:26 +0100 Subject: [PATCH 3/3] node: Remove unnecessary @Suppress --- .../src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt index 01a4cbb981..cc0d47d19d 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt @@ -7,8 +7,6 @@ import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.SuspendableRunnable import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.then -import com.r3corda.node.log -import org.slf4j.LoggerFactory import rx.Observable import rx.Subscriber import rx.Subscription @@ -39,7 +37,7 @@ abstract class MutableClock : Clock() { private val _version = AtomicLong(0L) /** - * This tracks how many direct mutations of "now" have occured for this [Clock], but not the passage of time. + * This tracks how many direct mutations of "now" have occurred for this [Clock], but not the passage of time. * * It starts at zero, and increments by one per mutation. */ @@ -81,7 +79,6 @@ abstract class MutableClock : Clock() { * * @throws InterruptedException if interrupted by something other than a [MutableClock]. */ -@Suppress("UNUSED_VALUE") // This is here due to the compiler thinking version is not used @Suspendable private fun Clock.doInterruptibly(runnable: SuspendableRunnable) { var subscription: Subscription? = null