Cleanup: improve api docs on FlowLogic, make it clearer when stuff is considered internal, rename an obscurely named field.

This commit is contained in:
Mike Hearn 2017-01-05 15:07:22 +01:00
parent d26c44d08c
commit ef6e9786a8
4 changed files with 119 additions and 55 deletions

View File

@ -26,13 +26,12 @@ import rx.Observable
* it to the [subFlow] method. It will return the result of that flow when it completes. * it to the [subFlow] method. It will return the result of that flow when it completes.
*/ */
abstract class FlowLogic<out T> { abstract class FlowLogic<out T> {
/** Reference to the [FlowStateMachine] instance that is the top level controller for the entire flow. */
lateinit var stateMachine: FlowStateMachine<*>
/** This is where you should log things to. */ /** This is where you should log things to. */
val logger: Logger get() = stateMachine.logger val logger: Logger get() = stateMachine.logger
/** Returns a wrapped [UUID] object that identifies this state machine run (i.e. subflows have the same identifier as their parents). */
val runId: StateMachineRunId get() = sessionFlow.stateMachine.id
/** /**
* Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is * Provides access to big, heavy classes that may be reconstructed from time to time, e.g. across restarts. It is
* only available once the flow has started, which means it cannnot be accessed in the constructor. Either * only available once the flow has started, which means it cannnot be accessed in the constructor. Either
@ -40,8 +39,6 @@ abstract class FlowLogic<out T> {
*/ */
val serviceHub: ServiceHub get() = stateMachine.serviceHub val serviceHub: ServiceHub get() = stateMachine.serviceHub
private var sessionFlow: FlowLogic<*> = this
/** /**
* Return the marker [Class] which [party] has used to register the counterparty flow that is to execute on the * Return the marker [Class] which [party] has used to register the counterparty flow that is to execute on the
* other side. The default implementation returns the class object of this FlowLogic, but any [Class] instance * other side. The default implementation returns the class object of this FlowLogic, but any [Class] instance
@ -49,31 +46,73 @@ abstract class FlowLogic<out T> {
*/ */
open fun getCounterpartyMarker(party: Party): Class<*> = javaClass open fun getCounterpartyMarker(party: Party): Class<*> = javaClass
// Kotlin helpers that allow the use of generic types. /**
inline fun <reified T : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<T> { * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
return sendAndReceive(T::class.java, otherParty, payload) * is received, which must be of the given [R] type.
} *
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @returns an [UntrustworthyData] wrapper around the received object.
*/
inline fun <reified R : Any> sendAndReceive(otherParty: Party, payload: Any) = sendAndReceive(R::class.java, otherParty, payload)
/**
* Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
* is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data
* should not be trusted until it's been thoroughly verified for consistency and that all expectations are
* satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
*
* Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
* use this when you expect to do a message swap than do use [send] and then [receive] in turn.
*
* @returns an [UntrustworthyData] wrapper around the received object.
*/
@Suspendable @Suspendable
fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any): UntrustworthyData<T> { open fun <T : Any> sendAndReceive(receiveType: Class<T>, otherParty: Party, payload: Any): UntrustworthyData<T> {
return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow) return stateMachine.sendAndReceive(receiveType, otherParty, payload, sessionFlow)
} }
inline fun <reified T : Any> receive(otherParty: Party): UntrustworthyData<T> = receive(T::class.java, otherParty) /**
* Suspends until the specified [otherParty] sends us a message of type [R].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*/
inline fun <reified R : Any> receive(otherParty: Party): UntrustworthyData<R> = receive(R::class.java, otherParty)
/**
* Suspends until the specified [otherParty] sends us a message of type [receiveType].
*
* Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
* verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
* corrupted data in order to exploit your code.
*/
@Suspendable @Suspendable
fun <T : Any> receive(receiveType: Class<T>, otherParty: Party): UntrustworthyData<T> { open fun <T : Any> receive(receiveType: Class<T>, otherParty: Party): UntrustworthyData<T> {
return stateMachine.receive(receiveType, otherParty, sessionFlow) return stateMachine.receive(receiveType, otherParty, sessionFlow)
} }
/**
* Queues the given [payload] for sending to the [otherParty] and continues without suspending.
*
* Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty]
* is offline then message delivery will be retried until it comes back or until the message is older than the
* network's event horizon time.
*/
@Suspendable @Suspendable
fun send(otherParty: Party, payload: Any) { open fun send(otherParty: Party, payload: Any) = stateMachine.send(otherParty, payload, sessionFlow)
stateMachine.send(otherParty, payload, sessionFlow)
}
/** /**
* Invokes the given subflow by simply passing through this [FlowLogic]s reference to the * Invokes the given subflow. This function returns once the subflow completes successfully with the result
* [FlowStateMachine] and then calling the [call] method. * returned by that subflows [call] method. If the subflow has a progress tracker, it is attached to the
* current step in this flow's progress tracker.
*
* @param shareParentSessions In certain situations the need arises to use the same sessions the parent flow has * @param shareParentSessions In certain situations the need arises to use the same sessions the parent flow has
* already established. However this also prevents the subflow from creating new sessions with those parties. * already established. However this also prevents the subflow from creating new sessions with those parties.
* For this reason the default value is false. * For this reason the default value is false.
@ -81,7 +120,8 @@ abstract class FlowLogic<out T> {
// TODO Rethink the default value for shareParentSessions // TODO Rethink the default value for shareParentSessions
// TODO shareParentSessions is a bit too low-level and perhaps can be expresed in a better way // TODO shareParentSessions is a bit too low-level and perhaps can be expresed in a better way
@Suspendable @Suspendable
fun <R> subFlow(subLogic: FlowLogic<R>, shareParentSessions: Boolean = false): R { @JvmOverloads
open fun <R> subFlow(subLogic: FlowLogic<R>, shareParentSessions: Boolean = false): R {
subLogic.stateMachine = stateMachine subLogic.stateMachine = stateMachine
maybeWireUpProgressTracking(subLogic) maybeWireUpProgressTracking(subLogic)
if (shareParentSessions) { if (shareParentSessions) {
@ -93,6 +133,52 @@ abstract class FlowLogic<out T> {
return result return result
} }
/**
* Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something
* helpful with the progress reports. If this flow is invoked as a subflow of another, then the
* tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track
* progress.
*
* Note that this has to return a tracker before the flow is invoked. You can't change your mind half way
* through.
*/
open val progressTracker: ProgressTracker? = null
/**
* This is where you fill out your business logic. The returned object will usually be ignored, but can be
* helpful if this flow is meant to be used as a subflow.
*/
@Suspendable
abstract fun call(): T
/**
* Returns a pair of the current progress step, as a string, and an observable of stringified changes to the
* [progressTracker].
*
* @return Returns null if this flow has no progress tracker.
*/
fun track(): Pair<String, Observable<String>>? {
// TODO this is not threadsafe, needs an atomic get-step-and-subscribe
return progressTracker?.let {
Pair(it.currentStep.toString(), it.changes.map { it.toString() })
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
private var _stateMachine: FlowStateMachine<*>? = null
/**
* Internal only. Reference to the [Fiber] instance that is the top level controller for the entire flow. When
* inside a flow this is equivalent to [Strand.currentStrand]. This is public only because it must be accessed
* across module boundaries.
*/
var stateMachine: FlowStateMachine<*>
get() = _stateMachine ?: throw IllegalStateException("This can only be done after the flow has been started.")
set(value) { _stateMachine = value }
// This points to the outermost flow and is changed when a subflow is invoked.
private var sessionFlow: FlowLogic<*> = this
private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) { private fun maybeWireUpProgressTracking(subLogic: FlowLogic<*>) {
val ours = progressTracker val ours = progressTracker
@ -105,26 +191,4 @@ abstract class FlowLogic<out T> {
ours.setChildProgressTracker(ours.currentStep, theirs) ours.setChildProgressTracker(ours.currentStep, theirs)
} }
} }
/**
* Override this to provide a [ProgressTracker]. If one is provided and stepped, the framework will do something
* helpful with the progress reports. If this flow is invoked as a sub-flow of another, then the
* tracker will be made a child of the current step in the parent. If it's null, this flow doesn't track
* progress.
*
* Note that this has to return a tracker before the flow is invoked. You can't change your mind half way
* through.
*/
open val progressTracker: ProgressTracker? = null
/** This is where you fill out your business logic. */
@Suspendable
abstract fun call(): T
// TODO this is not threadsafe, needs an atomic get-step-and-subscribe
fun track(): Pair<String, Observable<String>>? {
return progressTracker?.let {
Pair(it.currentStep.toString(), it.changes.map { it.toString() })
}
}
} }

