CORDA-296: added rpc that returns an observable for node state (#2004)

* CORDA-296: added rpc that returns an observable for node state; used to let rpc clients know that the know is about to shut down

* replaced node shut down observation String with enum
This commit is contained in:
bpaunescu 2017-11-08 12:44:10 +00:00 committed by GitHub
parent c7ec9ad8ac
commit 7d1f7ab53d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 59 additions and 0 deletions

View File

@ -5,12 +5,16 @@ import net.corda.client.rpc.internal.RPCClientConfiguration
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.NodeState
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.User
import net.corda.testing.driver.poll import net.corda.testing.driver.poll
import net.corda.testing.internal.* import net.corda.testing.internal.*
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
@ -236,6 +240,30 @@ class RPCStabilityTests {
} }
} }
@Test
fun `clients receive notifications that node is shutting down`() {
val alice = User("Alice", "Alice", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val bob = User("Bob", "Bob", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val slagathor = User("Slagathor", "Slagathor", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
val userList = listOf(alice, bob, slagathor)
val expectedMessages = ArrayList<NodeState>()
rpcDriver(startNodesInProcess = true) {
val node = startNode(rpcUsers = listOf(alice, bob, slagathor)).getOrThrow()
userList.forEach {
val connection = node.rpcClientToNode().start(it.username, it.password)
val nodeStateObservable = connection.proxy.nodeStateObservable()
nodeStateObservable.subscribe { update ->
expectedMessages.add(update)
}
}
node.stop()
}
assertEquals(userList.size, expectedMessages.size)
assertEquals(NodeState.SHUTTING_DOWN, expectedMessages.first())
}
interface TrackSubscriberOps : RPCOps { interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit> fun subscribe(): Observable<Unit>
} }

View File

@ -196,6 +196,10 @@ interface CordaRPCOps : RPCOps {
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */ /** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo fun nodeInfo(): NodeInfo
/** Returns and [Observable] object with future states of the node. */
@RPCReturnsObservables
fun nodeStateObservable(): Observable<NodeState>
/** /**
* Returns network's notary identities, assuming this will not change while the node is running. * Returns network's notary identities, assuming this will not change while the node is running.
* *
@ -428,3 +432,8 @@ inline fun <T, A, B, C, D, E, F, reified R : FlowLogic<T>> CordaRPCOps.startTrac
*/ */
@CordaSerializable @CordaSerializable
data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>) data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>)
@CordaSerializable
enum class NodeState {
SHUTTING_DOWN
}

View File

@ -8,11 +8,13 @@ import net.corda.core.crypto.SignableData
import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.SignatureMetadata
import net.corda.core.crypto.TransactionSignature import net.corda.core.crypto.TransactionSignature
import net.corda.core.flows.ContractUpgradeFlow import net.corda.core.flows.ContractUpgradeFlow
import net.corda.core.messaging.NodeState
import net.corda.core.node.services.* import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsToken
import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.FilteredTransaction
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.TransactionBuilder import net.corda.core.transactions.TransactionBuilder
import rx.Observable
import java.security.PublicKey import java.security.PublicKey
import java.sql.Connection import java.sql.Connection
import java.time.Clock import java.time.Clock
@ -138,6 +140,9 @@ interface ServiceHub : ServicesForResolution {
/** The [NodeInfo] object corresponding to our own entry in the network map. */ /** The [NodeInfo] object corresponding to our own entry in the network map. */
val myInfo: NodeInfo val myInfo: NodeInfo
/** The [Observable] object used to communicate to RPC clients the state of the node. */
val myNodeStateObservable: Observable<NodeState>
/** /**
* Return the singleton instance of the given Corda service type. This is a class that is annotated with * Return the singleton instance of the given Corda service type. This is a class that is annotated with
* [CordaService] and will have automatically been registered by the node. * [CordaService] and will have automatically been registered by the node.

View File

@ -62,6 +62,7 @@ import net.corda.node.utilities.*
import org.apache.activemq.artemis.utils.ReusableLatch import org.apache.activemq.artemis.utils.ReusableLatch
import org.slf4j.Logger import org.slf4j.Logger
import rx.Observable import rx.Observable
import rx.subjects.PublishSubject
import java.io.IOException import java.io.IOException
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.security.KeyPair import java.security.KeyPair
@ -124,6 +125,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
protected val services: ServiceHubInternal get() = _services protected val services: ServiceHubInternal get() = _services
private lateinit var _services: ServiceHubInternalImpl private lateinit var _services: ServiceHubInternalImpl
protected lateinit var info: NodeInfo protected lateinit var info: NodeInfo
protected val nodeStateObservable: PublishSubject<NodeState> = PublishSubject.create<NodeState>()
protected var myNotaryIdentity: PartyAndCertificate? = null protected var myNotaryIdentity: PartyAndCertificate? = null
protected lateinit var checkpointStorage: CheckpointStorage protected lateinit var checkpointStorage: CheckpointStorage
protected lateinit var smm: StateMachineManager protected lateinit var smm: StateMachineManager
@ -636,6 +638,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
// unsubscribes us forcibly, rather than blocking the shutdown process. // unsubscribes us forcibly, rather than blocking the shutdown process.
// Notify observers that the node is shutting down
nodeStateObservable.onNext(NodeState.SHUTTING_DOWN)
// Run shutdown hooks in opposite order to starting // Run shutdown hooks in opposite order to starting
for (toRun in runOnStop.reversed()) { for (toRun in runOnStop.reversed()) {
toRun() toRun()
@ -737,6 +742,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
override val networkService: MessagingService get() = network override val networkService: MessagingService get() = network
override val clock: Clock get() = platformClock override val clock: Clock get() = platformClock
override val myInfo: NodeInfo get() = info override val myInfo: NodeInfo get() = info
override val myNodeStateObservable: Observable<NodeState> get() = nodeStateObservable
override val database: CordaPersistence get() = this@AbstractNode.database override val database: CordaPersistence get() = this@AbstractNode.database
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T { override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {

View File

@ -115,6 +115,10 @@ internal class CordaRPCOpsImpl(
return services.myInfo return services.myInfo
} }
override fun nodeStateObservable(): Observable<NodeState> {
return services.myNodeStateObservable
}
override fun notaryIdentities(): List<Party> { override fun notaryIdentities(): List<Party> {
return services.networkMapCache.notaryIdentities return services.networkMapCache.notaryIdentities
} }

View File

@ -8,6 +8,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.NodeState
import net.corda.core.node.NodeInfo import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.NetworkMapCache
import net.corda.core.node.services.Vault import net.corda.core.node.services.Vault
@ -16,6 +17,7 @@ import net.corda.core.node.services.vault.QueryCriteria
import net.corda.core.node.services.vault.Sort import net.corda.core.node.services.vault.Sort
import net.corda.node.services.messaging.RpcContext import net.corda.node.services.messaging.RpcContext
import net.corda.node.services.messaging.requireEitherPermission import net.corda.node.services.messaging.requireEitherPermission
import rx.Observable
import java.io.InputStream import java.io.InputStream
import java.security.PublicKey import java.security.PublicKey
@ -60,6 +62,8 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo) override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo)
override fun nodeStateObservable(): Observable<NodeState> = guard("nodeStateObservable", implementation::nodeStateObservable)
override fun notaryIdentities(): List<Party> = guard("notaryIdentities", implementation::notaryIdentities) override fun notaryIdentities(): List<Party> = guard("notaryIdentities", implementation::notaryIdentities)
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") { override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") {

View File

@ -9,6 +9,7 @@ import net.corda.core.identity.PartyAndCertificate
import net.corda.core.messaging.DataFeed import net.corda.core.messaging.DataFeed
import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.FlowProgressHandle import net.corda.core.messaging.FlowProgressHandle
import net.corda.core.messaging.NodeState
import net.corda.core.node.* import net.corda.core.node.*
import net.corda.core.node.services.* import net.corda.core.node.services.*
import net.corda.core.serialization.SerializeAsToken import net.corda.core.serialization.SerializeAsToken
@ -161,6 +162,8 @@ open class MockServices(
val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public) val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public)
return NodeInfo(emptyList(), listOf(identity), 1, serial = 1L) return NodeInfo(emptyList(), listOf(identity), 1, serial = 1L)
} }
override val myNodeStateObservable: Observable<NodeState>
get() = PublishSubject.create<NodeState>()
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
val mockCordappProvider = MockCordappProvider(cordappLoader, attachments) val mockCordappProvider = MockCordappProvider(cordappLoader, attachments)
override val cordappProvider: CordappProvider get() = mockCordappProvider override val cordappProvider: CordappProvider get() = mockCordappProvider