mirror of
https://github.com/corda/corda.git
synced 2025-06-01 15:10:54 +00:00
Re-apply changes
This commit is contained in:
parent
1e9f97f890
commit
f8e610aa71
@ -146,6 +146,14 @@ class ThreadBox<T>(content: T, val lock: Lock = ReentrantLock()) {
|
|||||||
inline fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
|
inline fun <R> locked(body: T.() -> R): R = lock.withLock { body(content) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This represents a transient exception or condition that might no longer be thrown if the operation is re-run or called
|
||||||
|
* again.
|
||||||
|
*
|
||||||
|
* We avoid the use of the word transient here to hopefully reduce confusion with the term in relation to (Java) serialization.
|
||||||
|
*/
|
||||||
|
abstract class RetryableException(message: String) : Exception(message)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A simple wrapper that enables the use of Kotlin's "val x by TransientProperty { ... }" syntax. Such a property
|
* A simple wrapper that enables the use of Kotlin's "val x by TransientProperty { ... }" syntax. Such a property
|
||||||
* will not be serialized to disk, and if it's missing (or the first time it's accessed), the initializer will be
|
* will not be serialized to disk, and if it's missing (or the first time it's accessed), the initializer will be
|
||||||
|
158
node/src/main/kotlin/com/r3corda/core/utilities/ClockUtils.kt
Normal file
158
node/src/main/kotlin/com/r3corda/core/utilities/ClockUtils.kt
Normal file
@ -0,0 +1,158 @@
|
|||||||
|
package com.r3corda.core.utilities
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import co.paralleluniverse.strands.AbstractFuture
|
||||||
|
import co.paralleluniverse.strands.SettableFuture
|
||||||
|
import co.paralleluniverse.strands.Strand
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
|
import com.r3corda.core.then
|
||||||
|
import rx.Observable
|
||||||
|
import rx.Subscriber
|
||||||
|
import rx.Subscription
|
||||||
|
import rx.subscriptions.Subscriptions
|
||||||
|
import java.time.Clock
|
||||||
|
import java.time.Duration
|
||||||
|
import java.time.Instant
|
||||||
|
import java.util.concurrent.*
|
||||||
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
import java.util.function.BiConsumer
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing
|
||||||
|
* that might not follow "real time" or "wall clock time". i.e. they allow time to be fast forwarded.
|
||||||
|
*
|
||||||
|
* We should try to make the Clock used in our code injectable (for tests etc) and to use the extensions below
|
||||||
|
* to wait in our code, rather than <code>Thread.sleep()</code> or <code>Object.wait()</code> etc.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A marker interface and helper methods for a type of Clock that might have it's concept of "now"
|
||||||
|
* adjusted externally.
|
||||||
|
*
|
||||||
|
* e.g. for testing (so unit tests do not have to wait for timeouts in realtime) or for demos and simulations.
|
||||||
|
*/
|
||||||
|
abstract class MutableClock : Clock() {
|
||||||
|
|
||||||
|
private val _version = AtomicLong(0L)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This tracks how many direct mutations of "now" have occured for this [Clock], but not the passage of time.
|
||||||
|
*
|
||||||
|
* It starts at zero, and increments by one per mutation.
|
||||||
|
*/
|
||||||
|
val mutationCount: Long
|
||||||
|
get() = _version.get()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations.
|
||||||
|
*/
|
||||||
|
val mutations: Observable<Long> by lazy {
|
||||||
|
Observable.create({ subscriber: Subscriber<in Long> ->
|
||||||
|
if (!subscriber.isUnsubscribed()) {
|
||||||
|
mutationObservers.add(subscriber)
|
||||||
|
// This is not very intuitive, but subscribing to a subscriber observes unsubscribes.
|
||||||
|
subscriber.add(Subscriptions.create { mutationObservers.remove(subscriber) })
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
private val mutationObservers = CopyOnWriteArraySet<Subscriber<in Long>>()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Must be called by subclasses when they mutate (but not just with the passage of time as per the "wall clock").
|
||||||
|
*/
|
||||||
|
protected fun notifyMutationObservers() {
|
||||||
|
val version = _version.incrementAndGet()
|
||||||
|
for (observer in mutationObservers) {
|
||||||
|
if (!observer.isUnsubscribed) {
|
||||||
|
observer.onNext(version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal method to do something that can be interrupted by a [MutableClock] in the case
|
||||||
|
* of it being mutated. Just returns if [MutableClock] mutates, so needs to be
|
||||||
|
* called in a loop.
|
||||||
|
*
|
||||||
|
* @throws InterruptedException if interrupted by something other than a [MutableClock]
|
||||||
|
*/
|
||||||
|
@Suspendable
|
||||||
|
private fun Clock.doInterruptibly(runnable: () -> Unit) {
|
||||||
|
var version = 0L
|
||||||
|
var subscription: Subscription? = null
|
||||||
|
try {
|
||||||
|
if (this is MutableClock) {
|
||||||
|
version = this.mutationCount
|
||||||
|
val strand = Strand.currentStrand()
|
||||||
|
subscription = this.mutations.subscribe { strand.interrupt() }
|
||||||
|
}
|
||||||
|
runnable()
|
||||||
|
} catch(e: InterruptedException) {
|
||||||
|
// If clock has not mutated, then re-throw
|
||||||
|
val newVersion = if (this is MutableClock) this.mutationCount else version
|
||||||
|
if (newVersion == version) {
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (this is MutableClock) {
|
||||||
|
subscription!!.unsubscribe()
|
||||||
|
}
|
||||||
|
Strand.interrupted()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations
|
||||||
|
* used in demos or testing. This will also substitute a Fiber compatible Future if required
|
||||||
|
*
|
||||||
|
* @return true if the [Future] is complete, false if the deadline was reached
|
||||||
|
*/
|
||||||
|
@Suspendable
|
||||||
|
fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = SettableFuture<Any>()): Boolean {
|
||||||
|
// convert the future to a strand friendly variety if possible so as not to accidentally block the underlying thread
|
||||||
|
val fiberFriendlyFuture = makeFutureCurrentStrandFriendly(future)
|
||||||
|
|
||||||
|
var nanos = 0L
|
||||||
|
do {
|
||||||
|
doInterruptibly @Suspendable {
|
||||||
|
nanos = Duration.between(this.instant(), deadline).toNanos()
|
||||||
|
if (nanos > 0) {
|
||||||
|
try {
|
||||||
|
fiberFriendlyFuture.get(nanos, TimeUnit.NANOSECONDS)
|
||||||
|
} catch(e: ExecutionException) {
|
||||||
|
// No need to take action as will fall out of the loop due to future.isDone
|
||||||
|
} catch(e: CancellationException) {
|
||||||
|
// No need to take action as will fall out of the loop due to future.isDone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} while (!future.isDone && nanos > 0)
|
||||||
|
return future.isDone
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation if currently
|
||||||
|
* on a Fiber and not already using Quasar futures.
|
||||||
|
*
|
||||||
|
* We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context
|
||||||
|
* switch. There's no need to checkpoint our Fibers as there's no external effect of waiting.
|
||||||
|
*/
|
||||||
|
private fun <T : Any> makeFutureCurrentStrandFriendly(future: Future<T>): Future<out T> {
|
||||||
|
return if (Strand.isCurrentFiber() && future !is AbstractFuture) {
|
||||||
|
if (future is ListenableFuture) {
|
||||||
|
val settable = SettableFuture<T>()
|
||||||
|
future.then @Suspendable { settable.set(null) }
|
||||||
|
settable
|
||||||
|
} else if (future is CompletableFuture) {
|
||||||
|
val settable = SettableFuture<T>()
|
||||||
|
future.whenComplete(BiConsumer @Suspendable { value, throwable -> settable.set(null) })
|
||||||
|
settable
|
||||||
|
} else {
|
||||||
|
throw IllegalArgumentException("Cannot make future $future Fiber friendly whilst on a Fiber")
|
||||||
|
}
|
||||||
|
} else future
|
||||||
|
}
|
||||||
|
|
||||||
|
|
66
node/src/main/kotlin/com/r3corda/core/utilities/FiberBox.kt
Normal file
66
node/src/main/kotlin/com/r3corda/core/utilities/FiberBox.kt
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package com.r3corda.core.utilities
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import co.paralleluniverse.strands.concurrent.ReentrantLock
|
||||||
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
|
import com.r3corda.core.RetryableException
|
||||||
|
import java.time.Clock
|
||||||
|
import java.time.Instant
|
||||||
|
import java.util.concurrent.Future
|
||||||
|
import java.util.concurrent.locks.Lock
|
||||||
|
import kotlin.concurrent.withLock
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Modelled on [ThreadBox], but with support for waiting that is compatible with Quasar [Fiber]s and [MutableClock]s
|
||||||
|
*
|
||||||
|
* It supports 3 main operations, all of which operate in a similar context to the [locked] method
|
||||||
|
* of [ThreadBox]. i.e. in the context of the content.
|
||||||
|
* * [read] operations which acquire the associated lock but do not notifty any waiters (see [readWithDeadline])
|
||||||
|
* and is a direct equivalent of [ThreadBox.locked].
|
||||||
|
* * [write] operations which are the same as [read] operations but additionally notify any waiters that the content may have changed.
|
||||||
|
* * [readWithDeadline] operations acquire the lock and are evaluated repeatedly until they no longer throw any subclass
|
||||||
|
* of [RetryableException]. Between iterations it will wait until woken by a [write] or the deadline is reached. It will eventually
|
||||||
|
* re-throw a [RetryableException] if the deadline passes without any successful iterations.
|
||||||
|
*
|
||||||
|
* The construct also supports [MutableClock]s so it can cope with artificial progress towards the deadline, for simulations
|
||||||
|
* or testing.
|
||||||
|
*/
|
||||||
|
class FiberBox<T>(content: T, val lock: Lock = ReentrantLock()) {
|
||||||
|
val content = content
|
||||||
|
private var mutated: SettableFuture<Boolean>? = null
|
||||||
|
@Suspendable
|
||||||
|
fun <R> readWithDeadline(clock: Clock, deadline: Instant, body: T.() -> R): R {
|
||||||
|
var ex: Exception
|
||||||
|
var ourMutated: Future<Boolean>? = null
|
||||||
|
do {
|
||||||
|
lock.lock()
|
||||||
|
try {
|
||||||
|
if (mutated == null || mutated!!.isDone) {
|
||||||
|
mutated = SettableFuture.create()
|
||||||
|
}
|
||||||
|
ourMutated = mutated
|
||||||
|
return body(content)
|
||||||
|
} catch(e: RetryableException) {
|
||||||
|
ex = e
|
||||||
|
} finally {
|
||||||
|
lock.unlock()
|
||||||
|
}
|
||||||
|
} while (clock.awaitWithDeadline(deadline, ourMutated!!) && clock.instant().isBefore(deadline))
|
||||||
|
throw ex
|
||||||
|
}
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
fun <R> read(body: T.() -> R): R = lock.withLock { body(content) }
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
fun <R> write(body: T.() -> R): R {
|
||||||
|
lock.lock()
|
||||||
|
try {
|
||||||
|
return body(content)
|
||||||
|
} finally {
|
||||||
|
mutated?.set(true)
|
||||||
|
lock.unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
38
node/src/test/kotlin/com/r3corda/core/testutils/TestClock.kt
Normal file
38
node/src/test/kotlin/com/r3corda/core/testutils/TestClock.kt
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
package com.r3corda.core.testutils
|
||||||
|
|
||||||
|
import com.r3corda.core.utilities.MutableClock
|
||||||
|
import java.time.Clock
|
||||||
|
import java.time.Duration
|
||||||
|
import java.time.Instant
|
||||||
|
import java.time.ZoneId
|
||||||
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A [Clock] that can have the time advanced for use in testing
|
||||||
|
*/
|
||||||
|
@ThreadSafe
|
||||||
|
class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() {
|
||||||
|
|
||||||
|
@Synchronized fun advanceBy(duration: Duration): Boolean {
|
||||||
|
if (!duration.isNegative) {
|
||||||
|
// It's ok to increment
|
||||||
|
delegateClock = offset(delegateClock, duration)
|
||||||
|
notifyMutationObservers()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
@Synchronized override fun instant(): Instant {
|
||||||
|
return delegateClock.instant()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Synchronized override fun withZone(zone: ZoneId): Clock {
|
||||||
|
return TestClock(delegateClock.withZone(zone))
|
||||||
|
}
|
||||||
|
|
||||||
|
@Synchronized override fun getZone(): ZoneId {
|
||||||
|
return delegateClock.zone
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,196 @@
|
|||||||
|
package com.r3corda.core.utilities
|
||||||
|
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import co.paralleluniverse.strands.Strand
|
||||||
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
|
import com.r3corda.core.testutils.TestClock
|
||||||
|
import org.junit.After
|
||||||
|
import org.junit.Before
|
||||||
|
import org.junit.Test
|
||||||
|
import java.time.Clock
|
||||||
|
import java.time.Duration
|
||||||
|
import java.util.concurrent.CompletableFuture
|
||||||
|
import java.util.concurrent.ExecutorService
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import kotlin.test.assertFalse
|
||||||
|
import kotlin.test.assertTrue
|
||||||
|
|
||||||
|
class ClockUtilsTest {
|
||||||
|
|
||||||
|
lateinit var realClock: Clock
|
||||||
|
lateinit var stoppedClock: Clock
|
||||||
|
lateinit var executor: ExecutorService
|
||||||
|
|
||||||
|
@Before
|
||||||
|
fun setup() {
|
||||||
|
realClock = Clock.systemUTC()
|
||||||
|
stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
|
||||||
|
executor = Executors.newSingleThreadExecutor()
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun teardown() {
|
||||||
|
executor.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting no time for a deadline`() {
|
||||||
|
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant()), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting negative time for a deadline`() {
|
||||||
|
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(Duration.ofHours(1))), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting no time for a deadline with incomplete future`() {
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant(), future), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting negative time for a deadline with incomplete future`() {
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(Duration.ofHours(1)), future), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting for a deadline with future completed before wait`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
completeNow(future)
|
||||||
|
assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting for a deadline with future completed after wait`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
completeAfterWaiting(future)
|
||||||
|
assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting for a deadline with clock advance`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
advanceClockAfterWait(testClock, Duration.ofHours(1))
|
||||||
|
assertFalse(testClock.awaitWithDeadline(advancedClock.instant()), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting for a deadline with clock advance and incomplete future`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
advanceClockAfterWait(testClock, Duration.ofHours(1))
|
||||||
|
assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting for a deadline with clock advance and complete future`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(2))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
advanceClockAfterWait(testClock, Duration.ofHours(1))
|
||||||
|
completeAfterWaiting(future)
|
||||||
|
assertTrue(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `test waiting for a deadline with multiple clock advance and incomplete future`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
for (advance in 1..6) {
|
||||||
|
advanceClockAfterWait(testClock, Duration.ofMinutes(10))
|
||||||
|
}
|
||||||
|
assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
|
||||||
|
* run on core module (in IntelliJ, open gradle side tab and run:
|
||||||
|
* r3prototyping -> core -> Tasks -> other -> quasarScan
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@Suspendable
|
||||||
|
fun `test waiting for a deadline with multiple clock advance and incomplete JDK8 future on Fibers`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
val future = CompletableFuture<Boolean>()
|
||||||
|
val scheduler = FiberExecutorScheduler("test", executor)
|
||||||
|
val fiber = scheduler.newFiber(@Suspendable {
|
||||||
|
future.complete(testClock.awaitWithDeadline(advancedClock.instant(), future))
|
||||||
|
}).start()
|
||||||
|
for (advance in 1..6) {
|
||||||
|
scheduler.newFiber(@Suspendable {
|
||||||
|
// Wait until fiber is waiting
|
||||||
|
while (fiber.state != Strand.State.TIMED_WAITING) {
|
||||||
|
Strand.sleep(1)
|
||||||
|
}
|
||||||
|
testClock.advanceBy(Duration.ofMinutes(10))
|
||||||
|
}).start()
|
||||||
|
}
|
||||||
|
assertFalse(future.get(), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
|
||||||
|
* run on core module (in IntelliJ, open gradle side tab and run:
|
||||||
|
* r3prototyping -> core -> Tasks -> other -> quasarScan
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@Suspendable
|
||||||
|
fun `test waiting for a deadline with multiple clock advance and incomplete Guava future on Fibers`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
val future = SettableFuture.create<Boolean>()
|
||||||
|
val scheduler = FiberExecutorScheduler("test", executor)
|
||||||
|
val fiber = scheduler.newFiber(@Suspendable {
|
||||||
|
future.set(testClock.awaitWithDeadline(advancedClock.instant(), future))
|
||||||
|
}).start()
|
||||||
|
for (advance in 1..6) {
|
||||||
|
scheduler.newFiber(@Suspendable {
|
||||||
|
// Wait until fiber is waiting
|
||||||
|
while (fiber.state != Strand.State.TIMED_WAITING) {
|
||||||
|
Strand.sleep(1)
|
||||||
|
}
|
||||||
|
testClock.advanceBy(Duration.ofMinutes(10))
|
||||||
|
}).start()
|
||||||
|
}
|
||||||
|
assertFalse(future.get(), "Should have reached deadline")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
private fun advanceClockAfterWait(testClock: TestClock, duration: Duration) {
|
||||||
|
val mainStrand = Strand.currentStrand()
|
||||||
|
executor.execute @Suspendable {
|
||||||
|
// Wait until main thread is waiting
|
||||||
|
while (mainStrand.state != Strand.State.TIMED_WAITING) {
|
||||||
|
Strand.sleep(1)
|
||||||
|
}
|
||||||
|
testClock.advanceBy(duration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
private fun completeNow(future: SettableFuture<Boolean>) {
|
||||||
|
future.set(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun completeAfterWaiting(future: SettableFuture<Boolean>) {
|
||||||
|
val mainStrand = Strand.currentStrand()
|
||||||
|
executor.execute @Suspendable {
|
||||||
|
// Wait until main thread is waiting
|
||||||
|
while (mainStrand.state != Strand.State.TIMED_WAITING) {
|
||||||
|
Strand.sleep(1)
|
||||||
|
}
|
||||||
|
completeNow(future)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
176
node/src/test/kotlin/com/r3corda/core/utilities/FiberBoxTest.kt
Normal file
176
node/src/test/kotlin/com/r3corda/core/utilities/FiberBoxTest.kt
Normal file
@ -0,0 +1,176 @@
|
|||||||
|
package com.r3corda.core.utilities
|
||||||
|
|
||||||
|
|
||||||
|
import co.paralleluniverse.fibers.FiberExecutorScheduler
|
||||||
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
|
import co.paralleluniverse.strands.Strand
|
||||||
|
import com.r3corda.core.RetryableException
|
||||||
|
import com.r3corda.core.testutils.TestClock
|
||||||
|
import org.junit.After
|
||||||
|
import org.junit.Before
|
||||||
|
import org.junit.Test
|
||||||
|
import java.time.Clock
|
||||||
|
import java.time.Duration
|
||||||
|
import java.util.concurrent.CompletableFuture
|
||||||
|
import java.util.concurrent.ExecutionException
|
||||||
|
import java.util.concurrent.ExecutorService
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
|
class FiberBoxTest {
|
||||||
|
|
||||||
|
class Content {
|
||||||
|
var integer: Int = 0
|
||||||
|
}
|
||||||
|
|
||||||
|
class TestRetryableException(message: String) : RetryableException(message)
|
||||||
|
|
||||||
|
lateinit var mutex: FiberBox<Content>
|
||||||
|
lateinit var realClock: Clock
|
||||||
|
lateinit var stoppedClock: Clock
|
||||||
|
lateinit var executor: ExecutorService
|
||||||
|
|
||||||
|
@Before
|
||||||
|
fun setup() {
|
||||||
|
mutex = FiberBox(Content())
|
||||||
|
realClock = Clock.systemUTC()
|
||||||
|
stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
|
||||||
|
executor = Executors.newSingleThreadExecutor()
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun teardown() {
|
||||||
|
executor.shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `write and read`() {
|
||||||
|
mutex.write { integer = 1 }
|
||||||
|
assertEquals(1, mutex.read { integer })
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `readWithDeadline with no wait`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
|
||||||
|
mutex.write { integer = 1 }
|
||||||
|
assertEquals(1, mutex.readWithDeadline(realClock, advancedClock.instant()) { integer })
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `readWithDeadline with stopped clock and background write`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
|
||||||
|
assertEquals(1, mutex.readWithDeadline(stoppedClock, advancedClock.instant()) {
|
||||||
|
backgroundWrite()
|
||||||
|
if (integer == 1) 1 else throw TestRetryableException("Not 1")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = TestRetryableException::class)
|
||||||
|
fun `readWithDeadline with clock advanced`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
|
||||||
|
assertEquals(1, mutex.readWithDeadline(testClock, advancedClock.instant()) {
|
||||||
|
backgroundAdvanceClock(testClock, Duration.ofHours(1))
|
||||||
|
if (integer == 1) 0 else throw TestRetryableException("Not 1")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `readWithDeadline with clock advanced 5x and background write`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
|
||||||
|
assertEquals(5, mutex.readWithDeadline(testClock, advancedClock.instant()) {
|
||||||
|
backgroundAdvanceClock(testClock, Duration.ofMinutes(10))
|
||||||
|
backgroundWrite()
|
||||||
|
if (integer == 5) 5 else throw TestRetryableException("Not 5")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
|
||||||
|
* run on core module (in IntelliJ, open gradle side tab and run:
|
||||||
|
* r3prototyping -> core -> Tasks -> other -> quasarScan
|
||||||
|
*/
|
||||||
|
@Test(expected = TestRetryableException::class)
|
||||||
|
@Suspendable
|
||||||
|
fun `readWithDeadline with clock advanced on Fibers`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
val future = CompletableFuture<Int>()
|
||||||
|
val scheduler = FiberExecutorScheduler("test", executor)
|
||||||
|
val fiber = scheduler.newFiber(@Suspendable {
|
||||||
|
try {
|
||||||
|
future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) {
|
||||||
|
if (integer == 1) 1 else throw TestRetryableException("Not 1")
|
||||||
|
})
|
||||||
|
} catch(e: Exception) {
|
||||||
|
future.completeExceptionally(e)
|
||||||
|
}
|
||||||
|
}).start()
|
||||||
|
for (advance in 1..6) {
|
||||||
|
scheduler.newFiber(@Suspendable {
|
||||||
|
// Wait until fiber is waiting
|
||||||
|
while (fiber.state != Strand.State.TIMED_WAITING) {
|
||||||
|
Strand.sleep(1)
|
||||||
|
}
|
||||||
|
testClock.advanceBy(Duration.ofMinutes(10))
|
||||||
|
}).start()
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
assertEquals(2, future.get())
|
||||||
|
} catch(e: ExecutionException) {
|
||||||
|
throw e.cause!!
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
|
||||||
|
* run on core module (in IntelliJ, open gradle side tab and run:
|
||||||
|
* r3prototyping -> core -> Tasks -> other -> quasarScan
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
@Suspendable
|
||||||
|
fun `readWithDeadline with background write on Fibers`() {
|
||||||
|
val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1))
|
||||||
|
val testClock = TestClock(stoppedClock)
|
||||||
|
val future = CompletableFuture<Int>()
|
||||||
|
val scheduler = FiberExecutorScheduler("test", executor)
|
||||||
|
val fiber = scheduler.newFiber(@Suspendable {
|
||||||
|
try {
|
||||||
|
future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) {
|
||||||
|
if (integer == 1) 1 else throw TestRetryableException("Not 1")
|
||||||
|
})
|
||||||
|
} catch(e: Exception) {
|
||||||
|
future.completeExceptionally(e)
|
||||||
|
}
|
||||||
|
}).start()
|
||||||
|
scheduler.newFiber(@Suspendable {
|
||||||
|
// Wait until fiber is waiting
|
||||||
|
while (fiber.state != Strand.State.TIMED_WAITING) {
|
||||||
|
Strand.sleep(1)
|
||||||
|
}
|
||||||
|
mutex.write { integer = 1 }
|
||||||
|
}).start()
|
||||||
|
try {
|
||||||
|
assertEquals(1, future.get())
|
||||||
|
} catch(e: ExecutionException) {
|
||||||
|
throw e.cause!!
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun backgroundWrite() {
|
||||||
|
executor.execute {
|
||||||
|
mutex.write { integer += 1 }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun backgroundAdvanceClock(clock: TestClock, duration: Duration) {
|
||||||
|
executor.execute {
|
||||||
|
clock.advanceBy(duration)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +1,6 @@
|
|||||||
package com.r3corda.demos
|
package com.r3corda.demos
|
||||||
|
|
||||||
|
import com.r3corda.core.utilities.MutableClock
|
||||||
import java.time.*
|
import java.time.*
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
|
|
||||||
@ -7,13 +8,14 @@ import javax.annotation.concurrent.ThreadSafe
|
|||||||
* A [Clock] that can have the date advanced for use in demos
|
* A [Clock] that can have the date advanced for use in demos
|
||||||
*/
|
*/
|
||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : Clock() {
|
class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() {
|
||||||
|
|
||||||
@Synchronized fun updateDate(date: LocalDate): Boolean {
|
@Synchronized fun updateDate(date: LocalDate): Boolean {
|
||||||
val currentDate = LocalDate.now(this)
|
val currentDate = LocalDate.now(this)
|
||||||
if (currentDate.isBefore(date)) {
|
if (currentDate.isBefore(date)) {
|
||||||
// It's ok to increment
|
// It's ok to increment
|
||||||
delegateClock = Clock.offset(delegateClock, Duration.between(currentDate.atStartOfDay(), date.atStartOfDay()))
|
delegateClock = Clock.offset(delegateClock, Duration.between(currentDate.atStartOfDay(), date.atStartOfDay()))
|
||||||
|
notifyMutationObservers()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
|
Loading…
x
Reference in New Issue
Block a user