From 2778e294f3e7f6e8597a76215e6b5e43bb2b8722 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 18 Jul 2017 13:01:20 +0100 Subject: [PATCH] FiberBox and RetryableException removed as they're not used --- core/src/main/kotlin/net/corda/core/Utils.kt | 13 -- .../net/corda/node/utilities/FiberBox.kt | 76 -------- .../net/corda/node/utilities/FiberBoxTest.kt | 169 ------------------ .../net/corda/irs/api/NodeInterestRates.kt | 4 +- 4 files changed, 2 insertions(+), 260 deletions(-) delete mode 100644 node/src/main/kotlin/net/corda/node/utilities/FiberBox.kt delete mode 100644 node/src/test/kotlin/net/corda/node/utilities/FiberBoxTest.kt diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index 3be1a00cc3..44b4ebf5a3 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -8,11 +8,9 @@ import com.google.common.io.ByteStreams import com.google.common.util.concurrent.* import net.corda.core.crypto.SecureHash import net.corda.core.crypto.sha256 -import net.corda.core.flows.FlowException import net.corda.core.internal.createDirectories import net.corda.core.internal.div import net.corda.core.internal.write -import net.corda.core.serialization.CordaSerializable import org.slf4j.Logger import rx.Observable import rx.Observer @@ -196,17 +194,6 @@ class ThreadBox(val content: T, val lock: ReentrantLock = ReentrantLock() fun checkNotLocked() = check(!lock.isHeldByCurrentThread) } -/** - * This represents a transient exception or condition that might no longer be thrown if the operation is re-run or called - * again. - * - * We avoid the use of the word transient here to hopefully reduce confusion with the term in relation to (Java) serialization. - */ -@CordaSerializable -abstract class RetryableException(message: String) : FlowException(message) - - - /** * Given a path to a zip file, extracts it to the given directory. */ diff --git a/node/src/main/kotlin/net/corda/node/utilities/FiberBox.kt b/node/src/main/kotlin/net/corda/node/utilities/FiberBox.kt deleted file mode 100644 index d930fe206d..0000000000 --- a/node/src/main/kotlin/net/corda/node/utilities/FiberBox.kt +++ /dev/null @@ -1,76 +0,0 @@ -package net.corda.node.utilities - -import co.paralleluniverse.fibers.Suspendable -import co.paralleluniverse.strands.concurrent.ReentrantLock -import com.google.common.util.concurrent.SettableFuture -import net.corda.core.RetryableException -import java.time.Clock -import java.time.Instant -import java.util.concurrent.Future -import java.util.concurrent.locks.Lock -import kotlin.concurrent.withLock - -// TODO: We should consider using a Semaphore or CountDownLatch here to make it a little easier to understand, but it seems as though the current version of Quasar does not support suspending on either of their implementations. - -/** - * Modelled on [net.corda.core.ThreadBox], but with support for waiting that is compatible with Quasar [Fiber]s and [MutableClock]s. - * - * It supports 3 main operations, all of which operate in a similar context to the [locked] method - * of [net.corda.core.ThreadBox]. i.e. in the context of the content. - * * [read] operations which acquire the associated lock but do not notify any waiters (see [readWithDeadline]) - * and is a direct equivalent of [net.corda.core.ThreadBox.locked]. - * * [write] operations which are the same as [read] operations but additionally notify any waiters that the content may have changed. - * * [readWithDeadline] operations acquire the lock and are evaluated repeatedly until they no longer throw any subclass - * of [RetryableException]. Between iterations it will wait until woken by a [write] or the deadline is reached. It will eventually - * re-throw a [RetryableException] if the deadline passes without any successful iterations. - * - * The construct also supports [MutableClock]s so it can cope with artificial progress towards the deadline, for simulations - * or testing. - * - * Currently this is intended for use within a node as a simplified way for Oracles to implement subscriptions for changing - * data by running a flow internally to implement the request handler which can then - * effectively relinquish control until the data becomes available. This isn't the most scalable design and is intended - * to be temporary. In addition, it's enitrely possible to envisage a time when we want public [net.corda.core.flows.FlowLogic] - * implementations to be able to wait for some condition to become true outside of message send/receive. At that point - * we may revisit this implementation and indeed the whole model for this, when we understand that requirement more fully. - */ -// TODO This is no longer used and can be removed -class FiberBox(private val content: T, private val lock: Lock = ReentrantLock()) { - private var mutated: SettableFuture? = null - - @Suppress("UNUSED_VALUE") // This is here due to the compiler thinking ourMutated is not used - @Suspendable - fun readWithDeadline(clock: Clock, deadline: Instant, body: T.() -> R): R { - var ex: Exception - var ourMutated: Future? = null - do { - lock.lock() - try { - if (mutated == null || mutated!!.isDone) { - mutated = SettableFuture.create() - } - ourMutated = mutated - return body(content) - } catch(e: RetryableException) { - ex = e - } finally { - lock.unlock() - } - } while (clock.awaitWithDeadline(deadline, ourMutated!!) && clock.instant().isBefore(deadline)) - throw ex - } - - @Suspendable - fun read(body: T.() -> R): R = lock.withLock { body(content) } - - @Suspendable - fun write(body: T.() -> R): R { - lock.lock() - try { - return body(content) - } finally { - mutated?.set(true) - lock.unlock() - } - } -} diff --git a/node/src/test/kotlin/net/corda/node/utilities/FiberBoxTest.kt b/node/src/test/kotlin/net/corda/node/utilities/FiberBoxTest.kt deleted file mode 100644 index 25fc475071..0000000000 --- a/node/src/test/kotlin/net/corda/node/utilities/FiberBoxTest.kt +++ /dev/null @@ -1,169 +0,0 @@ -package net.corda.node.utilities - -import co.paralleluniverse.fibers.FiberExecutorScheduler -import co.paralleluniverse.fibers.Suspendable -import co.paralleluniverse.strands.Strand -import net.corda.core.RetryableException -import net.corda.core.getOrThrow -import net.corda.core.utilities.hours -import net.corda.core.utilities.minutes -import net.corda.testing.node.TestClock -import org.junit.After -import org.junit.Before -import org.junit.Test -import java.time.Clock -import java.time.Duration -import java.util.concurrent.CompletableFuture -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import kotlin.test.assertEquals - -class FiberBoxTest { - - class Content { - var integer: Int = 0 - } - - class TestRetryableException(message: String) : RetryableException(message) - - lateinit var mutex: FiberBox - lateinit var realClock: Clock - lateinit var stoppedClock: Clock - lateinit var executor: ExecutorService - - @Before - fun setup() { - mutex = FiberBox(Content()) - realClock = Clock.systemUTC() - stoppedClock = Clock.fixed(realClock.instant(), realClock.zone) - executor = Executors.newSingleThreadExecutor() - } - - @After - fun teardown() { - executor.shutdown() - } - - @Test - fun `write and read`() { - mutex.write { integer = 1 } - assertEquals(1, mutex.read { integer }) - } - - @Test - fun `readWithDeadline with no wait`() { - val advancedClock = Clock.offset(stoppedClock, 1.hours) - - mutex.write { integer = 1 } - assertEquals(1, mutex.readWithDeadline(realClock, advancedClock.instant()) { integer }) - } - - @Test - fun `readWithDeadline with stopped clock and background write`() { - val advancedClock = Clock.offset(stoppedClock, 1.hours) - - assertEquals(1, mutex.readWithDeadline(stoppedClock, advancedClock.instant()) { - backgroundWrite() - if (integer == 1) 1 else throw TestRetryableException("Not 1") - }) - } - - @Test(expected = TestRetryableException::class) - fun `readWithDeadline with clock advanced`() { - val advancedClock = Clock.offset(stoppedClock, 1.hours) - val testClock = TestClock(stoppedClock) - - assertEquals(1, mutex.readWithDeadline(testClock, advancedClock.instant()) { - backgroundAdvanceClock(testClock, 1.hours) - if (integer == 1) 0 else throw TestRetryableException("Not 1") - }) - } - - @Test - fun `readWithDeadline with clock advanced 5x and background write`() { - val advancedClock = Clock.offset(stoppedClock, 1.hours) - val testClock = TestClock(stoppedClock) - - assertEquals(5, mutex.readWithDeadline(testClock, advancedClock.instant()) { - backgroundAdvanceClock(testClock, 10.minutes) - backgroundWrite() - if (integer == 5) 5 else throw TestRetryableException("Not 5") - }) - } - - /** - * If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been - * run on core module (in IntelliJ, open gradle side tab and run: - * r3prototyping -> core -> Tasks -> other -> quasarScan - */ - @Test(expected = TestRetryableException::class) - @Suspendable - fun `readWithDeadline with clock advanced on Fibers`() { - val advancedClock = Clock.offset(stoppedClock, 1.hours) - val testClock = TestClock(stoppedClock) - val future = CompletableFuture() - val scheduler = FiberExecutorScheduler("test", executor) - val fiber = scheduler.newFiber(@Suspendable { - try { - future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) { - if (integer == 1) 1 else throw TestRetryableException("Not 1") - }) - } catch(e: Exception) { - future.completeExceptionally(e) - } - }).start() - for (advance in 1..6) { - scheduler.newFiber(@Suspendable { - // Wait until fiber is waiting - while (fiber.state != Strand.State.TIMED_WAITING) { - Strand.sleep(1) - } - testClock.advanceBy(10.minutes) - }).start() - } - assertEquals(2, future.getOrThrow()) - } - - /** - * If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been - * run on core module (in IntelliJ, open gradle side tab and run: - * r3prototyping -> core -> Tasks -> other -> quasarScan - */ - @Test - @Suspendable - fun `readWithDeadline with background write on Fibers`() { - val advancedClock = Clock.offset(stoppedClock, 1.hours) - val testClock = TestClock(stoppedClock) - val future = CompletableFuture() - val scheduler = FiberExecutorScheduler("test", executor) - val fiber = scheduler.newFiber(@Suspendable { - try { - future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) { - if (integer == 1) 1 else throw TestRetryableException("Not 1") - }) - } catch(e: Exception) { - future.completeExceptionally(e) - } - }).start() - scheduler.newFiber(@Suspendable { - // Wait until fiber is waiting - while (fiber.state != Strand.State.TIMED_WAITING) { - Strand.sleep(1) - } - mutex.write { integer = 1 } - }).start() - assertEquals(1, future.getOrThrow()) - } - - private fun backgroundWrite() { - executor.execute { - mutex.write { integer += 1 } - } - } - - private fun backgroundAdvanceClock(clock: TestClock, duration: Duration) { - executor.execute { - clock.advanceBy(duration) - } - } -} diff --git a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt index 33604a7454..01bcebfb17 100644 --- a/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt +++ b/samples/irs-demo/src/main/kotlin/net/corda/irs/api/NodeInterestRates.kt @@ -8,12 +8,12 @@ import net.corda.contracts.Tenor import net.corda.contracts.math.CubicSplineInterpolator import net.corda.contracts.math.Interpolator import net.corda.contracts.math.InterpolatorFactory -import net.corda.core.RetryableException import net.corda.core.ThreadBox import net.corda.core.contracts.Command import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.MerkleTreeException import net.corda.core.crypto.keys +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.InitiatedBy import net.corda.core.flows.StartableByRPC @@ -192,7 +192,7 @@ object NodeInterestRates { } // TODO: can we split into two? Fix not available (retryable/transient) and unknown (permanent) - class UnknownFix(val fix: FixOf) : RetryableException("Unknown fix: $fix") + class UnknownFix(val fix: FixOf) : FlowException("Unknown fix: $fix") // Upload the raw fix data via RPC. In a real system the oracle data would be taken from a database. @StartableByRPC