Remove use of interrupts to in ClockUtils

This commit is contained in:
rick.parker 2016-09-09 15:27:29 +01:00
parent a11de4258a
commit 03a04d10ca

View File

@ -1,15 +1,11 @@
package com.r3corda.node.utilities package com.r3corda.node.utilities
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.AbstractFuture
import co.paralleluniverse.strands.SettableFuture import co.paralleluniverse.strands.SettableFuture
import co.paralleluniverse.strands.Strand
import co.paralleluniverse.strands.SuspendableRunnable
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.then import com.r3corda.core.then
import rx.Observable import rx.Observable
import rx.Subscriber import rx.Subscriber
import rx.Subscription
import rx.subscriptions.Subscriptions import rx.subscriptions.Subscriptions
import java.time.Clock import java.time.Clock
import java.time.Duration import java.time.Duration
@ -17,6 +13,7 @@ import java.time.Instant
import java.util.concurrent.* import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.function.BiConsumer import java.util.function.BiConsumer
import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
/** /**
* The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing * The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing
@ -72,89 +69,60 @@ abstract class MutableClock : Clock() {
} }
} }
/**
* Internal method to do something that can be interrupted by a [MutableClock] in the case
* of it being mutated. Just returns if [MutableClock] mutates, so needs to be
* called in a loop.
*
* @throws InterruptedException if interrupted by something other than a [MutableClock].
*/
@Suspendable
private fun Clock.doInterruptibly(runnable: SuspendableRunnable) {
var subscription: Subscription? = null
var interruptedByMutation = false
try {
if (this is MutableClock) {
val strand = Strand.currentStrand()
subscription = this.mutations.subscribe {
interruptedByMutation = true
strand.interrupt()
}
}
runnable.run()
} catch(e: InterruptedException) {
// If clock has not mutated, then re-throw
if (!interruptedByMutation) {
throw e
}
} finally {
if (this is MutableClock) {
subscription!!.unsubscribe()
}
Strand.interrupted()
}
}
/** /**
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations * Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
* used in demos or testing. This will also substitute a Fiber compatible Future if required. * used in demos or testing. This will substitute a Fiber compatible Future so the current [Strand] is not blocked.
* *
* @return true if the [Future] is complete, false if the deadline was reached. * @return true if the [Future] is complete, false if the deadline was reached.
*/ */
@Suspendable @Suspendable
fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = SettableFuture<Any>()): Boolean { fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = GuavaSettableFuture.create<Any>()): Boolean {
// convert the future to a strand friendly variety if possible so as not to accidentally block the underlying thread var nanos: Long
val fiberFriendlyFuture = makeFutureCurrentStrandFriendly(future)
var nanos = 0L
do { do {
doInterruptibly(SuspendableRunnable @Suspendable { val originalFutureCompleted = makeStrandFriendlySettableFuture(future)
val subscription = if (this is MutableClock) {
mutations.first().subscribe {
originalFutureCompleted.set(false)
}
} else {
null
}
nanos = Duration.between(this.instant(), deadline).toNanos() nanos = Duration.between(this.instant(), deadline).toNanos()
if (nanos > 0) { if (nanos > 0) {
try { try {
fiberFriendlyFuture.get(nanos, TimeUnit.NANOSECONDS) // This will return when it times out, or when the clock mutates or when when the original future completes.
originalFutureCompleted.get(nanos, TimeUnit.NANOSECONDS)
} catch(e: ExecutionException) { } catch(e: ExecutionException) {
// No need to take action as will fall out of the loop due to future.isDone // No need to take action as will fall out of the loop due to future.isDone
} catch(e: CancellationException) { } catch(e: CancellationException) {
// No need to take action as will fall out of the loop due to future.isDone // No need to take action as will fall out of the loop due to future.isDone
} }
} }
}) subscription?.unsubscribe()
} while (!future.isDone && nanos > 0) originalFutureCompleted.cancel(false)
} while (nanos > 0 && !future.isDone)
return future.isDone return future.isDone
} }
/** /**
* Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation if currently * Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result
* on a Fiber and not already using Quasar futures. * or [Throwable] is available in the original.
* *
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context * We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
* switch. There's no need to checkpoint our Fibers as there's no external effect of waiting. * switch. There's no need to checkpoint our Fibers as there's no external effect of waiting.
*/ */
private fun <T : Any> makeFutureCurrentStrandFriendly(future: Future<T>): Future<out T> { private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>): SettableFuture<Boolean> {
return if (Strand.isCurrentFiber() && future !is AbstractFuture) { return if (future is ListenableFuture) {
if (future is ListenableFuture) { val settable = SettableFuture<Boolean>()
val settable = SettableFuture<T>() future.then { settable.set(true) }
future.then { settable.set(null) }
settable settable
} else if (future is CompletableFuture) { } else if (future is CompletableFuture) {
val settable = SettableFuture<T>() val settable = SettableFuture<Boolean>()
future.whenComplete(BiConsumer { value, throwable -> settable.set(null) }) future.whenComplete(BiConsumer { value, throwable -> settable.set(true) })
settable settable
} else { } else {
throw IllegalArgumentException("Cannot make future $future Fiber friendly whilst on a Fiber") throw IllegalArgumentException("Cannot make future $future Fiber friendly.")
} }
} else future
} }