Split ClockUtils to improve Java compatibility

Split `ClockUtils` into `MutableClock`, and move the extension functions into `NodeSchedulerService` which is the only thing that uses them.
This commit is contained in:
Ross Nicoll 2017-08-29 16:57:29 +01:00 committed by GitHub
parent a2a3f51689
commit 2facab3be3
7 changed files with 143 additions and 146 deletions
node/src
main/kotlin/net/corda/node
test/kotlin/net/corda/node
test-utils/src/main/kotlin/net/corda/testing/node

@ -0,0 +1,45 @@
package net.corda.node.internal
import rx.Observable
import rx.Subscriber
import rx.subscriptions.Subscriptions
import java.time.Clock
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.atomic.AtomicLong
/**
* An abstract class with helper methods for a type of Clock that might have it's concept of "now"
* adjusted externally.
*
* e.g. for testing (so unit tests do not have to wait for timeouts in realtime) or for demos and simulations.
*/
abstract class MutableClock : Clock() {
private val _version = AtomicLong(0L)
/**
* This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations.
*/
val mutations: Observable<Long> by lazy {
Observable.create({ subscriber: Subscriber<in Long> ->
if (!subscriber.isUnsubscribed) {
mutationObservers.add(subscriber)
// This is not very intuitive, but subscribing to a subscriber observes unsubscribes.
subscriber.add(Subscriptions.create { mutationObservers.remove(subscriber) })
}
})
}
private val mutationObservers = CopyOnWriteArraySet<Subscriber<in Long>>()
/**
* Must be called by subclasses when they mutate (but not just with the passage of time as per the "wall clock").
*/
protected fun notifyMutationObservers() {
val version = _version.incrementAndGet()
for (observer in mutationObservers) {
if (!observer.isUnsubscribed) {
observer.onNext(version)
}
}
}
}

