Merge branch 'release/os/4.4' into dan/os-4.4-to-4.5-merge-2020-03-23

This commit is contained in:
LankyDan 2020-03-23 11:46:24 +00:00
commit 82e068be94
6 changed files with 78 additions and 38 deletions

View File

@ -20,6 +20,7 @@ import net.corda.core.schemas.MappedSchema
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.statemachine.StaffedFlowHospital import net.corda.node.services.statemachine.StaffedFlowHospital
import org.junit.Before import org.junit.Before
import java.sql.SQLTransientConnectionException import java.sql.SQLTransientConnectionException
@ -72,11 +73,12 @@ abstract class AbstractFlowExternalOperationTest {
@Suspendable @Suspendable
override fun call(): Any { override fun call(): Any {
log.info("Started my flow") log.info("Started my flow")
subFlow(PingPongFlow(party))
val result = testCode() val result = testCode()
val session = initiateFlow(party) val session = initiateFlow(party)
session.send("hi there") session.sendAndReceive<String>("hi there").unwrap { it }
log.info("ServiceHub value = $serviceHub") session.sendAndReceive<String>("hi there").unwrap { it }
session.receive<String>() subFlow(PingPongFlow(party))
log.info("Finished my flow") log.info("Finished my flow")
return result return result
} }
@ -92,8 +94,28 @@ abstract class AbstractFlowExternalOperationTest {
class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic<Unit>() { class FlowWithExternalOperationResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() { override fun call() {
session.receive<String>() session.receive<String>().unwrap { it }
session.send("go away") session.send("go away")
session.receive<String>().unwrap { it }
session.send("go away")
}
}
@InitiatingFlow
class PingPongFlow(val party: Party): FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.sendAndReceive<String>("ping pong").unwrap { it }
}
}
@InitiatedBy(PingPongFlow::class)
class PingPongResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("I got you bro")
} }
} }
@ -111,7 +133,7 @@ abstract class AbstractFlowExternalOperationTest {
fun createFuture(): CompletableFuture<Any> { fun createFuture(): CompletableFuture<Any> {
return CompletableFuture.supplyAsync(Supplier<Any> { return CompletableFuture.supplyAsync(Supplier<Any> {
log.info("Starting sleep inside of future") log.info("Starting sleep inside of future")
Thread.sleep(2000) Thread.sleep(1000)
log.info("Finished sleep inside of future") log.info("Finished sleep inside of future")
"Here is your return value" "Here is your return value"
}, executorService) }, executorService)

View File

@ -624,7 +624,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint, checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false, isFlowResumed = false,
isTransactionTracked = false, isWaitingForFuture = false,
isAnyCheckpointPersisted = existingCheckpoint != null, isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,
@ -780,7 +780,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint, checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false, isFlowResumed = false,
isTransactionTracked = false, isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted, isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,
@ -804,7 +804,7 @@ class SingleThreadedStateMachineManager(
checkpoint = checkpoint.copy(), checkpoint = checkpoint.copy(),
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(), pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false, isFlowResumed = false,
isTransactionTracked = false, isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted, isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent, isStartIdempotent = isStartIdempotent,
isRemoved = false, isRemoved = false,

View File

@ -21,7 +21,7 @@ import java.time.Instant
* @param pendingDeduplicationHandlers the list of incomplete deduplication handlers. * @param pendingDeduplicationHandlers the list of incomplete deduplication handlers.
* @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used * @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used
* to make [Event.DoRemainingWork] idempotent. * to make [Event.DoRemainingWork] idempotent.
* @param isTransactionTracked true if a ledger transaction has been tracked as part of a * @param isWaitingForFuture true if the flow is waiting for the completion of a future triggered by one of the statemachine's actions
* [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent. * [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent.
* @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine * @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine
* whether we should DELETE the checkpoint at the end of the flow. * whether we should DELETE the checkpoint at the end of the flow.
@ -34,15 +34,15 @@ import java.time.Instant
// TODO perhaps add a read-only environment to the state machine for things that don't change over time? // TODO perhaps add a read-only environment to the state machine for things that don't change over time?
// TODO evaluate persistent datastructure libraries to replace the inefficient copying we currently do. // TODO evaluate persistent datastructure libraries to replace the inefficient copying we currently do.
data class StateMachineState( data class StateMachineState(
val checkpoint: Checkpoint, val checkpoint: Checkpoint,
val flowLogic: FlowLogic<*>, val flowLogic: FlowLogic<*>,
val pendingDeduplicationHandlers: List<DeduplicationHandler>, val pendingDeduplicationHandlers: List<DeduplicationHandler>,
val isFlowResumed: Boolean, val isFlowResumed: Boolean,
val isTransactionTracked: Boolean, val isWaitingForFuture: Boolean,
val isAnyCheckpointPersisted: Boolean, val isAnyCheckpointPersisted: Boolean,
val isStartIdempotent: Boolean, val isStartIdempotent: Boolean,
val isRemoved: Boolean, val isRemoved: Boolean,
val senderUUID: String? val senderUUID: String?
) )
/** /**

View File

@ -95,14 +95,16 @@ class StartedFlowTransition(
} }
private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult { private fun waitForLedgerCommitTransition(flowIORequest: FlowIORequest.WaitForLedgerCommit): TransitionResult {
return if (!startingState.isTransactionTracked) { // This ensures that the [WaitForLedgerCommit] request is not executed multiple times if extra
// [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
return if (!startingState.isWaitingForFuture) {
TransitionResult( TransitionResult(
newState = startingState.copy(isTransactionTracked = true), newState = startingState.copy(isWaitingForFuture = true),
actions = listOf( actions = listOf(
Action.CreateTransaction, Action.CreateTransaction,
Action.TrackTransaction(flowIORequest.hash), Action.TrackTransaction(flowIORequest.hash),
Action.CommitTransaction Action.CommitTransaction
) )
) )
} else { } else {
TransitionResult(startingState) TransitionResult(startingState)
@ -416,12 +418,19 @@ class StartedFlowTransition(
} }
private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult { private fun executeAsyncOperation(flowIORequest: FlowIORequest.ExecuteAsyncOperation<*>): TransitionResult {
return builder { // This ensures that the [ExecuteAsyncOperation] request is not executed multiple times if extra
// The `numberOfSuspends` is added to the deduplication ID in case an async // [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
// operation is executed multiple times within the same flow. return if (!startingState.isWaitingForFuture) {
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString() builder {
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation)) // The `numberOfSuspends` is added to the deduplication ID in case an async
FlowContinuation.ProcessEvents // operation is executed multiple times within the same flow.
val deduplicationId = context.id.toString() + ":" + currentState.checkpoint.numberOfSuspends.toString()
actions.add(Action.ExecuteAsyncOperation(deduplicationId, flowIORequest.operation))
currentState = currentState.copy(isWaitingForFuture = true)
FlowContinuation.ProcessEvents
}
} else {
TransitionResult(startingState)
} }
} }

View File

@ -1,5 +1,6 @@
package net.corda.node.services.statemachine.transitions package net.corda.node.services.statemachine.transitions
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
@ -60,11 +61,8 @@ class TopLevelTransition(
private fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult { private fun transactionCommittedTransition(event: Event.TransactionCommitted): TransitionResult {
return builder { return builder {
val checkpoint = currentState.checkpoint val checkpoint = currentState.checkpoint
if (currentState.isTransactionTracked && if (isWaitingForLedgerCommit(currentState, checkpoint, event.transaction.id)) {
checkpoint.flowState is FlowState.Started && currentState = currentState.copy(isWaitingForFuture = false)
checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit &&
checkpoint.flowState.flowIORequest.hash == event.transaction.id) {
currentState = currentState.copy(isTransactionTracked = false)
if (isErrored()) { if (isErrored()) {
return@builder FlowContinuation.ProcessEvents return@builder FlowContinuation.ProcessEvents
} }
@ -76,6 +74,17 @@ class TopLevelTransition(
} }
} }
private fun isWaitingForLedgerCommit(
currentState: StateMachineState,
checkpoint: Checkpoint,
transactionId: SecureHash
): Boolean {
return currentState.isWaitingForFuture &&
checkpoint.flowState is FlowState.Started &&
checkpoint.flowState.flowIORequest is FlowIORequest.WaitForLedgerCommit &&
checkpoint.flowState.flowIORequest.hash == transactionId
}
private fun softShutdownTransition(): TransitionResult { private fun softShutdownTransition(): TransitionResult {
val lastState = startingState.copy(isRemoved = true) val lastState = startingState.copy(isRemoved = true)
return TransitionResult( return TransitionResult(

View File

@ -68,13 +68,13 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
fun resumeFlowLogic(result: Any?): FlowContinuation { fun resumeFlowLogic(result: Any?): FlowContinuation {
actions.add(Action.CreateTransaction) actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true) currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
return FlowContinuation.Resume(result) return FlowContinuation.Resume(result)
} }
fun resumeFlowLogic(result: Throwable): FlowContinuation { fun resumeFlowLogic(result: Throwable): FlowContinuation {
actions.add(Action.CreateTransaction) actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true) currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
return FlowContinuation.Throw(result) return FlowContinuation.Throw(result)
} }
} }