mirror of
https://github.com/corda/corda.git
synced 2025-02-03 01:31:24 +00:00
Revert CORDA-296: added rpc that returns an observable for node state (#2091)
* Revert "CORDA-296: added rpc that returns an observable for node state (#2004)" This reverts commit 7d1f7ab * Revert "CORDA-296: added rpc that returns an observable for node state (#2004)" This reverts commit 7d1f7ab
This commit is contained in:
parent
e63b6d1386
commit
c467a056ae
@ -1533,7 +1533,6 @@ public final class net.corda.core.identity.IdentityUtils extends java.lang.Objec
|
||||
@org.jetbrains.annotations.NotNull public abstract List networkMapSnapshot()
|
||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo nodeInfo()
|
||||
@org.jetbrains.annotations.Nullable public abstract net.corda.core.node.NodeInfo nodeInfoFromParty(net.corda.core.identity.AbstractParty)
|
||||
@net.corda.core.messaging.RPCReturnsObservables @org.jetbrains.annotations.NotNull public abstract rx.Observable nodeStateObservable()
|
||||
@org.jetbrains.annotations.NotNull public abstract List notaryIdentities()
|
||||
@org.jetbrains.annotations.Nullable public abstract net.corda.core.identity.Party notaryPartyFromX500Name(net.corda.core.identity.CordaX500Name)
|
||||
@org.jetbrains.annotations.NotNull public abstract java.io.InputStream openAttachment(net.corda.core.crypto.SecureHash)
|
||||
@ -1613,11 +1612,6 @@ public final class net.corda.core.messaging.CordaRPCOpsKt extends java.lang.Obje
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public interface net.corda.core.messaging.MessageRecipients
|
||||
##
|
||||
@net.corda.core.serialization.CordaSerializable public final class net.corda.core.messaging.NodeState extends java.lang.Enum
|
||||
protected <init>(String, int)
|
||||
public static net.corda.core.messaging.NodeState valueOf(String)
|
||||
public static net.corda.core.messaging.NodeState[] values()
|
||||
##
|
||||
@net.corda.core.DoNotImplement public interface net.corda.core.messaging.RPCOps
|
||||
public abstract int getProtocolVersion()
|
||||
##
|
||||
@ -1714,7 +1708,6 @@ public @interface net.corda.core.messaging.RPCReturnsObservables
|
||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.ContractUpgradeService getContractUpgradeService()
|
||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.KeyManagementService getKeyManagementService()
|
||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.NodeInfo getMyInfo()
|
||||
@org.jetbrains.annotations.NotNull public abstract rx.Observable getMyNodeStateObservable()
|
||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.NetworkMapCache getNetworkMapCache()
|
||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.TransactionVerifierService getTransactionVerifierService()
|
||||
@org.jetbrains.annotations.NotNull public abstract net.corda.core.node.services.TransactionStorage getValidatedTransactions()
|
||||
|
@ -6,16 +6,12 @@ import net.corda.core.context.Trace
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.internal.concurrent.fork
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.NodeState
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.*
|
||||
import net.corda.node.services.Permissions.Companion.invokeRpc
|
||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.testing.driver.poll
|
||||
import net.corda.testing.internal.*
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
@ -241,30 +237,6 @@ class RPCStabilityTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `clients receive notifications that node is shutting down`() {
|
||||
val alice = User("Alice", "Alice", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
|
||||
val bob = User("Bob", "Bob", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
|
||||
val slagathor = User("Slagathor", "Slagathor", setOf(invokeRpc(CordaRPCOps::nodeStateObservable)))
|
||||
val userList = listOf(alice, bob, slagathor)
|
||||
val expectedMessages = ArrayList<NodeState>()
|
||||
|
||||
rpcDriver(startNodesInProcess = true) {
|
||||
val node = startNode(rpcUsers = listOf(alice, bob, slagathor)).getOrThrow()
|
||||
userList.forEach {
|
||||
val connection = node.rpcClientToNode().start(it.username, it.password)
|
||||
val nodeStateObservable = connection.proxy.nodeStateObservable()
|
||||
nodeStateObservable.subscribe { update ->
|
||||
expectedMessages.add(update)
|
||||
}
|
||||
}
|
||||
|
||||
node.stop()
|
||||
}
|
||||
assertEquals(userList.size, expectedMessages.size)
|
||||
assertEquals(NodeState.SHUTTING_DOWN, expectedMessages.first())
|
||||
}
|
||||
|
||||
interface TrackSubscriberOps : RPCOps {
|
||||
fun subscribe(): Observable<Unit>
|
||||
}
|
||||
|
@ -226,10 +226,6 @@ interface CordaRPCOps : RPCOps {
|
||||
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
|
||||
fun nodeInfo(): NodeInfo
|
||||
|
||||
/** Returns and [Observable] object with future states of the node. */
|
||||
@RPCReturnsObservables
|
||||
fun nodeStateObservable(): Observable<NodeState>
|
||||
|
||||
/**
|
||||
* Returns network's notary identities, assuming this will not change while the node is running.
|
||||
*
|
||||
@ -468,8 +464,3 @@ inline fun <T, A, B, C, D, E, F, reified R : FlowLogic<T>> CordaRPCOps.startTrac
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class DataFeed<out A, B>(val snapshot: A, val updates: Observable<B>)
|
||||
|
||||
@CordaSerializable
|
||||
enum class NodeState {
|
||||
SHUTTING_DOWN
|
||||
}
|
@ -8,13 +8,11 @@ import net.corda.core.crypto.SignableData
|
||||
import net.corda.core.crypto.SignatureMetadata
|
||||
import net.corda.core.crypto.TransactionSignature
|
||||
import net.corda.core.flows.ContractUpgradeFlow
|
||||
import net.corda.core.messaging.NodeState
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
import net.corda.core.transactions.FilteredTransaction
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import rx.Observable
|
||||
import java.security.PublicKey
|
||||
import java.sql.Connection
|
||||
import java.time.Clock
|
||||
@ -150,9 +148,6 @@ interface ServiceHub : ServicesForResolution {
|
||||
/** The [NodeInfo] object corresponding to our own entry in the network map. */
|
||||
val myInfo: NodeInfo
|
||||
|
||||
/** The [Observable] object used to communicate to RPC clients the state of the node. */
|
||||
val myNodeStateObservable: Observable<NodeState>
|
||||
|
||||
/**
|
||||
* Return the singleton instance of the given Corda service type. This is a class that is annotated with
|
||||
* [CordaService] and will have automatically been registered by the node.
|
||||
|
@ -61,7 +61,6 @@ import net.corda.node.utilities.*
|
||||
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||
import org.slf4j.Logger
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.IOException
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.security.KeyPair
|
||||
@ -121,7 +120,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
|
||||
protected val services: ServiceHubInternal get() = _services
|
||||
private lateinit var _services: ServiceHubInternalImpl
|
||||
protected val nodeStateObservable: PublishSubject<NodeState> = PublishSubject.create<NodeState>()
|
||||
protected var myNotaryIdentity: PartyAndCertificate? = null
|
||||
protected lateinit var checkpointStorage: CheckpointStorage
|
||||
protected lateinit var smm: StateMachineManager
|
||||
@ -635,9 +633,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
// Meanwhile, we let the remote service send us updates until the acknowledgment buffer overflows and it
|
||||
// unsubscribes us forcibly, rather than blocking the shutdown process.
|
||||
|
||||
// Notify observers that the node is shutting down
|
||||
nodeStateObservable.onNext(NodeState.SHUTTING_DOWN)
|
||||
|
||||
// Run shutdown hooks in opposite order to starting
|
||||
for (toRun in runOnStop.reversed()) {
|
||||
toRun()
|
||||
@ -738,7 +733,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
|
||||
override val attachments: AttachmentStorage get() = this@AbstractNode.attachments
|
||||
override val networkService: MessagingService get() = network
|
||||
override val clock: Clock get() = platformClock
|
||||
override val myNodeStateObservable: Observable<NodeState> get() = nodeStateObservable
|
||||
override val configuration: NodeConfiguration get() = this@AbstractNode.configuration
|
||||
override fun <T : SerializeAsToken> cordaService(type: Class<T>): T {
|
||||
require(type.isAnnotationPresent(CordaService::class.java)) { "${type.name} is not a Corda service" }
|
||||
|
@ -117,10 +117,6 @@ internal class CordaRPCOpsImpl(
|
||||
return services.myInfo
|
||||
}
|
||||
|
||||
override fun nodeStateObservable(): Observable<NodeState> {
|
||||
return services.myNodeStateObservable
|
||||
}
|
||||
|
||||
override fun notaryIdentities(): List<Party> {
|
||||
return services.networkMapCache.notaryIdentities
|
||||
}
|
||||
|
@ -8,14 +8,12 @@ import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.NodeState
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.services.AttachmentId
|
||||
import net.corda.core.node.services.NetworkMapCache
|
||||
import net.corda.core.node.services.Vault
|
||||
import net.corda.core.node.services.vault.*
|
||||
import net.corda.node.services.messaging.RpcAuthContext
|
||||
import rx.Observable
|
||||
import java.io.InputStream
|
||||
import java.security.PublicKey
|
||||
|
||||
@ -68,8 +66,6 @@ class RpcAuthorisationProxy(private val implementation: CordaRPCOps, private val
|
||||
|
||||
override fun nodeInfo(): NodeInfo = guard("nodeInfo", implementation::nodeInfo)
|
||||
|
||||
override fun nodeStateObservable(): Observable<NodeState> = guard("nodeStateObservable", implementation::nodeStateObservable)
|
||||
|
||||
override fun notaryIdentities(): List<Party> = guard("notaryIdentities", implementation::notaryIdentities)
|
||||
|
||||
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) = guard("addVaultTransactionNote") {
|
||||
|
@ -10,7 +10,6 @@ import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.messaging.DataFeed
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.messaging.FlowProgressHandle
|
||||
import net.corda.core.messaging.NodeState
|
||||
import net.corda.core.node.*
|
||||
import net.corda.core.node.services.*
|
||||
import net.corda.core.serialization.SerializeAsToken
|
||||
@ -179,8 +178,6 @@ open class MockServices(
|
||||
val identity = getTestPartyAndCertificate(initialIdentityName, key.public)
|
||||
return NodeInfo(emptyList(), listOf(identity), 1, serial = 1L)
|
||||
}
|
||||
override val myNodeStateObservable: Observable<NodeState>
|
||||
get() = PublishSubject.create<NodeState>()
|
||||
override val transactionVerifierService: TransactionVerifierService get() = InMemoryTransactionVerifierService(2)
|
||||
val mockCordappProvider = MockCordappProvider(cordappLoader, attachments)
|
||||
override val cordappProvider: CordappProvider get() = mockCordappProvider
|
||||
|
Loading…
x
Reference in New Issue
Block a user