diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt index cc0d47d19d..181de735ab 100644 --- a/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt +++ b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt @@ -1,15 +1,11 @@ package com.r3corda.node.utilities import co.paralleluniverse.fibers.Suspendable -import co.paralleluniverse.strands.AbstractFuture import co.paralleluniverse.strands.SettableFuture -import co.paralleluniverse.strands.Strand -import co.paralleluniverse.strands.SuspendableRunnable import com.google.common.util.concurrent.ListenableFuture import com.r3corda.core.then import rx.Observable import rx.Subscriber -import rx.Subscription import rx.subscriptions.Subscriptions import java.time.Clock import java.time.Duration @@ -17,6 +13,7 @@ import java.time.Instant import java.util.concurrent.* import java.util.concurrent.atomic.AtomicLong import java.util.function.BiConsumer +import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture /** * The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing @@ -72,89 +69,60 @@ abstract class MutableClock : Clock() { } } -/** - * Internal method to do something that can be interrupted by a [MutableClock] in the case - * of it being mutated. Just returns if [MutableClock] mutates, so needs to be - * called in a loop. - * - * @throws InterruptedException if interrupted by something other than a [MutableClock]. - */ -@Suspendable -private fun Clock.doInterruptibly(runnable: SuspendableRunnable) { - var subscription: Subscription? = null - var interruptedByMutation = false - try { - if (this is MutableClock) { - val strand = Strand.currentStrand() - subscription = this.mutations.subscribe { - interruptedByMutation = true - strand.interrupt() - } - } - runnable.run() - } catch(e: InterruptedException) { - // If clock has not mutated, then re-throw - if (!interruptedByMutation) { - throw e - } - } finally { - if (this is MutableClock) { - subscription!!.unsubscribe() - } - Strand.interrupted() - } -} - /** * Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations - * used in demos or testing. This will also substitute a Fiber compatible Future if required. + * used in demos or testing. This will substitute a Fiber compatible Future so the current [Strand] is not blocked. * * @return true if the [Future] is complete, false if the deadline was reached. */ @Suspendable -fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = SettableFuture()): Boolean { - // convert the future to a strand friendly variety if possible so as not to accidentally block the underlying thread - val fiberFriendlyFuture = makeFutureCurrentStrandFriendly(future) - - var nanos = 0L +fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = GuavaSettableFuture.create()): Boolean { + var nanos: Long do { - doInterruptibly(SuspendableRunnable @Suspendable { - nanos = Duration.between(this.instant(), deadline).toNanos() - if (nanos > 0) { - try { - fiberFriendlyFuture.get(nanos, TimeUnit.NANOSECONDS) - } catch(e: ExecutionException) { - // No need to take action as will fall out of the loop due to future.isDone - } catch(e: CancellationException) { - // No need to take action as will fall out of the loop due to future.isDone - } + val originalFutureCompleted = makeStrandFriendlySettableFuture(future) + val subscription = if (this is MutableClock) { + mutations.first().subscribe { + originalFutureCompleted.set(false) } - }) - } while (!future.isDone && nanos > 0) + } else { + null + } + nanos = Duration.between(this.instant(), deadline).toNanos() + if (nanos > 0) { + try { + // This will return when it times out, or when the clock mutates or when when the original future completes. + originalFutureCompleted.get(nanos, TimeUnit.NANOSECONDS) + } catch(e: ExecutionException) { + // No need to take action as will fall out of the loop due to future.isDone + } catch(e: CancellationException) { + // No need to take action as will fall out of the loop due to future.isDone + } + } + subscription?.unsubscribe() + originalFutureCompleted.cancel(false) + } while (nanos > 0 && !future.isDone) return future.isDone } /** - * Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation if currently - * on a Fiber and not already using Quasar futures. + * Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result + * or [Throwable] is available in the original. * * We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context * switch. There's no need to checkpoint our Fibers as there's no external effect of waiting. */ -private fun makeFutureCurrentStrandFriendly(future: Future): Future { - return if (Strand.isCurrentFiber() && future !is AbstractFuture) { - if (future is ListenableFuture) { - val settable = SettableFuture() - future.then { settable.set(null) } - settable - } else if (future is CompletableFuture) { - val settable = SettableFuture() - future.whenComplete(BiConsumer { value, throwable -> settable.set(null) }) - settable - } else { - throw IllegalArgumentException("Cannot make future $future Fiber friendly whilst on a Fiber") - } - } else future +private fun makeStrandFriendlySettableFuture(future: Future): SettableFuture { + return if (future is ListenableFuture) { + val settable = SettableFuture() + future.then { settable.set(true) } + settable + } else if (future is CompletableFuture) { + val settable = SettableFuture() + future.whenComplete(BiConsumer { value, throwable -> settable.set(true) }) + settable + } else { + throw IllegalArgumentException("Cannot make future $future Fiber friendly.") + } }