@ -1,26 +1,39 @@
package net.corda.node.services.events
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.*
import net.corda.core.internal.ThreadBox
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.SettableFuture as QuasarSettableFuture
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
import net.corda.core.contracts.SchedulableState
import net.corda.core.contracts.ScheduledActivity
import net.corda.core.contracts.ScheduledStateRef
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.until
import net.corda.core.schemas.PersistentStateRef
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
import net.corda.node.internal.MutableClock
import net.corda.node.services.api.SchedulerService
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.utilities.*
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.NODE_DATABASE_PREFIX
import net.corda.node.utilities.PersistentMap
import org.apache.activemq.artemis.utils.ReusableLatch
import java.time.Clock
import java.time.Instant
import java.util.*
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import java.util.concurrent.*
import javax.annotation.concurrent.ThreadSafe
import javax.persistence.*
import javax.persistence.Column
import javax.persistence.EmbeddedId
import javax.persistence.Entity
/**
* A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations
@ -48,6 +61,48 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
companion object {
private val log = loggerFor<NodeSchedulerService>()
/**
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
* used in demos or testing. This will substitute a Fiber compatible Future so the current
* [co.paralleluniverse.strands.Strand] is not blocked.
*
* @return true if the [Future] is complete, false if the deadline was reached.
*/
// We should try to make the Clock used in our code injectable (for tests etc) and to use the extension below
// to wait in our code, rather than <code>Thread.sleep()</code> or other time-based pauses.
@Suspendable
@VisibleForTesting
// We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name
fun awaitWithDeadline(clock: Clock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create<Any>()): Boolean {
var nanos: Long
do {
val originalFutureCompleted = makeStrandFriendlySettableFuture(future)
val subscription = if (clock is MutableClock) {
clock.mutations.first().subscribe {
originalFutureCompleted.set(false)
}
} else {
null
}
nanos = (clock.instant() until deadline).toNanos()
if (nanos > 0) {
try {
// This will return when it times out, or when the clock mutates or when when the original future completes.
originalFutureCompleted.get(nanos, TimeUnit.NANOSECONDS)
} catch(e: ExecutionException) {
// No need to take action as will fall out of the loop due to future.isDone
} catch(e: CancellationException) {
// No need to take action as will fall out of the loop due to future.isDone
} catch(e: TimeoutException) {
// No need to take action as will fall out of the loop due to future.isDone
}
}
subscription?.unsubscribe()
originalFutureCompleted.cancel(false)
} while (nanos > 0 && !future.isDone)
return future.isDone
}
fun createMap(): PersistentMap<StateRef, ScheduledStateRef, PersistentScheduledState, PersistentStateRef> {
return PersistentMap(
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
@ -67,6 +122,21 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
persistentEntityClass = PersistentScheduledState::class.java
)
}
/**
* Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result
* or [Throwable] is available in the original.
*
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
* switch. There's no need to checkpoint our [Fiber]s as there's no external effect of waiting.
*/
private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>) = QuasarSettableFuture<Boolean>().also { g ->
when (future) {
is ListenableFuture -> future.addListener(Runnable { g.set(true) }, Executor { it.run() })
is CompletionStage<*> -> future.whenComplete { _, _ -> g.set(true) }
else -> throw IllegalArgumentException("Cannot make future $future Strand friendly.")
}
}
}
@Entity
@ -84,7 +154,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
var scheduledStatesQueue: PriorityQueue<ScheduledStateRef> = PriorityQueue( { a, b -> a.scheduledAt.compareTo(b.scheduledAt) } )
var rescheduled: SettableFuture<Boolean>? = null
var rescheduled: GuavaSettableFuture<Boolean>? = null
}
private val mutex = ThreadBox(InnerState())
@ -145,7 +215,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
// Note, we already have the mutex but we need the scope again here
val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
rescheduled?.cancel(false)
rescheduled = SettableFuture.create()
rescheduled = GuavaSettableFuture.create()
Pair(scheduledStatesQueue.peek(), rescheduled!!)
}
if (scheduledState != null) {
@ -153,7 +223,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
log.trace { "Scheduling as next $scheduledState" }
// This will block the scheduler single thread until the scheduled time (returns false) OR
// the Future is cancelled due to rescheduling (returns true).
if (!services.clock.awaitWithDeadline(scheduledState.scheduledAt, ourRescheduledFuture)) {
if (!awaitWithDeadline(services.clock, scheduledState.scheduledAt, ourRescheduledFuture)) {
log.trace { "Invoking as next $scheduledState" }
onTimeReached(scheduledState)
} else {

@ -1,115 +0,0 @@
package net.corda.node.utilities
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.SettableFuture
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.internal.until
import rx.Observable
import rx.Subscriber
import rx.subscriptions.Subscriptions
import java.time.Clock
import java.time.Instant
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong
import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture
/**
* The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing
* that might not follow "real time" or "wall clock time". i.e. they allow time to be fast forwarded.
*
* We should try to make the Clock used in our code injectable (for tests etc) and to use the extensions below
* to wait in our code, rather than <code>Thread.sleep()</code> or <code>Object.wait()</code> etc.
*/
/**
* An abstract class with helper methods for a type of Clock that might have it's concept of "now"
* adjusted externally.
*
* e.g. for testing (so unit tests do not have to wait for timeouts in realtime) or for demos and simulations.
*/
abstract class MutableClock : Clock() {
private val _version = AtomicLong(0L)
/**
* This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations.
*/
val mutations: Observable<Long> by lazy {
Observable.create({ subscriber: Subscriber<in Long> ->
if (!subscriber.isUnsubscribed) {
mutationObservers.add(subscriber)
// This is not very intuitive, but subscribing to a subscriber observes unsubscribes.
subscriber.add(Subscriptions.create { mutationObservers.remove(subscriber) })
}
})
}
private val mutationObservers = CopyOnWriteArraySet<Subscriber<in Long>>()
/**
* Must be called by subclasses when they mutate (but not just with the passage of time as per the "wall clock").
*/
protected fun notifyMutationObservers() {
val version = _version.incrementAndGet()
for (observer in mutationObservers) {
if (!observer.isUnsubscribed) {
observer.onNext(version)
}
}
}
}
/**
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
* used in demos or testing. This will substitute a Fiber compatible Future so the current
* [co.paralleluniverse.strands.Strand] is not blocked.
*
* @return true if the [Future] is complete, false if the deadline was reached.
*/
@Suspendable
fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = GuavaSettableFuture.create<Any>()): Boolean {
var nanos: Long
do {
val originalFutureCompleted = makeStrandFriendlySettableFuture(future)
val subscription = if (this is MutableClock) {
mutations.first().subscribe {
originalFutureCompleted.set(false)
}
} else {
null
}
nanos = (instant() until deadline).toNanos()
if (nanos > 0) {
try {
// This will return when it times out, or when the clock mutates or when when the original future completes.
originalFutureCompleted.get(nanos, TimeUnit.NANOSECONDS)
} catch(e: ExecutionException) {
// No need to take action as will fall out of the loop due to future.isDone
} catch(e: CancellationException) {
// No need to take action as will fall out of the loop due to future.isDone
} catch(e: TimeoutException) {
// No need to take action as will fall out of the loop due to future.isDone
}
}
subscription?.unsubscribe()
originalFutureCompleted.cancel(false)
} while (nanos > 0 && !future.isDone)
return future.isDone
}
/**
* Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result
* or [Throwable] is available in the original.
*
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
* switch. There's no need to checkpoint our Fibers as there's no external effect of waiting.
*/
private fun <T : Any> makeStrandFriendlySettableFuture(future: Future<T>) = SettableFuture<Boolean>().also { g ->
when (future) {
is ListenableFuture -> future.addListener(Runnable { g.set(true) }, Executor { it.run() })
is CompletionStage<*> -> future.whenComplete { _, _ -> g.set(true) }
else -> throw IllegalArgumentException("Cannot make future $future Fiber friendly.")
}
}

@ -4,6 +4,7 @@ import net.corda.core.internal.until
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken
import net.corda.node.internal.MutableClock
import java.time.Clock
import java.time.Instant
import java.time.LocalDate

@ -1,7 +1,6 @@
package net.corda.node.services.events
import net.corda.core.contracts.*
import net.corda.core.utilities.days
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowLogicRef
import net.corda.core.flows.FlowLogicRefFactory
@ -10,11 +9,10 @@ import net.corda.core.node.ServiceHub
import net.corda.core.node.services.VaultService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.days
import net.corda.node.services.MockServiceHubInternal
import net.corda.node.services.database.HibernateConfiguration
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.schema.NodeSchemaService
import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.vault.NodeVaultService
@ -22,10 +20,8 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.CordaPersistence
import net.corda.node.utilities.configureDatabase
import net.corda.testing.*
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.MockKeyManagementService
import net.corda.testing.node.*
import net.corda.testing.node.TestClock
import net.corda.testing.node.InMemoryMessagingNetwork
import org.assertj.core.api.Assertions.assertThat
import org.bouncycastle.asn1.x500.X500Name
import org.junit.After

@ -8,6 +8,7 @@ import com.google.common.util.concurrent.SettableFuture
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.hours
import net.corda.core.utilities.minutes
import net.corda.node.services.events.NodeSchedulerService
import net.corda.testing.node.TestClock
import org.junit.After
import org.junit.Before
@ -41,24 +42,24 @@ class ClockUtilsTest {
@Test
fun `test waiting no time for a deadline`() {
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant()), "Should have reached deadline")
assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant()), "Should have reached deadline")
}
@Test
fun `test waiting negative time for a deadline`() {
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(1.hours)), "Should have reached deadline")
assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant().minus(1.hours)), "Should have reached deadline")
}
@Test
fun `test waiting no time for a deadline with incomplete future`() {
val future = SettableFuture.create<Boolean>()
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant(), future), "Should have reached deadline")
assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant(), future), "Should have reached deadline")
}
@Test
fun `test waiting negative time for a deadline with incomplete future`() {
val future = SettableFuture.create<Boolean>()
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(1.hours), future), "Should have reached deadline")
assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant().minus(1.hours), future), "Should have reached deadline")
}
@ -67,7 +68,7 @@ class ClockUtilsTest {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
val future = SettableFuture.create<Boolean>()
completeNow(future)
assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
assertTrue(NodeSchedulerService.awaitWithDeadline(stoppedClock, advancedClock.instant(), future), "Should not have reached deadline")
}
@Test
@ -75,7 +76,7 @@ class ClockUtilsTest {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
val future = SettableFuture.create<Boolean>()
completeAfterWaiting(future)
assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
assertTrue(NodeSchedulerService.awaitWithDeadline(stoppedClock, advancedClock.instant(), future), "Should not have reached deadline")
}
@Test
@ -83,7 +84,7 @@ class ClockUtilsTest {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
val testClock = TestClock(stoppedClock)
advanceClockAfterWait(testClock, 1.hours)
assertFalse(testClock.awaitWithDeadline(advancedClock.instant()), "Should have reached deadline")
assertFalse(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant()), "Should have reached deadline")
}
@Test
@ -92,7 +93,7 @@ class ClockUtilsTest {
val testClock = TestClock(stoppedClock)
val future = SettableFuture.create<Boolean>()
advanceClockAfterWait(testClock, 1.hours)
assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline")
assertFalse(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future), "Should have reached deadline")
}
@Test
@ -102,7 +103,7 @@ class ClockUtilsTest {
val future = SettableFuture.create<Boolean>()
advanceClockAfterWait(testClock, 1.hours)
completeAfterWaiting(future)
assertTrue(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
assertTrue(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future), "Should not have reached deadline")
}
@Test
@ -113,7 +114,7 @@ class ClockUtilsTest {
for (advance in 1..6) {
advanceClockAfterWait(testClock, 10.minutes)
}
assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline")
assertFalse(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future), "Should have reached deadline")
}
@Test
@ -131,7 +132,7 @@ class ClockUtilsTest {
val advancedClock = Clock.offset(stoppedClock, 10.hours)
try {
testClock.awaitWithDeadline(advancedClock.instant(), SettableFuture.create<Boolean>())
NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), SettableFuture.create<Boolean>())
fail("Expected InterruptedException")
} catch (exception: InterruptedException) {
}
@ -145,7 +146,7 @@ class ClockUtilsTest {
val future = CompletableFuture<Boolean>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
future.complete(testClock.awaitWithDeadline(advancedClock.instant(), future))
future.complete(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future))
}).start()
for (advance in 1..6) {
scheduler.newFiber(@Suspendable {
@ -167,7 +168,7 @@ class ClockUtilsTest {
val future = SettableFuture.create<Boolean>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
future.set(testClock.awaitWithDeadline(advancedClock.instant(), future))
future.set(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future))
}).start()
for (advance in 1..6) {
scheduler.newFiber(@Suspendable {

@ -4,8 +4,7 @@ import net.corda.core.internal.until
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.serialization.SerializeAsTokenContext
import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken
import net.corda.core.internal.until
import net.corda.node.utilities.MutableClock
import net.corda.node.internal.MutableClock
import java.time.Clock
import java.time.Duration
import java.time.Instant