diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt
index 7e7183edba..7ce883c37b 100644
--- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/StatemachineErrorHandlingTest.kt
@@ -39,8 +39,6 @@ import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
-import net.corda.testing.internal.IntegrationTest
-import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
@@ -59,10 +57,9 @@ import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
-class StatemachineErrorHandlingTest : IntegrationTest() {
+class StatemachineErrorHandlingTest {
companion object {
- val databaseSchemas = IntegrationTestSchemas(CHARLIE_NAME, ALICE_NAME, DUMMY_NOTARY_NAME)
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0
}
@@ -715,7 +712,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
ENDRULE
RULE Throw exception on retry
- CLASS ${MultiThreadedStateMachineManager::class.java.name}
+ CLASS ${SingleThreadedStateMachineManager::class.java.name}
METHOD addAndStartFlow
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag")
@@ -899,7 +896,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
ENDRULE
RULE Throw exception on retry
- CLASS ${MultiThreadedStateMachineManager::class.java.name}
+ CLASS ${SingleThreadedStateMachineManager::class.java.name}
METHOD onExternalStartFlow
AT ENTRY
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt
index f19b3f55de..5c98b8a89a 100644
--- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt
@@ -12,11 +12,8 @@ import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
-import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
-import net.corda.testing.internal.IntegrationTest
-import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.User
import net.corda.testing.node.internal.findCordapp
import org.junit.After
@@ -31,11 +28,10 @@ import java.util.concurrent.TimeoutException
import javax.persistence.PersistenceException
import kotlin.test.assertFailsWith
-class VaultObserverExceptionTest : IntegrationTest() {
+class VaultObserverExceptionTest {
companion object {
@ClassRule
@JvmField
- val databaseSchemas = IntegrationTestSchemas(ALICE_NAME, DUMMY_NOTARY_NAME)
val log = contextLogger()
@@ -46,8 +42,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
}
@After
- override fun tearDown() {
- super.tearDown()
+ fun tearDown() {
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
StaffedFlowHospital.onFlowAdmitted.clear()
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
index 7bd7ec0992..59ff682484 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
@@ -16,6 +16,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
+import net.corda.core.internal.concurrent.mapError
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SerializedBytes
@@ -210,12 +211,14 @@ class SingleThreadedStateMachineManager(
}
private fun startFlow(
+ flowId: StateMachineRunId,
flowLogic: FlowLogic,
context: InvocationContext,
ourIdentity: Party?,
deduplicationHandler: DeduplicationHandler?
): CordaFuture> {
return startFlowInternal(
+ flowId,
invocationContext = context,
flowLogic = flowLogic,
flowStart = FlowStart.Explicit,
@@ -230,7 +233,10 @@ class SingleThreadedStateMachineManager(
cancelTimeoutIfScheduled(id)
val flow = flows.remove(id)
if (flow != null) {
- logger.debug("Killing flow known to physical node.")
+ flow.fiber.transientState?.let {
+ flow.fiber.transientState = TransientReference(it.value.copy(isRemoved = true))
+ }
+ logger.info("Killing flow $id known to this node.")
decrementLiveFibers()
totalFinishedFlows.inc()
try {
@@ -343,58 +349,71 @@ class SingleThreadedStateMachineManager(
}
}
+ @Suppress("TooGenericExceptionCaught", "ComplexMethod") // this is fully intentional here, see comment in the catch clause
override fun retryFlowFromSafePoint(currentState: StateMachineState) {
// Get set of external events
val flowId = currentState.flowLogic.runId
- val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
- if (oldFlowLeftOver == null) {
- logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
- return
- }
- val flow = if (currentState.isAnyCheckpointPersisted) {
- // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
- // we mirror exactly what happens when restarting the node.
- val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
- if (serializedCheckpoint == null) {
- logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
+ try {
+ val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
+ if (oldFlowLeftOver == null) {
+ logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
- // Resurrect flow
- createFlowFromCheckpoint(
+ val flow = if (currentState.isAnyCheckpointPersisted) {
+ // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
+ // we mirror exactly what happens when restarting the node.
+ val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
+ if (serializedCheckpoint == null) {
+ logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
+ return
+ }
+ // Resurrect flow
+ createFlowFromCheckpoint(
id = flowId,
serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
- ) ?: return
- } else {
- // Just flow initiation message
- null
- }
- mutex.locked {
- if (stopping) {
- return
+ ) ?: return
+ } else {
+ // Just flow initiation message
+ null
}
- // Remove any sessions the old flow has.
- for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
- sessionToFlow.remove(sessionId)
- }
- if (flow != null) {
- injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
- addAndStartFlow(flowId, flow)
- }
- // Deliver all the external events from the old flow instance.
- val unprocessedExternalEvents = mutableListOf()
- do {
- val event = oldFlowLeftOver.tryReceive()
- if (event is Event.GeneratedByExternalEvent) {
- unprocessedExternalEvents += event.deduplicationHandler.externalCause
+ mutex.locked {
+ if (stopping) {
+ return
+ }
+ // Remove any sessions the old flow has.
+ for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
+ sessionToFlow.remove(sessionId)
+ }
+ if (flow != null) {
+ injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
+ addAndStartFlow(flowId, flow)
+ }
+ // Deliver all the external events from the old flow instance.
+ val unprocessedExternalEvents = mutableListOf()
+ do {
+ val event = oldFlowLeftOver.tryReceive()
+ if (event is Event.GeneratedByExternalEvent) {
+ unprocessedExternalEvents += event.deduplicationHandler.externalCause
+ }
+ } while (event != null)
+ val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
+ for (externalEvent in externalEvents) {
+ deliverExternalEvent(externalEvent)
}
- } while (event != null)
- val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
- for (externalEvent in externalEvents) {
- deliverExternalEvent(externalEvent)
}
+ } catch (e: Exception) {
+ // Failed to retry - manually put the flow in for observation rather than
+ // relying on the [HospitalisingInterceptor] to do so
+ val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
+ ?.errors
+ ?.map { it.exception }
+ ?.plus(e) ?: emptyList()
+ logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
+ flowHospital.forceIntoOvernightObservation(flowId, exceptions)
+ throw e
}
}
@@ -410,7 +429,13 @@ class SingleThreadedStateMachineManager(
}
private fun onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent) {
- val future = startFlow(event.flowLogic, event.context, ourIdentity = null, deduplicationHandler = event.deduplicationHandler)
+ val future = startFlow(
+ event.flowId,
+ event.flowLogic,
+ event.context,
+ ourIdentity = null,
+ deduplicationHandler = event.deduplicationHandler
+ )
event.wireUpFuture(future)
}
@@ -476,7 +501,16 @@ class SingleThreadedStateMachineManager(
is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion
is InitiatedFlowFactory.CorDapp -> null
}
- startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
+ startInitiatedFlow(
+ event.flowId,
+ flowLogic,
+ event.deduplicationHandler,
+ senderSession,
+ initiatedSessionId,
+ sessionMessage,
+ senderCoreFlowVersion,
+ initiatedFlowInfo
+ )
} catch (t: Throwable) {
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
"flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t)
@@ -503,7 +537,9 @@ class SingleThreadedStateMachineManager(
return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass)
}
+ @Suppress("LongParameterList")
private fun startInitiatedFlow(
+ flowId: StateMachineRunId,
flowLogic: FlowLogic,
initiatingMessageDeduplicationHandler: DeduplicationHandler,
peerSession: FlowSessionImpl,
@@ -515,13 +551,19 @@ class SingleThreadedStateMachineManager(
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
val ourIdentity = ourFirstIdentity
startFlowInternal(
- InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
+ flowId,
+ InvocationContext.peer(peerSession.counterparty.name),
+ flowLogic,
+ flowStart,
+ ourIdentity,
initiatingMessageDeduplicationHandler,
isStartIdempotent = false
)
}
+ @Suppress("LongParameterList")
private fun startFlowInternal(
+ flowId: StateMachineRunId,
invocationContext: InvocationContext,
flowLogic: FlowLogic,
flowStart: FlowStart,
@@ -529,7 +571,6 @@ class SingleThreadedStateMachineManager(
deduplicationHandler: DeduplicationHandler?,
isStartIdempotent: Boolean
): CordaFuture> {
- val flowId = StateMachineRunId.createRandom()
// Before we construct the state machine state by freezing the FlowLogic we need to make sure that lazy properties
// have access to the fiber (and thereby the service hub)
@@ -541,22 +582,44 @@ class SingleThreadedStateMachineManager(
val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
- val initialCheckpoint = Checkpoint.create(
- invocationContext,
- flowStart,
- flowLogic.javaClass,
- frozenFlowLogic,
- ourIdentity,
- flowCorDappVersion,
- flowLogic.isEnabledTimedFlow()
+ val flowAlreadyExists = mutex.locked { flows[flowId] != null }
+
+ val existingCheckpoint = if (flowAlreadyExists) {
+ // Load the flow's checkpoint
+ // The checkpoint will be missing if the flow failed before persisting the original checkpoint
+ // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
+ checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint ->
+ val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId)
+ if (checkpoint == null) {
+ return openFuture>().mapError {
+ IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " +
+ "Something is very wrong. The flow will not retry.")
+ }
+ } else {
+ checkpoint
+ }
+ }
+ } else {
+ // This is a brand new flow
+ null
+ }
+ val checkpoint = existingCheckpoint ?: Checkpoint.create(
+ invocationContext,
+ flowStart,
+ flowLogic.javaClass,
+ frozenFlowLogic,
+ ourIdentity,
+ flowCorDappVersion,
+ flowLogic.isEnabledTimedFlow()
).getOrThrow()
+
val startedFuture = openFuture()
val initialState = StateMachineState(
- checkpoint = initialCheckpoint,
+ checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,
- isAnyCheckpointPersisted = false,
+ isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = flowLogic,
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
index 8c0d011f6e..f8bdf269f8 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
@@ -158,6 +158,33 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return true
}
+ /**
+ * Forces the flow to be kept in for overnight observation by the hospital. A flow must already exist inside the hospital
+ * and have existing medical records for it to be moved to overnight observation. If it does not meet these criteria then
+ * an [IllegalArgumentException] will be thrown.
+ *
+ * @param id The [StateMachineRunId] of the flow that you are trying to force into observation
+ * @param errors The errors to include in the new medical record
+ */
+ fun forceIntoOvernightObservation(id: StateMachineRunId, errors: List) {
+ mutex.locked {
+ // If a flow does not meet the criteria below, then it has moved into an invalid state or the function is being
+ // called from an incorrect location. The assertions below should error out the flow if they are not true.
+ requireNotNull(flowsInHospital[id]) { "Flow must already be in the hospital before forcing into overnight observation" }
+ val history = requireNotNull(flowPatients[id]) { "Flow must already have history before forcing into overnight observation" }
+ // Use the last staff member that last discharged the flow as the current staff member
+ val record = history.records.last().copy(
+ time = clock.instant(),
+ errors = errors,
+ outcome = Outcome.OVERNIGHT_OBSERVATION
+ )
+ onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(id, record.by.map { it.toString() }) }
+ history.records += record
+ recordsPublisher.onNext(record)
+ }
+ }
+
+
/**
* Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being
* treated.
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
index c08ad7950f..53ca92c77e 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
@@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
+import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession