CORDA-3601 Record a flow's finish time (#6079)

* CORDA-3601 Record a flow's finish time

Record a flow's finish time by updating its metadata record. It is set
in `updateCheckpoint` by checking the status of the checkpoint. If it is
 `COMPLETED` it will set the `finishInstant` on the metadata object and
 update it.

* CORDA-3601 Record flow finish time for all finished statuses

Update the flow finish time for the following statuses:

- COMPLETED
- KILLED
- FAILED

* CORDA-3601 Use platform clock in `DBCheckpointStorage`
This commit is contained in:
Dan Newton 2020-03-25 13:47:00 +00:00 committed by GitHub
parent 937f12f966
commit 79b36aea8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 128 additions and 41 deletions

View File

@ -328,7 +328,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
})
}
val services = ServiceHubInternalImpl().tokenize()
val checkpointStorage = DBCheckpointStorage(DBCheckpointPerformanceRecorder(services.monitoringService.metrics))
val checkpointStorage = DBCheckpointStorage(DBCheckpointPerformanceRecorder(services.monitoringService.metrics), platformClock)
@Suppress("LeakingThis")
val smm = makeStateMachineManager()
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)

View File

@ -22,6 +22,7 @@ import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
import org.hibernate.annotations.Type
import java.sql.Connection
import java.sql.SQLException
import java.time.Clock
import java.time.Instant
import java.util.*
import java.util.stream.Stream
@ -39,7 +40,10 @@ import javax.persistence.OneToOne
* Simple checkpoint key value storage in DB.
*/
@Suppress("TooManyFunctions")
class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointPerformanceRecorder) : CheckpointStorage {
class DBCheckpointStorage(
private val checkpointPerformanceRecorder: CheckpointPerformanceRecorder,
private val clock: Clock
) : CheckpointStorage {
companion object {
val log = contextLogger()
@ -314,7 +318,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
serializedFlowState: SerializedBytes<FlowState>
): DBFlowCheckpoint {
val flowId = id.uuid.toString()
val now = Instant.now()
val now = clock.instant()
val serializedCheckpointState = checkpoint.checkpointState.storageSerialize()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
@ -333,7 +337,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
compatible = checkpoint.compatible,
progressStep = null,
ioRequestType = null,
checkpointInstant = Instant.now()
checkpointInstant = now
)
}
@ -354,7 +358,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
platformVersion = PLATFORM_VERSION,
startedBy = context.principal().name,
invocationInstant = context.trace.invocationId.timestamp,
startInstant = Instant.now(),
startInstant = clock.instant(),
finishInstant = null
).apply {
currentDBSession().save(this)
@ -367,7 +371,7 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
serializedFlowState: SerializedBytes<FlowState>
): DBFlowCheckpoint {
val flowId = id.uuid.toString()
val now = Instant.now()
val now = clock.instant()
// Load the previous entity from the hibernate cache so the meta data join does not get updated
val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId)
@ -380,13 +384,20 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
//val result = updateDBFlowResult(entity, checkpoint, now)
val exceptionDetails = updateDBFlowException(entity, checkpoint, now)
val metadata = entity.flowMetadata.apply {
if (checkpoint.isFinished() && finishInstant == null) {
finishInstant = now
currentDBSession().update(this)
}
}
return entity.apply {
this.blob = blob
//Set the result to null for now.
this.result = null
this.exceptionDetails = exceptionDetails
// Do not update the meta data relationship on updates
this.flowMetadata = entity.flowMetadata
this.flowMetadata = metadata
this.status = checkpoint.status
this.compatible = checkpoint.compatible
this.progressStep = checkpoint.progressStep?.take(MAX_PROGRESS_STEP_LENGTH)
@ -512,4 +523,9 @@ class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointP
private fun <T : Any> T.storageSerialize(): SerializedBytes<T> {
return serialize(context = SerializationDefaults.STORAGE_CONTEXT)
}
private fun Checkpoint.isFinished() = when(status) {
FlowStatus.COMPLETED, FlowStatus.KILLED, FlowStatus.FAILED -> true
else -> false
}
}

View File

@ -37,6 +37,7 @@ import org.junit.Before
import org.junit.Ignore
import org.junit.Rule
import org.junit.Test
import java.time.Clock
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@ -546,14 +547,17 @@ class DBCheckpointStorageTests {
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
checkpointStorage = DBCheckpointStorage(
object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
},
Clock.systemUTC()
)
}
}

View File

@ -143,14 +143,17 @@ class CheckpointDumperImplTest {
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
checkpointStorage = DBCheckpointStorage(
object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
},
Clock.systemUTC()
)
}
}

View File

@ -73,6 +73,7 @@ import org.junit.Test
import rx.Notification
import rx.Observable
import java.sql.SQLException
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.*
@ -80,6 +81,7 @@ import java.util.function.Predicate
import kotlin.reflect.KClass
import kotlin.streams.toList
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
class FlowFrameworkTests {
@ -97,14 +99,17 @@ class FlowFrameworkTests {
private lateinit var notaryIdentity: Party
private val receivedSessionMessages = ArrayList<SessionTransfer>()
private val dbCheckpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
private val dbCheckpointStorage = DBCheckpointStorage(
object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
) {
// do nothing
}
},
Clock.systemUTC()
)
@Before
fun setUpMockNet() {
@ -355,6 +360,24 @@ class FlowFrameworkTests {
}
}
@Test(timeout = 300_000)
fun `Flow metadata finish time is set in database when the flow finishes`() {
val terminationSignal = Semaphore(0)
val flow = aliceNode.services.startFlow(NoOpFlow(terminateUponSignal = terminationSignal))
mockNet.waitQuiescent()
aliceNode.database.transaction {
val metadata = session.find(DBCheckpointStorage.DBFlowMetadata::class.java, flow.id.uuid.toString())
assertNull(metadata.finishInstant)
}
terminationSignal.release()
mockNet.waitQuiescent()
aliceNode.database.transaction {
val metadata = session.find(DBCheckpointStorage.DBFlowMetadata::class.java, flow.id.uuid.toString())
assertNotNull(metadata.finishInstant)
assertTrue(metadata.finishInstant!! >= metadata.startInstant)
}
}
@Test(timeout = 300_000)
fun `Flow persists progress tracker in the database when the flow suspends`() {
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedReceiveFlow(it) }

View File

@ -31,6 +31,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.node.services.Permissions
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.testing.contracts.DummyContract
@ -43,15 +44,14 @@ import net.corda.testing.node.User
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Test
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore
import java.util.function.Supplier
import kotlin.reflect.jvm.jvmName
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
@ -92,7 +92,7 @@ class FlowMetadataRecordingTest {
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
).returnValue.getOrThrow(1.minutes)
}
metadata!!.let {
@ -133,7 +133,7 @@ class FlowMetadataRecordingTest {
}
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(::MyFlowWithoutParameters).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
it.proxy.startFlow(::MyFlowWithoutParameters).returnValue.getOrThrow(1.minutes)
}
metadata!!.let {
@ -186,7 +186,7 @@ class FlowMetadataRecordingTest {
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
).returnValue.getOrThrow(1.minutes)
}
assertEquals(
@ -232,7 +232,7 @@ class FlowMetadataRecordingTest {
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
).returnValue.getOrThrow(1.minutes)
}
metadata!!.let {
@ -278,7 +278,7 @@ class FlowMetadataRecordingTest {
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
).returnValue.getOrThrow(1.minutes)
}
metadata!!.let {
@ -308,7 +308,7 @@ class FlowMetadataRecordingTest {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
val lock = Semaphore(1)
val lock = Semaphore(0)
var flowId: StateMachineRunId? = null
var context: InvocationContext? = null
@ -322,16 +322,13 @@ class FlowMetadataRecordingTest {
lock.release()
}
// Acquire the lock to prevent the asserts from being processed too early
lock.acquire()
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyStartedScheduledFlow,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(Duration.of(10, ChronoUnit.SECONDS))
).returnValue.getOrThrow(1.minutes)
}
// Block here until released in the hook
@ -357,6 +354,42 @@ class FlowMetadataRecordingTest {
}
}
@Test(timeout = 300_000)
fun `flows have their finish time recorded when completed`() {
driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
var flowId: StateMachineRunId? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null
MyFlow.hookAfterInitialCheckpoint =
{ flowIdFromHook: StateMachineRunId, _, metadataFromHook: DBCheckpointStorage.DBFlowMetadata ->
flowId = flowIdFromHook
metadata = metadataFromHook
}
val finishTime = CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
it.proxy.startFlow(
::MyFlow,
nodeBHandle.nodeInfo.singleIdentity(),
string,
someObject
).returnValue.getOrThrow(1.minutes)
it.proxy.startFlow(
::GetFlowFinishTimeFlow,
flowId!!
).returnValue.getOrThrow(1.minutes)
}
metadata!!.let {
assertNull(it.finishInstant)
assertNotNull(finishTime)
assertTrue(finishTime!! >= it.startInstant)
}
}
}
@InitiatingFlow
@StartableByRPC
@StartableByService
@ -473,6 +506,14 @@ class FlowMetadataRecordingTest {
}
}
@StartableByRPC
class GetFlowFinishTimeFlow(private val flowId: StateMachineRunId) : FlowLogic<Instant?>() {
@Suspendable
override fun call(): Instant? {
return serviceHub.cordaService(MyService::class.java).findMetadata(flowId).finishInstant
}
}
@CordaService
class MyService(private val services: AppServiceHub) : SingletonSerializeAsToken() {