CORDA-3998 - Commit db transaction before starting flows at node start (#6647)

Delay the firing of future/callback chain in 'AbstractNode.start' to after db transaction commit
This commit is contained in:
Kyriakos Tharrouniatis 2020-08-19 15:56:12 +01:00 committed by GitHub
parent 742312b85a
commit 5d24b70227
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 11 additions and 14 deletions

View File

@ -588,6 +588,10 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
throw e
}
// Only execute futures/callbacks linked to [rootFuture] after the database transaction below is committed.
// This ensures that the node is fully ready before starting flows.
val rootFuture = openFuture<Void?>()
// Do all of this in a database transaction so anything that might need a connection has one.
val (resultingNodeInfo, readyFuture) = database.transaction(recoverableFailureTolerance = 0) {
networkParametersStorage.setCurrentParameters(signedNetParams, trustRoot)
@ -609,7 +613,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
tokenizableServices = null
verifyCheckpointsCompatible(frozenTokenizableServices)
val smmStartedFuture = smm.start(frozenTokenizableServices)
val callback = smm.start(frozenTokenizableServices)
val smmStartedFuture = rootFuture.map { callback() }
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
val flowMonitor = FlowMonitor(
@ -629,6 +634,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
resultingNodeInfo to readyFuture
}
rootFuture.captureLater(services.networkMapCache.nodeReady)
readyFuture.map { ready ->
if (ready) {
// NB: Dispatch lifecycle events outside of transaction to ensure attachments and the like persisted into the DB

View File

@ -22,7 +22,6 @@ import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.mapNotNull
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.deserialize
@ -152,7 +151,7 @@ internal class SingleThreadedStateMachineManager(
override val changes: Observable<StateMachineManager.Change> = innerState.changesPublisher
@Suppress("ComplexMethod")
override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): CordaFuture<Unit> {
override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): () -> Unit {
checkQuasarJavaAgentPresence()
val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
CheckpointSerializeAsTokenContextImpl(
@ -223,7 +222,7 @@ internal class SingleThreadedStateMachineManager(
} ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
}
return serviceHub.networkMapCache.nodeReady.map {
return {
logger.info("Node ready, info: ${serviceHub.myInfo}")
resumeRestoredFlows(fibers)
flowMessaging.start { _, deduplicationHandler ->

View File

@ -42,7 +42,7 @@ interface StateMachineManager {
*
* @return `Future` which completes when SMM is fully started
*/
fun start(tokenizableServices: List<Any>, startMode: StartMode = StartMode.ExcludingPaused) : CordaFuture<Unit>
fun start(tokenizableServices: List<Any>, startMode: StartMode = StartMode.ExcludingPaused) : () -> Unit
/**
* Stops the state machine manager gracefully, waiting until all but [allowedUnsuspendedFiberCount] flows reach the

View File

@ -689,9 +689,6 @@ class FlowFrameworkTests {
firstExecution = false
throw HospitalizeFlowException()
} else {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
currentDBSession().clear() // clear session as Hibernate with fails with 'org.hibernate.NonUniqueObjectException' once it tries to save a DBFlowCheckpoint upon checkpoint
inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState.checkpoint.status
@ -740,9 +737,6 @@ class FlowFrameworkTests {
firstExecution = false
throw HospitalizeFlowException()
} else {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
inMemoryCheckpointStatus = flowFiber.transientState.checkpoint.status
@ -857,9 +851,6 @@ class FlowFrameworkTests {
var secondRun = false
SuspendingFlow.hookBeforeCheckpoint = {
if(secondRun) {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
aliceNode.database.transaction {
checkpointStatusAfterRestart = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
dbExceptionAfterRestart = findRecordsFromDatabase()