CORDA-3604 Store failed and hospitalised errors along with corresponding statuses (#6061)

Flows that are kept for overnight observation:

- Save their Checkpoint.status as 'HOSPITALIZED' in the database
- Save the error that caused the hospitalization in the database

A new Event was added for this reason. Whenever the hospital determines
a flow for hospitalization, it adds this Event in the flow's fiber queue.
When processed it creates a new DB transaction, stores the checkpoint status along with
the error, and it adds a 'FlowContinuation.ProcessEvents' continuation so that the fiber keeps
processing events (effectively since there are no more events in the fiber's channel, the fiber will suspend).

Flows that error:

- Their checkpoints are kept in the database
- Save their Checkpoint.status as 'FAILED'
- Save the error that caused the error in the database

Upon erroring, the flow's Checkpoint.status gets updated('FAILED') and the checkpoint is stored
in the database instead of getting removed. The flow then propagates the error to counterparties,
sets its future with the error and gets removed from memory.
This commit is contained in:
Kyriakos Tharrouniatis 2020-04-07 17:58:00 +01:00 committed by GitHub
parent 024d63147d
commit 501b766e71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 330 additions and 97 deletions

View File

@ -142,8 +142,7 @@ class FlowRetryTest {
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(3, TransientConnectionFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get())
}
}
}
@ -161,8 +160,7 @@ class FlowRetryTest {
.returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
}
assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.HOSPITALIZED).returnValue.get())
}
}
}
@ -180,8 +178,7 @@ class FlowRetryTest {
it.proxy.startFlow(::GeneralExternalFailureFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
}
assertEquals(0, GeneralExternalFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(1, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
assertEquals(1, it.proxy.startFlow(::GetCheckpointNumberOfStatusFlow, Checkpoint.FlowStatus.FAILED).returnValue.get())
}
}
}
@ -458,9 +455,14 @@ class GeneralExternalFailureResponder(private val session: FlowSession) : FlowLo
}
@StartableByRPC
class GetNumberOfUncompletedCheckpointsFlow : FlowLogic<Long>() {
class GetCheckpointNumberOfStatusFlow(private val flowStatus: Checkpoint.FlowStatus) : FlowLogic<Long>() {
override fun call(): Long {
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
val sqlStatement =
"select count(*) " +
"from node_checkpoints " +
"where status = ${flowStatus.ordinal} " +
"and flow_id != '${runId.uuid}' " // don't count in the checkpoint of the current flow
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->
rs.next()

View File

@ -4,6 +4,7 @@ import net.corda.core.context.InvocationContext
import net.corda.core.context.InvocationOrigin
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
@ -19,6 +20,7 @@ import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
import org.apache.commons.lang3.exception.ExceptionUtils
import org.hibernate.annotations.Type
import java.sql.Connection
import java.sql.SQLException
@ -50,8 +52,12 @@ class DBCheckpointStorage(
private const val HMAC_SIZE_BYTES = 16
private const val MAX_PROGRESS_STEP_LENGTH = 256
@VisibleForTesting
const val MAX_STACKTRACE_LENGTH = 4000
private const val MAX_EXC_MSG_LENGTH = 4000
private const val MAX_EXC_TYPE_LENGTH = 256
private const val MAX_FLOW_NAME_LENGTH = 128
private const val MAX_PROGRESS_STEP_LENGTH = 256
private val NOT_RUNNABLE_CHECKPOINTS = listOf(FlowStatus.COMPLETED, FlowStatus.FAILED, FlowStatus.KILLED)
@ -170,15 +176,18 @@ class DBCheckpointStorage(
var id: Long = 0,
@Column(name = "type", nullable = false)
var type: Class<out Throwable>,
@Type(type = "corda-blob")
@Column(name = "exception_value", nullable = false)
var value: ByteArray = EMPTY_BYTE_ARRAY,
var type: String,
@Column(name = "exception_message")
var message: String? = null,
@Column(name = "stack_trace", nullable = false)
var stackTrace: String,
@Type(type = "corda-blob")
@Column(name = "exception_value")
var value: ByteArray? = null,
@Column(name = "timestamp")
val persistedInstant: Instant
)
@ -307,7 +316,8 @@ class DBCheckpointStorage(
}
}
private fun getDBCheckpoint(id: StateMachineRunId): DBFlowCheckpoint? {
@VisibleForTesting
internal fun getDBCheckpoint(id: StateMachineRunId): DBFlowCheckpoint? {
return currentDBSession().find(DBFlowCheckpoint::class.java, id.uuid.toString())
}
@ -375,10 +385,15 @@ class DBCheckpointStorage(
// Load the previous entity from the hibernate cache so the meta data join does not get updated
val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId)
val serializedCheckpointState = checkpoint.checkpointState.storageSerialize()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
// Do not update in DB [Checkpoint.checkpointState] or [Checkpoint.flowState] if flow failed or got hospitalized
val blob = if (checkpoint.status == FlowStatus.FAILED || checkpoint.status == FlowStatus.HOSPITALIZED) {
entity.blob
} else {
val serializedCheckpointState = checkpoint.checkpointState.storageSerialize()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
}
val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
//This code needs to be added back in when we want to persist the result. For now this requires the result to be @CordaSerializable.
//val result = updateDBFlowResult(entity, checkpoint, now)
val exceptionDetails = updateDBFlowException(entity, checkpoint, now)
@ -478,9 +493,10 @@ class DBCheckpointStorage(
private fun createDBFlowException(errorState: ErrorState.Errored, now: Instant): DBFlowException {
return errorState.errors.last().exception.let {
DBFlowException(
type = it::class.java,
message = it.message,
value = it.storageSerialize().bytes,
type = it::class.java.name.truncate(MAX_EXC_TYPE_LENGTH, true),
message = it.message?.truncate(MAX_EXC_MSG_LENGTH, false),
stackTrace = it.stackTraceToString(),
value = null, // TODO to be populated upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
persistedInstant = now
)
}
@ -528,4 +544,26 @@ class DBCheckpointStorage(
FlowStatus.COMPLETED, FlowStatus.KILLED, FlowStatus.FAILED -> true
else -> false
}
private fun String.truncate(maxLength: Int, withWarnings: Boolean): String {
var str = this
if (length > maxLength) {
if (withWarnings) {
log.warn("Truncating long string before storing it into the database. String: $str.")
}
str = str.substring(0, maxLength)
}
return str
}
private fun Throwable.stackTraceToString(): String {
var stackTraceStr = ExceptionUtils.getStackTrace(this)
if (stackTraceStr.length > MAX_STACKTRACE_LENGTH) {
// cut off the last line, which will be a half line
val lineBreak = System.getProperty("line.separator")
val truncateIndex = stackTraceStr.lastIndexOf(lineBreak, MAX_STACKTRACE_LENGTH - 1)
stackTraceStr = stackTraceStr.substring(0, truncateIndex + lineBreak.length) // include last line break in
}
return stackTraceStr
}
}

View File

@ -154,6 +154,15 @@ sealed class Event {
override fun toString() = "RetryFlowFromSafePoint"
}
/**
* Keeps a flow for overnight observation. Overnight observation practically sends the fiber to get suspended,
* in [FlowStateMachineImpl.processEventsUntilFlowIsResumed]. Since the fiber's channel will have no more events to process,
* the fiber gets suspended (i.e. hospitalized).
*/
object OvernightObservation : Event() {
override fun toString() = "OvernightObservation"
}
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -223,7 +223,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
Triple(Outcome.OVERNIGHT_OBSERVATION, null, 0.seconds)
Triple(Outcome.OVERNIGHT_OBSERVATION, Event.OvernightObservation, 0.seconds)
}
Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> {
// None of the staff care for these errors, or someone decided it is a terminal condition, so we let them propagate
@ -239,14 +239,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
Pair(event, backOffForChronicCondition)
}
if (event != null) {
if (backOffForChronicCondition.isZero) {
if (backOffForChronicCondition.isZero) {
flowFiber.scheduleEvent(event)
} else {
hospitalJobTimer.schedule(timerTask {
flowFiber.scheduleEvent(event)
} else {
hospitalJobTimer.schedule(timerTask {
flowFiber.scheduleEvent(event)
}, backOffForChronicCondition.toMillis())
}
}, backOffForChronicCondition.toMillis())
}
}

View File

@ -59,11 +59,11 @@ class ErrorFlowTransition(
// If we haven't been removed yet remove the flow.
if (!currentState.isRemoved) {
actions.add(Action.CreateTransaction)
if (currentState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id))
}
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
actions.addAll(arrayOf(
Action.CreateTransaction,
Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,
@ -72,6 +72,7 @@ class ErrorFlowTransition(
))
currentState = currentState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)

View File

@ -48,6 +48,7 @@ class TopLevelTransition(
is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event)
is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event)
is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition(startingState)
is Event.OvernightObservation -> overnightObservationTransition()
}
}
@ -309,4 +310,15 @@ class TopLevelTransition(
FlowContinuation.Abort
}
}
private fun overnightObservationTransition(): TransitionResult {
return builder {
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
actions.add(Action.CreateTransaction)
actions.add(Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted))
actions.add(Action.CommitTransaction)
currentState = currentState.copy(checkpoint = newCheckpoint)
FlowContinuation.ProcessEvents
}
}
}

