CORDA-3692 - Store result information in checkpoint results table (#6473)
Making statemachine not remove COMPLETED flows' checkpoints from the database if they are started with a clientId, instead they are getting persisted and retained within the database along with their result (`DBFlowResult`). On flow start with a client id (`startFlowDynamicWithClientId`), if the client id maps to a flow that was previously started with the same client id and the flow is now finished, then fetch the `DBFlowResult` from the database to construct a `FlowStateMachineHandle` done future and return it back to the client. Object stored as results must abide by the storage serializer rules. If they fail to do so the result will not be stored and an exception is thrown to the client to indicate this.
@ -0,0 +1,11 @@
package net.corda.core.flows
import net.corda.core.CordaRuntimeException
import net.corda.core.serialization.internal.MissingSerializerException
* Thrown whenever a flow result cannot be serialized when attempting to save it in the database
class ResultSerializationException private constructor(message: String?) : CordaRuntimeException(message) {
constructor(e: MissingSerializerException): this(e.message)
@ -3,15 +3,20 @@ package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.flows.ResultSerializationException
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.junit.Before
import org.junit.Test
import java.util.*
import rx.Observable
import java.util.UUID
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotEquals
import kotlin.test.assertTrue
@ -55,9 +60,22 @@ class FlowWithClientIdTest {
assertEquals(flowHandle0.clientId, flowHandle1.clientId)
assertEquals(2, counter) // this asserts that 2 different flows were spawned indeed
fun `on flow unserializable result a 'CordaRuntimeException' is thrown containing in its message the unserializable type`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode().getOrThrow()
val e = assertFailsWith<ResultSerializationException> {
nodeA.rpc.startFlowWithClientId(clientId, ::UnserializableResultFlow).returnValue.getOrThrow(20.seconds)
val errorMessage = e.message
assertTrue(errorMessage!!.contains("Unable to create an object serializer for type class ${UnserializableResultFlow.UNSERIALIZABLE_OBJECT::class.java.name}"))
@ -75,3 +93,14 @@ internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>() {
companion object {
val UNSERIALIZABLE_OBJECT = openFuture<Observable<Unit>>().also { it.set(Observable.empty<Unit>())}
override fun call(): OpenFuture<Observable<Unit>> {
@ -4,6 +4,7 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.CheckpointState
import net.corda.node.services.statemachine.FlowResultMetadata
import net.corda.node.services.statemachine.FlowState
import java.util.stream.Stream
@ -66,5 +67,13 @@ interface CheckpointStorage {
fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>>
* Load a flow result from the store. If [throwIfMissing] is true then it throws an [IllegalStateException]
* if the flow result is missing in the database.
fun getFlowResult(id: StateMachineRunId, throwIfMissing: Boolean = false): Any?
fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
@ -6,8 +6,11 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.uncheckedCast
import net.corda.core.flows.ResultSerializationException
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.MissingSerializerException
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.CheckpointStorage
@ -15,6 +18,7 @@ import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.node.services.statemachine.CheckpointState
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowResultMetadata
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
@ -59,6 +63,24 @@ class DBCheckpointStorage(
// This is a dummy [DBFlowMetadata] object which help us whenever we want to persist a [DBFlowCheckpoint], but not persist its [DBFlowMetadata].
// [DBFlowCheckpoint] needs to always reference a [DBFlowMetadata] ([DBFlowCheckpoint.flowMetadata] is not nullable).
// However, since we do not -hibernate- cascade, it does not get persisted into the database.
private val dummyDBFlowMetadata: DBFlowMetadata = DBFlowMetadata(
flowId = "dummyFlowId",
invocationId = "dummyInvocationId",
flowName = "dummyFlowName",
userSuppliedIdentifier = "dummyUserSuppliedIdentifier",
startType = StartReason.INITIATED,
initialParameters = ByteArray(0),
launchingCordapp = "dummyLaunchingCordapp",
platformVersion = -1,
startedBy = "dummyStartedBy",
invocationInstant = Instant.now(),
startInstant = Instant.now(),
finishInstant = null
* This needs to run before Hibernate is initialised.
@ -185,28 +207,31 @@ class DBCheckpointStorage(
var flow_id: String,
@Type(type = "corda-blob")
@Column(name = "result_value", nullable = false)
var value: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "result_value", nullable = true)
var value: ByteArray? = null,
@Column(name = "timestamp")
val persistedInstant: Instant
) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as DBFlowResult
if (flow_id != other.flow_id) return false
if (!value.contentEquals(other.value)) return false
val value = value
val otherValue = other.value
if (value != null) {
if (otherValue == null) return false
if (!value.contentEquals(otherValue)) return false
} else if (otherValue != null) return false
if (persistedInstant != other.persistedInstant) return false
return true
override fun hashCode(): Int {
var result = flow_id.hashCode()
result = 31 * result + value.contentHashCode()
result = 31 * result + (value?.contentHashCode() ?: 0)
result = 31 * result + persistedInstant.hashCode()
return result
@ -299,7 +324,7 @@ class DBCheckpointStorage(
@Column(name = "invocation_time", nullable = false)
var invocationInstant: Instant,
@Column(name = "start_time", nullable = true)
@Column(name = "start_time", nullable = false)
var startInstant: Instant,
@Column(name = "finish_time", nullable = true)
@ -363,7 +388,7 @@ class DBCheckpointStorage(
val metadata = createDBFlowMetadata(flowId, checkpoint)
val metadata = createDBFlowMetadata(flowId, checkpoint, now)
// Most fields are null as they cannot have been set when creating the initial checkpoint
val dbFlowCheckpoint = DBFlowCheckpoint(
@ -384,8 +409,11 @@ class DBCheckpointStorage(
override fun updateCheckpoint(
id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>?,
id: StateMachineRunId,
checkpoint: Checkpoint,
serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>
) {
val now = clock.instant()
@ -404,18 +432,25 @@ class DBCheckpointStorage(
//This code needs to be added back in when we want to persist the result. For now this requires the result to be @CordaSerializable.
//val result = updateDBFlowResult(entity, checkpoint, now)
val dbFlowResult = if (checkpoint.status == FlowStatus.COMPLETED) {
try {
createDBFlowResult(flowId, checkpoint.result, now)
} catch (e: MissingSerializerException) {
throw ResultSerializationException(e)
} else {
val exceptionDetails = updateDBFlowException(flowId, checkpoint, now)
val metadata = createDBFlowMetadata(flowId, checkpoint)
// Updates to children entities ([DBFlowCheckpointBlob], [DBFlowResult], [DBFlowException], [DBFlowMetadata]) are not cascaded to children tables.
val dbFlowCheckpoint = DBFlowCheckpoint(
flowId = flowId,
blob = blob,
result = null,
result = dbFlowResult,
exceptionDetails = exceptionDetails,
flowMetadata = metadata,
flowMetadata = dummyDBFlowMetadata, // [DBFlowMetadata] will only update its 'finish_time' when a checkpoint finishes
status = checkpoint.status,
compatible = checkpoint.compatible,
progressStep = checkpoint.progressStep?.take(MAX_PROGRESS_STEP_LENGTH),
@ -425,9 +460,9 @@ class DBCheckpointStorage(
blob?.let { currentDBSession().update(it) }
dbFlowResult?.let { currentDBSession().save(it) }
if (checkpoint.isFinished()) {
metadata.finishInstant = now
setDBFlowMetadataFinishTime(flowId, now)
@ -446,11 +481,11 @@ class DBCheckpointStorage(
var deletedRows = 0
val flowId = id.uuid.toString()
deletedRows += deleteRow(DBFlowMetadata::class.java, DBFlowMetadata::flowId.name, flowId)
deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, flowId)
deletedRows += deleteRow(DBFlowCheckpointBlob::class.java, DBFlowCheckpointBlob::flowId.name, flowId)
deletedRows += deleteRow(DBFlowCheckpoint::class.java, DBFlowCheckpoint::flowId.name, flowId)
// resultId?.let { deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, it.toString()) }
// exceptionId?.let { deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, it.toString()) }
return deletedRows == 3
return deletedRows >= 2
private fun <T> deleteRow(clazz: Class<T>, pk: String, value: String): Int {
@ -488,6 +523,10 @@ class DBCheckpointStorage(
return currentDBSession().find(DBFlowCheckpoint::class.java, id.uuid.toString())
private fun getDBFlowResult(id: StateMachineRunId): DBFlowResult? {
return currentDBSession().find(DBFlowResult::class.java, id.uuid.toString())
override fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
val session = currentDBSession()
val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status,
@ -500,12 +539,34 @@ class DBCheckpointStorage(
// This method needs modification once CORDA-3681 is implemented to include FAILED flows as well
override fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>> {
val session = currentDBSession()
val jpqlQuery = """select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier)
from ${DBFlowCheckpoint::class.java.name} checkpoint
join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata
where checkpoint.status = ${FlowStatus.COMPLETED.ordinal}""".trimIndent()
val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java)
return query.resultList.stream().map {
StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId)
override fun getFlowResult(id: StateMachineRunId, throwIfMissing: Boolean): Any? {
val dbFlowResult = getDBFlowResult(id)
if (throwIfMissing && dbFlowResult == null) {
throw IllegalStateException("Flow's $id result was not found in the database. Something is very wrong.")
val serializedFlowResult = dbFlowResult?.value?.let { SerializedBytes<Any>(it) }
return serializedFlowResult?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) {
val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${flowStatus.ordinal} where flow_id = '${runId.uuid}'"
private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata {
private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint, now: Instant): DBFlowMetadata {
val context = checkpoint.checkpointState.invocationContext
val flowInfo = checkpoint.checkpointState.subFlowStack.first()
return DBFlowMetadata(
@ -521,7 +582,7 @@ class DBCheckpointStorage(
platformVersion = PLATFORM_VERSION,
startedBy = context.principal().name,
invocationInstant = context.trace.invocationId.timestamp,
startInstant = clock.instant(),
startInstant = now,
finishInstant = null
@ -541,35 +602,10 @@ class DBCheckpointStorage(
* Creates, updates or deletes the result related to the current flow/checkpoint.
* This is needed because updates are not cascading via Hibernate, therefore operations must be handled manually.
* A [DBFlowResult] is created if [DBFlowCheckpoint.result] does not exist and the [Checkpoint] has a result..
* The existing [DBFlowResult] is updated if [DBFlowCheckpoint.result] exists and the [Checkpoint] has a result.
* The existing [DBFlowResult] is deleted if [DBFlowCheckpoint.result] exists and the [Checkpoint] has no result.
* Nothing happens if both [DBFlowCheckpoint] and [Checkpoint] do not have a result.
private fun updateDBFlowResult(flowId: String, entity: DBFlowCheckpoint, checkpoint: Checkpoint, now: Instant): DBFlowResult? {
val result = checkpoint.result?.let { createDBFlowResult(flowId, it, now) }
if (entity.result != null) {
if (result != null) {
result.flow_id = entity.result!!.flow_id
} else {
} else if (result != null) {
return result
private fun createDBFlowResult(flowId: String, result: Any, now: Instant): DBFlowResult {
private fun createDBFlowResult(flowId: String, result: Any?, now: Instant): DBFlowResult {
return DBFlowResult(
flow_id = flowId,
value = result.storageSerialize().bytes,
value = result?.storageSerialize()?.bytes,
persistedInstant = now
@ -618,6 +654,14 @@ class DBCheckpointStorage(
private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) {
val session = currentDBSession()
val sqlQuery = "Update ${NODE_DATABASE_PREFIX}flow_metadata set finish_time = '$now' " +
"where flow_id = '$flowId'"
val query = session.createNativeQuery(sqlQuery)
private fun InvocationContext.getStartedType(): StartReason {
return when (origin) {
is InvocationOrigin.RPC, is InvocationOrigin.Shell -> StartReason.RPC
@ -648,7 +692,7 @@ class DBCheckpointStorage(
// Always load as a [Clean] checkpoint to represent that the checkpoint is the last _good_ checkpoint
errorState = ErrorState.Clean,
// A checkpoint with a result should not normally be loaded (it should be [null] most of the time)
result = result?.let { SerializedBytes<Any>(it.value) },
result = result?.let { dbFlowResult -> dbFlowResult.value?.let { SerializedBytes<Any>(it) } },
status = status,
progressStep = progressStep,
flowIoRequest = ioRequestType,
@ -679,6 +723,12 @@ class DBCheckpointStorage(
private class DBFlowResultMetadataFields(
val id: String,
val status: FlowStatus,
val clientId: String?
private fun <T : Any> T.storageSerialize(): SerializedBytes<T> {
return serialize(context = SerializationDefaults.STORAGE_CONTEXT)
@ -5,6 +5,7 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler
import co.paralleluniverse.fibers.instrument.JavaAgent
import com.codahale.metrics.Gauge
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.CordaRuntimeException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowException
@ -20,7 +21,6 @@ import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.mapError
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.mapNotNull
import net.corda.core.internal.uncheckedCast
@ -127,6 +127,7 @@ internal class SingleThreadedStateMachineManager(
override val changes: Observable<StateMachineManager.Change> = innerState.changesPublisher
override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): CordaFuture<Unit> {
val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
@ -174,8 +175,6 @@ internal class SingleThreadedStateMachineManager(
// at the moment we have RUNNABLE, HOSPITALIZED and PAUSED flows
// - RESULTED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3692
// - FAILED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
// - Incompatible checkpoints need to be handled upon implementing CORDA-3897
for (flow in fibers) {
flow.fiber.clientId?.let {
@ -191,6 +190,17 @@ internal class SingleThreadedStateMachineManager(
val finishedFlowsResults = checkpointStorage.getFinishedFlowsResultsMetadata().toList()
for ((id, finishedFlowResult) in finishedFlowsResults) {
finishedFlowResult.clientId?.let {
if (finishedFlowResult.status == Checkpoint.FlowStatus.COMPLETED) {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, true)
} else {
// - FAILED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
} ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
return serviceHub.networkMapCache.nodeReady.map {
logger.info("Node ready, info: ${serviceHub.myInfo}")
@ -276,24 +286,24 @@ internal class SingleThreadedStateMachineManager(
val clientId = context.clientId
if (clientId != null) {
var existingFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>? = null
var existingStatus: FlowWithClientIdStatus? = null
innerState.withLock {
clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
if (existingStatus != null) {
existingFuture = when (existingStatus) {
is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture
// This below dummy future ('doneFuture(5)') will be populated from DB upon implementing CORDA-3692 and CORDA-3681 - for now just return a dummy future
is FlowWithClientIdStatus.Removed -> doneClientIdFuture(existingStatus.flowId, doneFuture(@Suppress("MagicNumber")5), clientId)
clientIdsToFlowIds.compute(clientId) { _, status ->
if (status != null) {
existingStatus = status
} else {
newFuture = openFuture()
if (existingFuture != null) return uncheckedCast(existingFuture)
// Flow -started with client id- already exists, return the existing's flow future and don't start a new flow.
existingStatus?.let {
val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
return@startFlow uncheckedCast(existingFuture)
@ -674,17 +684,9 @@ internal class SingleThreadedStateMachineManager(
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
val existingCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
existingCheckpoint?.let { serializedCheckpoint ->
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId)
if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError {
"Unable to deserialize database checkpoint for flow $flowId. " +
"Something is very wrong. The flow will not retry."
} else {
tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: throw IllegalStateException(
"Unable to deserialize database checkpoint for flow $flowId. Something is very wrong. The flow will not retry."
} else {
// This is a brand new flow
@ -878,6 +880,24 @@ internal class SingleThreadedStateMachineManager(
private fun activeOrRemovedClientIdFuture(existingStatus: FlowWithClientIdStatus, clientId: String) = when (existingStatus) {
is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture
is FlowWithClientIdStatus.Removed -> {
val flowId = existingStatus.flowId
val resultFuture = if (existingStatus.succeeded) {
val flowResult = database.transaction { checkpointStorage.getFlowResult(existingStatus.flowId, throwIfMissing = true) }
} else {
// this block will be implemented upon implementing CORDA-3681 - for now just return a dummy exception
val flowException = CordaRuntimeException("dummy")
openFuture<Any?>().apply { setException(flowException) }
doneClientIdFuture(flowId, resultFuture, clientId)
* The flow out of which a [doneFuture] will be produced should be a started flow,
* i.e. it should not exist in [mutex.content.startedFutures].
@ -388,3 +388,8 @@ sealed class FlowWithClientIdStatus {
data class Active(val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>) : FlowWithClientIdStatus()
data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus()
data class FlowResultMetadata(
val status: Checkpoint.FlowStatus,
val clientId: String?
@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.ResultSerializationException
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
@ -73,22 +74,12 @@ class TransitionExecutorImpl(
log.info("Error while executing $action, with event $event, erroring state", exception)
// distinguish between a DatabaseTransactionException and an actual StateTransitionException
val stateTransitionOrDatabaseTransactionException =
if (exception is DatabaseTransactionException) {
// if the exception is a DatabaseTransactionException then it is not really a StateTransitionException
// it is actually an exception that previously broke a DatabaseTransaction and was suppressed by user code
// it was rethrown on [DatabaseTransaction.commit]. Unwrap the original exception and pass it to flow hospital
} else {
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
StateTransitionException(action, event, exception)
val flowError = createError(exception, action, event)
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
isFlowResumed = false
@ -121,4 +112,23 @@ class TransitionExecutorImpl(
private fun createError(e: Exception, action: Action, event: Event): FlowError {
// distinguish between a DatabaseTransactionException and an actual StateTransitionException
val stateTransitionOrOtherException: Throwable =
if (e is DatabaseTransactionException) {
// if the exception is a DatabaseTransactionException then it is not really a StateTransitionException
// it is actually an exception that previously broke a DatabaseTransaction and was suppressed by user code
// it was rethrown on [DatabaseTransaction.commit]. Unwrap the original exception and pass it to flow hospital
} else if (e is ResultSerializationException) {
// We must not wrap a [ResultSerializationException] with a [StateTransitionException],
// because we will propagate the exception to rpc clients and [StateTransitionException] cannot be propagated to rpc clients.
} else {
// Wrap the exception with [StateTransitionException] for handling by the flow hospital
StateTransitionException(action, event, e)
return FlowError(secureRandom.nextLong(), stateTransitionOrOtherException)
@ -240,10 +240,22 @@ class TopLevelTransition(
isFlowResumed = false,
isRemoved = true
val allSourceSessionIds = checkpoint.checkpointState.sessions.keys
if (currentState.isAnyCheckpointPersisted) {
if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
} else {
isCheckpointUpdate = currentState.isAnyCheckpointPersisted
val allSourceSessionIds = currentState.checkpoint.checkpointState.sessions.keys
@ -12,12 +12,15 @@
<addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints"/>
<!-- TODO: add indexes for the rest of the tables as well (Results + Exceptions) -->
<!-- TODO: add indexes for Exceptions table as well -->
<!-- TODO: the following only add indexes so maybe also align name of file? -->
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_indexes">
<createIndex indexName="node_checkpoint_blobs_idx" tableName="node_checkpoint_blobs" clustered="false" unique="true">
<column name="flow_id"/>
<createIndex indexName="node_flow_results_idx" tableName="node_flow_results" clustered="false" unique="true">
<column name="flow_id"/>
<createIndex indexName="node_flow_metadata_idx" tableName="node_flow_metadata" clustered="false" unique="true">
<column name="flow_id"/>
@ -49,14 +49,13 @@
<changeSet author="R3.Corda" id="add_new_flow_result_table-postgres" dbms="postgresql">
<createTable tableName="node_flow_results">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
<column name="result_value" type="varbinary(33554432)">
<constraints nullable="false"/>
<constraints nullable="true"/>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
@ -49,14 +49,13 @@
<changeSet author="R3.Corda" id="add_new_flow_result_table" dbms="!postgresql">
<createTable tableName="node_flow_results">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
<column name="result_value" type="blob">
<constraints nullable="false"/>
<constraints nullable="true"/>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
@ -276,14 +276,13 @@ class DBCheckpointStorageTests {
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
val updatedCheckpoint = checkpoint.copy(result = "The result")
val updatedCheckpoint = checkpoint.copy(result = "The result", status = Checkpoint.FlowStatus.COMPLETED)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState()) }
database.transaction {
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
// The result not stored yet
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
@ -457,7 +456,6 @@ class DBCheckpointStorageTests {
@Test(timeout = 300_000)
fun `update checkpoint with result information creates new result database record`() {
val result = "This is the result"
val (id, checkpoint) = newCheckpoint()
@ -466,7 +464,7 @@ class DBCheckpointStorageTests {
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
val updatedCheckpoint = checkpoint.copy(result = result)
val updatedCheckpoint = checkpoint.copy(result = result, status = Checkpoint.FlowStatus.COMPLETED)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState())
@ -481,63 +479,6 @@ class DBCheckpointStorageTests {
@Test(timeout = 300_000)
fun `update checkpoint with result information updates existing result database record`() {
val result = "This is the result"
val somehowThereIsANewResult = "Another result (which should not be possible!)"
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
val updatedCheckpoint = checkpoint.copy(result = result)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState())
val updatedCheckpoint2 = checkpoint.copy(result = somehowThereIsANewResult)
val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2, updatedCheckpoint2.serializeCheckpointState())
database.transaction {
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
@Test(timeout = 300_000)
fun `removing result information from checkpoint deletes existing result database record`() {
val result = "This is the result"
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
val updatedCheckpoint = checkpoint.copy(result = result)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState())
val updatedCheckpoint2 = checkpoint.copy(result = null)
val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2, updatedCheckpoint2.serializeCheckpointState())
database.transaction {
assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
@Test(timeout = 300_000)
fun `update checkpoint with error information creates a new error database record`() {
@ -890,6 +831,41 @@ class DBCheckpointStorageTests {
// This test needs modification once CORDA-3681 is implemented to include FAILED flows as well
@Test(timeout = 300_000)
fun `'getFinishedFlowsResultsMetadata' fetches flows results metadata for finished flows only`() {
val (_, checkpoint) = newCheckpoint(1)
val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED)
val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED)
val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED)
val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED)
val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED)
database.transaction {
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(runnable.id, runnable.checkpoint, serializedFlowState, runnable.checkpoint.serializeCheckpointState())
checkpointStorage.addCheckpoint(hospitalized.id, hospitalized.checkpoint, serializedFlowState, hospitalized.checkpoint.serializeCheckpointState())
checkpointStorage.addCheckpoint(completed.id, completed.checkpoint, serializedFlowState, completed.checkpoint.serializeCheckpointState())
checkpointStorage.addCheckpoint(failed.id, failed.checkpoint, serializedFlowState, failed.checkpoint.serializeCheckpointState())
checkpointStorage.addCheckpoint(killed.id, killed.checkpoint, serializedFlowState, killed.checkpoint.serializeCheckpointState())
checkpointStorage.addCheckpoint(paused.id, paused.checkpoint, serializedFlowState, paused.checkpoint.serializeCheckpointState())
val checkpointsInDb = database.transaction {
val resultsMetadata = database.transaction {
assertEquals(6, checkpointsInDb)
assertEquals(Checkpoint.FlowStatus.COMPLETED, resultsMetadata.single().second.status)
data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {
@ -1,11 +1,13 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
@ -19,6 +21,7 @@ import org.junit.Assert
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import rx.Observable
import java.lang.IllegalStateException
import java.sql.SQLTransientConnectionException
import java.util.UUID
@ -26,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNull
import kotlin.test.assertTrue
class FlowClientIdTests {
@ -49,6 +53,7 @@ class FlowClientIdTests {
ResultFlow.hook = null
ResultFlow.suspendableHook = null
UnSerializableResultFlow.firstRun = true
SingleThreadedStateMachineManager.beforeClientIDCheck = null
SingleThreadedStateMachineManager.onClientIDNotFound = null
SingleThreadedStateMachineManager.onCallingStartFlowInternal = null
@ -65,6 +70,16 @@ class FlowClientIdTests {
Assert.assertEquals(1, counter)
fun `flow's result gets persisted if the flow is started with a client id`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)).resultFuture.getOrThrow()
aliceNode.database.transaction {
assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
fun `flow's result is retrievable after flow's lifetime, when flow is started with a client id - different parameters are ignored`() {
val clientId = UUID.randomUUID().toString()
@ -83,6 +98,41 @@ class FlowClientIdTests {
Assert.assertEquals(result0, result1)
fun `if flow's result is not found in the database an IllegalStateException is thrown`() {
val clientId = UUID.randomUUID().toString()
val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
val flowId0 = handle0.id
// manually remove the checkpoint (including DBFlowResult) from the database
aliceNode.database.transaction {
assertFailsWith<IllegalStateException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
fun `flow returning null gets retrieved after flow's lifetime when started with client id`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)).resultFuture.getOrThrow()
val flowResult = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)).resultFuture.getOrThrow()
fun `flow returning Unit gets retrieved after flow's lifetime when started with client id`() {
val clientId = UUID.randomUUID().toString()
aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow()
val flowResult = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow()
assertEquals(Unit, flowResult)
fun `flow's result is available if reconnect after flow had retried from previous checkpoint, when flow is started with a client id`() {
var firstRun = true
@ -240,42 +290,31 @@ class FlowClientIdTests {
Assert.assertEquals(10, resultsCounter.get())
fun `on node start -running- flows with client id are hook-able`() {
val clientId = UUID.randomUUID().toString()
var noSecondFlowWasSpawned = 0
var firstRun = true
var firstFiber: Fiber<out Any?>? = null
val flowIsRunning = Semaphore(0)
val waitUntilFlowIsRunning = Semaphore(0)
ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
override fun call() {
if (firstRun) {
firstFiber = Fiber.currentFiber()
firstRun = false
try {
flowIsRunning.acquire() // make flow wait here to impersonate a running flow
} catch (e: InterruptedException) {
throw e
if (firstRun) {
firstRun = false
// high sleeping time doesn't matter because the fiber will get an [Event.SoftShutdown] on node restart, which will wake up the fiber
sleep(100.seconds, maySkipCheckpoint = true)
flowIsRunning.acquire() // make flow wait here to impersonate a running flow
val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
aliceNode.internals.acceptableLiveFiberCountOnStop = 1
val aliceNode = mockNet.restartNode(aliceNode)
// Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone
// Re-hook a running flow
@ -285,7 +324,6 @@ class FlowClientIdTests {
Assert.assertEquals(flowHandle0.id, flowHandle1.id)
Assert.assertEquals(clientId, flowHandle1.clientId)
Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
Assert.assertEquals(1, noSecondFlowWasSpawned)
// @Test(timeout=300_000)
@ -340,6 +378,28 @@ class FlowClientIdTests {
// Assert.assertEquals(1, noSecondFlowWasSpawned)
// }
fun `on node start -completed- flows with client id are hook-able`() {
val clientId = UUID.randomUUID().toString()
var counter = 0
ResultFlow.hook = {
val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
val aliceNode = mockNet.restartNode(aliceNode)
// Re-hook a completed flow
val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
val result1 = flowHandle1.resultFuture.getOrThrow(20.seconds)
Assert.assertEquals(1, counter) // assert flow has run only once
Assert.assertEquals(flowHandle0.id, flowHandle1.id)
Assert.assertEquals(clientId, flowHandle1.clientId)
Assert.assertEquals(5, result1)
fun `On 'startFlowInternal' throwing, subsequent request with same client id does not get de-duplicated and starts a new flow`() {
val clientId = UUID.randomUUID().toString()
@ -400,6 +460,43 @@ class FlowClientIdTests {
// assertEquals(0, counter)
// }
// This test needs modification once CORDA-3681 is implemented to check that 'node_flow_exceptions' gets a row
fun `if flow fails to serialize its result then the result gets converted to an exception result`() {
val clientId = UUID.randomUUID().toString()
assertFailsWith<CordaRuntimeException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow<Observable<Unit>>(Observable.empty())).resultFuture.getOrThrow()
// flow has failed to serialize its result => table 'node_flow_results' should be empty, 'node_flow_exceptions' should get one row instead
aliceNode.services.database.transaction {
val checkpointStatus = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatus)
assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
// uncomment the below line once CORDA-3681 is implemented
//assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
assertFailsWith<CordaRuntimeException> {
aliceNode.services.startFlowWithClientId(clientId, ResultFlow<Observable<Unit>>(Observable.empty())).resultFuture.getOrThrow()
* The below test does not follow a valid path. Normally it should error and propagate.
* However, we want to assert that a flow that fails to serialize its result its retriable.
fun `flow failing to serialize its result gets retried and succeeds if returning a different result`() {
val clientId = UUID.randomUUID().toString()
// before the hospital schedules a [Event.Error] we manually schedule a [Event.RetryFlowFromSafePoint]
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
val result = aliceNode.services.startFlowWithClientId(clientId, UnSerializableResultFlow()).resultFuture.getOrThrow()
assertEquals(5, result)
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
@ -414,4 +511,21 @@ internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
suspendableHook?.let { subFlow(it) }
return result
internal class UnSerializableResultFlow: FlowLogic<Any>() {
companion object {
var firstRun = true
override fun call(): Any {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
return if (firstRun) {
firstRun = false
} else {
5 // serializable result
@ -62,6 +62,7 @@ import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.getMessage
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.startFlowWithClientId
import org.apache.commons.lang3.exception.ExceptionUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
@ -81,7 +82,7 @@ import java.sql.SQLTransientConnectionException
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.ArrayList
import java.util.UUID
import java.util.concurrent.TimeoutException
import java.util.function.Predicate
import kotlin.reflect.KClass
@ -372,12 +373,11 @@ class FlowFrameworkTests {
// Ignoring test since completed flows are not currently keeping their checkpoints in the database
@Test(timeout = 300_000)
fun `Flow metadata finish time is set in database when the flow finishes`() {
val terminationSignal = Semaphore(0)
val flow = aliceNode.services.startFlow(NoOpFlow(terminateUponSignal = terminationSignal))
val clientId = UUID.randomUUID().toString()
val flow = aliceNode.services.startFlowWithClientId(clientId, NoOpFlow(terminateUponSignal = terminationSignal))
aliceNode.database.transaction {
val metadata = session.find(DBCheckpointStorage.DBFlowMetadata::class.java, flow.id.uuid.toString())
@ -832,12 +832,6 @@ class FlowFrameworkTests {
assertEquals(null, persistedException)
private inline fun <reified T> DatabaseTransaction.findRecordsFromDatabase(): List<T> {
val criteria = session.criteriaBuilder.createQuery(T::class.java)
return session.createQuery(criteria).resultList
//region Helpers
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
@ -1022,6 +1016,12 @@ internal fun TestStartedNode.sendSessionMessage(message: SessionMessage, destina
inline fun <reified T> DatabaseTransaction.findRecordsFromDatabase(): List<T> {
val criteria = session.criteriaBuilder.createQuery(T::class.java)
return session.createQuery(criteria).resultList
internal fun errorMessage(errorResponse: FlowException? = null) =
ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))
