From e75732af91434b0acece3301b41b55f3d0cf82c9 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Tue, 9 May 2017 17:01:13 +0100 Subject: [PATCH] Making sure non-serialisable objects in FlowException do not interfere with the flow session (#651) Also TransactionVerificationException no longer has reference to non-serialisable LedgerTransaction --- .../corda/client/rpc/CordaRPCClientTest.kt | 7 ++-- .../corda/core/contracts/TransactionTypes.kt | 22 +++++++------ .../core/contracts/TransactionVerification.kt | 19 +++++------ .../core/transactions/LedgerTransaction.kt | 2 ++ .../kotlin/net/corda/nodeapi/RPCStructures.kt | 19 ----------- .../statemachine/StateMachineManager.kt | 19 +++++++++-- .../statemachine/FlowFrameworkTests.kt | 33 +++++++++++++++---- .../main/kotlin/net/corda/testing/TestDSL.kt | 2 +- 8 files changed, 70 insertions(+), 53 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index d6d0f2719b..72f3ef6316 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -1,7 +1,6 @@ package net.corda.client.rpc import net.corda.core.contracts.DOLLARS -import net.corda.core.flows.FlowException import net.corda.core.flows.FlowInitiator import net.corda.core.getOrThrow import net.corda.core.messaging.* @@ -9,6 +8,7 @@ import net.corda.core.node.services.ServiceInfo import net.corda.core.random63BitValue import net.corda.core.serialization.OpaqueBytes import net.corda.core.utilities.ALICE +import net.corda.flows.CashException import net.corda.flows.CashIssueFlow import net.corda.flows.CashPaymentFlow import net.corda.node.internal.Node @@ -86,11 +86,10 @@ class CordaRPCClientTest : NodeBasedTest() { } @Test - fun `FlowException thrown by flow`() { + fun `sub-type of FlowException thrown by flow`() { login(rpcUser.username, rpcUser.password) val handle = connection!!.proxy.startFlow(::CashPaymentFlow, 100.DOLLARS, node.info.legalIdentity) - // TODO Restrict this to CashException once RPC serialisation has been fixed - assertThatExceptionOfType(FlowException::class.java).isThrownBy { + assertThatExceptionOfType(CashException::class.java).isThrownBy { handle.returnValue.getOrThrow() } } diff --git a/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt b/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt index 00c6099f4f..7e3f42bdab 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/TransactionTypes.kt @@ -20,16 +20,16 @@ sealed class TransactionType { fun verify(tx: LedgerTransaction) { require(tx.notary != null || tx.timestamp == null) { "Transactions with timestamps must be notarised." } val duplicates = detectDuplicateInputs(tx) - if (duplicates.isNotEmpty()) throw TransactionVerificationException.DuplicateInputStates(tx, duplicates) + if (duplicates.isNotEmpty()) throw TransactionVerificationException.DuplicateInputStates(tx.id, duplicates) val missing = verifySigners(tx) - if (missing.isNotEmpty()) throw TransactionVerificationException.SignersMissing(tx, missing.toList()) + if (missing.isNotEmpty()) throw TransactionVerificationException.SignersMissing(tx.id, missing.toList()) verifyTransaction(tx) } /** Check that the list of signers includes all the necessary keys */ fun verifySigners(tx: LedgerTransaction): Set { val notaryKey = tx.inputs.map { it.state.notary.owningKey }.toSet() - if (notaryKey.size > 1) throw TransactionVerificationException.MoreThanOneNotary(tx) + if (notaryKey.size > 1) throw TransactionVerificationException.MoreThanOneNotary(tx.id) val requiredKeys = getRequiredSigners(tx) + notaryKey val missing = requiredKeys - tx.mustSign @@ -81,7 +81,7 @@ sealed class TransactionType { if (tx.notary != null && tx.inputs.isNotEmpty()) { tx.outputs.forEach { if (it.notary != tx.notary) { - throw TransactionVerificationException.NotaryChangeInWrongTransactionType(tx, it.notary) + throw TransactionVerificationException.NotaryChangeInWrongTransactionType(tx.id, tx.notary, it.notary) } } } @@ -90,13 +90,14 @@ sealed class TransactionType { private fun verifyEncumbrances(tx: LedgerTransaction) { // Validate that all encumbrances exist within the set of input states. val encumberedInputs = tx.inputs.filter { it.state.encumbrance != null } - encumberedInputs.forEach { encumberedInput -> + encumberedInputs.forEach { (state, ref) -> val encumbranceStateExists = tx.inputs.any { - it.ref.txhash == encumberedInput.ref.txhash && it.ref.index == encumberedInput.state.encumbrance + it.ref.txhash == ref.txhash && it.ref.index == state.encumbrance } if (!encumbranceStateExists) { throw TransactionVerificationException.TransactionMissingEncumbranceException( - tx, encumberedInput.state.encumbrance!!, + tx.id, + state.encumbrance!!, TransactionVerificationException.Direction.INPUT ) } @@ -108,7 +109,8 @@ sealed class TransactionType { val encumbranceIndex = output.encumbrance ?: continue if (encumbranceIndex == i || encumbranceIndex >= tx.outputs.size) { throw TransactionVerificationException.TransactionMissingEncumbranceException( - tx, encumbranceIndex, + tx.id, + encumbranceIndex, TransactionVerificationException.Direction.OUTPUT) } } @@ -126,7 +128,7 @@ sealed class TransactionType { try { contract.verify(ctx) } catch(e: Throwable) { - throw TransactionVerificationException.ContractRejection(tx, contract, e) + throw TransactionVerificationException.ContractRejection(tx.id, contract, e) } } } @@ -164,7 +166,7 @@ sealed class TransactionType { } check(tx.commands.isEmpty()) } catch (e: IllegalStateException) { - throw TransactionVerificationException.InvalidNotaryChange(tx) + throw TransactionVerificationException.InvalidNotaryChange(tx.id) } } diff --git a/core/src/main/kotlin/net/corda/core/contracts/TransactionVerification.kt b/core/src/main/kotlin/net/corda/core/contracts/TransactionVerification.kt index 86ca7282a3..0a9ecd1edd 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/TransactionVerification.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/TransactionVerification.kt @@ -4,7 +4,6 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowException import net.corda.core.serialization.CordaSerializable -import net.corda.core.transactions.LedgerTransaction import java.security.PublicKey import java.util.* @@ -95,25 +94,25 @@ class AttachmentResolutionException(val hash: SecureHash) : FlowException() { override fun toString(): String = "Attachment resolution failure for $hash" } -sealed class TransactionVerificationException(val tx: LedgerTransaction, cause: Throwable?) : FlowException(cause) { - class ContractRejection(tx: LedgerTransaction, val contract: Contract, cause: Throwable?) : TransactionVerificationException(tx, cause) - class MoreThanOneNotary(tx: LedgerTransaction) : TransactionVerificationException(tx, null) - class SignersMissing(tx: LedgerTransaction, val missing: List) : TransactionVerificationException(tx, null) { +sealed class TransactionVerificationException(val txId: SecureHash, cause: Throwable?) : FlowException(cause) { + class ContractRejection(txId: SecureHash, val contract: Contract, cause: Throwable?) : TransactionVerificationException(txId, cause) + class MoreThanOneNotary(txId: SecureHash) : TransactionVerificationException(txId, null) + class SignersMissing(txId: SecureHash, val missing: List) : TransactionVerificationException(txId, null) { override fun toString(): String = "Signers missing: ${missing.joinToString()}" } - class DuplicateInputStates(tx: LedgerTransaction, val duplicates: Set) : TransactionVerificationException(tx, null) { + class DuplicateInputStates(txId: SecureHash, val duplicates: Set) : TransactionVerificationException(txId, null) { override fun toString(): String = "Duplicate inputs: ${duplicates.joinToString()}" } - class InvalidNotaryChange(tx: LedgerTransaction) : TransactionVerificationException(tx, null) - class NotaryChangeInWrongTransactionType(tx: LedgerTransaction, val outputNotary: Party) : TransactionVerificationException(tx, null) { + class InvalidNotaryChange(txId: SecureHash) : TransactionVerificationException(txId, null) + class NotaryChangeInWrongTransactionType(txId: SecureHash, val txNotary: Party, val outputNotary: Party) : TransactionVerificationException(txId, null) { override fun toString(): String { - return "Found unexpected notary change in transaction. Tx notary: ${tx.notary}, found: $outputNotary" + return "Found unexpected notary change in transaction. Tx notary: $txNotary, found: $outputNotary" } } - class TransactionMissingEncumbranceException(tx: LedgerTransaction, val missing: Int, val inOut: Direction) : TransactionVerificationException(tx, null) { + class TransactionMissingEncumbranceException(txId: SecureHash, val missing: Int, val inOut: Direction) : TransactionVerificationException(txId, null) { override val message: String get() = "Missing required encumbrance $missing in $inOut" } diff --git a/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt b/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt index 2d7dae9ec5..60f09ec6f9 100644 --- a/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt +++ b/core/src/main/kotlin/net/corda/core/transactions/LedgerTransaction.kt @@ -17,6 +17,8 @@ import java.security.PublicKey * * All the above refer to inputs using a (txhash, output index) pair. */ +// TODO LedgerTransaction is not supposed to be serialisable as it references attachments, etc. The verification logic +// currently sends this across to out-of-process verifiers. We'll need to change that first. @CordaSerializable class LedgerTransaction( /** The resolved input states which will be consumed/invalidated by the execution of this transaction. */ diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt index 329155c6e5..d7d70a5428 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCStructures.kt @@ -5,15 +5,12 @@ package net.corda.nodeapi import com.esotericsoftware.kryo.Registration import com.esotericsoftware.kryo.Serializer import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.flows.FlowException import net.corda.core.serialization.* import net.corda.core.toFuture import net.corda.core.toObservable import net.corda.nodeapi.config.OldConfig import rx.Observable import java.io.InputStream -import java.io.PrintWriter -import java.io.StringWriter data class User( @OldConfig("user") @@ -55,20 +52,6 @@ class RPCKryo(observableSerializer: Serializer>) : CordaKryo(mak read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java as Class>).toFuture() }, write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) } ) - register( - FlowException::class, - read = { kryo, input -> - val message = input.readString() - val cause = kryo.readObjectOrNull(input, Throwable::class.java) - FlowException(message, cause) - }, - write = { kryo, output, obj -> - // The subclass may have overridden toString so we use that - val message = if (obj.javaClass != FlowException::class.java) obj.toString() else obj.message - output.writeString(message) - kryo.writeObjectOrNull(output, obj.cause, Throwable::class.java) - } - ) } override fun getRegistration(type: Class<*>): Registration { @@ -81,8 +64,6 @@ class RPCKryo(observableSerializer: Serializer>) : CordaKryo(mak if (ListenableFuture::class.java != type && ListenableFuture::class.java.isAssignableFrom(type)) { return super.getRegistration(ListenableFuture::class.java) } - if (FlowException::class.java.isAssignableFrom(type)) - return super.getRegistration(FlowException::class.java) return super.getRegistration(type) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 56345260f5..26f36d3776 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -6,6 +6,7 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.strands.Strand import com.codahale.metrics.Gauge import com.esotericsoftware.kryo.Kryo +import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output @@ -16,7 +17,6 @@ import io.requery.util.CloseableIterator import net.corda.core.* import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash -import net.corda.core.crypto.commonName import net.corda.core.flows.* import net.corda.core.serialization.* import net.corda.core.utilities.debug @@ -28,7 +28,6 @@ import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.messaging.TopicSession -import net.corda.node.services.messaging.send import net.corda.node.utilities.* import org.apache.activemq.artemis.utils.ReusableLatch import org.jetbrains.exposed.sql.Database @@ -573,6 +572,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, val address = serviceHub.networkService.getAddressOfParty(partyInfo) val logger = fiber?.logger ?: logger logger.trace { "Sending $message to party $party @ $address" + if (retryId != null) " with retry $retryId" else "" } - serviceHub.networkService.send(sessionTopic, message, address, retryId = retryId) + + val serialized = try { + message.serialize() + } catch (e: KryoException) { + if (message !is ErrorSessionEnd || message.errorResponse == null) throw e + logger.warn("Something in ${message.errorResponse.javaClass.name} is not serialisable. " + + "Instead sending back an exception which is serialisable to ensure session end occurs properly.", e) + // The subclass may have overridden toString so we use that + val exMessage = message.errorResponse.let { if (it.javaClass != FlowException::class.java) it.toString() else it.message } + message.copy(errorResponse = FlowException(exMessage)).serialize() + } + + serviceHub.networkService.apply { + send(createMessage(sessionTopic, serialized.bytes), address, retryId = retryId) + } } } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index b93a025c55..a9c0760b35 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -33,16 +33,12 @@ import net.corda.flows.NotaryFlow import net.corda.node.services.persistence.checkpoints import net.corda.node.services.transactions.ValidatingNotaryService import net.corda.node.utilities.transaction -import net.corda.testing.expect -import net.corda.testing.expectEvents -import net.corda.testing.getTestX509Name -import net.corda.testing.initiateSingleShotFlow +import net.corda.testing.* import net.corda.testing.node.InMemoryMessagingNetwork import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork.MockNode -import net.corda.testing.sequence import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType @@ -538,6 +534,28 @@ class FlowFrameworkTests { assertThat(resultFuture.getOrThrow()).isEqualTo("Hello") } + @Test + fun `serialisation issue in counterparty`() { + node2.registerServiceFlow(ReceiveFlow::class) { SendFlow(NonSerialisableData(1), it) } + val result = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture + net.runNetwork() + assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy { + result.getOrThrow() + } + } + + @Test + fun `FlowException has non-serialisable object`() { + node2.initiateSingleShotFlow(ReceiveFlow::class) { + ExceptionFlow { NonSerialisableFlowException(NonSerialisableData(1)) } + } + val result = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)).resultFuture + net.runNetwork() + assertThatExceptionOfType(FlowException::class.java).isThrownBy { + result.getOrThrow() + } + } + @Test fun `wait for transaction`() { val ptx = TransactionBuilder(notary = notary1.info.notaryIdentity) @@ -704,7 +722,7 @@ class FlowFrameworkTests { @InitiatingFlow - private open class SendFlow(val payload: String, vararg val otherParties: Party) : FlowLogic() { + private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic() { init { require(otherParties.isNotEmpty()) } @@ -835,5 +853,8 @@ class FlowFrameworkTests { } } + private data class NonSerialisableData(val a: Int) + private class NonSerialisableFlowException(val data: NonSerialisableData) : FlowException() + //endregion Helpers } diff --git a/test-utils/src/main/kotlin/net/corda/testing/TestDSL.kt b/test-utils/src/main/kotlin/net/corda/testing/TestDSL.kt index 393ed62eaa..ce64f8a97d 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/TestDSL.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/TestDSL.kt @@ -297,7 +297,7 @@ data class TestLedgerDSLInterpreter private constructor( } return EnforceVerifyOrFail.Token } catch (exception: TransactionVerificationException) { - val transactionWithLocation = transactionWithLocations[exception.tx.id] + val transactionWithLocation = transactionWithLocations[exception.txId] val transactionName = transactionWithLocation?.label ?: transactionWithLocation?.location ?: "" throw VerifiesFailed(transactionName, exception) }