FiberBox and RetryableException removed as they're not used

This commit is contained in:
Shams Asari 2017-07-18 13:01:20 +01:00
parent b0c299707b
commit 2778e294f3
4 changed files with 2 additions and 260 deletions

View File

@ -8,11 +8,9 @@ import com.google.common.io.ByteStreams
import com.google.common.util.concurrent.*
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.sha256
import net.corda.core.flows.FlowException
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.internal.write
import net.corda.core.serialization.CordaSerializable
import org.slf4j.Logger
import rx.Observable
import rx.Observer
@ -196,17 +194,6 @@ class ThreadBox<out T>(val content: T, val lock: ReentrantLock = ReentrantLock()
fun checkNotLocked() = check(!lock.isHeldByCurrentThread)
}
/**
* This represents a transient exception or condition that might no longer be thrown if the operation is re-run or called
* again.
*
* We avoid the use of the word transient here to hopefully reduce confusion with the term in relation to (Java) serialization.
*/
@CordaSerializable
abstract class RetryableException(message: String) : FlowException(message)
/**
* Given a path to a zip file, extracts it to the given directory.
*/

View File

@ -1,76 +0,0 @@
package net.corda.node.utilities
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.ReentrantLock
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.RetryableException
import java.time.Clock
import java.time.Instant
import java.util.concurrent.Future
import java.util.concurrent.locks.Lock
import kotlin.concurrent.withLock
// TODO: We should consider using a Semaphore or CountDownLatch here to make it a little easier to understand, but it seems as though the current version of Quasar does not support suspending on either of their implementations.
/**
* Modelled on [net.corda.core.ThreadBox], but with support for waiting that is compatible with Quasar [Fiber]s and [MutableClock]s.
*
* It supports 3 main operations, all of which operate in a similar context to the [locked] method
* of [net.corda.core.ThreadBox]. i.e. in the context of the content.
* * [read] operations which acquire the associated lock but do not notify any waiters (see [readWithDeadline])
* and is a direct equivalent of [net.corda.core.ThreadBox.locked].
* * [write] operations which are the same as [read] operations but additionally notify any waiters that the content may have changed.
* * [readWithDeadline] operations acquire the lock and are evaluated repeatedly until they no longer throw any subclass
* of [RetryableException]. Between iterations it will wait until woken by a [write] or the deadline is reached. It will eventually
* re-throw a [RetryableException] if the deadline passes without any successful iterations.
*
* The construct also supports [MutableClock]s so it can cope with artificial progress towards the deadline, for simulations
* or testing.
*
* Currently this is intended for use within a node as a simplified way for Oracles to implement subscriptions for changing
* data by running a flow internally to implement the request handler which can then
* effectively relinquish control until the data becomes available. This isn't the most scalable design and is intended
* to be temporary. In addition, it's enitrely possible to envisage a time when we want public [net.corda.core.flows.FlowLogic]
* implementations to be able to wait for some condition to become true outside of message send/receive. At that point
* we may revisit this implementation and indeed the whole model for this, when we understand that requirement more fully.
*/
// TODO This is no longer used and can be removed
class FiberBox<out T>(private val content: T, private val lock: Lock = ReentrantLock()) {
private var mutated: SettableFuture<Boolean>? = null
@Suppress("UNUSED_VALUE") // This is here due to the compiler thinking ourMutated is not used
@Suspendable
fun <R> readWithDeadline(clock: Clock, deadline: Instant, body: T.() -> R): R {
var ex: Exception
var ourMutated: Future<Boolean>? = null
do {
lock.lock()
try {
if (mutated == null || mutated!!.isDone) {
mutated = SettableFuture.create()
}
ourMutated = mutated
return body(content)
} catch(e: RetryableException) {
ex = e
} finally {
lock.unlock()
}
} while (clock.awaitWithDeadline(deadline, ourMutated!!) && clock.instant().isBefore(deadline))
throw ex
}
@Suspendable
fun <R> read(body: T.() -> R): R = lock.withLock { body(content) }
@Suspendable
fun <R> write(body: T.() -> R): R {
lock.lock()
try {
return body(content)
} finally {
mutated?.set(true)
lock.unlock()
}
}
}

View File

