mirror of
synced 2025-03-17 17:45:17 +00:00
Improve the flow commit API.
Make FinalityFlow do more, and be used more consistently. Add a new waitForLedgerCommit API that is intended to be used at the end of flows, or at any other point where a flow wants to wait for a transaction to finalise (but the finalisation flow is being done by someone else). Update the docs a bit.
This commit is contained in:
@ -2,7 +2,9 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.UntrustworthyData
import org.slf4j.Logger
@ -171,6 +173,16 @@ abstract class FlowLogic<out T> {
* Suspends the flow until the transaction with the specified ID is received, successfully verified and
* sent to the vault for processing. Note that this call suspends until the transaction is considered
* valid by the local node, but that doesn't imply the vault will consider it relevant.
fun waitForLedgerCommit(hash: SecureHash): SignedTransaction {
return stateMachine.waitForLedgerCommit(hash, this)
private var _stateMachine: FlowStateMachine<*>? = null
@ -3,7 +3,9 @@ package net.corda.core.flows
import co.paralleluniverse.fibers.Suspendable
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.UntrustworthyData
import org.slf4j.Logger
import java.util.*
@ -35,6 +37,9 @@ interface FlowStateMachine<R> {
fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>)
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
val serviceHub: ServiceHub
val logger: Logger
val id: StateMachineRunId
@ -43,7 +43,8 @@ interface MessagingService {
* The provided function will be invoked for each received message whose topic and session matches. The callback
* will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
* will run on the main server thread provided when the messaging service is constructed, and a database
* transaction is set up for you automatically.
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
@ -29,11 +29,12 @@ interface ServiceHub {
val myInfo: NodeInfo
* Given a list of [SignedTransaction]s, writes them to the local storage for validated transactions and then
* sends them to the vault for further processing.
* Given a [SignedTransaction], writes it to the local storage for validated transactions and then
* sends them to the vault for further processing. Expects to be run within a database transaction.
* @param txs The transactions to record.
// TODO: Make this take a single tx.
fun recordTransactions(txs: Iterable<SignedTransaction>)
@ -178,10 +178,22 @@ interface VaultService {
fun getTransactionNotes(txnId: SecureHash): Iterable<String>
* [InsufficientBalanceException] is thrown when a Cash Spending transaction fails because
* there is insufficient quantity for a given currency (and optionally set of Issuer Parties).
* Note: an [Amount] of [Currency] is only fungible for a given Issuer Party within a [FungibleAsset]
* Generate a transaction that moves an amount of currency to the given pubkey.
* Note: an [Amount] of [Currency] is only fungible for a given Issuer Party within a [FungibleAsset]
* @param tx A builder, which may contain inputs, outputs and commands already. The relevant components needed
* to move the cash will be added on top.
* @param amount How much currency to send.
* @param to a key of the recipient.
* @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set
* of given parties. This can be useful if the party you're trying to pay has expectations
* about which type of asset claims they are willing to accept.
* @return A [Pair] of the same transaction builder passed in as [tx], and the list of keys that need to sign
* the resulting transaction for it to be valid.
* @throws InsufficientBalanceException when a cash spending transaction fails because
* there is insufficient quantity for a given currency (and optionally set of Issuer Parties).
fun generateSpend(tx: TransactionBuilder,
amount: Amount<Currency>,
@ -3,13 +3,13 @@ package net.corda.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.recordTransactions
import net.corda.core.transactions.SignedTransaction
* Notify all involved parties about a transaction, including storing a copy. Normally this would be called via
* [FinalityFlow].
* Notify the specified parties about a transaction. The remote peers will download this transaction and its
* dependency graph, verifying them all. The flow returns when all peers have acknowledged the transactions
* as valid. Normally you wouldn't use this directly, it would be called via [FinalityFlow].
* @param notarisedTransaction transaction which has been notarised (if needed) and is ready to notify nodes about.
* @param participants a list of participants involved in the transaction.
@ -17,17 +17,14 @@ import net.corda.core.transactions.SignedTransaction
class BroadcastTransactionFlow(val notarisedTransaction: SignedTransaction,
val participants: Set<Party>) : FlowLogic<Unit>() {
data class NotifyTxRequest(val tx: SignedTransaction)
override fun call() {
// Record it locally
// TODO: Messaging layer should handle this broadcast for us
val msg = NotifyTxRequest(notarisedTransaction)
participants.filter { it != serviceHub.myInfo.legalIdentity }.forEach { participant ->
// This pops out the other side in DataVending.NotifyTransactionHandler.
send(participant, msg)
@ -1,48 +1,87 @@
package net.corda.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.Party
import net.corda.core.flows.FlowLogic
import net.corda.core.node.ServiceHub
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
* Finalise a transaction by notarising it, then recording it locally, and then sending it to all involved parties.
* Verifies the given transactions, then sends them to the named notaries. If the notary agrees that the transactions
* are acceptable then they are from that point onwards committed to the ledger, and will be written through to the
* vault. Additionally they will be distributed to the parties reflected in the participants list of the states.
* @param transaction to commit.
* @param participants a list of participants involved in the transaction.
* @return a list of participants who were successfully notified of the transaction.
* The transactions will be topologically sorted before commitment to ensure that dependencies are committed before
* dependers, so you don't need to do this yourself.
* The transactions are expected to have already been resolved: if their dependencies are not available in local
* storage or within the given set, verification will fail. They must have signatures from all necessary parties
* other than the notary.
* If specified, the extra recipients are sent all the given transactions. The base set of parties to inform of each
* transaction are calculated on a per transaction basis from the contract-given set of participants.
* The flow returns the same transactions, in the same order, with the additional signatures.
* @param transactions What to commit.
* @param extraRecipients A list of additional participants to inform of the transaction.
class FinalityFlow(val transaction: SignedTransaction,
val participants: Set<Party>,
override val progressTracker: ProgressTracker) : FlowLogic<Unit>() {
constructor(transaction: SignedTransaction, participants: Set<Party>) : this(transaction, participants, tracker())
class FinalityFlow(val transactions: Iterable<SignedTransaction>,
val extraRecipients: Set<Party>,
override val progressTracker: ProgressTracker) : FlowLogic<List<SignedTransaction>>() {
constructor(transaction: SignedTransaction, extraParticipants: Set<Party>) : this(listOf(transaction), extraParticipants, tracker())
constructor(transaction: SignedTransaction) : this(listOf(transaction), emptySet(), tracker())
companion object {
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service")
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") {
override fun childProgressTracker() = NotaryFlow.Client.tracker()
object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
// TODO: Make all tracker() methods @JvmStatic
fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
override fun call() {
// TODO: Resolve the tx here: it's probably already been done, but re-resolution is a no-op and it'll make the API more forgiving.
override fun call(): List<SignedTransaction> {
// Note: this method is carefully broken up to minimize the amount of data reachable from the stack at
// the point where subFlow is invoked, as that minimizes the checkpointing work to be done.
// Lookup the resolved transactions and use them to map each signed transaction to the list of participants.
// Then send to the notary if needed, record locally and distribute.
progressTracker.currentStep = NOTARISING
// Notarise the transaction if needed
val notarisedTransaction = if (needsNotarySignature(transaction)) {
val notarySig = subFlow(NotaryFlow.Client(transaction))
} else {
val notarisedTxns = notariseAndRecord(lookupParties(resolveDependenciesOf(transactions)))
// Let everyone else know about the transaction
// Each transaction has its own set of recipients, but extra recipients get them all.
progressTracker.currentStep = BROADCASTING
subFlow(BroadcastTransactionFlow(notarisedTransaction, participants))
val me = serviceHub.myInfo.legalIdentity
for ((stx, parties) in notarisedTxns) {
subFlow(BroadcastTransactionFlow(stx, parties + extraRecipients - me))
return notarisedTxns.map { it.first }
// TODO: API: Make some of these protected?
private fun notariseAndRecord(stxnsAndParties: List<Pair<SignedTransaction, Set<Party>>>): List<Pair<SignedTransaction, Set<Party>>> {
return stxnsAndParties.map { pair ->
val stx = pair.first
val notarised = if (needsNotarySignature(stx)) {
val notarySig = subFlow(NotaryFlow.Client(stx))
stx + notarySig
} else {
Pair(notarised, pair.second)
private fun needsNotarySignature(stx: SignedTransaction) = stx.tx.notary != null && hasNoNotarySignature(stx)
@ -51,4 +90,38 @@ class FinalityFlow(val transaction: SignedTransaction,
val signers = stx.sigs.map { it.by }.toSet()
return !(notaryKey?.isFulfilledBy(signers) ?: false)
private fun lookupParties(ltxns: List<Pair<SignedTransaction, LedgerTransaction>>): List<Pair<SignedTransaction, Set<Party>>> {
return ltxns.map { pair ->
val (stx, ltx) = pair
// Calculate who is meant to see the results based on the participants involved.
val keys = ltx.outputs.flatMap { it.data.participants } + ltx.inputs.flatMap { it.state.data.participants }
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them count as a reason to fail?
val parties = keys.mapNotNull { serviceHub.identityService.partyFromKey(it) }.toSet()
Pair(stx, parties)
private fun resolveDependenciesOf(signedTransactions: Iterable<SignedTransaction>): List<Pair<SignedTransaction, LedgerTransaction>> {
// Make sure the dependencies come before the dependers.
val sorted = ResolveTransactionsFlow.topologicalSort(signedTransactions.toList())
// Build a ServiceHub that consults the argument list as well as what's in local tx storage so uncommitted
// transactions can depend on each other.
val augmentedLookup = object : ServiceHub by serviceHub {
val hashToTx = sorted.associateBy { it.id }
override fun loadState(stateRef: StateRef): TransactionState<*> {
val provided: TransactionState<ContractState>? = hashToTx[stateRef.txhash]?.let { it.tx.outputs[stateRef.index] }
return provided ?: super.loadState(stateRef)
// Load and verify each transaction.
return sorted.map { stx ->
val notary = stx.tx.notary
// The notary signature is allowed to be missing but no others.
val wtx = if (notary != null) stx.verifySignatures(notary.owningKey) else stx.verifySignatures()
val ltx = wtx.toLedgerTransaction(augmentedLookup)
stx to ltx
@ -16,7 +16,6 @@ import net.corda.core.transactions.WireTransaction
import net.corda.core.utilities.ProgressTracker
object NotaryFlow {
* A flow to be used for obtaining a signature from a [NotaryService] ascertaining the transaction
* timestamp is correct and none of its inputs have been used in another completed transaction.
@ -34,13 +34,16 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
companion object {
private fun dependencyIDs(wtx: WireTransaction) = wtx.inputs.map { it.txhash }.toSet()
private fun topologicalSort(transactions: Collection<SignedTransaction>): List<SignedTransaction> {
* Topologically sorts the given transactions such that dependencies are listed before dependers. */
fun topologicalSort(transactions: Collection<SignedTransaction>): List<SignedTransaction> {
// Construct txhash -> dependent-txs map
val forwardGraph = HashMap<SecureHash, HashSet<SignedTransaction>>()
transactions.forEach { tx ->
tx.tx.inputs.forEach { input ->
transactions.forEach { stx ->
stx.tx.inputs.forEach { input ->
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is)
forwardGraph.getOrPut(input.txhash) { LinkedHashSet() }.add(tx)
forwardGraph.getOrPut(input.txhash) { LinkedHashSet() }.add(stx)
@ -174,7 +174,7 @@ class ForeignExchangeFlow(val tradeId: String,
withNewSignature // return the almost complete transaction
// Initiate the standard protocol to notarise and distribute to the involved parties
// Initiate the standard protocol to notarise and distribute to the involved parties.
subFlow(FinalityFlow(allPartySignedTx, setOf(baseCurrencyBuyer, baseCurrencySeller)))
return allPartySignedTx.id
@ -27,6 +27,7 @@ inline fun <reified T : LinearState> ServiceHub.latest(ref: StateRef): StateAndR
val original = toStateAndRef<T>(ref)
return linearHeads.get(original.state.data.linearId)!!
// Minimal state model of a manual approval process
@ -121,18 +122,14 @@ class SubmitTradeApprovalFlow(val tradeId: String,
// identify a notary. This might also be done external to the flow
val notary = serviceHub.networkMapCache.getAnyNotary()
// Create the TransactionBuilder and populate with the new state.
val tx = TransactionType.
val tx = TransactionType.General.Builder(notary)
.withItems(tradeProposal, Command(TradeApprovalContract.Commands.Issue(), listOf(tradeProposal.source.owningKey)))
tx.setTime(serviceHub.clock.instant(), Duration.ofSeconds(60))
// We can automatically sign as there is no untrusted data.
// Convert to a SignedTransaction that we can send to the notary
val signedTx = tx.toSignedTransaction(false)
// Run the FinalityFlow to notarise and distribute the SignedTransaction to the counterparty
// Notarise and distribute.
subFlow(FinalityFlow(signedTx, setOf(serviceHub.myInfo.legalIdentity, counterparty)))
// Return the initial state
return signedTx.tx.outRef<TradeApprovalContract.State>(0)
@ -210,10 +207,8 @@ class SubmitCompletionFlow(val ref: StateRef, val verdict: WorkflowState) : Flow
// Run the FinalityFlow to notarise and distribute the completed transaction.
setOf(latestRecord.state.data.source, latestRecord.state.data.counterparty)))
// Notarise and distribute the completed transaction.
subFlow(FinalityFlow(allPartySignedTx, setOf(latestRecord.state.data.source, latestRecord.state.data.counterparty)))
// Return back the details of the completed state/transaction.
return allPartySignedTx.tx.outRef<TradeApprovalContract.State>(0)
@ -89,7 +89,9 @@ Our flow has two parties (B and S for buyer and seller) and will proceed as foll
2. B sends to S a ``SignedTransaction`` that includes the state as input, B's cash as input, the state with the new
owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because
it lacks a signature from S authorising movement of the asset.
3. S signs it and hands the now finalised ``SignedTransaction`` back to B.
3. S signs it and *finalises* the transaction. This means sending it to the notary, which checks the transaction for
validity, recording the transaction in the local vault, and then sending it back to B who also checks it and commits
the transaction to their local vault.
You can find the implementation of this flow in the file ``finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt``.
@ -98,8 +100,7 @@ represents an atomic asset swap.
Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine.
We start by defining a wrapper that namespaces the flow code, two functions to start either the buy or sell side
of the flow, and two classes that will contain the flow definition. We also pick what data will be used by
We start by defining two classes that will contain the flow definition. We also pick what data will be used by
each side.
.. note:: The code samples in this tutorial are only available in Kotlin, but you can use any JVM language to
@ -110,7 +111,6 @@ each side.
.. sourcecode:: kotlin
object TwoPartyTradeFlow {
class UnacceptablePriceException(val givenPrice: Amount<Currency>) : FlowException("Unacceptable price: $givenPrice")
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() {
override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName"
@ -188,8 +188,6 @@ and try again.
.. note:: Java 9 is likely to remove this pre-marking requirement completely.
.. note:: Accessing the vault from inside an @Suspendable function (e.g. via ``serviceHub.vaultService``) can cause a serialisation error when the fiber suspends. Instead, vault access should be performed from a helper non-suspendable function, which you then call from the @Suspendable function. We are working to fix this.
Starting your flow
@ -248,12 +246,11 @@ Let's implement the ``Seller.call`` method. This will be run when the flow is in
:dedent: 4
Here we see the outline of the procedure. We receive a proposed trade transaction from the buyer and check that it's
valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature so that the transaction is
now signed by both the buyer and the seller. We then send this request to a notary to assert with another signature that the
timestamp in the transaction (if any) is valid and there are no double spends, and send back both
our signature and the notaries signature. Note we should not send to the notary until all other required signatures have been appended
as the notary may validate the signatures as well as verifying for itself the transactional integrity.
Finally, we hand back to the code that invoked the flow the finished transaction.
valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature
so that the transaction is now signed by both the buyer and the seller. We then *finalise* this transaction by sending
it to a notary to assert (with another signature) that the timestamp in the transaction (if any) is valid and there are no
double spends. Finally, after the finalisation process is complete, we retrieve the now fully signed transaction from
local storage. It will have the same ID as the one we started with but more signatures.
Let's fill out the ``receiveAndCheckProposedTransaction()`` method.
@ -327,24 +324,39 @@ Throwing a ``FlowException`` enables a flow to reject a piece of data it has rec
done in the ``unwrap`` method of the received ``UntrustworthyData``. In the above example the seller checks the price
and throws ``FlowException`` if it's invalid. It's then up to the buyer to either try again with a better price or give up.
Sub-flows and finalisation
Flows can be composed via nesting. Invoking a sub-flow looks similar to an ordinary function call:
.. container:: codeset
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
:language: kotlin
:start-after: DOCSTART 6
:end-before: DOCEND 6
:dedent: 4
.. sourcecode:: kotlin
fun call() {
val unnotarisedTransaction = ...
.. sourcecode:: java
public void call() throws FlowException {
SignedTransaction unnotarisedTransaction = ...
subFlow(new FinalityFlow(unnotarisedTransaction))
In this code snippet we are using the ``FinalityFlow`` to finish off the transaction. It will:
* Send the transaction to the chosen notary and, if necessary, satisfy the notary that the transaction is valid.
* Record the transaction in the local vault, if it is relevant (i.e. involves the owner of the node).
* Send the fully signed transaction to the other participants for recording as well.
In this code snippet we are using the ``NotaryFlow.Client`` to request notarisation of the transaction.
We simply create the flow object via its constructor, and then pass it to the ``subFlow`` method which
returns the result of the flow's execution directly. Behind the scenes all this is doing is wiring up progress
tracking (discussed more below) and then running the objects ``call`` method. Because this little helper method can
be on the stack when network IO takes place, we mark it as ``@Suspendable``.
tracking (discussed more below) and then running the objects ``call`` method. Because the sub-flow might suspend,
we must mark the method that invokes it as suspendable.
Going back to the previous code snippet, we use a sub-flow called ``ResolveTransactionsFlow``. This is
responsible for downloading and checking all the dependencies of a transaction, which in Corda are always retrievable
@ -360,32 +372,11 @@ objects, but we don't need them here so we just ignore the return value.
After the dependencies, we check the proposed trading transaction for validity by running the contracts for that as
well (but having handled the fact that some signatures are missing ourselves).
Here's the rest of the code:
.. container:: codeset
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
:language: kotlin
:start-after: DOCSTART 7
:end-before: DOCEND 7
:dedent: 4
It's all pretty straightforward from now on. Here ``id`` is the secure hash representing the serialised
transaction, and we just use our private key to calculate a signature over it. As a reminder, in Corda signatures do
not cover other signatures: just the core of the transaction data.
In ``sendSignatures``, we take the two signatures we obtained and add them to the partial transaction we were sent.
There is an overload for the + operator so signatures can be added to a SignedTransaction easily. Finally, we wrap the
two signatures in a simple wrapper message class and send it back. The send won't block waiting for an acknowledgement,
but the underlying message queue software will retry delivery if the other side has gone away temporarily.
You can also see that every flow instance has a logger (using the SLF4J API) which you can use to log progress
.. warning:: This sample code is **not secure**. Other than not checking for all possible invalid constructions, if the
seller stops before sending the finalised transaction to the buyer, the seller is left with a valid transaction
but the buyer isn't, so they can't spend the asset they just purchased! This sort of thing will be fixed in a
future version of the code.
.. warning:: If the seller stops before sending the finalised transaction to the buyer, the seller is left with a
valid transaction but the buyer isn't, so they can't spend the asset they just purchased! This sort of thing is not
always a risk (as the seller may not gain anything from that sort of behaviour except a lawsuit), but if it is, a future
version of the platform will allow you to ask the notary to send you the transaction as well, in case your counterparty
does not. This is not the default because it reveals more private info to the notary.
Implementing the buyer
@ -403,12 +394,11 @@ OK, let's do the same for the buyer side:
This code is longer but no more complicated. Here are some things to pay attention to:
1. We do some sanity checking on the received message to ensure we're being offered what we expected to be offered.
2. We create a cash spend in the normal way, by using ``VaultService.generateSpend``. See the vault documentation if this
part isn't clear.
2. We create a cash spend using ``VaultService.generateSpend``. You can read the vault documentation to learn more about this.
3. We access the *service hub* when we need it to access things that are transient and may change or be recreated
whilst a flow is suspended, things like the wallet or the network map.
4. Finally, we send the unfinished, invalid transaction to the seller so they can sign it. They are expected to send
back to us a ``SignaturesFromSeller``, which once we verify it, should be the final outcome of the trade.
4. We send the unfinished, invalid transaction to the seller so they can sign it and finalise it.
5. Finally, we wait for the finished transaction to arrive in our local storage and vault.
As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite
the fact that it takes minimal resources and can survive node restarts.
@ -435,7 +425,7 @@ A flow might declare some steps with code inside the flow class like this:
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
:language: kotlin
:start-after: DOCSTART 2
:end-before: DOCSTART 1
:end-before: DOCEND 2
:dedent: 4
.. sourcecode:: java
@ -10,6 +10,10 @@ Milestone 8
* ``Party`` equality is now based on the owning key, rather than the owning key and name. This is important for
party anonymisation to work, as each key must identify exactly one party.
* A new ``waitForLedgerCommit`` method is available inside flows. Given a hash it will suspend the flow until
a valid transaction with that hash has been received, committed and processed by the vault. This is useful
in multi-party flows where one side takes responsibility for sending the finished transaction to the notary,
and the other side wishes to wait for it.
Milestone 7
@ -123,9 +123,8 @@ class CashFlow(val command: CashCommand, override val progressTracker: ProgressT
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
val myKey = serviceHub.legalIdentityKey
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
subFlow(BroadcastTransactionFlow(tx, setOf(req.recipient)))
val tx = builder.toSignedTransaction()
return tx
@ -25,23 +25,17 @@ import java.util.*
* 2. B sends to S a [SignedTransaction] that includes the state as input, B's cash as input, the state with the new
* owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because
* it lacks a signature from S authorising movement of the asset.
* 3. S signs it and hands the now finalised SignedWireTransaction back to B.
* 3. S signs it and commits it to the ledger, notarising it and distributing the final signed transaction back
* to B.
* Assuming no malicious termination, they both end the flow being in posession of a valid, signed transaction
* that represents an atomic asset swap.
* Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine.
* To initiate the flow, use either the [runBuyer] or [runSeller] methods, depending on which side of the trade
* your node is taking. These methods return a future which will complete once the trade is over and a fully signed
* transaction is available: you can either block your thread waiting for the flow to complete by using
* [ListenableFuture.get] or more usefully, register a callback that will be invoked when the time comes.
* To see an example of how to use this class, look at the unit tests.
// TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this
// and [AbstractStateReplacementFlow].
object TwoPartyTradeFlow {
// TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this
// and [AbstractStateReplacementFlow].
class UnacceptablePriceException(givenPrice: Amount<Currency>) : FlowException("Unacceptable price: $givenPrice")
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() {
@ -70,35 +64,26 @@ object TwoPartyTradeFlow {
object VERIFYING : ProgressTracker.Step("Verifying transaction proposal")
object SIGNING : ProgressTracker.Step("Signing transaction")
object NOTARY : ProgressTracker.Step("Getting notary signature") {
object COMMITTING : ProgressTracker.Step("Committing transaction to the ledger") {
override fun childProgressTracker() = FinalityFlow.tracker()
object SENDING_SIGS : ProgressTracker.Step("Sending transaction signatures to buyer")
object SENDING_FINAL_TX : ProgressTracker.Step("Sending final transaction to buyer")
override fun call(): SignedTransaction {
val partialTX: SignedTransaction = receiveAndCheckProposedTransaction()
val ourSignature: DigitalSignature.WithKey = calculateOurSignature(partialTX)
val allPartySignedTx: SignedTransaction = partialTX + ourSignature
val notarySignature: DigitalSignature.WithKey = getNotarySignature(allPartySignedTx)
val result: SignedTransaction = sendSignatures(allPartySignedTx, ourSignature, notarySignature)
return result
val partialSTX: SignedTransaction = receiveAndCheckProposedTransaction()
val ourSignature = calculateOurSignature(partialSTX)
val unnotarisedSTX: SignedTransaction = partialSTX + ourSignature
val finishedSTX = subFlow(FinalityFlow(unnotarisedSTX)).single()
return finishedSTX
private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.WithKey {
progressTracker.currentStep = NOTARY
return subFlow(NotaryFlow.Client(stx))
private fun receiveAndCheckProposedTransaction(): SignedTransaction {
@ -107,14 +92,12 @@ object TwoPartyTradeFlow {
val myPublicKey = myKeyPair.public.composite
// Make the first message we'll send to kick off the flow.
val hello = SellerTradeInfo(assetToSell, price, myPublicKey)
val maybeSTX = sendAndReceive<SignedTransaction>(otherParty, hello)
// What we get back from the other side is a transaction that *might* be valid and acceptable to us,
// but we must check it out thoroughly before we sign!
val untrustedSTX = sendAndReceive<SignedTransaction>(otherParty, hello)
progressTracker.currentStep = VERIFYING
maybeSTX.unwrap {
return untrustedSTX.unwrap {
// Check that the tx proposed by the buyer is valid.
val wtx: WireTransaction = it.verifySignatures(myPublicKey, notaryNode.notaryIdentity.owningKey)
logger.trace { "Received partially signed transaction: ${it.id}" }
@ -123,11 +106,10 @@ object TwoPartyTradeFlow {
// even though it is missing signatures.
subFlow(ResolveTransactionsFlow(wtx, otherParty))
if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price) {
if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price)
throw FlowException("Transaction is not sending us the right amount of cash")
return it
@ -144,64 +126,50 @@ object TwoPartyTradeFlow {
// but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to
// express flow state machines on top of the messaging layer.
open fun calculateOurSignature(partialTX: SignedTransaction): DigitalSignature.WithKey {
progressTracker.currentStep = SIGNING
return myKeyPair.signWithECDSA(partialTX.id)
private fun sendSignatures(allPartySignedTx: SignedTransaction,
ourSignature: DigitalSignature.WithKey,
notarySignature: DigitalSignature.WithKey): SignedTransaction {
progressTracker.currentStep = SENDING_SIGS
val fullySigned = allPartySignedTx + notarySignature
logger.trace { "Built finished transaction, sending back to secondary!" }
send(otherParty, SignaturesFromSeller(ourSignature, notarySignature))
return fullySigned
open class Buyer(val otherParty: Party,
val notary: Party,
val acceptablePrice: Amount<Currency>,
val typeToBuy: Class<out OwnableState>) : FlowLogic<SignedTransaction>() {
object RECEIVING : ProgressTracker.Step("Waiting for seller trading info")
object VERIFYING : ProgressTracker.Step("Verifying seller assets")
object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal")
object SWAPPING_SIGNATURES : ProgressTracker.Step("Swapping signatures with the seller")
object SENDING_SIGNATURES : ProgressTracker.Step("Sending signatures to the seller")
object WAITING_FOR_TX : ProgressTracker.Step("Waiting for the transaction to finalise.")
override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES)
override fun call(): SignedTransaction {
// Wait for a trade request to come in from the other party.
progressTracker.currentStep = RECEIVING
val tradeRequest = receiveAndValidateTradeRequest()
// Put together a proposed transaction that performs the trade, and sign it.
progressTracker.currentStep = SIGNING
val (ptx, cashSigningPubKeys) = assembleSharedTX(tradeRequest)
val stx = signWithOurKeys(cashSigningPubKeys, ptx)
val signatures = swapSignaturesWithSeller(stx)
// Send the signed transaction to the seller, who must then sign it themselves and commit
// it to the ledger by sending it to the notary.
progressTracker.currentStep = SENDING_SIGNATURES
send(otherParty, stx)
logger.trace { "Got signatures from seller, verifying ... " }
val fullySigned = stx + signatures.sellerSig + signatures.notarySig
logger.trace { "Signatures received are valid. Trade complete! :-)" }
return fullySigned
// Wait for the finished, notarised transaction to arrive in our transaction store.
progressTracker.currentStep = WAITING_FOR_TX
return waitForLedgerCommit(stx.id)
private fun receiveAndValidateTradeRequest(): SellerTradeInfo {
progressTracker.currentStep = RECEIVING
// Wait for a trade request to come in from the other side
val maybeTradeRequest = receive<SellerTradeInfo>(otherParty)
progressTracker.currentStep = VERIFYING
@ -216,24 +184,14 @@ object TwoPartyTradeFlow {
if (!typeToBuy.isInstance(asset))
throw AssetMismatchException(typeToBuy.name, assetTypeName)
// Check the transaction that contains the state which is being resolved.
// We only have a hash here, so if we don't know it already, we have to ask for it.
// Check that the state being sold to us is in a valid chain of transactions, i.e. that the
// seller has a valid chain of custody proving that they own the thing they're selling.
subFlow(ResolveTransactionsFlow(setOf(it.assetForSale.ref.txhash), otherParty))
return it
private fun swapSignaturesWithSeller(stx: SignedTransaction): SignaturesFromSeller {
progressTracker.currentStep = SWAPPING_SIGNATURES
logger.trace { "Sending partially signed transaction to seller" }
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.
return sendAndReceive<SignaturesFromSeller>(otherParty, stx).unwrap { it }
private fun signWithOurKeys(cashSigningPubKeys: List<CompositeKey>, ptx: TransactionBuilder): SignedTransaction {
// Now sign the transaction with whatever keys we need to move the cash.
for (publicKey in cashSigningPubKeys.keys) {
@ -52,9 +52,6 @@ sourceSets {
dependencies {
compile project(':finance')
testCompile project(':test-utils')
testCompile project(':client')
compile "com.google.code.findbugs:jsr305:3.0.1"
// Log4J: logging framework (with SLF4J bindings)
@ -126,8 +123,11 @@ dependencies {
// Unit testing helpers.
testCompile "junit:junit:$junit_version"
testCompile "org.assertj:assertj-core:${assertj_version}"
testCompile "com.pholser:junit-quickcheck-core:$quickcheck_version"
testCompile "com.nhaarman:mockito-kotlin:1.1.0"
testCompile project(':test-utils')
testCompile project(':client')
testCompile project(':core')
// For H2 database support in persistence
compile "com.h2database:h2:1.4.193"
@ -156,8 +156,6 @@ dependencies {
// Integration test helpers
integrationTestCompile "junit:junit:$junit_version"
testCompile "com.nhaarman:mockito-kotlin:1.1.0"
task integrationTest(type: Test) {
@ -15,7 +15,6 @@ import java.util.function.Function
import javax.annotation.concurrent.ThreadSafe
object DataVending {
class Plugin : CordaPluginRegistry() {
override val servicePlugins = listOf(Function(::Service))
@ -1,5 +1,6 @@
package net.corda.node.services.statemachine
import net.corda.core.crypto.SecureHash
import net.corda.node.services.statemachine.StateMachineManager.FlowSession
// TODO revisit when Kotlin 1.1 is released and data classes can extend other classes
@ -7,14 +8,17 @@ interface FlowIORequest {
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
// don't have the original stack trace because it's in a suspended fiber.
val stackTraceInCaseOfProblems: StackSnapshot
interface SessionedFlowIORequest : FlowIORequest {
val session: FlowSession
interface SendRequest : FlowIORequest {
interface SendRequest : SessionedFlowIORequest {
val message: SessionMessage
interface ReceiveRequest<T : SessionMessage> : FlowIORequest {
interface ReceiveRequest<T : SessionMessage> : SessionedFlowIORequest {
val receiveType: Class<T>
@ -36,4 +40,9 @@ data class SendOnly(override val session: FlowSession, override val message: Ses
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
data class WaitForLedgerCommit(val hash: SecureHash, val fiber: FlowStateMachineImpl<*>) : FlowIORequest {
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")
@ -7,11 +7,13 @@ import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
import net.corda.core.random63BitValue
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.trace
import net.corda.node.services.api.ServiceHubInternal
@ -72,7 +74,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
// This state IS serialised, as we need it to know what the fiber is waiting for.
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>()
internal var waitingForLedgerCommitOf: SecureHash? = null
init {
logic.stateMachine = this
@ -172,6 +176,16 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
waitingForLedgerCommitOf = hash
logger.info("Waiting for transaction $hash to commit")
suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>))
logger.info("Transaction $hash has committed to the ledger, resuming")
val stx = serviceHub.storageService.validatedTransactions.getTransaction(hash)
return stx ?: throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
private fun createSessionData(session: FlowSession, payload: Any): SessionData {
val sessionState = session.state
val peerSessionId = when (sessionState) {
@ -266,10 +280,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
private fun suspend(ioRequest: FlowIORequest) {
// we have to pass the Thread local Transaction across via a transient field as the Fiber Park swaps them out.
// We have to pass the thread local database transaction across via a transient field as the fiber park
// swaps them out.
txTrampoline = TransactionManager.currentOrNull()
ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>)
if (ioRequest is SessionedFlowIORequest)
ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>)
var exceptionDuringSuspend: Throwable? = null
parkAndSerialize { fiber, serializer ->
@ -6,11 +6,13 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
import co.paralleluniverse.strands.Strand
import com.codahale.metrics.Gauge
import com.esotericsoftware.kryo.Kryo
import com.google.common.collect.HashMultimap
import com.google.common.util.concurrent.ListenableFuture
import kotlinx.support.jdk8.collections.removeIf
import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
@ -62,7 +64,7 @@ import javax.annotation.concurrent.ThreadSafe
* TODO: Timeouts
* TODO: Surfacing of exceptions via an API and/or management UI
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
* TODO: Don't store all active flows in memory, load from the database on demand.
class StateMachineManager(val serviceHub: ServiceHubInternal,
@ -89,15 +91,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
// property.
private val mutex = ThreadBox(object {
private class InnerState {
var started = false
val stateMachines = LinkedHashMap<FlowStateMachineImpl<*>, Checkpoint>()
val changesPublisher = PublishSubject.create<Change>()
val changesPublisher = PublishSubject.create<Change>()!!
val fibersWaitingForLedgerCommit = HashMultimap.create<SecureHash, FlowStateMachineImpl<*>>()!!
fun notifyChangeObservers(fiber: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) {
changesPublisher.bufferUntilDatabaseCommit().onNext(Change(fiber.logic, addOrRemove, fiber.id))
private val mutex = ThreadBox(InnerState())
// True if we're shutting down, so don't resume anything.
@Volatile private var stopping = false
@ -152,9 +156,27 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
fun start() {
serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() }
private fun listenToLedgerTransactions() {
// Observe the stream of committed, validated transactions and resume fibers that are waiting for them.
serviceHub.storageService.validatedTransactions.updates.subscribe { stx ->
val hash = stx.id
val flows: Set<FlowStateMachineImpl<*>> = mutex.locked { fibersWaitingForLedgerCommit.removeAll(hash) }
if (flows.isNotEmpty()) {
executor.executeASAP {
for (flow in flows) {
logger.info("Resuming ${flow.id} because it was waiting for tx ${flow.waitingForLedgerCommitOf!!} which is now committed.")
flow.waitingForLedgerCommitOf = null
private fun decrementLiveFibers() {
@ -217,8 +239,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun resumeRestoredFiber(fiber: FlowStateMachineImpl<*>) {
fiber.openSessions.values.forEach { openSessions[it.ourSessionId] = it }
val waitingForHash = fiber.waitingForLedgerCommitOf
if (fiber.openSessions.values.any { it.waitingForResponse }) {
fiber.logger.info("Restored, pending on receive")
} else if (waitingForHash != null) {
val stx = databaseTransaction(database) {
if (stx != null) {
fiber.logger.info("Resuming fiber as tx $waitingForHash has committed.")
} else {
fiber.logger.info("Restored, pending on ledger commit of $waitingForHash")
mutex.locked { fibersWaitingForLedgerCommit.put(waitingForHash, fiber) }
} else {
@ -424,6 +458,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
* Note that you must be on the [executor] thread.
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
// TODO: Check that logic has @Suspendable on its call method.
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
// on the flow completion future inside that context. The problem is that any progress checkpoints are
@ -457,8 +492,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun resumeFiber(fiber: FlowStateMachineImpl<*>) {
// Avoid race condition when setting stopping to true and then checking liveFibers
if (!stopping) executor.executeASAP {
if (!stopping) {
executor.executeASAP {
} else {
fiber.logger.debug("Not resuming as SMM is stopping.")
@ -466,6 +503,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun processIORequest(ioRequest: FlowIORequest) {
if (ioRequest is SendRequest) {
if (ioRequest.message is SessionInit) {
openSessions[ioRequest.session.ourSessionId] = ioRequest.session
@ -475,6 +513,24 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
} else if (ioRequest is WaitForLedgerCommit) {
// Is it already committed?
val stx = databaseTransaction(database) {
if (stx != null) {
} else {
// No, then register to wait.
// We assume this code runs on the server thread, which is the only place transactions are committed
// currently. When we liberalise our threading somewhat, handing of wait requests will need to be
// reworked to make the wait atomic in another way. Otherwise there is a race between checking the
// database and updating the waiting list.
mutex.locked {
fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber
@ -195,13 +195,6 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
* Generate a transaction that moves an amount of currency to the given pubkey.
* @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set
* of given parties. This can be useful if the party you're trying to pay has expectations
* about which type of asset claims they are willing to accept.
override fun generateSpend(tx: TransactionBuilder,
amount: Amount<Currency>,
to: CompositeKey,
@ -5,8 +5,10 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand.UncaughtExceptionHandler
import com.google.common.util.concurrent.ListenableFuture
import net.corda.core.contracts.DOLLARS
import net.corda.core.contracts.DummyState
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.generateKeyPair
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
@ -19,8 +21,11 @@ import net.corda.core.random63BitValue
import net.corda.core.rootCause
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.NotaryFlow
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.transactions.ValidatingNotaryService
@ -483,9 +488,26 @@ class StateMachineManagerTests {
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(
networkMapNode: MockNode? = null): P {
disableDBCloseOnStop() //Handover DB to new node copy
fun `wait for transaction`() {
val ptx = TransactionBuilder(notary = notary1.info.notaryIdentity)
val stx = ptx.toSignedTransaction()
val future1 = node2.services.startFlow(WaitingFlows.Waiter(stx.id)).resultFuture
val future2 = node1.services.startFlow(WaitingFlows.Committer(stx, node2.info.legalIdentity)).resultFuture
//region Helpers
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(networkMapNode: MockNode? = null): P {
disableDBCloseOnStop() // Handover DB to new node copy
val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray())
newNode.acceptableLiveFiberCountOnStop = 1
@ -611,4 +633,22 @@ class StateMachineManagerTests {
override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message
override fun hashCode(): Int = message?.hashCode() ?: 31
private object WaitingFlows {
class Waiter(private val hash: SecureHash) : FlowLogic<Unit>() {
override fun call() {
class Committer(private val stx: SignedTransaction, private val otherParty: Party) : FlowLogic<Unit>() {
override fun call() {
subFlow(FinalityFlow(stx, setOf(otherParty)))
//endregion Helpers
@ -55,10 +55,7 @@ class SellerFlow(val otherParty: Party,
val tradeTX: SignedTransaction = subFlow(seller, shareParentSessions = true)
return tradeTX
return subFlow(seller, shareParentSessions = true)
Reference in New Issue
Block a user