CORDA-2942: Node lifecycle events (#5846)

* CORDA-2942: Port minimal set of changes to make lifecycle events work

... and make codebase compile.

* CORDA-2942: Undo some changes which are not strictly speaking necessary

* CORDA-2942: Make `NodeServicesContext` leaner and delete `extensions-api` module

* CORDA-2942: Reduce even more number of files affected

* CORDA-2942: Integration test fix

* CORDA-2942: Make events `AfterStart` and `BeforeStop` generic w.r.t. `NodeServicesContext`

* CORDA-2942: `NodeLifecycleObserverService` and a set of integration tests.

Public API violations are expected as well as integration tests failing.

* CORDA-2942: Re-work to introduce `ServiceLifecycleObserver`

* CORDA-2942: Explicitly mention a type of exception that may be thrown for some events.

* CORDA-2942: Register `ServiceLifecycleObserver` through `AppServiceHub`

* CORDA-2942: Fix integration test + KDocs update

* CORDA-2942: Detekt and `api-current` update

* CORDA-2942: Improvement to `CordaServiceLifecycleFatalTests`

... or else it has side effects on other tests.

* CORDA-2942: Add an integration test for new API use in Java

Driver test is written in Kotlin, but services definition is written in Java.

Also KDocs improvements.

* CORDA-2942: Documentation and release notes update

* CORDA-2942: First set of changes following review by @mnesbit

* CORDA-2942: Second set of changes following review by @mnesbit

* CORDA-2942: Added multi-threaded test

* CORDA-2942: Fixes

* CORDA-2942: Undo changes to `api-current.txt`

* CORDA-2942: Bare mimimum change to `api-current.txt` for CI gate to pass.

* CORDA-2942: Address review feedback from @rick-r3

* CORDA-2942: Detekt update

* CORDA-2942: Delete `ServiceLifecycleObserverPriority` and replace it with `Int` after discussion with @mnesbit

* CORDA-2942: Introduce more `NodeLifecycleEvent` and switch services to listen for those events

* CORDA-2942: Few more changes after input from @rick-r3

* First stub on integration test
Unfinished - hang on issue and pay

* CORDA-2942: Switch to use out-of-process nodes for the inetgration test

Currently Alice and Notary stuck waiting to hear from each other.

* CORDA-2942: Extra log lines during event distribution

* CORDA-2942: Asynchronously distribute lifecycle events

* CORDA-2942: Await for complete P2P client start-up

Next step: Add vault query to integration test

* CORDA-2942: Asynchronously distribute lifecycle events

Next step: Improve integration test

* CORDA-2942: Fix test broken by recent changes and improve logging

* CORDA-2942: Improvement of the test to be able to monitor actions performed by @CordaService in the remote process

* CORDA-2942: Add node re-start step to the integration test

* CORDA-2942: Remove `CORDAPP_STOPPED` event for now

* CORDA-2942: s/CORDAPP_STARTED/STATE_MACHINE_STARTED/

* CORDA-2942: Inverse the meaning of `priority` as requested by @rick-r3

* CORDA-2942: Register `AppServiceHubImpl` for lifecycle events and put a warning when SMM is not ready.
This commit is contained in:
Viktor Kolomeyko
2020-01-21 13:38:02 +00:00
committed by Rick Parker
parent a4d00b79d4
commit 0978500a9a
36 changed files with 1055 additions and 164 deletions

View File

@ -0,0 +1,10 @@
package net.corda.nodeapi.internal.lifecycle
import net.corda.common.configuration.parsing.internal.ConfigurationWithOptionsContainer
/**
* Bare minimum information which will be available even before node fully started-up.
*/
interface NodeInitialContext : ConfigurationWithOptionsContainer {
val platformVersion: Int
}

View File

@ -0,0 +1,128 @@
package net.corda.nodeapi.internal.lifecycle
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.node.services.CordaServiceCriticalFailureException
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import java.util.Collections.singleton
import java.util.LinkedList
import java.util.concurrent.Executors
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReadWriteLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.system.exitProcess
/**
* Responsible for distributing of various `NodeLifecycleEvent` to `NodeLifecycleObserver`.
*
* This class may do it in an asynchronous fashion. Also it might listen to the feedback from observers on the notifications sent and perform
* actions depending on the observer's priority.
*
* The class is safe for concurrent use from multiple threads.
*/
class NodeLifecycleEventsDistributor {
companion object {
private val log = contextLogger()
private val criticalEventsClasses: Set<Class<out NodeLifecycleEvent>> = setOf(
NodeLifecycleEvent.BeforeNodeStart::class.java,
NodeLifecycleEvent.AfterNodeStart::class.java,
NodeLifecycleEvent.StateMachineStarted::class.java)
private val criticalExceptionsClasses: Set<Class<out Throwable>> = setOf(CordaServiceCriticalFailureException::class.java)
}
/**
* Order is maintained by priority and within equal priority by full class name.
*/
private val prioritizedObservers: MutableList<NodeLifecycleObserver> = mutableListOf()
private val readWriteLock: ReadWriteLock = ReentrantReadWriteLock()
private val executor = Executors.newSingleThreadExecutor(
ThreadFactoryBuilder().setNameFormat("NodeLifecycleEventsDistributor-%d").build())
/**
* Adds observer to the distribution list.
*/
fun <T : NodeLifecycleObserver> add(observer: T) : T {
addAll(singleton(observer))
return observer
}
/**
* Adds multiple observers to the distribution list.
*/
fun <T : NodeLifecycleObserver> addAll(observers: Collection<T>) : Collection<T> {
data class SortingKey(val priority: Int, val clazz: Class<*>) : Comparable<SortingKey> {
override fun compareTo(other: SortingKey): Int {
if(priority != other.priority) {
// Reversing sorting order such that higher priorities come first
return other.priority - priority
}
// Within the same priority order alphabetically by class name to deterministic order
return clazz.name.compareTo(other.clazz.name)
}
}
readWriteLock.writeLock().executeLocked {
prioritizedObservers.addAll(observers)
// In-place sorting
prioritizedObservers.sortBy { SortingKey(it.priority, it.javaClass) }
}
return observers
}
/**
* Distributes event to all the observers previously added
*
* @return [CordaFuture] to signal when distribution is finished and delivered to all the observers
*/
fun distributeEvent(event: NodeLifecycleEvent): CordaFuture<Unit> {
val snapshot = readWriteLock.readLock().executeLocked { LinkedList(prioritizedObservers) }
val result = openFuture<Any?>()
executor.execute {
val orderedSnapshot = if (event.reversedPriority) snapshot.reversed() else snapshot
orderedSnapshot.forEach {
log.debug("Distributing event $event to: $it")
val updateResult = it.update(event)
if (updateResult.isSuccess) {
log.debug("Event $event distribution outcome: $updateResult")
} else {
log.error("Failed to distribute event $event, failure outcome: $updateResult")
handlePossibleFatalTermination(event, updateResult as Try.Failure<String>)
}
}
result.set(null)
}
return result.map { }
}
private fun handlePossibleFatalTermination(event: NodeLifecycleEvent, updateFailed: Try.Failure<String>) {
if (event.javaClass in criticalEventsClasses && updateFailed.exception.javaClass in criticalExceptionsClasses) {
log.error("During processing of $event critical failure been reported: $updateFailed. JVM will be terminated.")
exitProcess(1)
} else {
log.warn("During processing of $event non-critical failure been reported: $updateFailed.")
}
}
/**
* Custom implementation vs. using [kotlin.concurrent.withLock] to allow interruption during lock acquisition.
*/
private fun <T> Lock.executeLocked(block: () -> T) : T {
lockInterruptibly()
try {
return block()
} finally {
unlock()
}
}
}

View File

@ -0,0 +1,56 @@
package net.corda.nodeapi.internal.lifecycle
import net.corda.core.utilities.Try
/**
* Interface to flag interest in the Corda Node lifecycle which involves being notified when the node is starting up or
* shutting down.
* Unlike [net.corda.core.node.services.ServiceLifecycleObserver] this is an internal interface that provides much richer
* functionality for interacting with node's internal services.
*/
interface NodeLifecycleObserver {
companion object {
const val RPC_PRIORITY_HIGH = 1200
const val RPC_PRIORITY_NORMAL = 1100
const val RPC_PRIORITY_LOW = 1020
/**
* Helper method to create a string to flag successful processing of an event.
*/
@Suppress("unused")
inline fun <reified T : NodeLifecycleObserver> T.reportSuccess(nodeLifecycleEvent: NodeLifecycleEvent) : String =
"${T::class.java} successfully processed $nodeLifecycleEvent"
}
/**
* Used to inform `NodeLifecycleObserver` of certain `NodeLifecycleEvent`.
*
* @return If even been processed successfully and the are no error conditions `Try.Success` with brief status, otherwise `Try.Failure`
* with exception explaining what went wrong.
* It is down to subject (i.e. Node) to decide what to do in case of failure and decision may depend on the Observer's priority.
*/
fun update(nodeLifecycleEvent: NodeLifecycleEvent) : Try<String> = Try.on { "${javaClass.simpleName} ignored $nodeLifecycleEvent" }
/**
* It is possible to optionally override observer priority.
*
* `start` methods will be invoked in the ascending sequence priority order. For items with the same order alphabetical ordering
* of full class name will be applied.
* For `stop` methods, the order will be opposite to `start`.
*/
val priority: Int
}
/**
* A set of events to flag the important milestones in the lifecycle of the node.
* @param reversedPriority flags whether it would make sense to notify observers in the reversed order.
*/
sealed class NodeLifecycleEvent(val reversedPriority: Boolean = false) {
class BeforeNodeStart(val nodeInitialContext: NodeInitialContext) : NodeLifecycleEvent()
class AfterNodeStart<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent()
class StateMachineStarted<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent()
class StateMachineStopped<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true)
class BeforeNodeStop<out T : NodeServicesContext>(val nodeServicesContext: T) : NodeLifecycleEvent(reversedPriority = true)
class AfterNodeStop(val nodeInitialContext: NodeInitialContext) : NodeLifecycleEvent(reversedPriority = true)
}

