Merge branch 'feature/checkpoint_table_improvements' into dan/merge-4.5-into-feature-branch

# Conflicts:
#	node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
This commit is contained in:
LankyDan 2020-03-13 14:29:18 +00:00
commit 8941cf7b8f
17 changed files with 246 additions and 85 deletions

View File

@ -17,7 +17,7 @@ import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.ListenProcessDeathException
import net.corda.testing.node.internal.assertCheckpoints
import net.corda.testing.node.internal.assertUncompletedCheckpoints
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
@ -75,7 +75,7 @@ class FlowCheckpointVersionNodeStartupCheckTest {
}
private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
assertCheckpoints(BOB_NAME, 1)
assertUncompletedCheckpoints(BOB_NAME, 1)
assertFailsWith(ListenProcessDeathException::class) {
startNode(NodeParameters(

View File

@ -110,9 +110,10 @@ abstract class StatemachineErrorHandlingTest {
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
class GetNumberOfUncompletedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)

View File

@ -89,9 +89,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -160,9 +160,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -252,9 +252,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -349,9 +349,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for CashIssueAndPaymentFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}

View File

@ -94,7 +94,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -172,7 +172,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -252,7 +252,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -337,7 +337,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -426,7 +426,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -527,7 +527,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -616,7 +616,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -714,7 +714,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -812,7 +812,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -898,7 +898,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -990,7 +990,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for errored flow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -1079,9 +1079,9 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -1176,11 +1176,11 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for the flow that is waiting for the errored counterparty flow to finish and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
// the checkpoint is not persisted since it kept failing the original checkpoint commit
// the flow will recover since artemis will keep the events and replay them on node restart
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -1273,9 +1273,9 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}

View File

@ -99,7 +99,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -186,7 +186,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -278,7 +278,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}

View File

@ -128,7 +128,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -230,7 +230,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -324,7 +324,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -426,7 +426,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}

View File

@ -4,19 +4,16 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.CordaRuntimeException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.executeAsync
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.FlowTimeoutException
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
@ -146,7 +143,7 @@ class FlowRetryTest {
}
assertEquals(3, TransientConnectionFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}
@ -165,7 +162,7 @@ class FlowRetryTest {
}
assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}
@ -184,7 +181,7 @@ class FlowRetryTest {
}
assertEquals(0, GeneralExternalFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(1, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}
@ -461,9 +458,10 @@ class GeneralExternalFailureResponder(private val session: FlowSession) : FlowLo
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
class GetNumberOfUncompletedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)

View File

