CORDA-1942: Improvements to the WithReferencedStatesFlow API (#4464)

* Internal classe were being exposed and have been hidden
* The single flowLogic instance has been changed into a lambda producer. Flows may not be written to be executed twice, especially if they hold internal state.
* Added JVM c'tor overloads
This commit is contained in:
Shams Asari 2019-01-02 13:43:04 +00:00 committed by GitHub
parent c205bd2a21
commit b1112dd264
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 19 deletions

View File

@ -4,11 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.StateRef import net.corda.core.contracts.StateRef
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.contextLogger
/** /**
* Given a flow which uses reference states, the [WithReferencedStatesFlow] will execute the the flow as a subFlow. * Given a flow which uses reference states, the [WithReferencedStatesFlow] will execute the flow as a subFlow.
* If the flow fails due to a [NotaryError.Conflict] for a reference state, then it will be suspended until the * If the flow fails due to a [NotaryError.Conflict] for a reference state, then WithReferencedStatesFlow will be suspended until the
* state refs for the reference states are consumed. In this case, a consumption means that: * state refs for the reference states are consumed. In this case, a consumption means that:
* *
* 1. the owner of the reference state has updated the state with a valid, notarised transaction * 1. the owner of the reference state has updated the state with a valid, notarised transaction
@ -21,17 +20,16 @@ import net.corda.core.utilities.contextLogger
* reference states. The flow using reference states should include checks to ensure that the reference data is * reference states. The flow using reference states should include checks to ensure that the reference data is
* reasonable, especially if some economics transaction depends upon it. * reasonable, especially if some economics transaction depends upon it.
* *
* @param flowLogic a flow which uses reference states.
* @param progressTracker a progress tracker instance. * @param progressTracker a progress tracker instance.
* @param flowLogicProducer a lambda which creates the [FlowLogic] instance using reference states. This will be executed at least once.
* It is recommended a new [FlowLogic] instance be returned each time.
*/ */
class WithReferencedStatesFlow<T : Any>( class WithReferencedStatesFlow<T : Any> @JvmOverloads constructor(
val flowLogic: FlowLogic<T>, override val progressTracker: ProgressTracker = tracker(),
override val progressTracker: ProgressTracker = WithReferencedStatesFlow.tracker() private val flowLogicProducer: () -> FlowLogic<T>
) : FlowLogic<T>() { ) : FlowLogic<T>() {
companion object { companion object {
val logger = contextLogger()
object ATTEMPT : ProgressTracker.Step("Attempting to run flow which uses reference states.") object ATTEMPT : ProgressTracker.Step("Attempting to run flow which uses reference states.")
object RETRYING : ProgressTracker.Step("Reference states are out of date! Waiting for updated states...") object RETRYING : ProgressTracker.Step("Reference states are out of date! Waiting for updated states...")
object SUCCESS : ProgressTracker.Step("Flow ran successfully.") object SUCCESS : ProgressTracker.Step("Flow ran successfully.")
@ -40,10 +38,10 @@ class WithReferencedStatesFlow<T : Any>(
fun tracker() = ProgressTracker(ATTEMPT, RETRYING, SUCCESS) fun tracker() = ProgressTracker(ATTEMPT, RETRYING, SUCCESS)
} }
private sealed class FlowResult { // This is not a sealed data class as that requires exposing Success and Conflict
data class Success<T : Any>(val value: T) : FlowResult() private interface FlowResult
data class Conflict(val stateRefs: Set<StateRef>) : FlowResult() private data class Success<T : Any>(val value: T) : FlowResult
} private data class Conflict(val stateRefs: Set<StateRef>) : FlowResult
/** /**
* Process the flow result. We don't care about anything other than NotaryExceptions. If it is a * Process the flow result. We don't care about anything other than NotaryExceptions. If it is a
@ -58,13 +56,13 @@ class WithReferencedStatesFlow<T : Any>(
val conflictingReferenceStateRefs = error.consumedStates.filter { val conflictingReferenceStateRefs = error.consumedStates.filter {
it.value.type == StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE it.value.type == StateConsumptionDetails.ConsumedStateType.REFERENCE_INPUT_STATE
}.map { it.key }.toSet() }.map { it.key }.toSet()
FlowResult.Conflict(conflictingReferenceStateRefs) Conflict(conflictingReferenceStateRefs)
} else { } else {
throw result throw result
} }
} }
is FlowException -> throw result is FlowException -> throw result
else -> FlowResult.Success(result) else -> Success(result)
} }
} }
@ -75,6 +73,7 @@ class WithReferencedStatesFlow<T : Any>(
// Loop until the flow successfully completes. We need to // Loop until the flow successfully completes. We need to
// do this because there might be consecutive update races. // do this because there might be consecutive update races.
while (true) { while (true) {
val flowLogic = flowLogicProducer()
// Return a successful flow result or a FlowException. // Return a successful flow result or a FlowException.
logger.info("Attempting to run the supplied flow ${flowLogic.javaClass.canonicalName}.") logger.info("Attempting to run the supplied flow ${flowLogic.javaClass.canonicalName}.")
val result = try { val result = try {
@ -91,12 +90,12 @@ class WithReferencedStatesFlow<T : Any>(
// states have been updated. // states have been updated.
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
when (processedResult) { when (processedResult) {
is FlowResult.Success<*> -> { is Success<*> -> {
logger.info("Flow ${flowLogic.javaClass.canonicalName} completed successfully.") logger.info("Flow ${flowLogic.javaClass.canonicalName} completed successfully.")
progressTracker.currentStep = SUCCESS progressTracker.currentStep = SUCCESS
return uncheckedCast(processedResult.value) return uncheckedCast(processedResult.value)
} }
is FlowResult.Conflict -> { is Conflict -> {
val conflicts = processedResult.stateRefs val conflicts = processedResult.stateRefs
logger.info("Flow ${flowLogic.javaClass.name} failed due to reference state conflicts: $conflicts.") logger.info("Flow ${flowLogic.javaClass.name} failed due to reference state conflicts: $conflicts.")
@ -112,4 +111,4 @@ class WithReferencedStatesFlow<T : Any>(
} }
} }
} }
} }

View File

@ -149,7 +149,7 @@ class WithReferencedStatesFlowTests {
val updatedRefState = updatedRefTx.tx.outRefsOfType<RefState.State>().single() val updatedRefState = updatedRefTx.tx.outRefsOfType<RefState.State>().single()
// 4. Try to use the old reference state. This will throw a NotaryException. // 4. Try to use the old reference state. This will throw a NotaryException.
val useRefTx = nodes[1].services.startFlow(WithReferencedStatesFlow(UseRefState(newRefState.state.data.linearId))).resultFuture val useRefTx = nodes[1].services.startFlow(WithReferencedStatesFlow { UseRefState(newRefState.state.data.linearId) }).resultFuture
// 5. Share the update reference state. // 5. Share the update reference state.
nodes[0].services.startFlow(ShareRefState.Initiator(updatedRefState)).resultFuture.getOrThrow() nodes[0].services.startFlow(ShareRefState.Initiator(updatedRefState)).resultFuture.getOrThrow()