From 2facab3be3dac24faaca5d3fd334c9966ef47379 Mon Sep 17 00:00:00 2001 From: Ross Nicoll Date: Tue, 29 Aug 2017 16:57:29 +0100 Subject: [PATCH] Split ClockUtils to improve Java compatibility Split `ClockUtils` into `MutableClock`, and move the extension functions into `NodeSchedulerService` which is the only thing that uses them. --- .../net/corda/node/internal/MutableClock.kt | 45 +++++++ .../services/events/NodeSchedulerService.kt | 90 ++++++++++++-- .../net/corda/node/utilities/ClockUtils.kt | 115 ------------------ .../net/corda/node/utilities/TestClock.kt | 1 + .../events/NodeSchedulerServiceTest.kt | 8 +- .../corda/node/utilities/ClockUtilsTest.kt | 27 ++-- .../net/corda/testing/node/TestClock.kt | 3 +- 7 files changed, 143 insertions(+), 146 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/internal/MutableClock.kt delete mode 100644 node/src/main/kotlin/net/corda/node/utilities/ClockUtils.kt diff --git a/node/src/main/kotlin/net/corda/node/internal/MutableClock.kt b/node/src/main/kotlin/net/corda/node/internal/MutableClock.kt new file mode 100644 index 0000000000..3a52cf377c --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/internal/MutableClock.kt @@ -0,0 +1,45 @@ +package net.corda.node.internal + +import rx.Observable +import rx.Subscriber +import rx.subscriptions.Subscriptions +import java.time.Clock +import java.util.concurrent.CopyOnWriteArraySet +import java.util.concurrent.atomic.AtomicLong + +/** + * An abstract class with helper methods for a type of Clock that might have it's concept of "now" + * adjusted externally. + * + * e.g. for testing (so unit tests do not have to wait for timeouts in realtime) or for demos and simulations. + */ +abstract class MutableClock : Clock() { + private val _version = AtomicLong(0L) + + /** + * This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations. + */ + val mutations: Observable by lazy { + Observable.create({ subscriber: Subscriber -> + if (!subscriber.isUnsubscribed) { + mutationObservers.add(subscriber) + // This is not very intuitive, but subscribing to a subscriber observes unsubscribes. + subscriber.add(Subscriptions.create { mutationObservers.remove(subscriber) }) + } + }) + } + + private val mutationObservers = CopyOnWriteArraySet>() + + /** + * Must be called by subclasses when they mutate (but not just with the passage of time as per the "wall clock"). + */ + protected fun notifyMutationObservers() { + val version = _version.incrementAndGet() + for (observer in mutationObservers) { + if (!observer.isUnsubscribed) { + observer.onNext(version) + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 908c20c399..e7d9ac38ef 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -1,26 +1,39 @@ package net.corda.node.services.events -import com.google.common.util.concurrent.SettableFuture -import net.corda.core.contracts.* -import net.corda.core.internal.ThreadBox +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.SettableFuture as QuasarSettableFuture +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture +import net.corda.core.contracts.SchedulableState +import net.corda.core.contracts.ScheduledActivity +import net.corda.core.contracts.ScheduledStateRef +import net.corda.core.contracts.StateRef import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowInitiator import net.corda.core.flows.FlowLogic +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.VisibleForTesting +import net.corda.core.internal.until import net.corda.core.schemas.PersistentStateRef import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace +import net.corda.node.internal.MutableClock import net.corda.node.services.api.SchedulerService import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl -import net.corda.node.utilities.* +import net.corda.node.utilities.AffinityExecutor +import net.corda.node.utilities.NODE_DATABASE_PREFIX +import net.corda.node.utilities.PersistentMap import org.apache.activemq.artemis.utils.ReusableLatch +import java.time.Clock import java.time.Instant import java.util.* -import java.util.concurrent.Executor -import java.util.concurrent.Executors +import java.util.concurrent.* import javax.annotation.concurrent.ThreadSafe -import javax.persistence.* +import javax.persistence.Column +import javax.persistence.EmbeddedId +import javax.persistence.Entity /** * A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations @@ -48,6 +61,48 @@ class NodeSchedulerService(private val services: ServiceHubInternal, companion object { private val log = loggerFor() + /** + * Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations + * used in demos or testing. This will substitute a Fiber compatible Future so the current + * [co.paralleluniverse.strands.Strand] is not blocked. + * + * @return true if the [Future] is complete, false if the deadline was reached. + */ + // We should try to make the Clock used in our code injectable (for tests etc) and to use the extension below + // to wait in our code, rather than Thread.sleep() or other time-based pauses. + @Suspendable + @VisibleForTesting + // We specify full classpath on SettableFuture to differentiate it from the Quasar class of the same name + fun awaitWithDeadline(clock: Clock, deadline: Instant, future: Future<*> = GuavaSettableFuture.create()): Boolean { + var nanos: Long + do { + val originalFutureCompleted = makeStrandFriendlySettableFuture(future) + val subscription = if (clock is MutableClock) { + clock.mutations.first().subscribe { + originalFutureCompleted.set(false) + } + } else { + null + } + nanos = (clock.instant() until deadline).toNanos() + if (nanos > 0) { + try { + // This will return when it times out, or when the clock mutates or when when the original future completes. + originalFutureCompleted.get(nanos, TimeUnit.NANOSECONDS) + } catch(e: ExecutionException) { + // No need to take action as will fall out of the loop due to future.isDone + } catch(e: CancellationException) { + // No need to take action as will fall out of the loop due to future.isDone + } catch(e: TimeoutException) { + // No need to take action as will fall out of the loop due to future.isDone + } + } + subscription?.unsubscribe() + originalFutureCompleted.cancel(false) + } while (nanos > 0 && !future.isDone) + return future.isDone + } + fun createMap(): PersistentMap { return PersistentMap( toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) }, @@ -67,6 +122,21 @@ class NodeSchedulerService(private val services: ServiceHubInternal, persistentEntityClass = PersistentScheduledState::class.java ) } + + /** + * Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result + * or [Throwable] is available in the original. + * + * We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context + * switch. There's no need to checkpoint our [Fiber]s as there's no external effect of waiting. + */ + private fun makeStrandFriendlySettableFuture(future: Future) = QuasarSettableFuture().also { g -> + when (future) { + is ListenableFuture -> future.addListener(Runnable { g.set(true) }, Executor { it.run() }) + is CompletionStage<*> -> future.whenComplete { _, _ -> g.set(true) } + else -> throw IllegalArgumentException("Cannot make future $future Strand friendly.") + } + } } @Entity @@ -84,7 +154,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal, var scheduledStatesQueue: PriorityQueue = PriorityQueue( { a, b -> a.scheduledAt.compareTo(b.scheduledAt) } ) - var rescheduled: SettableFuture? = null + var rescheduled: GuavaSettableFuture? = null } private val mutex = ThreadBox(InnerState()) @@ -145,7 +215,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal, // Note, we already have the mutex but we need the scope again here val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked { rescheduled?.cancel(false) - rescheduled = SettableFuture.create() + rescheduled = GuavaSettableFuture.create() Pair(scheduledStatesQueue.peek(), rescheduled!!) } if (scheduledState != null) { @@ -153,7 +223,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal, log.trace { "Scheduling as next $scheduledState" } // This will block the scheduler single thread until the scheduled time (returns false) OR // the Future is cancelled due to rescheduling (returns true). - if (!services.clock.awaitWithDeadline(scheduledState.scheduledAt, ourRescheduledFuture)) { + if (!awaitWithDeadline(services.clock, scheduledState.scheduledAt, ourRescheduledFuture)) { log.trace { "Invoking as next $scheduledState" } onTimeReached(scheduledState) } else { diff --git a/node/src/main/kotlin/net/corda/node/utilities/ClockUtils.kt b/node/src/main/kotlin/net/corda/node/utilities/ClockUtils.kt deleted file mode 100644 index 44f3d0ec2c..0000000000 --- a/node/src/main/kotlin/net/corda/node/utilities/ClockUtils.kt +++ /dev/null @@ -1,115 +0,0 @@ -package net.corda.node.utilities - -import co.paralleluniverse.fibers.Suspendable -import co.paralleluniverse.strands.SettableFuture -import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.internal.until -import rx.Observable -import rx.Subscriber -import rx.subscriptions.Subscriptions -import java.time.Clock -import java.time.Instant -import java.util.concurrent.* -import java.util.concurrent.atomic.AtomicLong -import com.google.common.util.concurrent.SettableFuture as GuavaSettableFuture - -/** - * The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing - * that might not follow "real time" or "wall clock time". i.e. they allow time to be fast forwarded. - * - * We should try to make the Clock used in our code injectable (for tests etc) and to use the extensions below - * to wait in our code, rather than Thread.sleep() or Object.wait() etc. - */ - -/** - * An abstract class with helper methods for a type of Clock that might have it's concept of "now" - * adjusted externally. - * - * e.g. for testing (so unit tests do not have to wait for timeouts in realtime) or for demos and simulations. - */ -abstract class MutableClock : Clock() { - - private val _version = AtomicLong(0L) - - /** - * This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations. - */ - val mutations: Observable by lazy { - Observable.create({ subscriber: Subscriber -> - if (!subscriber.isUnsubscribed) { - mutationObservers.add(subscriber) - // This is not very intuitive, but subscribing to a subscriber observes unsubscribes. - subscriber.add(Subscriptions.create { mutationObservers.remove(subscriber) }) - } - }) - } - - private val mutationObservers = CopyOnWriteArraySet>() - - /** - * Must be called by subclasses when they mutate (but not just with the passage of time as per the "wall clock"). - */ - protected fun notifyMutationObservers() { - val version = _version.incrementAndGet() - for (observer in mutationObservers) { - if (!observer.isUnsubscribed) { - observer.onNext(version) - } - } - } -} - -/** - * Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations - * used in demos or testing. This will substitute a Fiber compatible Future so the current - * [co.paralleluniverse.strands.Strand] is not blocked. - * - * @return true if the [Future] is complete, false if the deadline was reached. - */ -@Suspendable -fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = GuavaSettableFuture.create()): Boolean { - var nanos: Long - do { - val originalFutureCompleted = makeStrandFriendlySettableFuture(future) - val subscription = if (this is MutableClock) { - mutations.first().subscribe { - originalFutureCompleted.set(false) - } - } else { - null - } - nanos = (instant() until deadline).toNanos() - if (nanos > 0) { - try { - // This will return when it times out, or when the clock mutates or when when the original future completes. - originalFutureCompleted.get(nanos, TimeUnit.NANOSECONDS) - } catch(e: ExecutionException) { - // No need to take action as will fall out of the loop due to future.isDone - } catch(e: CancellationException) { - // No need to take action as will fall out of the loop due to future.isDone - } catch(e: TimeoutException) { - // No need to take action as will fall out of the loop due to future.isDone - } - } - subscription?.unsubscribe() - originalFutureCompleted.cancel(false) - } while (nanos > 0 && !future.isDone) - return future.isDone -} - -/** - * Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation and set to true when a result - * or [Throwable] is available in the original. - * - * We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context - * switch. There's no need to checkpoint our Fibers as there's no external effect of waiting. - */ -private fun makeStrandFriendlySettableFuture(future: Future) = SettableFuture().also { g -> - when (future) { - is ListenableFuture -> future.addListener(Runnable { g.set(true) }, Executor { it.run() }) - is CompletionStage<*> -> future.whenComplete { _, _ -> g.set(true) } - else -> throw IllegalArgumentException("Cannot make future $future Fiber friendly.") - } -} - - diff --git a/node/src/main/kotlin/net/corda/node/utilities/TestClock.kt b/node/src/main/kotlin/net/corda/node/utilities/TestClock.kt index 3d1c57d312..e582f45147 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/TestClock.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/TestClock.kt @@ -4,6 +4,7 @@ import net.corda.core.internal.until import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsTokenContext import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken +import net.corda.node.internal.MutableClock import java.time.Clock import java.time.Instant import java.time.LocalDate diff --git a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt index df17f414c2..a920256da3 100644 --- a/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/events/NodeSchedulerServiceTest.kt @@ -1,7 +1,6 @@ package net.corda.node.services.events import net.corda.core.contracts.* -import net.corda.core.utilities.days import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRef import net.corda.core.flows.FlowLogicRefFactory @@ -10,11 +9,10 @@ import net.corda.core.node.ServiceHub import net.corda.core.node.services.VaultService import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.TransactionBuilder +import net.corda.core.utilities.days import net.corda.node.services.MockServiceHubInternal -import net.corda.node.services.database.HibernateConfiguration import net.corda.node.services.identity.InMemoryIdentityService import net.corda.node.services.persistence.DBCheckpointStorage -import net.corda.node.services.schema.NodeSchemaService import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl import net.corda.node.services.statemachine.StateMachineManager import net.corda.node.services.vault.NodeVaultService @@ -22,10 +20,8 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.CordaPersistence import net.corda.node.utilities.configureDatabase import net.corda.testing.* -import net.corda.testing.node.InMemoryMessagingNetwork -import net.corda.testing.node.MockKeyManagementService import net.corda.testing.node.* -import net.corda.testing.node.TestClock +import net.corda.testing.node.InMemoryMessagingNetwork import org.assertj.core.api.Assertions.assertThat import org.bouncycastle.asn1.x500.X500Name import org.junit.After diff --git a/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt b/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt index 23723f70f2..783564a664 100644 --- a/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt +++ b/node/src/test/kotlin/net/corda/node/utilities/ClockUtilsTest.kt @@ -8,6 +8,7 @@ import com.google.common.util.concurrent.SettableFuture import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.hours import net.corda.core.utilities.minutes +import net.corda.node.services.events.NodeSchedulerService import net.corda.testing.node.TestClock import org.junit.After import org.junit.Before @@ -41,24 +42,24 @@ class ClockUtilsTest { @Test fun `test waiting no time for a deadline`() { - assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant()), "Should have reached deadline") + assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant()), "Should have reached deadline") } @Test fun `test waiting negative time for a deadline`() { - assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(1.hours)), "Should have reached deadline") + assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant().minus(1.hours)), "Should have reached deadline") } @Test fun `test waiting no time for a deadline with incomplete future`() { val future = SettableFuture.create() - assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant(), future), "Should have reached deadline") + assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant(), future), "Should have reached deadline") } @Test fun `test waiting negative time for a deadline with incomplete future`() { val future = SettableFuture.create() - assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(1.hours), future), "Should have reached deadline") + assertFalse(NodeSchedulerService.awaitWithDeadline(stoppedClock, stoppedClock.instant().minus(1.hours), future), "Should have reached deadline") } @@ -67,7 +68,7 @@ class ClockUtilsTest { val advancedClock = Clock.offset(stoppedClock, 1.hours) val future = SettableFuture.create() completeNow(future) - assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline") + assertTrue(NodeSchedulerService.awaitWithDeadline(stoppedClock, advancedClock.instant(), future), "Should not have reached deadline") } @Test @@ -75,7 +76,7 @@ class ClockUtilsTest { val advancedClock = Clock.offset(stoppedClock, 1.hours) val future = SettableFuture.create() completeAfterWaiting(future) - assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline") + assertTrue(NodeSchedulerService.awaitWithDeadline(stoppedClock, advancedClock.instant(), future), "Should not have reached deadline") } @Test @@ -83,7 +84,7 @@ class ClockUtilsTest { val advancedClock = Clock.offset(stoppedClock, 1.hours) val testClock = TestClock(stoppedClock) advanceClockAfterWait(testClock, 1.hours) - assertFalse(testClock.awaitWithDeadline(advancedClock.instant()), "Should have reached deadline") + assertFalse(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant()), "Should have reached deadline") } @Test @@ -92,7 +93,7 @@ class ClockUtilsTest { val testClock = TestClock(stoppedClock) val future = SettableFuture.create() advanceClockAfterWait(testClock, 1.hours) - assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline") + assertFalse(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future), "Should have reached deadline") } @Test @@ -102,7 +103,7 @@ class ClockUtilsTest { val future = SettableFuture.create() advanceClockAfterWait(testClock, 1.hours) completeAfterWaiting(future) - assertTrue(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline") + assertTrue(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future), "Should not have reached deadline") } @Test @@ -113,7 +114,7 @@ class ClockUtilsTest { for (advance in 1..6) { advanceClockAfterWait(testClock, 10.minutes) } - assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline") + assertFalse(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future), "Should have reached deadline") } @Test @@ -131,7 +132,7 @@ class ClockUtilsTest { val advancedClock = Clock.offset(stoppedClock, 10.hours) try { - testClock.awaitWithDeadline(advancedClock.instant(), SettableFuture.create()) + NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), SettableFuture.create()) fail("Expected InterruptedException") } catch (exception: InterruptedException) { } @@ -145,7 +146,7 @@ class ClockUtilsTest { val future = CompletableFuture() val scheduler = FiberExecutorScheduler("test", executor) val fiber = scheduler.newFiber(@Suspendable { - future.complete(testClock.awaitWithDeadline(advancedClock.instant(), future)) + future.complete(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future)) }).start() for (advance in 1..6) { scheduler.newFiber(@Suspendable { @@ -167,7 +168,7 @@ class ClockUtilsTest { val future = SettableFuture.create() val scheduler = FiberExecutorScheduler("test", executor) val fiber = scheduler.newFiber(@Suspendable { - future.set(testClock.awaitWithDeadline(advancedClock.instant(), future)) + future.set(NodeSchedulerService.awaitWithDeadline(testClock, advancedClock.instant(), future)) }).start() for (advance in 1..6) { scheduler.newFiber(@Suspendable { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/TestClock.kt b/test-utils/src/main/kotlin/net/corda/testing/node/TestClock.kt index d0303085d8..690f765c3d 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/TestClock.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/TestClock.kt @@ -4,8 +4,7 @@ import net.corda.core.internal.until import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsTokenContext import net.corda.core.serialization.SingletonSerializationToken.Companion.singletonSerializationToken -import net.corda.core.internal.until -import net.corda.node.utilities.MutableClock +import net.corda.node.internal.MutableClock import java.time.Clock import java.time.Duration import java.time.Instant