View File

@ -0,0 +1,17 @@
package net.corda.nodeapi.internal.lifecycle
import net.corda.core.CordaInternal
import net.corda.core.serialization.SerializeAsToken
/**
* Defines a set of properties that will be available for services to perform useful activity with side effects.
*/
interface NodeServicesContext : NodeInitialContext {
/**
* Special services which upon serialisation will be represented in the stream by a special token. On the remote side
* during deserialization token will be read and corresponding instance found and wired as necessary.
*/
@CordaInternal
val tokenizableServices: List<SerializeAsToken>
}

View File

@ -0,0 +1,61 @@
package net.corda.nodeapi.internal.lifecycle
import com.nhaarman.mockito_kotlin.mock
import net.corda.core.internal.stream
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import org.junit.Test
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.reportSuccess
import java.util.concurrent.atomic.AtomicLong
import kotlin.test.assertTrue
internal class NodeLifecycleEventsDistributorMultiThreadedTest {
companion object {
private val logger = contextLogger()
}
private val instance = NodeLifecycleEventsDistributor()
private val addedCounter = AtomicLong()
private val eventsDeliveredCounter = AtomicLong()
@Test
fun addAndDistributeConcurrently() {
val initialObserversCount = 10
repeat(initialObserversCount) { instance.add(MyObserver(it)) }
val operationsCount = 100_000
val event = NodeLifecycleEvent.BeforeNodeStart(mock())
val additionFreq = 1000
val distributionFutures = (1..operationsCount).stream(true).mapToObj {
if(it % additionFreq == 0) {
logger.debug("Adding observer")
instance.add(MyObserver(it))
addedCounter.incrementAndGet()
logger.info("Progress so far: $it")
}
logger.debug("Distributing event")
instance.distributeEvent(event)
}
distributionFutures.forEach { it.get() }
with(eventsDeliveredCounter.get()) {
// Greater than original observers times events
assertTrue("$this") { this > initialObserversCount.toLong() * operationsCount }
// Less than ever added observers times events
assertTrue("$this") { this < (initialObserversCount.toLong() + addedCounter.get()) * operationsCount }
}
}
inner class MyObserver(seqNum: Int) : NodeLifecycleObserver {
override val priority: Int = seqNum % 10
override fun update(nodeLifecycleEvent: NodeLifecycleEvent): Try<String> = Try.on {
eventsDeliveredCounter.incrementAndGet()
reportSuccess(nodeLifecycleEvent)
}
}
}