View File

@ -8,8 +8,11 @@ import net.corda.core.utilities.UntrustworthyData
import org.slf4j.Logger import org.slf4j.Logger
import java.util.* import java.util.*
/**
* A unique identifier for a single state machine run, valid across node restarts. Note that a single run always
* has at least one flow, but that flow may also invoke sub-flows: they all share the same run id.
*/
data class StateMachineRunId private constructor(val uuid: UUID) { data class StateMachineRunId private constructor(val uuid: UUID) {
companion object { companion object {
fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID()) fun createRandom(): StateMachineRunId = StateMachineRunId(UUID.randomUUID())
fun wrap(uuid: UUID): StateMachineRunId = StateMachineRunId(uuid) fun wrap(uuid: UUID): StateMachineRunId = StateMachineRunId(uuid)
@ -18,14 +21,7 @@ data class StateMachineRunId private constructor(val uuid: UUID) {
override fun toString(): String = "[$uuid]" override fun toString(): String = "[$uuid]"
} }
/** /** This is an internal interface that is implemented by code in the node module. You should look at [FlowLogic]. */
* A FlowStateMachine instance is a suspendable fiber that delegates all actual logic to a [FlowLogic] instance.
* For any given flow there is only one PSM, even if that flow invokes subflows.
*
* These classes are created by the [StateMachineManager] when a new flow is started at the topmost level. If
* a flow invokes a sub-flow, then it will pass along the PSM to the child. The call method of the topmost
* logic element gets to return the value that the entire state machine resolves to.
*/
interface FlowStateMachine<R> { interface FlowStateMachine<R> {
@Suspendable @Suspendable
fun <T : Any> sendAndReceive(receiveType: Class<T>, fun <T : Any> sendAndReceive(receiveType: Class<T>,
@ -41,10 +37,7 @@ interface FlowStateMachine<R> {
val serviceHub: ServiceHub val serviceHub: ServiceHub
val logger: Logger val logger: Logger
/** Unique ID for this machine run, valid across restarts */
val id: StateMachineRunId val id: StateMachineRunId
/** This future will complete when the call method returns. */
val resultFuture: ListenableFuture<R> val resultFuture: ListenableFuture<R>
} }

View File

@ -121,8 +121,6 @@ class CashFlow(val command: CashCommand, override val progressTracker: ProgressT
"Cash issuance completed" "Cash issuance completed"
) )
} }
} }
/** /**

View File

@ -6,14 +6,23 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.Strand
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.StateRef
import net.corda.core.contracts.TransactionState
import net.corda.core.crypto.Party import net.corda.core.crypto.Party
import net.corda.core.flows.FlowException import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.node.ServiceHub
import net.corda.core.random63BitValue import net.corda.core.random63BitValue
import net.corda.core.transactions.LedgerTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.UntrustworthyData import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.trace import net.corda.core.utilities.trace
import net.corda.flows.BroadcastTransactionFlow
import net.corda.flows.FinalityFlow
import net.corda.flows.ResolveTransactionsFlow
import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.statemachine.StateMachineManager.FlowSession import net.corda.node.services.statemachine.StateMachineManager.FlowSession
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState