Merged in cor-133-clock-helpers (pull request #112)

MutableClock for testing, simulation and demos plus associated utilities
This commit is contained in:
Rick Parker 2016-05-24 17:58:37 +01:00
commit 570dc38423
7 changed files with 654 additions and 1 deletions

View File

@ -146,6 +146,14 @@ class ThreadBox<T>(content: T, val lock: Lock = ReentrantLock()) {
inline fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
}
/**
* This represents a transient exception or condition that might no longer be thrown if the operation is re-run or called
* again.
*
* We avoid the use of the word transient here to hopefully reduce confusion with the term in relation to (Java) serialization.
*/
abstract class RetryableException(message: String) : Exception(message)
/**
* A simple wrapper that enables the use of Kotlin's "val x by TransientProperty { ... }" syntax. Such a property
* will not be serialized to disk, and if it's missing (or the first time it's accessed), the initializer will be

View File

@ -0,0 +1,38 @@
package com.r3corda.node.internal.testing
import com.r3corda.node.utilities.MutableClock
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.ZoneId
import javax.annotation.concurrent.ThreadSafe
/**
* A [Clock] that can have the time advanced for use in testing
*/
@ThreadSafe
class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() {
@Synchronized fun advanceBy(duration: Duration): Boolean {
if (!duration.isNegative) {
// It's ok to increment
delegateClock = offset(delegateClock, duration)
notifyMutationObservers()
return true
}
return false
}
@Synchronized override fun instant(): Instant {
return delegateClock.instant()
}
@Synchronized override fun withZone(zone: ZoneId): Clock {
return TestClock(delegateClock.withZone(zone))
}
@Synchronized override fun getZone(): ZoneId {
return delegateClock.zone
}
}

View File

@ -0,0 +1,158 @@
package com.r3corda.node.utilities
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.AbstractFuture
import co.paralleluniverse.strands.SettableFuture
import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.r3corda.core.then
import rx.Observable
import rx.Subscriber
import rx.Subscription
import rx.subscriptions.Subscriptions
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicLong
import java.util.function.BiConsumer
/**
* The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing
* that might not follow "real time" or "wall clock time". i.e. they allow time to be fast forwarded.
*
* We should try to make the Clock used in our code injectable (for tests etc) and to use the extensions below
* to wait in our code, rather than <code>Thread.sleep()</code> or <code>Object.wait()</code> etc.
*/
/**
* An abstract class with helper methods for a type of Clock that might have it's concept of "now"
* adjusted externally.
*
* e.g. for testing (so unit tests do not have to wait for timeouts in realtime) or for demos and simulations.
*/
abstract class MutableClock : Clock() {
private val _version = AtomicLong(0L)
/**
* This tracks how many direct mutations of "now" have occured for this [Clock], but not the passage of time.
*
* It starts at zero, and increments by one per mutation.
*/
val mutationCount: Long
get() = _version.get()
/**
* This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations.
*/
val mutations: Observable<Long> by lazy {
Observable.create({ subscriber: Subscriber<in Long> ->
if (!subscriber.isUnsubscribed()) {
mutationObservers.add(subscriber)
// This is not very intuitive, but subscribing to a subscriber observes unsubscribes.
subscriber.add(Subscriptions.create { mutationObservers.remove(subscriber) })
}
})
}
private val mutationObservers = CopyOnWriteArraySet<Subscriber<in Long>>()
/**
* Must be called by subclasses when they mutate (but not just with the passage of time as per the "wall clock").
*/
protected fun notifyMutationObservers() {
val version = _version.incrementAndGet()
for (observer in mutationObservers) {
if (!observer.isUnsubscribed) {
observer.onNext(version)
}
}
}
}
/**
* Internal method to do something that can be interrupted by a [MutableClock] in the case
* of it being mutated. Just returns if [MutableClock] mutates, so needs to be
* called in a loop.
*
* @throws InterruptedException if interrupted by something other than a [MutableClock]
*/
@Suspendable
private fun Clock.doInterruptibly(runnable: () -> Unit) {
var version = 0L
var subscription: Subscription? = null
try {
if (this is MutableClock) {
version = this.mutationCount
val strand = Strand.currentStrand()
subscription = this.mutations.subscribe { strand.interrupt() }
}
runnable()
} catch(e: InterruptedException) {
// If clock has not mutated, then re-throw
val newVersion = if (this is MutableClock) this.mutationCount else version
if (newVersion == version) {
throw e
}
} finally {
if (this is MutableClock) {
subscription!!.unsubscribe()
}
Strand.interrupted()
}
}
/**
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
* used in demos or testing. This will also substitute a Fiber compatible Future if required
*
* @return true if the [Future] is complete, false if the deadline was reached
*/
@Suspendable
fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = SettableFuture<Any>()): Boolean {
// convert the future to a strand friendly variety if possible so as not to accidentally block the underlying thread
val fiberFriendlyFuture = makeFutureCurrentStrandFriendly(future)
var nanos = 0L
do {
doInterruptibly @Suspendable {
nanos = Duration.between(this.instant(), deadline).toNanos()
if (nanos > 0) {
try {
fiberFriendlyFuture.get(nanos, TimeUnit.NANOSECONDS)
} catch(e: ExecutionException) {
// No need to take action as will fall out of the loop due to future.isDone
} catch(e: CancellationException) {
// No need to take action as will fall out of the loop due to future.isDone
}
}
}
} while (!future.isDone && nanos > 0)
return future.isDone
}
/**
* Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation if currently
* on a Fiber and not already using Quasar futures.
*
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
* switch. There's no need to checkpoint our Fibers as there's no external effect of waiting.
*/
private fun <T : Any> makeFutureCurrentStrandFriendly(future: Future<T>): Future<out T> {
return if (Strand.isCurrentFiber() && future !is AbstractFuture) {
if (future is ListenableFuture) {
val settable = SettableFuture<T>()
future.then @Suspendable { settable.set(null) }
settable
} else if (future is CompletableFuture) {
val settable = SettableFuture<T>()
future.whenComplete(BiConsumer @Suspendable { value, throwable -> settable.set(null) })
settable
} else {
throw IllegalArgumentException("Cannot make future $future Fiber friendly whilst on a Fiber")
}
} else future
}

View File

@ -0,0 +1,75 @@
package com.r3corda.node.utilities
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.ReentrantLock
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.core.RetryableException
import java.time.Clock
import java.time.Instant
import java.util.concurrent.Future
import java.util.concurrent.locks.Lock
import kotlin.concurrent.withLock
/**
* Modelled on [ThreadBox], but with support for waiting that is compatible with Quasar [Fiber]s and [MutableClock]s
*
* It supports 3 main operations, all of which operate in a similar context to the [locked] method
* of [ThreadBox]. i.e. in the context of the content.
* * [read] operations which acquire the associated lock but do not notify any waiters (see [readWithDeadline])
* and is a direct equivalent of [ThreadBox.locked].
* * [write] operations which are the same as [read] operations but additionally notify any waiters that the content may have changed.
* * [readWithDeadline] operations acquire the lock and are evaluated repeatedly until they no longer throw any subclass
* of [RetryableException]. Between iterations it will wait until woken by a [write] or the deadline is reached. It will eventually
* re-throw a [RetryableException] if the deadline passes without any successful iterations.
*
* The construct also supports [MutableClock]s so it can cope with artificial progress towards the deadline, for simulations
* or testing.
*
* Currently this is intended for use within a node as a simplified way for Oracles to implement subscriptions for changing
* data by running a protocol internally to implement the request handler (see [NodeInterestRates.Oracle]), which can then
* effectively relinquish control until the data becomes available. This isn't the most scalable design and is intended
* to be temporary. In addition, it's enitrely possible to envisage a time when we want public [ProtocolLogic]
* implementations to be able to wait for some condition to become true outside of message send/receive. At that point
* we may revisit this implementation and indeed the whole model for this, when we understand that requirement more fully.
*
* TODO: We should consider using a [Semaphore] or [CountDownLatch] here to make it a little easier to understand, but it seems
* as though the current version of Qasar does not support suspending on either of their implementations.
*/
class FiberBox<T>(private val content: T, private val lock: Lock = ReentrantLock()) {
private var mutated: SettableFuture<Boolean>? = null
@Suspendable
fun <R> readWithDeadline(clock: Clock, deadline: Instant, body: T.() -> R): R {
var ex: Exception
var ourMutated: Future<Boolean>? = null
do {
lock.lock()
try {
if (mutated == null || mutated!!.isDone) {
mutated = SettableFuture.create()
}
ourMutated = mutated
return body(content)
} catch(e: RetryableException) {
ex = e
} finally {
lock.unlock()
}
} while (clock.awaitWithDeadline(deadline, ourMutated!!) && clock.instant().isBefore(deadline))
throw ex
}
@Suspendable
fun <R> read(body: T.() -> R): R = lock.withLock { body(content) }
@Suspendable
fun <R> write(body: T.() -> R): R {
lock.lock()
try {
return body(content)
} finally {
mutated?.set(true)
lock.unlock()
}
}
}

