CORDA-3603 Save completed flow information (#6034)

When a flow is finished do not delete the checkpoint from the DB.

Instead, the FlowStatus is marked as Completed in the DB.

Updated numerous tests which relied on the flow being removed
when finished.
This commit is contained in:
williamvigorr3 2020-03-12 11:45:30 +00:00 committed by GitHub
parent 51db2c2d7f
commit 499b6cf17e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 132 additions and 78 deletions

View File

@ -17,7 +17,7 @@ import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.ListenProcessDeathException
import net.corda.testing.node.internal.assertCheckpoints
import net.corda.testing.node.internal.assertUncompletedCheckpoints
import net.corda.testing.node.internal.enclosedCordapp
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
@ -75,7 +75,7 @@ class FlowCheckpointVersionNodeStartupCheckTest {
}
private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
assertCheckpoints(BOB_NAME, 1)
assertUncompletedCheckpoints(BOB_NAME, 1)
assertFailsWith(ListenProcessDeathException::class) {
startNode(NodeParameters(

View File

@ -110,9 +110,10 @@ abstract class StatemachineErrorHandlingTest {
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
class GetNumberOfUncompletedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)

View File

@ -89,9 +89,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -160,9 +160,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -252,9 +252,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -349,9 +349,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for CashIssueAndPaymentFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}

View File

@ -94,7 +94,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -172,7 +172,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -252,7 +252,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -337,7 +337,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -426,7 +426,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -527,7 +527,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -616,7 +616,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -714,7 +714,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -812,7 +812,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -898,7 +898,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -990,7 +990,7 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
// 1 for errored flow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -1079,9 +1079,9 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -1176,11 +1176,11 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for the flow that is waiting for the errored counterparty flow to finish and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
// the checkpoint is not persisted since it kept failing the original checkpoint commit
// the flow will recover since artemis will keep the events and replay them on node restart
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -1273,9 +1273,9 @@ class StatemachineGeneralErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}

View File

@ -99,7 +99,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -186,7 +186,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -278,7 +278,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}

View File

@ -128,7 +128,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -230,7 +230,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -324,7 +324,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@ -426,7 +426,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}

View File

@ -4,19 +4,16 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.core.CordaRuntimeException
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.FlowAsyncOperation
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.executeAsync
import net.corda.core.messaging.startFlow
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.FlowTimeoutException
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
@ -146,7 +143,7 @@ class FlowRetryTest {
}
assertEquals(3, TransientConnectionFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}
@ -165,7 +162,7 @@ class FlowRetryTest {
}
assertEquals(3, WrappedTransientConnectionFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(2, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(2, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}
@ -184,7 +181,7 @@ class FlowRetryTest {
}
assertEquals(0, GeneralExternalFailureFlow.retryCount)
// 1 for the errored flow kept for observation and another for GetNumberOfCheckpointsFlow
assertEquals(1, it.proxy.startFlow(::GetNumberOfCheckpointsFlow).returnValue.get())
assertEquals(1, it.proxy.startFlow(::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
}
@ -461,9 +458,10 @@ class GeneralExternalFailureResponder(private val session: FlowSession) : FlowLo
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
class GetNumberOfUncompletedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)

View File

@ -11,6 +11,7 @@ import net.corda.core.node.services.CordaService
import net.corda.core.node.services.vault.SessionScope
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.testing.driver.DriverParameters
@ -37,7 +38,9 @@ class CordaPersistenceServiceTests {
assertEquals(sampleSize, count)
DriverManager.getConnection("jdbc:h2:tcp://localhost:$port/node", "sa", "").use {
val resultSet = it.createStatement().executeQuery("SELECT count(*) from ${NODE_DATABASE_PREFIX}checkpoints")
val resultSet = it.createStatement().executeQuery(
"SELECT count(*) from ${NODE_DATABASE_PREFIX}checkpoints where status not in (${FlowStatus.COMPLETED.ordinal})"
)
assertTrue(resultSet.next())
val resultSize = resultSet.getInt(1)
assertEquals(sampleSize, resultSize)

View File

@ -2,6 +2,7 @@ package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
@ -328,12 +329,14 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
val result = updateDBFlowResult(entity, checkpoint, now)
//This code needs to be added back in when we want to persist the result. For now this requires the result to be @CordaSerializable.
//val result = updateDBFlowResult(entity, checkpoint, now)
val exceptionDetails = updateDBFlowException(entity, checkpoint, now)
return entity.apply {
this.blob = blob
this.result = result
//Set the result to null for now.
this.result = null
this.exceptionDetails = exceptionDetails
// Do not update the meta data relationship on updates
this.flowMetadata = entity.flowMetadata

View File

@ -199,16 +199,17 @@ class TopLevelTransition(
checkpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1
)),
),
result = event.returnValue,
status = Checkpoint.FlowStatus.COMPLETED
),
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isRemoved = true
)
val allSourceSessionIds = checkpoint.checkpointState.sessions.keys
if (currentState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id))
}
actions.addAll(arrayOf(
Action.PersistCheckpoint(context.id, currentState.checkpoint, currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(event.softLocksId),
Action.CommitTransaction,
@ -289,4 +290,4 @@ class TopLevelTransition(
FlowContinuation.Abort
}
}
}
}

