From c467a056aeed7f68751b0e292912c851737f794b Mon Sep 17 00:00:00 2001 From: bpaunescu Date: Tue, 21 Nov 2017 09:52:17 +0000 Subject: [PATCH] Revert CORDA-296: added rpc that returns an observable for node state (#2091) * Revert "CORDA-296: added rpc that returns an observable for node state (#2004)" This reverts commit 7d1f7ab * Revert "CORDA-296: added rpc that returns an observable for node state (#2004)" This reverts commit 7d1f7ab --- .ci/api-current.txt | 7 ----- .../net/corda/client/rpc/RPCStabilityTests.kt | 28 ------------------- .../net/corda/core/messaging/CordaRPCOps.kt | 9 ------ .../kotlin/net/corda/core/node/ServiceHub.kt | 5 ---- .../net/corda/node/internal/AbstractNode.kt | 6 ---- .../corda/node/internal/CordaRPCOpsImpl.kt | 4 --- .../node/internal/RpcAuthorisationProxy.kt | 4 --- .../net/corda/testing/node/MockServices.kt | 3 -- 8 files changed, 66 deletions(-) diff --git a/.ci/api-current.txt b/.ci/api-current.txt index 9416a54a9c..84b4d41817 100644 --- a/.ci/api-current.txt +++ b/.ci/api-current.txt @@ -1533,7 +1533,6 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec @org.jetbrains.annotations.NotNull public abstract List networkMapSnapshot() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo nodeInfo() @org.jetbrains.annotations.Nullable public abstract net.corda.core.node.NodeInfo nodeInfoFromParty(net.corda.core.identity.AbstractParty) - @net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract rx.Observable nodeStateObservable() @org.jetbrains.annotations.NotNull public abstract List notaryIdentities() @org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party notaryPartyFromX500Name(net.corda.core.identity.CordaX500Name) @org.jetbrains.annotations.NotNull public abstract java.io.InputStream openAttachment(net.corda.core.crypto.SecureHash) @@ -1613,11 +1612,6 @@ public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Obje ## @net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.MessageRecipients ## -@net.corda.core.serialization.CordaSerializable public final class net.corda.core.messaging.NodeState extends java.lang.Enum - protected (String, int) - public static net.corda.core.messaging.NodeState valueOf(String) - public static net.corda.core.messaging.NodeState[] values() -## @net.corda.core.DoNotImplement public interface net.corda.core.messaging.RPCOps public abstract int getProtocolVersion() ## @@ -1714,7 +1708,6 @@ public @interface net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.ContractUpgradeService getContractUpgradeService() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.KeyManagementService getKeyManagementService() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo getMyInfo() - @org.jetbrains.annotations.NotNull public abstract rx.Observable getMyNodeStateObservable() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.NetworkMapCache getNetworkMapCache() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.TransactionVerifierService getTransactionVerifierService() @org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.TransactionStorage getValidatedTransactions() diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 7247c33ddd..8d6885f0b3 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -6,16 +6,12 @@ import net.corda.core.context.Trace import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.NodeState import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.serialize import net.corda.core.utilities.* -import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi -import net.corda.nodeapi.User import net.corda.testing.driver.poll import net.corda.testing.internal.* import org.apache.activemq.artemis.api.core.SimpleString @@ -241,30 +237,6 @@ class RPCStabilityTests { } } - @Test - fun `clients receive notifications that node is shutting down`() { - val alice = User("Alice", "Alice", setOf(invokeRpc(CordaRPCOps::nodeStateObservable))) - val bob = User("Bob", "Bob", setOf(invokeRpc(CordaRPCOps::nodeStateObservable))) - val slagathor = User("Slagathor", "Slagathor", setOf(invokeRpc(CordaRPCOps::nodeStateObservable))) - val userList = listOf(alice, bob, slagathor) - val expectedMessages = ArrayList() - - rpcDriver(startNodesInProcess = true) { - val node = startNode(rpcUsers = listOf(alice, bob, slagathor)).getOrThrow() - userList.forEach { - val connection = node.rpcClientToNode().start(it.username, it.password) - val nodeStateObservable = connection.proxy.nodeStateObservable() - nodeStateObservable.subscribe { update -> - expectedMessages.add(update) - } - } - - node.stop() - } - assertEquals(userList.size, expectedMessages.size) - assertEquals(NodeState.SHUTTING_DOWN, expectedMessages.first()) - } - interface TrackSubscriberOps : RPCOps { fun subscribe(): Observable } diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt index 8859d94ec8..b6880a8103 100644 --- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt @@ -226,10 +226,6 @@ interface CordaRPCOps : RPCOps { /** Returns Node's NodeInfo, assuming this will not change while the node is running. */ fun nodeInfo(): NodeInfo - /** Returns and [Observable] object with future states of the node. */ - @RPCReturnsObservables - fun nodeStateObservable(): Observable - /** * Returns network's notary identities, assuming this will not change while the node is running. * @@ -468,8 +464,3 @@ inline fun > CordaRPCOps.startTrac */ @CordaSerializable data class DataFeed(val snapshot: A, val updates: Observable) - -@CordaSerializable -enum class NodeState { - SHUTTING_DOWN -} \ No newline at end of file diff --git a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt index 68ae3fc5f3..78f1704336 100644 --- a/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt +++ b/core/src/main/kotlin/net/corda/core/node/ServiceHub.kt @@ -8,13 +8,11 @@ import net.corda.core.crypto.SignableData import net.corda.core.crypto.SignatureMetadata import net.corda.core.crypto.TransactionSignature import net.corda.core.flows.ContractUpgradeFlow -import net.corda.core.messaging.NodeState import net.corda.core.node.services.* import net.corda.core.serialization.SerializeAsToken import net.corda.core.transactions.FilteredTransaction import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder -import rx.Observable import java.security.PublicKey import java.sql.Connection import java.time.Clock @@ -150,9 +148,6 @@ interface ServiceHub : ServicesForResolution { /** The [NodeInfo] object corresponding to our own entry in the network map. */ val myInfo: NodeInfo - /** The [Observable] object used to communicate to RPC clients the state of the node. */ - val myNodeStateObservable: Observable - /** * Return the singleton instance of the given Corda service type. This is a class that is annotated with * [CordaService] and will have automatically been registered by the node. diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index d6aad5cf45..149f11668e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -61,7 +61,6 @@ import net.corda.node.utilities.* import org.apache.activemq.artemis.utils.ReusableLatch import org.slf4j.Logger import rx.Observable -import rx.subjects.PublishSubject import java.io.IOException import java.lang.reflect.InvocationTargetException import java.security.KeyPair @@ -121,7 +120,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, protected val services: ServiceHubInternal get() = _services private lateinit var _services: ServiceHubInternalImpl - protected val nodeStateObservable: PublishSubject = PublishSubject.create() protected var myNotaryIdentity: PartyAndCertificate? = null protected lateinit var checkpointStorage: CheckpointStorage protected lateinit var smm: StateMachineManager @@ -635,9 +633,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, // Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it // unsubscribes us forcibly, rather than blocking the shutdown process. - // Notify observers that the node is shutting down - nodeStateObservable.onNext(NodeState.SHUTTING_DOWN) - // Run shutdown hooks in opposite order to starting for (toRun in runOnStop.reversed()) { toRun() @@ -738,7 +733,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, override val attachments: AttachmentStorage get() = this@AbstractNode.attachments override val networkService: MessagingService get() = network override val clock: Clock get() = platformClock - override val myNodeStateObservable: Observable get() = nodeStateObservable override val configuration: NodeConfiguration get() = this@AbstractNode.configuration override fun cordaService(type: Class): T { require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" } diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 660ce4b767..ef0597c675 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -117,10 +117,6 @@ internal class CordaRPCOpsImpl( return services.myInfo } - override fun nodeStateObservable(): Observable { - return services.myNodeStateObservable - } - override fun notaryIdentities(): List { return services.networkMapCache.notaryIdentities } diff --git a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt index 2ae8594db9..7a1e66f6ea 100644 --- a/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt +++ b/node/src/main/kotlin/net/corda/node/internal/RpcAuthorisationProxy.kt @@ -8,14 +8,12 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.DataFeed -import net.corda.core.messaging.NodeState import net.corda.core.node.NodeInfo import net.corda.core.node.services.AttachmentId import net.corda.core.node.services.NetworkMapCache import net.corda.core.node.services.Vault import net.corda.core.node.services.vault.* import net.corda.node.services.messaging.RpcAuthContext -import rx.Observable import java.io.InputStream import java.security.PublicKey @@ -68,8 +66,6 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo) - override fun nodeStateObservable(): Observable = guard("nodeStateObservable", implementation::nodeStateObservable) - override fun notaryIdentities(): List = guard("notaryIdentities", implementation::notaryIdentities) override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt index 4ed9b5ef94..696472bb9e 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt @@ -10,7 +10,6 @@ import net.corda.core.identity.PartyAndCertificate import net.corda.core.messaging.DataFeed import net.corda.core.messaging.FlowHandle import net.corda.core.messaging.FlowProgressHandle -import net.corda.core.messaging.NodeState import net.corda.core.node.* import net.corda.core.node.services.* import net.corda.core.serialization.SerializeAsToken @@ -179,8 +178,6 @@ open class MockServices( val identity = getTestPartyAndCertificate(initialIdentityName, key.public) return NodeInfo(emptyList(), listOf(identity), 1, serial = 1L) } - override val myNodeStateObservable: Observable - get() = PublishSubject.create() override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2) val mockCordappProvider = MockCordappProvider(cordappLoader, attachments) override val cordappProvider: CordappProvider get() = mockCordappProvider