mirror of
https://github.com/corda/corda.git
synced 2025-01-01 10:46:46 +00:00
Improve the flow commit API.
Make FinalityFlow do more, and be used more consistently. Add a new waitForLedgerCommit API that is intended to be used at the end of flows, or at any other point where a flow wants to wait for a transaction to finalise (but the finalisation flow is being done by someone else). Update the docs a bit.
This commit is contained in:
parent
e0dd67cf14
commit
ead2ca2ade
@ -2,7 +2,9 @@ package net.corda.core.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
import org.slf4j.Logger
|
||||
@ -171,6 +173,16 @@ abstract class FlowLogic<out T> {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Suspends the flow until the transaction with the specified ID is received, successfully verified and
|
||||
* sent to the vault for processing. Note that this call suspends until the transaction is considered
|
||||
* valid by the local node, but that doesn't imply the vault will consider it relevant.
|
||||
*/
|
||||
@Suspendable
|
||||
fun waitForLedgerCommit(hash: SecureHash): SignedTransaction {
|
||||
return stateMachine.waitForLedgerCommit(hash, this)
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private var _stateMachine: FlowStateMachine<*>? = null
|
||||
|
@ -3,7 +3,9 @@ package net.corda.core.flows
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
import org.slf4j.Logger
|
||||
import java.util.*
|
||||
@ -35,6 +37,9 @@ interface FlowStateMachine<R> {
|
||||
@Suspendable
|
||||
fun send(otherParty: Party, payload: Any, sessionFlow: FlowLogic<*>)
|
||||
|
||||
@Suspendable
|
||||
fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction
|
||||
|
||||
val serviceHub: ServiceHub
|
||||
val logger: Logger
|
||||
val id: StateMachineRunId
|
||||
|
@ -43,7 +43,8 @@ interface MessagingService {
|
||||
|
||||
/**
|
||||
* The provided function will be invoked for each received message whose topic and session matches. The callback
|
||||
* will run on threads provided by the messaging service, and the callback is expected to be thread safe as a result.
|
||||
* will run on the main server thread provided when the messaging service is constructed, and a database
|
||||
* transaction is set up for you automatically.
|
||||
*
|
||||
* The returned object is an opaque handle that may be used to un-register handlers later with [removeMessageHandler].
|
||||
* The handle is passed to the callback as well, to avoid race conditions whereby the callback wants to unregister
|
||||
|
@ -29,11 +29,12 @@ interface ServiceHub {
|
||||
val myInfo: NodeInfo
|
||||
|
||||
/**
|
||||
* Given a list of [SignedTransaction]s, writes them to the local storage for validated transactions and then
|
||||
* sends them to the vault for further processing.
|
||||
* Given a [SignedTransaction], writes it to the local storage for validated transactions and then
|
||||
* sends them to the vault for further processing. Expects to be run within a database transaction.
|
||||
*
|
||||
* @param txs The transactions to record.
|
||||
*/
|
||||
// TODO: Make this take a single tx.
|
||||
fun recordTransactions(txs: Iterable<SignedTransaction>)
|
||||
|
||||
/**
|
||||
|
@ -178,10 +178,22 @@ interface VaultService {
|
||||
fun getTransactionNotes(txnId: SecureHash): Iterable<String>
|
||||
|
||||
/**
|
||||
* [InsufficientBalanceException] is thrown when a Cash Spending transaction fails because
|
||||
* there is insufficient quantity for a given currency (and optionally set of Issuer Parties).
|
||||
* Note: an [Amount] of [Currency] is only fungible for a given Issuer Party within a [FungibleAsset]
|
||||
**/
|
||||
* Generate a transaction that moves an amount of currency to the given pubkey.
|
||||
*
|
||||
* Note: an [Amount] of [Currency] is only fungible for a given Issuer Party within a [FungibleAsset]
|
||||
*
|
||||
* @param tx A builder, which may contain inputs, outputs and commands already. The relevant components needed
|
||||
* to move the cash will be added on top.
|
||||
* @param amount How much currency to send.
|
||||
* @param to a key of the recipient.
|
||||
* @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set
|
||||
* of given parties. This can be useful if the party you're trying to pay has expectations
|
||||
* about which type of asset claims they are willing to accept.
|
||||
* @return A [Pair] of the same transaction builder passed in as [tx], and the list of keys that need to sign
|
||||
* the resulting transaction for it to be valid.
|
||||
* @throws InsufficientBalanceException when a cash spending transaction fails because
|
||||
* there is insufficient quantity for a given currency (and optionally set of Issuer Parties).
|
||||
*/
|
||||
@Throws(InsufficientBalanceException::class)
|
||||
fun generateSpend(tx: TransactionBuilder,
|
||||
amount: Amount<Currency>,
|
||||
|
@ -3,13 +3,13 @@ package net.corda.flows
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.recordTransactions
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
|
||||
|
||||
/**
|
||||
* Notify all involved parties about a transaction, including storing a copy. Normally this would be called via
|
||||
* [FinalityFlow].
|
||||
* Notify the specified parties about a transaction. The remote peers will download this transaction and its
|
||||
* dependency graph, verifying them all. The flow returns when all peers have acknowledged the transactions
|
||||
* as valid. Normally you wouldn't use this directly, it would be called via [FinalityFlow].
|
||||
*
|
||||
* @param notarisedTransaction transaction which has been notarised (if needed) and is ready to notify nodes about.
|
||||
* @param participants a list of participants involved in the transaction.
|
||||
@ -17,17 +17,14 @@ import net.corda.core.transactions.SignedTransaction
|
||||
*/
|
||||
class BroadcastTransactionFlow(val notarisedTransaction: SignedTransaction,
|
||||
val participants: Set<Party>) : FlowLogic<Unit>() {
|
||||
|
||||
data class NotifyTxRequest(val tx: SignedTransaction)
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
// Record it locally
|
||||
serviceHub.recordTransactions(notarisedTransaction)
|
||||
|
||||
// TODO: Messaging layer should handle this broadcast for us
|
||||
val msg = NotifyTxRequest(notarisedTransaction)
|
||||
participants.filter { it != serviceHub.myInfo.legalIdentity }.forEach { participant ->
|
||||
// This pops out the other side in DataVending.NotifyTransactionHandler.
|
||||
send(participant, msg)
|
||||
}
|
||||
}
|
||||
|
@ -1,48 +1,87 @@
|
||||
package net.corda.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.contracts.ContractState
|
||||
import net.corda.core.contracts.StateRef
|
||||
import net.corda.core.contracts.TransactionState
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.transactions.LedgerTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
|
||||
|
||||
/**
|
||||
* Finalise a transaction by notarising it, then recording it locally, and then sending it to all involved parties.
|
||||
* Verifies the given transactions, then sends them to the named notaries. If the notary agrees that the transactions
|
||||
* are acceptable then they are from that point onwards committed to the ledger, and will be written through to the
|
||||
* vault. Additionally they will be distributed to the parties reflected in the participants list of the states.
|
||||
*
|
||||
* @param transaction to commit.
|
||||
* @param participants a list of participants involved in the transaction.
|
||||
* @return a list of participants who were successfully notified of the transaction.
|
||||
* The transactions will be topologically sorted before commitment to ensure that dependencies are committed before
|
||||
* dependers, so you don't need to do this yourself.
|
||||
*
|
||||
* The transactions are expected to have already been resolved: if their dependencies are not available in local
|
||||
* storage or within the given set, verification will fail. They must have signatures from all necessary parties
|
||||
* other than the notary.
|
||||
*
|
||||
* If specified, the extra recipients are sent all the given transactions. The base set of parties to inform of each
|
||||
* transaction are calculated on a per transaction basis from the contract-given set of participants.
|
||||
*
|
||||
* The flow returns the same transactions, in the same order, with the additional signatures.
|
||||
*
|
||||
* @param transactions What to commit.
|
||||
* @param extraRecipients A list of additional participants to inform of the transaction.
|
||||
*/
|
||||
class FinalityFlow(val transaction: SignedTransaction,
|
||||
val participants: Set<Party>,
|
||||
override val progressTracker: ProgressTracker) : FlowLogic<Unit>() {
|
||||
constructor(transaction: SignedTransaction, participants: Set<Party>) : this(transaction, participants, tracker())
|
||||
class FinalityFlow(val transactions: Iterable<SignedTransaction>,
|
||||
val extraRecipients: Set<Party>,
|
||||
override val progressTracker: ProgressTracker) : FlowLogic<List<SignedTransaction>>() {
|
||||
constructor(transaction: SignedTransaction, extraParticipants: Set<Party>) : this(listOf(transaction), extraParticipants, tracker())
|
||||
constructor(transaction: SignedTransaction) : this(listOf(transaction), emptySet(), tracker())
|
||||
|
||||
companion object {
|
||||
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service")
|
||||
object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") {
|
||||
override fun childProgressTracker() = NotaryFlow.Client.tracker()
|
||||
}
|
||||
object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
|
||||
|
||||
// TODO: Make all tracker() methods @JvmStatic
|
||||
fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
@Throws(NotaryException::class)
|
||||
override fun call() {
|
||||
// TODO: Resolve the tx here: it's probably already been done, but re-resolution is a no-op and it'll make the API more forgiving.
|
||||
|
||||
override fun call(): List<SignedTransaction> {
|
||||
// Note: this method is carefully broken up to minimize the amount of data reachable from the stack at
|
||||
// the point where subFlow is invoked, as that minimizes the checkpointing work to be done.
|
||||
//
|
||||
// Lookup the resolved transactions and use them to map each signed transaction to the list of participants.
|
||||
// Then send to the notary if needed, record locally and distribute.
|
||||
progressTracker.currentStep = NOTARISING
|
||||
// Notarise the transaction if needed
|
||||
val notarisedTransaction = if (needsNotarySignature(transaction)) {
|
||||
val notarySig = subFlow(NotaryFlow.Client(transaction))
|
||||
transaction.withAdditionalSignature(notarySig)
|
||||
} else {
|
||||
transaction
|
||||
}
|
||||
val notarisedTxns = notariseAndRecord(lookupParties(resolveDependenciesOf(transactions)))
|
||||
|
||||
// Let everyone else know about the transaction
|
||||
// Each transaction has its own set of recipients, but extra recipients get them all.
|
||||
progressTracker.currentStep = BROADCASTING
|
||||
subFlow(BroadcastTransactionFlow(notarisedTransaction, participants))
|
||||
val me = serviceHub.myInfo.legalIdentity
|
||||
for ((stx, parties) in notarisedTxns) {
|
||||
subFlow(BroadcastTransactionFlow(stx, parties + extraRecipients - me))
|
||||
}
|
||||
return notarisedTxns.map { it.first }
|
||||
}
|
||||
|
||||
// TODO: API: Make some of these protected?
|
||||
|
||||
@Suspendable
|
||||
private fun notariseAndRecord(stxnsAndParties: List<Pair<SignedTransaction, Set<Party>>>): List<Pair<SignedTransaction, Set<Party>>> {
|
||||
return stxnsAndParties.map { pair ->
|
||||
val stx = pair.first
|
||||
val notarised = if (needsNotarySignature(stx)) {
|
||||
val notarySig = subFlow(NotaryFlow.Client(stx))
|
||||
stx + notarySig
|
||||
} else {
|
||||
stx
|
||||
}
|
||||
serviceHub.recordTransactions(listOf(notarised))
|
||||
Pair(notarised, pair.second)
|
||||
}
|
||||
}
|
||||
|
||||
private fun needsNotarySignature(stx: SignedTransaction) = stx.tx.notary != null && hasNoNotarySignature(stx)
|
||||
@ -51,4 +90,38 @@ class FinalityFlow(val transaction: SignedTransaction,
|
||||
val signers = stx.sigs.map { it.by }.toSet()
|
||||
return !(notaryKey?.isFulfilledBy(signers) ?: false)
|
||||
}
|
||||
|
||||
private fun lookupParties(ltxns: List<Pair<SignedTransaction, LedgerTransaction>>): List<Pair<SignedTransaction, Set<Party>>> {
|
||||
return ltxns.map { pair ->
|
||||
val (stx, ltx) = pair
|
||||
// Calculate who is meant to see the results based on the participants involved.
|
||||
val keys = ltx.outputs.flatMap { it.data.participants } + ltx.inputs.flatMap { it.state.data.participants }
|
||||
// TODO: Is it safe to drop participants we don't know how to contact? Does not knowing how to contact them count as a reason to fail?
|
||||
val parties = keys.mapNotNull { serviceHub.identityService.partyFromKey(it) }.toSet()
|
||||
Pair(stx, parties)
|
||||
}
|
||||
}
|
||||
|
||||
private fun resolveDependenciesOf(signedTransactions: Iterable<SignedTransaction>): List<Pair<SignedTransaction, LedgerTransaction>> {
|
||||
// Make sure the dependencies come before the dependers.
|
||||
val sorted = ResolveTransactionsFlow.topologicalSort(signedTransactions.toList())
|
||||
// Build a ServiceHub that consults the argument list as well as what's in local tx storage so uncommitted
|
||||
// transactions can depend on each other.
|
||||
val augmentedLookup = object : ServiceHub by serviceHub {
|
||||
val hashToTx = sorted.associateBy { it.id }
|
||||
override fun loadState(stateRef: StateRef): TransactionState<*> {
|
||||
val provided: TransactionState<ContractState>? = hashToTx[stateRef.txhash]?.let { it.tx.outputs[stateRef.index] }
|
||||
return provided ?: super.loadState(stateRef)
|
||||
}
|
||||
}
|
||||
// Load and verify each transaction.
|
||||
return sorted.map { stx ->
|
||||
val notary = stx.tx.notary
|
||||
// The notary signature is allowed to be missing but no others.
|
||||
val wtx = if (notary != null) stx.verifySignatures(notary.owningKey) else stx.verifySignatures()
|
||||
val ltx = wtx.toLedgerTransaction(augmentedLookup)
|
||||
ltx.verify()
|
||||
stx to ltx
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,6 @@ import net.corda.core.transactions.WireTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
|
||||
object NotaryFlow {
|
||||
|
||||
/**
|
||||
* A flow to be used for obtaining a signature from a [NotaryService] ascertaining the transaction
|
||||
* timestamp is correct and none of its inputs have been used in another completed transaction.
|
||||
|
@ -34,13 +34,16 @@ class ResolveTransactionsFlow(private val txHashes: Set<SecureHash>,
|
||||
companion object {
|
||||
private fun dependencyIDs(wtx: WireTransaction) = wtx.inputs.map { it.txhash }.toSet()
|
||||
|
||||
private fun topologicalSort(transactions: Collection<SignedTransaction>): List<SignedTransaction> {
|
||||
/**
|
||||
* Topologically sorts the given transactions such that dependencies are listed before dependers. */
|
||||
@JvmStatic
|
||||
fun topologicalSort(transactions: Collection<SignedTransaction>): List<SignedTransaction> {
|
||||
// Construct txhash -> dependent-txs map
|
||||
val forwardGraph = HashMap<SecureHash, HashSet<SignedTransaction>>()
|
||||
transactions.forEach { tx ->
|
||||
tx.tx.inputs.forEach { input ->
|
||||
transactions.forEach { stx ->
|
||||
stx.tx.inputs.forEach { input ->
|
||||
// Note that we use a LinkedHashSet here to make the traversal deterministic (as long as the input list is)
|
||||
forwardGraph.getOrPut(input.txhash) { LinkedHashSet() }.add(tx)
|
||||
forwardGraph.getOrPut(input.txhash) { LinkedHashSet() }.add(stx)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -174,7 +174,7 @@ class ForeignExchangeFlow(val tradeId: String,
|
||||
withNewSignature // return the almost complete transaction
|
||||
}
|
||||
|
||||
// Initiate the standard protocol to notarise and distribute to the involved parties
|
||||
// Initiate the standard protocol to notarise and distribute to the involved parties.
|
||||
subFlow(FinalityFlow(allPartySignedTx, setOf(baseCurrencyBuyer, baseCurrencySeller)))
|
||||
|
||||
return allPartySignedTx.id
|
||||
|
@ -27,6 +27,7 @@ inline fun <reified T : LinearState> ServiceHub.latest(ref: StateRef): StateAndR
|
||||
val original = toStateAndRef<T>(ref)
|
||||
return linearHeads.get(original.state.data.linearId)!!
|
||||
}
|
||||
|
||||
// DOCEND 1
|
||||
|
||||
// Minimal state model of a manual approval process
|
||||
@ -121,18 +122,14 @@ class SubmitTradeApprovalFlow(val tradeId: String,
|
||||
// identify a notary. This might also be done external to the flow
|
||||
val notary = serviceHub.networkMapCache.getAnyNotary()
|
||||
// Create the TransactionBuilder and populate with the new state.
|
||||
val tx = TransactionType.
|
||||
General.
|
||||
Builder(notary).
|
||||
withItems(tradeProposal,
|
||||
Command(TradeApprovalContract.Commands.Issue(),
|
||||
listOf(tradeProposal.source.owningKey)))
|
||||
val tx = TransactionType.General.Builder(notary)
|
||||
.withItems(tradeProposal, Command(TradeApprovalContract.Commands.Issue(), listOf(tradeProposal.source.owningKey)))
|
||||
tx.setTime(serviceHub.clock.instant(), Duration.ofSeconds(60))
|
||||
// We can automatically sign as there is no untrusted data.
|
||||
tx.signWith(serviceHub.legalIdentityKey)
|
||||
// Convert to a SignedTransaction that we can send to the notary
|
||||
val signedTx = tx.toSignedTransaction(false)
|
||||
// Run the FinalityFlow to notarise and distribute the SignedTransaction to the counterparty
|
||||
// Notarise and distribute.
|
||||
subFlow(FinalityFlow(signedTx, setOf(serviceHub.myInfo.legalIdentity, counterparty)))
|
||||
// Return the initial state
|
||||
return signedTx.tx.outRef<TradeApprovalContract.State>(0)
|
||||
@ -210,10 +207,8 @@ class SubmitCompletionFlow(val ref: StateRef, val verdict: WorkflowState) : Flow
|
||||
agreedTx
|
||||
}
|
||||
// DOCSTART 4
|
||||
// Run the FinalityFlow to notarise and distribute the completed transaction.
|
||||
subFlow(FinalityFlow(allPartySignedTx,
|
||||
setOf(latestRecord.state.data.source, latestRecord.state.data.counterparty)))
|
||||
|
||||
// Notarise and distribute the completed transaction.
|
||||
subFlow(FinalityFlow(allPartySignedTx, setOf(latestRecord.state.data.source, latestRecord.state.data.counterparty)))
|
||||
// DOCEND 4
|
||||
// Return back the details of the completed state/transaction.
|
||||
return allPartySignedTx.tx.outRef<TradeApprovalContract.State>(0)
|
||||
|
@ -89,7 +89,9 @@ Our flow has two parties (B and S for buyer and seller) and will proceed as foll
|
||||
2. B sends to S a ``SignedTransaction`` that includes the state as input, B's cash as input, the state with the new
|
||||
owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because
|
||||
it lacks a signature from S authorising movement of the asset.
|
||||
3. S signs it and hands the now finalised ``SignedTransaction`` back to B.
|
||||
3. S signs it and *finalises* the transaction. This means sending it to the notary, which checks the transaction for
|
||||
validity, recording the transaction in the local vault, and then sending it back to B who also checks it and commits
|
||||
the transaction to their local vault.
|
||||
|
||||
You can find the implementation of this flow in the file ``finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt``.
|
||||
|
||||
@ -98,8 +100,7 @@ represents an atomic asset swap.
|
||||
|
||||
Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine.
|
||||
|
||||
We start by defining a wrapper that namespaces the flow code, two functions to start either the buy or sell side
|
||||
of the flow, and two classes that will contain the flow definition. We also pick what data will be used by
|
||||
We start by defining two classes that will contain the flow definition. We also pick what data will be used by
|
||||
each side.
|
||||
|
||||
.. note:: The code samples in this tutorial are only available in Kotlin, but you can use any JVM language to
|
||||
@ -110,7 +111,6 @@ each side.
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
object TwoPartyTradeFlow {
|
||||
|
||||
class UnacceptablePriceException(val givenPrice: Amount<Currency>) : FlowException("Unacceptable price: $givenPrice")
|
||||
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() {
|
||||
override fun toString() = "The submitted asset didn't match the expected type: $expectedTypeName vs $typeName"
|
||||
@ -188,8 +188,6 @@ and try again.
|
||||
|
||||
.. note:: Java 9 is likely to remove this pre-marking requirement completely.
|
||||
|
||||
.. note:: Accessing the vault from inside an @Suspendable function (e.g. via ``serviceHub.vaultService``) can cause a serialisation error when the fiber suspends. Instead, vault access should be performed from a helper non-suspendable function, which you then call from the @Suspendable function. We are working to fix this.
|
||||
|
||||
Starting your flow
|
||||
------------------
|
||||
|
||||
@ -248,12 +246,11 @@ Let's implement the ``Seller.call`` method. This will be run when the flow is in
|
||||
:dedent: 4
|
||||
|
||||
Here we see the outline of the procedure. We receive a proposed trade transaction from the buyer and check that it's
|
||||
valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature so that the transaction is
|
||||
now signed by both the buyer and the seller. We then send this request to a notary to assert with another signature that the
|
||||
timestamp in the transaction (if any) is valid and there are no double spends, and send back both
|
||||
our signature and the notaries signature. Note we should not send to the notary until all other required signatures have been appended
|
||||
as the notary may validate the signatures as well as verifying for itself the transactional integrity.
|
||||
Finally, we hand back to the code that invoked the flow the finished transaction.
|
||||
valid. The buyer has already attached their signature before sending it. Then we calculate and attach our own signature
|
||||
so that the transaction is now signed by both the buyer and the seller. We then *finalise* this transaction by sending
|
||||
it to a notary to assert (with another signature) that the timestamp in the transaction (if any) is valid and there are no
|
||||
double spends. Finally, after the finalisation process is complete, we retrieve the now fully signed transaction from
|
||||
local storage. It will have the same ID as the one we started with but more signatures.
|
||||
|
||||
Let's fill out the ``receiveAndCheckProposedTransaction()`` method.
|
||||
|
||||
@ -327,24 +324,39 @@ Throwing a ``FlowException`` enables a flow to reject a piece of data it has rec
|
||||
done in the ``unwrap`` method of the received ``UntrustworthyData``. In the above example the seller checks the price
|
||||
and throws ``FlowException`` if it's invalid. It's then up to the buyer to either try again with a better price or give up.
|
||||
|
||||
Sub-flows
|
||||
---------
|
||||
Sub-flows and finalisation
|
||||
--------------------------
|
||||
|
||||
Flows can be composed via nesting. Invoking a sub-flow looks similar to an ordinary function call:
|
||||
|
||||
.. container:: codeset
|
||||
|
||||
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 6
|
||||
:end-before: DOCEND 6
|
||||
:dedent: 4
|
||||
.. sourcecode:: kotlin
|
||||
|
||||
@Suspendable
|
||||
fun call() {
|
||||
val unnotarisedTransaction = ...
|
||||
subFlow(FinalityFlow(unnotarisedTransaction))
|
||||
}
|
||||
|
||||
.. sourcecode:: java
|
||||
|
||||
@Suspendable
|
||||
public void call() throws FlowException {
|
||||
SignedTransaction unnotarisedTransaction = ...
|
||||
subFlow(new FinalityFlow(unnotarisedTransaction))
|
||||
}
|
||||
|
||||
In this code snippet we are using the ``FinalityFlow`` to finish off the transaction. It will:
|
||||
|
||||
* Send the transaction to the chosen notary and, if necessary, satisfy the notary that the transaction is valid.
|
||||
* Record the transaction in the local vault, if it is relevant (i.e. involves the owner of the node).
|
||||
* Send the fully signed transaction to the other participants for recording as well.
|
||||
|
||||
In this code snippet we are using the ``NotaryFlow.Client`` to request notarisation of the transaction.
|
||||
We simply create the flow object via its constructor, and then pass it to the ``subFlow`` method which
|
||||
returns the result of the flow's execution directly. Behind the scenes all this is doing is wiring up progress
|
||||
tracking (discussed more below) and then running the objects ``call`` method. Because this little helper method can
|
||||
be on the stack when network IO takes place, we mark it as ``@Suspendable``.
|
||||
tracking (discussed more below) and then running the objects ``call`` method. Because the sub-flow might suspend,
|
||||
we must mark the method that invokes it as suspendable.
|
||||
|
||||
Going back to the previous code snippet, we use a sub-flow called ``ResolveTransactionsFlow``. This is
|
||||
responsible for downloading and checking all the dependencies of a transaction, which in Corda are always retrievable
|
||||
@ -360,32 +372,11 @@ objects, but we don't need them here so we just ignore the return value.
|
||||
After the dependencies, we check the proposed trading transaction for validity by running the contracts for that as
|
||||
well (but having handled the fact that some signatures are missing ourselves).
|
||||
|
||||
Here's the rest of the code:
|
||||
|
||||
.. container:: codeset
|
||||
|
||||
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 7
|
||||
:end-before: DOCEND 7
|
||||
:dedent: 4
|
||||
|
||||
It's all pretty straightforward from now on. Here ``id`` is the secure hash representing the serialised
|
||||
transaction, and we just use our private key to calculate a signature over it. As a reminder, in Corda signatures do
|
||||
not cover other signatures: just the core of the transaction data.
|
||||
|
||||
In ``sendSignatures``, we take the two signatures we obtained and add them to the partial transaction we were sent.
|
||||
There is an overload for the + operator so signatures can be added to a SignedTransaction easily. Finally, we wrap the
|
||||
two signatures in a simple wrapper message class and send it back. The send won't block waiting for an acknowledgement,
|
||||
but the underlying message queue software will retry delivery if the other side has gone away temporarily.
|
||||
|
||||
You can also see that every flow instance has a logger (using the SLF4J API) which you can use to log progress
|
||||
messages.
|
||||
|
||||
.. warning:: This sample code is **not secure**. Other than not checking for all possible invalid constructions, if the
|
||||
seller stops before sending the finalised transaction to the buyer, the seller is left with a valid transaction
|
||||
but the buyer isn't, so they can't spend the asset they just purchased! This sort of thing will be fixed in a
|
||||
future version of the code.
|
||||
.. warning:: If the seller stops before sending the finalised transaction to the buyer, the seller is left with a
|
||||
valid transaction but the buyer isn't, so they can't spend the asset they just purchased! This sort of thing is not
|
||||
always a risk (as the seller may not gain anything from that sort of behaviour except a lawsuit), but if it is, a future
|
||||
version of the platform will allow you to ask the notary to send you the transaction as well, in case your counterparty
|
||||
does not. This is not the default because it reveals more private info to the notary.
|
||||
|
||||
Implementing the buyer
|
||||
----------------------
|
||||
@ -403,12 +394,11 @@ OK, let's do the same for the buyer side:
|
||||
This code is longer but no more complicated. Here are some things to pay attention to:
|
||||
|
||||
1. We do some sanity checking on the received message to ensure we're being offered what we expected to be offered.
|
||||
2. We create a cash spend in the normal way, by using ``VaultService.generateSpend``. See the vault documentation if this
|
||||
part isn't clear.
|
||||
2. We create a cash spend using ``VaultService.generateSpend``. You can read the vault documentation to learn more about this.
|
||||
3. We access the *service hub* when we need it to access things that are transient and may change or be recreated
|
||||
whilst a flow is suspended, things like the wallet or the network map.
|
||||
4. Finally, we send the unfinished, invalid transaction to the seller so they can sign it. They are expected to send
|
||||
back to us a ``SignaturesFromSeller``, which once we verify it, should be the final outcome of the trade.
|
||||
4. We send the unfinished, invalid transaction to the seller so they can sign it and finalise it.
|
||||
5. Finally, we wait for the finished transaction to arrive in our local storage and vault.
|
||||
|
||||
As you can see, the flow logic is straightforward and does not contain any callbacks or network glue code, despite
|
||||
the fact that it takes minimal resources and can survive node restarts.
|
||||
@ -435,7 +425,7 @@ A flow might declare some steps with code inside the flow class like this:
|
||||
.. literalinclude:: ../../finance/src/main/kotlin/net/corda/flows/TwoPartyTradeFlow.kt
|
||||
:language: kotlin
|
||||
:start-after: DOCSTART 2
|
||||
:end-before: DOCSTART 1
|
||||
:end-before: DOCEND 2
|
||||
:dedent: 4
|
||||
|
||||
.. sourcecode:: java
|
||||
|
@ -10,6 +10,10 @@ Milestone 8
|
||||
|
||||
* ``Party`` equality is now based on the owning key, rather than the owning key and name. This is important for
|
||||
party anonymisation to work, as each key must identify exactly one party.
|
||||
* A new ``waitForLedgerCommit`` method is available inside flows. Given a hash it will suspend the flow until
|
||||
a valid transaction with that hash has been received, committed and processed by the vault. This is useful
|
||||
in multi-party flows where one side takes responsibility for sending the finished transaction to the notary,
|
||||
and the other side wishes to wait for it.
|
||||
|
||||
Milestone 7
|
||||
-----------
|
||||
|
@ -123,9 +123,8 @@ class CashFlow(val command: CashCommand, override val progressTracker: ProgressT
|
||||
Cash().generateIssue(builder, req.amount.issuedBy(issuer), req.recipient.owningKey, req.notary)
|
||||
val myKey = serviceHub.legalIdentityKey
|
||||
builder.signWith(myKey)
|
||||
val tx = builder.toSignedTransaction(checkSufficientSignatures = true)
|
||||
// Issuance transactions do not need to be notarised, so we can skip directly to broadcasting it
|
||||
subFlow(BroadcastTransactionFlow(tx, setOf(req.recipient)))
|
||||
val tx = builder.toSignedTransaction()
|
||||
subFlow(FinalityFlow(tx))
|
||||
return tx
|
||||
}
|
||||
}
|
||||
|
@ -25,23 +25,17 @@ import java.util.*
|
||||
* 2. B sends to S a [SignedTransaction] that includes the state as input, B's cash as input, the state with the new
|
||||
* owner key as output, and any change cash as output. It contains a single signature from B but isn't valid because
|
||||
* it lacks a signature from S authorising movement of the asset.
|
||||
* 3. S signs it and hands the now finalised SignedWireTransaction back to B.
|
||||
* 3. S signs it and commits it to the ledger, notarising it and distributing the final signed transaction back
|
||||
* to B.
|
||||
*
|
||||
* Assuming no malicious termination, they both end the flow being in posession of a valid, signed transaction
|
||||
* that represents an atomic asset swap.
|
||||
*
|
||||
* Note that it's the *seller* who initiates contact with the buyer, not vice-versa as you might imagine.
|
||||
*
|
||||
* To initiate the flow, use either the [runBuyer] or [runSeller] methods, depending on which side of the trade
|
||||
* your node is taking. These methods return a future which will complete once the trade is over and a fully signed
|
||||
* transaction is available: you can either block your thread waiting for the flow to complete by using
|
||||
* [ListenableFuture.get] or more usefully, register a callback that will be invoked when the time comes.
|
||||
*
|
||||
* To see an example of how to use this class, look at the unit tests.
|
||||
*/
|
||||
// TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this
|
||||
// and [AbstractStateReplacementFlow].
|
||||
object TwoPartyTradeFlow {
|
||||
// TODO: Common elements in multi-party transaction consensus and signing should be refactored into a superclass of this
|
||||
// and [AbstractStateReplacementFlow].
|
||||
|
||||
class UnacceptablePriceException(givenPrice: Amount<Currency>) : FlowException("Unacceptable price: $givenPrice")
|
||||
class AssetMismatchException(val expectedTypeName: String, val typeName: String) : FlowException() {
|
||||
@ -70,35 +64,26 @@ object TwoPartyTradeFlow {
|
||||
object VERIFYING : ProgressTracker.Step("Verifying transaction proposal")
|
||||
object SIGNING : ProgressTracker.Step("Signing transaction")
|
||||
// DOCSTART 3
|
||||
object NOTARY : ProgressTracker.Step("Getting notary signature") {
|
||||
object COMMITTING : ProgressTracker.Step("Committing transaction to the ledger") {
|
||||
override fun childProgressTracker() = FinalityFlow.tracker()
|
||||
}
|
||||
// DOCEND 3
|
||||
object SENDING_SIGS : ProgressTracker.Step("Sending transaction signatures to buyer")
|
||||
object SENDING_FINAL_TX : ProgressTracker.Step("Sending final transaction to buyer")
|
||||
|
||||
fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, NOTARY, SENDING_SIGS)
|
||||
fun tracker() = ProgressTracker(AWAITING_PROPOSAL, VERIFYING, SIGNING, COMMITTING, SENDING_FINAL_TX)
|
||||
}
|
||||
|
||||
// DOCSTART 4
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
val partialTX: SignedTransaction = receiveAndCheckProposedTransaction()
|
||||
val ourSignature: DigitalSignature.WithKey = calculateOurSignature(partialTX)
|
||||
val allPartySignedTx: SignedTransaction = partialTX + ourSignature
|
||||
val notarySignature: DigitalSignature.WithKey = getNotarySignature(allPartySignedTx)
|
||||
val result: SignedTransaction = sendSignatures(allPartySignedTx, ourSignature, notarySignature)
|
||||
return result
|
||||
val partialSTX: SignedTransaction = receiveAndCheckProposedTransaction()
|
||||
val ourSignature = calculateOurSignature(partialSTX)
|
||||
val unnotarisedSTX: SignedTransaction = partialSTX + ourSignature
|
||||
val finishedSTX = subFlow(FinalityFlow(unnotarisedSTX)).single()
|
||||
return finishedSTX
|
||||
}
|
||||
// DOCEND 4
|
||||
|
||||
// DOCSTART 6
|
||||
@Suspendable
|
||||
private fun getNotarySignature(stx: SignedTransaction): DigitalSignature.WithKey {
|
||||
progressTracker.currentStep = NOTARY
|
||||
return subFlow(NotaryFlow.Client(stx))
|
||||
}
|
||||
// DOCEND 6
|
||||
|
||||
// DOCSTART 5
|
||||
@Suspendable
|
||||
private fun receiveAndCheckProposedTransaction(): SignedTransaction {
|
||||
@ -107,14 +92,12 @@ object TwoPartyTradeFlow {
|
||||
val myPublicKey = myKeyPair.public.composite
|
||||
// Make the first message we'll send to kick off the flow.
|
||||
val hello = SellerTradeInfo(assetToSell, price, myPublicKey)
|
||||
|
||||
val maybeSTX = sendAndReceive<SignedTransaction>(otherParty, hello)
|
||||
// What we get back from the other side is a transaction that *might* be valid and acceptable to us,
|
||||
// but we must check it out thoroughly before we sign!
|
||||
val untrustedSTX = sendAndReceive<SignedTransaction>(otherParty, hello)
|
||||
|
||||
progressTracker.currentStep = VERIFYING
|
||||
|
||||
maybeSTX.unwrap {
|
||||
progressTracker.nextStep()
|
||||
|
||||
return untrustedSTX.unwrap {
|
||||
// Check that the tx proposed by the buyer is valid.
|
||||
val wtx: WireTransaction = it.verifySignatures(myPublicKey, notaryNode.notaryIdentity.owningKey)
|
||||
logger.trace { "Received partially signed transaction: ${it.id}" }
|
||||
@ -123,11 +106,10 @@ object TwoPartyTradeFlow {
|
||||
// even though it is missing signatures.
|
||||
subFlow(ResolveTransactionsFlow(wtx, otherParty))
|
||||
|
||||
if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price) {
|
||||
if (wtx.outputs.map { it.data }.sumCashBy(myPublicKey).withoutIssuer() != price)
|
||||
throw FlowException("Transaction is not sending us the right amount of cash")
|
||||
}
|
||||
|
||||
return it
|
||||
it
|
||||
}
|
||||
}
|
||||
// DOCEND 5
|
||||
@ -144,64 +126,50 @@ object TwoPartyTradeFlow {
|
||||
// but the goal of this code is not to be fully secure (yet), but rather, just to find good ways to
|
||||
// express flow state machines on top of the messaging layer.
|
||||
|
||||
// DOCSTART 7
|
||||
open fun calculateOurSignature(partialTX: SignedTransaction): DigitalSignature.WithKey {
|
||||
progressTracker.currentStep = SIGNING
|
||||
return myKeyPair.signWithECDSA(partialTX.id)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun sendSignatures(allPartySignedTx: SignedTransaction,
|
||||
ourSignature: DigitalSignature.WithKey,
|
||||
notarySignature: DigitalSignature.WithKey): SignedTransaction {
|
||||
progressTracker.currentStep = SENDING_SIGS
|
||||
val fullySigned = allPartySignedTx + notarySignature
|
||||
|
||||
logger.trace { "Built finished transaction, sending back to secondary!" }
|
||||
|
||||
send(otherParty, SignaturesFromSeller(ourSignature, notarySignature))
|
||||
return fullySigned
|
||||
}
|
||||
// DOCEND 7
|
||||
}
|
||||
|
||||
// DOCSTART 2
|
||||
open class Buyer(val otherParty: Party,
|
||||
val notary: Party,
|
||||
val acceptablePrice: Amount<Currency>,
|
||||
val typeToBuy: Class<out OwnableState>) : FlowLogic<SignedTransaction>() {
|
||||
|
||||
// DOCSTART 2
|
||||
object RECEIVING : ProgressTracker.Step("Waiting for seller trading info")
|
||||
object VERIFYING : ProgressTracker.Step("Verifying seller assets")
|
||||
object SIGNING : ProgressTracker.Step("Generating and signing transaction proposal")
|
||||
object SWAPPING_SIGNATURES : ProgressTracker.Step("Swapping signatures with the seller")
|
||||
object SENDING_SIGNATURES : ProgressTracker.Step("Sending signatures to the seller")
|
||||
object WAITING_FOR_TX : ProgressTracker.Step("Waiting for the transaction to finalise.")
|
||||
|
||||
override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SWAPPING_SIGNATURES)
|
||||
override val progressTracker = ProgressTracker(RECEIVING, VERIFYING, SIGNING, SENDING_SIGNATURES, WAITING_FOR_TX)
|
||||
// DOCEND 2
|
||||
|
||||
// DOCSTART 1
|
||||
@Suspendable
|
||||
override fun call(): SignedTransaction {
|
||||
// Wait for a trade request to come in from the other party.
|
||||
progressTracker.currentStep = RECEIVING
|
||||
val tradeRequest = receiveAndValidateTradeRequest()
|
||||
|
||||
// Put together a proposed transaction that performs the trade, and sign it.
|
||||
progressTracker.currentStep = SIGNING
|
||||
val (ptx, cashSigningPubKeys) = assembleSharedTX(tradeRequest)
|
||||
val stx = signWithOurKeys(cashSigningPubKeys, ptx)
|
||||
|
||||
val signatures = swapSignaturesWithSeller(stx)
|
||||
// Send the signed transaction to the seller, who must then sign it themselves and commit
|
||||
// it to the ledger by sending it to the notary.
|
||||
progressTracker.currentStep = SENDING_SIGNATURES
|
||||
send(otherParty, stx)
|
||||
|
||||
logger.trace { "Got signatures from seller, verifying ... " }
|
||||
|
||||
val fullySigned = stx + signatures.sellerSig + signatures.notarySig
|
||||
fullySigned.verifySignatures()
|
||||
|
||||
logger.trace { "Signatures received are valid. Trade complete! :-)" }
|
||||
return fullySigned
|
||||
// Wait for the finished, notarised transaction to arrive in our transaction store.
|
||||
progressTracker.currentStep = WAITING_FOR_TX
|
||||
return waitForLedgerCommit(stx.id)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun receiveAndValidateTradeRequest(): SellerTradeInfo {
|
||||
progressTracker.currentStep = RECEIVING
|
||||
// Wait for a trade request to come in from the other side
|
||||
val maybeTradeRequest = receive<SellerTradeInfo>(otherParty)
|
||||
|
||||
progressTracker.currentStep = VERIFYING
|
||||
@ -216,24 +184,14 @@ object TwoPartyTradeFlow {
|
||||
if (!typeToBuy.isInstance(asset))
|
||||
throw AssetMismatchException(typeToBuy.name, assetTypeName)
|
||||
|
||||
// Check the transaction that contains the state which is being resolved.
|
||||
// We only have a hash here, so if we don't know it already, we have to ask for it.
|
||||
// Check that the state being sold to us is in a valid chain of transactions, i.e. that the
|
||||
// seller has a valid chain of custody proving that they own the thing they're selling.
|
||||
subFlow(ResolveTransactionsFlow(setOf(it.assetForSale.ref.txhash), otherParty))
|
||||
|
||||
return it
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun swapSignaturesWithSeller(stx: SignedTransaction): SignaturesFromSeller {
|
||||
progressTracker.currentStep = SWAPPING_SIGNATURES
|
||||
logger.trace { "Sending partially signed transaction to seller" }
|
||||
|
||||
// TODO: Protect against the seller terminating here and leaving us in the lurch without the final tx.
|
||||
|
||||
return sendAndReceive<SignaturesFromSeller>(otherParty, stx).unwrap { it }
|
||||
}
|
||||
|
||||
private fun signWithOurKeys(cashSigningPubKeys: List<CompositeKey>, ptx: TransactionBuilder): SignedTransaction {
|
||||
// Now sign the transaction with whatever keys we need to move the cash.
|
||||
for (publicKey in cashSigningPubKeys.keys) {
|
||||
|
@ -52,9 +52,6 @@ sourceSets {
|
||||
|
||||
dependencies {
|
||||
compile project(':finance')
|
||||
testCompile project(':test-utils')
|
||||
testCompile project(':client')
|
||||
|
||||
compile "com.google.code.findbugs:jsr305:3.0.1"
|
||||
|
||||
// Log4J: logging framework (with SLF4J bindings)
|
||||
@ -126,8 +123,11 @@ dependencies {
|
||||
// Unit testing helpers.
|
||||
testCompile "junit:junit:$junit_version"
|
||||
testCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
|
||||
testCompile "com.pholser:junit-quickcheck-core:$quickcheck_version"
|
||||
testCompile "com.nhaarman:mockito-kotlin:1.1.0"
|
||||
testCompile project(':test-utils')
|
||||
testCompile project(':client')
|
||||
testCompile project(':core')
|
||||
|
||||
// For H2 database support in persistence
|
||||
compile "com.h2database:h2:1.4.193"
|
||||
@ -156,8 +156,6 @@ dependencies {
|
||||
|
||||
// Integration test helpers
|
||||
integrationTestCompile "junit:junit:$junit_version"
|
||||
|
||||
testCompile "com.nhaarman:mockito-kotlin:1.1.0"
|
||||
}
|
||||
|
||||
task integrationTest(type: Test) {
|
||||
|
@ -15,7 +15,6 @@ import java.util.function.Function
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
object DataVending {
|
||||
|
||||
class Plugin : CordaPluginRegistry() {
|
||||
override val servicePlugins = listOf(Function(::Service))
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.node.services.statemachine.StateMachineManager.FlowSession
|
||||
|
||||
// TODO revisit when Kotlin 1.1 is released and data classes can extend other classes
|
||||
@ -7,14 +8,17 @@ interface FlowIORequest {
|
||||
// This is used to identify where we suspended, in case of message mismatch errors and other things where we
|
||||
// don't have the original stack trace because it's in a suspended fiber.
|
||||
val stackTraceInCaseOfProblems: StackSnapshot
|
||||
}
|
||||
|
||||
interface SessionedFlowIORequest : FlowIORequest {
|
||||
val session: FlowSession
|
||||
}
|
||||
|
||||
interface SendRequest : FlowIORequest {
|
||||
interface SendRequest : SessionedFlowIORequest {
|
||||
val message: SessionMessage
|
||||
}
|
||||
|
||||
interface ReceiveRequest<T : SessionMessage> : FlowIORequest {
|
||||
interface ReceiveRequest<T : SessionMessage> : SessionedFlowIORequest {
|
||||
val receiveType: Class<T>
|
||||
}
|
||||
|
||||
@ -36,4 +40,9 @@ data class SendOnly(override val session: FlowSession, override val message: Ses
|
||||
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
|
||||
}
|
||||
|
||||
data class WaitForLedgerCommit(val hash: SecureHash, val fiber: FlowStateMachineImpl<*>) : FlowIORequest {
|
||||
@Transient
|
||||
override val stackTraceInCaseOfProblems: StackSnapshot = StackSnapshot()
|
||||
}
|
||||
|
||||
class StackSnapshot : Throwable("This is a stack trace to help identify the source of the underlying problem")
|
||||
|
@ -7,11 +7,13 @@ import co.paralleluniverse.strands.Strand
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowStateMachine
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.random63BitValue
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
@ -72,7 +74,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
}
|
||||
|
||||
// This state IS serialised, as we need it to know what the fiber is waiting for.
|
||||
internal val openSessions = HashMap<Pair<FlowLogic<*>, Party>, FlowSession>()
|
||||
internal var waitingForLedgerCommitOf: SecureHash? = null
|
||||
|
||||
init {
|
||||
logic.stateMachine = this
|
||||
@ -172,6 +176,16 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
}
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun waitForLedgerCommit(hash: SecureHash, sessionFlow: FlowLogic<*>): SignedTransaction {
|
||||
waitingForLedgerCommitOf = hash
|
||||
logger.info("Waiting for transaction $hash to commit")
|
||||
suspend(WaitForLedgerCommit(hash, sessionFlow.stateMachine as FlowStateMachineImpl<*>))
|
||||
logger.info("Transaction $hash has committed to the ledger, resuming")
|
||||
val stx = serviceHub.storageService.validatedTransactions.getTransaction(hash)
|
||||
return stx ?: throw IllegalStateException("We were resumed after waiting for $hash but it wasn't found in our local storage")
|
||||
}
|
||||
|
||||
private fun createSessionData(session: FlowSession, payload: Any): SessionData {
|
||||
val sessionState = session.state
|
||||
val peerSessionId = when (sessionState) {
|
||||
@ -266,10 +280,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
|
||||
@Suspendable
|
||||
private fun suspend(ioRequest: FlowIORequest) {
|
||||
// we have to pass the Thread local Transaction across via a transient field as the Fiber Park swaps them out.
|
||||
// We have to pass the thread local database transaction across via a transient field as the fiber park
|
||||
// swaps them out.
|
||||
txTrampoline = TransactionManager.currentOrNull()
|
||||
StrandLocalTransactionManager.setThreadLocalTx(null)
|
||||
ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>)
|
||||
if (ioRequest is SessionedFlowIORequest)
|
||||
ioRequest.session.waitingForResponse = (ioRequest is ReceiveRequest<*>)
|
||||
|
||||
var exceptionDuringSuspend: Throwable? = null
|
||||
parkAndSerialize { fiber, serializer ->
|
||||
|
@ -6,11 +6,13 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
||||
import co.paralleluniverse.strands.Strand
|
||||
import com.codahale.metrics.Gauge
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.google.common.collect.HashMultimap
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import kotlinx.support.jdk8.collections.removeIf
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.commonName
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
@ -62,7 +64,7 @@ import javax.annotation.concurrent.ThreadSafe
|
||||
* TODO: Timeouts
|
||||
* TODO: Surfacing of exceptions via an API and/or management UI
|
||||
* TODO: Ability to control checkpointing explicitly, for cases where you know replaying a message can't hurt
|
||||
* TODO: Implement stub/skel classes that provide a basic RPC framework on top of this.
|
||||
* TODO: Don't store all active flows in memory, load from the database on demand.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
@ -89,15 +91,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
|
||||
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
|
||||
// property.
|
||||
private val mutex = ThreadBox(object {
|
||||
private class InnerState {
|
||||
var started = false
|
||||
val stateMachines = LinkedHashMap<FlowStateMachineImpl<*>, Checkpoint>()
|
||||
val changesPublisher = PublishSubject.create<Change>()
|
||||
val changesPublisher = PublishSubject.create<Change>()!!
|
||||
val fibersWaitingForLedgerCommit = HashMultimap.create<SecureHash, FlowStateMachineImpl<*>>()!!
|
||||
|
||||
fun notifyChangeObservers(fiber: FlowStateMachineImpl<*>, addOrRemove: AddOrRemove) {
|
||||
changesPublisher.bufferUntilDatabaseCommit().onNext(Change(fiber.logic, addOrRemove, fiber.id))
|
||||
}
|
||||
})
|
||||
}
|
||||
private val mutex = ThreadBox(InnerState())
|
||||
|
||||
// True if we're shutting down, so don't resume anything.
|
||||
@Volatile private var stopping = false
|
||||
@ -152,9 +156,27 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
|
||||
fun start() {
|
||||
restoreFibersFromCheckpoints()
|
||||
listenToLedgerTransactions()
|
||||
serviceHub.networkMapCache.mapServiceRegistered.then(executor) { resumeRestoredFibers() }
|
||||
}
|
||||
|
||||
private fun listenToLedgerTransactions() {
|
||||
// Observe the stream of committed, validated transactions and resume fibers that are waiting for them.
|
||||
serviceHub.storageService.validatedTransactions.updates.subscribe { stx ->
|
||||
val hash = stx.id
|
||||
val flows: Set<FlowStateMachineImpl<*>> = mutex.locked { fibersWaitingForLedgerCommit.removeAll(hash) }
|
||||
if (flows.isNotEmpty()) {
|
||||
executor.executeASAP {
|
||||
for (flow in flows) {
|
||||
logger.info("Resuming ${flow.id} because it was waiting for tx ${flow.waitingForLedgerCommitOf!!} which is now committed.")
|
||||
flow.waitingForLedgerCommitOf = null
|
||||
resumeFiber(flow)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun decrementLiveFibers() {
|
||||
liveFibers.countDown()
|
||||
}
|
||||
@ -217,8 +239,20 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
|
||||
private fun resumeRestoredFiber(fiber: FlowStateMachineImpl<*>) {
|
||||
fiber.openSessions.values.forEach { openSessions[it.ourSessionId] = it }
|
||||
val waitingForHash = fiber.waitingForLedgerCommitOf
|
||||
if (fiber.openSessions.values.any { it.waitingForResponse }) {
|
||||
fiber.logger.info("Restored, pending on receive")
|
||||
} else if (waitingForHash != null) {
|
||||
val stx = databaseTransaction(database) {
|
||||
serviceHub.storageService.validatedTransactions.getTransaction(waitingForHash)
|
||||
}
|
||||
if (stx != null) {
|
||||
fiber.logger.info("Resuming fiber as tx $waitingForHash has committed.")
|
||||
resumeFiber(fiber)
|
||||
} else {
|
||||
fiber.logger.info("Restored, pending on ledger commit of $waitingForHash")
|
||||
mutex.locked { fibersWaitingForLedgerCommit.put(waitingForHash, fiber) }
|
||||
}
|
||||
} else {
|
||||
resumeFiber(fiber)
|
||||
}
|
||||
@ -424,6 +458,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
* Note that you must be on the [executor] thread.
|
||||
*/
|
||||
fun <T> add(logic: FlowLogic<T>): FlowStateMachine<T> {
|
||||
// TODO: Check that logic has @Suspendable on its call method.
|
||||
executor.checkOnThread()
|
||||
// We swap out the parent transaction context as using this frequently leads to a deadlock as we wait
|
||||
// on the flow completion future inside that context. The problem is that any progress checkpoints are
|
||||
@ -457,8 +492,10 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
private fun resumeFiber(fiber: FlowStateMachineImpl<*>) {
|
||||
// Avoid race condition when setting stopping to true and then checking liveFibers
|
||||
incrementLiveFibers()
|
||||
if (!stopping) executor.executeASAP {
|
||||
fiber.resume(scheduler)
|
||||
if (!stopping) {
|
||||
executor.executeASAP {
|
||||
fiber.resume(scheduler)
|
||||
}
|
||||
} else {
|
||||
fiber.logger.debug("Not resuming as SMM is stopping.")
|
||||
decrementLiveFibers()
|
||||
@ -466,6 +503,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
}
|
||||
|
||||
private fun processIORequest(ioRequest: FlowIORequest) {
|
||||
executor.checkOnThread()
|
||||
if (ioRequest is SendRequest) {
|
||||
if (ioRequest.message is SessionInit) {
|
||||
openSessions[ioRequest.session.ourSessionId] = ioRequest.session
|
||||
@ -475,6 +513,24 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
||||
// We sent a message, but don't expect a response, so re-enter the continuation to let it keep going.
|
||||
resumeFiber(ioRequest.session.fiber)
|
||||
}
|
||||
} else if (ioRequest is WaitForLedgerCommit) {
|
||||
// Is it already committed?
|
||||
val stx = databaseTransaction(database) {
|
||||
serviceHub.storageService.validatedTransactions.getTransaction(ioRequest.hash)
|
||||
}
|
||||
if (stx != null) {
|
||||
resumeFiber(ioRequest.fiber)
|
||||
} else {
|
||||
// No, then register to wait.
|
||||
//
|
||||
// We assume this code runs on the server thread, which is the only place transactions are committed
|
||||
// currently. When we liberalise our threading somewhat, handing of wait requests will need to be
|
||||
// reworked to make the wait atomic in another way. Otherwise there is a race between checking the
|
||||
// database and updating the waiting list.
|
||||
mutex.locked {
|
||||
fibersWaitingForLedgerCommit[ioRequest.hash] += ioRequest.fiber
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -195,13 +195,6 @@ class NodeVaultService(private val services: ServiceHub) : SingletonSerializeAsT
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a transaction that moves an amount of currency to the given pubkey.
|
||||
*
|
||||
* @param onlyFromParties if non-null, the asset states will be filtered to only include those issued by the set
|
||||
* of given parties. This can be useful if the party you're trying to pay has expectations
|
||||
* about which type of asset claims they are willing to accept.
|
||||
*/
|
||||
override fun generateSpend(tx: TransactionBuilder,
|
||||
amount: Amount<Currency>,
|
||||
to: CompositeKey,
|
||||
|
@ -5,8 +5,10 @@ import co.paralleluniverse.fibers.Suspendable
|
||||
import co.paralleluniverse.strands.Strand.UncaughtExceptionHandler
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.contracts.DOLLARS
|
||||
import net.corda.core.contracts.DummyState
|
||||
import net.corda.core.contracts.issuedBy
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.SecureHash
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
@ -19,8 +21,11 @@ import net.corda.core.random63BitValue
|
||||
import net.corda.core.rootCause
|
||||
import net.corda.core.serialization.OpaqueBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.flows.CashCommand
|
||||
import net.corda.flows.CashFlow
|
||||
import net.corda.flows.FinalityFlow
|
||||
import net.corda.flows.NotaryFlow
|
||||
import net.corda.node.services.persistence.checkpoints
|
||||
import net.corda.node.services.transactions.ValidatingNotaryService
|
||||
@ -483,9 +488,26 @@ class StateMachineManagerTests {
|
||||
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
|
||||
}
|
||||
|
||||
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(
|
||||
networkMapNode: MockNode? = null): P {
|
||||
disableDBCloseOnStop() //Handover DB to new node copy
|
||||
@Test
|
||||
fun `wait for transaction`() {
|
||||
val ptx = TransactionBuilder(notary = notary1.info.notaryIdentity)
|
||||
ptx.addOutputState(DummyState())
|
||||
ptx.signWith(node1.services.legalIdentityKey)
|
||||
val stx = ptx.toSignedTransaction()
|
||||
|
||||
val future1 = node2.services.startFlow(WaitingFlows.Waiter(stx.id)).resultFuture
|
||||
val future2 = node1.services.startFlow(WaitingFlows.Committer(stx, node2.info.legalIdentity)).resultFuture
|
||||
net.runNetwork()
|
||||
future1.getOrThrow()
|
||||
future2.getOrThrow()
|
||||
}
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
//region Helpers
|
||||
|
||||
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(networkMapNode: MockNode? = null): P {
|
||||
disableDBCloseOnStop() // Handover DB to new node copy
|
||||
stop()
|
||||
val newNode = mockNet.createNode(networkMapNode?.info?.address, id, advertisedServices = *advertisedServices.toTypedArray())
|
||||
newNode.acceptableLiveFiberCountOnStop = 1
|
||||
@ -611,4 +633,22 @@ class StateMachineManagerTests {
|
||||
override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message
|
||||
override fun hashCode(): Int = message?.hashCode() ?: 31
|
||||
}
|
||||
|
||||
private object WaitingFlows {
|
||||
class Waiter(private val hash: SecureHash) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
waitForLedgerCommit(hash)
|
||||
}
|
||||
}
|
||||
|
||||
class Committer(private val stx: SignedTransaction, private val otherParty: Party) : FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
subFlow(FinalityFlow(stx, setOf(otherParty)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//endregion Helpers
|
||||
}
|
||||
|
@ -55,10 +55,7 @@ class SellerFlow(val otherParty: Party,
|
||||
amount,
|
||||
cpOwnerKey,
|
||||
progressTracker.getChildProgressTracker(TRADING)!!)
|
||||
val tradeTX: SignedTransaction = subFlow(seller, shareParentSessions = true)
|
||||
serviceHub.recordTransactions(listOf(tradeTX))
|
||||
|
||||
return tradeTX
|
||||
return subFlow(seller, shareParentSessions = true)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
|
Loading…
Reference in New Issue
Block a user