mirror of
https://github.com/corda/corda.git
synced 2025-06-18 07:08:15 +00:00
Refactor CordaRPCClient into new :client:rpc Gradle module. (#405)
* CORDA-305: Refactor CordaRPCClient into :client:rpc module * CORDA-305: Remove the Kotlin test framework from the artifacts. * CORDA-305: Migrate serialisation whitelist into node-api module. * CORDA-305: Clean up unused RPC observables. * CORDA-305: Add :client:rpc module to documentation tasks. * CORDA-305: Include :finance into :client:rpc for its serialisable classes. * CORDA-305: Move test classes into the correct directory. * CORDA-305: Migrate :finance dependency from :client:rpc into DemoBench. * CORDA-305: Update wording of TODO about handling Observables.
This commit is contained in:
@ -8,9 +8,9 @@ import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.readLines
|
||||
import net.corda.node.LOGS_DIRECTORY_NAME
|
||||
import net.corda.node.services.api.RegulatorService
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import org.junit.Test
|
||||
import java.nio.file.Paths
|
||||
import java.util.concurrent.Executors
|
||||
|
@ -16,6 +16,7 @@ import net.corda.flows.CashPaymentFlow
|
||||
import net.corda.node.driver.NodeHandle
|
||||
import net.corda.node.driver.driver
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.testing.expect
|
||||
import net.corda.testing.expectEvents
|
||||
import net.corda.testing.node.DriverBasedTest
|
||||
|
@ -1,8 +1,8 @@
|
||||
package net.corda.services.messaging
|
||||
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
|
||||
import net.corda.testing.messaging.SimpleMQClient
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
|
||||
|
@ -1,6 +1,6 @@
|
||||
package net.corda.services.messaging
|
||||
|
||||
import net.corda.node.services.User
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.testing.messaging.SimpleMQClient
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
|
@ -2,6 +2,7 @@ package net.corda.services.messaging
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.client.rpc.CordaRPCClientImpl
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.crypto.composite
|
||||
@ -13,16 +14,15 @@ import net.corda.core.random63BitValue
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
|
||||
import net.corda.node.services.messaging.CordaRPCClientImpl
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.P2P_QUEUE
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.testing.configureTestSSL
|
||||
import net.corda.testing.messaging.SimpleMQClient
|
||||
import net.corda.testing.node.NodeBasedTest
|
||||
|
@ -172,7 +172,7 @@ private fun printPluginsAndServices(node: Node) {
|
||||
}
|
||||
val plugins = node.pluginRegistries
|
||||
.map { it.javaClass.name }
|
||||
.filterNot { it.startsWith("net.corda.node.") || it.startsWith("net.corda.core.") }
|
||||
.filterNot { it.startsWith("net.corda.node.") || it.startsWith("net.corda.core.") || it.startsWith("net.corda.nodeapi.")}
|
||||
.map { it.substringBefore('$') }
|
||||
if (plugins.isNotEmpty())
|
||||
printBasicNodeInfo("Loaded plugins", plugins.joinToString())
|
||||
|
@ -6,6 +6,7 @@ import com.google.common.util.concurrent.*
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigRenderOptions
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.crypto.Party
|
||||
import net.corda.core.div
|
||||
import net.corda.core.flatMap
|
||||
@ -16,15 +17,14 @@ import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.core.node.services.ServiceType
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.LOGS_DIRECTORY_NAME
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.config.ConfigHelper
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import net.corda.node.services.messaging.CordaRPCClient
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.ServiceIdentityGenerator
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import okhttp3.OkHttpClient
|
||||
import okhttp3.Request
|
||||
|
@ -20,15 +20,14 @@ import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.RPCUserServiceImpl
|
||||
import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.node.services.messaging.NodeMessagingClient
|
||||
import net.corda.node.services.transactions.PersistentUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftUniquenessProvider
|
||||
import net.corda.node.services.transactions.RaftValidatingNotaryService
|
||||
import net.corda.node.utilities.AddressUtils
|
||||
import net.corda.node.services.transactions.*
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress
|
||||
import org.slf4j.Logger
|
||||
import java.io.RandomAccessFile
|
||||
import java.lang.management.ManagementFactory
|
||||
|
@ -1,57 +0,0 @@
|
||||
package net.corda.node.serialization
|
||||
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.core.node.CordaPluginRegistry
|
||||
import net.corda.core.serialization.SerializationCustomization
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import rx.Notification
|
||||
import rx.exceptions.OnErrorNotImplementedException
|
||||
import java.math.BigDecimal
|
||||
import java.time.LocalDate
|
||||
import java.time.Period
|
||||
import java.util.*
|
||||
|
||||
/**
|
||||
* NOTE: We do not whitelist [HashMap] or [HashSet] since they are unstable under serialization.
|
||||
*/
|
||||
class DefaultWhitelist : CordaPluginRegistry() {
|
||||
override fun customizeSerialization(custom: SerializationCustomization): Boolean {
|
||||
custom.apply {
|
||||
addToWhitelist(Array<Any>(0, {}).javaClass)
|
||||
addToWhitelist(Notification::class.java)
|
||||
addToWhitelist(Notification.Kind::class.java)
|
||||
addToWhitelist(ArrayList::class.java)
|
||||
addToWhitelist(listOf<Any>().javaClass) // EmptyList
|
||||
addToWhitelist(Pair::class.java)
|
||||
addToWhitelist(ByteArray::class.java)
|
||||
addToWhitelist(UUID::class.java)
|
||||
addToWhitelist(LinkedHashSet::class.java)
|
||||
addToWhitelist(setOf<Unit>().javaClass) // EmptySet
|
||||
addToWhitelist(Currency::class.java)
|
||||
addToWhitelist(listOf(Unit).javaClass) // SingletonList
|
||||
addToWhitelist(setOf(Unit).javaClass) // SingletonSet
|
||||
addToWhitelist(mapOf(Unit to Unit).javaClass) // SingletonSet
|
||||
addToWhitelist(HostAndPort::class.java)
|
||||
addToWhitelist(SimpleString::class.java)
|
||||
addToWhitelist(KryoException::class.java)
|
||||
addToWhitelist(StringBuffer::class.java)
|
||||
addToWhitelist(Unit::class.java)
|
||||
addToWhitelist(java.io.ByteArrayInputStream::class.java)
|
||||
addToWhitelist(java.lang.Class::class.java)
|
||||
addToWhitelist(java.math.BigDecimal::class.java)
|
||||
addToWhitelist(java.security.KeyPair::class.java)
|
||||
addToWhitelist(java.time.Duration::class.java)
|
||||
addToWhitelist(java.time.Instant::class.java)
|
||||
addToWhitelist(java.time.LocalDate::class.java)
|
||||
addToWhitelist(java.util.Collections.singletonMap("A", "B").javaClass)
|
||||
addToWhitelist(java.util.LinkedHashMap::class.java)
|
||||
addToWhitelist(BigDecimal::class.java)
|
||||
addToWhitelist(LocalDate::class.java)
|
||||
addToWhitelist(Period::class.java)
|
||||
addToWhitelist(BitSet::class.java)
|
||||
addToWhitelist(OnErrorNotImplementedException::class.java)
|
||||
}
|
||||
return true
|
||||
}
|
||||
}
|
@ -2,6 +2,7 @@ package net.corda.node.services
|
||||
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.nodeapi.User
|
||||
|
||||
/**
|
||||
* Service for retrieving [User] objects representing RPC users who are authorised to use the RPC system. A [User]
|
||||
@ -24,10 +25,6 @@ class RPCUserServiceImpl(config: NodeConfiguration) : RPCUserService {
|
||||
override val users: List<User> get() = _users.values.toList()
|
||||
}
|
||||
|
||||
data class User(val username: String, val password: String, val permissions: Set<String>) {
|
||||
override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)"
|
||||
}
|
||||
|
||||
fun startFlowPermission(className: String) = "StartFlow.$className"
|
||||
fun <P : FlowLogic<*>> startFlowPermission(clazz: Class<P>) = startFlowPermission(clazz.name)
|
||||
inline fun <reified P : FlowLogic<*>> startFlowPermission(): String = startFlowPermission(P::class.java)
|
||||
|
@ -12,9 +12,9 @@ import net.corda.core.node.services.ServiceInfo
|
||||
import net.corda.node.internal.NetworkMapInfo
|
||||
import net.corda.node.internal.Node
|
||||
import net.corda.node.serialization.NodeClock
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.utilities.TestClock
|
||||
import net.corda.nodeapi.User
|
||||
import java.net.URL
|
||||
import java.nio.file.Path
|
||||
import java.util.*
|
||||
|
@ -1,118 +0,0 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.core.crypto.CompositeKey
|
||||
import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.read
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import java.security.KeyStore
|
||||
|
||||
/**
|
||||
* The base class for Artemis services that defines shared data structures and SSL transport configuration.
|
||||
*/
|
||||
abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
|
||||
companion object {
|
||||
init {
|
||||
System.setProperty("org.jboss.logging.provider", "slf4j")
|
||||
}
|
||||
|
||||
// System users must contain an invalid RPC username character to prevent any chance of name clash which in this
|
||||
// case is a forward slash
|
||||
const val NODE_USER = "SystemUsers/Node"
|
||||
const val PEER_USER = "SystemUsers/Peer"
|
||||
|
||||
const val INTERNAL_PREFIX = "internal."
|
||||
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers."
|
||||
const val SERVICES_PREFIX = "${INTERNAL_PREFIX}services."
|
||||
const val CLIENTS_PREFIX = "clients."
|
||||
const val P2P_QUEUE = "p2p.inbound"
|
||||
const val RPC_REQUESTS_QUEUE = "rpc.requests"
|
||||
const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals"
|
||||
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
|
||||
const val NETWORK_MAP_QUEUE = "${INTERNAL_PREFIX}networkmap"
|
||||
|
||||
/**
|
||||
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
|
||||
* only be used in unit tests and the internals of the messaging services to keep addressing opaque for the future.
|
||||
* N.B. Marked as JvmStatic to allow use in the inherited classes.
|
||||
*/
|
||||
@JvmStatic
|
||||
@VisibleForTesting
|
||||
fun toHostAndPort(target: MessageRecipients): HostAndPort {
|
||||
val addr = target as? ArtemisMessagingComponent.ArtemisPeerAddress ?: throw IllegalArgumentException("Not an Artemis address")
|
||||
return addr.hostAndPort
|
||||
}
|
||||
}
|
||||
|
||||
interface ArtemisAddress : MessageRecipients {
|
||||
val queueName: String
|
||||
}
|
||||
|
||||
interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
|
||||
val hostAndPort: HostAndPort
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress {
|
||||
override val queueName: String get() = NETWORK_MAP_QUEUE
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the class used to implement [SingleMessageRecipient], for now. Note that in future this class
|
||||
* may change or evolve and code that relies upon it being a simple host/port may not function correctly.
|
||||
* For instance it may contain onion routing data.
|
||||
*
|
||||
* [NodeAddress] identifies a specific peer node and an associated queue. The queue may be the peer's own queue or
|
||||
* an advertised service's queue.
|
||||
*
|
||||
* @param queueName The name of the queue this address is associated with.
|
||||
* @param hostAndPort The address of the node.
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class NodeAddress(override val queueName: String, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
|
||||
companion object {
|
||||
fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
|
||||
return NodeAddress("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
|
||||
}
|
||||
|
||||
fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
|
||||
return NodeAddress("$SERVICES_PREFIX${serviceIdentity.toBase58String()}", hostAndPort)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* [ServiceAddress] implements [MessageRecipientGroup]. It holds a queue associated with a service advertised by
|
||||
* zero or more nodes. Each advertising node has an associated consumer.
|
||||
*
|
||||
* By sending to such an address Artemis will pick a consumer (uses Round Robin by default) and sends the message
|
||||
* there. We use this to establish sessions involving service counterparties.
|
||||
*
|
||||
* @param identity The service identity's owning key.
|
||||
*/
|
||||
data class ServiceAddress(val identity: CompositeKey) : ArtemisAddress, MessageRecipientGroup {
|
||||
override val queueName: String = "$SERVICES_PREFIX${identity.toBase58String()}"
|
||||
}
|
||||
|
||||
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
|
||||
abstract val config: SSLConfiguration?
|
||||
|
||||
/**
|
||||
* Returns nothing if the keystore was opened OK or throws if not. Useful to check the password, as
|
||||
* unfortunately Artemis tends to bury the exception when the password is wrong.
|
||||
*/
|
||||
fun checkStorePasswords() {
|
||||
val config = config ?: return
|
||||
config.keyStoreFile.read {
|
||||
KeyStore.getInstance("JKS").load(it, config.keyStorePassword.toCharArray())
|
||||
}
|
||||
config.trustStoreFile.read {
|
||||
KeyStore.getInstance("JKS").load(it, config.trustStorePassword.toCharArray())
|
||||
}
|
||||
}
|
||||
}
|
@ -16,18 +16,19 @@ import net.corda.core.node.services.NetworkMapCache.MapChange
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.expectedOnDefaultFileSystem
|
||||
import net.corda.node.printBasicNodeInfo
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.NODE_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
|
||||
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.expectedOnDefaultFileSystem
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration
|
||||
import org.apache.activemq.artemis.core.config.Configuration
|
||||
|
@ -1,161 +0,0 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.google.common.net.HostAndPort
|
||||
import net.corda.nodeapi.config.SSLConfiguration
|
||||
import net.corda.core.ThreadBox
|
||||
import net.corda.core.logElapsedTime
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.minutes
|
||||
import net.corda.core.seconds
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
import rx.Observable
|
||||
import java.io.Closeable
|
||||
import java.time.Duration
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
|
||||
/**
|
||||
* An RPC client connects to the specified server and allows you to make calls to the server that perform various
|
||||
* useful tasks. See the documentation for [proxy] or review the docsite to learn more about how this API works.
|
||||
*
|
||||
* @param host The hostname and messaging port of the node.
|
||||
* @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will only be authenticated on non-SSL RPC port, the RPC traffic with not be encrypted when SSL is disabled.
|
||||
*/
|
||||
@ThreadSafe
|
||||
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration? = null, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
|
||||
private companion object {
|
||||
val log = loggerFor<CordaRPCClient>()
|
||||
}
|
||||
|
||||
// TODO: Certificate handling for clients needs more work.
|
||||
private inner class State {
|
||||
var running = false
|
||||
lateinit var sessionFactory: ClientSessionFactory
|
||||
lateinit var session: ClientSession
|
||||
lateinit var clientImpl: CordaRPCClientImpl
|
||||
}
|
||||
|
||||
private val state = ThreadBox(State())
|
||||
|
||||
/**
|
||||
* Opens the connection to the server with the given username and password, then returns itself.
|
||||
* Registers a JVM shutdown hook to cleanly disconnect.
|
||||
*/
|
||||
@Throws(ActiveMQException::class)
|
||||
fun start(username: String, password: String): CordaRPCClient {
|
||||
state.locked {
|
||||
check(!running)
|
||||
log.logElapsedTime("Startup") {
|
||||
checkStorePasswords()
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.Outbound(), host, config, enableSSL = config != null)).apply {
|
||||
// TODO: Put these in config file or make it user configurable?
|
||||
threadPoolMaxSize = 1
|
||||
confirmationWindowSize = 100000 // a guess
|
||||
retryInterval = 5.seconds.toMillis()
|
||||
retryIntervalMultiplier = 1.5 // Exponential backoff
|
||||
maxRetryInterval = 3.minutes.toMillis()
|
||||
serviceConfigurationOverride?.invoke(this)
|
||||
}
|
||||
sessionFactory = serverLocator.createSessionFactory()
|
||||
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
|
||||
session.start()
|
||||
clientImpl = CordaRPCClientImpl(session, state.lock, username)
|
||||
running = true
|
||||
}
|
||||
}
|
||||
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
close()
|
||||
})
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* A convenience function that opens a connection with the given credentials, executes the given code block with all
|
||||
* available RPCs in scope and shuts down the RPC connection again. It's meant for quick prototyping and demos. For
|
||||
* more control you probably want to control the lifecycle of the client and proxies independently, as well as
|
||||
* configuring a timeout and other such features via the [proxy] method.
|
||||
*
|
||||
* After this method returns the client is closed and can't be restarted.
|
||||
*/
|
||||
@Throws(ActiveMQException::class)
|
||||
fun <T> use(username: String, password: String, block: CordaRPCOps.() -> T): T {
|
||||
require(!state.locked { running })
|
||||
start(username, password)
|
||||
(this as Closeable).use {
|
||||
return proxy().block()
|
||||
}
|
||||
}
|
||||
|
||||
/** Shuts down the client and lets the server know it can free the used resources (in a nice way). */
|
||||
override fun close() {
|
||||
state.locked {
|
||||
if (!running) return
|
||||
session.close()
|
||||
sessionFactory.close()
|
||||
running = false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a fresh proxy that lets you invoke RPCs on the server. Calls on it block, and if the server throws an
|
||||
* exception then it will be rethrown on the client. Proxies are thread safe but only one RPC can be in flight at
|
||||
* once. If you'd like to perform multiple RPCs in parallel, use this function multiple times to get multiple
|
||||
* proxies.
|
||||
*
|
||||
* Creation of a proxy is a somewhat expensive operation that involves calls to the server, so if you want to do
|
||||
* calls from many threads at once you should cache one proxy per thread and reuse them. This function itself is
|
||||
* thread safe though so requires no extra synchronisation.
|
||||
*
|
||||
* RPC sends and receives are logged on the net.corda.rpc logger.
|
||||
*
|
||||
* By default there are no timeouts on calls. This is deliberate, RPCs without timeouts can survive restarts,
|
||||
* maintenance downtime and moves of the server. RPCs can survive temporary losses or changes in client connectivity,
|
||||
* like switching between wifi networks. You can specify a timeout on the level of a proxy. If a call times
|
||||
* out it will throw [RPCException.Deadline].
|
||||
*
|
||||
* The [CordaRPCOps] defines what client RPCs are available. If an RPC returns an [Observable] anywhere in the
|
||||
* object graph returned then the server-side observable is transparently linked to a messaging queue, and that
|
||||
* queue linked to another observable on the client side here. *You are expected to use it*. The server will begin
|
||||
* buffering messages immediately that it will expect you to drain by subscribing to the returned observer. You can
|
||||
* opt-out of this by simply casting the [Observable] to [Closeable] or [AutoCloseable] and then calling the close
|
||||
* method on it. You don't have to explicitly close the observable if you actually subscribe to it: it will close
|
||||
* itself and free up the server-side resources either when the client or JVM itself is shutdown, or when there are
|
||||
* no more subscribers to it. Once all the subscribers to a returned observable are unsubscribed, the observable is
|
||||
* closed and you can't then re-subscribe again: you'll have to re-request a fresh observable with another RPC.
|
||||
*
|
||||
* The proxy and linked observables consume some small amount of resources on the server. It's OK to just exit your
|
||||
* process and let the server clean up, but in a long running process where you only need something for a short
|
||||
* amount of time it is polite to cast the objects to [Closeable] or [AutoCloseable] and close it when you are done.
|
||||
* Finalizers are in place to warn you if you lose a reference to an unclosed proxy or observable.
|
||||
*
|
||||
* @throws RPCException if the server version is too low or if the server isn't reachable within the given time.
|
||||
*/
|
||||
@JvmOverloads
|
||||
@Throws(RPCException::class)
|
||||
fun proxy(timeout: Duration? = null, minVersion: Int = 0): CordaRPCOps {
|
||||
return state.locked {
|
||||
check(running) { "Client must have been started first" }
|
||||
log.logElapsedTime("Proxy build") {
|
||||
clientImpl.proxyFor(CordaRPCOps::class.java, timeout, minVersion)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("UNUSED")
|
||||
private fun finalize() {
|
||||
state.locked {
|
||||
if (running) {
|
||||
rpcLog.warn("A CordaMQClient is being finalised whilst still running, did you forget to call close?")
|
||||
close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,385 +0,0 @@
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import net.corda.core.ErrorOr
|
||||
import net.corda.core.bufferUntilSubscribed
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.RPCReturnsObservables
|
||||
import net.corda.core.random63BitValue
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.debug
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.Closeable
|
||||
import java.lang.ref.WeakReference
|
||||
import java.lang.reflect.InvocationHandler
|
||||
import java.lang.reflect.Method
|
||||
import java.lang.reflect.Proxy
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import javax.annotation.concurrent.GuardedBy
|
||||
import javax.annotation.concurrent.ThreadSafe
|
||||
import kotlin.concurrent.withLock
|
||||
import kotlin.reflect.jvm.javaMethod
|
||||
|
||||
/**
|
||||
* Core RPC engine implementation, to learn how to use RPC you should be looking at [CordaRPCClient].
|
||||
*
|
||||
* # Design notes
|
||||
*
|
||||
* The way RPCs are handled is fairly standard except for the handling of observables. When an RPC might return
|
||||
* an [Observable] it is specially tagged. This causes the client to create a new transient queue for the
|
||||
* receiving of observables and their observations with a random ID in the name. This ID is sent to the server in
|
||||
* a message header. All observations are sent via this single queue.
|
||||
*
|
||||
* The reason for doing it this way and not the more obvious approach of one-queue-per-observable is that we want
|
||||
* the queues to be *transient*, meaning their lifetime in the broker is tied to the session that created them.
|
||||
* A server side observable and its associated queue is not a cost-free thing, let alone the memory and resources
|
||||
* needed to actually generate the observations themselves, therefore we want to ensure these cannot leak. A
|
||||
* transient queue will be deleted automatically if the client session terminates, which by default happens on
|
||||
* disconnect but can also be configured to happen after a short delay (this allows clients to e.g. switch IP
|
||||
* address). On the server the deletion of the observations queue triggers unsubscription from the associated
|
||||
* observables, which in turn may then be garbage collected.
|
||||
*
|
||||
* Creating a transient queue requires a roundtrip to the broker and thus doing an RPC that could return
|
||||
* observables takes two server roundtrips instead of one. That's why we require RPCs to be marked with
|
||||
* [RPCReturnsObservables] as needing this special treatment instead of always doing it.
|
||||
*
|
||||
* If the Artemis/JMS APIs allowed us to create transient queues assigned to someone else then we could
|
||||
* potentially use a different design in which the node creates new transient queues (one per observable) on the
|
||||
* fly. The client would then have to watch out for this and start consuming those queues as they were created.
|
||||
*
|
||||
* We use one queue per RPC because we don't know ahead of time how many observables the server might return and
|
||||
* often the server doesn't know either, which pushes towards a single queue design, but at the same time the
|
||||
* processing of observations returned by an RPC might be striped across multiple threads and we'd like
|
||||
* backpressure management to not be scoped per client process but with more granularity. So we end up with
|
||||
* a compromise where the unit of backpressure management is the response to a single RPC.
|
||||
*
|
||||
* TODO: Backpressure isn't propagated all the way through the MQ broker at the moment.
|
||||
*/
|
||||
class CordaRPCClientImpl(private val session: ClientSession,
|
||||
private val sessionLock: ReentrantLock,
|
||||
private val username: String) {
|
||||
companion object {
|
||||
private val closeableCloseMethod = Closeable::close.javaMethod
|
||||
private val autocloseableCloseMethod = AutoCloseable::close.javaMethod
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a proxy for the given type, which must descend from [RPCOps].
|
||||
*
|
||||
* @see CordaRPCClient.proxy for more information about how to use the proxies.
|
||||
*/
|
||||
fun <T : RPCOps> proxyFor(rpcInterface: Class<T>, timeout: Duration? = null, minVersion: Int = 0): T {
|
||||
sessionLock.withLock {
|
||||
if (producer == null)
|
||||
producer = session.createProducer()
|
||||
}
|
||||
val proxyImpl = RPCProxyHandler(timeout)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
val proxy = Proxy.newProxyInstance(rpcInterface.classLoader, arrayOf(rpcInterface, Closeable::class.java), proxyImpl) as T
|
||||
proxyImpl.serverProtocolVersion = proxy.protocolVersion
|
||||
if (minVersion > proxyImpl.serverProtocolVersion)
|
||||
throw RPCException("Requested minimum protocol version $minVersion is higher than the server's supported protocol version (${proxyImpl.serverProtocolVersion})")
|
||||
return proxy
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
//
|
||||
//region RPC engine
|
||||
//
|
||||
// You can find docs on all this in the api doc for the proxyFor method, and in the docsite.
|
||||
|
||||
// Utility to quickly suck out the contents of an Artemis message. There's probably a more efficient way to
|
||||
// do this.
|
||||
private fun <T : Any> ClientMessage.deserialize(kryo: Kryo): T = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }.deserialize(kryo)
|
||||
|
||||
// We by default use a weak reference so GC can happen, otherwise they persist for the life of the client.
|
||||
@GuardedBy("sessionLock")
|
||||
private val addressToQueuedObservables = CacheBuilder.newBuilder().weakValues().build<String, QueuedObservable>()
|
||||
// This is used to hold a reference counted hard reference when we know there are subscribers.
|
||||
private val hardReferencesToQueuedObservables = Collections.synchronizedSet(mutableSetOf<QueuedObservable>())
|
||||
|
||||
private var producer: ClientProducer? = null
|
||||
|
||||
class ObservableDeserializer() : Serializer<Observable<Any>>() {
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Observable<Any>>): Observable<Any> {
|
||||
val qName = kryo.context[RPCKryoQNameKey] as String
|
||||
val rpcName = kryo.context[RPCKryoMethodNameKey] as String
|
||||
val rpcLocation = kryo.context[RPCKryoLocationKey] as Throwable
|
||||
val rpcClient = kryo.context[RPCKryoClientKey] as CordaRPCClientImpl
|
||||
val handle = input.readInt(true)
|
||||
val ob = rpcClient.sessionLock.withLock {
|
||||
rpcClient.addressToQueuedObservables.getIfPresent(qName) ?: rpcClient.QueuedObservable(qName, rpcName, rpcLocation).apply {
|
||||
rpcClient.addressToQueuedObservables.put(qName, this)
|
||||
}
|
||||
}
|
||||
val result = ob.getForHandle(handle)
|
||||
rpcLog.debug { "Deserializing and connecting a new observable for $rpcName on $qName: $result" }
|
||||
return result
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, `object`: Observable<Any>) {
|
||||
throw UnsupportedOperationException("not implemented")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The proxy class returned to the client is auto-generated on the fly by the java.lang.reflect Proxy
|
||||
* infrastructure. The JDK Proxy class writes bytecode into memory for a class that implements the requested
|
||||
* interfaces and then routes all method calls to the invoke method below in a conveniently reified form.
|
||||
* We can then easily take the data about the method call and turn it into an RPC. This avoids the need
|
||||
* for the compile-time code generation which is so common in RPC systems.
|
||||
*/
|
||||
@ThreadSafe
|
||||
private inner class RPCProxyHandler(private val timeout: Duration?) : InvocationHandler, Closeable {
|
||||
private val proxyId = random63BitValue()
|
||||
private val consumer: ClientConsumer
|
||||
|
||||
var serverProtocolVersion = 0
|
||||
|
||||
init {
|
||||
val proxyAddress = constructAddress(proxyId)
|
||||
consumer = sessionLock.withLock {
|
||||
session.createTemporaryQueue(proxyAddress, proxyAddress)
|
||||
session.createConsumer(proxyAddress)
|
||||
}
|
||||
}
|
||||
|
||||
private fun constructAddress(addressId: Long) = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.$addressId"
|
||||
|
||||
@Synchronized
|
||||
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
|
||||
if (isCloseInvocation(method)) {
|
||||
close()
|
||||
return null
|
||||
}
|
||||
if (method.name == "toString" && args == null)
|
||||
return "Client RPC proxy"
|
||||
|
||||
if (consumer.isClosed)
|
||||
throw RPCException("RPC Proxy is closed")
|
||||
|
||||
// All invoked methods on the proxy end up here.
|
||||
val location = Throwable()
|
||||
rpcLog.debug {
|
||||
val argStr = args?.joinToString() ?: ""
|
||||
"-> RPC -> ${method.name}($argStr): ${method.returnType}"
|
||||
}
|
||||
|
||||
checkMethodVersion(method)
|
||||
|
||||
val msg: ClientMessage = createMessage(method)
|
||||
// We could of course also check the return type of the method to see if it's Observable, but I'd
|
||||
// rather haved the annotation be used consistently.
|
||||
val returnsObservables = method.isAnnotationPresent(RPCReturnsObservables::class.java)
|
||||
val kryo = if (returnsObservables) maybePrepareForObservables(location, method, msg) else createRPCKryoForDeserialization(this@CordaRPCClientImpl)
|
||||
val next: ErrorOr<*> = try {
|
||||
sendRequest(args, msg)
|
||||
receiveResponse(kryo, method, timeout)
|
||||
} finally {
|
||||
releaseRPCKryoForDeserialization(kryo)
|
||||
}
|
||||
rpcLog.debug { "<- RPC <- ${method.name} = $next" }
|
||||
return unwrapOrThrow(next)
|
||||
}
|
||||
|
||||
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
|
||||
private fun unwrapOrThrow(next: ErrorOr<*>): Any? {
|
||||
val ex = next.error
|
||||
if (ex != null) {
|
||||
// Replace the stack trace because that's an implementation detail of the server that isn't so
|
||||
// helpful to the user who wants to see where the error was on their side, and serialising stack
|
||||
// frame objects is a bit annoying. We slice it here to avoid the invoke() machinery being exposed.
|
||||
// The resulting exception looks like it was thrown from inside the called method.
|
||||
(ex as java.lang.Throwable).stackTrace = java.lang.Throwable().stackTrace.let { it.sliceArray(1..it.size - 1) }
|
||||
throw ex
|
||||
} else {
|
||||
return next.value
|
||||
}
|
||||
}
|
||||
|
||||
private fun receiveResponse(kryo: Kryo, method: Method, timeout: Duration?): ErrorOr<*> {
|
||||
val artemisMessage: ClientMessage =
|
||||
if (timeout == null)
|
||||
consumer.receive() ?: throw ActiveMQObjectClosedException()
|
||||
else
|
||||
consumer.receive(timeout.toMillis()) ?: throw RPCException.DeadlineExceeded(method.name)
|
||||
artemisMessage.acknowledge()
|
||||
val next = artemisMessage.deserialize<ErrorOr<*>>(kryo)
|
||||
return next
|
||||
}
|
||||
|
||||
private fun sendRequest(args: Array<out Any>?, msg: ClientMessage) {
|
||||
sessionLock.withLock {
|
||||
val argsKryo = createRPCKryoForDeserialization(this@CordaRPCClientImpl)
|
||||
val serializedArgs = try {
|
||||
(args ?: emptyArray<Any?>()).serialize(argsKryo)
|
||||
} catch (e: KryoException) {
|
||||
throw RPCException("Could not serialize RPC arguments", e)
|
||||
} finally {
|
||||
releaseRPCKryoForDeserialization(argsKryo)
|
||||
}
|
||||
msg.writeBodyBufferBytes(serializedArgs.bytes)
|
||||
producer!!.send(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, msg)
|
||||
}
|
||||
}
|
||||
|
||||
private fun maybePrepareForObservables(location: Throwable, method: Method, msg: ClientMessage): Kryo {
|
||||
// Create a temporary queue just for the emissions on any observables that are returned.
|
||||
val observationsId = random63BitValue()
|
||||
val observationsQueueName = constructAddress(observationsId)
|
||||
session.createTemporaryQueue(observationsQueueName, observationsQueueName)
|
||||
msg.putLongProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsId)
|
||||
// And make sure that we deserialise observable handles so that they're linked to the right
|
||||
// queue. Also record a bit of metadata for debugging purposes.
|
||||
return createRPCKryoForDeserialization(this@CordaRPCClientImpl, observationsQueueName, method.name, location)
|
||||
}
|
||||
|
||||
private fun createMessage(method: Method): ClientMessage {
|
||||
return session.createMessage(false).apply {
|
||||
putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name)
|
||||
putLongProperty(ClientRPCRequestMessage.REPLY_TO, proxyId)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
}
|
||||
|
||||
private fun checkMethodVersion(method: Method) {
|
||||
val methodVersion = method.getAnnotation(RPCSinceVersion::class.java)?.version ?: 0
|
||||
if (methodVersion > serverProtocolVersion)
|
||||
throw UnsupportedOperationException("Method ${method.name} was added in RPC protocol version $methodVersion but the server is running $serverProtocolVersion")
|
||||
}
|
||||
|
||||
private fun isCloseInvocation(method: Method) = method == closeableCloseMethod || method == autocloseableCloseMethod
|
||||
|
||||
override fun close() {
|
||||
consumer.close()
|
||||
sessionLock.withLock { session.deleteQueue(constructAddress(proxyId)) }
|
||||
}
|
||||
|
||||
override fun toString() = "Corda RPC Proxy listening on queue ${constructAddress(proxyId)}"
|
||||
}
|
||||
|
||||
/**
|
||||
* When subscribed to, starts consuming from the given queue name and demultiplexing the observables being
|
||||
* sent to it. The server queue is moved into in-memory buffers (one per attached server-side observable)
|
||||
* until drained through a subscription. When the subscriptions are all gone, the server-side queue is deleted.
|
||||
*/
|
||||
@ThreadSafe
|
||||
private inner class QueuedObservable(private val qName: String,
|
||||
private val rpcName: String,
|
||||
private val rpcLocation: Throwable) {
|
||||
private val root = PublishSubject.create<MarshalledObservation>()
|
||||
private val rootShared = root.doOnUnsubscribe { close() }.share()
|
||||
|
||||
// This could be made more efficient by using a specialised IntMap
|
||||
// When handling this map we don't synchronise on [this], otherwise there is a race condition between close() and deliver()
|
||||
private val observables = Collections.synchronizedMap(HashMap<Int, Observable<Any>>())
|
||||
|
||||
private var consumer: ClientConsumer? = null
|
||||
|
||||
private val referenceCount = AtomicInteger(0)
|
||||
|
||||
// We have to create a weak reference, otherwise we cannot be GC'd.
|
||||
init {
|
||||
val weakThis = WeakReference<QueuedObservable>(this)
|
||||
consumer = sessionLock.withLock { session.createConsumer(qName) }.setMessageHandler { weakThis.get()?.deliver(it) }
|
||||
}
|
||||
|
||||
/**
|
||||
* We have to reference count subscriptions to the returned [Observable]s to prevent early GC because we are
|
||||
* weak referenced.
|
||||
*
|
||||
* Derived [Observables] (e.g. filtered etc) hold a strong reference to the original, but for example, if
|
||||
* the pattern as follows is used, the original passes out of scope and the direction of reference is from the
|
||||
* original to the [Observer]. We use the reference counting to allow for this pattern.
|
||||
*
|
||||
* val observationsSubject = PublishSubject.create<Observation>()
|
||||
* originalObservable.subscribe(observationsSubject)
|
||||
* return observationsSubject
|
||||
*/
|
||||
private fun refCountUp() {
|
||||
if(referenceCount.andIncrement == 0) {
|
||||
hardReferencesToQueuedObservables.add(this)
|
||||
}
|
||||
}
|
||||
|
||||
private fun refCountDown() {
|
||||
if(referenceCount.decrementAndGet() == 0) {
|
||||
hardReferencesToQueuedObservables.remove(this)
|
||||
}
|
||||
}
|
||||
|
||||
fun getForHandle(handle: Int): Observable<Any> {
|
||||
synchronized(observables) {
|
||||
return observables.getOrPut(handle) {
|
||||
/**
|
||||
* Note that the order of bufferUntilSubscribed() -> dematerialize() is very important here.
|
||||
*
|
||||
* In particular doing it the other way around may result in the following edge case:
|
||||
* The RPC returns two (or more) Observables. The first Observable unsubscribes *during serialisation*,
|
||||
* before the second one is hit, causing the [rootShared] to unsubscribe and consequently closing
|
||||
* the underlying artemis queue, even though the second Observable was not even registered.
|
||||
*
|
||||
* The buffer -> dematerialize order ensures that the Observable may not unsubscribe until the caller
|
||||
* subscribes, which must be after full deserialisation and registering of all top level Observables.
|
||||
*
|
||||
* In addition, when subscribe and unsubscribe is called on the [Observable] returned here, we
|
||||
* reference count a hard reference to this [QueuedObservable] to prevent premature GC.
|
||||
*/
|
||||
rootShared.filter { it.forHandle == handle }.map { it.what }.bufferUntilSubscribed().dematerialize<Any>().doOnSubscribe { refCountUp() }.doOnUnsubscribe { refCountDown() }.share()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun deliver(msg: ClientMessage) {
|
||||
msg.acknowledge()
|
||||
val kryo = createRPCKryoForDeserialization(this@CordaRPCClientImpl, qName, rpcName, rpcLocation)
|
||||
val received: MarshalledObservation = try { msg.deserialize(kryo) } finally {
|
||||
releaseRPCKryoForDeserialization(kryo)
|
||||
}
|
||||
rpcLog.debug { "<- Observable [$rpcName] <- Received $received" }
|
||||
synchronized(observables) {
|
||||
// Force creation of the buffer if it doesn't already exist.
|
||||
getForHandle(received.forHandle)
|
||||
root.onNext(received)
|
||||
}
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun close() {
|
||||
rpcLog.debug("Closing queue observable for call to $rpcName : $qName")
|
||||
consumer?.close()
|
||||
consumer = null
|
||||
sessionLock.withLock { session.deleteQueue(qName) }
|
||||
}
|
||||
|
||||
@Suppress("UNUSED")
|
||||
fun finalize() {
|
||||
val c = synchronized(this) { consumer }
|
||||
if (c != null) {
|
||||
rpcLog.warn("A hot observable returned from an RPC ($rpcName) was never subscribed to. " +
|
||||
"This wastes server-side resources because it was queueing observations for retrieval. " +
|
||||
"It is being closed now, but please adjust your code to subscribe and unsubscribe from the observable to close it explicitly.", rpcLocation)
|
||||
c.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
//endregion
|
||||
}
|
@ -19,6 +19,7 @@ import net.corda.node.services.api.MessagingServiceInternal
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.node.utilities.*
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
|
||||
import org.apache.activemq.artemis.api.core.Message.*
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
|
@ -5,6 +5,7 @@ import com.esotericsoftware.kryo.KryoException
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import com.google.common.annotations.VisibleForTesting
|
||||
import com.google.common.collect.HashMultimap
|
||||
import net.corda.core.ErrorOr
|
||||
@ -14,11 +15,11 @@ import net.corda.core.messaging.RPCReturnsObservables
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.*
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
@ -192,4 +193,17 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v
|
||||
if (required) throw RPCException("missing $property property") else null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val rpcSerKryoPool = KryoPool.Builder { RPCKryo(RPCDispatcher.ObservableSerializer()) }.build()
|
||||
|
||||
fun createRPCKryoForSerialization(qName: String? = null, dispatcher: RPCDispatcher? = null): Kryo {
|
||||
val kryo = rpcSerKryoPool.borrow()
|
||||
kryo.context.put(RPCKryoQNameKey, qName)
|
||||
kryo.context.put(RPCKryoDispatcherKey, dispatcher)
|
||||
return kryo
|
||||
}
|
||||
|
||||
fun releaseRPCKryoForSerialization(kryo: Kryo) {
|
||||
rpcSerKryoPool.release(kryo)
|
||||
}
|
||||
|
@ -0,0 +1,17 @@
|
||||
@file:JvmName("RPCServerStructures")
|
||||
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import net.corda.nodeapi.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.CURRENT_RPC_USER
|
||||
import net.corda.nodeapi.PermissionException
|
||||
|
||||
/** Helper method which checks that the current RPC user is entitled for the given permission. Throws a [PermissionException] otherwise. */
|
||||
fun requirePermission(permission: String) {
|
||||
// TODO remove the NODE_USER condition once webserver doesn't need it
|
||||
val currentUser = CURRENT_RPC_USER.get()
|
||||
val currentUserPermissions = currentUser.permissions
|
||||
if (currentUser.username != ArtemisMessagingComponent.NODE_USER && currentUserPermissions.intersect(listOf(permission, "ALL")).isEmpty()) {
|
||||
throw PermissionException("User not permissioned for $permission, permissions are $currentUserPermissions")
|
||||
}
|
||||
}
|
@ -1,173 +0,0 @@
|
||||
@file:JvmName("RPCStructures")
|
||||
|
||||
package net.corda.node.services.messaging
|
||||
|
||||
import com.esotericsoftware.kryo.Kryo
|
||||
import com.esotericsoftware.kryo.Registration
|
||||
import com.esotericsoftware.kryo.Serializer
|
||||
import com.esotericsoftware.kryo.io.Input
|
||||
import com.esotericsoftware.kryo.io.Output
|
||||
import com.esotericsoftware.kryo.pool.KryoPool
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.serialization.*
|
||||
import net.corda.core.toFuture
|
||||
import net.corda.core.toObservable
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import org.apache.commons.fileupload.MultipartStream
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
|
||||
/** Global RPC logger */
|
||||
val rpcLog: Logger by lazy { LoggerFactory.getLogger("net.corda.rpc") }
|
||||
|
||||
/** Used in the RPC wire protocol to wrap an observation with the handle of the observable it's intended for. */
|
||||
data class MarshalledObservation(val forHandle: Int, val what: Notification<*>)
|
||||
|
||||
/** Records the protocol version in which this RPC was added. */
|
||||
@Target(AnnotationTarget.FUNCTION)
|
||||
@MustBeDocumented
|
||||
annotation class RPCSinceVersion(val version: Int)
|
||||
|
||||
/** The contents of an RPC request message, separated from the MQ layer. */
|
||||
data class ClientRPCRequestMessage(
|
||||
val args: SerializedBytes<Array<Any>>,
|
||||
val replyToAddress: String,
|
||||
val observationsToAddress: String?,
|
||||
val methodName: String,
|
||||
val user: User
|
||||
) {
|
||||
companion object {
|
||||
const val REPLY_TO = "reply-to"
|
||||
const val OBSERVATIONS_TO = "observations-to"
|
||||
const val METHOD_NAME = "method-name"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is available to RPC implementations to query the validated [User] that is calling it. Each user has a set of
|
||||
* permissions they're entitled to which can be used to control access.
|
||||
*/
|
||||
@JvmField
|
||||
val CURRENT_RPC_USER: ThreadLocal<User> = ThreadLocal()
|
||||
|
||||
/** Helper method which checks that the current RPC user is entitled for the given permission. Throws a [PermissionException] otherwise. */
|
||||
fun requirePermission(permission: String) {
|
||||
// TODO remove the NODE_USER condition once webserver doesn't need it
|
||||
val currentUser = CURRENT_RPC_USER.get()
|
||||
val currentUserPermissions = currentUser.permissions
|
||||
if (currentUser.username != NODE_USER && currentUserPermissions.intersect(listOf(permission, "ALL")).isEmpty()) {
|
||||
throw PermissionException("User not permissioned for $permission, permissions are $currentUserPermissions")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked
|
||||
* method.
|
||||
*/
|
||||
@CordaSerializable
|
||||
open class RPCException(msg: String, cause: Throwable?) : RuntimeException(msg, cause) {
|
||||
constructor(msg: String) : this(msg, null)
|
||||
|
||||
class DeadlineExceeded(rpcName: String) : RPCException("Deadline exceeded on call to $rpcName")
|
||||
}
|
||||
|
||||
object ClassSerializer : Serializer<Class<*>>() {
|
||||
override fun read(kryo: Kryo, input: Input, type: Class<Class<*>>): Class<*> {
|
||||
val className = input.readString()
|
||||
return Class.forName(className)
|
||||
}
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, clazz: Class<*>) {
|
||||
output.writeString(clazz.name)
|
||||
}
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
class PermissionException(msg: String) : RuntimeException(msg)
|
||||
|
||||
object RPCKryoClientKey
|
||||
object RPCKryoDispatcherKey
|
||||
object RPCKryoQNameKey
|
||||
object RPCKryoMethodNameKey
|
||||
object RPCKryoLocationKey
|
||||
|
||||
// The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly.
|
||||
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
|
||||
// because we can see everything we're using in one place.
|
||||
private class RPCKryo(observableSerializer: Serializer<Observable<Any>>) : CordaKryo(makeStandardClassResolver()) {
|
||||
init {
|
||||
DefaultKryoCustomizer.customize(this)
|
||||
|
||||
// RPC specific classes
|
||||
register(Class::class.java, ClassSerializer)
|
||||
register(MultipartStream.ItemInputStream::class.java, InputStreamSerializer)
|
||||
register(MarshalledObservation::class.java, ImmutableClassSerializer(MarshalledObservation::class))
|
||||
register(Observable::class.java, observableSerializer)
|
||||
@Suppress("UNCHECKED_CAST")
|
||||
register(ListenableFuture::class,
|
||||
read = { kryo, input -> observableSerializer.read(kryo, input, Observable::class.java as Class<Observable<Any>>).toFuture() },
|
||||
write = { kryo, output, obj -> observableSerializer.write(kryo, output, obj.toObservable()) }
|
||||
)
|
||||
register(
|
||||
FlowException::class,
|
||||
read = { kryo, input ->
|
||||
val message = input.readString()
|
||||
val cause = kryo.readObjectOrNull(input, Throwable::class.java)
|
||||
FlowException(message, cause)
|
||||
},
|
||||
write = { kryo, output, obj ->
|
||||
// The subclass may have overridden toString so we use that
|
||||
val message = if (obj.javaClass != FlowException::class.java) obj.toString() else obj.message
|
||||
output.writeString(message)
|
||||
kryo.writeObjectOrNull(output, obj.cause, Throwable::class.java)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
override fun getRegistration(type: Class<*>): Registration {
|
||||
val annotated = context[RPCKryoQNameKey] != null
|
||||
if (Observable::class.java.isAssignableFrom(type)) {
|
||||
return if (annotated) super.getRegistration(Observable::class.java)
|
||||
else throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables")
|
||||
}
|
||||
if (ListenableFuture::class.java.isAssignableFrom(type)) {
|
||||
return if (annotated) super.getRegistration(ListenableFuture::class.java)
|
||||
else throw IllegalStateException("This RPC was not annotated with @RPCReturnsObservables")
|
||||
}
|
||||
if (FlowException::class.java.isAssignableFrom(type))
|
||||
return super.getRegistration(FlowException::class.java)
|
||||
return super.getRegistration(type)
|
||||
}
|
||||
}
|
||||
|
||||
private val rpcSerKryoPool = KryoPool.Builder { RPCKryo(RPCDispatcher.ObservableSerializer()) }.build()
|
||||
|
||||
fun createRPCKryoForSerialization(qName: String? = null, dispatcher: RPCDispatcher? = null): Kryo {
|
||||
val kryo = rpcSerKryoPool.borrow()
|
||||
kryo.context.put(RPCKryoQNameKey, qName)
|
||||
kryo.context.put(RPCKryoDispatcherKey, dispatcher)
|
||||
return kryo
|
||||
}
|
||||
|
||||
fun releaseRPCKryoForSerialization(kryo: Kryo) {
|
||||
rpcSerKryoPool.release(kryo)
|
||||
}
|
||||
|
||||
private val rpcDesKryoPool = KryoPool.Builder { RPCKryo(CordaRPCClientImpl.ObservableDeserializer()) }.build()
|
||||
|
||||
fun createRPCKryoForDeserialization(rpcClient: CordaRPCClientImpl, qName: String? = null, rpcName: String? = null, rpcLocation: Throwable? = null): Kryo {
|
||||
val kryo = rpcDesKryoPool.borrow()
|
||||
kryo.context.put(RPCKryoClientKey, rpcClient)
|
||||
kryo.context.put(RPCKryoQNameKey, qName)
|
||||
kryo.context.put(RPCKryoMethodNameKey, rpcName)
|
||||
kryo.context.put(RPCKryoLocationKey, rpcLocation)
|
||||
return kryo
|
||||
}
|
||||
|
||||
fun releaseRPCKryoForDeserialization(kryo: Kryo) {
|
||||
rpcDesKryoPool.release(kryo)
|
||||
}
|
@ -72,7 +72,7 @@ interface NetworkMapService {
|
||||
// Base topic for messages acknowledging pushed updates
|
||||
val PUSH_ACK_TOPIC = "platform.network_map.push_ack"
|
||||
|
||||
val type = ServiceType.corda.getSubType("network_map")
|
||||
val type = ServiceType.networkMap
|
||||
}
|
||||
|
||||
data class FetchMapRequest(val subscribe: Boolean,
|
||||
|
@ -1,4 +1,3 @@
|
||||
# Register a ServiceLoader service extending from net.corda.core.node.CordaPluginRegistry
|
||||
net.corda.node.services.NotaryChange$Plugin
|
||||
net.corda.node.services.persistence.DataVending$Plugin
|
||||
net.corda.node.serialization.DefaultWhitelist
|
||||
net.corda.node.services.persistence.DataVending$Plugin
|
@ -13,13 +13,13 @@ import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.flows.CashIssueFlow
|
||||
import net.corda.flows.CashPaymentFlow
|
||||
import net.corda.node.internal.CordaRPCOpsImpl
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.CURRENT_RPC_USER
|
||||
import net.corda.node.services.messaging.PermissionException
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
import net.corda.node.services.startFlowPermission
|
||||
import net.corda.node.services.transactions.SimpleNotaryService
|
||||
import net.corda.node.utilities.databaseTransaction
|
||||
import net.corda.nodeapi.CURRENT_RPC_USER
|
||||
import net.corda.nodeapi.PermissionException
|
||||
import net.corda.nodeapi.User
|
||||
import net.corda.testing.expect
|
||||
import net.corda.testing.expectEvents
|
||||
import net.corda.testing.node.MockNetwork
|
||||
|
@ -1,98 +0,0 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import net.corda.node.services.messaging.CordaRPCClientImpl
|
||||
import net.corda.node.services.messaging.RPCDispatcher
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
|
||||
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import java.util.*
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
abstract class AbstractClientRPC {
|
||||
|
||||
lateinit var artemis: EmbeddedActiveMQ
|
||||
lateinit var serverSession: ClientSession
|
||||
lateinit var clientSession: ClientSession
|
||||
lateinit var producer: ClientProducer
|
||||
lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
|
||||
|
||||
@Before
|
||||
fun rpcSetup() {
|
||||
// Set up an in-memory Artemis with an RPC requests queue.
|
||||
artemis = EmbeddedActiveMQ()
|
||||
artemis.setConfiguration(ConfigurationImpl().apply {
|
||||
acceptorConfigurations = setOf(TransportConfiguration(InVMAcceptorFactory::class.java.name))
|
||||
isSecurityEnabled = false
|
||||
isPersistenceEnabled = false
|
||||
})
|
||||
artemis.start()
|
||||
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(TransportConfiguration(InVMConnectorFactory::class.java.name))
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
serverSession = sessionFactory.createSession()
|
||||
serverSession.start()
|
||||
|
||||
serverSession.createTemporaryQueue(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, ArtemisMessagingComponent.RPC_REQUESTS_QUEUE)
|
||||
producer = serverSession.createProducer()
|
||||
serverThread = AffinityExecutor.ServiceAffinityExecutor("unit-tests-rpc-dispatch-thread", 1)
|
||||
serverSession.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 'BINDING_REMOVED'")
|
||||
|
||||
clientSession = sessionFactory.createSession()
|
||||
clientSession.start()
|
||||
|
||||
LogHelper.setLevel("+net.corda.rpc")
|
||||
}
|
||||
|
||||
@After
|
||||
fun rpcShutdown() {
|
||||
safeClose(producer)
|
||||
clientSession.stop()
|
||||
serverSession.stop()
|
||||
artemis.stop()
|
||||
serverThread.shutdownNow()
|
||||
}
|
||||
|
||||
fun <T: RPCOps> rpcProxyFor(rpcUser: User, rpcImpl: T, type: Class<T>): T {
|
||||
val userService = object : RPCUserService {
|
||||
override fun getUser(username: String): User? = if (username == rpcUser.username) rpcUser else null
|
||||
override val users: List<User> get() = listOf(rpcUser)
|
||||
}
|
||||
|
||||
val dispatcher = object : RPCDispatcher(rpcImpl, userService, "SomeName") {
|
||||
override fun send(data: SerializedBytes<*>, toAddress: String) {
|
||||
val msg = serverSession.createMessage(false).apply {
|
||||
writeBodyBufferBytes(data.bytes)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
producer.send(toAddress, msg)
|
||||
}
|
||||
|
||||
override fun getUser(message: ClientMessage): User = rpcUser
|
||||
}
|
||||
|
||||
val serverNotifConsumer = serverSession.createConsumer("rpc.qremovals")
|
||||
val serverConsumer = serverSession.createConsumer(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE)
|
||||
dispatcher.start(serverConsumer, serverNotifConsumer, serverThread)
|
||||
return CordaRPCClientImpl(clientSession, ReentrantLock(), rpcUser.username).proxyFor(type)
|
||||
}
|
||||
|
||||
fun safeClose(obj: Any) = try { (obj as AutoCloseable).close() } catch (e: Exception) {}
|
||||
}
|
@ -1,194 +0,0 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import com.google.common.util.concurrent.Futures
|
||||
import com.google.common.util.concurrent.ListenableFuture
|
||||
import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.RPCReturnsObservables
|
||||
import net.corda.core.success
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.CURRENT_RPC_USER
|
||||
import net.corda.node.services.messaging.RPCSinceVersion
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class ClientRPCInfrastructureTests : AbstractClientRPC() {
|
||||
// TODO: Test that timeouts work
|
||||
|
||||
lateinit var proxy: TestOps
|
||||
|
||||
private val authenticatedUser = User("test", "password", permissions = setOf())
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
proxy = rpcProxyFor(authenticatedUser, TestOpsImpl(), TestOps::class.java)
|
||||
}
|
||||
|
||||
@After
|
||||
fun shutdown() {
|
||||
safeClose(proxy)
|
||||
}
|
||||
|
||||
interface TestOps : RPCOps {
|
||||
@Throws(IllegalArgumentException::class)
|
||||
fun barf()
|
||||
|
||||
fun void()
|
||||
|
||||
fun someCalculation(str: String, num: Int): String
|
||||
|
||||
@RPCReturnsObservables
|
||||
fun makeObservable(): Observable<Int>
|
||||
|
||||
@RPCReturnsObservables
|
||||
fun makeComplicatedObservable(): Observable<Pair<String, Observable<String>>>
|
||||
|
||||
@RPCReturnsObservables
|
||||
fun makeListenableFuture(): ListenableFuture<Int>
|
||||
|
||||
@RPCReturnsObservables
|
||||
fun makeComplicatedListenableFuture(): ListenableFuture<Pair<String, ListenableFuture<String>>>
|
||||
|
||||
@RPCSinceVersion(2)
|
||||
fun addedLater()
|
||||
|
||||
fun captureUser(): String
|
||||
}
|
||||
|
||||
private lateinit var complicatedObservable: Observable<Pair<String, Observable<String>>>
|
||||
private lateinit var complicatedListenableFuturee: ListenableFuture<Pair<String, ListenableFuture<String>>>
|
||||
|
||||
inner class TestOpsImpl : TestOps {
|
||||
override val protocolVersion = 1
|
||||
override fun barf(): Unit = throw IllegalArgumentException("Barf!")
|
||||
override fun void() {}
|
||||
override fun someCalculation(str: String, num: Int) = "$str $num"
|
||||
override fun makeObservable(): Observable<Int> = Observable.just(1, 2, 3, 4)
|
||||
override fun makeListenableFuture(): ListenableFuture<Int> = Futures.immediateFuture(1)
|
||||
override fun makeComplicatedObservable() = complicatedObservable
|
||||
override fun makeComplicatedListenableFuture(): ListenableFuture<Pair<String, ListenableFuture<String>>> = complicatedListenableFuturee
|
||||
override fun addedLater(): Unit = throw UnsupportedOperationException("not implemented")
|
||||
override fun captureUser(): String = CURRENT_RPC_USER.get().username
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `simple RPCs`() {
|
||||
// Does nothing, doesn't throw.
|
||||
proxy.void()
|
||||
|
||||
assertEquals("Barf!", assertFailsWith<IllegalArgumentException> {
|
||||
proxy.barf()
|
||||
}.message)
|
||||
|
||||
assertEquals("hi 5", proxy.someCalculation("hi", 5))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `simple observable`() {
|
||||
// This tests that the observations are transmitted correctly, also completion is transmitted.
|
||||
val observations = proxy.makeObservable().toBlocking().toIterable().toList()
|
||||
assertEquals(listOf(1, 2, 3, 4), observations)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `complex observables`() {
|
||||
// This checks that we can return an object graph with complex usage of observables, like an observable
|
||||
// that emits objects that contain more observables.
|
||||
val serverQuotes = PublishSubject.create<Pair<String, Observable<String>>>()
|
||||
val unsubscribeLatch = CountDownLatch(1)
|
||||
complicatedObservable = serverQuotes.asObservable().doOnUnsubscribe { unsubscribeLatch.countDown() }
|
||||
|
||||
val twainQuotes = "Mark Twain" to Observable.just(
|
||||
"I have never let my schooling interfere with my education.",
|
||||
"Clothes make the man. Naked people have little or no influence on society."
|
||||
)
|
||||
val wildeQuotes = "Oscar Wilde" to Observable.just(
|
||||
"I can resist everything except temptation.",
|
||||
"Always forgive your enemies - nothing annoys them so much."
|
||||
)
|
||||
|
||||
val clientQuotes = LinkedBlockingQueue<String>()
|
||||
val clientObs = proxy.makeComplicatedObservable()
|
||||
|
||||
val subscription = clientObs.subscribe {
|
||||
val name = it.first
|
||||
it.second.subscribe {
|
||||
clientQuotes += "Quote by $name: $it"
|
||||
}
|
||||
}
|
||||
|
||||
val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*")
|
||||
assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
|
||||
|
||||
assertThat(clientQuotes).isEmpty()
|
||||
|
||||
serverQuotes.onNext(twainQuotes)
|
||||
assertEquals("Quote by Mark Twain: I have never let my schooling interfere with my education.", clientQuotes.take())
|
||||
assertEquals("Quote by Mark Twain: Clothes make the man. Naked people have little or no influence on society.", clientQuotes.take())
|
||||
|
||||
serverQuotes.onNext(wildeQuotes)
|
||||
assertEquals("Quote by Oscar Wilde: I can resist everything except temptation.", clientQuotes.take())
|
||||
assertEquals("Quote by Oscar Wilde: Always forgive your enemies - nothing annoys them so much.", clientQuotes.take())
|
||||
|
||||
assertTrue(serverQuotes.hasObservers())
|
||||
subscription.unsubscribe()
|
||||
unsubscribeLatch.await()
|
||||
assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `simple ListenableFuture`() {
|
||||
val value = proxy.makeListenableFuture().getOrThrow()
|
||||
assertThat(value).isEqualTo(1)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `complex ListenableFuture`() {
|
||||
val serverQuote = SettableFuture.create<Pair<String, ListenableFuture<String>>>()
|
||||
complicatedListenableFuturee = serverQuote
|
||||
|
||||
val twainQuote = "Mark Twain" to Futures.immediateFuture("I have never let my schooling interfere with my education.")
|
||||
|
||||
val clientQuotes = LinkedBlockingQueue<String>()
|
||||
val clientFuture = proxy.makeComplicatedListenableFuture()
|
||||
|
||||
clientFuture.success {
|
||||
val name = it.first
|
||||
it.second.success {
|
||||
clientQuotes += "Quote by $name: $it"
|
||||
}
|
||||
}
|
||||
|
||||
val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*")
|
||||
assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
|
||||
|
||||
assertThat(clientQuotes).isEmpty()
|
||||
|
||||
serverQuote.set(twainQuote)
|
||||
assertThat(clientQuotes.take()).isEqualTo("Quote by Mark Twain: I have never let my schooling interfere with my education.")
|
||||
|
||||
// TODO This final assert sometimes fails because the relevant queue hasn't been removed yet
|
||||
// assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun versioning() {
|
||||
assertFailsWith<UnsupportedOperationException> { proxy.addedLater() }
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `authenticated user is available to RPC`() {
|
||||
assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username)
|
||||
}
|
||||
}
|
@ -1,84 +0,0 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.*
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import kotlin.test.*
|
||||
|
||||
class RPCPermissionsTest : AbstractClientRPC() {
|
||||
companion object {
|
||||
const val DUMMY_FLOW = "StartFlow.net.corda.flows.DummyFlow"
|
||||
const val OTHER_FLOW = "StartFlow.net.corda.flows.OtherFlow"
|
||||
const val ALL_ALLOWED = "ALL"
|
||||
}
|
||||
|
||||
lateinit var proxy: TestOps
|
||||
|
||||
@After
|
||||
fun shutdown() {
|
||||
safeClose(proxy)
|
||||
}
|
||||
|
||||
/*
|
||||
* RPC operation.
|
||||
*/
|
||||
interface TestOps : RPCOps {
|
||||
fun validatePermission(str: String)
|
||||
}
|
||||
|
||||
class TestOpsImpl : TestOps {
|
||||
override val protocolVersion = 1
|
||||
override fun validatePermission(str: String) = requirePermission(str)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an RPC proxy for the given user.
|
||||
*/
|
||||
private fun proxyFor(rpcUser: User): TestOps = rpcProxyFor(rpcUser, TestOpsImpl(), TestOps::class.java)
|
||||
|
||||
private fun userOf(name: String, permissions: Set<String>) = User(name, "password", permissions)
|
||||
|
||||
@Test
|
||||
fun `empty user cannot use any flows`() {
|
||||
val emptyUser = userOf("empty", emptySet())
|
||||
proxy = proxyFor(emptyUser)
|
||||
assertFailsWith(PermissionException::class,
|
||||
"User ${emptyUser.username} should not be allowed to use $DUMMY_FLOW.",
|
||||
{ proxy.validatePermission(DUMMY_FLOW) })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `admin user can use any flow`() {
|
||||
val adminUser = userOf("admin", setOf(ALL_ALLOWED))
|
||||
proxy = proxyFor(adminUser)
|
||||
proxy.validatePermission(DUMMY_FLOW)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `joe user is allowed to use DummyFlow`() {
|
||||
val joeUser = userOf("joe", setOf(DUMMY_FLOW))
|
||||
proxy = proxyFor(joeUser)
|
||||
proxy.validatePermission(DUMMY_FLOW)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `joe user is not allowed to use OtherFlow`() {
|
||||
val joeUser = userOf("joe", setOf(DUMMY_FLOW))
|
||||
proxy = proxyFor(joeUser)
|
||||
assertFailsWith(PermissionException::class,
|
||||
"User ${joeUser.username} should not be allowed to use $OTHER_FLOW",
|
||||
{ proxy.validatePermission(OTHER_FLOW) })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check ALL is implemented the correct way round` () {
|
||||
val joeUser = userOf("joe", setOf(DUMMY_FLOW))
|
||||
proxy = proxyFor(joeUser)
|
||||
assertFailsWith(PermissionException::class,
|
||||
"Permission $ALL_ALLOWED should not do anything for User ${joeUser.username}",
|
||||
{ proxy.validatePermission(ALL_ALLOWED) })
|
||||
}
|
||||
|
||||
}
|
@ -2,6 +2,7 @@ package net.corda.node.services
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import net.corda.node.services.config.FullNodeConfiguration
|
||||
import net.corda.nodeapi.User
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Test
|
||||
|
Reference in New Issue
Block a user