diff --git a/core/src/main/kotlin/com/r3corda/core/Utils.kt b/core/src/main/kotlin/com/r3corda/core/Utils.kt index dfe26c377e..82baf925ca 100644 --- a/core/src/main/kotlin/com/r3corda/core/Utils.kt +++ b/core/src/main/kotlin/com/r3corda/core/Utils.kt @@ -146,6 +146,14 @@ class ThreadBox(content: T, val lock: Lock = ReentrantLock()) { inline fun locked(body: T.() -> R): R = lock.withLock { body(content) } } +/** + * This represents a transient exception or condition that might no longer be thrown if the operation is re-run or called + * again. + * + * We avoid the use of the word transient here to hopefully reduce confusion with the term in relation to (Java) serialization. + */ +abstract class RetryableException(message: String) : Exception(message) + /** * A simple wrapper that enables the use of Kotlin's "val x by TransientProperty { ... }" syntax. Such a property * will not be serialized to disk, and if it's missing (or the first time it's accessed), the initializer will be diff --git a/node/src/main/kotlin/com/r3corda/node/internal/testing/TestClock.kt b/node/src/main/kotlin/com/r3corda/node/internal/testing/TestClock.kt new file mode 100644 index 0000000000..68fcdb04ff --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/internal/testing/TestClock.kt @@ -0,0 +1,38 @@ +package com.r3corda.node.internal.testing + +import com.r3corda.node.utilities.MutableClock +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.time.ZoneId +import javax.annotation.concurrent.ThreadSafe + + +/** + * A [Clock] that can have the time advanced for use in testing + */ +@ThreadSafe +class TestClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() { + + @Synchronized fun advanceBy(duration: Duration): Boolean { + if (!duration.isNegative) { + // It's ok to increment + delegateClock = offset(delegateClock, duration) + notifyMutationObservers() + return true + } + return false + } + + @Synchronized override fun instant(): Instant { + return delegateClock.instant() + } + + @Synchronized override fun withZone(zone: ZoneId): Clock { + return TestClock(delegateClock.withZone(zone)) + } + + @Synchronized override fun getZone(): ZoneId { + return delegateClock.zone + } +} diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt new file mode 100644 index 0000000000..42e57d2dae --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/utilities/ClockUtils.kt @@ -0,0 +1,158 @@ +package com.r3corda.node.utilities + +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.AbstractFuture +import co.paralleluniverse.strands.SettableFuture +import co.paralleluniverse.strands.Strand +import com.google.common.util.concurrent.ListenableFuture +import com.r3corda.core.then +import rx.Observable +import rx.Subscriber +import rx.Subscription +import rx.subscriptions.Subscriptions +import java.time.Clock +import java.time.Duration +import java.time.Instant +import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicLong +import java.util.function.BiConsumer + +/** + * The classes and methods in this file allow the use of custom Clocks in demos, simulations and testing + * that might not follow "real time" or "wall clock time". i.e. they allow time to be fast forwarded. + * + * We should try to make the Clock used in our code injectable (for tests etc) and to use the extensions below + * to wait in our code, rather than Thread.sleep() or Object.wait() etc. + */ + +/** + * An abstract class with helper methods for a type of Clock that might have it's concept of "now" + * adjusted externally. + * + * e.g. for testing (so unit tests do not have to wait for timeouts in realtime) or for demos and simulations. + */ +abstract class MutableClock : Clock() { + + private val _version = AtomicLong(0L) + + /** + * This tracks how many direct mutations of "now" have occured for this [Clock], but not the passage of time. + * + * It starts at zero, and increments by one per mutation. + */ + val mutationCount: Long + get() = _version.get() + + /** + * This is an observer on the mutation count of this [Clock], which reflects the occurence of mutations. + */ + val mutations: Observable by lazy { + Observable.create({ subscriber: Subscriber -> + if (!subscriber.isUnsubscribed()) { + mutationObservers.add(subscriber) + // This is not very intuitive, but subscribing to a subscriber observes unsubscribes. + subscriber.add(Subscriptions.create { mutationObservers.remove(subscriber) }) + } + }) + } + + private val mutationObservers = CopyOnWriteArraySet>() + + /** + * Must be called by subclasses when they mutate (but not just with the passage of time as per the "wall clock"). + */ + protected fun notifyMutationObservers() { + val version = _version.incrementAndGet() + for (observer in mutationObservers) { + if (!observer.isUnsubscribed) { + observer.onNext(version) + } + } + } +} + +/** + * Internal method to do something that can be interrupted by a [MutableClock] in the case + * of it being mutated. Just returns if [MutableClock] mutates, so needs to be + * called in a loop. + * + * @throws InterruptedException if interrupted by something other than a [MutableClock] + */ +@Suspendable +private fun Clock.doInterruptibly(runnable: () -> Unit) { + var version = 0L + var subscription: Subscription? = null + try { + if (this is MutableClock) { + version = this.mutationCount + val strand = Strand.currentStrand() + subscription = this.mutations.subscribe { strand.interrupt() } + } + runnable() + } catch(e: InterruptedException) { + // If clock has not mutated, then re-throw + val newVersion = if (this is MutableClock) this.mutationCount else version + if (newVersion == version) { + throw e + } + } finally { + if (this is MutableClock) { + subscription!!.unsubscribe() + } + Strand.interrupted() + } +} + +/** + * Wait until the given [Future] is complete or the deadline is reached, with support for [MutableClock] implementations + * used in demos or testing. This will also substitute a Fiber compatible Future if required + * + * @return true if the [Future] is complete, false if the deadline was reached + */ +@Suspendable +fun Clock.awaitWithDeadline(deadline: Instant, future: Future<*> = SettableFuture()): Boolean { + // convert the future to a strand friendly variety if possible so as not to accidentally block the underlying thread + val fiberFriendlyFuture = makeFutureCurrentStrandFriendly(future) + + var nanos = 0L + do { + doInterruptibly @Suspendable { + nanos = Duration.between(this.instant(), deadline).toNanos() + if (nanos > 0) { + try { + fiberFriendlyFuture.get(nanos, TimeUnit.NANOSECONDS) + } catch(e: ExecutionException) { + // No need to take action as will fall out of the loop due to future.isDone + } catch(e: CancellationException) { + // No need to take action as will fall out of the loop due to future.isDone + } + } + } + } while (!future.isDone && nanos > 0) + return future.isDone +} + +/** + * Convert a Guava [ListenableFuture] or JDK8 [CompletableFuture] to Quasar implementation if currently + * on a Fiber and not already using Quasar futures. + * + * We need this so that we do not block the actual thread when calling get(), but instead allow a Quasar context + * switch. There's no need to checkpoint our Fibers as there's no external effect of waiting. + */ +private fun makeFutureCurrentStrandFriendly(future: Future): Future { + return if (Strand.isCurrentFiber() && future !is AbstractFuture) { + if (future is ListenableFuture) { + val settable = SettableFuture() + future.then @Suspendable { settable.set(null) } + settable + } else if (future is CompletableFuture) { + val settable = SettableFuture() + future.whenComplete(BiConsumer @Suspendable { value, throwable -> settable.set(null) }) + settable + } else { + throw IllegalArgumentException("Cannot make future $future Fiber friendly whilst on a Fiber") + } + } else future +} + + diff --git a/node/src/main/kotlin/com/r3corda/node/utilities/FiberBox.kt b/node/src/main/kotlin/com/r3corda/node/utilities/FiberBox.kt new file mode 100644 index 0000000000..4b712a8fb5 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/utilities/FiberBox.kt @@ -0,0 +1,75 @@ +package com.r3corda.node.utilities + +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.concurrent.ReentrantLock +import com.google.common.util.concurrent.SettableFuture +import com.r3corda.core.RetryableException +import java.time.Clock +import java.time.Instant +import java.util.concurrent.Future +import java.util.concurrent.locks.Lock +import kotlin.concurrent.withLock + + +/** + * Modelled on [ThreadBox], but with support for waiting that is compatible with Quasar [Fiber]s and [MutableClock]s + * + * It supports 3 main operations, all of which operate in a similar context to the [locked] method + * of [ThreadBox]. i.e. in the context of the content. + * * [read] operations which acquire the associated lock but do not notify any waiters (see [readWithDeadline]) + * and is a direct equivalent of [ThreadBox.locked]. + * * [write] operations which are the same as [read] operations but additionally notify any waiters that the content may have changed. + * * [readWithDeadline] operations acquire the lock and are evaluated repeatedly until they no longer throw any subclass + * of [RetryableException]. Between iterations it will wait until woken by a [write] or the deadline is reached. It will eventually + * re-throw a [RetryableException] if the deadline passes without any successful iterations. + * + * The construct also supports [MutableClock]s so it can cope with artificial progress towards the deadline, for simulations + * or testing. + * + * Currently this is intended for use within a node as a simplified way for Oracles to implement subscriptions for changing + * data by running a protocol internally to implement the request handler (see [NodeInterestRates.Oracle]), which can then + * effectively relinquish control until the data becomes available. This isn't the most scalable design and is intended + * to be temporary. In addition, it's enitrely possible to envisage a time when we want public [ProtocolLogic] + * implementations to be able to wait for some condition to become true outside of message send/receive. At that point + * we may revisit this implementation and indeed the whole model for this, when we understand that requirement more fully. + * + * TODO: We should consider using a [Semaphore] or [CountDownLatch] here to make it a little easier to understand, but it seems + * as though the current version of Qasar does not support suspending on either of their implementations. + */ +class FiberBox(private val content: T, private val lock: Lock = ReentrantLock()) { + private var mutated: SettableFuture? = null + @Suspendable + fun readWithDeadline(clock: Clock, deadline: Instant, body: T.() -> R): R { + var ex: Exception + var ourMutated: Future? = null + do { + lock.lock() + try { + if (mutated == null || mutated!!.isDone) { + mutated = SettableFuture.create() + } + ourMutated = mutated + return body(content) + } catch(e: RetryableException) { + ex = e + } finally { + lock.unlock() + } + } while (clock.awaitWithDeadline(deadline, ourMutated!!) && clock.instant().isBefore(deadline)) + throw ex + } + + @Suspendable + fun read(body: T.() -> R): R = lock.withLock { body(content) } + + @Suspendable + fun write(body: T.() -> R): R { + lock.lock() + try { + return body(content) + } finally { + mutated?.set(true) + lock.unlock() + } + } +} \ No newline at end of file diff --git a/node/src/test/kotlin/com/r3corda/node/utilities/ClockUtilsTest.kt b/node/src/test/kotlin/com/r3corda/node/utilities/ClockUtilsTest.kt new file mode 100644 index 0000000000..faf7b1a7ee --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/utilities/ClockUtilsTest.kt @@ -0,0 +1,196 @@ +package com.r3corda.node.utilities + + +import co.paralleluniverse.fibers.FiberExecutorScheduler +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.Strand +import com.google.common.util.concurrent.SettableFuture +import com.r3corda.node.internal.testing.TestClock +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.time.Clock +import java.time.Duration +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import kotlin.test.assertFalse +import kotlin.test.assertTrue + +class ClockUtilsTest { + + lateinit var realClock: Clock + lateinit var stoppedClock: Clock + lateinit var executor: ExecutorService + + @Before + fun setup() { + realClock = Clock.systemUTC() + stoppedClock = Clock.fixed(realClock.instant(), realClock.zone) + executor = Executors.newSingleThreadExecutor() + } + + @After + fun teardown() { + executor.shutdown() + } + + @Test + fun `test waiting no time for a deadline`() { + assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant()), "Should have reached deadline") + } + + @Test + fun `test waiting negative time for a deadline`() { + assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(Duration.ofHours(1))), "Should have reached deadline") + } + + @Test + fun `test waiting no time for a deadline with incomplete future`() { + val future = SettableFuture.create() + assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant(), future), "Should have reached deadline") + } + + @Test + fun `test waiting negative time for a deadline with incomplete future`() { + val future = SettableFuture.create() + assertFalse(stoppedClock.awaitWithDeadline(stoppedClock.instant().minus(Duration.ofHours(1)), future), "Should have reached deadline") + } + + + @Test + fun `test waiting for a deadline with future completed before wait`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val future = SettableFuture.create() + completeNow(future) + assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline") + } + + @Test + fun `test waiting for a deadline with future completed after wait`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val future = SettableFuture.create() + completeAfterWaiting(future) + assertTrue(stoppedClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline") + } + + @Test + fun `test waiting for a deadline with clock advance`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + advanceClockAfterWait(testClock, Duration.ofHours(1)) + assertFalse(testClock.awaitWithDeadline(advancedClock.instant()), "Should have reached deadline") + } + + @Test + fun `test waiting for a deadline with clock advance and incomplete future`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + val future = SettableFuture.create() + advanceClockAfterWait(testClock, Duration.ofHours(1)) + assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline") + } + + @Test + fun `test waiting for a deadline with clock advance and complete future`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(2)) + val testClock = TestClock(stoppedClock) + val future = SettableFuture.create() + advanceClockAfterWait(testClock, Duration.ofHours(1)) + completeAfterWaiting(future) + assertTrue(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should not have reached deadline") + } + + @Test + fun `test waiting for a deadline with multiple clock advance and incomplete future`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + val future = SettableFuture.create() + for (advance in 1..6) { + advanceClockAfterWait(testClock, Duration.ofMinutes(10)) + } + assertFalse(testClock.awaitWithDeadline(advancedClock.instant(), future), "Should have reached deadline") + } + + /** + * If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been + * run on core module (in IntelliJ, open gradle side tab and run: + * r3prototyping -> core -> Tasks -> other -> quasarScan + */ + @Test + @Suspendable + fun `test waiting for a deadline with multiple clock advance and incomplete JDK8 future on Fibers`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + val future = CompletableFuture() + val scheduler = FiberExecutorScheduler("test", executor) + val fiber = scheduler.newFiber(@Suspendable { + future.complete(testClock.awaitWithDeadline(advancedClock.instant(), future)) + }).start() + for (advance in 1..6) { + scheduler.newFiber(@Suspendable { + // Wait until fiber is waiting + while (fiber.state != Strand.State.TIMED_WAITING) { + Strand.sleep(1) + } + testClock.advanceBy(Duration.ofMinutes(10)) + }).start() + } + assertFalse(future.get(), "Should have reached deadline") + } + + /** + * If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been + * run on core module (in IntelliJ, open gradle side tab and run: + * r3prototyping -> core -> Tasks -> other -> quasarScan + */ + @Test + @Suspendable + fun `test waiting for a deadline with multiple clock advance and incomplete Guava future on Fibers`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + val future = SettableFuture.create() + val scheduler = FiberExecutorScheduler("test", executor) + val fiber = scheduler.newFiber(@Suspendable { + future.set(testClock.awaitWithDeadline(advancedClock.instant(), future)) + }).start() + for (advance in 1..6) { + scheduler.newFiber(@Suspendable { + // Wait until fiber is waiting + while (fiber.state != Strand.State.TIMED_WAITING) { + Strand.sleep(1) + } + testClock.advanceBy(Duration.ofMinutes(10)) + }).start() + } + assertFalse(future.get(), "Should have reached deadline") + } + + @Suspendable + private fun advanceClockAfterWait(testClock: TestClock, duration: Duration) { + val mainStrand = Strand.currentStrand() + executor.execute @Suspendable { + // Wait until main thread is waiting + while (mainStrand.state != Strand.State.TIMED_WAITING) { + Strand.sleep(1) + } + testClock.advanceBy(duration) + } + } + + @Suspendable + private fun completeNow(future: SettableFuture) { + future.set(true) + } + + private fun completeAfterWaiting(future: SettableFuture) { + val mainStrand = Strand.currentStrand() + executor.execute @Suspendable { + // Wait until main thread is waiting + while (mainStrand.state != Strand.State.TIMED_WAITING) { + Strand.sleep(1) + } + completeNow(future) + } + } +} diff --git a/node/src/test/kotlin/com/r3corda/node/utilities/FiberBoxTest.kt b/node/src/test/kotlin/com/r3corda/node/utilities/FiberBoxTest.kt new file mode 100644 index 0000000000..1702422398 --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/utilities/FiberBoxTest.kt @@ -0,0 +1,176 @@ +package com.r3corda.node.utilities + + +import co.paralleluniverse.fibers.FiberExecutorScheduler +import co.paralleluniverse.fibers.Suspendable +import co.paralleluniverse.strands.Strand +import com.r3corda.core.RetryableException +import com.r3corda.node.internal.testing.TestClock +import org.junit.After +import org.junit.Before +import org.junit.Test +import java.time.Clock +import java.time.Duration +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutionException +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import kotlin.test.assertEquals + +class FiberBoxTest { + + class Content { + var integer: Int = 0 + } + + class TestRetryableException(message: String) : RetryableException(message) + + lateinit var mutex: FiberBox + lateinit var realClock: Clock + lateinit var stoppedClock: Clock + lateinit var executor: ExecutorService + + @Before + fun setup() { + mutex = FiberBox(Content()) + realClock = Clock.systemUTC() + stoppedClock = Clock.fixed(realClock.instant(), realClock.zone) + executor = Executors.newSingleThreadExecutor() + } + + @After + fun teardown() { + executor.shutdown() + } + + @Test + fun `write and read`() { + mutex.write { integer = 1 } + assertEquals(1, mutex.read { integer }) + } + + @Test + fun `readWithDeadline with no wait`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + + mutex.write { integer = 1 } + assertEquals(1, mutex.readWithDeadline(realClock, advancedClock.instant()) { integer }) + } + + @Test + fun `readWithDeadline with stopped clock and background write`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + + assertEquals(1, mutex.readWithDeadline(stoppedClock, advancedClock.instant()) { + backgroundWrite() + if (integer == 1) 1 else throw TestRetryableException("Not 1") + }) + } + + @Test(expected = TestRetryableException::class) + fun `readWithDeadline with clock advanced`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + + assertEquals(1, mutex.readWithDeadline(testClock, advancedClock.instant()) { + backgroundAdvanceClock(testClock, Duration.ofHours(1)) + if (integer == 1) 0 else throw TestRetryableException("Not 1") + }) + } + + @Test + fun `readWithDeadline with clock advanced 5x and background write`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + + assertEquals(5, mutex.readWithDeadline(testClock, advancedClock.instant()) { + backgroundAdvanceClock(testClock, Duration.ofMinutes(10)) + backgroundWrite() + if (integer == 5) 5 else throw TestRetryableException("Not 5") + }) + } + + /** + * If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been + * run on core module (in IntelliJ, open gradle side tab and run: + * r3prototyping -> core -> Tasks -> other -> quasarScan + */ + @Test(expected = TestRetryableException::class) + @Suspendable + fun `readWithDeadline with clock advanced on Fibers`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + val future = CompletableFuture() + val scheduler = FiberExecutorScheduler("test", executor) + val fiber = scheduler.newFiber(@Suspendable { + try { + future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) { + if (integer == 1) 1 else throw TestRetryableException("Not 1") + }) + } catch(e: Exception) { + future.completeExceptionally(e) + } + }).start() + for (advance in 1..6) { + scheduler.newFiber(@Suspendable { + // Wait until fiber is waiting + while (fiber.state != Strand.State.TIMED_WAITING) { + Strand.sleep(1) + } + testClock.advanceBy(Duration.ofMinutes(10)) + }).start() + } + try { + assertEquals(2, future.get()) + } catch(e: ExecutionException) { + throw e.cause!! + } + } + + /** + * If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been + * run on core module (in IntelliJ, open gradle side tab and run: + * r3prototyping -> core -> Tasks -> other -> quasarScan + */ + @Test + @Suspendable + fun `readWithDeadline with background write on Fibers`() { + val advancedClock = Clock.offset(stoppedClock, Duration.ofHours(1)) + val testClock = TestClock(stoppedClock) + val future = CompletableFuture() + val scheduler = FiberExecutorScheduler("test", executor) + val fiber = scheduler.newFiber(@Suspendable { + try { + future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) { + if (integer == 1) 1 else throw TestRetryableException("Not 1") + }) + } catch(e: Exception) { + future.completeExceptionally(e) + } + }).start() + scheduler.newFiber(@Suspendable { + // Wait until fiber is waiting + while (fiber.state != Strand.State.TIMED_WAITING) { + Strand.sleep(1) + } + mutex.write { integer = 1 } + }).start() + try { + assertEquals(1, future.get()) + } catch(e: ExecutionException) { + throw e.cause!! + } + } + + private fun backgroundWrite() { + executor.execute { + mutex.write { integer += 1 } + } + } + + private fun backgroundAdvanceClock(clock: TestClock, duration: Duration) { + executor.execute { + clock.advanceBy(duration) + } + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/r3corda/demos/DemoClock.kt b/src/main/kotlin/com/r3corda/demos/DemoClock.kt index ac70d0adc7..3021a4e7aa 100644 --- a/src/main/kotlin/com/r3corda/demos/DemoClock.kt +++ b/src/main/kotlin/com/r3corda/demos/DemoClock.kt @@ -1,5 +1,6 @@ package com.r3corda.demos +import com.r3corda.node.utilities.MutableClock import java.time.* import javax.annotation.concurrent.ThreadSafe @@ -7,13 +8,14 @@ import javax.annotation.concurrent.ThreadSafe * A [Clock] that can have the date advanced for use in demos */ @ThreadSafe -class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : Clock() { +class DemoClock(private var delegateClock: Clock = Clock.systemUTC()) : MutableClock() { @Synchronized fun updateDate(date: LocalDate): Boolean { val currentDate = LocalDate.now(this) if (currentDate.isBefore(date)) { // It's ok to increment delegateClock = Clock.offset(delegateClock, Duration.between(currentDate.atStartOfDay(), date.atStartOfDay())) + notifyMutationObservers() return true } return false