diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 3e1f341e41..13cce44238 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -5,12 +5,16 @@ import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.NodeState import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.serialize import net.corda.core.utilities.* +import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi +import net.corda.nodeapi.User import net.corda.testing.driver.poll import net.corda.testing.internal.* import org.apache.activemq.artemis.api.core.SimpleString @@ -236,6 +240,30 @@ class RPCStabilityTests { } } + @Test + fun `clients receive notifications that node is shutting down`() { + val alice = User("Alice", "Alice", setOf(invokeRpc(CordaRPCOps::nodeStateObservable))) + val bob = User("Bob", "Bob", setOf(invokeRpc(CordaRPCOps::nodeStateObservable))) + val slagathor = User("Slagathor", "Slagathor", setOf(invokeRpc(CordaRPCOps::nodeStateObservable))) + val userList = listOf(alice, bob, slagathor) + val expectedMessages = ArrayList() + + rpcDriver(startNodesInProcess = true) { + val node = startNode(rpcUsers = listOf(alice, bob, slagathor)).getOrThrow() + userList.forEach { + val connection = node.rpcClientToNode().start(it.username, it.password) + val nodeStateObservable = connection.proxy.nodeStateObservable() + nodeStateObservable.subscribe { update -> + expectedMessages.add(update) + } + } + + node.stop() + } + assertEquals(userList.size, expectedMessages.size) + assertEquals(NodeState.SHUTTING_DOWN, expectedMessages.first()) + } + interface TrackSubscriberOps : RPCOps { fun subscribe(): Observable } diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 9556bb0a4f..a17b6dcfb7 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -196,6 +196,10 @@ interface CordaRPCOps : RPCOps { /** Returns Node's NodeInfo, assuming this will not change while the node is running. */ fun nodeInfo(): NodeInfo + /** Returns and [Observable] object with future states of the node. */ + @RPCReturnsObservables + fun nodeStateObservable(): Observable + /** * Returns network's notary identities, assuming this will not change while the node is running. * @@ -428,3 +432,8 @@ inline fun > CordaRPCOps.startTrac */ @CordaSerializable data class DataFeed(val snapshot: A, val updates: Observable) + +@CordaSerializable +enum class NodeState { + SHUTTING_DOWN +} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 41d0980651..59d0b3adfc 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -8,11 +8,13 @@ import net.corda.core.crypto.SignableData import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.ContractUpgradeFlow +import net.corda.core.messaging.NodeState import net.corda.core.node.services.* import net.corda.core.serialization.SerializeAsToken import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder +import rx.Observable import java.security.PublicKey import java.sql.Connection import java.time.Clock @@ -138,6 +140,9 @@ interface ServiceHub : ServicesForResolution { /** The [NodeInfo] object corresponding to our own entry in the network map. */ val myInfo: NodeInfo + /** The [Observable] object used to communicate to RPC clients the state of the node. */ + val myNodeStateObservable: Observable + /** * Return the singleton instance of the given Corda service type. This is a class that is annotated with * [CordaService] and will have automatically been registered by the node. diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 5323cd6a82..82a4b07bd9 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -62,6 +62,7 @@ import net.corda.node.utilities.* import org.apache.activemq.artemis.utils.ReusableLatch import org.slf4j.Logger import rx.Observable +import rx.subjects.PublishSubject import java.io.IOException import java.lang.reflect.InvocationTargetException import java.security.KeyPair @@ -124,6 +125,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val services: ServiceHubInternal get() = _services private lateinit var _services: ServiceHubInternalImpl protected lateinit var info: NodeInfo + protected val nodeStateObservable: PublishSubject = PublishSubject.create() protected var myNotaryIdentity: PartyAndCertificate? = null protected lateinit var checkpointStorage: CheckpointStorage protected lateinit var smm: StateMachineManager @@ -636,6 +638,9 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it // unsubscribes us forcibly, rather than blocking the shutdown process. + // Notify observers that the node is shutting down + nodeStateObservable.onNext(NodeState.SHUTTING_DOWN) + // Run shutdown hooks in opposite order to starting for (toRun in runOnStop.reversed()) { toRun() @@ -737,6 +742,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override val networkService: MessagingService get() = network override val clock: Clock get() = platformClock override val myInfo: NodeInfo get() = info + override val myNodeStateObservable: Observable get() = nodeStateObservable override val database: CordaPersistence get() = this@AbstractNode.database override val configuration: NodeConfiguration get() = this@AbstractNode.configuration override fun cordaService(type: Class): T { diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 6871c234ba..c7330ffbbd 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -115,6 +115,10 @@ internal class CordaRPCOpsImpl( return services.myInfo } + override fun nodeStateObservable(): Observable { + return services.myNodeStateObservable + } + override fun notaryIdentities(): List { return services.networkMapCache.notaryIdentities } diff --git a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt index c99103b06d..3e434d3f3a 100644 --- a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt +++ b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt @@ -8,6 +8,7 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.DataFeed +import net.corda.core.messaging.NodeState import net.corda.core.node.NodeInfo import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.Vault @@ -16,6 +17,7 @@ import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.Sort import net.corda.node.services.messaging.RpcContext import net.corda.node.services.messaging.requireEitherPermission +import rx.Observable import java.io.InputStream import java.security.PublicKey @@ -60,6 +62,8 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo) + override fun nodeStateObservable(): Observable = guard("nodeStateObservable", implementation::nodeStateObservable) + override fun notaryIdentities(): List = guard("notaryIdentities", implementation::notaryIdentities) override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 41249c8062..9f1ba7dd5f 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -9,6 +9,7 @@ import net.corda.core.identity.PartyAndCertificate import net.corda.core.messaging.DataFeed import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowProgressHandle +import net.corda.core.messaging.NodeState import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.serialization.SerializeAsToken @@ -161,6 +162,8 @@ open class MockServices( val identity = getTestPartyAndCertificate(MEGA_CORP.name, key.public) return NodeInfo(emptyList(), listOf(identity), 1, serial = 1L) } + override val myNodeStateObservable: Observable + get() = PublishSubject.create() override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) val mockCordappProvider = MockCordappProvider(cordappLoader, attachments) override val cordappProvider: CordappProvider get() = mockCordappProvider