ENT-6791 New service lifecycle event published just before starting the state machine (#7164)

This commit is contained in:
Chris Cochrane
2022-06-10 19:56:02 +01:00
committed by GitHub
parent cd1e3bab85
commit 03fa529292
7 changed files with 136 additions and 29 deletions

View File

@ -25,6 +25,7 @@ enum class ServiceLifecycleEvent {
* sense for Corda node to continue its operation. The lifecycle events dispatcher will endeavor to terminate node's JVM as soon * sense for Corda node to continue its operation. The lifecycle events dispatcher will endeavor to terminate node's JVM as soon
* as practically possible. * as practically possible.
*/ */
BEFORE_STATE_MACHINE_START,
STATE_MACHINE_STARTED, STATE_MACHINE_STARTED,
} }

View File

@ -49,6 +49,7 @@ interface NodeLifecycleObserver {
sealed class NodeLifecycleEvent(val reversedPriority: Boolean = false) { sealed class NodeLifecycleEvent(val reversedPriority: Boolean = false) {
class BeforeNodeStart(val nodeInitialContext: NodeInitialContext) : NodeLifecycleEvent() class BeforeNodeStart(val nodeInitialContext: NodeInitialContext) : NodeLifecycleEvent()
class AfterNodeStart<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent() class AfterNodeStart<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent()
class BeforeStateMachineStart<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent()
class StateMachineStarted<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent() class StateMachineStarted<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent()
class StateMachineStopped<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true) class StateMachineStopped<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true)
class BeforeNodeStop<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true) class BeforeNodeStop<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true)

View File

