From 1025ee1dee2ee642c2bf0527516a0172543fcf1c Mon Sep 17 00:00:00 2001 From: williamvigorr3 <58432369+williamvigorr3@users.noreply.github.com> Date: Mon, 16 Mar 2020 09:30:23 +0000 Subject: [PATCH] CORDA-3599 Add progress tracker information to checkpoint (#6063) * Add progress tracker information to checkpoint The checkpoint Datebase is updated when the statemachine suspends with the progress trackers current step name. This is truncated if it is longer than the Database column. * Minor rename in statemachine for clarity --- .../persistence/DBCheckpointStorage.kt | 4 ++- .../corda/node/services/statemachine/Event.kt | 6 +++- .../statemachine/FlowStateMachineImpl.kt | 3 +- .../transitions/TopLevelTransition.kt | 3 +- .../persistence/DBCheckpointStorageTests.kt | 28 ++++++++++++++++++- .../statemachine/FlowFrameworkTests.kt | 18 ++++++++++-- 6 files changed, 55 insertions(+), 7 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index fabbeb2d81..7f554edbf6 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -43,6 +43,8 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP private const val HMAC_SIZE_BYTES = 16 + private const val MAX_PROGRESS_STEP_LENGTH = 256 + /** * This needs to run before Hibernate is initialised. * @@ -342,7 +344,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP this.flowMetadata = entity.flowMetadata this.status = checkpoint.status this.compatible = checkpoint.compatible - this.progressStep = checkpoint.progressStep + this.progressStep = checkpoint.progressStep?.take(MAX_PROGRESS_STEP_LENGTH) this.ioRequestType = checkpoint.flowIoRequest this.checkpointInstant = now } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt index a7d15c87c8..d0f96925d2 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt @@ -6,6 +6,7 @@ import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes import net.corda.core.transactions.SignedTransaction +import net.corda.core.utilities.ProgressTracker import net.corda.node.services.messaging.DeduplicationHandler import java.util.* @@ -101,17 +102,20 @@ sealed class Event { * @param ioRequest the request triggering the suspension. * @param maySkipCheckpoint indicates whether the persistence may be skipped. * @param fiber the serialised stack of the flow. + * @param progressStep the current progress tracker step. */ data class Suspend( val ioRequest: FlowIORequest<*>, val maySkipCheckpoint: Boolean, - val fiber: SerializedBytes> + val fiber: SerializedBytes>, + var progressStep: ProgressTracker.Step? ) : Event() { override fun toString() = "Suspend(" + "ioRequest=$ioRequest, " + "maySkipCheckpoint=$maySkipCheckpoint, " + "fiber=${fiber.hash}, " + + "currentStep=${progressStep?.label}" + ")" } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 9b0541f8cc..12b578ec37 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -430,7 +430,8 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, Event.Suspend( ioRequest = ioRequest, maySkipCheckpoint = skipPersistingCheckpoint, - fiber = this.checkpointSerialize(context = serializationContext.value) + fiber = this.checkpointSerialize(context = serializationContext.value), + progressStep = logic.progressTracker?.currentStep ) } catch (exception: Exception) { Event.Error(exception) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt index 85d7afb302..e20f3fc290 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt @@ -159,7 +159,8 @@ class TopLevelTransition( checkpointState = currentState.checkpoint.checkpointState.copy( numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends + 1 ), - flowIoRequest = event.ioRequest::class.java.simpleName + flowIoRequest = event.ioRequest::class.java.simpleName, + progressStep = event.progressStep?.label ) if (event.maySkipCheckpoint) { actions.addAll(arrayOf( diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index 83128be220..d1da5967b0 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -23,7 +23,6 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity import net.corda.testing.internal.LogHelper @@ -467,6 +466,33 @@ class DBCheckpointStorageTests { } } + @Test(timeout = 300_000) + fun `Checkpoint truncates long progressTracker step name`() { + val maxProgressStepLength = 256 + val (id, checkpoint) = newCheckpoint(1) + database.transaction { + val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + val checkpointFromStorage = checkpointStorage.getCheckpoint(id) + assertNull(checkpointFromStorage!!.progressStep) + } + val longString = """Long string Long string Long string Long string Long string Long string Long string Long string Long string + Long string Long string Long string Long string Long string Long string Long string Long string Long string Long string + Long string Long string Long string Long string Long string Long string Long string Long string Long string Long string + """.trimIndent() + database.transaction { + val newCheckpoint = checkpoint.copy(progressStep = longString) + val serializedFlowState = newCheckpoint.flowState.checkpointSerialize( + context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT + ) + checkpointStorage.updateCheckpoint(id, newCheckpoint, serializedFlowState) + } + database.transaction { + val checkpointFromStorage = checkpointStorage.getCheckpoint(id) + assertEquals(longString.take(maxProgressStepLength), checkpointFromStorage!!.progressStep) + } + } + private fun newCheckpointStorage() { database.transaction { checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder { diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 01bd6e76e6..242eaa0dba 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -20,8 +20,8 @@ import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.identity.Party import net.corda.core.internal.DeclaredField -import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.concurrent.flatMap import net.corda.core.messaging.MessageRecipients import net.corda.core.node.services.PartyInfo @@ -79,7 +79,6 @@ import java.util.* import java.util.function.Predicate import kotlin.reflect.KClass import kotlin.streams.toList -import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue @@ -356,6 +355,21 @@ class FlowFrameworkTests { } } + @Test(timeout = 300_000) + fun `Flow persists progress tracker in the database when the flow suspends`() { + bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedReceiveFlow(it) } + val aliceFlowId = aliceNode.services.startFlow(ReceiveFlow(bob)).id + mockNet.runNetwork() + aliceNode.database.transaction { + val checkpoint = aliceNode.internals.checkpointStorage.getCheckpoint(aliceFlowId) + assertEquals(ReceiveFlow.START_STEP.label, checkpoint!!.progressStep) + } + bobNode.database.transaction { + val checkpoints = bobNode.internals.checkpointStorage.checkpoints().single() + assertEquals(InitiatedReceiveFlow.START_STEP.label, checkpoints.progressStep) + } + } + private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic() { @Suspendable override fun call() {