mirror of
synced 2025-03-19 18:45:28 +00:00
CORDA-296: added rpc that returns an observable for node state (#2004)
* CORDA-296: added rpc that returns an observable for node state; used to let rpc clients know that the know is about to shut down * replaced node shut down observation String with enum
This commit is contained in:
@ -5,12 +5,16 @@ import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.NodeState
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.User
import net.corda.testing.driver.poll
import net.corda.testing.internal.*
import org.apache.activemq.artemis.api.core.SimpleString
@ -236,6 +240,30 @@ class RPCStabilityTests {
fun `clients receive notifications that node is shutting down`() {
val alice = User("Alice", "Alice", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val bob = User("Bob", "Bob", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val slagathor = User("Slagathor", "Slagathor", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val userList = listOf(alice, bob, slagathor)
val expectedMessages = ArrayList<NodeState>()
rpcDriver(startNodesInProcess = true) {
val node = startNode(rpcUsers = listOf(alice, bob, slagathor)).getOrThrow()
userList.forEach {
val connection = node.rpcClientToNode().start(it.username, it.password)
val nodeStateObservable = connection.proxy.nodeStateObservable()
nodeStateObservable.subscribe { update ->
assertEquals(userList.size, expectedMessages.size)
assertEquals(NodeState.SHUTTING_DOWN, expectedMessages.first())
interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit>
@ -196,6 +196,10 @@ interface CordaRPCOps : RPCOps {
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo
/** Returns and [Observable] object with future states of the node. */
fun nodeStateObservable(): Observable<NodeState>
* Returns network's notary identities, assuming this will not change while the node is running.
@ -428,3 +432,8 @@ inline fun <T, A, B, C, D, E, F, reified R : FlowLogic<T>> CordaRPCOps.startTrac
data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>)
enum class NodeState {
@ -8,11 +8,13 @@ import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.ContractUpgradeFlow
import net.corda.core.messaging.NodeState
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder
import rx.Observable
import java.security.PublicKey
import java.sql.Connection
import java.time.Clock
@ -138,6 +140,9 @@ interface ServiceHub : ServicesForResolution {
/** The [NodeInfo] object corresponding to our own entry in the network map. */
val myInfo: NodeInfo
/** The [Observable] object used to communicate to RPC clients the state of the node. */
val myNodeStateObservable: Observable<NodeState>
* Return the singleton instance of the given Corda service type. This is a class that is annotated with
* [CordaService] and will have automatically been registered by the node.
@ -62,6 +62,7 @@ import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger
import rx.Observable
import rx.subjects.PublishSubject
import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.security.KeyPair
@ -124,6 +125,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl
protected lateinit var info: NodeInfo
protected val nodeStateObservable: PublishSubject<NodeState> = PublishSubject.create<NodeState>()
protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage
protected lateinit var smm: StateMachineManager
@ -636,6 +638,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
// unsubscribes us forcibly, rather than blocking the shutdown process.
// Notify observers that the node is shutting down
// Run shutdown hooks in opposite order to starting
for (toRun in runOnStop.reversed()) {
@ -737,6 +742,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val networkService: MessagingService get() = network
override val clock: Clock get() = platformClock
override val myInfo: NodeInfo get() = info
override val myNodeStateObservable: Observable<NodeState> get() = nodeStateObservable
override val database: CordaPersistence get() = this@AbstractNode.database
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
@ -115,6 +115,10 @@ internal class CordaRPCOpsImpl(
return services.myInfo
override fun nodeStateObservable(): Observable<NodeState> {
return services.myNodeStateObservable
override fun notaryIdentities(): List<Party> {
return services.networkMapCache.notaryIdentities
@ -8,6 +8,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.NodeState
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault
@ -16,6 +17,7 @@ import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort
import net.corda.node.services.messaging.RpcContext
import net.corda.node.services.messaging.requireEitherPermission
import rx.Observable
import java.io.InputStream
import java.security.PublicKey
@ -60,6 +62,8 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo)
override fun nodeStateObservable(): Observable<NodeState> = guard("nodeStateObservable", implementation::nodeStateObservable)
override fun notaryIdentities(): List<Party> = guard("notaryIdentities", implementation::notaryIdentities)
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") {
@ -9,6 +9,7 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.NodeState
import net.corda.core.node.*
import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken
@ -161,6 +162,8 @@ open class MockServices(
val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public)
return NodeInfo(emptyList(), listOf(identity), 1, serial = 1L)
override val myNodeStateObservable: Observable<NodeState>
get() = PublishSubject.create<NodeState>()
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
val mockCordappProvider = MockCordappProvider(cordappLoader, attachments)
override val cordappProvider: CordappProvider get() = mockCordappProvider
Reference in New Issue
Block a user