Refactored ErrorOr into Try, with Success and Failure data sub-classes, and moved it into core.utilities

This commit is contained in:
Shams Asari 2017-07-10 12:16:00 +01:00
parent 7e8de79848
commit 7caee508ec
19 changed files with 216 additions and 209 deletions

View File

@ -1,7 +1,7 @@
package net.corda.client.mock
import net.corda.client.mock.Generator.Companion.choice
import net.corda.core.ErrorOr
import net.corda.core.utilities.Try
import java.util.*
/**
@ -12,7 +12,7 @@ import java.util.*
* [Generator.choice] picks a generator from the specified list and runs that.
* [Generator.frequency] is similar to [choice] but the probability may be specified for each generator (it is normalised before picking).
* [Generator.combine] combines two generators of A and B with a function (A, B) -> C. Variants exist for other arities.
* [Generator.bind] sequences two generators using an arbitrary A->Generator<B> function. Keep the usage of this
* [Generator.flatMap] sequences two generators using an arbitrary A->Generator<B> function. Keep the usage of this
* function minimal as it may explode the stack, especially when using recursion.
*
* There are other utilities as well, the type of which are usually descriptive.
@ -31,7 +31,7 @@ import java.util.*
*
* The above will generate a random list of animals.
*/
class Generator<out A>(val generate: (SplittableRandom) -> ErrorOr<A>) {
class Generator<out A>(val generate: (SplittableRandom) -> Try<A>) {
// Functor
fun <B> map(function: (A) -> B): Generator<B> =
@ -54,18 +54,19 @@ class Generator<out A>(val generate: (SplittableRandom) -> ErrorOr<A>) {
product<R>(other1.product(other2.product(other3.product(other4.product(pure({ e -> { d -> { c -> { b -> { a -> function(a, b, c, d, e) } } } } }))))))
// Monad
fun <B> bind(function: (A) -> Generator<B>) =
Generator { generate(it).bind { a -> function(a).generate(it) } }
fun <B> flatMap(function: (A) -> Generator<B>): Generator<B> {
return Generator { random -> generate(random).flatMap { function(it).generate(random) } }
}
companion object {
fun <A> pure(value: A) = Generator { ErrorOr(value) }
fun <A> impure(valueClosure: () -> A) = Generator { ErrorOr(valueClosure()) }
fun <A> fail(error: Exception) = Generator<A> { ErrorOr.of(error) }
fun <A> pure(value: A) = Generator { Try.Success(value) }
fun <A> impure(valueClosure: () -> A) = Generator { Try.Success(valueClosure()) }
fun <A> fail(error: Exception) = Generator<A> { Try.Failure(error) }
// Alternative
fun <A> choice(generators: List<Generator<A>>) = intRange(0, generators.size - 1).bind { generators[it] }
fun <A> choice(generators: List<Generator<A>>) = intRange(0, generators.size - 1).flatMap { generators[it] }
fun <A> success(generate: (SplittableRandom) -> A) = Generator { ErrorOr(generate(it)) }
fun <A> success(generate: (SplittableRandom) -> A) = Generator { Try.Success(generate(it)) }
fun <A> frequency(generators: List<Pair<Double, Generator<A>>>): Generator<A> {
val ranges = mutableListOf<Pair<Double, Double>>()
var current = 0.0
@ -74,11 +75,11 @@ class Generator<out A>(val generate: (SplittableRandom) -> ErrorOr<A>) {
ranges.add(Pair(current, next))
current = next
}
return doubleRange(0.0, current).bind { value ->
generators[ranges.binarySearch { range ->
if (value < range.first) {
return doubleRange(0.0, current).flatMap { value ->
generators[ranges.binarySearch { (first, second) ->
if (value < first) {
1
} else if (value < range.second) {
} else if (value < second) {
0
} else {
-1
@ -91,14 +92,12 @@ class Generator<out A>(val generate: (SplittableRandom) -> ErrorOr<A>) {
val result = mutableListOf<A>()
for (generator in generators) {
val element = generator.generate(it)
val v = element.value
if (v != null) {
result.add(v)
} else {
return@Generator ErrorOr.of(element.error!!)
when (element) {
is Try.Success -> result.add(element.value)
is Try.Failure -> return@Generator element
}
}
ErrorOr(result)
Try.Success(result)
}
}
}
@ -109,11 +108,9 @@ fun <A> Generator<A>.generateOrFail(random: SplittableRandom, numberOfTries: Int
var error: Throwable? = null
for (i in 0..numberOfTries - 1) {
val result = generate(random)
val v = result.value
if (v != null) {
return v
} else {
error = result.error
error = when (result) {
is Try.Success -> return result.value
is Try.Failure -> result.exception
}
}
if (error == null) {
@ -147,9 +144,9 @@ fun Generator.Companion.doubleRange(from: Double, to: Double): Generator<Double>
fun Generator.Companion.char() = Generator {
val codePoint = Math.abs(it.nextInt()) % (17 * (1 shl 16))
if (Character.isValidCodePoint(codePoint)) {
return@Generator ErrorOr(codePoint.toChar())
return@Generator Try.Success(codePoint.toChar())
} else {
ErrorOr.of(IllegalStateException("Could not generate valid codepoint"))
Try.Failure(IllegalStateException("Could not generate valid codepoint"))
}
}
@ -175,20 +172,19 @@ fun <A> Generator.Companion.replicatePoisson(meanSize: Double, generator: Genera
val result = mutableListOf<A>()
var finish = false
while (!finish) {
val errorOr = Generator.doubleRange(0.0, 1.0).generate(it).bind { value ->
val result = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value ->
if (value < chance) {
generator.generate(it).map { result.add(it) }
} else {
finish = true
ErrorOr(Unit)
Try.Success(Unit)
}
}
val e = errorOr.error
if (e != null) {
return@Generator ErrorOr.of(e)
if (result is Try.Failure) {
return@Generator result
}
}
ErrorOr(result)
Try.Success(result)
}
fun <A> Generator.Companion.pickOne(list: List<A>) = Generator.intRange(0, list.size - 1).map { list[it] }
@ -211,7 +207,7 @@ fun <A> Generator.Companion.pickN(number: Int, list: List<A>) = Generator<List<A
resultList.add(a)
}
}
ErrorOr(resultList)
Try.Success(resultList)
}
fun <A> Generator.Companion.sampleBernoulli(maxRatio: Double = 1.0, vararg collection: A) =

View File

@ -8,15 +8,19 @@ import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.*
import net.corda.core.crypto.random63BitValue
import net.corda.core.future
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.millis
import net.corda.core.seconds
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.driver.poll
import net.corda.core.utilities.Try
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCKryo
import net.corda.testing.*
import net.corda.testing.driver.poll
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
@ -25,7 +29,10 @@ import rx.Observable
import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject
import java.time.Duration
import java.util.concurrent.*
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
class RPCStabilityTests {
@ -78,9 +85,9 @@ class RPCStabilityTests {
val executor = Executors.newScheduledThreadPool(1)
fun startAndStop() {
rpcDriver {
ErrorOr.catch { startRpcClient<RPCOps>(NetworkHostAndPort("localhost", 9999)).get() }
Try.on { startRpcClient<RPCOps>(NetworkHostAndPort("localhost", 9999)).get() }
val server = startRpcServer<RPCOps>(ops = DummyOps)
ErrorOr.catch { startRpcClient<RPCOps>(
Try.on { startRpcClient<RPCOps>(
server.get().broker.hostAndPort!!,
configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1)
).get() }

View File

@ -12,9 +12,9 @@ import com.google.common.cache.RemovalListener
import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.ThreadBox
import net.corda.core.crypto.random63BitValue
import net.corda.core.getOrThrow
import net.corda.core.messaging.RPCOps
import net.corda.core.crypto.random63BitValue
import net.corda.core.serialization.KryoPoolWithContext
import net.corda.core.utilities.*
import net.corda.nodeapi.*
@ -229,14 +229,15 @@ class RPCClientProxyHandler(
if (replyFuture == null) {
log.error("RPC reply arrived to unknown RPC ID ${serverToClient.id}, this indicates an internal RPC error.")
} else {
val rpcCallSite = callSiteMap?.get(serverToClient.id.toLong)
serverToClient.result.match(
onError = {
if (rpcCallSite != null) addRpcCallSiteToThrowable(it, rpcCallSite)
replyFuture.setException(it)
},
onValue = { replyFuture.set(it) }
)
val result = serverToClient.result
when (result) {
is Try.Success -> replyFuture.set(result.value)
is Try.Failure -> {
val rpcCallSite = callSiteMap?.get(serverToClient.id.toLong)
if (rpcCallSite != null) addRpcCallSiteToThrowable(result.exception, rpcCallSite)
replyFuture.setException(result.exception)
}
}
}
}
is RPCApi.ServerToClient.Observation -> {

View File

@ -23,7 +23,10 @@ import java.nio.file.*
import java.nio.file.attribute.FileAttribute
import java.time.Duration
import java.time.temporal.Temporal
import java.util.concurrent.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import java.util.stream.Stream
import java.util.zip.Deflater
@ -324,63 +327,6 @@ data class InputStreamAndHash(val inputStream: InputStream, val sha256: SecureHa
val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this)
/** Representation of an operation that may have thrown an error. */
@Suppress("DataClassPrivateConstructor")
@CordaSerializable
data class ErrorOr<out A> private constructor(val value: A?, val error: Throwable?) {
// The ErrorOr holds a value iff error == null
constructor(value: A) : this(value, null)
companion object {
/** Runs the given lambda and wraps the result. */
inline fun <T> catch(body: () -> T): ErrorOr<T> {
return try {
ErrorOr(body())
} catch (t: Throwable) {
ErrorOr.of(t)
}
}
fun of(t: Throwable) = ErrorOr(null, t)
}
fun <T> match(onValue: (A) -> T, onError: (Throwable) -> T): T {
if (error == null) {
return onValue(value as A)
} else {
return onError(error)
}
}
fun getOrThrow(): A {
if (error == null) {
return value as A
} else {
throw error
}
}
// Functor
fun <B> map(function: (A) -> B) = ErrorOr(value?.let(function), error)
// Applicative
fun <B, C> combine(other: ErrorOr<B>, function: (A, B) -> C): ErrorOr<C> {
val newError = error ?: other.error
return ErrorOr(if (newError != null) null else function(value as A, other.value as B), newError)
}
// Monad
fun <B> bind(function: (A) -> ErrorOr<B>): ErrorOr<B> {
return if (error == null) {
function(value as A)
} else {
ErrorOr.of(error)
}
}
fun mapError(function: (Throwable) -> Throwable) = ErrorOr(value, error?.let(function))
}
/**
* Returns an Observable that buffers events until subscribed.
* @see UnicastSubject

View File

@ -1,7 +1,6 @@
package net.corda.core.messaging
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.ErrorOr
import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateAndRef
@ -19,6 +18,7 @@ import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try
import org.bouncycastle.asn1.x500.X500Name
import rx.Observable
import java.io.InputStream
@ -44,7 +44,7 @@ sealed class StateMachineUpdate {
override val id: StateMachineRunId get() = stateMachineInfo.id
}
data class Removed(override val id: StateMachineRunId, val result: ErrorOr<*>) : StateMachineUpdate()
data class Removed(override val id: StateMachineRunId, val result: Try<*>) : StateMachineUpdate()
}
@CordaSerializable

View File

@ -0,0 +1,74 @@
package net.corda.core.utilities
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.Try.Failure
import net.corda.core.utilities.Try.Success
/**
* Representation of an operation that has either succeeded with a result (represented by [Success]) or failed with an
* exception (represented by [Failure]).
*/
@CordaSerializable
sealed class Try<out A> {
companion object {
/**
* Executes the given block of code and returns a [Success] capturing the result, or a [Failure] if an exception
* is thrown.
*/
@JvmStatic
inline fun <T> on(body: () -> T): Try<T> {
return try {
Success(body())
} catch (t: Throwable) {
Failure(t)
}
}
}
/** Returns `true` iff the [Try] is a [Success]. */
abstract val isFailure: Boolean
/** Returns `true` iff the [Try] is a [Failure]. */
abstract val isSuccess: Boolean
/** Returns the value if a [Success] otherwise throws the exception if a [Failure]. */
abstract fun getOrThrow(): A
/** Maps the given function to the value from this [Success], or returns `this` if this is a [Failure]. */
inline fun <B> map(function: (A) -> B): Try<B> = when (this) {
is Success -> Success(function(value))
is Failure -> this
}
/** Returns the given function applied to the value from this [Success], or returns `this` if this is a [Failure]. */
inline fun <B> flatMap(function: (A) -> Try<B>): Try<B> = when (this) {
is Success -> function(value)
is Failure -> this
}
/**
* Maps the given function to the values from this [Success] and [other], or returns `this` if this is a [Failure]
* or [other] if [other] is a [Failure].
*/
inline fun <B, C> combine(other: Try<B>, function: (A, B) -> C): Try<C> = when (this) {
is Success -> when (other) {
is Success -> Success(function(value, other.value))
is Failure -> other
}
is Failure -> this
}
data class Success<out A>(val value: A) : Try<A>() {
override val isSuccess: Boolean get() = true
override val isFailure: Boolean get() = false
override fun getOrThrow(): A = value
override fun toString(): String = "Success($value)"
}
data class Failure(val exception: Throwable) : Try<Nothing>() {
override val isSuccess: Boolean get() = false
override val isFailure: Boolean get() = true
override fun getOrThrow(): Nothing = throw exception
override fun toString(): String = "Failure($exception)"
}
}

View File

@ -1,13 +1,14 @@
package net.corda.nodeapi
import com.esotericsoftware.kryo.pool.KryoPool
import net.corda.core.ErrorOr
import net.corda.core.serialization.KryoPoolWithContext
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.Try
import net.corda.nodeapi.RPCApi.ClientToServer
import net.corda.nodeapi.RPCApi.ObservableId
import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVALS
import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION
import net.corda.nodeapi.RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX
import net.corda.nodeapi.RPCApi.RPC_SERVER_QUEUE_NAME
import net.corda.nodeapi.RPCApi.RpcRequestId
@ -151,7 +152,7 @@ object RPCApi {
data class RpcReply(
val id: RpcRequestId,
val result: ErrorOr<Any?>
val result: Try<Any?>
) : ServerToClient() {
override fun writeToClientMessage(kryoPool: KryoPool, message: ClientMessage) {
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REPLY.ordinal)

View File

@ -1,11 +1,9 @@
package net.corda.node.services
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.ErrorOr
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionType
import net.corda.testing.contracts.DummyContract
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.div
@ -13,6 +11,7 @@ import net.corda.core.getOrThrow
import net.corda.core.identity.Party
import net.corda.core.node.services.ServiceInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.Try
import net.corda.flows.NotaryError
import net.corda.flows.NotaryException
import net.corda.flows.NotaryFlow
@ -23,10 +22,10 @@ import net.corda.node.services.transactions.minClusterSize
import net.corda.node.services.transactions.minCorrectReplicas
import net.corda.node.utilities.ServiceIdentityGenerator
import net.corda.node.utilities.transaction
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockNetwork
import org.bouncycastle.asn1.x500.X500Name
import org.junit.After
import org.junit.Ignore
import org.junit.Test
import java.nio.file.Files
import kotlin.test.assertEquals
@ -95,10 +94,10 @@ class BFTNotaryServiceTests {
val flows = spendTxs.map { NotaryFlow.Client(it) }
val stateMachines = flows.map { services.startFlow(it) }
mockNet.runNetwork()
val results = stateMachines.map { ErrorOr.catch { it.resultFuture.getOrThrow() } }
val results = stateMachines.map { Try.on { it.resultFuture.getOrThrow() } }
val successfulIndex = results.mapIndexedNotNull { index, result ->
if (result.error == null) {
val signers = result.getOrThrow().map { it.by }
if (result is Try.Success) {
val signers = result.value.map { it.by }
assertEquals(minCorrectReplicas(clusterSize), signers.size)
signers.forEach {
assertTrue(it in (notary.owningKey as CompositeKey).leafKeys)
@ -109,8 +108,8 @@ class BFTNotaryServiceTests {
}
}.single()
spendTxs.zip(results).forEach { (tx, result) ->
if (result.error != null) {
val error = (result.error as NotaryException).error as NotaryError.Conflict
if (result is Try.Failure) {
val error = (result.exception as NotaryException).error as NotaryError.Conflict
assertEquals(tx.id, error.txId)
val (stateRef, consumingTx) = error.conflict.verified().stateHistory.entries.single()
assertEquals(StateRef(issueTx.id, 0), stateRef)

View File

@ -12,15 +12,11 @@ import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimaps
import com.google.common.collect.SetMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.ErrorOr
import net.corda.core.messaging.RPCOps
import net.corda.core.crypto.random63BitValue
import net.corda.core.messaging.RPCOps
import net.corda.core.seconds
import net.corda.core.serialization.KryoPoolWithContext
import net.corda.core.utilities.LazyStickyPool
import net.corda.core.utilities.LifeCycle
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.*
import net.corda.node.services.RPCUserService
import net.corda.nodeapi.*
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
@ -43,7 +39,6 @@ import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.time.Duration
import java.util.concurrent.*
import kotlin.collections.ArrayList
data class RPCServerConfiguration(
/** The number of threads to use for handling RPC requests */
@ -270,14 +265,7 @@ class RPCServer(
)
rpcExecutor!!.submit {
val result = invokeRpc(rpcContext, clientToServer.methodName, clientToServer.arguments)
val resultWithExceptionUnwrapped = result.mapError {
if (it is InvocationTargetException) {
it.cause ?: RPCException("Caught InvocationTargetException without cause")
} else {
it
}
}
sendReply(clientToServer.id, clientToServer.clientAddress, resultWithExceptionUnwrapped)
sendReply(clientToServer.id, clientToServer.clientAddress, result)
}
}
is RPCApi.ClientToServer.ObservablesClosed -> {
@ -287,25 +275,24 @@ class RPCServer(
artemisMessage.acknowledge()
}
private fun invokeRpc(rpcContext: RpcContext, methodName: String, arguments: List<Any?>): ErrorOr<Any> {
return ErrorOr.catch {
private fun invokeRpc(rpcContext: RpcContext, methodName: String, arguments: List<Any?>): Try<Any> {
return Try.on {
try {
CURRENT_RPC_CONTEXT.set(rpcContext)
log.debug { "Calling $methodName" }
val method = methodTable[methodName] ?:
throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
method.invoke(ops, *arguments.toTypedArray())
} catch (e: InvocationTargetException) {
throw e.cause ?: RPCException("Caught InvocationTargetException without cause")
} finally {
CURRENT_RPC_CONTEXT.remove()
}
}
}
private fun sendReply(requestId: RPCApi.RpcRequestId, clientAddress: SimpleString, resultWithExceptionUnwrapped: ErrorOr<Any>) {
val reply = RPCApi.ServerToClient.RpcReply(
id = requestId,
result = resultWithExceptionUnwrapped
)
private fun sendReply(requestId: RPCApi.RpcRequestId, clientAddress: SimpleString, result: Try<Any>) {
val reply = RPCApi.ServerToClient.RpcReply(requestId, result)
val observableContext = ObservableContext(
requestId,
observableMap,

View File

@ -7,18 +7,14 @@ import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.DeclaredField.Companion.declaredField
import net.corda.core.ErrorOr
import net.corda.core.abbreviate
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.crypto.random63BitValue
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.debug
import net.corda.core.utilities.trace
import net.corda.core.utilities.*
import net.corda.node.services.api.FlowAppAuditEvent
import net.corda.node.services.api.FlowPermissionAuditEvent
import net.corda.node.services.api.ServiceHubInternal
@ -71,7 +67,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Transient override lateinit var serviceHub: ServiceHubInternal
@Transient internal lateinit var database: Database
@Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit
@Transient internal lateinit var actionOnEnd: (ErrorOr<R>, Boolean) -> Unit
@Transient internal lateinit var actionOnEnd: (Try<R>, Boolean) -> Unit
@Transient internal var fromCheckpoint: Boolean = false
@Transient private var txTrampoline: Transaction? = null
@ -125,7 +121,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
.filter { it.state is FlowSessionState.Initiating }
.forEach { it.waitForConfirmation() }
// This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd(ErrorOr(result), false)
actionOnEnd(Try.Success(result), false)
_resultFuture?.set(result)
logic.progressTracker?.currentStep = ProgressTracker.DONE
logger.debug { "Flow finished with result $result" }
@ -138,7 +134,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
private fun processException(exception: Throwable, propagated: Boolean) {
actionOnEnd(ErrorOr.of(exception), propagated)
actionOnEnd(Try.Failure(exception), propagated)
_resultFuture?.setException(exception)
logic.progressTracker?.endWithError(exception)
}

View File

@ -15,7 +15,8 @@ import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import io.requery.util.CloseableIterator
import net.corda.core.*
import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowException
@ -25,6 +26,8 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.then
import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.trace
@ -122,7 +125,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
abstract val logic: FlowLogic<*>
data class Add(override val logic: FlowLogic<*>) : Change()
data class Removed(override val logic: FlowLogic<*>, val result: ErrorOr<*>) : Change()
data class Removed(override val logic: FlowLogic<*>, val result: Try<*>) : Change()
}
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
@ -442,13 +445,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
processIORequest(ioRequest)
decrementLiveFibers()
}
fiber.actionOnEnd = { resultOrError, propagated ->
fiber.actionOnEnd = { result, propagated ->
try {
mutex.locked {
stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) }
notifyChangeObservers(Change.Removed(fiber.logic, resultOrError))
notifyChangeObservers(Change.Removed(fiber.logic, result))
}
endAllFiberSessions(fiber, resultOrError.error, propagated)
endAllFiberSessions(fiber, result, propagated)
} finally {
fiber.commitTransaction()
decrementLiveFibers()
@ -463,10 +466,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, exception: Throwable?, propagated: Boolean) {
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, result: Try<*>, propagated: Boolean) {
openSessions.values.removeIf { session ->
if (session.fiber == fiber) {
session.endSession(exception, propagated)
session.endSession((result as? Try.Failure)?.exception, propagated)
true
} else {
false

View File

@ -1,20 +1,22 @@
package net.corda.node.shell
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.ErrorOr
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowInitiator
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.StateMachineUpdate.Added
import net.corda.core.messaging.StateMachineUpdate.Removed
import net.corda.core.then
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.Try
import org.crsh.text.Color
import org.crsh.text.Decoration
import org.crsh.text.RenderPrintWriter
import org.crsh.text.ui.LabelElement
import org.crsh.text.ui.TableElement
import org.crsh.text.ui.Overflow
import org.crsh.text.ui.RowElement
import org.crsh.text.ui.TableElement
import rx.Subscriber
class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Subscriber<Any>() {
@ -51,10 +53,10 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub
future.setException(e)
}
private fun stateColor(smmUpdate: StateMachineUpdate): Color {
return when(smmUpdate){
is StateMachineUpdate.Added -> Color.blue
is StateMachineUpdate.Removed -> smmUpdate.result.match({ Color.green } , { Color.red })
private fun stateColor(update: StateMachineUpdate): Color {
return when (update) {
is Added -> Color.blue
is Removed -> if (update.result.isSuccess) Color.green else Color.red
}
}
@ -68,7 +70,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub
// TODO Add progress tracker?
private fun createStateMachinesRow(smmUpdate: StateMachineUpdate) {
when (smmUpdate) {
is StateMachineUpdate.Added -> {
is Added -> {
table.add(RowElement().add(
LabelElement(formatFlowId(smmUpdate.id)),
LabelElement(formatFlowName(smmUpdate.stateMachineInfo.flowLogicClassName)),
@ -77,7 +79,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub
).style(stateColor(smmUpdate).fg()))
indexMap[smmUpdate.id] = table.rows.size - 1
}
is StateMachineUpdate.Removed -> {
is Removed -> {
val idx = indexMap[smmUpdate.id]
if (idx != null) {
val oldRow = table.rows[idx]
@ -114,7 +116,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub
}
}
private fun formatFlowResult(flowResult: ErrorOr<*>): String {
private fun formatFlowResult(flowResult: Try<*>): String {
fun successFormat(value: Any?): String {
return when(value) {
is SignedTransaction -> "Tx ID: " + value.id.toString()
@ -123,6 +125,9 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub
else -> value.toString()
}
}
return flowResult.match({ successFormat(it) }, { it.message ?: it.toString() })
return when (flowResult) {
is Try.Success -> successFormat(flowResult.value)
is Try.Failure -> flowResult.exception.message ?: flowResult.exception.toString()
}
}
}

View File

@ -1,6 +1,5 @@
package net.corda.node.utilities
import net.corda.core.ErrorOr
import net.corda.core.serialization.CordaSerializable
/**

View File

@ -19,10 +19,7 @@ import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.WHITESPACE
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.parseNetworkHostAndPort
import net.corda.core.utilities.*
import net.corda.node.internal.Node
import net.corda.node.internal.NodeStartup
import net.corda.node.serialization.NodeClock
@ -352,15 +349,16 @@ fun <A> poll(
if (++counter == warnCount) {
log.warn("Been polling $pollName for ${pollInterval.multipliedBy(warnCount.toLong()).seconds} seconds...")
}
ErrorOr.catch(check).match(onValue = {
if (it != null) {
resultFuture.set(it)
try {
val checkResult = check()
if (checkResult != null) {
resultFuture.set(checkResult)
} else {
executorService.schedule(this, pollInterval.toMillis(), MILLISECONDS)
}
}, onError = {
resultFuture.setException(it)
})
} catch (t: Throwable) {
resultFuture.setException(t)
}
}
}
executorService.submit(task) // The check may be expensive, so always run it in the background even the first time.
@ -389,7 +387,7 @@ class ShutdownManager(private val executorService: ExecutorService) {
}
fun shutdown() {
val shutdownFutures = state.locked {
val shutdownActionFutures = state.locked {
if (isShutdown) {
emptyList<ListenableFuture<() -> Unit>>()
} else {
@ -397,21 +395,16 @@ class ShutdownManager(private val executorService: ExecutorService) {
registeredShutdowns
}
}
val shutdowns = shutdownFutures.map { ErrorOr.catch { it.getOrThrow(1.seconds) } }
shutdowns.reversed().forEach { errorOrShutdown ->
errorOrShutdown.match(
onValue = { shutdown ->
try {
shutdown()
} catch (throwable: Throwable) {
log.error("Exception while shutting down", throwable)
}
},
onError = { error ->
log.error("Exception while getting shutdown method, disregarding", error)
}
)
}
val shutdowns = shutdownActionFutures.map { Try.on { it.getOrThrow(1.seconds) } }
shutdowns.reversed().forEach { when (it) {
is Try.Success ->
try {
it.value()
} catch (t: Throwable) {
log.warn("Exception while shutting down", t)
}
is Try.Failure -> log.warn("Exception while getting shutdown method, disregarding", it.exception)
} }
}
fun registerShutdown(shutdown: ListenableFuture<() -> Unit>) {

View File

@ -117,7 +117,7 @@ val crossCashTest = LoadTest<CrossCashCommand, CrossCashState>(
generate = { (nodeVaults), parallelism ->
val nodeMap = simpleNodes.associateBy { it.info.legalIdentity }
val anonymous = true
Generator.pickN(parallelism, simpleNodes).bind { nodes ->
Generator.pickN(parallelism, simpleNodes).flatMap { nodes ->
Generator.sequence(
nodes.map { node ->
val quantities = nodeVaults[node.info.legalIdentity] ?: mapOf()

View File

@ -6,7 +6,6 @@ import net.corda.client.mock.pickOne
import net.corda.client.mock.replicate
import net.corda.contracts.asset.DUMMY_CASH_ISSUER
import net.corda.contracts.asset.DUMMY_CASH_ISSUER_KEY
import net.corda.testing.contracts.DummyContract
import net.corda.core.flows.FlowException
import net.corda.core.messaging.startFlow
import net.corda.core.thenMatch
@ -14,6 +13,7 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.flows.FinalityFlow
import net.corda.loadtest.LoadTest
import net.corda.loadtest.NodeConnection
import net.corda.testing.contracts.DummyContract
import net.corda.testing.node.MockServices
import org.slf4j.LoggerFactory
@ -25,7 +25,7 @@ val dummyNotarisationTest = LoadTest<NotariseCommand, Unit>(
"Notarising dummy transactions",
generate = { _, _ ->
val issuerServices = MockServices(DUMMY_CASH_ISSUER_KEY)
val generateTx = Generator.pickOne(simpleNodes).bind { node ->
val generateTx = Generator.pickOne(simpleNodes).flatMap { node ->
Generator.int().map {
val issueBuilder = DummyContract.generateInitial(it, notary.info.notaryIdentity, DUMMY_CASH_ISSUER)
val issueTx = issuerServices.signInitialTransaction(issueBuilder)

View File

@ -37,12 +37,12 @@ val selfIssueTest = LoadTest<SelfIssueCommand, SelfIssueState>(
"Self issuing cash randomly",
generate = { _, parallelism ->
val generateIssue = Generator.pickOne(simpleNodes).bind { node ->
val generateIssue = Generator.pickOne(simpleNodes).flatMap { node ->
generateIssue(1000, USD, notary.info.notaryIdentity, listOf(node.info.legalIdentity), anonymous = true).map {
SelfIssueCommand(it, node)
}
}
Generator.replicatePoisson(parallelism.toDouble(), generateIssue).bind {
Generator.replicatePoisson(parallelism.toDouble(), generateIssue).flatMap {
// We need to generate at least one
if (it.isEmpty()) {
Generator.sequence(listOf(generateIssue))

View File

@ -58,7 +58,7 @@ data class GeneratedLedger(
* Invariants: The input list must be empty.
*/
val issuanceGenerator: Generator<Pair<WireTransaction, GeneratedLedger>> by lazy {
val outputsGen = outputsGenerator.bind { outputs ->
val outputsGen = outputsGenerator.flatMap { outputs ->
Generator.sequence(
outputs.map { output ->
pickOneOrMaybeNew(identities, partyGenerator).map { notary ->
@ -140,7 +140,7 @@ data class GeneratedLedger(
fun notaryChangeTransactionGenerator(inputNotary: Party, inputsToChooseFrom: List<StateAndRef<ContractState>>): Generator<Pair<WireTransaction, GeneratedLedger>> {
val newNotaryGen = pickOneOrMaybeNew(identities - inputNotary, partyGenerator)
val inputsGen = Generator.sampleBernoulli(inputsToChooseFrom)
return inputsGen.bind { inputs ->
return inputsGen.flatMap { inputs ->
val signers: List<PublicKey> = (inputs.flatMap { it.state.data.participants } + inputNotary).map { it.owningKey }
val outputsGen = Generator.sequence(inputs.map { input -> newNotaryGen.map { TransactionState(input.state.data, it, null) } })
outputsGen.combine(attachmentsGenerator) { outputs, txAttachments ->
@ -177,7 +177,7 @@ data class GeneratedLedger(
if (availableOutputs.isEmpty()) {
issuanceGenerator
} else {
Generator.pickOne(availableOutputs.keys.toList()).bind { inputNotary ->
Generator.pickOne(availableOutputs.keys.toList()).flatMap { inputNotary ->
val inputsToChooseFrom = availableOutputs[inputNotary]!!
Generator.frequency(
0.3 to issuanceGenerator,
@ -231,7 +231,7 @@ fun <A> pickOneOrMaybeNew(from: Collection<A>, generator: Generator<A>): Generat
if (from.isEmpty()) {
return generator
} else {
return generator.bind {
return generator.flatMap {
Generator.pickOne(from + it)
}
}

View File

@ -3,8 +3,6 @@ package net.corda.verifier
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import net.corda.core.ErrorOr
import net.corda.nodeapi.internal.addShutdownHook
import net.corda.core.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.debug
@ -15,6 +13,7 @@ import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.config.NodeSSLConfiguration
import net.corda.nodeapi.config.getValue
import net.corda.nodeapi.internal.addShutdownHook
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import java.nio.file.Path
import java.nio.file.Paths
@ -61,14 +60,15 @@ class Verifier {
consumer.setMessageHandler {
val request = VerifierApi.VerificationRequest.fromClientMessage(it)
log.debug { "Received verification request with id ${request.verificationId}" }
val result = ErrorOr.catch {
val error = try {
request.transaction.verify()
}
if (result.error != null) {
log.debug { "Verification returned with error ${result.error}" }
null
} catch (t: Throwable) {
log.debug("Verification returned with error:", t)
t
}
val reply = session.createMessage(false)
val response = VerifierApi.VerificationResponse(request.verificationId, result.error)
val response = VerifierApi.VerificationResponse(request.verificationId, error)
response.writeToClientMessage(reply)
replyProducer.send(request.responseAddress, reply)
it.acknowledge()