From 5d24b702278446d058a77638bf73919c7203a6dd Mon Sep 17 00:00:00 2001 From: Kyriakos Tharrouniatis Date: Wed, 19 Aug 2020 15:56:12 +0100 Subject: [PATCH] CORDA-3998 - Commit db transaction before starting flows at node start (#6647) Delay the firing of future/callback chain in 'AbstractNode.start' to after db transaction commit --- .../main/kotlin/net/corda/node/internal/AbstractNode.kt | 9 ++++++++- .../statemachine/SingleThreadedStateMachineManager.kt | 5 ++--- .../node/services/statemachine/StateMachineManager.kt | 2 +- .../node/services/statemachine/FlowFrameworkTests.kt | 9 --------- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 686a57b77e..c470394638 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -588,6 +588,10 @@ abstract class AbstractNode(val configuration: NodeConfiguration, throw e } + // Only execute futures/callbacks linked to [rootFuture] after the database transaction below is committed. + // This ensures that the node is fully ready before starting flows. + val rootFuture = openFuture() + // Do all of this in a database transaction so anything that might need a connection has one. val (resultingNodeInfo, readyFuture) = database.transaction(recoverableFailureTolerance = 0) { networkParametersStorage.setCurrentParameters(signedNetParams, trustRoot) @@ -609,7 +613,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, tokenizableServices = null verifyCheckpointsCompatible(frozenTokenizableServices) - val smmStartedFuture = smm.start(frozenTokenizableServices) + val callback = smm.start(frozenTokenizableServices) + val smmStartedFuture = rootFuture.map { callback() } // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } val flowMonitor = FlowMonitor( @@ -629,6 +634,8 @@ abstract class AbstractNode(val configuration: NodeConfiguration, resultingNodeInfo to readyFuture } + rootFuture.captureLater(services.networkMapCache.nodeReady) + readyFuture.map { ready -> if (ready) { // NB: Dispatch lifecycle events outside of transaction to ensure attachments and the like persisted into the DB diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 4066a8f972..9399176ad1 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -22,7 +22,6 @@ import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.doneFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture -import net.corda.core.internal.mapNotNull import net.corda.core.internal.uncheckedCast import net.corda.core.messaging.DataFeed import net.corda.core.serialization.deserialize @@ -152,7 +151,7 @@ internal class SingleThreadedStateMachineManager( override val changes: Observable = innerState.changesPublisher @Suppress("ComplexMethod") - override fun start(tokenizableServices: List, startMode: StateMachineManager.StartMode): CordaFuture { + override fun start(tokenizableServices: List, startMode: StateMachineManager.StartMode): () -> Unit { checkQuasarJavaAgentPresence() val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext( CheckpointSerializeAsTokenContextImpl( @@ -223,7 +222,7 @@ internal class SingleThreadedStateMachineManager( } ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.") } - return serviceHub.networkMapCache.nodeReady.map { + return { logger.info("Node ready, info: ${serviceHub.myInfo}") resumeRestoredFlows(fibers) flowMessaging.start { _, deduplicationHandler -> diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index d1bfc901be..05887df101 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -42,7 +42,7 @@ interface StateMachineManager { * * @return `Future` which completes when SMM is fully started */ - fun start(tokenizableServices: List, startMode: StartMode = StartMode.ExcludingPaused) : CordaFuture + fun start(tokenizableServices: List, startMode: StartMode = StartMode.ExcludingPaused) : () -> Unit /** * Stops the state machine manager gracefully, waiting until all but [allowedUnsuspendedFiberCount] flows reach the diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 13e1eca04b..409e746c90 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -689,9 +689,6 @@ class FlowFrameworkTests { firstExecution = false throw HospitalizeFlowException() } else { - // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long - // and doesn't commit before flow starts running. - Thread.sleep(3000) dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status currentDBSession().clear() // clear session as Hibernate with fails with 'org.hibernate.NonUniqueObjectException' once it tries to save a DBFlowCheckpoint upon checkpoint inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState.checkpoint.status @@ -740,9 +737,6 @@ class FlowFrameworkTests { firstExecution = false throw HospitalizeFlowException() } else { - // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long - // and doesn't commit before flow starts running. - Thread.sleep(3000) dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status inMemoryCheckpointStatus = flowFiber.transientState.checkpoint.status @@ -857,9 +851,6 @@ class FlowFrameworkTests { var secondRun = false SuspendingFlow.hookBeforeCheckpoint = { if(secondRun) { - // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long - // and doesn't commit before flow starts running. - Thread.sleep(3000) aliceNode.database.transaction { checkpointStatusAfterRestart = findRecordsFromDatabase().single().status dbExceptionAfterRestart = findRecordsFromDatabase()