@ -16,6 +16,7 @@ import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService import net.corda.core.node.services.CordaService
import net.corda.core.node.services.ServiceLifecycleEvent
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
@ -854,6 +855,13 @@ class FlowEntityManagerTest : AbstractFlowEntityManagerTest() {
init { init {
if (includeRawUpdates) { if (includeRawUpdates) {
services.register { services.register {
processEvent(it)
}
}
}
private fun processEvent(event : ServiceLifecycleEvent) {
if (event == ServiceLifecycleEvent.STATE_MACHINE_STARTED) {
services.vaultService.rawUpdates.subscribe { services.vaultService.rawUpdates.subscribe {
if (insertionType == InsertionType.ENTITY_MANAGER) { if (insertionType == InsertionType.ENTITY_MANAGER) {
services.withEntityManager { services.withEntityManager {
@ -871,7 +879,6 @@ class FlowEntityManagerTest : AbstractFlowEntityManagerTest() {
} }
} }
} }
}
private fun Connection.insert(entity: CustomTableEntity) { private fun Connection.insert(entity: CustomTableEntity) {
prepareStatement("INSERT INTO $TABLE_NAME VALUES (?, ?, ?)").apply { prepareStatement("INSERT INTO $TABLE_NAME VALUES (?, ?, ?)").apply {

View File

@ -51,6 +51,7 @@ class CordaServiceLifecycleFatalTests {
object FailingObserver : ServiceLifecycleObserver { object FailingObserver : ServiceLifecycleObserver {
override fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) { override fun onServiceLifecycleEvent(event: ServiceLifecycleEvent) {
if (event == ServiceLifecycleEvent.STATE_MACHINE_STARTED) {
val tmpFile = File(System.getProperty(tempFilePropertyName)) val tmpFile = File(System.getProperty(tempFilePropertyName))
tmpFile.appendText("\n" + readyToThrowMarker) tmpFile.appendText("\n" + readyToThrowMarker)
eventually(duration = 30.seconds) { eventually(duration = 30.seconds) {
@ -60,6 +61,7 @@ class CordaServiceLifecycleFatalTests {
} }
} }
} }
}
@Test(timeout=300_000) @Test(timeout=300_000)
fun `JVM terminates on critical failure`() { fun `JVM terminates on critical failure`() {

View File

@ -3,18 +3,23 @@ package net.corda.node.services
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StartableByService
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService import net.corda.core.node.services.CordaService
import net.corda.core.node.services.ServiceLifecycleEvent import net.corda.core.node.services.ServiceLifecycleEvent
import net.corda.core.node.services.ServiceLifecycleEvent.BEFORE_STATE_MACHINE_START
import net.corda.core.node.services.ServiceLifecycleEvent.STATE_MACHINE_STARTED import net.corda.core.node.services.ServiceLifecycleEvent.STATE_MACHINE_STARTED
import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.InProcess
import net.corda.testing.driver.driver import net.corda.testing.driver.driver
import net.corda.testing.node.internal.enclosedCordapp import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Before
import org.junit.Test import org.junit.Test
import kotlin.test.assertEquals import kotlin.test.assertEquals
class CordaServiceLifecycleTests { class CordaServiceLifecycleTests {
@ -22,23 +27,70 @@ class CordaServiceLifecycleTests {
private companion object { private companion object {
const val TEST_PHRASE = "testPhrase" const val TEST_PHRASE = "testPhrase"
// the number of times to register a service callback
private var numServiceCallbacks = 0
// the set of events a test wants to capture
private var eventsToBeCaptured: MutableSet<ServiceLifecycleEvent> = mutableSetOf()
// the events that were actually captured in a test
private val eventsCaptured: MutableList<ServiceLifecycleEvent> = mutableListOf() private val eventsCaptured: MutableList<ServiceLifecycleEvent> = mutableListOf()
}
@Before
fun setup() {
numServiceCallbacks = 1
eventsCaptured.clear()
eventsToBeCaptured = setOf(BEFORE_STATE_MACHINE_START, STATE_MACHINE_STARTED).toMutableSet()
} }
@Test(timeout=300_000) @Test(timeout=300_000)
fun `corda service receives events`() { fun `corda service receives events`() {
eventsCaptured.clear()
val result = driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), val result = driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()),
notarySpecs = emptyList())) { notarySpecs = emptyList())) {
val node = startNode(providedName = ALICE_NAME).getOrThrow() val node = startNode(providedName = ALICE_NAME).getOrThrow()
node.rpc.startFlow(::ComputeTextLengthThroughCordaService, TEST_PHRASE).returnValue.getOrThrow() node.rpc.startFlow(::ComputeTextLengthThroughCordaService, TEST_PHRASE).returnValue.getOrThrow()
} }
val expectedEventsAndTheOrderTheyOccurIn = listOf(BEFORE_STATE_MACHINE_START, STATE_MACHINE_STARTED)
assertEquals(TEST_PHRASE.length, result) assertEquals(TEST_PHRASE.length, result)
assertEquals(1, eventsCaptured.size) assertEquals(numServiceCallbacks * 2, eventsCaptured.size)
assertEquals(listOf(STATE_MACHINE_STARTED), eventsCaptured) assertEquals(expectedEventsAndTheOrderTheyOccurIn, eventsCaptured)
}
@Test(timeout=300_000)
fun `corda service receives BEFORE_STATE_MACHINE_START before the state machine is started`() {
testStateMachineManagerStatusWhenServiceEventOccurs(
event = BEFORE_STATE_MACHINE_START,
expectedResult = TestSmmStateService.STATE_MACHINE_MANAGER_WAS_NOT_STARTED
)
}
@Test(timeout=300_000)
fun `corda service receives STATE_MACHINE_STARTED after the state machine is started`() {
testStateMachineManagerStatusWhenServiceEventOccurs(
event = STATE_MACHINE_STARTED,
expectedResult = TestSmmStateService.STATE_MACHINE_MANAGER_WAS_STARTED
)
}
/**
* Commonised
*/
private fun testStateMachineManagerStatusWhenServiceEventOccurs(event: ServiceLifecycleEvent, expectedResult : Int) {
val result = driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()),
notarySpecs = emptyList())) {
val node = startNode(providedName = ALICE_NAME).getOrThrow()
if (node is InProcess) { // assuming the node-handle is always one of these
val svc = node.services.cordaService(TestSmmStateService::class.java)
svc.getSmmStartedForEvent(event)
} else {
TestSmmStateService.STATE_MACHINE_MANAGER_UNKNOWN_STATUS
}
}
assertEquals(expectedResult, result)
} }
@StartableByRPC @StartableByRPC
@StartableByService
class ComputeTextLengthThroughCordaService(private val text: String) : FlowLogic<Int>() { class ComputeTextLengthThroughCordaService(private val text: String) : FlowLogic<Int>() {
@Suspendable @Suspendable
override fun call(): Int { override fun call(): Int {
@ -52,18 +104,15 @@ class CordaServiceLifecycleTests {
class TextLengthComputingService(services: AppServiceHub) : SingletonSerializeAsToken() { class TextLengthComputingService(services: AppServiceHub) : SingletonSerializeAsToken() {
init { init {
for (n in 1..numServiceCallbacks) {
services.register { addEvent(it) } services.register { addEvent(it) }
} }
}
private fun addEvent(event: ServiceLifecycleEvent) { private fun addEvent(event: ServiceLifecycleEvent) {
when (event) { if (event in eventsToBeCaptured) {
STATE_MACHINE_STARTED -> {
eventsCaptured.add(event) eventsCaptured.add(event)
} }
else -> {
eventsCaptured.add(event)
}
}
} }
fun computeLength(text: String): Int { fun computeLength(text: String): Int {
@ -71,4 +120,42 @@ class CordaServiceLifecycleTests {
return text.length return text.length
} }
} }
/**
* Service that checks the State Machine Manager state (started, not started) when service events are received.
*/
@CordaService
class TestSmmStateService(private val services: AppServiceHub) : SingletonSerializeAsToken() {
companion object {
const val STATE_MACHINE_MANAGER_UNKNOWN_STATUS = -1
const val STATE_MACHINE_MANAGER_WAS_NOT_STARTED = 0
const val STATE_MACHINE_MANAGER_WAS_STARTED = 1
}
var smmStateAtEvent = mutableMapOf<ServiceLifecycleEvent, Int>()
init {
services.register { addEvent(it) }
}
private fun addEvent(event: ServiceLifecycleEvent) {
smmStateAtEvent[event] = checkSmmStarted()
}
private fun checkSmmStarted() : Int {
// try to start a flow; success == SMM started
try {
services.startFlow(ComputeTextLengthThroughCordaService(TEST_PHRASE)).returnValue.getOrThrow()
return STATE_MACHINE_MANAGER_WAS_STARTED
} catch (ex : UninitializedPropertyAccessException) {
return STATE_MACHINE_MANAGER_WAS_NOT_STARTED
}
}
/**
* Given an event, was the SMM started when the event was received?
*/
fun getSmmStartedForEvent(event: ServiceLifecycleEvent) : Int = smmStateAtEvent.getOrDefault(event, STATE_MACHINE_MANAGER_UNKNOWN_STATUS)
}
} }

View File

@ -639,6 +639,11 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
tokenizableServices = null tokenizableServices = null
verifyCheckpointsCompatible(frozenTokenizableServices) verifyCheckpointsCompatible(frozenTokenizableServices)
/* Note the .get() at the end of the distributeEvent call, below.
This will block until all Corda Services have returned from processing the event, allowing a service to prevent the
state machine manager from starting (just below this) until the service is ready.
*/
nodeLifecycleEventsDistributor.distributeEvent(NodeLifecycleEvent.BeforeStateMachineStart(nodeServicesContext)).get()
val callback = smm.start(frozenTokenizableServices) val callback = smm.start(frozenTokenizableServices)
val smmStartedFuture = rootFuture.map { callback() } val smmStartedFuture = rootFuture.map { callback() }
// Shut down the SMM so no Fibers are scheduled. // Shut down the SMM so no Fibers are scheduled.

View File

@ -45,6 +45,10 @@ internal class AppServiceHubImpl<T : SerializeAsToken>(private val serviceHub: S
observer.onServiceLifecycleEvent(ServiceLifecycleEvent.STATE_MACHINE_STARTED) observer.onServiceLifecycleEvent(ServiceLifecycleEvent.STATE_MACHINE_STARTED)
reportSuccess(nodeLifecycleEvent) reportSuccess(nodeLifecycleEvent)
} }
is NodeLifecycleEvent.BeforeStateMachineStart<*> -> Try.on {
observer.onServiceLifecycleEvent(ServiceLifecycleEvent.BEFORE_STATE_MACHINE_START)
reportSuccess(nodeLifecycleEvent)
}
else -> super.update(nodeLifecycleEvent) else -> super.update(nodeLifecycleEvent)
} }
} }