CORDA-3600 Add flowIO request to checkpoint (#6017)

* Update Checkpoint DB to update flow io request

* Modify flow monitor to update Checkpoint DB with waiting flows

This happens periodically.

* Refactored code to avoid looping twice and updated tests

* Fix tests after rebasing

* Fix MR comments (non-functional refactor of tests + FlowMonitor).

* Made visible for testing method private in DBCheckpointStorage

This is not needed anymore.

* Explicity check if ioRequestType has changed in update method

* Fix shadowing warning

* Import non deprecated Assert into test

* Use AssertEquals not assert in test

* Address more comments (minor refactor) of DBCheckpointStorage

* Minor fix use it instead of referencing object explicitly

* Add null check to DBCheckpointStorage

* Revert changes to Flow Monitor.

We will instead store the information in the main thread of the
state machine.

* Remove now uneeded API and make statemachine update ioRequest

* Add Integration Test to check statemachine updates DB on Recieve

* Use simpleName in checkpoint storage instead of class.

Hibernate was previously resetting the class field this is now
set to null (when getting checkpoint form DB) and a new method
for getting back the simple name as a string.

* Update StateMachineState to store simple name.

* Fix after rebase broke stuff + renamed test

* Fix Detekt issue

* Remove uneeded null assertion
This commit is contained in:
williamvigorr3 2020-03-10 15:46:37 +00:00 committed by GitHub
parent d5e84a4f93
commit 51db2c2d7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 68 additions and 14 deletions

View File

@ -74,7 +74,7 @@ class CordaPersistenceServiceTests {
status = FlowStatus.RUNNABLE,
compatible = false,
progressStep = "",
ioRequestType = FlowIORequest.ForceCheckpoint.javaClass,
ioRequestType = FlowIORequest.ForceCheckpoint::class.java.simpleName,
checkpointInstant = Instant.now(),
flowMetadata = createMetadataRecord(UUID.randomUUID(), now)
)

View File

@ -1,6 +1,7 @@
package net.corda.node.services.api
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.FlowState
@ -41,4 +42,4 @@ interface CheckpointStorage {
* underlying database connection is closed, so any processing should happen before it is closed.
*/
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
}
}

View File

@ -1,7 +1,6 @@
package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
@ -17,12 +16,10 @@ import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
import org.hibernate.annotations.Type
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.sql.Connection
import java.sql.SQLException
import java.time.Instant
import java.util.UUID
import java.util.*
import java.util.stream.Stream
import javax.persistence.CascadeType
import javax.persistence.Column
@ -106,7 +103,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
var progressStep: String?,
@Column(name = "flow_io_request")
var ioRequestType: Class<out FlowIORequest<*>>?,
var ioRequestType: String?,
@Column(name = "timestamp", nullable = false)
var checkpointInstant: Instant
@ -238,7 +235,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
}
override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? {
return currentDBSession().get(DBFlowCheckpoint::class.java, id.uuid.toString())?.toSerializedCheckpoint()
return getDBCheckpoint(id)?.toSerializedCheckpoint()
}
override fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
@ -251,6 +248,10 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
}
}
private fun getDBCheckpoint(id: StateMachineRunId): DBFlowCheckpoint? {
return currentDBSession().find(DBFlowCheckpoint::class.java, id.uuid.toString())
}
private fun createDBCheckpoint(
id: StateMachineRunId,
checkpoint: Checkpoint,

View File

@ -62,7 +62,7 @@ data class Checkpoint(
val result: Any? = null,
val status: FlowStatus = FlowStatus.RUNNABLE,
val progressStep: String? = null,
val flowIoRequest: Class<out FlowIORequest<*>>? = null,
val flowIoRequest: String? = null,
val compatible: Boolean = true
) {
@CordaSerializable
@ -149,7 +149,7 @@ data class Checkpoint(
val result: SerializedBytes<Any>?,
val status: FlowStatus,
val progressStep: String?,
val flowIoRequest: Class<out FlowIORequest<*>>?,
val flowIoRequest: String?,
val compatible: Boolean
) {
/**

View File

@ -158,7 +158,8 @@ class TopLevelTransition(
flowState = FlowState.Started(event.ioRequest, event.fiber),
checkpointState = currentState.checkpoint.checkpointState.copy(
numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends + 1
)
),
flowIoRequest = event.ioRequest::class.java.simpleName
)
if (event.maySkipCheckpoint) {
actions.addAll(arrayOf(

View File

@ -32,14 +32,14 @@ import net.corda.testing.node.MockServices.Companion.makeTestDataSourcePropertie
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.time.Instant
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
@ -121,7 +121,7 @@ class DBCheckpointStorageTests {
logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
),
progressStep = "I have made progress",
flowIoRequest = FlowIORequest.SendAndReceive::class.java
flowIoRequest = FlowIORequest.SendAndReceive::class.java.simpleName
)
val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
database.transaction {
@ -439,6 +439,30 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout = 300_000)
fun `Checkpoint can be updated with flow io request information`() {
val (id, checkpoint) = newCheckpoint(1)
database.transaction {
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
val checkpointFromStorage = checkpointStorage.getCheckpoint(id)
assertNull(checkpointFromStorage!!.flowIoRequest)
}
database.transaction {
val newCheckpoint = checkpoint.copy(flowIoRequest = FlowIORequest.Sleep::class.java.simpleName)
val serializedFlowState = newCheckpoint.flowState.checkpointSerialize(
context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
)
checkpointStorage.updateCheckpoint(id, newCheckpoint, serializedFlowState)
}
database.transaction {
val checkpointFromStorage = checkpointStorage.getCheckpoint(id)
assertNotNull(checkpointFromStorage!!.flowIoRequest)
val flowIORequest = checkpointFromStorage.flowIoRequest
assertEquals(FlowIORequest.Sleep::class.java.simpleName, flowIORequest)
}
}
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {

View File

@ -12,10 +12,12 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.*
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.queryBy
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.toFuture
@ -25,6 +27,8 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.ProgressTracker.Change
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.persistence.CheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.checkpoints
import net.corda.testing.contracts.DummyContract
import net.corda.testing.contracts.DummyState
@ -69,6 +73,16 @@ class FlowFrameworkTests {
private lateinit var notaryIdentity: Party
private val receivedSessionMessages = ArrayList<SessionTransfer>()
private val dbCheckpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
@Before
fun setUpMockNet() {
mockNet = InternalMockNetwork(
@ -208,6 +222,19 @@ class FlowFrameworkTests {
script(FlowMonitor(aliceNode.smm, Duration.ZERO, Duration.ZERO), FlowMonitor(bobNode.smm, Duration.ZERO, Duration.ZERO))
}
@Test(timeout = 300_000)
fun `flow status is updated in database when flow suspends on ioRequest`() {
val terminationSignal = Semaphore(0)
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { NoOpFlow( terminateUponSignal = terminationSignal) }
val flowId = aliceNode.services.startFlow(ReceiveFlow(bob)).id
mockNet.runNetwork()
aliceNode.database.transaction {
val checkpoint = dbCheckpointStorage.getCheckpoint(flowId)
assertEquals(FlowIORequest.Receive::class.java.simpleName, checkpoint?.flowIoRequest)
}
terminationSignal.release()
}
@Test(timeout=300_000)
fun `receiving unexpected session end before entering sendAndReceive`() {
bobNode.registerCordappFlowFactory(WaitForOtherSideEndBeforeSendAndReceive::class) { NoOpFlow() }