@ -1,169 +0,0 @@
package net.corda.node.utilities
import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import net.corda.core.RetryableException
import net.corda.core.getOrThrow
import net.corda.core.utilities.hours
import net.corda.core.utilities.minutes
import net.corda.testing.node.TestClock
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.time.Clock
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import kotlin.test.assertEquals
class FiberBoxTest {
class Content {
var integer: Int = 0
}
class TestRetryableException(message: String) : RetryableException(message)
lateinit var mutex: FiberBox<Content>
lateinit var realClock: Clock
lateinit var stoppedClock: Clock
lateinit var executor: ExecutorService
@Before
fun setup() {
mutex = FiberBox(Content())
realClock = Clock.systemUTC()
stoppedClock = Clock.fixed(realClock.instant(), realClock.zone)
executor = Executors.newSingleThreadExecutor()
}
@After
fun teardown() {
executor.shutdown()
}
@Test
fun `write and read`() {
mutex.write { integer = 1 }
assertEquals(1, mutex.read { integer })
}
@Test
fun `readWithDeadline with no wait`() {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
mutex.write { integer = 1 }
assertEquals(1, mutex.readWithDeadline(realClock, advancedClock.instant()) { integer })
}
@Test
fun `readWithDeadline with stopped clock and background write`() {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
assertEquals(1, mutex.readWithDeadline(stoppedClock, advancedClock.instant()) {
backgroundWrite()
if (integer == 1) 1 else throw TestRetryableException("Not 1")
})
}
@Test(expected = TestRetryableException::class)
fun `readWithDeadline with clock advanced`() {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
val testClock = TestClock(stoppedClock)
assertEquals(1, mutex.readWithDeadline(testClock, advancedClock.instant()) {
backgroundAdvanceClock(testClock, 1.hours)
if (integer == 1) 0 else throw TestRetryableException("Not 1")
})
}
@Test
fun `readWithDeadline with clock advanced 5x and background write`() {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
val testClock = TestClock(stoppedClock)
assertEquals(5, mutex.readWithDeadline(testClock, advancedClock.instant()) {
backgroundAdvanceClock(testClock, 10.minutes)
backgroundWrite()
if (integer == 5) 5 else throw TestRetryableException("Not 5")
})
}
/**
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
* run on core module (in IntelliJ, open gradle side tab and run:
* r3prototyping -> core -> Tasks -> other -> quasarScan
*/
@Test(expected = TestRetryableException::class)
@Suspendable
fun `readWithDeadline with clock advanced on Fibers`() {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
val testClock = TestClock(stoppedClock)
val future = CompletableFuture<Int>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
try {
future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) {
if (integer == 1) 1 else throw TestRetryableException("Not 1")
})
} catch(e: Exception) {
future.completeExceptionally(e)
}
}).start()
for (advance in 1..6) {
scheduler.newFiber(@Suspendable {
// Wait until fiber is waiting
while (fiber.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
testClock.advanceBy(10.minutes)
}).start()
}
assertEquals(2, future.getOrThrow())
}
/**
* If this test seems to hang and throw an NPE, then likely that quasar suspendables scanner has not been
* run on core module (in IntelliJ, open gradle side tab and run:
* r3prototyping -> core -> Tasks -> other -> quasarScan
*/
@Test
@Suspendable
fun `readWithDeadline with background write on Fibers`() {
val advancedClock = Clock.offset(stoppedClock, 1.hours)
val testClock = TestClock(stoppedClock)
val future = CompletableFuture<Int>()
val scheduler = FiberExecutorScheduler("test", executor)
val fiber = scheduler.newFiber(@Suspendable {
try {
future.complete(mutex.readWithDeadline(testClock, advancedClock.instant()) {
if (integer == 1) 1 else throw TestRetryableException("Not 1")
})
} catch(e: Exception) {
future.completeExceptionally(e)
}
}).start()
scheduler.newFiber(@Suspendable {
// Wait until fiber is waiting
while (fiber.state != Strand.State.TIMED_WAITING) {
Strand.sleep(1)
}
mutex.write { integer = 1 }
}).start()
assertEquals(1, future.getOrThrow())
}
private fun backgroundWrite() {
executor.execute {
mutex.write { integer += 1 }
}
}
private fun backgroundAdvanceClock(clock: TestClock, duration: Duration) {
executor.execute {
clock.advanceBy(duration)
}
}
}

View File

@ -8,12 +8,12 @@ import net.corda.contracts.Tenor
import net.corda.contracts.math.CubicSplineInterpolator
import net.corda.contracts.math.Interpolator
import net.corda.contracts.math.InterpolatorFactory
import net.corda.core.RetryableException
import net.corda.core.ThreadBox
import net.corda.core.contracts.Command
import net.corda.core.crypto.DigitalSignature
import net.corda.core.crypto.MerkleTreeException
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.StartableByRPC
@ -192,7 +192,7 @@ object NodeInterestRates {
}
// TODO: can we split into two? Fix not available (retryable/transient) and unknown (permanent)
class UnknownFix(val fix: FixOf) : RetryableException("Unknown fix: $fix")
class UnknownFix(val fix: FixOf) : FlowException("Unknown fix: $fix")
// Upload the raw fix data via RPC. In a real system the oracle data would be taken from a database.
@StartableByRPC