diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 90bc4499b6..01e92ec092 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -26,13 +26,12 @@ import rx.Observable * it to the [subFlow] method. It will return the result of that flow when it completes. */ abstract class FlowLogic<out T> { - - /** Reference to the [FlowStateMachine] instance that is the top level controller for the entire flow. */ - lateinit var stateMachine: FlowStateMachine<*> - /** This is where you should log things to. */ val logger: Logger get() = stateMachine.logger + /** Returns a wrapped [UUID] object that identifies this state machine run (i.e. subflows have the same identifier as their parents). */ + val runId: StateMachineRunId get() = sessionFlow.stateMachine.id + /** * Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is * only available once the flow has started, which means it cannnot be accessed in the constructor. Either @@ -40,8 +39,6 @@ abstract class FlowLogic<out T> { */ val serviceHub: ServiceHub get() = stateMachine.serviceHub - private var sessionFlow: FlowLogic<*> = this - /** * Return the marker [Class] which [party] has used to register the counterparty flow that is to execute on the * other side. The default implementation returns the class object of this FlowLogic, but any [Class] instance @@ -49,31 +46,73 @@ abstract class FlowLogic<out T> { */ open fun getCounterpartyMarker(party: Party): Class<*> = javaClass - // Kotlin helpers that allow the use of generic types. - inline fun <reified T : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<T> { - return sendAndReceive(T::class.java, otherParty, payload) - } + /** + * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response + * is received, which must be of the given [R] type. + * + * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly + * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly + * corrupted data in order to exploit your code. + * + * Note that this function is not just a simple send+receive pair: it is more efficient and more correct to + * use this when you expect to do a message swap than do use [send] and then [receive] in turn. + * + * @returns an [UntrustworthyData] wrapper around the received object. + */ + inline fun <reified R : Any> sendAndReceive(otherParty: Party, payload: Any) = sendAndReceive(R::class.java, otherParty, payload) + /** + * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response + * is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data + * should not be trusted until it's been thoroughly verified for consistency and that all expectations are + * satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code. + * + * Note that this function is not just a simple send+receive pair: it is more efficient and more correct to + * use this when you expect to do a message swap than do use [send] and then [receive] in turn. + * + * @returns an [UntrustworthyData] wrapper around the received object. + */ @Suspendable - fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any): UntrustworthyData<T> { + open fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any): UntrustworthyData<T> { return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow) } - inline fun <reified T : Any> receive(otherParty: Party): UntrustworthyData<T> = receive(T::class.java, otherParty) + /** + * Suspends until the specified [otherParty] sends us a message of type [R]. + * + * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly + * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly + * corrupted data in order to exploit your code. + */ + inline fun <reified R : Any> receive(otherParty: Party): UntrustworthyData<R> = receive(R::class.java, otherParty) + /** + * Suspends until the specified [otherParty] sends us a message of type [receiveType]. + * + * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly + * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly + * corrupted data in order to exploit your code. + */ @Suspendable - fun <T : Any> receive(receiveType: Class<T>, otherParty: Party): UntrustworthyData<T> { + open fun <T : Any> receive(receiveType: Class<T>, otherParty: Party): UntrustworthyData<T> { return stateMachine.receive(receiveType, otherParty, sessionFlow) } + /** + * Queues the given [payload] for sending to the [otherParty] and continues without suspending. + * + * Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty] + * is offline then message delivery will be retried until it comes back or until the message is older than the + * network's event horizon time. + */ @Suspendable - fun send(otherParty: Party, payload: Any) { - stateMachine.send(otherParty, payload, sessionFlow) - } + open fun send(otherParty: Party, payload: Any) = stateMachine.send(otherParty, payload, sessionFlow) /** - * Invokes the given subflow by simply passing through this [FlowLogic]s reference to the - * [FlowStateMachine] and then calling the [call] method. + * Invokes the given subflow. This function returns once the subflow completes successfully with the result + * returned by that subflows [call] method. If the subflow has a progress tracker, it is attached to the + * current step in this flow's progress tracker. + * * @param shareParentSessions In certain situations the need arises to use the same sessions the parent flow has * already established. However this also prevents the subflow from creating new sessions with those parties. * For this reason the default value is false. @@ -81,7 +120,8 @@ abstract class FlowLogic<out T> { // TODO Rethink the default value for shareParentSessions // TODO shareParentSessions is a bit too low-level and perhaps can be expresed in a better way @Suspendable - fun <R> subFlow(subLogic: FlowLogic<R>, shareParentSessions: Boolean = false): R { + @JvmOverloads + open fun <R> subFlow(subLogic: FlowLogic<R>, shareParentSessions: Boolean = false): R { subLogic.stateMachine = stateMachine maybeWireUpProgressTracking(subLogic) if (shareParentSessions) { @@ -93,6 +133,52 @@ abstract class FlowLogic<out T> { return result } + /** + * Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something + * helpful with the progress reports. If this flow is invoked as a subflow of another, then the + * tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track + * progress. + * + * Note that this has to return a tracker before the flow is invoked. You can't change your mind half way + * through. + */ + open val progressTracker: ProgressTracker? = null + + /** + * This is where you fill out your business logic. The returned object will usually be ignored, but can be + * helpful if this flow is meant to be used as a subflow. + */ + @Suspendable + abstract fun call(): T + + /** + * Returns a pair of the current progress step, as a string, and an observable of stringified changes to the + * [progressTracker]. + * + * @return Returns null if this flow has no progress tracker. + */ + fun track(): Pair<String, Observable<String>>? { + // TODO this is not threadsafe, needs an atomic get-step-and-subscribe + return progressTracker?.let { + Pair(it.currentStep.toString(), it.changes.map { it.toString() }) + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + + private var _stateMachine: FlowStateMachine<*>? = null + /** + * Internal only. Reference to the [Fiber] instance that is the top level controller for the entire flow. When + * inside a flow this is equivalent to [Strand.currentStrand]. This is public only because it must be accessed + * across module boundaries. + */ + var stateMachine: FlowStateMachine<*> + get() = _stateMachine ?: throw IllegalStateException("This can only be done after the flow has been started.") + set(value) { _stateMachine = value } + + // This points to the outermost flow and is changed when a subflow is invoked. + private var sessionFlow: FlowLogic<*> = this + private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) { val ours = progressTracker @@ -105,26 +191,4 @@ abstract class FlowLogic<out T> { ours.setChildProgressTracker(ours.currentStep, theirs) } } - - /** - * Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something - * helpful with the progress reports. If this flow is invoked as a sub-flow of another, then the - * tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track - * progress. - * - * Note that this has to return a tracker before the flow is invoked. You can't change your mind half way - * through. - */ - open val progressTracker: ProgressTracker? = null - - /** This is where you fill out your business logic. */ - @Suspendable - abstract fun call(): T - - // TODO this is not threadsafe, needs an atomic get-step-and-subscribe - fun track(): Pair<String, Observable<String>>? { - return progressTracker?.let { - Pair(it.currentStep.toString(), it.changes.map { it.toString() }) - } - } } diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt index f6b691c5c5..fa290bfab8 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowStateMachine.kt @@ -8,8 +8,11 @@ import net.corda.core.utilities.UntrustworthyData import org.slf4j.Logger import java.util.* +/** + * A unique identifier for a single state machine run, valid across node restarts. Note that a single run always + * has at least one flow, but that flow may also invoke sub-flows: they all share the same run id. + */ data class StateMachineRunId private constructor(val uuid: UUID) { - companion object { fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID()) fun wrap(uuid: UUID): StateMachineRunId = StateMachineRunId(uuid) @@ -18,14 +21,7 @@ data class StateMachineRunId private constructor(val uuid: UUID) { override fun toString(): String = "[$uuid]" } -/** - * A FlowStateMachine instance is a suspendable fiber that delegates all actual logic to a [FlowLogic] instance. - * For any given flow there is only one PSM, even if that flow invokes subflows. - * - * These classes are created by the [StateMachineManager] when a new flow is started at the topmost level. If - * a flow invokes a sub-flow, then it will pass along the PSM to the child. The call method of the topmost - * logic element gets to return the value that the entire state machine resolves to. - */ +/** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */ interface FlowStateMachine<R> { @Suspendable fun <T : Any> sendAndReceive(receiveType: Class<T>, @@ -41,10 +37,7 @@ interface FlowStateMachine<R> { val serviceHub: ServiceHub val logger: Logger - - /** Unique ID for this machine run, valid across restarts */ val id: StateMachineRunId - /** This future will complete when the call method returns. */ val resultFuture: ListenableFuture<R> } diff --git a/finance/src/main/kotlin/net/corda/flows/CashFlow.kt b/finance/src/main/kotlin/net/corda/flows/CashFlow.kt index 1b30781814..8ac1520f0b 100644 --- a/finance/src/main/kotlin/net/corda/flows/CashFlow.kt +++ b/finance/src/main/kotlin/net/corda/flows/CashFlow.kt @@ -121,8 +121,6 @@ class CashFlow(val command: CashCommand, override val progressTracker: ProgressT "Cash issuance completed" ) } - - } /** diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index a3e55f04b4..24cfb619e2 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -6,14 +6,23 @@ import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.SettableFuture +import net.corda.core.contracts.ContractState +import net.corda.core.contracts.StateRef +import net.corda.core.contracts.TransactionState import net.corda.core.crypto.Party import net.corda.core.flows.FlowException import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.StateMachineRunId +import net.corda.core.node.ServiceHub import net.corda.core.random63BitValue +import net.corda.core.transactions.LedgerTransaction +import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.trace +import net.corda.flows.BroadcastTransactionFlow +import net.corda.flows.FinalityFlow +import net.corda.flows.ResolveTransactionsFlow import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.statemachine.StateMachineManager.FlowSession import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState