From 646ce8afe074a54923242b1dd52dca16a499a009 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Thu, 19 Jan 2017 12:00:14 +0000 Subject: [PATCH] FlowException thrown by a flow is propagated to all counterparties --- .../net/corda/core/contracts/FungibleAsset.kt | 3 +- .../net/corda/core/flows/FlowException.kt | 13 ++ .../kotlin/net/corda/core/flows/FlowLogic.kt | 12 +- .../net/corda/core/flows/FlowStateMachine.kt | 2 - .../net/corda/core/serialization/Kryo.kt | 6 +- .../corda/core/utilities/UntrustworthyData.kt | 3 + .../flows/AbstractStateReplacementFlow.kt | 123 ++++++--------- .../kotlin/net/corda/flows/FetchDataFlow.kt | 41 +++-- .../kotlin/net/corda/flows/FinalityFlow.kt | 1 + .../net/corda/flows/NotaryChangeFlow.kt | 63 ++++---- .../main/kotlin/net/corda/flows/NotaryFlow.kt | 60 +++----- .../corda/flows/ResolveTransactionsFlow.kt | 9 +- .../corda/docs/IntegrationTestingTutorial.kt | 4 +- docs/source/flow-state-machines.rst | 104 ++++++------- .../corda/contracts/asset/OnLedgerAsset.kt | 17 ++- .../clause/AbstractConserveAmount.kt | 1 + .../main/kotlin/net/corda/flows/CashFlow.kt | 143 ++++++++---------- .../main/kotlin/net/corda/flows/IssuerFlow.kt | 84 ++++------ .../net/corda/flows/TwoPartyTradeFlow.kt | 61 ++++---- .../kotlin/net/corda/flows/IssuerFlowTest.kt | 6 +- .../node/services/DistributedServiceTests.kt | 7 +- .../node/services/messaging/RPCStructures.kt | 3 - .../persistence/DataVendingService.kt | 58 +++---- .../statemachine/FlowStateMachineImpl.kt | 55 ++++--- .../services/statemachine/SessionMessage.kt | 8 +- .../statemachine/StateMachineManager.kt | 90 +++++++---- .../corda/node/services/NotaryChangeTests.kt | 9 +- .../statemachine/StateMachineManagerTests.kt | 128 ++++++++++++++-- .../kotlin/net/corda/vega/flows/SimmFlow.kt | 20 ++- .../net/corda/vega/flows/StateRevisionFlow.kt | 32 ++-- .../views/cordapps/cash/NewTransaction.kt | 40 ++--- .../net/corda/loadtest/tests/CrossCashTest.kt | 17 +-- .../net/corda/loadtest/tests/SelfIssueTest.kt | 17 +-- 33 files changed, 634 insertions(+), 606 deletions(-) create mode 100644 core/src/main/kotlin/net/corda/core/flows/FlowException.kt diff --git a/core/src/main/kotlin/net/corda/core/contracts/FungibleAsset.kt b/core/src/main/kotlin/net/corda/core/contracts/FungibleAsset.kt index 34b618750e..50651b1ea0 100644 --- a/core/src/main/kotlin/net/corda/core/contracts/FungibleAsset.kt +++ b/core/src/main/kotlin/net/corda/core/contracts/FungibleAsset.kt @@ -1,8 +1,9 @@ package net.corda.core.contracts import net.corda.core.crypto.CompositeKey +import net.corda.core.flows.FlowException -class InsufficientBalanceException(val amountMissing: Amount<*>) : Exception() { +class InsufficientBalanceException(val amountMissing: Amount<*>) : FlowException() { override fun toString() = "Insufficient balance, missing $amountMissing" } diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowException.kt b/core/src/main/kotlin/net/corda/core/flows/FlowException.kt new file mode 100644 index 0000000000..fc8014f4d4 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/flows/FlowException.kt @@ -0,0 +1,13 @@ +package net.corda.core.flows + +/** + * Exception which can be thrown by a [FlowLogic] at any point in its logic to unexpectedly bring it to a permanent end. + * The exception will propagate to all counterparty flows and will be thrown on their end the next time they wait on a + * [FlowLogic.receive] or [FlowLogic.sendAndReceive]. Any flow which no longer needs to do a receive, or has already ended, + * will not receive the exception (if this is required then have them wait for a confirmation message). + * + * [FlowException] (or a subclass) can be a valid expected response from a flow, particularly ones which act as a service. + * It is recommended a [FlowLogic] document the [FlowException] types it can throw. + */ +open class FlowException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) + : Exception(message, cause) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 0f7bf0a58c..519a1c0149 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -73,7 +73,7 @@ abstract class FlowLogic { * @returns an [UntrustworthyData] wrapper around the received object. */ @Suspendable - open fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any): UntrustworthyData { + open fun sendAndReceive(receiveType: Class, otherParty: Party, payload: Any): UntrustworthyData { return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow) } @@ -92,9 +92,11 @@ abstract class FlowLogic { * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly * corrupted data in order to exploit your code. + * + * @returns an [UntrustworthyData] wrapper around the received object. */ @Suspendable - open fun receive(receiveType: Class, otherParty: Party): UntrustworthyData { + open fun receive(receiveType: Class, otherParty: Party): UntrustworthyData { return stateMachine.receive(receiveType, otherParty, sessionFlow) } @@ -116,11 +118,15 @@ abstract class FlowLogic { * @param shareParentSessions In certain situations the need arises to use the same sessions the parent flow has * already established. However this also prevents the subflow from creating new sessions with those parties. * For this reason the default value is false. + * + * @throws FlowException This is either thrown by [subLogic] itself or propagated from any of the remote + * [FlowLogic]s it communicated with. A subflow retry can be done by catching this exception. */ // TODO Rethink the default value for shareParentSessions // TODO shareParentSessions is a bit too low-level and perhaps can be expresed in a better way @Suspendable @JvmOverloads + @Throws(FlowException::class) open fun subFlow(subLogic: FlowLogic, shareParentSessions: Boolean = false): R { subLogic.stateMachine = stateMachine maybeWireUpProgressTracking(subLogic) @@ -149,6 +155,7 @@ abstract class FlowLogic { * helpful if this flow is meant to be used as a subflow. */ @Suspendable + @Throws(FlowException::class) abstract fun call(): T /** @@ -181,7 +188,6 @@ abstract class FlowLogic { private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) { val ours = progressTracker - val theirs = subLogic.progressTracker if (ours != null && theirs != null) { if (ours.currentStep == ProgressTracker.UNSTARTED) { diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt index fa290bfab8..25aa753a9c 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt @@ -40,5 +40,3 @@ interface FlowStateMachine { val id: StateMachineRunId val resultFuture: ListenableFuture } - -class FlowException(message: String) : RuntimeException(message) diff --git a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt index 0b5c53b6f4..d02e0b35bc 100644 --- a/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt +++ b/core/src/main/kotlin/net/corda/core/serialization/Kryo.kt @@ -11,6 +11,7 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.serializers.JavaSerializer import com.esotericsoftware.kryo.serializers.MapSerializer import de.javakaffee.kryoserializers.ArraysAsListSerializer +import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer import de.javakaffee.kryoserializers.guava.* import net.corda.core.contracts.* import net.corda.core.crypto.* @@ -402,12 +403,10 @@ fun createKryo(k: Kryo = Kryo()): Kryo { // serialise the Kryo object itself when suspending a fiber. That's dumb, useless AND can cause crashes, so // we avoid it here. register(Kryo::class.java, object : Serializer() { - override fun write(kryo: Kryo, output: Output, obj: Kryo) { - } - override fun read(kryo: Kryo, input: Input, type: Class): Kryo { return createKryo((Fiber.getFiberSerializer() as KryoSerializer).kryo) } + override fun write(kryo: Kryo, output: Output, obj: Kryo) {} }) register(EdDSAPublicKey::class.java, Ed25519PublicKeySerializer) @@ -441,6 +440,7 @@ fun createKryo(k: Kryo = Kryo()): Kryo { addDefaultSerializer(BufferedInputStream::class.java, InputStreamSerializer) + UnmodifiableCollectionsSerializer.registerSerializers(k) ImmutableListSerializer.registerSerializers(k) ImmutableSetSerializer.registerSerializers(k) ImmutableSortedSetSerializer.registerSerializers(k) diff --git a/core/src/main/kotlin/net/corda/core/utilities/UntrustworthyData.kt b/core/src/main/kotlin/net/corda/core/utilities/UntrustworthyData.kt index 2093412c0f..b220ea20ca 100644 --- a/core/src/main/kotlin/net/corda/core/utilities/UntrustworthyData.kt +++ b/core/src/main/kotlin/net/corda/core/utilities/UntrustworthyData.kt @@ -1,5 +1,7 @@ package net.corda.core.utilities +import net.corda.core.flows.FlowException + /** * A small utility to approximate taint tracking: if a method gives you back one of these, it means the data came from * a remote source that may be incentivised to pass us junk that violates basic assumptions and thus must be checked @@ -17,6 +19,7 @@ class UntrustworthyData(private val fromUntrustedWorld: T) { get() = fromUntrustedWorld @Suppress("DEPRECATION") + @Throws(FlowException::class) inline fun unwrap(validator: (T) -> R) = validator(data) @Suppress("DEPRECATION") diff --git a/core/src/main/kotlin/net/corda/flows/AbstractStateReplacementFlow.kt b/core/src/main/kotlin/net/corda/flows/AbstractStateReplacementFlow.kt index 0e353d10c6..a05f15dd1f 100644 --- a/core/src/main/kotlin/net/corda/flows/AbstractStateReplacementFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/AbstractStateReplacementFlow.kt @@ -8,6 +8,7 @@ import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.DigitalSignature import net.corda.core.crypto.Party import net.corda.core.crypto.signWithECDSA +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.node.recordTransactions import net.corda.core.transactions.SignedTransaction @@ -27,26 +28,22 @@ import net.corda.flows.AbstractStateReplacementFlow.Instigator * Finally, [Instigator] sends the transaction containing all signatures back to each participant so they can record it and * use the new updated state for future transactions. */ -abstract class AbstractStateReplacementFlow { - interface Proposal { - val stateRef: StateRef - val modification: T - val stx: SignedTransaction - } +abstract class AbstractStateReplacementFlow { + data class Proposal(val stateRef: StateRef, val modification: T, val stx: SignedTransaction) - abstract class Instigator(val originalState: StateAndRef, - val modification: T, - override val progressTracker: ProgressTracker = tracker()) : FlowLogic>() { + abstract class Instigator( + val originalState: StateAndRef, + val modification: T, + override val progressTracker: ProgressTracker = tracker()) : FlowLogic>() { companion object { - object SIGNING : ProgressTracker.Step("Requesting signatures from other parties") - object NOTARY : ProgressTracker.Step("Requesting notary signature") fun tracker() = ProgressTracker(SIGNING, NOTARY) } @Suspendable + @Throws(StateReplacementException::class) override fun call(): StateAndRef { val (stx, participants) = assembleTx() @@ -66,7 +63,6 @@ abstract class AbstractStateReplacementFlow { return finalTx.tx.outRef(0) } - abstract protected fun assembleProposal(stateRef: StateRef, modification: T, stx: SignedTransaction): Proposal abstract protected fun assembleTx(): Pair> @Suspendable @@ -89,59 +85,55 @@ abstract class AbstractStateReplacementFlow { @Suspendable private fun getParticipantSignature(party: Party, stx: SignedTransaction): DigitalSignature.WithKey { - val proposal = assembleProposal(originalState.ref, modification, stx) - - val response = sendAndReceive(party, proposal) - val participantSignature = response.unwrap { - if (it.sig == null) throw StateReplacementException(it.error!!) - else { - check(party.owningKey.isFulfilledBy(it.sig.by)) { "Not signed by the required participant" } - it.sig.verifyWithECDSA(stx.id) - it.sig - } + val proposal = Proposal(originalState.ref, modification, stx) + val response = sendAndReceive(party, proposal) + return response.unwrap { + check(party.owningKey.isFulfilledBy(it.by)) { "Not signed by the required participant" } + it.verifyWithECDSA(stx.id) + it } - - return participantSignature } @Suspendable private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.WithKey { progressTracker.currentStep = NOTARY - return subFlow(NotaryFlow.Client(stx)) + try { + return subFlow(NotaryFlow.Client(stx)) + } catch (e: NotaryException) { + throw StateReplacementException("Unable to notarise state change", e) + } } } - abstract class Acceptor(val otherSide: Party, - override val progressTracker: ProgressTracker = tracker()) : FlowLogic() { - + abstract class Acceptor(val otherSide: Party, + override val progressTracker: ProgressTracker = tracker()) : FlowLogic() { companion object { object VERIFYING : ProgressTracker.Step("Verifying state replacement proposal") - object APPROVING : ProgressTracker.Step("State replacement approved") - object REJECTING : ProgressTracker.Step("State replacement rejected") - - fun tracker() = ProgressTracker(VERIFYING, APPROVING, REJECTING) + fun tracker() = ProgressTracker(VERIFYING, APPROVING) } @Suspendable + @Throws(StateReplacementException::class) override fun call() { progressTracker.currentStep = VERIFYING val maybeProposal: UntrustworthyData> = receive(otherSide) - try { - val stx: SignedTransaction = maybeProposal.unwrap { verifyProposal(maybeProposal).stx } - verifyTx(stx) - approve(stx) - } catch(e: Exception) { - // TODO: catch only specific exceptions. However, there are numerous validation exceptions - // that might occur (tx validation/resolution, invalid proposal). Need to rethink how - // we manage exceptions and maybe introduce some platform exception hierarchy - val myIdentity = serviceHub.myInfo.legalIdentity - val state = maybeProposal.unwrap { it.stateRef } - val reason = StateReplacementRefused(myIdentity, state, e.message) - - reject(reason) + val stx: SignedTransaction = maybeProposal.unwrap { + verifyProposal(it) + verifyTx(it.stx) + it.stx } + approve(stx) + } + + @Suspendable + private fun verifyTx(stx: SignedTransaction) { + checkMySignatureRequired(stx.tx) + checkDependenciesValid(stx) + // We expect stx to have insufficient signatures, so we convert the WireTransaction to the LedgerTransaction + // here, thus bypassing the sufficient-signatures check. + stx.tx.toLedgerTransaction(serviceHub).verify() } @Suspendable @@ -149,8 +141,7 @@ abstract class AbstractStateReplacementFlow { progressTracker.currentStep = APPROVING val mySignature = sign(stx) - val response = Result.noError(mySignature) - val swapSignatures = sendAndReceive>(otherSide, response) + val swapSignatures = sendAndReceive>(otherSide, mySignature) // TODO: This step should not be necessary, as signatures are re-checked in verifySignatures. val allSignatures = swapSignatures.unwrap { signatures -> @@ -163,28 +154,13 @@ abstract class AbstractStateReplacementFlow { serviceHub.recordTransactions(finalTx) } - @Suspendable - private fun reject(e: StateReplacementRefused) { - progressTracker.currentStep = REJECTING - val response = Result.withError(e) - send(otherSide, response) - } - /** * Check the state change proposal to confirm that it's acceptable to this node. Rules for verification depend * on the change proposed, and may further depend on the node itself (for example configuration). The - * proposal is returned if acceptable, otherwise an exception is thrown. + * proposal is returned if acceptable, otherwise a [StateReplacementException] is thrown. */ - abstract protected fun verifyProposal(maybeProposal: UntrustworthyData>): Proposal - - @Suspendable - private fun verifyTx(stx: SignedTransaction) { - checkMySignatureRequired(stx.tx) - checkDependenciesValid(stx) - // We expect stx to have insufficient signatures, so we convert the WireTransaction to the LedgerTransaction - // here, thus bypassing the sufficient-signatures check. - stx.tx.toLedgerTransaction(serviceHub).verify() - } + @Throws(StateReplacementException::class) + abstract protected fun verifyProposal(proposal: Proposal) private fun checkMySignatureRequired(tx: WireTransaction) { // TODO: use keys from the keyManagementService instead @@ -202,20 +178,7 @@ abstract class AbstractStateReplacementFlow { return myKey.signWithECDSA(stx.id) } } - - // TODO: similar classes occur in other places (NotaryFlow), need to consolidate - data class Result private constructor(val sig: DigitalSignature.WithKey?, val error: StateReplacementRefused?) { - companion object { - fun withError(error: StateReplacementRefused) = Result(null, error) - fun noError(sig: DigitalSignature.WithKey) = Result(sig, null) - } - } } - -/** Thrown when a participant refuses the proposed state replacement */ -class StateReplacementRefused(val identity: Party, val state: StateRef, val detail: String?) { - override fun toString() = "A participant $identity refused to change state $state: " + (detail ?: "no reason provided") -} - -class StateReplacementException(val error: StateReplacementRefused) : Exception("State change failed - $error") +open class StateReplacementException @JvmOverloads constructor(message: String? = null, cause: Throwable? = null) + : FlowException(message, cause) diff --git a/core/src/main/kotlin/net/corda/flows/FetchDataFlow.kt b/core/src/main/kotlin/net/corda/flows/FetchDataFlow.kt index 63c4e2f1fa..0be2b1a081 100644 --- a/core/src/main/kotlin/net/corda/flows/FetchDataFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/FetchDataFlow.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.NamedByHash import net.corda.core.crypto.Party import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.utilities.UntrustworthyData import net.corda.flows.FetchDataFlow.DownloadedVsRequestedDataMismatch @@ -30,14 +31,15 @@ abstract class FetchDataFlow( protected val requests: Set, protected val otherSide: Party) : FlowLogic>() { - open class BadAnswer : Exception() - class HashNotFound(val requested: SecureHash) : BadAnswer() - class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : BadAnswer() + class DownloadedVsRequestedDataMismatch(val requested: SecureHash, val got: SecureHash) : IllegalArgumentException() + class DownloadedVsRequestedSizeMismatch(val requested: Int, val got: Int) : IllegalArgumentException() + class HashNotFound(val requested: SecureHash) : FlowException() data class Request(val hashes: List) data class Result(val fromDisk: List, val downloaded: List) @Suspendable + @Throws(HashNotFound::class) override fun call(): Result { // Load the items we have from disk and figure out which we're missing. val (fromDisk, toFetch) = loadWhatWeHave() @@ -48,7 +50,7 @@ abstract class FetchDataFlow( logger.trace("Requesting ${toFetch.size} dependency(s) for verification") // TODO: Support "large message" response streaming so response sizes are not limited by RAM. - val maybeItems = sendAndReceive>(otherSide, Request(toFetch)) + val maybeItems = sendAndReceive>(otherSide, Request(toFetch)) // Check for a buggy/malicious peer answering with something that we didn't ask for. val downloaded = validateFetchResponse(maybeItems, toFetch) maybeWriteToDisk(downloaded) @@ -78,22 +80,19 @@ abstract class FetchDataFlow( @Suppress("UNCHECKED_CAST") protected open fun convert(wire: W): T = wire as T - private fun validateFetchResponse(maybeItems: UntrustworthyData>, - requests: List): List = - maybeItems.unwrap { response -> - if (response.size != requests.size) - throw BadAnswer() - for ((index, resp) in response.withIndex()) { - if (resp == null) throw HashNotFound(requests[index]) - } - val answers = response.requireNoNulls().map { convert(it) } - // Check transactions actually hash to what we requested, if this fails the remote node - // is a malicious flow violator or buggy. - for ((index, item) in answers.withIndex()) - if (item.id != requests[index]) - throw DownloadedVsRequestedDataMismatch(requests[index], item.id) - - answers + private fun validateFetchResponse(maybeItems: UntrustworthyData>, + requests: List): List { + return maybeItems.unwrap { response -> + if (response.size != requests.size) + throw DownloadedVsRequestedSizeMismatch(requests.size, response.size) + val answers = response.map { convert(it) } + // Check transactions actually hash to what we requested, if this fails the remote node + // is a malicious flow violator or buggy. + for ((index, item) in answers.withIndex()) { + if (item.id != requests[index]) + throw DownloadedVsRequestedDataMismatch(requests[index], item.id) } - + answers + } + } } diff --git a/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt index f6556f5217..5dac3167b5 100644 --- a/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt @@ -27,6 +27,7 @@ class FinalityFlow(val transaction: SignedTransaction, } @Suspendable + @Throws(NotaryException::class) override fun call() { // TODO: Resolve the tx here: it's probably already been done, but re-resolution is a no-op and it'll make the API more forgiving. diff --git a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt index a677b4335a..f6cbd6f175 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryChangeFlow.kt @@ -1,13 +1,11 @@ package net.corda.flows -import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.* import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.ProgressTracker -import net.corda.core.utilities.UntrustworthyData import net.corda.flows.NotaryChangeFlow.Acceptor import net.corda.flows.NotaryChangeFlow.Instigator @@ -20,19 +18,12 @@ import net.corda.flows.NotaryChangeFlow.Instigator * Finally, [Instigator] sends the transaction containing all signatures back to each participant so they can record it and * use the new updated state for future transactions. */ -object NotaryChangeFlow : AbstractStateReplacementFlow() { +object NotaryChangeFlow : AbstractStateReplacementFlow() { - data class Proposal(override val stateRef: StateRef, - override val modification: Party, - override val stx: SignedTransaction) : AbstractStateReplacementFlow.Proposal - - class Instigator(originalState: StateAndRef, - newNotary: Party, - progressTracker: ProgressTracker = tracker()) - : AbstractStateReplacementFlow.Instigator(originalState, newNotary, progressTracker) { - - override fun assembleProposal(stateRef: StateRef, modification: Party, stx: SignedTransaction): AbstractStateReplacementFlow.Proposal - = Proposal(stateRef, modification, stx) + class Instigator( + originalState: StateAndRef, + newNotary: Party, + progressTracker: ProgressTracker = tracker()) : AbstractStateReplacementFlow.Instigator(originalState, newNotary, progressTracker) { override fun assembleTx(): Pair> { val state = originalState.state @@ -66,7 +57,8 @@ object NotaryChangeFlow : AbstractStateReplacementFlow() { private fun resolveEncumbrances(tx: TransactionBuilder): Iterable { val stateRef = originalState.ref val txId = stateRef.txhash - val issuingTx = serviceHub.storageService.validatedTransactions.getTransaction(txId) ?: throw IllegalStateException("Transaction $txId not found") + val issuingTx = serviceHub.storageService.validatedTransactions.getTransaction(txId) + ?: throw StateReplacementException("Transaction $txId not found") val outputs = issuingTx.tx.outputs val participants = mutableSetOf() @@ -97,8 +89,7 @@ object NotaryChangeFlow : AbstractStateReplacementFlow() { } class Acceptor(otherSide: Party, - override val progressTracker: ProgressTracker = tracker()) - : AbstractStateReplacementFlow.Acceptor(otherSide) { + override val progressTracker: ProgressTracker = tracker()) : AbstractStateReplacementFlow.Acceptor(otherSide) { /** * Check the notary change proposal. @@ -107,26 +98,28 @@ object NotaryChangeFlow : AbstractStateReplacementFlow() { * and is also in a geographically convenient location we can just automatically approve the change. * TODO: In more difficult cases this should call for human attention to manually verify and approve the proposal */ - @Suspendable - override fun verifyProposal(maybeProposal: UntrustworthyData>): AbstractStateReplacementFlow.Proposal { - return maybeProposal.unwrap { proposal -> - val newNotary = proposal.modification - val isNotary = serviceHub.networkMapCache.notaryNodes.any { it.notaryIdentity == newNotary } - require(isNotary) { "The proposed node $newNotary does not run a Notary service " } + override fun verifyProposal(proposal: AbstractStateReplacementFlow.Proposal): Unit { + val state = proposal.stateRef + val proposedTx = proposal.stx.tx - val state = proposal.stateRef - val proposedTx = proposal.stx.tx - require(state in proposedTx.inputs) { "The proposed state $state is not in the proposed transaction inputs" } - require(proposedTx.type.javaClass == TransactionType.NotaryChange::class.java) { - "The proposed transaction is not a notary change transaction." - } - - // An example requirement - val blacklist = listOf("Evil Notary") - require(!blacklist.contains(newNotary.name)) { "The proposed new notary $newNotary is not trusted by the party" } - - proposal + if (proposedTx.type !is TransactionType.NotaryChange) { + throw StateReplacementException("The proposed transaction is not a notary change transaction.") } + + val newNotary = proposal.modification + val isNotary = serviceHub.networkMapCache.notaryNodes.any { it.notaryIdentity == newNotary } + if (!isNotary) { + throw StateReplacementException("The proposed node $newNotary does not run a Notary service") + } + if (state !in proposedTx.inputs) { + throw StateReplacementException("The proposed state $state is not in the proposed transaction inputs") + } + +// // An example requirement +// val blacklist = listOf("Evil Notary") +// checkProposal(newNotary.name !in blacklist) { +// "The proposed new notary $newNotary is not trusted by the party" +// } } } } diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index ce3ec4bd19..af6df1cf54 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -6,6 +6,7 @@ import net.corda.core.crypto.Party import net.corda.core.crypto.SignedData import net.corda.core.crypto.signWithECDSA import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowException import net.corda.core.node.services.TimestampChecker import net.corda.core.node.services.UniquenessException import net.corda.core.node.services.UniquenessProvider @@ -13,7 +14,6 @@ import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.ProgressTracker -import net.corda.core.utilities.UntrustworthyData object NotaryFlow { @@ -29,9 +29,7 @@ object NotaryFlow { constructor(stx: SignedTransaction) : this(stx, Client.tracker()) companion object { - object REQUESTING : ProgressTracker.Step("Requesting signature by Notary service") - object VALIDATING : ProgressTracker.Step("Validating response from Notary service") fun tracker() = ProgressTracker(REQUESTING, VALIDATING) @@ -40,6 +38,7 @@ object NotaryFlow { lateinit var notaryParty: Party @Suspendable + @Throws(NotaryException::class) override fun call(): DigitalSignature.WithKey { progressTracker.currentStep = REQUESTING val wtx = stx.tx @@ -53,27 +52,18 @@ object NotaryFlow { throw NotaryException(NotaryError.SignaturesMissing(ex)) } - val response = sendAndReceive(notaryParty, SignRequest(stx)) - - return validateResponse(response) - } - - @Throws(NotaryException::class, IllegalStateException::class) - private fun validateResponse(response: UntrustworthyData): DigitalSignature.WithKey { - return response.unwrap { notaryResult -> - progressTracker.currentStep = VALIDATING - when (notaryResult) { - is Result.Success -> { - validateSignature(notaryResult.sig, stx.id.bytes) - notaryResult.sig - } - is Result.Error -> { - if (notaryResult.error is NotaryError.Conflict) - notaryResult.error.conflict.verified() - throw NotaryException(notaryResult.error) - } - else -> throw IllegalStateException("Received invalid result from Notary service '$notaryParty'") + val response = try { + sendAndReceive(notaryParty, SignRequest(stx)) + } catch (e: NotaryException) { + if (e.error is NotaryError.Conflict) { + e.error.conflict.verified() } + throw e + } + + return response.unwrap { sig -> + validateSignature(sig, stx.id.bytes) + sig } } @@ -101,17 +91,11 @@ object NotaryFlow { val stx = receive(otherSide).unwrap { it.tx } val wtx = stx.tx - val result = try { - validateTimestamp(wtx) - beforeCommit(stx) - commitInputStates(wtx) - val sig = sign(stx.id.bytes) - Result.Success(sig) - } catch(e: NotaryException) { - Result.Error(e.error) - } - - send(otherSide, result) + validateTimestamp(wtx) + beforeCommit(stx) + commitInputStates(wtx) + val sig = sign(stx.id.bytes) + send(otherSide, sig) } private fun validateTimestamp(tx: WireTransaction) { @@ -164,15 +148,9 @@ object NotaryFlow { } data class SignRequest(val tx: SignedTransaction) - - sealed class Result { - class Error(val error: NotaryError) : Result() - class Success(val sig: DigitalSignature.WithKey) : Result() - } - } -class NotaryException(val error: NotaryError) : Exception() { +class NotaryException(val error: NotaryError) : FlowException() { override fun toString() = "${super.toString()}: Error response from Notary - $error" } diff --git a/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt index 091e558f5e..07f573ab67 100644 --- a/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt @@ -50,16 +50,12 @@ class ResolveTransactionsFlow(private val txHashes: Set, fun visit(transaction: SignedTransaction) { if (transaction.id !in visited) { visited.add(transaction.id) - forwardGraph[transaction.id]?.forEach { - visit(it) - } + forwardGraph[transaction.id]?.forEach(::visit) result.add(transaction) } } - transactions.forEach { - visit(it) - } + transactions.forEach(::visit) result.reverse() require(result.size == transactions.size) @@ -93,6 +89,7 @@ class ResolveTransactionsFlow(private val txHashes: Set, } @Suspendable + @Throws(FetchDataFlow.HashNotFound::class) override fun call(): List { val newTxns: Iterable = topologicalSort(downloadDependencies(txHashes)) diff --git a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt index d02b9b33b2..687bc46515 100644 --- a/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt +++ b/docs/source/example-code/src/integration-test/kotlin/net/corda/docs/IntegrationTestingTutorial.kt @@ -9,9 +9,9 @@ import net.corda.core.messaging.startFlow import net.corda.core.node.services.ServiceInfo import net.corda.core.node.services.Vault import net.corda.core.serialization.OpaqueBytes +import net.corda.core.toFuture import net.corda.flows.CashCommand import net.corda.flows.CashFlow -import net.corda.flows.CashFlowResult import net.corda.node.driver.driver import net.corda.node.services.User import net.corda.node.services.startFlowPermission @@ -87,7 +87,7 @@ class IntegrationTestingTutorial { amount = i.DOLLARS.issuedBy(alice.nodeInfo.legalIdentity.ref(issueRef)), recipient = alice.nodeInfo.legalIdentity )) - assert(flowHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + flowHandle.returnValue.toFuture().getOrThrow() } aliceVaultUpdates.expectEvents { diff --git a/docs/source/flow-state-machines.rst b/docs/source/flow-state-machines.rst index 460425e7d0..f01b24d79c 100644 --- a/docs/source/flow-state-machines.rst +++ b/docs/source/flow-state-machines.rst @@ -111,8 +111,8 @@ each side. object TwoPartyTradeFlow { - class UnacceptablePriceException(val givenPrice: Amount) : Exception("Unacceptable price: $givenPrice") - class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() { + class UnacceptablePriceException(val givenPrice: Amount) : FlowException("Unacceptable price: $givenPrice") + class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() { override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName" } @@ -241,17 +241,11 @@ Let's implement the ``Seller.call`` method. This will be run when the flow is in .. container:: codeset - .. sourcecode:: kotlin - - @Suspendable - override fun call(): SignedTransaction { - val partialTX: SignedTransaction = receiveAndCheckProposedTransaction() - val ourSignature: DigitalSignature.WithKey = computeOurSignature(partialTX) - val allPartySignedTx = partialTX + ourSignature - val notarySignature = getNotarySignature(allPartySignedTx) - val result: SignedTransaction = sendSignatures(allPartySignedTx, ourSignature, notarySignature) - return result - } + .. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt + :language: kotlin + :start-after: DOCSTART 4 + :end-before: DOCEND 4 + :dedent: 4 Here we see the outline of the procedure. We receive a proposed trade transaction from the buyer and check that it's valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature so that the transaction is @@ -265,31 +259,11 @@ Let's fill out the ``receiveAndCheckProposedTransaction()`` method. .. container:: codeset - .. sourcecode:: kotlin - - @Suspendable - private fun receiveAndCheckProposedTransaction(): SignedTransaction { - // Make the first message we'll send to kick off the flow. - val myPublicKey = myKeyPair.public.composite - val hello = SellerTradeInfo(assetToSell, price, myPublicKey) - - val maybeSTX = sendAndReceive(otherSide, hello) - - maybeSTX.unwrap { - // Check that the tx proposed by the buyer is valid. - val wtx: WireTransaction = it.verifySignatures(myPublicKey, notaryNode.notaryIdentity.owningKey) - logger.trace { "Received partially signed transaction: ${it.id}" } - - // Download and check all the things that this transaction depends on and verify it is contract-valid, - // even though it is missing signatures. - subFlow(ResolveTransactionsFlow(wtx, otherParty)) - - if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price) - throw IllegalArgumentException("Transaction is not sending us the right amount of cash") - - return it - } - } + .. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt + :language: kotlin + :start-after: DOCSTART 5 + :end-before: DOCEND 5 + :dedent: 4 Let's break this down. We fill out the initial flow message with the trade info, and then call ``sendAndReceive``. This function takes a few arguments: @@ -333,6 +307,26 @@ Our "scrubbing" has three parts: 2. We resolve the transaction, which we will cover below. 3. We verify that the transaction is paying us the demanded price. +Exception handling +------------------ + +Flows can throw exceptions to prematurely terminate their execution. The flow framework gives special treatment to +``FlowException`` and its subtypes. These exceptions are treated as error responses of the flow and are propagated +to all counterparties it is communicating with. The receiving flows will throw the same exception the next time they do +a ``receive`` or ``sendAndReceive`` and thus end the flow session. If the receiver was invoked via ``subFlow`` (details below) +then the exception can be caught there enabling re-invocation of the sub-flow. + +If the exception thrown by the erroring flow is not a ``FlowException`` it will still terminate but will not propagate to +the other counterparties. Instead they will be informed the flow has terminated and will themselves be terminated with a +generic exception. + +.. note:: A future version will extend this to give the node administrator more control on what to do with such erroring + flows. + +Throwing a ``FlowException`` enables a flow to reject a piece of data it has received back to the sender. This is typically +done in the ``unwrap`` method of the received ``UntrustworthyData``. In the above example the seller checks the price +and throws ``FlowException`` if it's invalid. It's then up to the buyer to either try again with a better price or give up. + Sub-flows --------- @@ -340,13 +334,11 @@ Flows can be composed via nesting. Invoking a sub-flow looks similar to an ordin .. container:: codeset - .. sourcecode:: kotlin - - @Suspendable - private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.LegallyIdentifiable { - progressTracker.currentStep = NOTARY - return subFlow(NotaryFlow.Client(stx)) - } + .. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt + :language: kotlin + :start-after: DOCSTART 6 + :end-before: DOCEND 6 + :dedent: 4 In this code snippet we are using the ``NotaryFlow.Client`` to request notarisation of the transaction. We simply create the flow object via its constructor, and then pass it to the ``subFlow`` method which @@ -372,18 +364,11 @@ Here's the rest of the code: .. container:: codeset - .. sourcecode:: kotlin - - open fun calculateOurSignature(partialTX: SignedTransaction) = myKeyPair.signWithECDSA(partialTX.id) - - @Suspendable - private fun sendSignatures(allPartySignedTX: SignedTransaction, ourSignature: DigitalSignature.WithKey, - notarySignature: DigitalSignature.WithKey): SignedTransaction { - val fullySigned = allPartySignedTX + notarySignature - logger.trace { "Built finished transaction, sending back to secondary!" } - send(otherSide, SignaturesFromSeller(ourSignature, notarySignature)) - return fullySigned - } + .. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt + :language: kotlin + :start-after: DOCSTART 7 + :end-before: DOCEND 7 + :dedent: 4 It's all pretty straightforward from now on. Here ``id`` is the secure hash representing the serialised transaction, and we just use our private key to calculate a signature over it. As a reminder, in Corda signatures do @@ -413,7 +398,7 @@ OK, let's do the same for the buyer side: :language: kotlin :start-after: DOCSTART 1 :end-before: DOCEND 1 - :dedent: 8 + :dedent: 4 This code is longer but no more complicated. Here are some things to pay attention to: @@ -453,7 +438,6 @@ A flow might declare some steps with code inside the flow class like this: :end-before: DOCSTART 1 :dedent: 4 - .. sourcecode:: java private final ProgressTracker progressTracker = new ProgressTracker( @@ -547,7 +531,7 @@ The flow framework is a key part of the platform and will be extended in major w the features we have planned: * Identity based addressing -* Exception propagation and management, with a "flow hospital" tool to manually provide solutions to unavoidable +* Exception management, with a "flow hospital" tool to manually provide solutions to unavoidable problems (e.g. the other side doesn't know the trade) * Being able to interact with internal apps and tools via RPC * Being able to interact with people, either via some sort of external ticketing system, or email, or a custom UI. diff --git a/finance/src/main/kotlin/net/corda/contracts/asset/OnLedgerAsset.kt b/finance/src/main/kotlin/net/corda/contracts/asset/OnLedgerAsset.kt index ad0637fb8c..1aa5c96ffb 100644 --- a/finance/src/main/kotlin/net/corda/contracts/asset/OnLedgerAsset.kt +++ b/finance/src/main/kotlin/net/corda/contracts/asset/OnLedgerAsset.kt @@ -38,13 +38,18 @@ abstract class OnLedgerAsset> : C * the responsibility of the caller to check that they do not exit funds held by others. * @return the public key of the assets issuer, who must sign the transaction for it to be valid. */ + @Throws(InsufficientBalanceException::class) fun generateExit(tx: TransactionBuilder, amountIssued: Amount>, - assetStates: List>): CompositeKey - = conserveClause.generateExit(tx, amountIssued, assetStates, - deriveState = { state, amount, owner -> deriveState(state, amount, owner) }, - generateMoveCommand = { -> generateMoveCommand() }, - generateExitCommand = { amount -> generateExitCommand(amount) } - ) + assetStates: List>): CompositeKey { + return conserveClause.generateExit( + tx, + amountIssued, + assetStates, + deriveState = { state, amount, owner -> deriveState(state, amount, owner) }, + generateMoveCommand = { -> generateMoveCommand() }, + generateExitCommand = { amount -> generateExitCommand(amount) } + ) + } abstract fun generateExitCommand(amount: Amount>): FungibleAsset.Commands.Exit abstract fun generateIssueCommand(): FungibleAsset.Commands.Issue diff --git a/finance/src/main/kotlin/net/corda/contracts/clause/AbstractConserveAmount.kt b/finance/src/main/kotlin/net/corda/contracts/clause/AbstractConserveAmount.kt index 86bd76c73d..d636ccd1e3 100644 --- a/finance/src/main/kotlin/net/corda/contracts/clause/AbstractConserveAmount.kt +++ b/finance/src/main/kotlin/net/corda/contracts/clause/AbstractConserveAmount.kt @@ -45,6 +45,7 @@ abstract class AbstractConserveAmount, C : CommandData, T : * the responsibility of the caller to check that they do not attempt to exit funds held by others. * @return the public key of the assets issuer, who must sign the transaction for it to be valid. */ + @Throws(InsufficientBalanceException::class) fun generateExit(tx: TransactionBuilder, amountIssued: Amount>, assetStates: List>, deriveState: (TransactionState, Amount>, CompositeKey) -> TransactionState, diff --git a/finance/src/main/kotlin/net/corda/flows/CashFlow.kt b/finance/src/main/kotlin/net/corda/flows/CashFlow.kt index 8ac1520f0b..5214ff907c 100644 --- a/finance/src/main/kotlin/net/corda/flows/CashFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/CashFlow.kt @@ -6,8 +6,8 @@ import net.corda.core.contracts.* import net.corda.core.crypto.Party import net.corda.core.crypto.keys import net.corda.core.crypto.toStringShort +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic -import net.corda.core.flows.StateMachineRunId import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder @@ -20,7 +20,7 @@ import java.util.* * * @param command Indicates what Cash transaction to create with what parameters. */ -class CashFlow(val command: CashCommand, override val progressTracker: ProgressTracker) : FlowLogic() { +class CashFlow(val command: CashCommand, override val progressTracker: ProgressTracker) : FlowLogic() { constructor(command: CashCommand) : this(command, tracker()) companion object { @@ -32,7 +32,8 @@ class CashFlow(val command: CashCommand, override val progressTracker: ProgressT } @Suspendable - override fun call(): CashFlowResult { + @Throws(CashException::class) + override fun call(): SignedTransaction { return when (command) { is CashCommand.IssueCash -> issueCash(command) is CashCommand.PayCash -> initiatePayment(command) @@ -42,84 +43,90 @@ class CashFlow(val command: CashCommand, override val progressTracker: ProgressT // TODO check with the recipient if they want to accept the cash. @Suspendable - private fun initiatePayment(req: CashCommand.PayCash): CashFlowResult { + private fun initiatePayment(req: CashCommand.PayCash): SignedTransaction { progressTracker.currentStep = PAYING val builder: TransactionBuilder = TransactionType.General.Builder(null) // TODO: Have some way of restricting this to states the caller controls - try { - val (spendTX, keysForSigning) = serviceHub.vaultService.generateSpend(builder, - req.amount.withoutIssuer(), req.recipient.owningKey, setOf(req.amount.token.issuer.party)) - - keysForSigning.keys.forEach { - val key = serviceHub.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}") - builder.signWith(KeyPair(it, key)) - } - - val tx = spendTX.toSignedTransaction(checkSufficientSignatures = false) - val flow = FinalityFlow(tx, setOf(req.recipient)) - subFlow(flow) - return CashFlowResult.Success( - stateMachine.id, - tx, - "Cash payment transaction generated" - ) - } catch(ex: InsufficientBalanceException) { - return CashFlowResult.Failed(ex.message ?: "Insufficient balance") + val (spendTX, keysForSigning) = try { + serviceHub.vaultService.generateSpend( + builder, + req.amount.withoutIssuer(), + req.recipient.owningKey, + setOf(req.amount.token.issuer.party)) + } catch (e: InsufficientBalanceException) { + throw CashException("Insufficent cash for spend", e) } + + keysForSigning.keys.forEach { + val key = serviceHub.keyManagementService.keys[it] ?: throw IllegalStateException("Could not find signing key for ${it.toStringShort()}") + builder.signWith(KeyPair(it, key)) + } + + val tx = spendTX.toSignedTransaction(checkSufficientSignatures = false) + finaliseTx(setOf(req.recipient), tx, "Unable to notarise spend") + return tx } @Suspendable - private fun exitCash(req: CashCommand.ExitCash): CashFlowResult { + private fun exitCash(req: CashCommand.ExitCash): SignedTransaction { progressTracker.currentStep = EXITING val builder: TransactionBuilder = TransactionType.General.Builder(null) + val issuer = serviceHub.myInfo.legalIdentity.ref(req.issueRef) try { - val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef) - Cash().generateExit(builder, req.amount.issuedBy(issuer), + Cash().generateExit( + builder, + req.amount.issuedBy(issuer), serviceHub.vaultService.currentVault.statesOfType().filter { it.state.data.owner == issuer.party.owningKey }) - val myKey = serviceHub.legalIdentityKey - builder.signWith(myKey) - - // Work out who the owners of the burnt states were - val inputStatesNullable = serviceHub.vaultService.statesForRefs(builder.inputStates()) - val inputStates = inputStatesNullable.values.filterNotNull().map { it.data } - if (inputStatesNullable.size != inputStates.size) { - val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key } - throw InputStateRefResolveFailed(unresolvedStateRefs) - } - - // TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them - // count as a reason to fail? - val participants: Set = inputStates.filterIsInstance().map { serviceHub.identityService.partyFromKey(it.owner) }.filterNotNull().toSet() - - // Commit the transaction - val tx = builder.toSignedTransaction(checkSufficientSignatures = false) - subFlow(FinalityFlow(tx, participants)) - return CashFlowResult.Success( - stateMachine.id, - tx, - "Cash destruction transaction generated" - ) - } catch (ex: InsufficientBalanceException) { - return CashFlowResult.Failed(ex.message ?: "Insufficient balance") + } catch (e: InsufficientBalanceException) { + throw CashException("Exiting more cash than exists", e) } + val myKey = serviceHub.legalIdentityKey + builder.signWith(myKey) + + // Work out who the owners of the burnt states were + val inputStatesNullable = serviceHub.vaultService.statesForRefs(builder.inputStates()) + val inputStates = inputStatesNullable.values.filterNotNull().map { it.data } + if (inputStatesNullable.size != inputStates.size) { + val unresolvedStateRefs = inputStatesNullable.filter { it.value == null }.map { it.key } + throw IllegalStateException("Failed to resolve input StateRefs: $unresolvedStateRefs") + } + + // TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them + // count as a reason to fail? + val participants: Set = inputStates + .filterIsInstance() + .map { serviceHub.identityService.partyFromKey(it.owner) } + .filterNotNull() + .toSet() + + // Commit the transaction + val tx = builder.toSignedTransaction(checkSufficientSignatures = false) + finaliseTx(participants, tx, "Unable to notarise exit") + return tx } @Suspendable - private fun issueCash(req: CashCommand.IssueCash): CashFlowResult { + private fun finaliseTx(participants: Set, tx: SignedTransaction, message: String) { + try { + subFlow(FinalityFlow(tx, participants)) + } catch (e: NotaryException) { + throw CashException(message, e) + } + } + + // TODO This doesn't throw any exception so it might be worth splitting the three cash commands into separate flows + @Suspendable + private fun issueCash(req: CashCommand.IssueCash): SignedTransaction { progressTracker.currentStep = ISSUING val builder: TransactionBuilder = TransactionType.General.Builder(notary = null) - val issuer = PartyAndReference(serviceHub.myInfo.legalIdentity, req.issueRef) + val issuer = serviceHub.myInfo.legalIdentity.ref(req.issueRef) Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary) val myKey = serviceHub.legalIdentityKey builder.signWith(myKey) val tx = builder.toSignedTransaction(checkSufficientSignatures = true) // Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it subFlow(BroadcastTransactionFlow(tx, setOf(req.recipient))) - return CashFlowResult.Success( - stateMachine.id, - tx, - "Cash issuance completed" - ) + return tx } } @@ -158,22 +165,4 @@ sealed class CashCommand { class ExitCash(val amount: Amount, val issueRef: OpaqueBytes) : CashCommand() } -sealed class CashFlowResult { - /** - * @param transaction the transaction created as a result, in the case where the flow completed successfully. - */ - class Success(val id: StateMachineRunId, val transaction: SignedTransaction?, val message: String?) : CashFlowResult() { - override fun toString() = "Success($message)" - } - - /** - * State indicating the action undertaken failed, either directly (it is not something which requires a - * state machine), or before a state machine was started. - */ - class Failed(val message: String?) : CashFlowResult() { - override fun toString() = "Failed($message)" - } -} - -class InputStateRefResolveFailed(stateRefs: List) : - Exception("Failed to resolve input StateRefs $stateRefs") +class CashException(message: String, cause: Throwable) : FlowException(message, cause) diff --git a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt index 2e413fdcd8..ff946f0fa4 100644 --- a/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/IssuerFlow.kt @@ -1,19 +1,14 @@ package net.corda.flows import co.paralleluniverse.fibers.Suspendable -import net.corda.core.ThreadBox import net.corda.core.contracts.* import net.corda.core.crypto.Party -import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic -import net.corda.core.node.NodeInfo import net.corda.core.node.PluginServiceHub import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker -import net.corda.flows.CashCommand -import net.corda.flows.CashFlow -import net.corda.flows.CashFlowResult import java.util.* /** @@ -27,22 +22,16 @@ object IssuerFlow { data class IssuanceRequestState(val amount: Amount, val issueToParty: Party, val issuerPartyRef: OpaqueBytes) /** - * IssuanceRequester should be used by a client to ask a remote note to issue some [FungibleAsset] with the given details. + * IssuanceRequester should be used by a client to ask a remote node to issue some [FungibleAsset] with the given details. * Returns the transaction created by the Issuer to move the cash to the Requester. */ class IssuanceRequester(val amount: Amount, val issueToParty: Party, val issueToPartyRef: OpaqueBytes, val issuerBankParty: Party): FlowLogic() { @Suspendable + @Throws(CashException::class) override fun call(): SignedTransaction { val issueRequest = IssuanceRequestState(amount, issueToParty, issueToPartyRef) - try { - return sendAndReceive(issuerBankParty, issueRequest).unwrap { it } - // catch and report exception before throwing back to caller - } catch (e: Exception) { - logger.error("IssuanceRequesterException: request failed: [${issueRequest}]") - // TODO: awaiting exception handling strategy (what action should be taken here?) - throw e - } + return sendAndReceive(issuerBankParty, issueRequest).unwrap { it } } } @@ -51,22 +40,25 @@ object IssuerFlow { * Returns the generated transaction representing the transfer of the [Issued] [FungibleAsset] to the issue requester. */ class Issuer(val otherParty: Party): FlowLogic() { - override val progressTracker: ProgressTracker = tracker() companion object { object AWAITING_REQUEST : ProgressTracker.Step("Awaiting issuance request") object ISSUING : ProgressTracker.Step("Self issuing asset") object TRANSFERRING : ProgressTracker.Step("Transferring asset to issuance requester") object SENDING_CONFIRM : ProgressTracker.Step("Confirming asset issuance to requester") fun tracker() = ProgressTracker(AWAITING_REQUEST, ISSUING, TRANSFERRING, SENDING_CONFIRM) + private val VALID_CURRENCIES = listOf(USD, GBP, EUR, CHF) } + override val progressTracker: ProgressTracker = tracker() + @Suspendable + @Throws(CashException::class) override fun call(): SignedTransaction { progressTracker.currentStep = AWAITING_REQUEST - val issueRequest = receive(otherParty).unwrap { it } - // validate request inputs (for example, lets restrict the types of currency that can be issued) - require(listOf(USD, GBP, EUR, CHF).contains(issueRequest.amount.token)) { - logger.error("Currency must be one of USD, GBP, EUR, CHF") + val issueRequest = receive(otherParty).unwrap { + // validate request inputs (for example, lets restrict the types of currency that can be issued) + if (it.amount.token !in VALID_CURRENCIES) throw FlowException("Currency must be one of $VALID_CURRENCIES") + it } // TODO: parse request to determine Asset to issue val txn = issueCashTo(issueRequest.amount, issueRequest.issueToParty, issueRequest.issuerPartyRef) @@ -79,52 +71,30 @@ object IssuerFlow { // state references (thus causing Notarisation double spend exceptions). @Suspendable private fun issueCashTo(amount: Amount, - issueTo: Party, issuerPartyRef: OpaqueBytes): SignedTransaction { + issueTo: Party, + issuerPartyRef: OpaqueBytes): SignedTransaction { // TODO: pass notary in as request parameter val notaryParty = serviceHub.networkMapCache.notaryNodes[0].notaryIdentity // invoke Cash subflow to issue Asset progressTracker.currentStep = ISSUING val bankOfCordaParty = serviceHub.myInfo.legalIdentity - try { - val issueCashFlow = CashFlow(CashCommand.IssueCash(amount, issuerPartyRef, bankOfCordaParty, notaryParty)) - val resultIssue = subFlow(issueCashFlow) - // NOTE: issueCashFlow performs a Broadcast (which stores a local copy of the txn to the ledger) - if (resultIssue is CashFlowResult.Failed) { - logger.error("Problem issuing cash: ${resultIssue.message}") - throw Exception(resultIssue.message) - } - // short-circuit when issuing to self - if (issueTo.equals(serviceHub.myInfo.legalIdentity)) - return (resultIssue as CashFlowResult.Success).transaction!! - // now invoke Cash subflow to Move issued assetType to issue requester - progressTracker.currentStep = TRANSFERRING - val moveCashFlow = CashFlow(CashCommand.PayCash(amount.issuedBy(bankOfCordaParty.ref(issuerPartyRef)), issueTo)) - val resultMove = subFlow(moveCashFlow) - // NOTE: CashFlow PayCash calls FinalityFlow which performs a Broadcast (which stores a local copy of the txn to the ledger) - if (resultMove is CashFlowResult.Failed) { - logger.error("Problem transferring cash: ${resultMove.message}") - throw Exception(resultMove.message) - } - val txn = (resultMove as CashFlowResult.Success).transaction - txn?.let { - return txn - } - // NOTE: CashFlowResult.Success should always return a signedTransaction - throw Exception("Missing CashFlow transaction [${(resultMove)}]") - } - // catch and report exception before throwing back to caller flow - catch (e: Exception) { - logger.error("Issuer Exception: failed for amount ${amount} issuedTo ${issueTo} with issuerPartyRef ${issuerPartyRef}") - // TODO: awaiting exception handling strategy (what action should be taken here?) - throw e - } + val issueCashFlow = CashFlow(CashCommand.IssueCash(amount, issuerPartyRef, bankOfCordaParty, notaryParty)) + val issueTx = subFlow(issueCashFlow) + // NOTE: issueCashFlow performs a Broadcast (which stores a local copy of the txn to the ledger) + // short-circuit when issuing to self + if (issueTo == serviceHub.myInfo.legalIdentity) + return issueTx + // now invoke Cash subflow to Move issued assetType to issue requester + progressTracker.currentStep = TRANSFERRING + val moveCashFlow = CashFlow(CashCommand.PayCash(amount.issuedBy(bankOfCordaParty.ref(issuerPartyRef)), issueTo)) + val moveTx = subFlow(moveCashFlow) + // NOTE: CashFlow PayCash calls FinalityFlow which performs a Broadcast (which stores a local copy of the txn to the ledger) + return moveTx } class Service(services: PluginServiceHub) { init { - services.registerFlowInitiator(IssuanceRequester::class) { - Issuer(it) - } + services.registerFlowInitiator(IssuanceRequester::class, ::Issuer) } } } diff --git a/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt b/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt index ee5c2808c9..50341d8ded 100644 --- a/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt @@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.contracts.asset.sumCashBy import net.corda.core.contracts.* import net.corda.core.crypto.* +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.node.NodeInfo import net.corda.core.seconds @@ -42,8 +43,8 @@ import java.util.* // and [AbstractStateReplacementFlow]. object TwoPartyTradeFlow { - class UnacceptablePriceException(val givenPrice: Amount) : Exception("Unacceptable price: $givenPrice") - class AssetMismatchException(val expectedTypeName: String, val typeName: String) : Exception() { + class UnacceptablePriceException(givenPrice: Amount) : FlowException("Unacceptable price: $givenPrice") + class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() { override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName" } @@ -66,39 +67,39 @@ object TwoPartyTradeFlow { companion object { object AWAITING_PROPOSAL : ProgressTracker.Step("Awaiting transaction proposal") - object VERIFYING : ProgressTracker.Step("Verifying transaction proposal") - object SIGNING : ProgressTracker.Step("Signing transaction") - // DOCSTART 3 object NOTARY : ProgressTracker.Step("Getting notary signature") { override fun childProgressTracker() = FinalityFlow.tracker() } // DOCEND 3 - object SENDING_SIGS : ProgressTracker.Step("Sending transaction signatures to buyer") fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, NOTARY, SENDING_SIGS) } + // DOCSTART 4 @Suspendable override fun call(): SignedTransaction { val partialTX: SignedTransaction = receiveAndCheckProposedTransaction() - - // These two steps could be done in parallel, in theory. Our framework doesn't support that yet though. - val ourSignature = calculateOurSignature(partialTX) - val allPartySignedTx = partialTX + ourSignature - val notarySignature = getNotarySignature(allPartySignedTx) - return sendSignatures(allPartySignedTx, ourSignature, notarySignature) + val ourSignature: DigitalSignature.WithKey = calculateOurSignature(partialTX) + val allPartySignedTx: SignedTransaction = partialTX + ourSignature + val notarySignature: DigitalSignature.WithKey = getNotarySignature(allPartySignedTx) + val result: SignedTransaction = sendSignatures(allPartySignedTx, ourSignature, notarySignature) + return result } + // DOCEND 4 + // DOCSTART 6 @Suspendable private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.WithKey { progressTracker.currentStep = NOTARY return subFlow(NotaryFlow.Client(stx)) } + // DOCEND 6 + // DOCSTART 5 @Suspendable private fun receiveAndCheckProposedTransaction(): SignedTransaction { progressTracker.currentStep = AWAITING_PROPOSAL @@ -122,30 +123,36 @@ object TwoPartyTradeFlow { // even though it is missing signatures. subFlow(ResolveTransactionsFlow(wtx, otherParty)) - if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price) - throw IllegalArgumentException("Transaction is not sending us the right amount of cash") - - // There are all sorts of funny games a malicious secondary might play here, we should fix them: - // - // - This tx may attempt to send some assets we aren't intending to sell to the secondary, if - // we're reusing keys! So don't reuse keys! - // - This tx may include output states that impose odd conditions on the movement of the cash, - // once we implement state pairing. - // - // but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to - // express flow state machines on top of the messaging layer. + if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price) { + throw FlowException("Transaction is not sending us the right amount of cash") + } return it } } + // DOCEND 5 + // Following comment moved here so that it doesn't appear in the docsite: + // There are all sorts of funny games a malicious secondary might play with it sends maybeSTX (in + // receiveAndCheckProposedTransaction), we should fix them: + // + // - This tx may attempt to send some assets we aren't intending to sell to the secondary, if + // we're reusing keys! So don't reuse keys! + // - This tx may include output states that impose odd conditions on the movement of the cash, + // once we implement state pairing. + // + // but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to + // express flow state machines on top of the messaging layer. + + // DOCSTART 7 open fun calculateOurSignature(partialTX: SignedTransaction): DigitalSignature.WithKey { progressTracker.currentStep = SIGNING return myKeyPair.signWithECDSA(partialTX.id) } @Suspendable - private fun sendSignatures(allPartySignedTx: SignedTransaction, ourSignature: DigitalSignature.WithKey, + private fun sendSignatures(allPartySignedTx: SignedTransaction, + ourSignature: DigitalSignature.WithKey, notarySignature: DigitalSignature.WithKey): SignedTransaction { progressTracker.currentStep = SENDING_SIGS val fullySigned = allPartySignedTx + notarySignature @@ -155,6 +162,7 @@ object TwoPartyTradeFlow { send(otherParty, SignaturesFromSeller(ourSignature, notarySignature)) return fullySigned } + // DOCEND 7 } // DOCSTART 2 @@ -164,11 +172,8 @@ object TwoPartyTradeFlow { val typeToBuy: Class) : FlowLogic() { object RECEIVING : ProgressTracker.Step("Waiting for seller trading info") - object VERIFYING : ProgressTracker.Step("Verifying seller assets") - object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal") - object SWAPPING_SIGNATURES : ProgressTracker.Step("Swapping signatures with the seller") override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES) diff --git a/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt b/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt index 07e20b5bce..7009f6874e 100644 --- a/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt +++ b/finance/src/test/kotlin/net/corda/flows/IssuerFlowTest.kt @@ -5,7 +5,9 @@ import net.corda.core.contracts.Amount import net.corda.core.contracts.DOLLARS import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.currency +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowStateMachine +import net.corda.core.getOrThrow import net.corda.core.map import net.corda.core.serialization.OpaqueBytes import net.corda.core.transactions.SignedTransaction @@ -39,8 +41,8 @@ class IssuerFlowTest { assertEquals(issuerResult.get(), issuer.get().resultFuture.get()) // try to issue an amount of a restricted currency - assertFailsWith { - runIssuerAndIssueRequester(Amount(100000L, currency("BRL")), issueToPartyAndRef).issueRequestResult.get() + assertFailsWith { + runIssuerAndIssueRequester(Amount(100000L, currency("BRL")), issueToPartyAndRef).issueRequestResult.getOrThrow() } bankOfCordaNode.stop() diff --git a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt index 2455a0a696..3004eceb21 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/DistributedServiceTests.kt @@ -5,14 +5,15 @@ import net.corda.core.contracts.Amount import net.corda.core.contracts.POUNDS import net.corda.core.contracts.issuedBy import net.corda.core.crypto.Party +import net.corda.core.getOrThrow import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.startFlow import net.corda.core.node.NodeInfo import net.corda.core.serialization.OpaqueBytes +import net.corda.core.toFuture import net.corda.flows.CashCommand import net.corda.flows.CashFlow -import net.corda.flows.CashFlowResult import net.corda.node.driver.DriverBasedTest import net.corda.node.driver.NodeHandle import net.corda.node.driver.driver @@ -137,13 +138,13 @@ class DistributedServiceTests : DriverBasedTest() { val issueHandle = aliceProxy.startFlow( ::CashFlow, CashCommand.IssueCash(amount, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, raftNotaryIdentity)) - require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + issueHandle.returnValue.toFuture().getOrThrow() } private fun paySelf(amount: Amount) { val payHandle = aliceProxy.startFlow( ::CashFlow, CashCommand.PayCash(amount.issuedBy(alice.nodeInfo.legalIdentity.ref(0)), alice.nodeInfo.legalIdentity)) - require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) + payHandle.returnValue.toFuture().getOrThrow() } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt index ca2954e5cb..f2a3dcf394 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCStructures.kt @@ -28,7 +28,6 @@ import net.corda.core.node.services.* import net.corda.core.serialization.* import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.WireTransaction -import net.corda.flows.CashFlowResult import net.corda.node.internal.AbstractNode import net.corda.node.services.User import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER @@ -190,8 +189,6 @@ private class RPCKryo(observableSerializer: Serializer>? = null) register(Cash.Clauses.ConserveAmount::class.java) register(listOf(Unit).javaClass) // SingletonList register(setOf(Unit).javaClass) // SingletonSet - register(CashFlowResult.Success::class.java) - register(CashFlowResult.Failed::class.java) register(ServiceEntry::class.java) register(NodeInfo::class.java) register(PhysicalLocation::class.java) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt index e9f56ec295..8bbf3cd811 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt @@ -2,16 +2,17 @@ package net.corda.node.services.persistence import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic +import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.PluginServiceHub import net.corda.core.node.recordTransactions import net.corda.core.serialization.SingletonSerializeAsToken -import net.corda.core.utilities.loggerFor +import net.corda.core.transactions.SignedTransaction import net.corda.flows.* -import net.corda.core.node.CordaPluginRegistry -import java.io.InputStream -import javax.annotation.concurrent.ThreadSafe import java.util.function.Function +import javax.annotation.concurrent.ThreadSafe object DataVending { @@ -33,55 +34,40 @@ object DataVending { */ @ThreadSafe class Service(services: PluginServiceHub) : SingletonSerializeAsToken() { - - companion object { - val logger = loggerFor() - } - init { services.registerFlowInitiator(FetchTransactionsFlow::class, ::FetchTransactionsHandler) services.registerFlowInitiator(FetchAttachmentsFlow::class, ::FetchAttachmentsHandler) services.registerFlowInitiator(BroadcastTransactionFlow::class, ::NotifyTransactionHandler) } - - private class FetchTransactionsHandler(val otherParty: Party) : FlowLogic() { - @Suspendable - override fun call() { - val request = receive(otherParty).unwrap { - require(it.hashes.isNotEmpty()) - it - } - val txs = request.hashes.map { - val tx = serviceHub.storageService.validatedTransactions.getTransaction(it) - if (tx == null) - logger.info("Got request for unknown tx $it") - tx - } - send(otherParty, txs) + private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler(otherParty) { + override fun getData(id: SecureHash): SignedTransaction? { + return serviceHub.storageService.validatedTransactions.getTransaction(id) } } - // TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer. - private class FetchAttachmentsHandler(val otherParty: Party) : FlowLogic() { + private class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler(otherParty) { + override fun getData(id: SecureHash): ByteArray? { + return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes() + } + } + + private abstract class FetchDataHandler(val otherParty: Party) : FlowLogic() { @Suspendable + @Throws(FetchDataFlow.HashNotFound::class) override fun call() { val request = receive(otherParty).unwrap { - require(it.hashes.isNotEmpty()) + if (it.hashes.isEmpty()) throw FlowException("Empty hash list") it } - val attachments = request.hashes.map { - val jar: InputStream? = serviceHub.storageService.attachments.openAttachment(it)?.open() - if (jar == null) { - logger.info("Got request for unknown attachment $it") - null - } else { - jar.readBytes() - } + val response = request.hashes.map { + getData(it) ?: throw FetchDataFlow.HashNotFound(it) } - send(otherParty, attachments) + send(otherParty, response) } + + protected abstract fun getData(id: SecureHash): T? } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 2ed5e921e3..343d62288b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -30,7 +30,7 @@ import java.util.concurrent.ExecutionException class FlowStateMachineImpl(override val id: StateMachineRunId, val logic: FlowLogic, - scheduler: FiberScheduler) : Fiber("flow", scheduler), FlowStateMachine { + scheduler: FiberScheduler) : Fiber("flow", scheduler), FlowStateMachine { companion object { // Used to work around a small limitation in Quasar. private val QUASAR_UNBLOCKER = run { @@ -49,7 +49,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Transient override lateinit var serviceHub: ServiceHubInternal @Transient internal lateinit var database: Database @Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit - @Transient internal lateinit var actionOnEnd: () -> Unit + @Transient internal lateinit var actionOnEnd: (FlowException?) -> Unit @Transient internal var fromCheckpoint: Boolean = false @Transient private var txTrampoline: Transaction? = null @@ -80,29 +80,41 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable - override fun run(): R { + override fun run() { createTransaction() val result = try { logic.call() + } catch (e: FlowException) { + if (e.stackTrace[0].className == javaClass.name) { + // FlowException was propagated to us as it's stack trace points to this internal class (see suspendAndExpectReceive). + // If we've got to here then the flow doesn't want to handle it and so we end, but we don't propagate + // the exception further as it's not relevant to anyone else. + actionOnEnd(null) + } else { + // FLowException came from this flow + actionOnEnd(e) + } + _resultFuture?.setException(e) + return } catch (t: Throwable) { - actionOnEnd() + actionOnEnd(null) _resultFuture?.setException(t) throw ExecutionException(t) } - // Wait for sessions with unconfirmed session state. - openSessions.values.filter { it.state is FlowSessionState.Initiating }.forEach { - it.waitForConfirmation() - } + + // Only sessions which have a single send and nothing else will block here + openSessions.values + .filter { it.state is FlowSessionState.Initiating } + .forEach { it.waitForConfirmation() } // This is to prevent actionOnEnd being called twice if it throws an exception - actionOnEnd() + actionOnEnd(null) _resultFuture?.set(result) - return result } private fun createTransaction() { // Make sure we have a database transaction createDatabaseTransaction(database) - logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}." } + logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}" } } internal fun commitTransaction() { @@ -221,6 +233,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } @Suspendable + @Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN") private fun suspendAndExpectReceive(receiveRequest: ReceiveRequest): ReceivedSessionMessage { val session = receiveRequest.session fun getReceivedMessage(): ReceivedSessionMessage? = session.receivedMessages.poll() @@ -237,19 +250,23 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, suspend(receiveRequest) getReceivedMessage() ?: throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but instead " + - "got nothing: $receiveRequest") + "got nothing for $receiveRequest") } - if (receivedMessage.message is SessionEnd) { - openSessions.values.remove(session) - throw FlowException("Party ${session.state.sendToParty} has ended their flow but we were expecting to " + - "receive ${receiveRequest.receiveType.simpleName} from them") - } else if (receiveRequest.receiveType.isInstance(receivedMessage.message)) { - @Suppress("UNCHECKED_CAST") + if (receiveRequest.receiveType.isInstance(receivedMessage.message)) { return receivedMessage as ReceivedSessionMessage + } else if (receivedMessage.message is SessionEnd) { + openSessions.values.remove(session) + if (receivedMessage.message.errorResponse != null) { + (receivedMessage.message.errorResponse as java.lang.Throwable).fillInStackTrace() + throw receivedMessage.message.errorResponse + } else { + throw FlowSessionException("${session.state.sendToParty} has ended their flow but we were expecting " + + "to receive ${receiveRequest.receiveType.simpleName} from them") + } } else { throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but instead got " + - "${receivedMessage.message}: $receiveRequest") + "${receivedMessage.message} for $receiveRequest") } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt index 6602e55add..c1811c98b8 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SessionMessage.kt @@ -29,7 +29,7 @@ data class SessionData(override val recipientSessionId: Long, val payload: Any) } } -data class SessionEnd(override val recipientSessionId: Long) : ExistingSessionMessage +data class SessionEnd(override val recipientSessionId: Long, val errorResponse: FlowException?) : ExistingSessionMessage data class ReceivedSessionMessage(val sender: Party, val message: M) @@ -37,7 +37,9 @@ fun ReceivedSessionMessage.checkPayloadIs(type: Class): Untr if (type.isInstance(message.payload)) { return UntrustworthyData(type.cast(message.payload)) } else { - throw FlowException("We were expecting a ${type.name} from $sender but we instead got a " + + throw FlowSessionException("We were expecting a ${type.name} from $sender but we instead got a " + "${message.payload.javaClass.name} (${message.payload})") } -} \ No newline at end of file +} + +class FlowSessionException(message: String) : RuntimeException(message) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index fdfa200f1d..98159ddbc3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -12,6 +12,7 @@ import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.Party import net.corda.core.crypto.commonName +import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.StateMachineRunId @@ -194,7 +195,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, checkpointStorage.forEach { // If a flow is added before start() then don't attempt to restore it if (!stateMachines.containsValue(it)) { - val fiber = deserializeFiber(it.serializedFiber) + val fiber = deserializeFiber(it) initFiber(fiber) stateMachines[fiber] = it } @@ -256,7 +257,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, if (peerParty != null) { if (message is SessionConfirm) { logger.debug { "Received session confirmation but associated fiber has already terminated, so sending session end" } - sendSessionMessage(peerParty, SessionEnd(message.initiatedSessionId)) + sendSessionMessage(peerParty, SessionEnd(message.initiatedSessionId, null)) } else { logger.trace { "Ignoring session end message for already closed session: $message" } } @@ -269,30 +270,44 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, private fun onSessionInit(sessionInit: SessionInit, sender: Party) { logger.trace { "Received $sessionInit $sender" } val otherPartySessionId = sessionInit.initiatorSessionId - try { - val markerClass = Class.forName(sessionInit.flowName) - val flowFactory = serviceHub.getFlowFactory(markerClass) - if (flowFactory != null) { - val flow = flowFactory(sender) - val fiber = createFiber(flow) - val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(sender, otherPartySessionId)) - if (sessionInit.firstPayload != null) { - session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload)) - } - openSessions[session.ourSessionId] = session - fiber.openSessions[Pair(flow, sender)] = session - updateCheckpoint(fiber) - sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), fiber) - fiber.logger.debug { "Initiated from $sessionInit on $session" } - startFiber(fiber) - } else { - logger.warn("Unknown flow marker class in $sessionInit") - sendSessionMessage(sender, SessionReject(otherPartySessionId, "Don't know ${markerClass.name}")) - } + + fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message)) + + val markerClass = try { + Class.forName(sessionInit.flowName) } catch (e: Exception) { logger.warn("Received invalid $sessionInit", e) - sendSessionMessage(sender, SessionReject(otherPartySessionId, "Unable to establish session")) + sendSessionReject("Don't know ${sessionInit.flowName}") + return } + + val flowFactory = serviceHub.getFlowFactory(markerClass) + if (flowFactory == null) { + logger.warn("Unknown flow marker class in $sessionInit") + sendSessionReject("Don't know ${markerClass.name}") + return + } + + val session = try { + val flow = flowFactory(sender) + val fiber = createFiber(flow) + val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(sender, otherPartySessionId)) + if (sessionInit.firstPayload != null) { + session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload)) + } + openSessions[session.ourSessionId] = session + fiber.openSessions[Pair(flow, sender)] = session + updateCheckpoint(fiber) + session + } catch (e: Exception) { + logger.warn("Couldn't start session for $sessionInit", e) + sendSessionReject("Unable to establish session") + return + } + + sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber) + session.fiber.logger.debug { "Initiated from $sessionInit on $session" } + startFiber(session.fiber) } private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes> { @@ -302,11 +317,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, return fiber.serialize(kryo) } - private fun deserializeFiber(serialisedFiber: SerializedBytes>): FlowStateMachineImpl<*> { + private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> { val kryo = quasarKryo() // put the map of token -> tokenized into the kryo context SerializeAsTokenSerializer.setContext(kryo, serializationContext) - return serialisedFiber.deserialize(kryo).apply { fromCheckpoint = true } + return checkpoint.serializedFiber.deserialize(kryo).apply { fromCheckpoint = true } } private fun quasarKryo(): Kryo { @@ -330,14 +345,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, processIORequest(ioRequest) decrementLiveFibers() } - fiber.actionOnEnd = { + fiber.actionOnEnd = { errorResponse: FlowException? -> try { fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE mutex.locked { stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) } notifyChangeObservers(fiber, AddOrRemove.REMOVE) } - endAllFiberSessions(fiber) + endAllFiberSessions(fiber, errorResponse) } finally { fiber.commitTransaction() decrementLiveFibers() @@ -352,14 +367,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } } - private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>) { + private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, errorResponse: FlowException?) { + @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") + (errorResponse as java.lang.Throwable?)?.stackTrace = emptyArray() openSessions.values.removeIf { session -> if (session.fiber == fiber) { - val initiatedState = session.state as? FlowSessionState.Initiated - if (initiatedState != null) { - sendSessionMessage(initiatedState.peerParty, SessionEnd(initiatedState.peerSessionId), fiber) - recentlyClosedSessions[session.ourSessionId] = initiatedState.peerParty - } + session.endSession(errorResponse) true } else { false @@ -367,6 +380,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } } + private fun FlowSession.endSession(errorResponse: FlowException?) { + val initiatedState = state as? Initiated + if (initiatedState != null) { + sendSessionMessage( + initiatedState.peerParty, + SessionEnd(initiatedState.peerSessionId, errorResponse), + fiber) + recentlyClosedSessions[ourSessionId] = initiatedState.peerParty + } + } + private fun startFiber(fiber: FlowStateMachineImpl<*>) { try { resumeFiber(fiber) diff --git a/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt b/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt index 5c3798dbe6..3aef650af2 100644 --- a/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/NotaryChangeTests.kt @@ -11,18 +11,16 @@ import net.corda.core.utilities.DUMMY_NOTARY import net.corda.core.utilities.DUMMY_NOTARY_KEY import net.corda.flows.NotaryChangeFlow.Instigator import net.corda.flows.StateReplacementException -import net.corda.flows.StateReplacementRefused import net.corda.node.internal.AbstractNode import net.corda.node.services.network.NetworkMapService import net.corda.node.services.transactions.SimpleNotaryService import net.corda.testing.node.MockNetwork -import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Before import org.junit.Test import java.time.Instant import java.util.* import kotlin.test.assertEquals -import kotlin.test.assertFailsWith import kotlin.test.assertTrue class NotaryChangeTests { @@ -83,8 +81,9 @@ class NotaryChangeTests { net.runNetwork() - val ex = assertFailsWith(StateReplacementException::class) { future.resultFuture.getOrThrow() } - assertThat(ex.error).isInstanceOf(StateReplacementRefused::class.java) + assertThatExceptionOfType(StateReplacementException::class.java).isThrownBy { + future.resultFuture.getOrThrow() + } } @Test diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index 3800dee863..db91cc92a4 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -34,7 +34,8 @@ import net.corda.testing.node.MockNetwork import net.corda.testing.node.MockNetwork.MockNode import net.corda.testing.sequence import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType import org.junit.After import org.junit.Before import org.junit.Test @@ -249,14 +250,14 @@ class StateMachineManagerTests { assertSessionTransfers(node2, node1 sent sessionInit(SendFlow::class, payload) to node2, node2 sent sessionConfirm to node1, - node1 sent sessionEnd to node2 + node1 sent sessionEnd() to node2 //There's no session end from the other flows as they're manually suspended ) assertSessionTransfers(node3, node1 sent sessionInit(SendFlow::class, payload) to node3, node3 sent sessionConfirm to node1, - node1 sent sessionEnd to node3 + node1 sent sessionEnd() to node3 //There's no session end from the other flows as they're manually suspended ) @@ -283,14 +284,14 @@ class StateMachineManagerTests { node1 sent sessionInit(ReceiveFlow::class) to node2, node2 sent sessionConfirm to node1, node2 sent sessionData(node2Payload) to node1, - node2 sent sessionEnd to node1 + node2 sent sessionEnd() to node1 ) assertSessionTransfers(node3, node1 sent sessionInit(ReceiveFlow::class) to node3, node3 sent sessionConfirm to node1, node3 sent sessionData(node3Payload) to node1, - node3 sent sessionEnd to node1 + node3 sent sessionEnd() to node1 ) } @@ -306,7 +307,7 @@ class StateMachineManagerTests { node2 sent sessionData(20L) to node1, node1 sent sessionData(11L) to node2, node2 sent sessionData(21L) to node1, - node1 sent sessionEnd to node2 + node1 sent sessionEnd() to node2 ) } @@ -368,18 +369,104 @@ class StateMachineManagerTests { } @Test - fun `exception thrown on other side`() { - val erroringFiber = node2.initiateSingleShotFlow(ReceiveFlow::class) { ExceptionFlow }.map { it.stateMachine as FlowStateMachineImpl } + fun `FlowException thrown on other side`() { + val erroringFlowFuture = node2.initiateSingleShotFlow(ReceiveFlow::class) { + ExceptionFlow { MyFlowException("Nothing useful") } + } val receivingFiber = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)) as FlowStateMachineImpl net.runNetwork() - assertThatThrownBy { receivingFiber.resultFuture.getOrThrow() }.isInstanceOf(FlowException::class.java) + assertThatExceptionOfType(MyFlowException::class.java) + .isThrownBy { receivingFiber.resultFuture.getOrThrow() } + .withMessage("Nothing useful") + .withStackTraceContaining("ReceiveFlow") // Make sure the stack trace is that of the receiving flow + databaseTransaction(node2.database) { + assertThat(node2.checkpointStorage.checkpoints()).isEmpty() + } + val errorFlow = erroringFlowFuture.getOrThrow() assertThat(receivingFiber.isTerminated).isTrue() - assertThat(erroringFiber.getOrThrow().isTerminated).isTrue() + assertThat((errorFlow.stateMachine as FlowStateMachineImpl).isTerminated).isTrue() assertSessionTransfers( node1 sent sessionInit(ReceiveFlow::class) to node2, node2 sent sessionConfirm to node1, - node2 sent sessionEnd to node1 + node2 sent sessionEnd(errorFlow.exceptionThrown) to node1 ) + // Make sure the original stack trace isn't sent down the wire + assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty() + } + + private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic() { + @Suspendable + override fun call() { + sendAndReceive(otherParty, payload) + } + } + + @Test + fun `FlowException thrown and there is a 3rd party flow`() { + val node3 = net.createNode(node1.info.address) + net.runNetwork() + + // Node 2 will send its payload and then block waiting for the receive from node 1. Meanwhile node 1 will move + // onto node 3 which will throw the exception + val node2Fiber = node2 + .initiateSingleShotFlow(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") } + .map { it.stateMachine } + node3.initiateSingleShotFlow(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } } + + val node1Fiber = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity)) as FlowStateMachineImpl + net.runNetwork() + + // Node 1 will terminate with the error it received from node 3 but it won't propagate that to node 2 (as it's + // not relevant to it) but it will end its session with it + assertThatExceptionOfType(MyFlowException::class.java).isThrownBy { + node1Fiber.resultFuture.getOrThrow() + } + val node2ResultFuture = node2Fiber.getOrThrow().resultFuture + assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy { + node2ResultFuture.getOrThrow() + } + + assertSessionTransfers(node2, + node1 sent sessionInit(ReceiveFlow::class) to node2, + node2 sent sessionConfirm to node1, + node2 sent sessionData("Hello") to node1, + node1 sent sessionEnd() to node2 // Unexpected session-end + ) + } + + private class ConditionalExceptionFlow(val otherParty: Party, val sendPayload: Any) : FlowLogic() { + @Suspendable + override fun call() { + val throwException = receive(otherParty).unwrap { it } + if (throwException) { + throw MyFlowException("Throwing exception as requested") + } + send(otherParty, sendPayload) + } + } + + @Test + fun `retry subFlow due to receiving FlowException`() { + class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic() { + @Suspendable + override fun call(): String = sendAndReceive(otherParty, throwException).unwrap { it } + } + + class RetryOnExceptionFlow(val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): String { + return try { + subFlow(AskForExceptionFlow(otherParty, throwException = true)) + } catch (e: MyFlowException) { + subFlow(AskForExceptionFlow(otherParty, throwException = false)) + } + } + } + + node2.services.registerFlowInitiator(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") } + val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture + net.runNetwork() + assertThat(resultFuture.getOrThrow()).isEqualTo("Hello") } private inline fun > MockNode.restartAndGetRestoredFlow( @@ -403,15 +490,16 @@ class StateMachineManagerTests { private fun sessionData(payload: Any) = SessionData(0, payload) - private val sessionEnd = SessionEnd(0) + private fun sessionEnd(error: FlowException? = null) = SessionEnd(0, error) private fun assertSessionTransfers(vararg expected: SessionTransfer) { assertThat(sessionTransfers).containsExactly(*expected) } - private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer) { + private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer): List { val actualForNode = sessionTransfers.filter { it.from == node.id || it.to == node.net.myAddress } assertThat(actualForNode).containsExactly(*expected) + return actualForNode } private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) { @@ -440,7 +528,6 @@ class StateMachineManagerTests { private infix fun MockNode.sent(message: SessionMessage): Pair = Pair(id, message) private infix fun Pair.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.net.myAddress) - private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() { @Transient var flowStarted = false @@ -498,7 +585,16 @@ class StateMachineManagerTests { } } - private object ExceptionFlow : FlowLogic() { - override fun call(): Nothing = throw Exception() + private class ExceptionFlow(val exception: () -> E) : FlowLogic() { + lateinit var exceptionThrown: E + override fun call(): Nothing { + exceptionThrown = exception() + throw exceptionThrown + } + } + + private class MyFlowException(message: String) : FlowException(message) { + override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message + override fun hashCode(): Int = message?.hashCode() ?: 31 } } diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt index 1da82f1f5d..40ba74a159 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/SimmFlow.kt @@ -18,6 +18,8 @@ import net.corda.core.messaging.Ack import net.corda.core.node.PluginServiceHub import net.corda.core.node.services.dealsWith import net.corda.core.transactions.SignedTransaction +import net.corda.flows.AbstractStateReplacementFlow.Proposal +import net.corda.flows.StateReplacementException import net.corda.flows.TwoPartyDealFlow import net.corda.vega.analytics.* import net.corda.vega.contracts.* @@ -299,9 +301,12 @@ object SimmFlow { private fun updatePortfolio(portfolio: Portfolio) { logger.info("Handshake finished, awaiting Simm update") send(replyToParty, Ack) // Hack to state that this party is ready - subFlow(StateRevisionFlow.Receiver(replyToParty, { - it.portfolio == portfolio.refs - }), shareParentSessions = true) + subFlow(object : StateRevisionFlow.Receiver(replyToParty) { + override fun verifyProposal(proposal: Proposal) { + super.verifyProposal(proposal) + if (proposal.modification.portfolio != portfolio.refs) throw StateReplacementException() + } + }, shareParentSessions = true) } @Suspendable @@ -309,11 +314,12 @@ object SimmFlow { val portfolio = stateRef.state.data.portfolio.toStateAndRef(serviceHub).toPortfolio() val valuer = stateRef.state.data.valuer val valuation = agreeValuation(portfolio, offer.valuationDate, valuer) - - subFlow(StateRevisionFlow.Receiver(replyToParty) { - it.valuation == valuation + subFlow(object : StateRevisionFlow.Receiver(replyToParty) { + override fun verifyProposal(proposal: Proposal) { + super.verifyProposal(proposal) + if (proposal.modification.valuation != valuation) throw StateReplacementException() + } }, shareParentSessions = true) } - } } diff --git a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/StateRevisionFlow.kt b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/StateRevisionFlow.kt index ddf24d7967..2da3cce5a3 100644 --- a/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/StateRevisionFlow.kt +++ b/samples/simm-valuation-demo/src/main/kotlin/net/corda/vega/flows/StateRevisionFlow.kt @@ -1,13 +1,12 @@ package net.corda.vega.flows import net.corda.core.contracts.StateAndRef -import net.corda.core.contracts.StateRef import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party import net.corda.core.seconds import net.corda.core.transactions.SignedTransaction -import net.corda.core.utilities.UntrustworthyData import net.corda.flows.AbstractStateReplacementFlow +import net.corda.flows.StateReplacementException import net.corda.vega.contracts.RevisionedState /** @@ -15,18 +14,11 @@ import net.corda.vega.contracts.RevisionedState * on the update between two parties */ object StateRevisionFlow { - data class Proposal(override val stateRef: StateRef, - override val modification: T, - override val stx: SignedTransaction) : AbstractStateReplacementFlow.Proposal - - class Requester(curStateRef: StateAndRef>, val updatedData: T) - : AbstractStateReplacementFlow.Instigator, T>(curStateRef, updatedData) { - override fun assembleProposal(stateRef: StateRef, modification: T, stx: SignedTransaction): AbstractStateReplacementFlow.Proposal - = Proposal(stateRef, modification, stx) - + class Requester(curStateRef: StateAndRef>, + updatedData: T) : AbstractStateReplacementFlow.Instigator, T>(curStateRef, updatedData) { override fun assembleTx(): Pair> { val state = originalState.state.data - val tx = state.generateRevision(originalState.state.notary, originalState, updatedData) + val tx = state.generateRevision(originalState.state.notary, originalState, modification) tx.setTime(serviceHub.clock.instant(), 30.seconds) tx.signWith(serviceHub.legalIdentityKey) @@ -35,16 +27,12 @@ object StateRevisionFlow { } } - class Receiver(otherParty: Party, private val validate: (T) -> Boolean) - : AbstractStateReplacementFlow.Acceptor(otherParty) { - override fun verifyProposal(maybeProposal: UntrustworthyData>) - : AbstractStateReplacementFlow.Proposal { - return maybeProposal.unwrap { - val proposedTx = it.stx.tx - val state = it.stateRef - require(proposedTx.inputs.contains(state)) { "The proposed state $state is not in the proposed transaction inputs" } - require(validate(it.modification)) - it + open class Receiver(otherParty: Party) : AbstractStateReplacementFlow.Acceptor(otherParty) { + override fun verifyProposal(proposal: AbstractStateReplacementFlow.Proposal) { + val proposedTx = proposal.stx.tx + val state = proposal.stateRef + if (state !in proposedTx.inputs) { + throw StateReplacementException("The proposed state $state is not in the proposed transaction inputs") } } } diff --git a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt index 177c393aa2..17661dfa81 100644 --- a/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt +++ b/tools/explorer/src/main/kotlin/net/corda/explorer/views/cordapps/cash/NewTransaction.kt @@ -16,9 +16,12 @@ import net.corda.core.contracts.Issued import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.withoutIssuer import net.corda.core.crypto.Party +import net.corda.core.flows.FlowException +import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.node.NodeInfo import net.corda.core.serialization.OpaqueBytes +import net.corda.core.toFuture import net.corda.core.transactions.SignedTransaction import net.corda.explorer.model.CashTransaction import net.corda.explorer.model.IssuerModel @@ -28,7 +31,6 @@ import net.corda.explorer.views.byteFormatter import net.corda.explorer.views.stringConverter import net.corda.flows.CashCommand import net.corda.flows.CashFlow -import net.corda.flows.CashFlowResult import net.corda.flows.IssuerFlow.IssuanceRequester import org.controlsfx.dialog.ExceptionDialog import tornadofx.Fragment @@ -87,33 +89,37 @@ class NewTransaction : Fragment() { } dialog.show() runAsync { - if (it is CashCommand.IssueCash) { + val handle = if (it is CashCommand.IssueCash) { myIdentity.value?.let { myIdentity -> rpcProxy.value!!.startFlow(::IssuanceRequester, it.amount, it.recipient, it.issueRef, - myIdentity.legalIdentity).returnValue.toBlocking().first() + myIdentity.legalIdentity) } + } else { + rpcProxy.value!!.startFlow(::CashFlow, it) } - else { - rpcProxy.value!!.startFlow(::CashFlow, it).returnValue.toBlocking().first() + val response = try { + handle?.returnValue?.toFuture()?.getOrThrow() + } catch (e: FlowException) { + e } + it to response }.ui { - dialog.contentText = when (it) { - is SignedTransaction -> { - dialog.alertType = Alert.AlertType.INFORMATION - "Cash Issued \nTransaction ID : ${it.id} \nMessage" - } - is CashFlowResult.Success -> { - dialog.alertType = Alert.AlertType.INFORMATION - "Transaction Started \nTransaction ID : ${it.transaction?.id} \nMessage : ${it.message}" - } - else -> { - dialog.alertType = Alert.AlertType.ERROR - it.toString() + val (command, response) = it + val (alertType, contentText) = if (response is FlowException) { + Alert.AlertType.ERROR to response.message + } else { + val type = when (command) { + is CashCommand.IssueCash -> "Cash Issued" + is CashCommand.ExitCash -> "Cash Exited" + is CashCommand.PayCash -> "Cash Paid" } + Alert.AlertType.INFORMATION to "$type \nTransaction ID : ${(response as SignedTransaction).id}" } + dialog.alertType = alertType + dialog.contentText = contentText dialog.dialogPane.isDisable = false dialog.dialogPane.scene.window.sizeToScene() }.setOnFailed { diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt index e50256132e..42ed3b0182 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/CrossCashTest.kt @@ -7,11 +7,13 @@ import net.corda.core.contracts.Issued import net.corda.core.contracts.PartyAndReference import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow import net.corda.core.serialization.OpaqueBytes +import net.corda.core.toFuture import net.corda.flows.CashCommand +import net.corda.flows.CashException import net.corda.flows.CashFlow -import net.corda.flows.CashFlowResult import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle import org.slf4j.LoggerFactory @@ -205,14 +207,11 @@ val crossCashTest = LoadTest( }, execute = { command -> - val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toBlocking().first() - when (result) { - is CashFlowResult.Success -> { - log.info(result.message) - } - is CashFlowResult.Failed -> { - log.error(result.message) - } + try { + val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow() + log.info("Success: $result") + } catch (e: CashException) { + log.error("Failure", e) } }, diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt index d8201bab0d..2abc1a1270 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/tests/SelfIssueTest.kt @@ -7,10 +7,12 @@ import net.corda.client.mock.replicatePoisson import net.corda.contracts.asset.Cash import net.corda.core.contracts.USD import net.corda.core.crypto.Party +import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow +import net.corda.core.toFuture import net.corda.flows.CashCommand +import net.corda.flows.CashException import net.corda.flows.CashFlow -import net.corda.flows.CashFlowResult import net.corda.loadtest.LoadTest import net.corda.loadtest.NodeHandle import org.slf4j.LoggerFactory @@ -60,14 +62,11 @@ val selfIssueTest = LoadTest( }, execute = { command -> - val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toBlocking().first() - when (result) { - is CashFlowResult.Success -> { - log.info(result.message) - } - is CashFlowResult.Failed -> { - log.error(result.message) - } + try { + val result = command.node.connection.proxy.startFlow(::CashFlow, command.command).returnValue.toFuture().getOrThrow() + log.info("Success: $result") + } catch (e: CashException) { + log.error("Failure", e) } },