View File

@ -81,10 +81,13 @@
<column name="type" type="NVARCHAR(256)">
<constraints nullable="false"/>
</column>
<column name="exception_value" type="varbinary(33554432)">
<column name="exception_message" type="NVARCHAR(4000)">
<constraints nullable="true"/>
</column>
<column name="stack_trace" type="NVARCHAR(4000)">
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<column name="exception_value" type="varbinary(33554432)">
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">

View File

@ -81,10 +81,13 @@
<column name="type" type="NVARCHAR(256)">
<constraints nullable="false"/>
</column>
<column name="exception_value" type="blob">
<column name="exception_message" type="NVARCHAR(4000)">
<constraints nullable="true"/>
</column>
<column name="stack_trace" type="NVARCHAR(4000)">
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<column name="exception_value" type="blob">
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">

View File

@ -8,6 +8,7 @@ import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.core.utilities.contextLogger
import net.corda.node.internal.CheckpointIncompatibleException
import net.corda.node.internal.CheckpointVerifier
import net.corda.node.services.api.CheckpointStorage
@ -28,6 +29,7 @@ import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.apache.commons.lang3.exception.ExceptionUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
@ -38,18 +40,22 @@ import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import java.time.Clock
import java.util.ArrayList
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertTrue
internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
return getRunnableCheckpoints().use {
return getAllCheckpoints().use {
it.map { it.second }.toList()
}
}
class DBCheckpointStorageTests {
private companion object {
val log = contextLogger()
val ALICE = TestIdentity(ALICE_NAME, 70).party
}
@ -423,7 +429,7 @@ class DBCheckpointStorageTests {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
val exceptionDetails = session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails
assertNotNull(exceptionDetails)
assertEquals(exception::class.java, exceptionDetails!!.type)
assertEquals(exception::class.java.name, exceptionDetails!!.type)
assertEquals(exception.message, exceptionDetails.message)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
@ -451,7 +457,7 @@ class DBCheckpointStorageTests {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize().errorState is ErrorState.Clean)
val exceptionDetails = session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails
assertNotNull(exceptionDetails)
assertEquals(illegalArgumentException::class.java, exceptionDetails!!.type)
assertEquals(illegalArgumentException::class.java.name, exceptionDetails!!.type)
assertEquals(illegalArgumentException.message, exceptionDetails.message)
val criteria = session.criteriaBuilder.createQuery(DBCheckpointStorage.DBFlowException::class.java)
criteria.select(criteria.from(DBCheckpointStorage.DBFlowException::class.java))
@ -566,6 +572,85 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout = 300_000)
fun `-not greater than DBCheckpointStorage_MAX_STACKTRACE_LENGTH- stackTrace gets persisted as a whole`() {
val smallerDummyStackTrace = ArrayList<StackTraceElement>()
val dummyStackTraceElement = StackTraceElement("class", "method", "file", 0)
for (i in 0 until iterationsBasedOnLineSeparatorLength()) {
smallerDummyStackTrace.add(dummyStackTraceElement)
}
val smallerStackTraceException = java.lang.IllegalStateException()
.apply {
this.stackTrace = smallerDummyStackTrace.toTypedArray()
}
val smallerStackTraceSize = ExceptionUtils.getStackTrace(smallerStackTraceException).length
assertTrue(smallerStackTraceSize < DBCheckpointStorage.MAX_STACKTRACE_LENGTH)
val (id, checkpoint) = newCheckpoint()
database.transaction {
val serializedFlowState = checkpoint.serializeFlowState()
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
checkpointStorage.updateCheckpoint(id, checkpoint.addError(smallerStackTraceException), serializedFlowState)
}
database.transaction {
val persistedCheckpoint = checkpointStorage.getDBCheckpoint(id)
val persistedStackTrace = persistedCheckpoint!!.exceptionDetails!!.stackTrace
assertEquals(smallerStackTraceSize, persistedStackTrace.length)
assertEquals(ExceptionUtils.getStackTrace(smallerStackTraceException), persistedStackTrace)
}
}
@Test(timeout = 300_000)
fun `-greater than DBCheckpointStorage_MAX_STACKTRACE_LENGTH- stackTrace gets truncated to MAX_LENGTH_VARCHAR, and persisted`() {
val smallerDummyStackTrace = ArrayList<StackTraceElement>()
val dummyStackTraceElement = StackTraceElement("class", "method", "file", 0)
for (i in 0 until iterationsBasedOnLineSeparatorLength()) {
smallerDummyStackTrace.add(dummyStackTraceElement)
}
val smallerStackTraceException = java.lang.IllegalStateException()
.apply {
this.stackTrace = smallerDummyStackTrace.toTypedArray()
}
val smallerStackTraceSize = ExceptionUtils.getStackTrace(smallerStackTraceException).length
log.info("smallerStackTraceSize = $smallerStackTraceSize")
assertTrue(smallerStackTraceSize < DBCheckpointStorage.MAX_STACKTRACE_LENGTH)
val biggerStackTraceException = java.lang.IllegalStateException()
.apply {
this.stackTrace = (smallerDummyStackTrace + dummyStackTraceElement).toTypedArray()
}
val biggerStackTraceSize = ExceptionUtils.getStackTrace(biggerStackTraceException).length
log.info("biggerStackTraceSize = $biggerStackTraceSize")
assertTrue(biggerStackTraceSize > DBCheckpointStorage.MAX_STACKTRACE_LENGTH)
val (id, checkpoint) = newCheckpoint()
database.transaction {
val serializedFlowState = checkpoint.serializeFlowState()
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
checkpointStorage.updateCheckpoint(id, checkpoint.addError(biggerStackTraceException), serializedFlowState)
}
database.transaction {
val persistedCheckpoint = checkpointStorage.getDBCheckpoint(id)
val persistedStackTrace = persistedCheckpoint!!.exceptionDetails!!.stackTrace
// last line of DBFlowException.stackTrace was a half line; will be truncated to the last whole line,
// therefore it will have the exact same length as 'notGreaterThanDummyException' exception
assertEquals(smallerStackTraceSize, persistedStackTrace.length)
assertEquals(ExceptionUtils.getStackTrace(smallerStackTraceException), persistedStackTrace)
}
}
private fun iterationsBasedOnLineSeparatorLength() = when {
System.getProperty("line.separator").length == 1 -> // Linux or Mac
158
System.getProperty("line.separator").length == 2 -> // Windows
152
else -> throw IllegalStateException("Unknown line.separator")
}
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage(

View File

@ -15,20 +15,22 @@ import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.queryBy
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
@ -41,9 +43,6 @@ import net.corda.core.utilities.unwrap
import net.corda.node.services.persistence.CheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME
@ -60,6 +59,7 @@ import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.getMessage
import net.corda.testing.node.internal.startFlow
import org.apache.commons.lang3.exception.ExceptionUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
@ -73,11 +73,12 @@ import org.junit.Before
import org.junit.Test
import rx.Notification
import rx.Observable
import java.sql.SQLException
import java.sql.SQLTransientConnectionException
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.concurrent.TimeoutException
import java.util.function.Predicate
import kotlin.reflect.KClass
import kotlin.streams.toList
@ -138,6 +139,9 @@ class FlowFrameworkTests {
fun cleanUp() {
mockNet.stopNodes()
receivedSessionMessages.clear()
SuspendingFlow.hookBeforeCheckpoint = {}
SuspendingFlow.hookAfterCheckpoint = {}
}
@Test(timeout=300_000)
@ -297,7 +301,8 @@ class FlowFrameworkTests {
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
.withStackTraceContaining("Received counter-flow exception from peer")
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
val checkpoint = bobNode.internals.checkpointStorage.checkpoints().single()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpoint.status)
}
assertThat(receivingFiber.state).isEqualTo(Strand.State.WAITING)
@ -660,83 +665,160 @@ class FlowFrameworkTests {
@Test(timeout=300_000)
fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Unstarted`() {
var firstExecution = true
var checkpointStatusInDBBeforeSuspension: Checkpoint.FlowStatus? = null
var checkpointStatusInDBAfterSuspension: Checkpoint.FlowStatus? = null
var checkpointStatusInMemoryBeforeSuspension: Checkpoint.FlowStatus? = null
var flowState: FlowState? = null
var dbCheckpointStatusBeforeSuspension: Checkpoint.FlowStatus? = null
var dbCheckpointStatusAfterSuspension: Checkpoint.FlowStatus? = null
var inMemoryCheckpointStatusBeforeSuspension: Checkpoint.FlowStatus? = null
val futureFiber = openFuture<FlowStateMachineImpl<*>>().toCompletableFuture()
SuspendingFlow.hookBeforeCheckpoint = {
val flowFiber = this as? FlowStateMachineImpl<*>
assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Unstarted)
flowState = flowFiber!!.transientState!!.value.checkpoint.flowState
if (firstExecution) {
// the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604.
manuallyFailCheckpointInDB(aliceNode)
firstExecution = false
throw SQLException("deadlock") // will cause flow to retry
throw HospitalizeFlowException()
} else {
// The persisted Checkpoint should be still failed here -> it should change to RUNNABLE after suspension
checkpointStatusInDBBeforeSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInMemoryBeforeSuspension = flowFiber.transientState!!.value.checkpoint.status
dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState!!.value.checkpoint.status
futureFiber.complete(flowFiber)
}
}
SuspendingFlow.hookAfterCheckpoint = {
checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getRunnableCheckpoints().toList().single().second.status
dbCheckpointStatusAfterSuspension = aliceNode.internals.checkpointStorage.getRunnableCheckpoints().toList().single()
.second.status
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDBBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemoryBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInDBAfterSuspension)
SuspendingFlow.hookBeforeCheckpoint = {}
SuspendingFlow.hookAfterCheckpoint = {}
assertFailsWith<TimeoutException> {
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow(30.seconds) // wait till flow gets hospitalized
}
// flow is in hospital
assertTrue(flowState is FlowState.Unstarted)
val inMemoryHospitalizedCheckpointStatus = aliceNode.internals.smm.snapshot().first().transientState?.value?.checkpoint?.status
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, inMemoryHospitalizedCheckpointStatus)
aliceNode.database.transaction {
val checkpoint = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, checkpoint.status)
}
// restart Node - flow will be loaded from checkpoint
firstExecution = false
aliceNode = mockNet.restartNode(aliceNode)
futureFiber.get().resultFuture.getOrThrow() // wait until the flow has completed
// checkpoint states ,after flow retried, before and after suspension
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, dbCheckpointStatusBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, inMemoryCheckpointStatusBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, dbCheckpointStatusAfterSuspension)
}
@Test(timeout=300_000)
fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Started`() {
var firstExecution = true
var checkpointStatusInDB: Checkpoint.FlowStatus? = null
var checkpointStatusInMemory: Checkpoint.FlowStatus? = null
var flowState: FlowState? = null
var dbCheckpointStatus: Checkpoint.FlowStatus? = null
var inMemoryCheckpointStatus: Checkpoint.FlowStatus? = null
val futureFiber = openFuture<FlowStateMachineImpl<*>>().toCompletableFuture()
SuspendingFlow.hookAfterCheckpoint = {
val flowFiber = this as? FlowStateMachineImpl<*>
assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Started)
flowState = flowFiber!!.transientState!!.value.checkpoint.flowState
if (firstExecution) {
// the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604.
manuallyFailCheckpointInDB(aliceNode)
firstExecution = false
throw SQLException("deadlock") // will cause flow to retry
throw HospitalizeFlowException()
} else {
checkpointStatusInDB = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInMemory = flowFiber.transientState!!.value.checkpoint.status
dbCheckpointStatus = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
inMemoryCheckpointStatus = flowFiber.transientState!!.value.checkpoint.status
futureFiber.complete(flowFiber)
}
}
assertFailsWith<TimeoutException> {
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow(30.seconds) // wait till flow gets hospitalized
}
// flow is in hospital
assertTrue(flowState is FlowState.Started)
aliceNode.database.transaction {
val checkpoint = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, checkpoint.status)
}
// restart Node - flow will be loaded from checkpoint
firstExecution = false
aliceNode = mockNet.restartNode(aliceNode)
futureFiber.get().resultFuture.getOrThrow() // wait until the flow has completed
// checkpoint states ,after flow retried, after suspension
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, dbCheckpointStatus)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, inMemoryCheckpointStatus)
}
@Test(timeout=300_000)
fun `Checkpoint is updated in DB with FAILED status and the error when flow fails`() {
var flowId: StateMachineRunId? = null
val e = assertFailsWith<FlowException> {
val fiber = aliceNode.services.startFlow(ExceptionFlow { FlowException("Just an exception") })
flowId = fiber.id
fiber.resultFuture.getOrThrow()
}
aliceNode.database.transaction {
val checkpoint = aliceNode.internals.checkpointStorage.checkpoints().single()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpoint.status)
// assert all fields of DBFlowException
val persistedException = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowId!!)!!.exceptionDetails
assertEquals(FlowException::class.java.name, persistedException!!.type)
assertEquals("Just an exception", persistedException.message)
assertEquals(ExceptionUtils.getStackTrace(e), persistedException.stackTrace)
assertEquals(null, persistedException.value)
}
}
@Test(timeout=300_000)
fun `Checkpoint is updated in DB with HOSPITALIZED status and the error when flow is kept for overnight observation` () {
var flowId: StateMachineRunId? = null
assertFailsWith<TimeoutException> {
val fiber = aliceNode.services.startFlow(ExceptionFlow { HospitalizeFlowException("Overnight observation") })
flowId = fiber.id
fiber.resultFuture.getOrThrow(10.seconds)
}
aliceNode.database.transaction {
val checkpoint = aliceNode.internals.checkpointStorage.checkpoints().single()
assertEquals(Checkpoint.FlowStatus.HOSPITALIZED, checkpoint.status)
// assert all fields of DBFlowException
val persistedException = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowId!!)!!.exceptionDetails
assertEquals(HospitalizeFlowException::class.java.name, persistedException!!.type)
assertEquals("Overnight observation", persistedException.message)
assertEquals(null, persistedException.value)
}
}
@Test(timeout=300_000)
fun `Checkpoint status and error in memory and in DB are not dirtied upon flow retry`() {
var firstExecution = true
var dbCheckpointStatus: Checkpoint.FlowStatus? = null
var inMemoryCheckpointStatus: Checkpoint.FlowStatus? = null
var persistedException: DBCheckpointStorage.DBFlowException? = null
SuspendingFlow.hookAfterCheckpoint = {
if (firstExecution) {
firstExecution = false
throw SQLTransientConnectionException("connection is not available")
} else {
val flowFiber = this as? FlowStateMachineImpl<*>
dbCheckpointStatus = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
inMemoryCheckpointStatus = flowFiber!!.transientState!!.value.checkpoint.status
persistedException = aliceNode.internals.checkpointStorage.getDBCheckpoint(flowFiber.id)!!.exceptionDetails
}
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDB)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemory)
SuspendingFlow.hookAfterCheckpoint = {}
}
// the following method should be removed when implementing CORDA-3604.
private fun manuallyFailCheckpointInDB(node: TestStartedNode) {
val idCheckpoint = node.internals.checkpointStorage.getRunnableCheckpoints().toList().single()
val checkpoint = idCheckpoint.second
val updatedCheckpoint = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
node.internals.checkpointStorage.updateCheckpoint(idCheckpoint.first,
updatedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT),
updatedCheckpoint.serializedFlowState)
contextTransaction.commit()
contextTransaction.close()
contextTransactionOrNull = null
contextDatabase.newTransaction()
// checkpoint states ,after flow retried, after suspension
assertEquals(Checkpoint.FlowStatus.RUNNABLE, dbCheckpointStatus)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, inMemoryCheckpointStatus)
assertEquals(null, persistedException)
}
//region Helpers