From 7caee508ec42848e800c36a931cbf7fee2369241 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Mon, 10 Jul 2017 12:16:00 +0100 Subject: [PATCH] Refactored ErrorOr into Try, with Success and Failure data sub-classes, and moved it into core.utilities --- .../kotlin/net/corda/client/mock/Generator.kt | 64 ++++++++-------- .../net/corda/client/rpc/RPCStabilityTests.kt | 17 +++-- .../rpc/internal/RPCClientProxyHandler.kt | 19 ++--- core/src/main/kotlin/net/corda/core/Utils.kt | 62 +--------------- .../net/corda/core/messaging/CordaRPCOps.kt | 4 +- .../kotlin/net/corda/core/utilities/Try.kt | 74 +++++++++++++++++++ .../main/kotlin/net/corda/nodeapi/RPCApi.kt | 5 +- .../node/services/BFTNotaryServiceTests.kt | 15 ++-- .../node/services/messaging/RPCServer.kt | 31 +++----- .../statemachine/FlowStateMachineImpl.kt | 14 ++-- .../statemachine/StateMachineManager.kt | 17 +++-- .../node/shell/FlowWatchPrintingSubscriber.kt | 25 ++++--- .../net/corda/node/utilities/AddOrRemove.kt | 1 - .../kotlin/net/corda/testing/driver/Driver.kt | 45 +++++------ .../net/corda/loadtest/tests/CrossCashTest.kt | 2 +- .../net/corda/loadtest/tests/NotaryTest.kt | 4 +- .../net/corda/loadtest/tests/SelfIssueTest.kt | 4 +- .../net/corda/verifier/GeneratedLedger.kt | 8 +- .../kotlin/net/corda/verifier/Verifier.kt | 14 ++-- 19 files changed, 216 insertions(+), 209 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/utilities/Try.kt diff --git a/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt b/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt index 74c052e723..9748e2a2ca 100644 --- a/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt +++ b/client/mock/src/main/kotlin/net/corda/client/mock/Generator.kt @@ -1,7 +1,7 @@ package net.corda.client.mock import net.corda.client.mock.Generator.Companion.choice -import net.corda.core.ErrorOr +import net.corda.core.utilities.Try import java.util.* /** @@ -12,7 +12,7 @@ import java.util.* * [Generator.choice] picks a generator from the specified list and runs that. * [Generator.frequency] is similar to [choice] but the probability may be specified for each generator (it is normalised before picking). * [Generator.combine] combines two generators of A and B with a function (A, B) -> C. Variants exist for other arities. - * [Generator.bind] sequences two generators using an arbitrary A->Generator function. Keep the usage of this + * [Generator.flatMap] sequences two generators using an arbitrary A->Generator function. Keep the usage of this * function minimal as it may explode the stack, especially when using recursion. * * There are other utilities as well, the type of which are usually descriptive. @@ -31,7 +31,7 @@ import java.util.* * * The above will generate a random list of animals. */ -class Generator(val generate: (SplittableRandom) -> ErrorOr) { +class Generator(val generate: (SplittableRandom) -> Try) { // Functor fun map(function: (A) -> B): Generator = @@ -54,18 +54,19 @@ class Generator(val generate: (SplittableRandom) -> ErrorOr) { product(other1.product(other2.product(other3.product(other4.product(pure({ e -> { d -> { c -> { b -> { a -> function(a, b, c, d, e) } } } } })))))) // Monad - fun bind(function: (A) -> Generator) = - Generator { generate(it).bind { a -> function(a).generate(it) } } + fun flatMap(function: (A) -> Generator): Generator { + return Generator { random -> generate(random).flatMap { function(it).generate(random) } } + } companion object { - fun pure(value: A) = Generator { ErrorOr(value) } - fun impure(valueClosure: () -> A) = Generator { ErrorOr(valueClosure()) } - fun fail(error: Exception) = Generator { ErrorOr.of(error) } + fun pure(value: A) = Generator { Try.Success(value) } + fun impure(valueClosure: () -> A) = Generator { Try.Success(valueClosure()) } + fun fail(error: Exception) = Generator { Try.Failure(error) } // Alternative - fun choice(generators: List>) = intRange(0, generators.size - 1).bind { generators[it] } + fun choice(generators: List>) = intRange(0, generators.size - 1).flatMap { generators[it] } - fun success(generate: (SplittableRandom) -> A) = Generator { ErrorOr(generate(it)) } + fun success(generate: (SplittableRandom) -> A) = Generator { Try.Success(generate(it)) } fun frequency(generators: List>>): Generator { val ranges = mutableListOf>() var current = 0.0 @@ -74,11 +75,11 @@ class Generator(val generate: (SplittableRandom) -> ErrorOr) { ranges.add(Pair(current, next)) current = next } - return doubleRange(0.0, current).bind { value -> - generators[ranges.binarySearch { range -> - if (value < range.first) { + return doubleRange(0.0, current).flatMap { value -> + generators[ranges.binarySearch { (first, second) -> + if (value < first) { 1 - } else if (value < range.second) { + } else if (value < second) { 0 } else { -1 @@ -91,14 +92,12 @@ class Generator(val generate: (SplittableRandom) -> ErrorOr) { val result = mutableListOf() for (generator in generators) { val element = generator.generate(it) - val v = element.value - if (v != null) { - result.add(v) - } else { - return@Generator ErrorOr.of(element.error!!) + when (element) { + is Try.Success -> result.add(element.value) + is Try.Failure -> return@Generator element } } - ErrorOr(result) + Try.Success(result) } } } @@ -109,11 +108,9 @@ fun Generator.generateOrFail(random: SplittableRandom, numberOfTries: Int var error: Throwable? = null for (i in 0..numberOfTries - 1) { val result = generate(random) - val v = result.value - if (v != null) { - return v - } else { - error = result.error + error = when (result) { + is Try.Success -> return result.value + is Try.Failure -> result.exception } } if (error == null) { @@ -147,9 +144,9 @@ fun Generator.Companion.doubleRange(from: Double, to: Double): Generator fun Generator.Companion.char() = Generator { val codePoint = Math.abs(it.nextInt()) % (17 * (1 shl 16)) if (Character.isValidCodePoint(codePoint)) { - return@Generator ErrorOr(codePoint.toChar()) + return@Generator Try.Success(codePoint.toChar()) } else { - ErrorOr.of(IllegalStateException("Could not generate valid codepoint")) + Try.Failure(IllegalStateException("Could not generate valid codepoint")) } } @@ -175,20 +172,19 @@ fun Generator.Companion.replicatePoisson(meanSize: Double, generator: Genera val result = mutableListOf() var finish = false while (!finish) { - val errorOr = Generator.doubleRange(0.0, 1.0).generate(it).bind { value -> + val result = Generator.doubleRange(0.0, 1.0).generate(it).flatMap { value -> if (value < chance) { generator.generate(it).map { result.add(it) } } else { finish = true - ErrorOr(Unit) + Try.Success(Unit) } } - val e = errorOr.error - if (e != null) { - return@Generator ErrorOr.of(e) + if (result is Try.Failure) { + return@Generator result } } - ErrorOr(result) + Try.Success(result) } fun Generator.Companion.pickOne(list: List) = Generator.intRange(0, list.size - 1).map { list[it] } @@ -211,7 +207,7 @@ fun Generator.Companion.pickN(number: Int, list: List) = Generator Generator.Companion.sampleBernoulli(maxRatio: Double = 1.0, vararg collection: A) = diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index a76ae661ad..ac524995c7 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -8,15 +8,19 @@ import com.esotericsoftware.kryo.pool.KryoPool import com.google.common.util.concurrent.Futures import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.RPCClientConfiguration -import net.corda.core.* import net.corda.core.crypto.random63BitValue +import net.corda.core.future +import net.corda.core.getOrThrow import net.corda.core.messaging.RPCOps +import net.corda.core.millis +import net.corda.core.seconds import net.corda.core.utilities.NetworkHostAndPort -import net.corda.testing.driver.poll +import net.corda.core.utilities.Try import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCKryo import net.corda.testing.* +import net.corda.testing.driver.poll import org.apache.activemq.artemis.api.core.SimpleString import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue @@ -25,7 +29,10 @@ import rx.Observable import rx.subjects.PublishSubject import rx.subjects.UnicastSubject import java.time.Duration -import java.util.concurrent.* +import java.util.concurrent.ConcurrentLinkedQueue +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger class RPCStabilityTests { @@ -78,9 +85,9 @@ class RPCStabilityTests { val executor = Executors.newScheduledThreadPool(1) fun startAndStop() { rpcDriver { - ErrorOr.catch { startRpcClient(NetworkHostAndPort("localhost", 9999)).get() } + Try.on { startRpcClient(NetworkHostAndPort("localhost", 9999)).get() } val server = startRpcServer(ops = DummyOps) - ErrorOr.catch { startRpcClient( + Try.on { startRpcClient( server.get().broker.hostAndPort!!, configuration = RPCClientConfiguration.default.copy(minimumServerProtocolVersion = 1) ).get() } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 9d38896c19..e83363b7fc 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -12,9 +12,9 @@ import com.google.common.cache.RemovalListener import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.ThreadFactoryBuilder import net.corda.core.ThreadBox +import net.corda.core.crypto.random63BitValue import net.corda.core.getOrThrow import net.corda.core.messaging.RPCOps -import net.corda.core.crypto.random63BitValue import net.corda.core.serialization.KryoPoolWithContext import net.corda.core.utilities.* import net.corda.nodeapi.* @@ -229,14 +229,15 @@ class RPCClientProxyHandler( if (replyFuture == null) { log.error("RPC reply arrived to unknown RPC ID ${serverToClient.id}, this indicates an internal RPC error.") } else { - val rpcCallSite = callSiteMap?.get(serverToClient.id.toLong) - serverToClient.result.match( - onError = { - if (rpcCallSite != null) addRpcCallSiteToThrowable(it, rpcCallSite) - replyFuture.setException(it) - }, - onValue = { replyFuture.set(it) } - ) + val result = serverToClient.result + when (result) { + is Try.Success -> replyFuture.set(result.value) + is Try.Failure -> { + val rpcCallSite = callSiteMap?.get(serverToClient.id.toLong) + if (rpcCallSite != null) addRpcCallSiteToThrowable(result.exception, rpcCallSite) + replyFuture.setException(result.exception) + } + } } } is RPCApi.ServerToClient.Observation -> { diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index 8bd165d370..24d8247df0 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -23,7 +23,10 @@ import java.nio.file.* import java.nio.file.attribute.FileAttribute import java.time.Duration import java.time.temporal.Temporal -import java.util.concurrent.* +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutionException +import java.util.concurrent.Future +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import java.util.stream.Stream import java.util.zip.Deflater @@ -324,63 +327,6 @@ data class InputStreamAndHash(val inputStream: InputStream, val sha256: SecureHa val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this) -/** Representation of an operation that may have thrown an error. */ -@Suppress("DataClassPrivateConstructor") -@CordaSerializable -data class ErrorOr private constructor(val value: A?, val error: Throwable?) { - // The ErrorOr holds a value iff error == null - constructor(value: A) : this(value, null) - - companion object { - /** Runs the given lambda and wraps the result. */ - inline fun catch(body: () -> T): ErrorOr { - return try { - ErrorOr(body()) - } catch (t: Throwable) { - ErrorOr.of(t) - } - } - - fun of(t: Throwable) = ErrorOr(null, t) - } - - fun match(onValue: (A) -> T, onError: (Throwable) -> T): T { - if (error == null) { - return onValue(value as A) - } else { - return onError(error) - } - } - - fun getOrThrow(): A { - if (error == null) { - return value as A - } else { - throw error - } - } - - // Functor - fun map(function: (A) -> B) = ErrorOr(value?.let(function), error) - - // Applicative - fun combine(other: ErrorOr, function: (A, B) -> C): ErrorOr { - val newError = error ?: other.error - return ErrorOr(if (newError != null) null else function(value as A, other.value as B), newError) - } - - // Monad - fun bind(function: (A) -> ErrorOr): ErrorOr { - return if (error == null) { - function(value as A) - } else { - ErrorOr.of(error) - } - } - - fun mapError(function: (Throwable) -> Throwable) = ErrorOr(value, error?.let(function)) -} - /** * Returns an Observable that buffers events until subscribed. * @see UnicastSubject diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index fc5d9d0b4a..641b1c28e2 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -1,7 +1,6 @@ package net.corda.core.messaging import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.ErrorOr import net.corda.core.contracts.Amount import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateAndRef @@ -19,6 +18,7 @@ import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.Sort import net.corda.core.serialization.CordaSerializable import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.Try import org.bouncycastle.asn1.x500.X500Name import rx.Observable import java.io.InputStream @@ -44,7 +44,7 @@ sealed class StateMachineUpdate { override val id: StateMachineRunId get() = stateMachineInfo.id } - data class Removed(override val id: StateMachineRunId, val result: ErrorOr<*>) : StateMachineUpdate() + data class Removed(override val id: StateMachineRunId, val result: Try<*>) : StateMachineUpdate() } @CordaSerializable diff --git a/core/src/main/kotlin/net/corda/core/utilities/Try.kt b/core/src/main/kotlin/net/corda/core/utilities/Try.kt new file mode 100644 index 0000000000..74c7833e66 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/utilities/Try.kt @@ -0,0 +1,74 @@ +package net.corda.core.utilities + +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.Try.Failure +import net.corda.core.utilities.Try.Success + +/** + * Representation of an operation that has either succeeded with a result (represented by [Success]) or failed with an + * exception (represented by [Failure]). + */ +@CordaSerializable +sealed class Try { + companion object { + /** + * Executes the given block of code and returns a [Success] capturing the result, or a [Failure] if an exception + * is thrown. + */ + @JvmStatic + inline fun on(body: () -> T): Try { + return try { + Success(body()) + } catch (t: Throwable) { + Failure(t) + } + } + } + + /** Returns `true` iff the [Try] is a [Success]. */ + abstract val isFailure: Boolean + + /** Returns `true` iff the [Try] is a [Failure]. */ + abstract val isSuccess: Boolean + + /** Returns the value if a [Success] otherwise throws the exception if a [Failure]. */ + abstract fun getOrThrow(): A + + /** Maps the given function to the value from this [Success], or returns `this` if this is a [Failure]. */ + inline fun map(function: (A) -> B): Try = when (this) { + is Success -> Success(function(value)) + is Failure -> this + } + + /** Returns the given function applied to the value from this [Success], or returns `this` if this is a [Failure]. */ + inline fun flatMap(function: (A) -> Try): Try = when (this) { + is Success -> function(value) + is Failure -> this + } + + /** + * Maps the given function to the values from this [Success] and [other], or returns `this` if this is a [Failure] + * or [other] if [other] is a [Failure]. + */ + inline fun combine(other: Try, function: (A, B) -> C): Try = when (this) { + is Success -> when (other) { + is Success -> Success(function(value, other.value)) + is Failure -> other + } + is Failure -> this + } + + data class Success(val value: A) : Try() { + override val isSuccess: Boolean get() = true + override val isFailure: Boolean get() = false + override fun getOrThrow(): A = value + override fun toString(): String = "Success($value)" + } + + data class Failure(val exception: Throwable) : Try() { + override val isSuccess: Boolean get() = false + override val isFailure: Boolean get() = true + override fun getOrThrow(): Nothing = throw exception + override fun toString(): String = "Failure($exception)" + } +} diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt index 836315752a..8e5aaf8cfb 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt @@ -1,13 +1,14 @@ package net.corda.nodeapi import com.esotericsoftware.kryo.pool.KryoPool -import net.corda.core.ErrorOr import net.corda.core.serialization.KryoPoolWithContext import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize +import net.corda.core.utilities.Try import net.corda.nodeapi.RPCApi.ClientToServer import net.corda.nodeapi.RPCApi.ObservableId import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVALS +import net.corda.nodeapi.RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION import net.corda.nodeapi.RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX import net.corda.nodeapi.RPCApi.RPC_SERVER_QUEUE_NAME import net.corda.nodeapi.RPCApi.RpcRequestId @@ -151,7 +152,7 @@ object RPCApi { data class RpcReply( val id: RpcRequestId, - val result: ErrorOr + val result: Try ) : ServerToClient() { override fun writeToClientMessage(kryoPool: KryoPool, message: ClientMessage) { message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REPLY.ordinal) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt index 5f257894e4..c01b7e7fd9 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/BFTNotaryServiceTests.kt @@ -1,11 +1,9 @@ package net.corda.node.services import com.nhaarman.mockito_kotlin.whenever -import net.corda.core.ErrorOr import net.corda.core.contracts.ContractState import net.corda.core.contracts.StateRef import net.corda.core.contracts.TransactionType -import net.corda.testing.contracts.DummyContract import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.SecureHash import net.corda.core.div @@ -13,6 +11,7 @@ import net.corda.core.getOrThrow import net.corda.core.identity.Party import net.corda.core.node.services.ServiceInfo import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.Try import net.corda.flows.NotaryError import net.corda.flows.NotaryException import net.corda.flows.NotaryFlow @@ -23,10 +22,10 @@ import net.corda.node.services.transactions.minClusterSize import net.corda.node.services.transactions.minCorrectReplicas import net.corda.node.utilities.ServiceIdentityGenerator import net.corda.node.utilities.transaction +import net.corda.testing.contracts.DummyContract import net.corda.testing.node.MockNetwork import org.bouncycastle.asn1.x500.X500Name import org.junit.After -import org.junit.Ignore import org.junit.Test import java.nio.file.Files import kotlin.test.assertEquals @@ -95,10 +94,10 @@ class BFTNotaryServiceTests { val flows = spendTxs.map { NotaryFlow.Client(it) } val stateMachines = flows.map { services.startFlow(it) } mockNet.runNetwork() - val results = stateMachines.map { ErrorOr.catch { it.resultFuture.getOrThrow() } } + val results = stateMachines.map { Try.on { it.resultFuture.getOrThrow() } } val successfulIndex = results.mapIndexedNotNull { index, result -> - if (result.error == null) { - val signers = result.getOrThrow().map { it.by } + if (result is Try.Success) { + val signers = result.value.map { it.by } assertEquals(minCorrectReplicas(clusterSize), signers.size) signers.forEach { assertTrue(it in (notary.owningKey as CompositeKey).leafKeys) @@ -109,8 +108,8 @@ class BFTNotaryServiceTests { } }.single() spendTxs.zip(results).forEach { (tx, result) -> - if (result.error != null) { - val error = (result.error as NotaryException).error as NotaryError.Conflict + if (result is Try.Failure) { + val error = (result.exception as NotaryException).error as NotaryError.Conflict assertEquals(tx.id, error.txId) val (stateRef, consumingTx) = error.conflict.verified().stateHistory.entries.single() assertEquals(StateRef(issueTx.id, 0), stateRef) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 05c13f6341..71ee340dbb 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -12,15 +12,11 @@ import com.google.common.collect.HashMultimap import com.google.common.collect.Multimaps import com.google.common.collect.SetMultimap import com.google.common.util.concurrent.ThreadFactoryBuilder -import net.corda.core.ErrorOr -import net.corda.core.messaging.RPCOps import net.corda.core.crypto.random63BitValue +import net.corda.core.messaging.RPCOps import net.corda.core.seconds import net.corda.core.serialization.KryoPoolWithContext -import net.corda.core.utilities.LazyStickyPool -import net.corda.core.utilities.LifeCycle -import net.corda.core.utilities.debug -import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.* import net.corda.node.services.RPCUserService import net.corda.nodeapi.* import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER @@ -43,7 +39,6 @@ import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method import java.time.Duration import java.util.concurrent.* -import kotlin.collections.ArrayList data class RPCServerConfiguration( /** The number of threads to use for handling RPC requests */ @@ -270,14 +265,7 @@ class RPCServer( ) rpcExecutor!!.submit { val result = invokeRpc(rpcContext, clientToServer.methodName, clientToServer.arguments) - val resultWithExceptionUnwrapped = result.mapError { - if (it is InvocationTargetException) { - it.cause ?: RPCException("Caught InvocationTargetException without cause") - } else { - it - } - } - sendReply(clientToServer.id, clientToServer.clientAddress, resultWithExceptionUnwrapped) + sendReply(clientToServer.id, clientToServer.clientAddress, result) } } is RPCApi.ClientToServer.ObservablesClosed -> { @@ -287,25 +275,24 @@ class RPCServer( artemisMessage.acknowledge() } - private fun invokeRpc(rpcContext: RpcContext, methodName: String, arguments: List): ErrorOr { - return ErrorOr.catch { + private fun invokeRpc(rpcContext: RpcContext, methodName: String, arguments: List): Try { + return Try.on { try { CURRENT_RPC_CONTEXT.set(rpcContext) log.debug { "Calling $methodName" } val method = methodTable[methodName] ?: throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?") method.invoke(ops, *arguments.toTypedArray()) + } catch (e: InvocationTargetException) { + throw e.cause ?: RPCException("Caught InvocationTargetException without cause") } finally { CURRENT_RPC_CONTEXT.remove() } } } - private fun sendReply(requestId: RPCApi.RpcRequestId, clientAddress: SimpleString, resultWithExceptionUnwrapped: ErrorOr) { - val reply = RPCApi.ServerToClient.RpcReply( - id = requestId, - result = resultWithExceptionUnwrapped - ) + private fun sendReply(requestId: RPCApi.RpcRequestId, clientAddress: SimpleString, result: Try) { + val reply = RPCApi.ServerToClient.RpcReply(requestId, result) val observableContext = ObservableContext( requestId, observableMap, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 537597f833..b8b5825744 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -7,18 +7,14 @@ import co.paralleluniverse.strands.Strand import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import net.corda.core.DeclaredField.Companion.declaredField -import net.corda.core.ErrorOr import net.corda.core.abbreviate import net.corda.core.crypto.SecureHash +import net.corda.core.crypto.random63BitValue import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.internal.FlowStateMachine -import net.corda.core.crypto.random63BitValue import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.ProgressTracker -import net.corda.core.utilities.UntrustworthyData -import net.corda.core.utilities.debug -import net.corda.core.utilities.trace +import net.corda.core.utilities.* import net.corda.node.services.api.FlowAppAuditEvent import net.corda.node.services.api.FlowPermissionAuditEvent import net.corda.node.services.api.ServiceHubInternal @@ -71,7 +67,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Transient override lateinit var serviceHub: ServiceHubInternal @Transient internal lateinit var database: Database @Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit - @Transient internal lateinit var actionOnEnd: (ErrorOr, Boolean) -> Unit + @Transient internal lateinit var actionOnEnd: (Try, Boolean) -> Unit @Transient internal var fromCheckpoint: Boolean = false @Transient private var txTrampoline: Transaction? = null @@ -125,7 +121,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, .filter { it.state is FlowSessionState.Initiating } .forEach { it.waitForConfirmation() } // This is to prevent actionOnEnd being called twice if it throws an exception - actionOnEnd(ErrorOr(result), false) + actionOnEnd(Try.Success(result), false) _resultFuture?.set(result) logic.progressTracker?.currentStep = ProgressTracker.DONE logger.debug { "Flow finished with result $result" } @@ -138,7 +134,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } private fun processException(exception: Throwable, propagated: Boolean) { - actionOnEnd(ErrorOr.of(exception), propagated) + actionOnEnd(Try.Failure(exception), propagated) _resultFuture?.setException(exception) logic.progressTracker?.endWithError(exception) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 53e8a460eb..27ce753cd3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -15,7 +15,8 @@ import com.google.common.collect.HashMultimap import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.MoreExecutors import io.requery.util.CloseableIterator -import net.corda.core.* +import net.corda.core.ThreadBox +import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.SecureHash import net.corda.core.crypto.random63BitValue import net.corda.core.flows.FlowException @@ -25,6 +26,8 @@ import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party import net.corda.core.messaging.DataFeed import net.corda.core.serialization.* +import net.corda.core.then +import net.corda.core.utilities.Try import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace @@ -122,7 +125,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, abstract val logic: FlowLogic<*> data class Add(override val logic: FlowLogic<*>) : Change() - data class Removed(override val logic: FlowLogic<*>, val result: ErrorOr<*>) : Change() + data class Removed(override val logic: FlowLogic<*>, val result: Try<*>) : Change() } // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines @@ -442,13 +445,13 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, processIORequest(ioRequest) decrementLiveFibers() } - fiber.actionOnEnd = { resultOrError, propagated -> + fiber.actionOnEnd = { result, propagated -> try { mutex.locked { stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) } - notifyChangeObservers(Change.Removed(fiber.logic, resultOrError)) + notifyChangeObservers(Change.Removed(fiber.logic, result)) } - endAllFiberSessions(fiber, resultOrError.error, propagated) + endAllFiberSessions(fiber, result, propagated) } finally { fiber.commitTransaction() decrementLiveFibers() @@ -463,10 +466,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } } - private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, exception: Throwable?, propagated: Boolean) { + private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, result: Try<*>, propagated: Boolean) { openSessions.values.removeIf { session -> if (session.fiber == fiber) { - session.endSession(exception, propagated) + session.endSession((result as? Try.Failure)?.exception, propagated) true } else { false diff --git a/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt b/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt index fa6b477321..f249caa619 100644 --- a/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt +++ b/node/src/main/kotlin/net/corda/node/shell/FlowWatchPrintingSubscriber.kt @@ -1,20 +1,22 @@ package net.corda.node.shell import com.google.common.util.concurrent.SettableFuture -import net.corda.core.ErrorOr import net.corda.core.crypto.commonName import net.corda.core.flows.FlowInitiator import net.corda.core.flows.StateMachineRunId import net.corda.core.messaging.StateMachineUpdate +import net.corda.core.messaging.StateMachineUpdate.Added +import net.corda.core.messaging.StateMachineUpdate.Removed import net.corda.core.then import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.Try import org.crsh.text.Color import org.crsh.text.Decoration import org.crsh.text.RenderPrintWriter import org.crsh.text.ui.LabelElement -import org.crsh.text.ui.TableElement import org.crsh.text.ui.Overflow import org.crsh.text.ui.RowElement +import org.crsh.text.ui.TableElement import rx.Subscriber class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Subscriber() { @@ -51,10 +53,10 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub future.setException(e) } - private fun stateColor(smmUpdate: StateMachineUpdate): Color { - return when(smmUpdate){ - is StateMachineUpdate.Added -> Color.blue - is StateMachineUpdate.Removed -> smmUpdate.result.match({ Color.green } , { Color.red }) + private fun stateColor(update: StateMachineUpdate): Color { + return when (update) { + is Added -> Color.blue + is Removed -> if (update.result.isSuccess) Color.green else Color.red } } @@ -68,7 +70,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub // TODO Add progress tracker? private fun createStateMachinesRow(smmUpdate: StateMachineUpdate) { when (smmUpdate) { - is StateMachineUpdate.Added -> { + is Added -> { table.add(RowElement().add( LabelElement(formatFlowId(smmUpdate.id)), LabelElement(formatFlowName(smmUpdate.stateMachineInfo.flowLogicClassName)), @@ -77,7 +79,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub ).style(stateColor(smmUpdate).fg())) indexMap[smmUpdate.id] = table.rows.size - 1 } - is StateMachineUpdate.Removed -> { + is Removed -> { val idx = indexMap[smmUpdate.id] if (idx != null) { val oldRow = table.rows[idx] @@ -114,7 +116,7 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub } } - private fun formatFlowResult(flowResult: ErrorOr<*>): String { + private fun formatFlowResult(flowResult: Try<*>): String { fun successFormat(value: Any?): String { return when(value) { is SignedTransaction -> "Tx ID: " + value.id.toString() @@ -123,6 +125,9 @@ class FlowWatchPrintingSubscriber(private val toStream: RenderPrintWriter) : Sub else -> value.toString() } } - return flowResult.match({ successFormat(it) }, { it.message ?: it.toString() }) + return when (flowResult) { + is Try.Success -> successFormat(flowResult.value) + is Try.Failure -> flowResult.exception.message ?: flowResult.exception.toString() + } } } diff --git a/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt b/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt index 0cf5538a95..155e7853d2 100644 --- a/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt +++ b/node/src/main/kotlin/net/corda/node/utilities/AddOrRemove.kt @@ -1,6 +1,5 @@ package net.corda.node.utilities -import net.corda.core.ErrorOr import net.corda.core.serialization.CordaSerializable /** diff --git a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt index c176242b41..809f3fd4b6 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -19,10 +19,7 @@ import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NodeInfo import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.ServiceType -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.WHITESPACE -import net.corda.core.utilities.loggerFor -import net.corda.core.utilities.parseNetworkHostAndPort +import net.corda.core.utilities.* import net.corda.node.internal.Node import net.corda.node.internal.NodeStartup import net.corda.node.serialization.NodeClock @@ -352,15 +349,16 @@ fun poll( if (++counter == warnCount) { log.warn("Been polling $pollName for ${pollInterval.multipliedBy(warnCount.toLong()).seconds} seconds...") } - ErrorOr.catch(check).match(onValue = { - if (it != null) { - resultFuture.set(it) + try { + val checkResult = check() + if (checkResult != null) { + resultFuture.set(checkResult) } else { executorService.schedule(this, pollInterval.toMillis(), MILLISECONDS) } - }, onError = { - resultFuture.setException(it) - }) + } catch (t: Throwable) { + resultFuture.setException(t) + } } } executorService.submit(task) // The check may be expensive, so always run it in the background even the first time. @@ -389,7 +387,7 @@ class ShutdownManager(private val executorService: ExecutorService) { } fun shutdown() { - val shutdownFutures = state.locked { + val shutdownActionFutures = state.locked { if (isShutdown) { emptyList Unit>>() } else { @@ -397,21 +395,16 @@ class ShutdownManager(private val executorService: ExecutorService) { registeredShutdowns } } - val shutdowns = shutdownFutures.map { ErrorOr.catch { it.getOrThrow(1.seconds) } } - shutdowns.reversed().forEach { errorOrShutdown -> - errorOrShutdown.match( - onValue = { shutdown -> - try { - shutdown() - } catch (throwable: Throwable) { - log.error("Exception while shutting down", throwable) - } - }, - onError = { error -> - log.error("Exception while getting shutdown method, disregarding", error) - } - ) - } + val shutdowns = shutdownActionFutures.map { Try.on { it.getOrThrow(1.seconds) } } + shutdowns.reversed().forEach { when (it) { + is Try.Success -> + try { + it.value() + } catch (t: Throwable) { + log.warn("Exception while shutting down", t) + } + is Try.Failure -> log.warn("Exception while getting shutdown method, disregarding", it.exception) + } } } fun registerShutdown(shutdown: ListenableFuture<() -> Unit>) { diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt index 6753020d98..f2d90594ab 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt @@ -117,7 +117,7 @@ val crossCashTest = LoadTest( generate = { (nodeVaults), parallelism -> val nodeMap = simpleNodes.associateBy { it.info.legalIdentity } val anonymous = true - Generator.pickN(parallelism, simpleNodes).bind { nodes -> + Generator.pickN(parallelism, simpleNodes).flatMap { nodes -> Generator.sequence( nodes.map { node -> val quantities = nodeVaults[node.info.legalIdentity] ?: mapOf() diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/NotaryTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/NotaryTest.kt index b9440ee675..f684b08bb4 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/NotaryTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/NotaryTest.kt @@ -6,7 +6,6 @@ import net.corda.client.mock.pickOne import net.corda.client.mock.replicate import net.corda.contracts.asset.DUMMY_CASH_ISSUER import net.corda.contracts.asset.DUMMY_CASH_ISSUER_KEY -import net.corda.testing.contracts.DummyContract import net.corda.core.flows.FlowException import net.corda.core.messaging.startFlow import net.corda.core.thenMatch @@ -14,6 +13,7 @@ import net.corda.core.transactions.SignedTransaction import net.corda.flows.FinalityFlow import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeConnection +import net.corda.testing.contracts.DummyContract import net.corda.testing.node.MockServices import org.slf4j.LoggerFactory @@ -25,7 +25,7 @@ val dummyNotarisationTest = LoadTest( "Notarising dummy transactions", generate = { _, _ -> val issuerServices = MockServices(DUMMY_CASH_ISSUER_KEY) - val generateTx = Generator.pickOne(simpleNodes).bind { node -> + val generateTx = Generator.pickOne(simpleNodes).flatMap { node -> Generator.int().map { val issueBuilder = DummyContract.generateInitial(it, notary.info.notaryIdentity, DUMMY_CASH_ISSUER) val issueTx = issuerServices.signInitialTransaction(issueBuilder) diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt index 0543cd83b4..61194d7ea8 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt @@ -37,12 +37,12 @@ val selfIssueTest = LoadTest( "Self issuing cash randomly", generate = { _, parallelism -> - val generateIssue = Generator.pickOne(simpleNodes).bind { node -> + val generateIssue = Generator.pickOne(simpleNodes).flatMap { node -> generateIssue(1000, USD, notary.info.notaryIdentity, listOf(node.info.legalIdentity), anonymous = true).map { SelfIssueCommand(it, node) } } - Generator.replicatePoisson(parallelism.toDouble(), generateIssue).bind { + Generator.replicatePoisson(parallelism.toDouble(), generateIssue).flatMap { // We need to generate at least one if (it.isEmpty()) { Generator.sequence(listOf(generateIssue)) diff --git a/verifier/src/integration-test/kotlin/net/corda/verifier/GeneratedLedger.kt b/verifier/src/integration-test/kotlin/net/corda/verifier/GeneratedLedger.kt index fafc621c2e..3c0ccd7460 100644 --- a/verifier/src/integration-test/kotlin/net/corda/verifier/GeneratedLedger.kt +++ b/verifier/src/integration-test/kotlin/net/corda/verifier/GeneratedLedger.kt @@ -58,7 +58,7 @@ data class GeneratedLedger( * Invariants: The input list must be empty. */ val issuanceGenerator: Generator> by lazy { - val outputsGen = outputsGenerator.bind { outputs -> + val outputsGen = outputsGenerator.flatMap { outputs -> Generator.sequence( outputs.map { output -> pickOneOrMaybeNew(identities, partyGenerator).map { notary -> @@ -140,7 +140,7 @@ data class GeneratedLedger( fun notaryChangeTransactionGenerator(inputNotary: Party, inputsToChooseFrom: List>): Generator> { val newNotaryGen = pickOneOrMaybeNew(identities - inputNotary, partyGenerator) val inputsGen = Generator.sampleBernoulli(inputsToChooseFrom) - return inputsGen.bind { inputs -> + return inputsGen.flatMap { inputs -> val signers: List = (inputs.flatMap { it.state.data.participants } + inputNotary).map { it.owningKey } val outputsGen = Generator.sequence(inputs.map { input -> newNotaryGen.map { TransactionState(input.state.data, it, null) } }) outputsGen.combine(attachmentsGenerator) { outputs, txAttachments -> @@ -177,7 +177,7 @@ data class GeneratedLedger( if (availableOutputs.isEmpty()) { issuanceGenerator } else { - Generator.pickOne(availableOutputs.keys.toList()).bind { inputNotary -> + Generator.pickOne(availableOutputs.keys.toList()).flatMap { inputNotary -> val inputsToChooseFrom = availableOutputs[inputNotary]!! Generator.frequency( 0.3 to issuanceGenerator, @@ -231,7 +231,7 @@ fun pickOneOrMaybeNew(from: Collection, generator: Generator): Generat if (from.isEmpty()) { return generator } else { - return generator.bind { + return generator.flatMap { Generator.pickOne(from + it) } } diff --git a/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt b/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt index ea54765408..b8df7f891b 100644 --- a/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt +++ b/verifier/src/main/kotlin/net/corda/verifier/Verifier.kt @@ -3,8 +3,6 @@ package net.corda.verifier import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions -import net.corda.core.ErrorOr -import net.corda.nodeapi.internal.addShutdownHook import net.corda.core.div import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.debug @@ -15,6 +13,7 @@ import net.corda.nodeapi.VerifierApi import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME import net.corda.nodeapi.config.NodeSSLConfiguration import net.corda.nodeapi.config.getValue +import net.corda.nodeapi.internal.addShutdownHook import org.apache.activemq.artemis.api.core.client.ActiveMQClient import java.nio.file.Path import java.nio.file.Paths @@ -61,14 +60,15 @@ class Verifier { consumer.setMessageHandler { val request = VerifierApi.VerificationRequest.fromClientMessage(it) log.debug { "Received verification request with id ${request.verificationId}" } - val result = ErrorOr.catch { + val error = try { request.transaction.verify() - } - if (result.error != null) { - log.debug { "Verification returned with error ${result.error}" } + null + } catch (t: Throwable) { + log.debug("Verification returned with error:", t) + t } val reply = session.createMessage(false) - val response = VerifierApi.VerificationResponse(request.verificationId, result.error) + val response = VerifierApi.VerificationResponse(request.verificationId, error) response.writeToClientMessage(reply) replyProducer.send(request.responseAddress, reply) it.acknowledge()