View File

@ -0,0 +1,196 @@
package com.r3corda.node.utilities
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.node.internal.testing.TestClock
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Clock
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class ClockUtilsTest {
lateinit var realClock: Clock
lateinit var stoppedClock: Clock
lateinit var executor: ExecutorService
@Before
fun setup() {
realClock = Clock.systemUTC()
stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
executor = Executors.newSingleThreadExecutor()
}
@After
fun teardown() {
executor.shutdown()
}
@Test
fun `test waiting no time for a deadline`() {
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant()), "Should have reached deadline")
}
@Test
fun `test waiting negative time for a deadline`() {
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(Duration.ofHours(1))), "Should have reached deadline")
}
@Test
fun `test waiting no time for a deadline with incomplete future`() {
val future = SettableFuture.create<Boolean>()
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant(), future), "Should have reached deadline")
}
@Test
fun `test waiting negative time for a deadline with incomplete future`() {
val future = SettableFuture.create<Boolean>()
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(Duration.ofHours(1)), future), "Should have reached deadline")
}
@Test
fun `test waiting for a deadline with future completed before wait`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val future = SettableFuture.create<Boolean>()
completeNow(future)
assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
}
@Test
fun `test waiting for a deadline with future completed after wait`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val future = SettableFuture.create<Boolean>()
completeAfterWaiting(future)
assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
}
@Test
fun `test waiting for a deadline with clock advance`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
advanceClockAfterWait(testClock, Duration.ofHours(1))
assertFalse(testClock.awaitWithDeadline(advancedClock.instant()), "Should have reached deadline")
}
@Test
fun `test waiting for a deadline with clock advance and incomplete future`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
val future = SettableFuture.create<Boolean>()
advanceClockAfterWait(testClock, Duration.ofHours(1))
assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline")
}
@Test
fun `test waiting for a deadline with clock advance and complete future`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(2))
val testClock = TestClock(stoppedClock)
val future = SettableFuture.create<Boolean>()
advanceClockAfterWait(testClock, Duration.ofHours(1))
completeAfterWaiting(future)
assertTrue(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
}
@Test
fun `test waiting for a deadline with multiple clock advance and incomplete future`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
val future = SettableFuture.create<Boolean>()
for (advance in 1..6) {
advanceClockAfterWait(testClock, Duration.ofMinutes(10))
}
assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline")
}
/**
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
* run on core module (in IntelliJ, open gradle side tab and run:
* r3prototyping -> core -> Tasks -> other -> quasarScan
*/
@Test
@Suspendable
fun `test waiting for a deadline with multiple clock advance and incomplete JDK8 future on Fibers`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
val future = CompletableFuture<Boolean>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
future.complete(testClock.awaitWithDeadline(advancedClock.instant(), future))
}).start()
for (advance in 1..6) {
scheduler.newFiber(@Suspendable {
// Wait until fiber is waiting
while (fiber.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
testClock.advanceBy(Duration.ofMinutes(10))
}).start()
}
assertFalse(future.get(), "Should have reached deadline")
}
/**
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
* run on core module (in IntelliJ, open gradle side tab and run:
* r3prototyping -> core -> Tasks -> other -> quasarScan
*/
@Test
@Suspendable
fun `test waiting for a deadline with multiple clock advance and incomplete Guava future on Fibers`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
val future = SettableFuture.create<Boolean>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
future.set(testClock.awaitWithDeadline(advancedClock.instant(), future))
}).start()
for (advance in 1..6) {
scheduler.newFiber(@Suspendable {
// Wait until fiber is waiting
while (fiber.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
testClock.advanceBy(Duration.ofMinutes(10))
}).start()
}
assertFalse(future.get(), "Should have reached deadline")
}
@Suspendable
private fun advanceClockAfterWait(testClock: TestClock, duration: Duration) {
val mainStrand = Strand.currentStrand()
executor.execute @Suspendable {
// Wait until main thread is waiting
while (mainStrand.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
testClock.advanceBy(duration)
}
}
@Suspendable
private fun completeNow(future: SettableFuture<Boolean>) {
future.set(true)
}
private fun completeAfterWaiting(future: SettableFuture<Boolean>) {
val mainStrand = Strand.currentStrand()
executor.execute @Suspendable {
// Wait until main thread is waiting
while (mainStrand.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
completeNow(future)
}
}
}

View File

@ -0,0 +1,176 @@
package com.r3corda.node.utilities
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import com.r3corda.core.RetryableException
import com.r3corda.node.internal.testing.TestClock
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Clock
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.assertEquals
class FiberBoxTest {
class Content {
var integer: Int = 0
}
class TestRetryableException(message: String) : RetryableException(message)
lateinit var mutex: FiberBox<Content>
lateinit var realClock: Clock
lateinit var stoppedClock: Clock
lateinit var executor: ExecutorService
@Before
fun setup() {
mutex = FiberBox(Content())
realClock = Clock.systemUTC()
stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
executor = Executors.newSingleThreadExecutor()
}
@After
fun teardown() {
executor.shutdown()
}
@Test
fun `write and read`() {
mutex.write { integer = 1 }
assertEquals(1, mutex.read { integer })
}
@Test
fun `readWithDeadline with no wait`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
mutex.write { integer = 1 }
assertEquals(1, mutex.readWithDeadline(realClock, advancedClock.instant()) { integer })
}
@Test
fun `readWithDeadline with stopped clock and background write`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
assertEquals(1, mutex.readWithDeadline(stoppedClock, advancedClock.instant()) {
backgroundWrite()
if (integer == 1) 1 else throw TestRetryableException("Not 1")
})
}
@Test(expected = TestRetryableException::class)
fun `readWithDeadline with clock advanced`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
assertEquals(1, mutex.readWithDeadline(testClock, advancedClock.instant()) {
backgroundAdvanceClock(testClock, Duration.ofHours(1))
if (integer == 1) 0 else throw TestRetryableException("Not 1")
})
}
@Test
fun `readWithDeadline with clock advanced 5x and background write`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
assertEquals(5, mutex.readWithDeadline(testClock, advancedClock.instant()) {
backgroundAdvanceClock(testClock, Duration.ofMinutes(10))
backgroundWrite()
if (integer == 5) 5 else throw TestRetryableException("Not 5")
})
}
/**
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
* run on core module (in IntelliJ, open gradle side tab and run:
* r3prototyping -> core -> Tasks -> other -> quasarScan
*/
@Test(expected = TestRetryableException::class)
@Suspendable
fun `readWithDeadline with clock advanced on Fibers`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
val future = CompletableFuture<Int>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
try {
future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) {
if (integer == 1) 1 else throw TestRetryableException("Not 1")
})
} catch(e: Exception) {
future.completeExceptionally(e)
}
}).start()
for (advance in 1..6) {
scheduler.newFiber(@Suspendable {
// Wait until fiber is waiting
while (fiber.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
testClock.advanceBy(Duration.ofMinutes(10))
}).start()
}
try {
assertEquals(2, future.get())
} catch(e: ExecutionException) {
throw e.cause!!
}
}
/**
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
* run on core module (in IntelliJ, open gradle side tab and run:
* r3prototyping -> core -> Tasks -> other -> quasarScan
*/
@Test
@Suspendable
fun `readWithDeadline with background write on Fibers`() {
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
val testClock = TestClock(stoppedClock)
val future = CompletableFuture<Int>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
try {
future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) {
if (integer == 1) 1 else throw TestRetryableException("Not 1")
})
} catch(e: Exception) {
future.completeExceptionally(e)
}
}).start()
scheduler.newFiber(@Suspendable {
// Wait until fiber is waiting
while (fiber.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
mutex.write { integer = 1 }
}).start()
try {
assertEquals(1, future.get())
} catch(e: ExecutionException) {
throw e.cause!!
}
}
private fun backgroundWrite() {
executor.execute {
mutex.write { integer += 1 }
}
}
private fun backgroundAdvanceClock(clock: TestClock, duration: Duration) {
executor.execute {
clock.advanceBy(duration)
}
}
}

View File

@ -1,5 +1,6 @@
package com.r3corda.demos
import com.r3corda.node.utilities.MutableClock
import java.time.*
import javax.annotation.concurrent.ThreadSafe
@ -7,13 +8,14 @@ import javax.annotation.concurrent.ThreadSafe
* A [Clock] that can have the date advanced for use in demos
*/
@ThreadSafe
class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : Clock() {
class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() {
@Synchronized fun updateDate(date: LocalDate): Boolean {
val currentDate = LocalDate.now(this)
if (currentDate.isBefore(date)) {
// It's ok to increment
delegateClock = Clock.offset(delegateClock, Duration.between(currentDate.atStartOfDay(), date.atStartOfDay()))
notifyMutationObservers()
return true
}
return false