From f3c9b458fdf15038b1ff150298291efda4d96db2 Mon Sep 17 00:00:00 2001 From: Mike Hearn Date: Fri, 6 Jan 2017 10:42:04 +0100 Subject: [PATCH] Improve the flow commit API. Make FinalityFlow do more, and be used more consistently. Add a new waitForLedgerCommit API that is intended to be used at the end of flows, or at any other point where a flow wants to wait for a transaction to finalise (but the finalisation flow is being done by someone else). Update the docs a bit. --- .../kotlin/net/corda/core/flows/FlowLogic.kt | 12 ++ .../net/corda/core/flows/FlowStateMachine.kt | 5 + .../net/corda/core/messaging/Messaging.kt | 3 +- .../kotlin/net/corda/core/node/ServiceHub.kt | 5 +- .../net/corda/core/node/services/Services.kt | 20 ++- .../corda/flows/BroadcastTransactionFlow.kt | 11 +- .../kotlin/net/corda/flows/FinalityFlow.kt | 117 ++++++++++++++---- .../main/kotlin/net/corda/flows/NotaryFlow.kt | 1 - .../corda/flows/ResolveTransactionsFlow.kt | 11 +- .../corda/docs/FxTransactionBuildTutorial.kt | 2 +- .../docs/WorkflowTransactionBuildTutorial.kt | 17 +-- docs/source/flow-state-machines.rst | 96 +++++++------- docs/source/release-notes.rst | 4 + .../main/kotlin/net/corda/flows/CashFlow.kt | 5 +- .../net/corda/flows/TwoPartyTradeFlow.kt | 112 ++++++----------- node/build.gradle | 10 +- .../persistence/DataVendingService.kt | 1 - .../services/statemachine/FlowIORequest.kt | 13 +- .../statemachine/FlowStateMachineImpl.kt | 20 ++- .../statemachine/StateMachineManager.kt | 68 +++++++++- .../node/services/vault/NodeVaultService.kt | 7 -- .../statemachine/StateMachineManagerTests.kt | 46 ++++++- .../net/corda/traderdemo/flow/SellerFlow.kt | 5 +- 23 files changed, 374 insertions(+), 217 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 519a1c0149..ea32e377af 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -2,7 +2,9 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash import net.corda.core.node.ServiceHub +import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.UntrustworthyData import org.slf4j.Logger @@ -171,6 +173,16 @@ abstract class FlowLogic { } } + /** + * Suspends the flow until the transaction with the specified ID is received, successfully verified and + * sent to the vault for processing. Note that this call suspends until the transaction is considered + * valid by the local node, but that doesn't imply the vault will consider it relevant. + */ + @Suspendable + fun waitForLedgerCommit(hash: SecureHash): SignedTransaction { + return stateMachine.waitForLedgerCommit(hash, this) + } + //////////////////////////////////////////////////////////////////////////////////////////////////////////// private var _stateMachine: FlowStateMachine<*>? = null diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt index 25aa753a9c..974b9e0d45 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt @@ -3,7 +3,9 @@ package net.corda.core.flows import co.paralleluniverse.fibers.Suspendable import com.google.common.util.concurrent.ListenableFuture import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash import net.corda.core.node.ServiceHub +import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.UntrustworthyData import org.slf4j.Logger import java.util.* @@ -35,6 +37,9 @@ interface FlowStateMachine { @Suspendable fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>) + @Suspendable + fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction + val serviceHub: ServiceHub val logger: Logger val id: StateMachineRunId diff --git a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt index 6b29086153..d1499cdc81 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/Messaging.kt @@ -43,7 +43,8 @@ interface MessagingService { /** * The provided function will be invoked for each received message whose topic and session matches. The callback - * will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result. + * will run on the main server thread provided when the messaging service is constructed, and a database + * transaction is set up for you automatically. * * The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler]. * The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 8ddb260ed0..f516593fb6 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -29,11 +29,12 @@ interface ServiceHub { val myInfo: NodeInfo /** - * Given a list of [SignedTransaction]s, writes them to the local storage for validated transactions and then - * sends them to the vault for further processing. + * Given a [SignedTransaction], writes it to the local storage for validated transactions and then + * sends them to the vault for further processing. Expects to be run within a database transaction. * * @param txs The transactions to record. */ + // TODO: Make this take a single tx. fun recordTransactions(txs: Iterable) /** diff --git a/core/src/main/kotlin/net/corda/core/node/services/Services.kt b/core/src/main/kotlin/net/corda/core/node/services/Services.kt index 997331c1fd..800040174c 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/Services.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/Services.kt @@ -178,10 +178,22 @@ interface VaultService { fun getTransactionNotes(txnId: SecureHash): Iterable /** - * [InsufficientBalanceException] is thrown when a Cash Spending transaction fails because - * there is insufficient quantity for a given currency (and optionally set of Issuer Parties). - * Note: an [Amount] of [Currency] is only fungible for a given Issuer Party within a [FungibleAsset] - **/ + * Generate a transaction that moves an amount of currency to the given pubkey. + * + * Note: an [Amount] of [Currency] is only fungible for a given Issuer Party within a [FungibleAsset] + * + * @param tx A builder, which may contain inputs, outputs and commands already. The relevant components needed + * to move the cash will be added on top. + * @param amount How much currency to send. + * @param to a key of the recipient. + * @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set + * of given parties. This can be useful if the party you're trying to pay has expectations + * about which type of asset claims they are willing to accept. + * @return A [Pair] of the same transaction builder passed in as [tx], and the list of keys that need to sign + * the resulting transaction for it to be valid. + * @throws InsufficientBalanceException when a cash spending transaction fails because + * there is insufficient quantity for a given currency (and optionally set of Issuer Parties). + */ @Throws(InsufficientBalanceException::class) fun generateSpend(tx: TransactionBuilder, amount: Amount, diff --git a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt index 952bb80afb..eec9561040 100644 --- a/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/BroadcastTransactionFlow.kt @@ -3,13 +3,13 @@ package net.corda.flows import co.paralleluniverse.fibers.Suspendable import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic -import net.corda.core.node.recordTransactions import net.corda.core.transactions.SignedTransaction /** - * Notify all involved parties about a transaction, including storing a copy. Normally this would be called via - * [FinalityFlow]. + * Notify the specified parties about a transaction. The remote peers will download this transaction and its + * dependency graph, verifying them all. The flow returns when all peers have acknowledged the transactions + * as valid. Normally you wouldn't use this directly, it would be called via [FinalityFlow]. * * @param notarisedTransaction transaction which has been notarised (if needed) and is ready to notify nodes about. * @param participants a list of participants involved in the transaction. @@ -17,17 +17,14 @@ import net.corda.core.transactions.SignedTransaction */ class BroadcastTransactionFlow(val notarisedTransaction: SignedTransaction, val participants: Set) : FlowLogic() { - data class NotifyTxRequest(val tx: SignedTransaction) @Suspendable override fun call() { - // Record it locally - serviceHub.recordTransactions(notarisedTransaction) - // TODO: Messaging layer should handle this broadcast for us val msg = NotifyTxRequest(notarisedTransaction) participants.filter { it != serviceHub.myInfo.legalIdentity }.forEach { participant -> + // This pops out the other side in DataVending.NotifyTransactionHandler. send(participant, msg) } } diff --git a/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt index 5dac3167b5..61a18d7ef3 100644 --- a/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/FinalityFlow.kt @@ -1,48 +1,87 @@ package net.corda.flows import co.paralleluniverse.fibers.Suspendable +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.crypto.Party import net.corda.core.flows.FlowLogic +import net.corda.core.node.ServiceHub +import net.corda.core.transactions.LedgerTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker - /** - * Finalise a transaction by notarising it, then recording it locally, and then sending it to all involved parties. + * Verifies the given transactions, then sends them to the named notaries. If the notary agrees that the transactions + * are acceptable then they are from that point onwards committed to the ledger, and will be written through to the + * vault. Additionally they will be distributed to the parties reflected in the participants list of the states. * - * @param transaction to commit. - * @param participants a list of participants involved in the transaction. - * @return a list of participants who were successfully notified of the transaction. + * The transactions will be topologically sorted before commitment to ensure that dependencies are committed before + * dependers, so you don't need to do this yourself. + * + * The transactions are expected to have already been resolved: if their dependencies are not available in local + * storage or within the given set, verification will fail. They must have signatures from all necessary parties + * other than the notary. + * + * If specified, the extra recipients are sent all the given transactions. The base set of parties to inform of each + * transaction are calculated on a per transaction basis from the contract-given set of participants. + * + * The flow returns the same transactions, in the same order, with the additional signatures. + * + * @param transactions What to commit. + * @param extraRecipients A list of additional participants to inform of the transaction. */ -class FinalityFlow(val transaction: SignedTransaction, - val participants: Set, - override val progressTracker: ProgressTracker) : FlowLogic() { - constructor(transaction: SignedTransaction, participants: Set) : this(transaction, participants, tracker()) +class FinalityFlow(val transactions: Iterable, + val extraRecipients: Set, + override val progressTracker: ProgressTracker) : FlowLogic>() { + constructor(transaction: SignedTransaction, extraParticipants: Set) : this(listOf(transaction), extraParticipants, tracker()) + constructor(transaction: SignedTransaction) : this(listOf(transaction), emptySet(), tracker()) companion object { - object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") + object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") { + override fun childProgressTracker() = NotaryFlow.Client.tracker() + } object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants") + // TODO: Make all tracker() methods @JvmStatic fun tracker() = ProgressTracker(NOTARISING, BROADCASTING) } @Suspendable @Throws(NotaryException::class) - override fun call() { - // TODO: Resolve the tx here: it's probably already been done, but re-resolution is a no-op and it'll make the API more forgiving. - + override fun call(): List { + // Note: this method is carefully broken up to minimize the amount of data reachable from the stack at + // the point where subFlow is invoked, as that minimizes the checkpointing work to be done. + // + // Lookup the resolved transactions and use them to map each signed transaction to the list of participants. + // Then send to the notary if needed, record locally and distribute. progressTracker.currentStep = NOTARISING - // Notarise the transaction if needed - val notarisedTransaction = if (needsNotarySignature(transaction)) { - val notarySig = subFlow(NotaryFlow.Client(transaction)) - transaction.withAdditionalSignature(notarySig) - } else { - transaction - } + val notarisedTxns = notariseAndRecord(lookupParties(resolveDependenciesOf(transactions))) - // Let everyone else know about the transaction + // Each transaction has its own set of recipients, but extra recipients get them all. progressTracker.currentStep = BROADCASTING - subFlow(BroadcastTransactionFlow(notarisedTransaction, participants)) + val me = serviceHub.myInfo.legalIdentity + for ((stx, parties) in notarisedTxns) { + subFlow(BroadcastTransactionFlow(stx, parties + extraRecipients - me)) + } + return notarisedTxns.map { it.first } + } + + // TODO: API: Make some of these protected? + + @Suspendable + private fun notariseAndRecord(stxnsAndParties: List>>): List>> { + return stxnsAndParties.map { pair -> + val stx = pair.first + val notarised = if (needsNotarySignature(stx)) { + val notarySig = subFlow(NotaryFlow.Client(stx)) + stx + notarySig + } else { + stx + } + serviceHub.recordTransactions(listOf(notarised)) + Pair(notarised, pair.second) + } } private fun needsNotarySignature(stx: SignedTransaction) = stx.tx.notary != null && hasNoNotarySignature(stx) @@ -51,4 +90,38 @@ class FinalityFlow(val transaction: SignedTransaction, val signers = stx.sigs.map { it.by }.toSet() return !(notaryKey?.isFulfilledBy(signers) ?: false) } + + private fun lookupParties(ltxns: List>): List>> { + return ltxns.map { pair -> + val (stx, ltx) = pair + // Calculate who is meant to see the results based on the participants involved. + val keys = ltx.outputs.flatMap { it.data.participants } + ltx.inputs.flatMap { it.state.data.participants } + // TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them count as a reason to fail? + val parties = keys.mapNotNull { serviceHub.identityService.partyFromKey(it) }.toSet() + Pair(stx, parties) + } + } + + private fun resolveDependenciesOf(signedTransactions: Iterable): List> { + // Make sure the dependencies come before the dependers. + val sorted = ResolveTransactionsFlow.topologicalSort(signedTransactions.toList()) + // Build a ServiceHub that consults the argument list as well as what's in local tx storage so uncommitted + // transactions can depend on each other. + val augmentedLookup = object : ServiceHub by serviceHub { + val hashToTx = sorted.associateBy { it.id } + override fun loadState(stateRef: StateRef): TransactionState<*> { + val provided: TransactionState? = hashToTx[stateRef.txhash]?.let { it.tx.outputs[stateRef.index] } + return provided ?: super.loadState(stateRef) + } + } + // Load and verify each transaction. + return sorted.map { stx -> + val notary = stx.tx.notary + // The notary signature is allowed to be missing but no others. + val wtx = if (notary != null) stx.verifySignatures(notary.owningKey) else stx.verifySignatures() + val ltx = wtx.toLedgerTransaction(augmentedLookup) + ltx.verify() + stx to ltx + } + } } diff --git a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt index af6df1cf54..f29281fc7c 100644 --- a/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/NotaryFlow.kt @@ -16,7 +16,6 @@ import net.corda.core.transactions.WireTransaction import net.corda.core.utilities.ProgressTracker object NotaryFlow { - /** * A flow to be used for obtaining a signature from a [NotaryService] ascertaining the transaction * timestamp is correct and none of its inputs have been used in another completed transaction. diff --git a/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt index 07f573ab67..f15c614155 100644 --- a/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt +++ b/core/src/main/kotlin/net/corda/flows/ResolveTransactionsFlow.kt @@ -34,13 +34,16 @@ class ResolveTransactionsFlow(private val txHashes: Set, companion object { private fun dependencyIDs(wtx: WireTransaction) = wtx.inputs.map { it.txhash }.toSet() - private fun topologicalSort(transactions: Collection): List { + /** + * Topologically sorts the given transactions such that dependencies are listed before dependers. */ + @JvmStatic + fun topologicalSort(transactions: Collection): List { // Construct txhash -> dependent-txs map val forwardGraph = HashMap>() - transactions.forEach { tx -> - tx.tx.inputs.forEach { input -> + transactions.forEach { stx -> + stx.tx.inputs.forEach { input -> // Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is) - forwardGraph.getOrPut(input.txhash) { LinkedHashSet() }.add(tx) + forwardGraph.getOrPut(input.txhash) { LinkedHashSet() }.add(stx) } } diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt index a75ff24521..02f25559eb 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/FxTransactionBuildTutorial.kt @@ -174,7 +174,7 @@ class ForeignExchangeFlow(val tradeId: String, withNewSignature // return the almost complete transaction } - // Initiate the standard protocol to notarise and distribute to the involved parties + // Initiate the standard protocol to notarise and distribute to the involved parties. subFlow(FinalityFlow(allPartySignedTx, setOf(baseCurrencyBuyer, baseCurrencySeller))) return allPartySignedTx.id diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt index dae7c1efe8..0818835fa1 100644 --- a/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt +++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/WorkflowTransactionBuildTutorial.kt @@ -27,6 +27,7 @@ inline fun ServiceHub.latest(ref: StateRef): StateAndR val original = toStateAndRef(ref) return linearHeads.get(original.state.data.linearId)!! } + // DOCEND 1 // Minimal state model of a manual approval process @@ -121,18 +122,14 @@ class SubmitTradeApprovalFlow(val tradeId: String, // identify a notary. This might also be done external to the flow val notary = serviceHub.networkMapCache.getAnyNotary() // Create the TransactionBuilder and populate with the new state. - val tx = TransactionType. - General. - Builder(notary). - withItems(tradeProposal, - Command(TradeApprovalContract.Commands.Issue(), - listOf(tradeProposal.source.owningKey))) + val tx = TransactionType.General.Builder(notary) + .withItems(tradeProposal, Command(TradeApprovalContract.Commands.Issue(), listOf(tradeProposal.source.owningKey))) tx.setTime(serviceHub.clock.instant(), Duration.ofSeconds(60)) // We can automatically sign as there is no untrusted data. tx.signWith(serviceHub.legalIdentityKey) // Convert to a SignedTransaction that we can send to the notary val signedTx = tx.toSignedTransaction(false) - // Run the FinalityFlow to notarise and distribute the SignedTransaction to the counterparty + // Notarise and distribute. subFlow(FinalityFlow(signedTx, setOf(serviceHub.myInfo.legalIdentity, counterparty))) // Return the initial state return signedTx.tx.outRef(0) @@ -210,10 +207,8 @@ class SubmitCompletionFlow(val ref: StateRef, val verdict: WorkflowState) : Flow agreedTx } // DOCSTART 4 - // Run the FinalityFlow to notarise and distribute the completed transaction. - subFlow(FinalityFlow(allPartySignedTx, - setOf(latestRecord.state.data.source, latestRecord.state.data.counterparty))) - + // Notarise and distribute the completed transaction. + subFlow(FinalityFlow(allPartySignedTx, setOf(latestRecord.state.data.source, latestRecord.state.data.counterparty))) // DOCEND 4 // Return back the details of the completed state/transaction. return allPartySignedTx.tx.outRef(0) diff --git a/docs/source/flow-state-machines.rst b/docs/source/flow-state-machines.rst index f01b24d79c..075057ec72 100644 --- a/docs/source/flow-state-machines.rst +++ b/docs/source/flow-state-machines.rst @@ -89,7 +89,9 @@ Our flow has two parties (B and S for buyer and seller) and will proceed as foll 2. B sends to S a ``SignedTransaction`` that includes the state as input, B's cash as input, the state with the new owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because it lacks a signature from S authorising movement of the asset. -3. S signs it and hands the now finalised ``SignedTransaction`` back to B. +3. S signs it and *finalises* the transaction. This means sending it to the notary, which checks the transaction for + validity, recording the transaction in the local vault, and then sending it back to B who also checks it and commits + the transaction to their local vault. You can find the implementation of this flow in the file ``finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt``. @@ -98,8 +100,7 @@ represents an atomic asset swap. Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine. -We start by defining a wrapper that namespaces the flow code, two functions to start either the buy or sell side -of the flow, and two classes that will contain the flow definition. We also pick what data will be used by +We start by defining two classes that will contain the flow definition. We also pick what data will be used by each side. .. note:: The code samples in this tutorial are only available in Kotlin, but you can use any JVM language to @@ -110,7 +111,6 @@ each side. .. sourcecode:: kotlin object TwoPartyTradeFlow { - class UnacceptablePriceException(val givenPrice: Amount) : FlowException("Unacceptable price: $givenPrice") class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() { override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName" @@ -188,8 +188,6 @@ and try again. .. note:: Java 9 is likely to remove this pre-marking requirement completely. -.. note:: Accessing the vault from inside an @Suspendable function (e.g. via ``serviceHub.vaultService``) can cause a serialisation error when the fiber suspends. Instead, vault access should be performed from a helper non-suspendable function, which you then call from the @Suspendable function. We are working to fix this. - Starting your flow ------------------ @@ -248,12 +246,11 @@ Let's implement the ``Seller.call`` method. This will be run when the flow is in :dedent: 4 Here we see the outline of the procedure. We receive a proposed trade transaction from the buyer and check that it's -valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature so that the transaction is -now signed by both the buyer and the seller. We then send this request to a notary to assert with another signature that the -timestamp in the transaction (if any) is valid and there are no double spends, and send back both -our signature and the notaries signature. Note we should not send to the notary until all other required signatures have been appended -as the notary may validate the signatures as well as verifying for itself the transactional integrity. -Finally, we hand back to the code that invoked the flow the finished transaction. +valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature +so that the transaction is now signed by both the buyer and the seller. We then *finalise* this transaction by sending +it to a notary to assert (with another signature) that the timestamp in the transaction (if any) is valid and there are no +double spends. Finally, after the finalisation process is complete, we retrieve the now fully signed transaction from +local storage. It will have the same ID as the one we started with but more signatures. Let's fill out the ``receiveAndCheckProposedTransaction()`` method. @@ -327,24 +324,39 @@ Throwing a ``FlowException`` enables a flow to reject a piece of data it has rec done in the ``unwrap`` method of the received ``UntrustworthyData``. In the above example the seller checks the price and throws ``FlowException`` if it's invalid. It's then up to the buyer to either try again with a better price or give up. -Sub-flows ---------- +Sub-flows and finalisation +-------------------------- Flows can be composed via nesting. Invoking a sub-flow looks similar to an ordinary function call: .. container:: codeset - .. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt - :language: kotlin - :start-after: DOCSTART 6 - :end-before: DOCEND 6 - :dedent: 4 + .. sourcecode:: kotlin + + @Suspendable + fun call() { + val unnotarisedTransaction = ... + subFlow(FinalityFlow(unnotarisedTransaction)) + } + + .. sourcecode:: java + + @Suspendable + public void call() throws FlowException { + SignedTransaction unnotarisedTransaction = ... + subFlow(new FinalityFlow(unnotarisedTransaction)) + } + +In this code snippet we are using the ``FinalityFlow`` to finish off the transaction. It will: + +* Send the transaction to the chosen notary and, if necessary, satisfy the notary that the transaction is valid. +* Record the transaction in the local vault, if it is relevant (i.e. involves the owner of the node). +* Send the fully signed transaction to the other participants for recording as well. -In this code snippet we are using the ``NotaryFlow.Client`` to request notarisation of the transaction. We simply create the flow object via its constructor, and then pass it to the ``subFlow`` method which returns the result of the flow's execution directly. Behind the scenes all this is doing is wiring up progress -tracking (discussed more below) and then running the objects ``call`` method. Because this little helper method can -be on the stack when network IO takes place, we mark it as ``@Suspendable``. +tracking (discussed more below) and then running the objects ``call`` method. Because the sub-flow might suspend, +we must mark the method that invokes it as suspendable. Going back to the previous code snippet, we use a sub-flow called ``ResolveTransactionsFlow``. This is responsible for downloading and checking all the dependencies of a transaction, which in Corda are always retrievable @@ -360,32 +372,11 @@ objects, but we don't need them here so we just ignore the return value. After the dependencies, we check the proposed trading transaction for validity by running the contracts for that as well (but having handled the fact that some signatures are missing ourselves). -Here's the rest of the code: - -.. container:: codeset - - .. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt - :language: kotlin - :start-after: DOCSTART 7 - :end-before: DOCEND 7 - :dedent: 4 - -It's all pretty straightforward from now on. Here ``id`` is the secure hash representing the serialised -transaction, and we just use our private key to calculate a signature over it. As a reminder, in Corda signatures do -not cover other signatures: just the core of the transaction data. - -In ``sendSignatures``, we take the two signatures we obtained and add them to the partial transaction we were sent. -There is an overload for the + operator so signatures can be added to a SignedTransaction easily. Finally, we wrap the -two signatures in a simple wrapper message class and send it back. The send won't block waiting for an acknowledgement, -but the underlying message queue software will retry delivery if the other side has gone away temporarily. - -You can also see that every flow instance has a logger (using the SLF4J API) which you can use to log progress -messages. - -.. warning:: This sample code is **not secure**. Other than not checking for all possible invalid constructions, if the - seller stops before sending the finalised transaction to the buyer, the seller is left with a valid transaction - but the buyer isn't, so they can't spend the asset they just purchased! This sort of thing will be fixed in a - future version of the code. +.. warning:: If the seller stops before sending the finalised transaction to the buyer, the seller is left with a + valid transaction but the buyer isn't, so they can't spend the asset they just purchased! This sort of thing is not + always a risk (as the seller may not gain anything from that sort of behaviour except a lawsuit), but if it is, a future + version of the platform will allow you to ask the notary to send you the transaction as well, in case your counterparty + does not. This is not the default because it reveals more private info to the notary. Implementing the buyer ---------------------- @@ -403,12 +394,11 @@ OK, let's do the same for the buyer side: This code is longer but no more complicated. Here are some things to pay attention to: 1. We do some sanity checking on the received message to ensure we're being offered what we expected to be offered. -2. We create a cash spend in the normal way, by using ``VaultService.generateSpend``. See the vault documentation if this - part isn't clear. +2. We create a cash spend using ``VaultService.generateSpend``. You can read the vault documentation to learn more about this. 3. We access the *service hub* when we need it to access things that are transient and may change or be recreated whilst a flow is suspended, things like the wallet or the network map. -4. Finally, we send the unfinished, invalid transaction to the seller so they can sign it. They are expected to send - back to us a ``SignaturesFromSeller``, which once we verify it, should be the final outcome of the trade. +4. We send the unfinished, invalid transaction to the seller so they can sign it and finalise it. +5. Finally, we wait for the finished transaction to arrive in our local storage and vault. As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite the fact that it takes minimal resources and can survive node restarts. @@ -435,7 +425,7 @@ A flow might declare some steps with code inside the flow class like this: .. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt :language: kotlin :start-after: DOCSTART 2 - :end-before: DOCSTART 1 + :end-before: DOCEND 2 :dedent: 4 .. sourcecode:: java diff --git a/docs/source/release-notes.rst b/docs/source/release-notes.rst index 5d2d25e9e7..4731eb7ab1 100644 --- a/docs/source/release-notes.rst +++ b/docs/source/release-notes.rst @@ -10,6 +10,10 @@ Milestone 8 * ``Party`` equality is now based on the owning key, rather than the owning key and name. This is important for party anonymisation to work, as each key must identify exactly one party. + * A new ``waitForLedgerCommit`` method is available inside flows. Given a hash it will suspend the flow until + a valid transaction with that hash has been received, committed and processed by the vault. This is useful + in multi-party flows where one side takes responsibility for sending the finished transaction to the notary, + and the other side wishes to wait for it. Milestone 7 ----------- diff --git a/finance/src/main/kotlin/net/corda/flows/CashFlow.kt b/finance/src/main/kotlin/net/corda/flows/CashFlow.kt index 5214ff907c..9aeb53ca77 100644 --- a/finance/src/main/kotlin/net/corda/flows/CashFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/CashFlow.kt @@ -123,9 +123,8 @@ class CashFlow(val command: CashCommand, override val progressTracker: ProgressT Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary) val myKey = serviceHub.legalIdentityKey builder.signWith(myKey) - val tx = builder.toSignedTransaction(checkSufficientSignatures = true) - // Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it - subFlow(BroadcastTransactionFlow(tx, setOf(req.recipient))) + val tx = builder.toSignedTransaction() + subFlow(FinalityFlow(tx)) return tx } } diff --git a/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt b/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt index 50341d8ded..aadd430fe3 100644 --- a/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt @@ -25,23 +25,17 @@ import java.util.* * 2. B sends to S a [SignedTransaction] that includes the state as input, B's cash as input, the state with the new * owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because * it lacks a signature from S authorising movement of the asset. - * 3. S signs it and hands the now finalised SignedWireTransaction back to B. + * 3. S signs it and commits it to the ledger, notarising it and distributing the final signed transaction back + * to B. * * Assuming no malicious termination, they both end the flow being in posession of a valid, signed transaction * that represents an atomic asset swap. * * Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine. - * - * To initiate the flow, use either the [runBuyer] or [runSeller] methods, depending on which side of the trade - * your node is taking. These methods return a future which will complete once the trade is over and a fully signed - * transaction is available: you can either block your thread waiting for the flow to complete by using - * [ListenableFuture.get] or more usefully, register a callback that will be invoked when the time comes. - * - * To see an example of how to use this class, look at the unit tests. */ -// TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this -// and [AbstractStateReplacementFlow]. object TwoPartyTradeFlow { + // TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this + // and [AbstractStateReplacementFlow]. class UnacceptablePriceException(givenPrice: Amount) : FlowException("Unacceptable price: $givenPrice") class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() { @@ -70,35 +64,26 @@ object TwoPartyTradeFlow { object VERIFYING : ProgressTracker.Step("Verifying transaction proposal") object SIGNING : ProgressTracker.Step("Signing transaction") // DOCSTART 3 - object NOTARY : ProgressTracker.Step("Getting notary signature") { + object COMMITTING : ProgressTracker.Step("Committing transaction to the ledger") { override fun childProgressTracker() = FinalityFlow.tracker() } // DOCEND 3 - object SENDING_SIGS : ProgressTracker.Step("Sending transaction signatures to buyer") + object SENDING_FINAL_TX : ProgressTracker.Step("Sending final transaction to buyer") - fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, NOTARY, SENDING_SIGS) + fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, COMMITTING, SENDING_FINAL_TX) } // DOCSTART 4 @Suspendable override fun call(): SignedTransaction { - val partialTX: SignedTransaction = receiveAndCheckProposedTransaction() - val ourSignature: DigitalSignature.WithKey = calculateOurSignature(partialTX) - val allPartySignedTx: SignedTransaction = partialTX + ourSignature - val notarySignature: DigitalSignature.WithKey = getNotarySignature(allPartySignedTx) - val result: SignedTransaction = sendSignatures(allPartySignedTx, ourSignature, notarySignature) - return result + val partialSTX: SignedTransaction = receiveAndCheckProposedTransaction() + val ourSignature = calculateOurSignature(partialSTX) + val unnotarisedSTX: SignedTransaction = partialSTX + ourSignature + val finishedSTX = subFlow(FinalityFlow(unnotarisedSTX)).single() + return finishedSTX } // DOCEND 4 - // DOCSTART 6 - @Suspendable - private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.WithKey { - progressTracker.currentStep = NOTARY - return subFlow(NotaryFlow.Client(stx)) - } - // DOCEND 6 - // DOCSTART 5 @Suspendable private fun receiveAndCheckProposedTransaction(): SignedTransaction { @@ -107,14 +92,12 @@ object TwoPartyTradeFlow { val myPublicKey = myKeyPair.public.composite // Make the first message we'll send to kick off the flow. val hello = SellerTradeInfo(assetToSell, price, myPublicKey) - - val maybeSTX = sendAndReceive(otherParty, hello) + // What we get back from the other side is a transaction that *might* be valid and acceptable to us, + // but we must check it out thoroughly before we sign! + val untrustedSTX = sendAndReceive(otherParty, hello) progressTracker.currentStep = VERIFYING - - maybeSTX.unwrap { - progressTracker.nextStep() - + return untrustedSTX.unwrap { // Check that the tx proposed by the buyer is valid. val wtx: WireTransaction = it.verifySignatures(myPublicKey, notaryNode.notaryIdentity.owningKey) logger.trace { "Received partially signed transaction: ${it.id}" } @@ -123,11 +106,10 @@ object TwoPartyTradeFlow { // even though it is missing signatures. subFlow(ResolveTransactionsFlow(wtx, otherParty)) - if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price) { + if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price) throw FlowException("Transaction is not sending us the right amount of cash") - } - return it + it } } // DOCEND 5 @@ -144,64 +126,50 @@ object TwoPartyTradeFlow { // but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to // express flow state machines on top of the messaging layer. - // DOCSTART 7 open fun calculateOurSignature(partialTX: SignedTransaction): DigitalSignature.WithKey { progressTracker.currentStep = SIGNING return myKeyPair.signWithECDSA(partialTX.id) } - - @Suspendable - private fun sendSignatures(allPartySignedTx: SignedTransaction, - ourSignature: DigitalSignature.WithKey, - notarySignature: DigitalSignature.WithKey): SignedTransaction { - progressTracker.currentStep = SENDING_SIGS - val fullySigned = allPartySignedTx + notarySignature - - logger.trace { "Built finished transaction, sending back to secondary!" } - - send(otherParty, SignaturesFromSeller(ourSignature, notarySignature)) - return fullySigned - } - // DOCEND 7 } - // DOCSTART 2 open class Buyer(val otherParty: Party, val notary: Party, val acceptablePrice: Amount, val typeToBuy: Class) : FlowLogic() { - + // DOCSTART 2 object RECEIVING : ProgressTracker.Step("Waiting for seller trading info") object VERIFYING : ProgressTracker.Step("Verifying seller assets") object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal") - object SWAPPING_SIGNATURES : ProgressTracker.Step("Swapping signatures with the seller") + object SENDING_SIGNATURES : ProgressTracker.Step("Sending signatures to the seller") + object WAITING_FOR_TX : ProgressTracker.Step("Waiting for the transaction to finalise.") - override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES) + override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SENDING_SIGNATURES, WAITING_FOR_TX) + // DOCEND 2 // DOCSTART 1 @Suspendable override fun call(): SignedTransaction { + // Wait for a trade request to come in from the other party. + progressTracker.currentStep = RECEIVING val tradeRequest = receiveAndValidateTradeRequest() + // Put together a proposed transaction that performs the trade, and sign it. progressTracker.currentStep = SIGNING val (ptx, cashSigningPubKeys) = assembleSharedTX(tradeRequest) val stx = signWithOurKeys(cashSigningPubKeys, ptx) - val signatures = swapSignaturesWithSeller(stx) + // Send the signed transaction to the seller, who must then sign it themselves and commit + // it to the ledger by sending it to the notary. + progressTracker.currentStep = SENDING_SIGNATURES + send(otherParty, stx) - logger.trace { "Got signatures from seller, verifying ... " } - - val fullySigned = stx + signatures.sellerSig + signatures.notarySig - fullySigned.verifySignatures() - - logger.trace { "Signatures received are valid. Trade complete! :-)" } - return fullySigned + // Wait for the finished, notarised transaction to arrive in our transaction store. + progressTracker.currentStep = WAITING_FOR_TX + return waitForLedgerCommit(stx.id) } @Suspendable private fun receiveAndValidateTradeRequest(): SellerTradeInfo { - progressTracker.currentStep = RECEIVING - // Wait for a trade request to come in from the other side val maybeTradeRequest = receive(otherParty) progressTracker.currentStep = VERIFYING @@ -216,24 +184,14 @@ object TwoPartyTradeFlow { if (!typeToBuy.isInstance(asset)) throw AssetMismatchException(typeToBuy.name, assetTypeName) - // Check the transaction that contains the state which is being resolved. - // We only have a hash here, so if we don't know it already, we have to ask for it. + // Check that the state being sold to us is in a valid chain of transactions, i.e. that the + // seller has a valid chain of custody proving that they own the thing they're selling. subFlow(ResolveTransactionsFlow(setOf(it.assetForSale.ref.txhash), otherParty)) return it } } - @Suspendable - private fun swapSignaturesWithSeller(stx: SignedTransaction): SignaturesFromSeller { - progressTracker.currentStep = SWAPPING_SIGNATURES - logger.trace { "Sending partially signed transaction to seller" } - - // TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx. - - return sendAndReceive(otherParty, stx).unwrap { it } - } - private fun signWithOurKeys(cashSigningPubKeys: List, ptx: TransactionBuilder): SignedTransaction { // Now sign the transaction with whatever keys we need to move the cash. for (publicKey in cashSigningPubKeys.keys) { diff --git a/node/build.gradle b/node/build.gradle index 991db103da..a061ad7f4f 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -52,9 +52,6 @@ sourceSets { dependencies { compile project(':finance') - testCompile project(':test-utils') - testCompile project(':client') - compile "com.google.code.findbugs:jsr305:3.0.1" // Log4J: logging framework (with SLF4J bindings) @@ -126,8 +123,11 @@ dependencies { // Unit testing helpers. testCompile "junit:junit:$junit_version" testCompile "org.assertj:assertj-core:${assertj_version}" - testCompile "com.pholser:junit-quickcheck-core:$quickcheck_version" + testCompile "com.nhaarman:mockito-kotlin:1.1.0" + testCompile project(':test-utils') + testCompile project(':client') + testCompile project(':core') // For H2 database support in persistence compile "com.h2database:h2:1.4.193" @@ -156,8 +156,6 @@ dependencies { // Integration test helpers integrationTestCompile "junit:junit:$junit_version" - - testCompile "com.nhaarman:mockito-kotlin:1.1.0" } task integrationTest(type: Test) { diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt index 8bbf3cd811..d43fad7564 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DataVendingService.kt @@ -15,7 +15,6 @@ import java.util.function.Function import javax.annotation.concurrent.ThreadSafe object DataVending { - class Plugin : CordaPluginRegistry() { override val servicePlugins = listOf(Function(::Service)) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt index 10a1c72e16..9496feeb2e 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowIORequest.kt @@ -1,5 +1,6 @@ package net.corda.node.services.statemachine +import net.corda.core.crypto.SecureHash import net.corda.node.services.statemachine.StateMachineManager.FlowSession // TODO revisit when Kotlin 1.1 is released and data classes can extend other classes @@ -7,14 +8,17 @@ interface FlowIORequest { // This is used to identify where we suspended, in case of message mismatch errors and other things where we // don't have the original stack trace because it's in a suspended fiber. val stackTraceInCaseOfProblems: StackSnapshot +} + +interface SessionedFlowIORequest : FlowIORequest { val session: FlowSession } -interface SendRequest : FlowIORequest { +interface SendRequest : SessionedFlowIORequest { val message: SessionMessage } -interface ReceiveRequest : FlowIORequest { +interface ReceiveRequest : SessionedFlowIORequest { val receiveType: Class } @@ -36,4 +40,9 @@ data class SendOnly(override val session: FlowSession, override val message: Ses override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot() } +data class WaitForLedgerCommit(val hash: SecureHash, val fiber: FlowStateMachineImpl<*>) : FlowIORequest { + @Transient + override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot() +} + class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem") diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 520657c526..4beab873e2 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -7,11 +7,13 @@ import co.paralleluniverse.strands.Strand import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.StateMachineRunId import net.corda.core.random63BitValue +import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.trace import net.corda.node.services.api.ServiceHubInternal @@ -72,7 +74,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } + // This state IS serialised, as we need it to know what the fiber is waiting for. internal val openSessions = HashMap, Party>, FlowSession>() + internal var waitingForLedgerCommitOf: SecureHash? = null init { logic.stateMachine = this @@ -172,6 +176,16 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, } } + @Suspendable + override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction { + waitingForLedgerCommitOf = hash + logger.info("Waiting for transaction $hash to commit") + suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>)) + logger.info("Transaction $hash has committed to the ledger, resuming") + val stx = serviceHub.storageService.validatedTransactions.getTransaction(hash) + return stx ?: throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage") + } + private fun createSessionData(session: FlowSession, payload: Any): SessionData { val sessionState = session.state val peerSessionId = when (sessionState) { @@ -266,10 +280,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, @Suspendable private fun suspend(ioRequest: FlowIORequest) { - // we have to pass the Thread local Transaction across via a transient field as the Fiber Park swaps them out. + // We have to pass the thread local database transaction across via a transient field as the fiber park + // swaps them out. txTrampoline = TransactionManager.currentOrNull() StrandLocalTransactionManager.setThreadLocalTx(null) - ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>) + if (ioRequest is SessionedFlowIORequest) + ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>) var exceptionDuringSuspend: Throwable? = null parkAndSerialize { fiber, serializer -> diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index 85a890f459..4ed04014fc 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -6,11 +6,13 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.strands.Strand import com.codahale.metrics.Gauge import com.esotericsoftware.kryo.Kryo +import com.google.common.collect.HashMultimap import com.google.common.util.concurrent.ListenableFuture import kotlinx.support.jdk8.collections.removeIf import net.corda.core.ThreadBox import net.corda.core.bufferUntilSubscribed import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.commonName import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic @@ -62,7 +64,7 @@ import javax.annotation.concurrent.ThreadSafe * TODO: Timeouts * TODO: Surfacing of exceptions via an API and/or management UI * TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt - * TODO: Implement stub/skel classes that provide a basic RPC framework on top of this. + * TODO: Don't store all active flows in memory, load from the database on demand. */ @ThreadSafe class StateMachineManager(val serviceHub: ServiceHubInternal, @@ -89,15 +91,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, // A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines // property. - private val mutex = ThreadBox(object { + private class InnerState { var started = false val stateMachines = LinkedHashMap, Checkpoint>() - val changesPublisher = PublishSubject.create() + val changesPublisher = PublishSubject.create()!! + val fibersWaitingForLedgerCommit = HashMultimap.create>()!! fun notifyChangeObservers(fiber: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) { changesPublisher.bufferUntilDatabaseCommit().onNext(Change(fiber.logic, addOrRemove, fiber.id)) } - }) + } + private val mutex = ThreadBox(InnerState()) // True if we're shutting down, so don't resume anything. @Volatile private var stopping = false @@ -152,9 +156,27 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, fun start() { restoreFibersFromCheckpoints() + listenToLedgerTransactions() serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() } } + private fun listenToLedgerTransactions() { + // Observe the stream of committed, validated transactions and resume fibers that are waiting for them. + serviceHub.storageService.validatedTransactions.updates.subscribe { stx -> + val hash = stx.id + val flows: Set> = mutex.locked { fibersWaitingForLedgerCommit.removeAll(hash) } + if (flows.isNotEmpty()) { + executor.executeASAP { + for (flow in flows) { + logger.info("Resuming ${flow.id} because it was waiting for tx ${flow.waitingForLedgerCommitOf!!} which is now committed.") + flow.waitingForLedgerCommitOf = null + resumeFiber(flow) + } + } + } + } + } + private fun decrementLiveFibers() { liveFibers.countDown() } @@ -217,8 +239,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, private fun resumeRestoredFiber(fiber: FlowStateMachineImpl<*>) { fiber.openSessions.values.forEach { openSessions[it.ourSessionId] = it } + val waitingForHash = fiber.waitingForLedgerCommitOf if (fiber.openSessions.values.any { it.waitingForResponse }) { fiber.logger.info("Restored, pending on receive") + } else if (waitingForHash != null) { + val stx = databaseTransaction(database) { + serviceHub.storageService.validatedTransactions.getTransaction(waitingForHash) + } + if (stx != null) { + fiber.logger.info("Resuming fiber as tx $waitingForHash has committed.") + resumeFiber(fiber) + } else { + fiber.logger.info("Restored, pending on ledger commit of $waitingForHash") + mutex.locked { fibersWaitingForLedgerCommit.put(waitingForHash, fiber) } + } } else { resumeFiber(fiber) } @@ -424,6 +458,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, * Note that you must be on the [executor] thread. */ fun add(logic: FlowLogic): FlowStateMachine { + // TODO: Check that logic has @Suspendable on its call method. executor.checkOnThread() // We swap out the parent transaction context as using this frequently leads to a deadlock as we wait // on the flow completion future inside that context. The problem is that any progress checkpoints are @@ -457,8 +492,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, private fun resumeFiber(fiber: FlowStateMachineImpl<*>) { // Avoid race condition when setting stopping to true and then checking liveFibers incrementLiveFibers() - if (!stopping) executor.executeASAP { - fiber.resume(scheduler) + if (!stopping) { + executor.executeASAP { + fiber.resume(scheduler) + } } else { fiber.logger.debug("Not resuming as SMM is stopping.") decrementLiveFibers() @@ -466,6 +503,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, } private fun processIORequest(ioRequest: FlowIORequest) { + executor.checkOnThread() if (ioRequest is SendRequest) { if (ioRequest.message is SessionInit) { openSessions[ioRequest.session.ourSessionId] = ioRequest.session @@ -475,6 +513,24 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, // We sent a message, but don't expect a response, so re-enter the continuation to let it keep going. resumeFiber(ioRequest.session.fiber) } + } else if (ioRequest is WaitForLedgerCommit) { + // Is it already committed? + val stx = databaseTransaction(database) { + serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash) + } + if (stx != null) { + resumeFiber(ioRequest.fiber) + } else { + // No, then register to wait. + // + // We assume this code runs on the server thread, which is the only place transactions are committed + // currently. When we liberalise our threading somewhat, handing of wait requests will need to be + // reworked to make the wait atomic in another way. Otherwise there is a race between checking the + // database and updating the waiting list. + mutex.locked { + fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber + } + } } } diff --git a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt index eeff28cf6d..80154b1c07 100644 --- a/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt +++ b/node/src/main/kotlin/net/corda/node/services/vault/NodeVaultService.kt @@ -195,13 +195,6 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT } } - /** - * Generate a transaction that moves an amount of currency to the given pubkey. - * - * @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set - * of given parties. This can be useful if the party you're trying to pay has expectations - * about which type of asset claims they are willing to accept. - */ override fun generateSpend(tx: TransactionBuilder, amount: Amount, to: CompositeKey, diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt index d8fdb57330..b0ed7a81da 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/StateMachineManagerTests.kt @@ -5,8 +5,10 @@ import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand.UncaughtExceptionHandler import com.google.common.util.concurrent.ListenableFuture import net.corda.core.contracts.DOLLARS +import net.corda.core.contracts.DummyState import net.corda.core.contracts.issuedBy import net.corda.core.crypto.Party +import net.corda.core.crypto.SecureHash import net.corda.core.crypto.generateKeyPair import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic @@ -19,8 +21,11 @@ import net.corda.core.random63BitValue import net.corda.core.rootCause import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.deserialize +import net.corda.core.transactions.SignedTransaction +import net.corda.core.transactions.TransactionBuilder import net.corda.flows.CashCommand import net.corda.flows.CashFlow +import net.corda.flows.FinalityFlow import net.corda.flows.NotaryFlow import net.corda.node.services.persistence.checkpoints import net.corda.node.services.transactions.ValidatingNotaryService @@ -483,9 +488,26 @@ class StateMachineManagerTests { assertThat(resultFuture.getOrThrow()).isEqualTo("Hello") } - private inline fun > MockNode.restartAndGetRestoredFlow( - networkMapNode: MockNode? = null): P { - disableDBCloseOnStop() //Handover DB to new node copy + @Test + fun `wait for transaction`() { + val ptx = TransactionBuilder(notary = notary1.info.notaryIdentity) + ptx.addOutputState(DummyState()) + ptx.signWith(node1.services.legalIdentityKey) + val stx = ptx.toSignedTransaction() + + val future1 = node2.services.startFlow(WaitingFlows.Waiter(stx.id)).resultFuture + val future2 = node1.services.startFlow(WaitingFlows.Committer(stx, node2.info.legalIdentity)).resultFuture + net.runNetwork() + future1.getOrThrow() + future2.getOrThrow() + } + + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + //region Helpers + + private inline fun > MockNode.restartAndGetRestoredFlow(networkMapNode: MockNode? = null): P { + disableDBCloseOnStop() // Handover DB to new node copy stop() val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray()) newNode.acceptableLiveFiberCountOnStop = 1 @@ -611,4 +633,22 @@ class StateMachineManagerTests { override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message override fun hashCode(): Int = message?.hashCode() ?: 31 } + + private object WaitingFlows { + class Waiter(private val hash: SecureHash) : FlowLogic() { + @Suspendable + override fun call() { + waitForLedgerCommit(hash) + } + } + + class Committer(private val stx: SignedTransaction, private val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call() { + subFlow(FinalityFlow(stx, setOf(otherParty))) + } + } + } + + //endregion Helpers } diff --git a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt index 6ae1c8f5d7..49c3b240ec 100644 --- a/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt +++ b/samples/trader-demo/src/main/kotlin/net/corda/traderdemo/flow/SellerFlow.kt @@ -55,10 +55,7 @@ class SellerFlow(val otherParty: Party, amount, cpOwnerKey, progressTracker.getChildProgressTracker(TRADING)!!) - val tradeTX: SignedTransaction = subFlow(seller, shareParentSessions = true) - serviceHub.recordTransactions(listOf(tradeTX)) - - return tradeTX + return subFlow(seller, shareParentSessions = true) } @Suspendable