View File

@ -32,9 +32,10 @@ import net.corda.finance.contracts.asset.CASH
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.TwoPartyTradeFlow.Buyer
import net.corda.finance.flows.TwoPartyTradeFlow.Seller
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.WritableTransactionStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.node.services.statemachine.Checkpoint
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.core.*
import net.corda.testing.dsl.LedgerDSL
@ -56,10 +57,17 @@ import java.io.ByteArrayOutputStream
import java.util.*
import java.util.jar.JarOutputStream
import java.util.zip.ZipEntry
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
internal fun CheckpointStorage.getAllIncompleteCheckpoints(): List<Checkpoint.Serialized> {
return getAllCheckpoints().use {
it.map { it.second }.toList()
}.filter { it.status != Checkpoint.FlowStatus.COMPLETED }
}
/**
* In this example, Alice wishes to sell her commercial paper to Bob in return for $1,000,000 and they wish to do
* it on the ledger atomically. Therefore they must work together to build a transaction.
@ -135,11 +143,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
bobNode.internals.manuallyCloseDB()
}
@ -191,11 +199,11 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
bobNode.dispose()
aliceNode.database.transaction {
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
aliceNode.internals.manuallyCloseDB()
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
bobNode.internals.manuallyCloseDB()
}
@ -245,7 +253,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
// OK, now Bob has sent the partial transaction back to Alice and is waiting for Alice's signature.
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).hasSize(1)
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).hasSize(1)
}
val storage = bobNode.services.validatedTransactions
@ -278,10 +286,10 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty()
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(bobNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
aliceNode.database.transaction {
assertThat(aliceNode.internals.checkpointStorage.checkpoints()).isEmpty()
assertThat(aliceNode.internals.checkpointStorage.getAllIncompleteCheckpoints()).isEmpty()
}
bobNode.database.transaction {

View File

@ -23,6 +23,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.LogHelper
@ -35,6 +36,7 @@ import org.junit.After
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import java.time.Instant
@ -270,6 +272,7 @@ class DBCheckpointStorageTests {
}
@Test(timeout = 300_000)
@Ignore
fun `update checkpoint with result information creates new result database record`() {
val result = "This is the result"
val (id, checkpoint) = newCheckpoint()
@ -297,6 +300,7 @@ class DBCheckpointStorageTests {
}
@Test(timeout = 300_000)
@Ignore
fun `update checkpoint with result information updates existing result database record`() {
val result = "This is the result"
val somehowThereIsANewResult = "Another result (which should not be possible!)"

View File

@ -9,7 +9,15 @@ import net.corda.core.concurrent.CordaFuture
import net.corda.core.contracts.ContractState
import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.flows.Destination
import net.corda.core.flows.FinalityFlow
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowIORequest
@ -40,12 +48,20 @@ import net.corda.testing.flows.registerCordappFlowFactory
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork.MessageTransfer
import net.corda.testing.node.InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin
import net.corda.testing.node.internal.*
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.getMessage
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.assertj.core.api.Condition
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotEquals
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Test
import rx.Notification
@ -55,7 +71,6 @@ import java.time.Instant
import java.util.*
import java.util.function.Predicate
import kotlin.reflect.KClass
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
class FlowFrameworkTests {
@ -73,15 +88,14 @@ class FlowFrameworkTests {
private lateinit var notaryIdentity: Party
private val receivedSessionMessages = ArrayList<SessionTransfer>()
private val dbCheckpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
private val dbCheckpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
@Before
fun setUpMockNet() {
@ -312,6 +326,26 @@ class FlowFrameworkTests {
}, "FlowException's private peer field has value set"))
}
//We should update this test when we do the work to persists the flow result.
@Test(timeout = 300_000)
fun `Flow status is set to completed in database when the flow finishes`() {
val terminationSignal = Semaphore(0)
val flow = aliceNode.services.startFlow(NoOpFlow( terminateUponSignal = terminationSignal))
mockNet.waitQuiescent() // current thread needs to wait fiber running on a different thread, has reached the blocking point
aliceNode.database.transaction {
val checkpoint = dbCheckpointStorage.getCheckpoint(flow.id)
assertNull(checkpoint!!.result)
assertNotEquals(Checkpoint.FlowStatus.COMPLETED, checkpoint.status)
}
terminationSignal.release()
mockNet.waitQuiescent()
aliceNode.database.transaction {
val checkpoint = dbCheckpointStorage.getCheckpoint(flow.id)
assertNull(checkpoint!!.result)
assertEquals(Checkpoint.FlowStatus.COMPLETED, checkpoint.status)
}
}
private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -915,4 +949,4 @@ internal class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<
exceptionThrown = exception()
throw exceptionThrown
}
}
}

View File

@ -19,7 +19,7 @@ import net.corda.testing.driver.*
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import net.corda.testing.node.internal.assertCheckpoints
import net.corda.testing.node.internal.assertUncompletedCheckpoints
import net.corda.testing.node.internal.poll
import net.corda.traderdemo.flow.CommercialPaperIssueFlow
import net.corda.traderdemo.flow.SellerFlow
@ -100,7 +100,7 @@ class TraderDemoTest {
val saleFuture = seller.rpc.startFlow(::SellerFlow, buyer.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue
buyer.rpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts
buyer.stop()
assertCheckpoints(DUMMY_BANK_A_NAME, 1)
assertUncompletedCheckpoints(DUMMY_BANK_A_NAME, 1)
val buyer2 = startNode(providedName = DUMMY_BANK_A_NAME, customOverrides = mapOf("p2pAddress" to buyer.p2pAddress.toString())).getOrThrow()
saleFuture.getOrThrow()
assertThat(buyer2.rpc.getCashBalance(USD)).isEqualTo(95.DOLLARS)

View File

@ -23,6 +23,7 @@ import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.messaging.Message
import net.corda.node.services.statemachine.Checkpoint
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.NodeHandle
import net.corda.testing.internal.chooseIdentity
@ -273,9 +274,10 @@ fun CordaRPCOps.waitForShutdown(): Observable<Unit> {
return completable
}
fun DriverDSL.assertCheckpoints(name: CordaX500Name, expected: Long) {
fun DriverDSL.assertUncompletedCheckpoints(name: CordaX500Name, expected: Long) {
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
DriverManager.getConnection("jdbc:h2:file:${baseDirectory(name) / "persistence"}", "sa", "").use { connection ->
connection.createStatement().executeQuery("select count(*) from NODE_CHECKPOINTS").use { rs ->
connection.createStatement().executeQuery(sqlStatement).use { rs ->
rs.next()
assertThat(rs.getLong(1)).isEqualTo(expected)
}