Merge branch 'feature/checkpoint_table_improvements' into dan/merge-4.5-into-feature-branch

This commit is contained in:
LankyDan 2020-03-16 15:38:14 +00:00
commit 944fbce740
11 changed files with 111 additions and 17 deletions

@ -5,7 +5,6 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.SubFlow
import net.corda.node.services.statemachine.SubFlowVersion
@ -36,7 +35,7 @@ object CheckpointVerifier {
val cordappsByHash = currentCordapps.associateBy { it.jarHash }
checkpointStorage.getAllCheckpoints().use {
checkpointStorage.getRunnableCheckpoints().use {
it.forEach { (_, serializedCheckpoint) ->
val checkpoint = try {
serializedCheckpoint.deserialize(checkpointSerializationContext)

@ -1,7 +1,6 @@
package net.corda.node.services.api
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.FlowState
@ -42,4 +41,10 @@ interface CheckpointStorage {
* underlying database connection is closed, so any processing should happen before it is closed.
*/
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
/**
* Stream runnable checkpoints from the store. If this is backed by a database the stream will be valid
* until the underlying database connection is closed, so any processing should happen before it is closed.
*/
fun getRunnableCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
}

@ -2,7 +2,6 @@ package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.serialize
@ -43,6 +42,10 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
private const val HMAC_SIZE_BYTES = 16
private const val MAX_PROGRESS_STEP_LENGTH = 256
private val NOT_RUNNABLE_CHECKPOINTS = listOf(FlowStatus.COMPLETED, FlowStatus.FAILED, FlowStatus.KILLED)
/**
* This needs to run before Hibernate is initialised.
*
@ -249,6 +252,18 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
}
}
override fun getRunnableCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val criteriaQuery = criteriaBuilder.createQuery(DBFlowCheckpoint::class.java)
val root = criteriaQuery.from(DBFlowCheckpoint::class.java)
criteriaQuery.select(root)
.where(criteriaBuilder.not(root.get<FlowStatus>(DBFlowCheckpoint::status.name).`in`(NOT_RUNNABLE_CHECKPOINTS)))
return session.createQuery(criteriaQuery).stream().map {
StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint()
}
}
private fun getDBCheckpoint(id: StateMachineRunId): DBFlowCheckpoint? {
return currentDBSession().find(DBFlowCheckpoint::class.java, id.uuid.toString())
}
@ -342,7 +357,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
this.flowMetadata = entity.flowMetadata
this.status = checkpoint.status
this.compatible = checkpoint.compatible
this.progressStep = checkpoint.progressStep
this.progressStep = checkpoint.progressStep?.take(MAX_PROGRESS_STEP_LENGTH)
this.ioRequestType = checkpoint.flowIoRequest
this.checkpointInstant = now
}

@ -141,7 +141,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
try {
if (lock.getAndIncrement() == 0 && !file.exists()) {
database.transaction {
checkpointStorage.getAllCheckpoints().use { stream ->
checkpointStorage.getRunnableCheckpoints().use { stream ->
ZipOutputStream(file.outputStream()).use { zip ->
stream.forEach { (runId, serialisedCheckpoint) ->

@ -6,6 +6,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.messaging.DeduplicationHandler
import java.util.*
@ -101,17 +102,20 @@ sealed class Event {
* @param ioRequest the request triggering the suspension.
* @param maySkipCheckpoint indicates whether the persistence may be skipped.
* @param fiber the serialised stack of the flow.
* @param progressStep the current progress tracker step.
*/
data class Suspend(
val ioRequest: FlowIORequest<*>,
val maySkipCheckpoint: Boolean,
val fiber: SerializedBytes<FlowStateMachineImpl<*>>
val fiber: SerializedBytes<FlowStateMachineImpl<*>>,
var progressStep: ProgressTracker.Step?
) : Event() {
override fun toString() =
"Suspend(" +
"ioRequest=$ioRequest, " +
"maySkipCheckpoint=$maySkipCheckpoint, " +
"fiber=${fiber.hash}, " +
"currentStep=${progressStep?.label}" +
")"
}

@ -430,7 +430,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Event.Suspend(
ioRequest = ioRequest,
maySkipCheckpoint = skipPersistingCheckpoint,
fiber = this.checkpointSerialize(context = serializationContext.value)
fiber = this.checkpointSerialize(context = serializationContext.value),
progressStep = logic.progressTracker?.currentStep
)
} catch (exception: Exception) {
Event.Error(exception)

@ -334,7 +334,7 @@ class SingleThreadedStateMachineManager(
}
private fun restoreFlowsFromCheckpoints(): List<Flow> {
return checkpointStorage.getAllCheckpoints().use {
return checkpointStorage.getRunnableCheckpoints().use {
it.mapNotNull { (id, serializedCheckpoint) ->
// If a flow is added before start() then don't attempt to restore it
mutex.locked { if (id in flows) return@mapNotNull null }

@ -159,7 +159,8 @@ class TopLevelTransition(
checkpointState = currentState.checkpoint.checkpointState.copy(
numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends + 1
),
flowIoRequest = event.ioRequest::class.java.simpleName
flowIoRequest = event.ioRequest::class.java.simpleName,
progressStep = event.progressStep?.label
)
if (event.maySkipCheckpoint) {
actions.addAll(arrayOf(

@ -63,7 +63,7 @@ import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
internal fun CheckpointStorage.getAllIncompleteCheckpoints(): List<Checkpoint.Serialized> {
return getAllCheckpoints().use {
return getRunnableCheckpoints().use {
it.map { it.second }.toList()
}.filter { it.status != Checkpoint.FlowStatus.COMPLETED }
}

@ -23,7 +23,6 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.LogHelper
@ -45,7 +44,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertTrue
internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
return getAllCheckpoints().use {
return getRunnableCheckpoints().use {
it.map { it.second }.toList()
}
}
@ -467,6 +466,62 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout = 300_000)
fun `Checkpoint truncates long progressTracker step name`() {
val maxProgressStepLength = 256
val (id, checkpoint) = newCheckpoint(1)
database.transaction {
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
val checkpointFromStorage = checkpointStorage.getCheckpoint(id)
assertNull(checkpointFromStorage!!.progressStep)
}
val longString = """Long string Long string Long string Long string Long string Long string Long string Long string Long string
Long string Long string Long string Long string Long string Long string Long string Long string Long string Long string
Long string Long string Long string Long string Long string Long string Long string Long string Long string Long string
""".trimIndent()
database.transaction {
val newCheckpoint = checkpoint.copy(progressStep = longString)
val serializedFlowState = newCheckpoint.flowState.checkpointSerialize(
context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
)
checkpointStorage.updateCheckpoint(id, newCheckpoint, serializedFlowState)
}
database.transaction {
val checkpointFromStorage = checkpointStorage.getCheckpoint(id)
assertEquals(longString.take(maxProgressStepLength), checkpointFromStorage!!.progressStep)
}
}
@Test(timeout = 300_000)
fun `fetch runnable checkpoints`() {
val (_, checkpoint) = newCheckpoint(1)
// runnables
val runnable = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
val hospitalized = checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
// not runnables
val completed = checkpoint.copy(status = Checkpoint.FlowStatus.COMPLETED)
val failed = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
val killed = checkpoint.copy(status = Checkpoint.FlowStatus.KILLED)
// tentative
val paused = checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED) // is considered runnable
database.transaction {
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), runnable, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), hospitalized, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), completed, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), failed, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), killed, serializedFlowState)
checkpointStorage.addCheckpoint(StateMachineRunId.createRandom(), paused, serializedFlowState)
}
database.transaction {
assertEquals(3, checkpointStorage.getRunnableCheckpoints().count())
}
}
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {

@ -20,8 +20,8 @@ import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
@ -79,7 +79,6 @@ import java.util.*
import java.util.function.Predicate
import kotlin.reflect.KClass
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@ -356,6 +355,21 @@ class FlowFrameworkTests {
}
}
@Test(timeout = 300_000)
fun `Flow persists progress tracker in the database when the flow suspends`() {
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedReceiveFlow(it) }
val aliceFlowId = aliceNode.services.startFlow(ReceiveFlow(bob)).id
mockNet.runNetwork()
aliceNode.database.transaction {
val checkpoint = aliceNode.internals.checkpointStorage.getCheckpoint(aliceFlowId)
assertEquals(ReceiveFlow.START_STEP.label, checkpoint!!.progressStep)
}
bobNode.database.transaction {
val checkpoints = bobNode.internals.checkpointStorage.checkpoints().single()
assertEquals(InitiatedReceiveFlow.START_STEP.label, checkpoints.progressStep)
}
}
private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
@ -642,7 +656,7 @@ class FlowFrameworkTests {
}
SuspendingFlow.hookAfterCheckpoint = {
checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getAllCheckpoints().toList().single().second.status
checkpointStatusInDBAfterSuspension = aliceNode.internals.checkpointStorage.getRunnableCheckpoints().toList().single().second.status
}
aliceNode.services.startFlow(SuspendingFlow()).resultFuture.getOrThrow()
@ -687,7 +701,7 @@ class FlowFrameworkTests {
// the following method should be removed when implementing CORDA-3604.
private fun manuallyFailCheckpointInDB(node: TestStartedNode) {
val idCheckpoint = node.internals.checkpointStorage.getAllCheckpoints().toList().single()
val idCheckpoint = node.internals.checkpointStorage.getRunnableCheckpoints().toList().single()
val checkpoint = idCheckpoint.second
val updatedCheckpoint = checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
node.internals.checkpointStorage.updateCheckpoint(idCheckpoint.first,