From 03fa529292b80c996f44c83c8b3caedbe06c2480 Mon Sep 17 00:00:00 2001 From: Chris Cochrane <78791827+chriscochrane@users.noreply.github.com> Date: Fri, 10 Jun 2022 19:56:02 +0100 Subject: [PATCH] ENT-6791 New service lifecycle event published just before starting the state machine (#7164) --- .../node/services/ServiceLifecycleObserver.kt | 1 + .../lifecycle/NodeLifecycleObserver.kt | 1 + .../corda/node/flows/FlowEntityManagerTest.kt | 33 +++--- .../CordaServiceLifecycleFatalTests.kt | 12 +- .../services/CordaServiceLifecycleTests.kt | 109 ++++++++++++++++-- .../net/corda/node/internal/AbstractNode.kt | 5 + .../corda/node/internal/AppServiceHubImpl.kt | 4 + 7 files changed, 136 insertions(+), 29 deletions(-) diff --git a/core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt b/core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt index 9921894f80..ea4302ab22 100644 --- a/core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt +++ b/core/src/main/kotlin/net/corda/core/node/services/ServiceLifecycleObserver.kt @@ -25,6 +25,7 @@ enum class ServiceLifecycleEvent { * sense for Corda node to continue its operation. The lifecycle events dispatcher will endeavor to terminate node's JVM as soon * as practically possible. */ + BEFORE_STATE_MACHINE_START, STATE_MACHINE_STARTED, } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt index 0232029188..b960f8bdb0 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/lifecycle/NodeLifecycleObserver.kt @@ -49,6 +49,7 @@ interface NodeLifecycleObserver { sealed class NodeLifecycleEvent(val reversedPriority: Boolean = false) { class BeforeNodeStart(val nodeInitialContext: NodeInitialContext) : NodeLifecycleEvent() class AfterNodeStart(val nodeServicesContext: T) : NodeLifecycleEvent() + class BeforeStateMachineStart(val nodeServicesContext: T) : NodeLifecycleEvent() class StateMachineStarted(val nodeServicesContext: T) : NodeLifecycleEvent() class StateMachineStopped(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true) class BeforeNodeStop(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true) diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt index fac7f780c5..b54d9d0d75 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowEntityManagerTest.kt @@ -16,6 +16,7 @@ import net.corda.core.internal.concurrent.transpose import net.corda.core.messaging.startFlow import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService +import net.corda.core.node.services.ServiceLifecycleEvent import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder @@ -854,19 +855,25 @@ class FlowEntityManagerTest : AbstractFlowEntityManagerTest() { init { if (includeRawUpdates) { services.register { - services.vaultService.rawUpdates.subscribe { - if (insertionType == InsertionType.ENTITY_MANAGER) { - services.withEntityManager { - persist(entityWithIdOne) - persist(entityWithIdTwo) - persist(entityWithIdThree) - } - } else { - services.jdbcSession().run { - insert(entityWithIdOne) - insert(entityWithIdTwo) - insert(entityWithIdThree) - } + processEvent(it) + } + } + } + + private fun processEvent(event : ServiceLifecycleEvent) { + if (event == ServiceLifecycleEvent.STATE_MACHINE_STARTED) { + services.vaultService.rawUpdates.subscribe { + if (insertionType == InsertionType.ENTITY_MANAGER) { + services.withEntityManager { + persist(entityWithIdOne) + persist(entityWithIdTwo) + persist(entityWithIdThree) + } + } else { + services.jdbcSession().run { + insert(entityWithIdOne) + insert(entityWithIdTwo) + insert(entityWithIdThree) } } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt index a8b19e7d91..35c1352c7a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleFatalTests.kt @@ -51,12 +51,14 @@ class CordaServiceLifecycleFatalTests { object FailingObserver : ServiceLifecycleObserver { override fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) { - val tmpFile = File(System.getProperty(tempFilePropertyName)) - tmpFile.appendText("\n" + readyToThrowMarker) - eventually(duration = 30.seconds) { - assertEquals(goodToThrowMarker, tmpFile.readLines().last()) + if (event == ServiceLifecycleEvent.STATE_MACHINE_STARTED) { + val tmpFile = File(System.getProperty(tempFilePropertyName)) + tmpFile.appendText("\n" + readyToThrowMarker) + eventually(duration = 30.seconds) { + assertEquals(goodToThrowMarker, tmpFile.readLines().last()) + } + throw CordaServiceCriticalFailureException("controlled failure") } - throw CordaServiceCriticalFailureException("controlled failure") } } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt index 9530cb0af2..e615e91a60 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/CordaServiceLifecycleTests.kt @@ -3,18 +3,23 @@ package net.corda.node.services import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.StartableByService import net.corda.core.messaging.startFlow import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService import net.corda.core.node.services.ServiceLifecycleEvent +import net.corda.core.node.services.ServiceLifecycleEvent.BEFORE_STATE_MACHINE_START import net.corda.core.node.services.ServiceLifecycleEvent.STATE_MACHINE_STARTED import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.getOrThrow import net.corda.testing.core.ALICE_NAME import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.InProcess import net.corda.testing.driver.driver import net.corda.testing.node.internal.enclosedCordapp +import org.junit.Before import org.junit.Test + import kotlin.test.assertEquals class CordaServiceLifecycleTests { @@ -22,23 +27,70 @@ class CordaServiceLifecycleTests { private companion object { const val TEST_PHRASE = "testPhrase" + // the number of times to register a service callback + private var numServiceCallbacks = 0 + // the set of events a test wants to capture + private var eventsToBeCaptured: MutableSet = mutableSetOf() + // the events that were actually captured in a test private val eventsCaptured: MutableList = mutableListOf() + + } + + @Before + fun setup() { + numServiceCallbacks = 1 + eventsCaptured.clear() + eventsToBeCaptured = setOf(BEFORE_STATE_MACHINE_START, STATE_MACHINE_STARTED).toMutableSet() } @Test(timeout=300_000) fun `corda service receives events`() { - eventsCaptured.clear() val result = driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) { val node = startNode(providedName = ALICE_NAME).getOrThrow() node.rpc.startFlow(::ComputeTextLengthThroughCordaService, TEST_PHRASE).returnValue.getOrThrow() } + val expectedEventsAndTheOrderTheyOccurIn = listOf(BEFORE_STATE_MACHINE_START, STATE_MACHINE_STARTED) assertEquals(TEST_PHRASE.length, result) - assertEquals(1, eventsCaptured.size) - assertEquals(listOf(STATE_MACHINE_STARTED), eventsCaptured) + assertEquals(numServiceCallbacks * 2, eventsCaptured.size) + assertEquals(expectedEventsAndTheOrderTheyOccurIn, eventsCaptured) + } + + @Test(timeout=300_000) + fun `corda service receives BEFORE_STATE_MACHINE_START before the state machine is started`() { + testStateMachineManagerStatusWhenServiceEventOccurs( + event = BEFORE_STATE_MACHINE_START, + expectedResult = TestSmmStateService.STATE_MACHINE_MANAGER_WAS_NOT_STARTED + ) + } + + @Test(timeout=300_000) + fun `corda service receives STATE_MACHINE_STARTED after the state machine is started`() { + testStateMachineManagerStatusWhenServiceEventOccurs( + event = STATE_MACHINE_STARTED, + expectedResult = TestSmmStateService.STATE_MACHINE_MANAGER_WAS_STARTED + ) + } + + /** + * Commonised + */ + private fun testStateMachineManagerStatusWhenServiceEventOccurs(event: ServiceLifecycleEvent, expectedResult : Int) { + val result = driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), + notarySpecs = emptyList())) { + val node = startNode(providedName = ALICE_NAME).getOrThrow() + if (node is InProcess) { // assuming the node-handle is always one of these + val svc = node.services.cordaService(TestSmmStateService::class.java) + svc.getSmmStartedForEvent(event) + } else { + TestSmmStateService.STATE_MACHINE_MANAGER_UNKNOWN_STATUS + } + } + assertEquals(expectedResult, result) } @StartableByRPC + @StartableByService class ComputeTextLengthThroughCordaService(private val text: String) : FlowLogic() { @Suspendable override fun call(): Int { @@ -52,17 +104,14 @@ class CordaServiceLifecycleTests { class TextLengthComputingService(services: AppServiceHub) : SingletonSerializeAsToken() { init { - services.register { addEvent(it) } + for (n in 1..numServiceCallbacks) { + services.register { addEvent(it) } + } } private fun addEvent(event: ServiceLifecycleEvent) { - when (event) { - STATE_MACHINE_STARTED -> { - eventsCaptured.add(event) - } - else -> { - eventsCaptured.add(event) - } + if (event in eventsToBeCaptured) { + eventsCaptured.add(event) } } @@ -71,4 +120,42 @@ class CordaServiceLifecycleTests { return text.length } } + + /** + * Service that checks the State Machine Manager state (started, not started) when service events are received. + */ + @CordaService + class TestSmmStateService(private val services: AppServiceHub) : SingletonSerializeAsToken() { + + companion object { + const val STATE_MACHINE_MANAGER_UNKNOWN_STATUS = -1 + const val STATE_MACHINE_MANAGER_WAS_NOT_STARTED = 0 + const val STATE_MACHINE_MANAGER_WAS_STARTED = 1 + } + + var smmStateAtEvent = mutableMapOf() + + init { + services.register { addEvent(it) } + } + + private fun addEvent(event: ServiceLifecycleEvent) { + smmStateAtEvent[event] = checkSmmStarted() + } + + private fun checkSmmStarted() : Int { + // try to start a flow; success == SMM started + try { + services.startFlow(ComputeTextLengthThroughCordaService(TEST_PHRASE)).returnValue.getOrThrow() + return STATE_MACHINE_MANAGER_WAS_STARTED + } catch (ex : UninitializedPropertyAccessException) { + return STATE_MACHINE_MANAGER_WAS_NOT_STARTED + } + } + + /** + * Given an event, was the SMM started when the event was received? + */ + fun getSmmStartedForEvent(event: ServiceLifecycleEvent) : Int = smmStateAtEvent.getOrDefault(event, STATE_MACHINE_MANAGER_UNKNOWN_STATUS) + } } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 705d5dc447..53b1aa9bdc 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -639,6 +639,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration, tokenizableServices = null verifyCheckpointsCompatible(frozenTokenizableServices) + /* Note the .get() at the end of the distributeEvent call, below. + This will block until all Corda Services have returned from processing the event, allowing a service to prevent the + state machine manager from starting (just below this) until the service is ready. + */ + nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.BeforeStateMachineStart(nodeServicesContext)).get() val callback = smm.start(frozenTokenizableServices) val smmStartedFuture = rootFuture.map { callback() } // Shut down the SMM so no Fibers are scheduled. diff --git a/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt b/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt index 922316045b..5acf706f91 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AppServiceHubImpl.kt @@ -45,6 +45,10 @@ internal class AppServiceHubImpl(private val serviceHub: S observer.onServiceLifecycleEvent(ServiceLifecycleEvent.STATE_MACHINE_STARTED) reportSuccess(nodeLifecycleEvent) } + is NodeLifecycleEvent.BeforeStateMachineStart<*> -> Try.on { + observer.onServiceLifecycleEvent(ServiceLifecycleEvent.BEFORE_STATE_MACHINE_START) + reportSuccess(nodeLifecycleEvent) + } else -> super.update(nodeLifecycleEvent) } }