@ -11,6 +11,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.node.services.vault.SessionScope
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.testing.driver.DriverParameters
@ -37,7 +38,9 @@ class CordaPersistenceServiceTests {
assertEquals(sampleSize, count)
DriverManager.getConnection("jdbc:h2:tcp://localhost:$port/node", "sa", "").use {
val resultSet = it.createStatement().executeQuery("SELECT count(*) from ${NODE_DATABASE_PREFIX}checkpoints")
val resultSet = it.createStatement().executeQuery(
"SELECT count(*) from ${NODE_DATABASE_PREFIX}checkpoints where status not in (${FlowStatus.COMPLETED.ordinal})"
)
assertTrue(resultSet.next())
val resultSize = resultSet.getInt(1)
assertEquals(sampleSize, resultSize)

View File

@ -2,6 +2,7 @@ package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
@ -328,12 +329,14 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
val result = updateDBFlowResult(entity, checkpoint, now)
//This code needs to be added back in when we want to persist the result. For now this requires the result to be @CordaSerializable.
//val result = updateDBFlowResult(entity, checkpoint, now)
val exceptionDetails = updateDBFlowException(entity, checkpoint, now)
return entity.apply {
this.blob = blob
this.result = result
//Set the result to null for now.
this.result = null
this.exceptionDetails = exceptionDetails
// Do not update the meta data relationship on updates
this.flowMetadata = entity.flowMetadata

View File

@ -609,7 +609,7 @@ class SingleThreadedStateMachineManager(
// This is a brand new flow
null
}
val checkpoint = existingCheckpoint ?: Checkpoint.create(
val checkpoint = existingCheckpoint?.copy(status = Checkpoint.FlowStatus.RUNNABLE) ?: Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
@ -779,7 +779,7 @@ class SingleThreadedStateMachineManager(
isStartIdempotent: Boolean,
initialDeduplicationHandler: DeduplicationHandler?
): Flow? {
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return null
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.copy(status = Checkpoint.FlowStatus.RUNNABLE) ?: return null
val flowState = checkpoint.flowState
val resultFuture = openFuture<Any?>()
val fiber = when (flowState) {
@ -805,12 +805,7 @@ class SingleThreadedStateMachineManager(
is FlowState.Started -> {
val fiber = tryCheckpointDeserialize(flowState.frozenFiber, id) ?: return null
val state = StateMachineState(
// Do a trivial checkpoint copy below, to update the Checkpoint#timestamp value.
// The Checkpoint#timestamp is being used by FlowMonitor as the starting time point of a potential suspension.
// We need to refresh the Checkpoint#timestamp here, in case of an e.g. node start up after a long period.
// If not then, there is a time window (until the next checkpoint update) in which the FlowMonitor
// could log this flow as a waiting flow, from the last checkpoint update i.e. before the node's start up.
checkpoint = checkpoint.copy(),
checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,

View File

@ -75,7 +75,12 @@ data class Checkpoint(
PAUSED
}
val timestamp: Instant = Instant.now() // This will get updated every time a Checkpoint object is created/ created by copy.
/**
* [timestamp] will get updated every time a [Checkpoint] object is created/ created by copy.
* It will be updated, therefore, for example when a flow is being suspended or whenever a flow
* is being loaded from [Checkpoint] through [Serialized.deserialize].
*/
val timestamp: Instant = Instant.now()
companion object {

View File

@ -199,16 +199,17 @@ class TopLevelTransition(
checkpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1
)),
),
result = event.returnValue,
status = Checkpoint.FlowStatus.COMPLETED
),
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isRemoved = true
)
val allSourceSessionIds = checkpoint.checkpointState.sessions.keys
if (currentState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id))
}
actions.addAll(arrayOf(
Action.PersistCheckpoint(context.id, currentState.checkpoint, currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(event.softLocksId),
Action.CommitTransaction,
@ -289,4 +290,4 @@ class TopLevelTransition(
FlowContinuation.Abort
}
}
}
}

View File

@ -33,9 +33,10 @@ import net.corda.finance.contracts.asset.CASH
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
import net.corda.finance.flows.TwoPartyTradeFlow.Seller
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.statemachine.Checkpoint
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.core.*
import net.corda.testing.dsl.LedgerDSL
@ -56,10 +57,17 @@ import java.io.ByteArrayOutputStream
import java.util.*
import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
internal fun CheckpointStorage.getAllIncompleteCheckpoints(): List<Checkpoint.Serialized> {
return getAllCheckpoints().use {
it.map { it.second }.toList()
}.filter { it.status != Checkpoint.FlowStatus.COMPLETED }
}
/**
* In this example, Alice wishes to sell her commercial paper to Bob in return for $1,000,000 and they wish to do
* it on the ledger atomically. Therefore they must work together to build a transaction.
@ -135,11 +143,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
bobNode.internals.manuallyCloseDB()
}
@ -191,11 +199,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
bobNode.internals.manuallyCloseDB()
}
@ -245,7 +253,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).hasSize(1)
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).hasSize(1)
}
val storage = bobNode.services.validatedTransactions
@ -278,10 +286,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty()
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
aliceNode.database.transaction {
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
bobNode.database.transaction {

View File

@ -23,6 +23,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.LogHelper
@ -35,6 +36,7 @@ import org.junit.After
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import java.time.Instant
@ -270,6 +272,7 @@ class DBCheckpointStorageTests {
}
@Test(timeout = 300_000)
@Ignore
fun `update checkpoint with result information creates new result database record`() {
val result = "This is the result"
val (id, checkpoint) = newCheckpoint()
@ -297,6 +300,7 @@ class DBCheckpointStorageTests {
}
@Test(timeout = 300_000)
@Ignore
fun `update checkpoint with result information updates existing result database record`() {
val result = "This is the result"
val somehowThereIsANewResult = "Another result (which should not be possible!)"

View File

@ -9,9 +9,18 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.flows.Destination
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
@ -19,6 +28,7 @@ import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.queryBy
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.toFuture
import net.corda.core.transactions.SignedTransaction
@ -26,10 +36,14 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.ProgressTracker.Change
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.persistence.CheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyState
import net.corda.testing.core.ALICE_NAME
@ -40,23 +54,34 @@ import net.corda.testing.flows.registerCordappFlowFactory
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.internal.*
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.getMessage
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.assertj.core.api.Condition
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotEquals
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Test
import rx.Notification
import rx.Observable
import java.sql.SQLException
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.function.Predicate
import kotlin.reflect.KClass
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowFrameworkTests {
companion object {
@ -73,15 +98,14 @@ class FlowFrameworkTests {
private lateinit var notaryIdentity: Party
private val receivedSessionMessages = ArrayList<SessionTransfer>()
private val dbCheckpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
private val dbCheckpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
@Before
fun setUpMockNet() {
@ -312,6 +336,26 @@ class FlowFrameworkTests {
}, "FlowException's private peer field has value set"))
}
//We should update this test when we do the work to persists the flow result.
@Test(timeout = 300_000)
fun `Flow status is set to completed in database when the flow finishes`() {
val terminationSignal = Semaphore(0)
val flow = aliceNode.services.startFlow(NoOpFlow( terminateUponSignal = terminationSignal))
mockNet.waitQuiescent() // current thread needs to wait fiber running on a different thread, has reached the blocking point
aliceNode.database.transaction {
val checkpoint = dbCheckpointStorage.getCheckpoint(flow.id)
assertNull(checkpoint!!.result)
assertNotEquals(Checkpoint.FlowStatus.COMPLETED, checkpoint.status)
}
terminationSignal.release()
mockNet.waitQuiescent()
aliceNode.database.transaction {
val checkpoint = dbCheckpointStorage.getCheckpoint(flow.id)
assertNull(checkpoint!!.result)
assertEquals(Checkpoint.FlowStatus.COMPLETED, checkpoint.status)
}
}
private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -573,6 +617,88 @@ class FlowFrameworkTests {
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
}
@Test(timeout=300_000)
fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Unstarted`() {
var firstExecution = true
var checkpointStatusInDBBeforeSuspension: Checkpoint.FlowStatus? = null
var checkpointStatusInDBAfterSuspension: Checkpoint.FlowStatus? = null
var checkpointStatusInMemoryBeforeSuspension: Checkpoint.FlowStatus? = null
SuspendingFlow.hookBeforeCheckpoint = {
val flowFiber = this as? FlowStateMachineImpl<*>
assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Unstarted)
if (firstExecution) {
// the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604.
manuallyFailCheckpointInDB(aliceNode)
firstExecution = false
throw SQLException("deadlock") // will cause flow to retry
} else {
// The persisted Checkpoint should be still failed here -> it should change to RUNNABLE after suspension
checkpointStatusInDBBeforeSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInMemoryBeforeSuspension = flowFiber.transientState!!.value.checkpoint.status
}
}
SuspendingFlow.hookAfterCheckpoint = {
checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDBBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemoryBeforeSuspension)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInDBAfterSuspension)
SuspendingFlow.hookBeforeCheckpoint = {}
SuspendingFlow.hookAfterCheckpoint = {}
}
@Test(timeout=300_000)
fun `Checkpoint status changes to RUNNABLE when flow is loaded from checkpoint - FlowState Started`() {
var firstExecution = true
var checkpointStatusInDB: Checkpoint.FlowStatus? = null
var checkpointStatusInMemory: Checkpoint.FlowStatus? = null
SuspendingFlow.hookAfterCheckpoint = {
val flowFiber = this as? FlowStateMachineImpl<*>
assertTrue(flowFiber!!.transientState!!.value.checkpoint.flowState is FlowState.Started)
if (firstExecution) {
// the following manual persisting Checkpoint.status to FAILED should be removed when implementing CORDA-3604.
manuallyFailCheckpointInDB(aliceNode)
firstExecution = false
throw SQLException("deadlock") // will cause flow to retry
} else {
checkpointStatusInDB = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInMemory = flowFiber.transientState!!.value.checkpoint.status
}
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatusInDB)
assertEquals(Checkpoint.FlowStatus.RUNNABLE, checkpointStatusInMemory)
SuspendingFlow.hookAfterCheckpoint = {}
}
// the following method should be removed when implementing CORDA-3604.
private fun manuallyFailCheckpointInDB(node: TestStartedNode) {
val idCheckpoint = node.internals.checkpointStorage.getAllCheckpoints().toList().single()
val checkpoint = idCheckpoint.second
val updatedCheckpoint = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
node.internals.checkpointStorage.updateCheckpoint(idCheckpoint.first,
updatedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT),
updatedCheckpoint.serializedFlowState)
contextTransaction.commit()
contextTransaction.close()
contextTransactionOrNull = null
contextDatabase.newTransaction()
}
//region Helpers
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
@ -925,4 +1051,19 @@ internal class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<
exceptionThrown = exception()
throw exceptionThrown
}
}
internal class SuspendingFlow : FlowLogic<Unit>() {
companion object {
var hookBeforeCheckpoint: FlowStateMachine<*>.() -> Unit = {}
var hookAfterCheckpoint: FlowStateMachine<*>.() -> Unit = {}
}
@Suspendable
override fun call() {
stateMachine.hookBeforeCheckpoint()
sleep(1.seconds) // flow checkpoints => checkpoint is in DB
stateMachine.hookAfterCheckpoint()
}
}

View File

@ -19,7 +19,7 @@ import net.corda.testing.driver.*
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.assertCheckpoints
import net.corda.testing.node.internal.assertUncompletedCheckpoints
import net.corda.testing.node.internal.poll
import net.corda.traderdemo.flow.CommercialPaperIssueFlow
import net.corda.traderdemo.flow.SellerFlow
@ -100,7 +100,7 @@ class TraderDemoTest {
val saleFuture = seller.rpc.startFlow(::SellerFlow, buyer.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue
buyer.rpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts
buyer.stop()
assertCheckpoints(DUMMY_BANK_A_NAME, 1)
assertUncompletedCheckpoints(DUMMY_BANK_A_NAME, 1)
val buyer2 = startNode(providedName = DUMMY_BANK_A_NAME, customOverrides = mapOf("p2pAddress" to buyer.p2pAddress.toString())).getOrThrow()
saleFuture.getOrThrow()
assertThat(buyer2.rpc.getCashBalance(USD)).isEqualTo(95.DOLLARS)

View File

@ -23,6 +23,7 @@ import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.Message
import net.corda.node.services.statemachine.Checkpoint
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.NodeHandle
import net.corda.testing.internal.chooseIdentity
@ -273,9 +274,10 @@ fun CordaRPCOps.waitForShutdown(): Observable<Unit> {
return completable
}
fun DriverDSL.assertCheckpoints(name: CordaX500Name, expected: Long) {
fun DriverDSL.assertUncompletedCheckpoints(name: CordaX500Name, expected: Long) {
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
DriverManager.getConnection("jdbc:h2:file:${baseDirectory(name) / "persistence"}", "sa", "").use { connection ->
connection.createStatement().executeQuery("select count(*) from NODE_CHECKPOINTS").use { rs ->
connection.createStatement().executeQuery(sqlStatement).use { rs ->
rs.next()
assertThat(rs.getLong(1)).isEqualTo(expected)
}