CORDA-3232: Support of multiple interfaces for RPC calls (#5495)

* CORDA-3232: Make backward compatible RPC client changes

Such that it will be able to talk to new and old server versions.

* CORDA-3232: Make backward compatible RPC server changes

Such that it will be able to talk to new and old client versions.

* CORDA-3232: Trick Detekt

* CORDA-3232: Integration test for multi-interface communication.

* CORDA-3232: Add legacy mode test.

* CORDA-3232: Making Detekt happier

* CORDA-3232: Fix Detekt baseline after merge with `4.3` branch

* CORDA-3232: Incrementing Platform version

As discussed with @lockathan

* CORDA-3232: Fix legacy test post platform version increment

* CORDA-3232: Use recursive logic to establish complete population of method names

* Revert "CORDA-3232: Incrementing Platform version"

This reverts commit d75f48aa

* CORDA-3232: Remove logic that conditions on PLATFORM_VERSION

* CORDA-3232: Making Detekt happier

* CORDA-3232: Few more changes after conversation with @mnesbit

* CORDA-3232: Make a strict match to `CordaRPCOps` on client side

Or else will fail:
net.corda.tools.shell.InteractiveShellIntegrationTest.dumpCheckpoints creates zip with json file for suspended flow

Flagging that `InternalCordaRPCOps.dumpCheckpoints` cannot be called.

* CORDA-3232: Address PR comments by @rick-r3

* CORDA-3232: Address further review input from @rick-r3

* Change the way how methods stored in the map;
* Extend test to make sure that `CordaRPCOps` can indeed be mixed with other RPC interfaces.
This commit is contained in:
Viktor Kolomeyko 2019-09-26 16:01:14 +01:00 committed by Matthew Nesbit
parent 298d8ba69c
commit 51330c2e44
6 changed files with 215 additions and 37 deletions

View File

@ -0,0 +1,91 @@
package net.corda.client.rpc
import com.nhaarman.mockito_kotlin.mock
import net.corda.client.rpc.RPCMultipleInterfacesTests.StringRPCOpsImpl.testPhrase
import net.corda.core.crypto.SecureHash
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.node.internal.rpcDriver
import net.corda.testing.node.internal.startRpcClient
import org.assertj.core.api.Assertions
import org.junit.Assert.*
import org.junit.Rule
import org.junit.Test
import rx.Observable
class RPCMultipleInterfacesTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
companion object {
const val sampleSize = 30
}
interface IntRPCOps : RPCOps {
fun stream(size: Int): Observable<Int>
fun intTestMethod(): Int
}
interface StringRPCOps : RPCOps {
fun stream(size: Int): Observable<String>
fun stringTestMethod() : String
}
private class IntRPCOpsImpl : IntRPCOps {
override val protocolVersion = 1000
override fun stream(size: Int): Observable<Int> {
return Observable.range(0, size)
}
override fun intTestMethod(): Int = protocolVersion
}
private object StringRPCOpsImpl : StringRPCOps {
const val testPhrase = "I work with Strings."
override val protocolVersion = 1000
override fun stream(size: Int): Observable<String> {
return Observable.range(0, size).map { it.toString(8) }
}
override fun stringTestMethod(): String = testPhrase
}
private object MyCordaRpcOpsImpl : CordaRPCOps by mock() {
override val protocolVersion = 1000
}
interface ImaginaryFriend : RPCOps
@Test
fun `can talk multiple interfaces`() {
rpcDriver {
val server = startRpcServer(listOps = listOf(IntRPCOpsImpl(), StringRPCOpsImpl, MyCordaRpcOpsImpl)).get()
val clientInt = startRpcClient<IntRPCOps>(server.broker.hostAndPort!!).get()
val intList = clientInt.stream(sampleSize).toList().toBlocking().single()
assertEquals(sampleSize, intList.size)
val clientString = startRpcClient<StringRPCOps>(server.broker.hostAndPort!!).get()
val stringList = clientString.stream(sampleSize).toList().toBlocking().single()
assertEquals(sampleSize, stringList.size)
assertTrue(stringList.toString(), stringList.all { it.matches("[0-7]*".toRegex()) })
assertEquals(testPhrase, clientString.stringTestMethod())
val rpcOpsClient = startRpcClient<CordaRPCOps>(server.broker.hostAndPort!!).get()
assertFalse(rpcOpsClient.attachmentExists(SecureHash.zeroHash))
Assertions.assertThatThrownBy { startRpcClient<ImaginaryFriend>(server.broker.hostAndPort!!).get() }
.hasCauseInstanceOf(RPCException::class.java).hasMessageContaining("possible client/server version skew")
server.rpcServer.close()
}
}
}

View File

@ -17,6 +17,7 @@ import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.*
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.serialize
@ -25,6 +26,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCApi.CLASS_METHOD_DIVIDER
import net.corda.nodeapi.internal.DeduplicationChecker
import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
@ -252,12 +254,13 @@ class RPCClientProxyHandler(
throw RPCException("RPC server is not available.")
val replyId = InvocationId.newInstance()
callSiteMap?.set(replyId, CallSite(method.name))
val methodFqn = produceMethodFullyQualifiedName(method)
callSiteMap?.set(replyId, CallSite(methodFqn))
try {
val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext)
val request = RPCApi.ClientToServer.RpcRequest(
clientAddress,
method.name,
methodFqn,
serialisedArguments,
replyId,
sessionId,
@ -282,6 +285,15 @@ class RPCClientProxyHandler(
}
}
private fun produceMethodFullyQualifiedName(method: Method) : String {
// For CordaRPCOps send method only - for backwards compatibility
return if (CordaRPCOps::class.java == rpcOpsClass) {
method.name
} else {
rpcOpsClass.name + CLASS_METHOD_DIVIDER + method.name
}
}
private fun sendMessage(message: RPCApi.ClientToServer) {
val artemisMessage = producerSession!!.createMessage(false)
message.writeToClientMessage(artemisMessage)

View File

@ -872,8 +872,10 @@
<ID>LongParameterList:QueryCriteria.kt$QueryCriteria.VaultQueryCriteria$( status: Vault.StateStatus = this.status, contractStateTypes: Set&lt;Class&lt;out ContractState&gt;&gt;? = this.contractStateTypes, stateRefs: List&lt;StateRef&gt;? = this.stateRefs, notary: List&lt;AbstractParty&gt;? = this.notary, softLockingCondition: SoftLockingCondition? = this.softLockingCondition, timeCondition: TimeCondition? = this.timeCondition )</ID>
<ID>LongParameterList:RPCClient.kt$RPCClient$( rpcOpsClass: Class&lt;I&gt;, username: String, password: String, externalTrace: Trace? = null, impersonatedActor: Actor? = null, targetLegalIdentity: CordaX500Name? = null )</ID>
<ID>LongParameterList:RPCDriver.kt$( isDebug: Boolean = false, driverDirectory: Path = Paths.get("build") / "rpc-driver" / getTimestampAsDirectoryName(), portAllocation: PortAllocation = globalPortAllocation, debugPortAllocation: PortAllocation = globalDebugPortAllocation, systemProperties: Map&lt;String, String&gt; = emptyMap(), useTestClock: Boolean = false, startNodesInProcess: Boolean = false, waitForNodesToFinish: Boolean = false, extraCordappPackagesToScan: List&lt;String&gt; = emptyList(), notarySpecs: List&lt;NotarySpec&gt; = emptyList(), externalTrace: Trace? = null, @Suppress("DEPRECATION") jmxPolicy: JmxPolicy = JmxPolicy(), networkParameters: NetworkParameters = testNetworkParameters(), notaryCustomOverrides: Map&lt;String, Any?&gt; = emptyMap(), inMemoryDB: Boolean = true, cordappsForAllNodes: Collection&lt;TestCordappInternal&gt;? = null, dsl: RPCDriverDSL.() -&gt; A )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, listOps: List&lt;I&gt;, brokerHandle: RpcBrokerHandle, queueDrainTimeout: Duration = 5.seconds )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, brokerHandle: RpcBrokerHandle, queueDrainTimeout: Duration = 5.seconds )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, queueDrainTimeout: Duration = 5.seconds )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( serverName: String = "driver-rpc-server-${random63BitValue()}", rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 5L * MAX_MESSAGE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, customPort: NetworkHostAndPort? = null, listOps: List&lt;I&gt; )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( serverName: String = "driver-rpc-server-${random63BitValue()}", rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 5L * MAX_MESSAGE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, customPort: NetworkHostAndPort? = null, ops: I )</ID>
<ID>LongParameterList:RpcBrokerConfiguration.kt$RpcBrokerConfiguration$(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false, deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false)</ID>
<ID>LongParameterList:SerializationEnvironment.kt$SerializationEnvironment.Companion$( serializationFactory: SerializationFactory, p2pContext: SerializationContext, rpcServerContext: SerializationContext? = null, rpcClientContext: SerializationContext? = null, storageContext: SerializationContext? = null, checkpointContext: CheckpointSerializationContext? = null, checkpointSerializer: CheckpointSerializer? = null )</ID>
@ -2135,6 +2137,10 @@
<ID>MagicNumber:RPCHighThroughputObservableTests.kt$RPCHighThroughputObservableTests$3</ID>
<ID>MagicNumber:RPCHighThroughputObservableTests.kt$RPCHighThroughputObservableTests$4</ID>
<ID>MagicNumber:RPCHighThroughputObservableTests.kt$RPCHighThroughputObservableTests.TestOpsImpl$1000</ID>
<ID>MagicNumber:RPCMultipleInterfacesTests.kt$RPCMultipleInterfacesTests.IntRPCOpsImpl$1000</ID>
<ID>MagicNumber:RPCMultipleInterfacesTests.kt$RPCMultipleInterfacesTests.MyCordaRpcOpsImpl$1000</ID>
<ID>MagicNumber:RPCMultipleInterfacesTests.kt$RPCMultipleInterfacesTests.StringRPCOpsImpl$1000</ID>
<ID>MagicNumber:RPCMultipleInterfacesTests.kt$RPCMultipleInterfacesTests.StringRPCOpsImpl$8</ID>
<ID>MagicNumber:RPCPerformanceTests.kt$RPCPerformanceTests$100</ID>
<ID>MagicNumber:RPCPerformanceTests.kt$RPCPerformanceTests$1000</ID>
<ID>MagicNumber:RPCPerformanceTests.kt$RPCPerformanceTests$1000.0</ID>
@ -4510,7 +4516,6 @@
<ID>MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable")</ID>
<ID>MaxLineLength:RPCClientProxyHandler.kt$RPCClientProxyHandler$throw UnsupportedOperationException("Method $calledMethod was added in RPC protocol version $sinceVersion but the server is running $serverProtocolVersion")</ID>
<ID>MaxLineLength:RPCDriver.kt$RPCDriverDSL$val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)</ID>
<ID>MaxLineLength:RPCDriver.kt$RPCDriverDSL$val rpcSecurityManager = RPCSecurityManagerImpl.fromUserList(users = listOf(InternalUser(rpcUser.username, rpcUser.password, rpcUser.permissions)), id = AuthServiceId("TEST_SECURITY_MANAGER"))</ID>
<ID>MaxLineLength:RPCDriver.kt$RPCDriverDSL.Companion$fun createRpcServerArtemisConfig(maxFileSize: Int, maxBufferedBytesPerClient: Long, baseDirectory: Path, hostAndPort: NetworkHostAndPort): Configuration</ID>
<ID>MaxLineLength:RPCDriver.kt$RandomRpcUser.Companion$private inline fun &lt;reified T&gt; HashMap&lt;Class&lt;*&gt;, Generator&lt;*&gt;&gt;.add(generator: Generator&lt;T&gt;)</ID>
<ID>MaxLineLength:RPCDriver.kt$RandomRpcUser.Companion$val handle = RPCClient&lt;RPCOps&gt;(hostAndPort, null, serializationContext = AMQP_RPC_CLIENT_CONTEXT).start(rpcClass, username, password)</ID>
@ -4520,6 +4525,9 @@
<ID>MaxLineLength:RPCOpsWithContext.kt$fun makeRPCOps(getCordaRPCOps: (username: String, credential: String) -&gt; InternalCordaRPCOps, username: String, credential: String): InternalCordaRPCOps</ID>
<ID>MaxLineLength:RPCOpsWithContext.kt$return Proxy.newProxyInstance(InternalCordaRPCOps::class.java.classLoader, arrayOf(InternalCordaRPCOps::class.java)) { _, method, args -&gt; try { method.invoke(cordaRPCOps, *(args ?: arrayOf())) } catch (e: InvocationTargetException) { // Unpack exception. throw e.targetException } } as InternalCordaRPCOps</ID>
<ID>MaxLineLength:RPCSecurityManagerWithAdditionalUser.kt$RPCSecurityManagerWithAdditionalUser : RPCSecurityManager</ID>
<ID>MaxLineLength:RPCServer.kt$RPCServer</ID>
<ID>MaxLineLength:RPCServer.kt$RPCServer$( ops: RPCOps, rpcServerUsername: String, rpcServerPassword: String, serverLocator: ServerLocator, securityManager: RPCSecurityManager, nodeLegalName: CordaX500Name, rpcConfiguration: RPCServerConfiguration, cacheFactory: NamedCacheFactory )</ID>
<ID>MaxLineLength:RPCServer.kt$RPCServer$/** * The method name -&gt; InvocationTarget used for servicing the actual call. * NB: The key in this map can either be: * - FQN of the method including interface name for all the interfaces except `CordaRPCOps`; * - For `CordaRPCOps` interface this will be just plain method name. This is done to maintain wire compatibility with previous versions. */ private val methodTable: Map&lt;String, InvocationTarget&gt;</ID>
<ID>MaxLineLength:RPCServer.kt$RPCServer$consumerSession = sessionFactory!!.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)</ID>
<ID>MaxLineLength:RPCServer.kt$RPCServer$private</ID>
<ID>MaxLineLength:RPCServer.kt$RPCServer$producerSession = sessionFactory!!.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)</ID>
@ -5429,7 +5437,7 @@
<ID>SpreadOperator:RPCDriver.kt$RandomRpcUser.Companion$(handle.proxy, *arguments.toTypedArray())</ID>
<ID>SpreadOperator:RPCOpsWithContext.kt$(cordaRPCOps, *(args ?: arrayOf()))</ID>
<ID>SpreadOperator:RPCSecurityManagerTest.kt$RPCSecurityManagerTest$(request.first(), *args)</ID>
<ID>SpreadOperator:RPCServer.kt$RPCServer$(ops, *arguments.toTypedArray())</ID>
<ID>SpreadOperator:RPCServer.kt$RPCServer$(invocationTarget.instance, *arguments.toTypedArray())</ID>
<ID>SpreadOperator:ReactiveArtemisConsumer.kt$ReactiveArtemisConsumer.Companion$(queueName, *queueNames)</ID>
<ID>SpreadOperator:ReconnectingCordaRPCOps.kt$ReconnectingCordaRPCOps.ErrorInterceptingHandler$(reconnectingRPCConnection.proxy, *(args ?: emptyArray()))</ID>
<ID>SpreadOperator:ServiceHub.kt$ServiceHub$(first, *remaining)</ID>
@ -5477,7 +5485,7 @@
<ID>ThrowsCount:NonValidatingNotaryFlow.kt$NonValidatingNotaryFlow$ private fun checkNotaryWhitelisted(notary: Party, attachedParameterHash: SecureHash?)</ID>
<ID>ThrowsCount:PropertyDescriptor.kt$PropertyDescriptor$ fun validate()</ID>
<ID>ThrowsCount:RPCApi.kt$RPCApi.ServerToClient.Companion$fun fromClientMessage(context: SerializationContext, message: ClientMessage): ServerToClient</ID>
<ID>ThrowsCount:RPCServer.kt$RPCServer$private fun invokeRpc(context: RpcAuthContext, methodName: String, arguments: List&lt;Any?&gt;): Try&lt;Any&gt;</ID>
<ID>ThrowsCount:RPCServer.kt$RPCServer$private fun invokeRpc(context: RpcAuthContext, inMethodName: String, arguments: List&lt;Any?&gt;): Try&lt;Any&gt;</ID>
<ID>ThrowsCount:SchemaMigration.kt$SchemaMigration$private fun doRunMigration(run: Boolean, check: Boolean, existingCheckpoints: Boolean? = null)</ID>
<ID>ThrowsCount:ServicesForResolutionImpl.kt$ServicesForResolutionImpl$// We may need to recursively chase transactions if there are notary changes. fun inner(stateRef: StateRef, forContractClassName: String?): Attachment</ID>
<ID>ThrowsCount:SignedNodeInfo.kt$SignedNodeInfo$// TODO Add root cert param (or TrustAnchor) to make sure all the identities belong to the same root fun verified(): NodeInfo</ID>
@ -6656,6 +6664,7 @@
<ID>WildcardImport:QueryCriteriaUtils.kt$import net.corda.core.node.services.vault.LikenessOperator.*</ID>
<ID>WildcardImport:RPCClientProxyHandler.kt$import net.corda.core.internal.*</ID>
<ID>WildcardImport:RPCClientProxyHandler.kt$import org.apache.activemq.artemis.api.core.client.*</ID>
<ID>WildcardImport:RPCMultipleInterfacesTests.kt$import org.junit.Assert.*</ID>
<ID>WildcardImport:RPCSecurityManagerImpl.kt$import org.apache.shiro.authc.*</ID>
<ID>WildcardImport:RPCServer.kt$import net.corda.core.utilities.*</ID>
<ID>WildcardImport:RPCServer.kt$import org.apache.activemq.artemis.api.core.client.*</ID>

View File

@ -75,6 +75,8 @@ object RPCApi {
const val DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME = "deduplication-sequence-number"
const val CLASS_METHOD_DIVIDER = "#"
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
"${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'"
@ -103,7 +105,7 @@ object RPCApi {
* Request to a server to trigger the specified method with the provided arguments.
*
* @param clientAddress return address to contact the client at.
* @param id a unique ID for the request, which the server will use to identify its response with.
* @param replyId a unique ID for the request, which the server will use to identify its response with.
* @param methodName name of the method (procedure) to be called.
* @param serialisedArguments Serialised arguments to pass to the method, if any.
*/

View File

@ -14,6 +14,7 @@ import net.corda.core.context.Trace.InvocationId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
@ -25,6 +26,7 @@ import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
import net.corda.node.services.logging.pushToLoggingContext
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCApi.CLASS_METHOD_DIVIDER
import net.corda.nodeapi.externalTrace
import net.corda.nodeapi.impersonatedActor
import net.corda.nodeapi.internal.DeduplicationChecker
@ -67,15 +69,18 @@ data class RPCServerConfiguration(
}
/**
* The [RPCServer] implements the complement of [RPCClient]. When an RPC request arrives it dispatches to the
* corresponding function in [ops]. During serialisation of the reply (and later observations) the server subscribes to
* The [RPCServer] implements the complement of [net.corda.client.rpc.internal.RPCClient]. When an RPC request arrives it dispatches to the
* corresponding function in [opsList]. During serialisation of the reply (and later observations) the server subscribes to
* each Observable it encounters and captures the client address to associate with these Observables. Later it uses this
* address to forward observations arriving on the Observables.
*
* The way this is done is similar to that in [RPCClient], we use Kryo and add a context to stores the subscription map.
* The way this is done is similar to that in [net.corda.client.rpc.internal.RPCClient], we use AMQP and add a context to stores the subscription map.
*
* NB: The order of elements in [opsList] matters in case of legacy RPC clients who do not specify class name of the RPC Ops they are after.
* For Legacy RPC clients who supply method name alone, the calls are being targeted at first element in [opsList].
*/
class RPCServer(
private val ops: RPCOps,
private val opsList: List<RPCOps>,
private val rpcServerUsername: String,
private val rpcServerPassword: String,
private val serverLocator: ServerLocator,
@ -86,6 +91,8 @@ class RPCServer(
) {
private companion object {
private val log = contextLogger()
private data class InvocationTarget(val method: Method, val instance: RPCOps)
}
private enum class State {
@ -102,8 +109,13 @@ class RPCServer(
private data class MessageAndContext(val message: RPCApi.ServerToClient.RpcReply, val context: ObservableContext)
private val lifeCycle = LifeCycle(State.UNSTARTED)
/** The methodname->Method map to use for dispatching. */
private val methodTable: Map<String, Method>
/**
* The method name -> InvocationTarget used for servicing the actual call.
* NB: The key in this map can either be:
* - FQN of the method including interface name for all the interfaces except `CordaRPCOps`;
* - For `CordaRPCOps` interface this will be just plain method name. This is done to maintain wire compatibility with previous versions.
*/
private val methodTable: Map<String, InvocationTarget>
/** The observable subscription mapping. */
private val observableMap = createObservableSubscriptionMap()
/** A mapping from client addresses to IDs of associated Observables */
@ -130,16 +142,47 @@ class RPCServer(
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry, cacheFactory = cacheFactory)
private var deduplicationIdentity: String? = null
constructor (
ops: RPCOps,
rpcServerUsername: String,
rpcServerPassword: String,
serverLocator: ServerLocator,
securityManager: RPCSecurityManager,
nodeLegalName: CordaX500Name,
rpcConfiguration: RPCServerConfiguration,
cacheFactory: NamedCacheFactory
) : this(listOf(ops), rpcServerUsername, rpcServerPassword, serverLocator, securityManager, nodeLegalName, rpcConfiguration, cacheFactory)
init {
val groupedMethods = ops.javaClass.declaredMethods.groupBy { it.name }
groupedMethods.forEach { name, methods ->
if (methods.size > 1) {
throw IllegalArgumentException("Encountered more than one method called $name on ${ops.javaClass.name}")
val mutableMethodTable = mutableMapOf<String, InvocationTarget>()
opsList.forEach { ops ->
listOfApplicableInterfacesRec(ops.javaClass).toSet().forEach { interfaceClass ->
val groupedMethods = with(interfaceClass) {
if(interfaceClass == CordaRPCOps::class.java) {
methods.groupBy { it.name }
} else {
methods.groupBy { interfaceClass.name + CLASS_METHOD_DIVIDER + it.name }
}
}
groupedMethods.forEach { name, methods ->
if (methods.size > 1) {
throw IllegalArgumentException("Encountered more than one method called $name on ${interfaceClass.name}")
}
}
val interimMap = groupedMethods.mapValues { InvocationTarget(it.value.single(), ops) }
mutableMethodTable.putAll(interimMap)
}
}
methodTable = groupedMethods.mapValues { it.value.single() }
// Going forward it is should be treated as immutable construct.
methodTable = mutableMethodTable
}
private fun listOfApplicableInterfacesRec(clazz: Class<*>): List<Class<*>> =
clazz.interfaces.filter { RPCOps::class.java.isAssignableFrom(it) }.flatMap {
listOf(it) + listOfApplicableInterfacesRec(it)
}
private fun createObservableSubscriptionMap(): ObservableSubscriptionMap {
val onObservableRemove = RemovalListener<InvocationId, ObservableSubscription> { key, value, cause ->
log.debug { "Unsubscribing from Observable with id $key because of $cause" }
@ -349,18 +392,18 @@ class RPCServer(
}
}
private fun invokeRpc(context: RpcAuthContext, methodName: String, arguments: List<Any?>): Try<Any> {
private fun invokeRpc(context: RpcAuthContext, inMethodName: String, arguments: List<Any?>): Try<Any> {
return Try.on {
try {
CURRENT_RPC_CONTEXT.set(context)
log.trace { "Calling $methodName" }
val method = methodTable[methodName] ?:
throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
method.invoke(ops, *arguments.toTypedArray())
log.trace { "Calling $inMethodName" }
val invocationTarget = methodTable[inMethodName] ?:
throw RPCException("Received RPC for unknown method $inMethodName - possible client/server version skew?")
invocationTarget.method.invoke(invocationTarget.instance, *arguments.toTypedArray())
} catch (e: InvocationTargetException) {
throw e.cause ?: RPCException("Caught InvocationTargetException without cause")
} catch (e: Exception) {
log.warn("Caught exception attempting to invoke RPC $methodName", e)
log.warn("Caught exception attempting to invoke RPC $inMethodName", e)
throw e
} finally {
CURRENT_RPC_CONTEXT.remove()
@ -393,7 +436,7 @@ class RPCServer(
* we receive a notification that the client queue bindings were added.
*/
private fun bufferIfQueueNotBound(clientAddress: SimpleString, message: RPCApi.ServerToClient.RpcReply, context: ObservableContext): Boolean {
val clientBuffer = responseMessageBuffer.compute(clientAddress, { _, value ->
val clientBuffer = responseMessageBuffer.compute(clientAddress) { _, value ->
when (value) {
null -> BufferOrNone.Buffer(ArrayList()).apply {
container.add(MessageAndContext(message, context))
@ -403,7 +446,7 @@ class RPCServer(
}
is BufferOrNone.None -> value
}
})
}
return clientBuffer is BufferOrNone.Buffer
}

View File

@ -303,15 +303,6 @@ data class RPCDriverDSL(
return session
}
/**
* Starts a Netty RPC server.
*
* @param serverName The name of the server, to be used for the folder created for Artemis files.
* @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user.
* @param configuration The RPC server configuration.
* @param ops The server-side implementation of the RPC interface.
*/
fun <I : RPCOps> startRpcServer(
serverName: String = "driver-rpc-server-${random63BitValue()}",
rpcUser: User = rpcTestUser,
@ -321,9 +312,29 @@ data class RPCDriverDSL(
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
customPort: NetworkHostAndPort? = null,
ops: I
) = startRpcServer(serverName, rpcUser, nodeLegalName, maxFileSize, maxBufferedBytesPerClient, configuration, customPort, listOf(ops))
/**
* Starts a Netty RPC server.
*
* @param serverName The name of the server, to be used for the folder created for Artemis files.
* @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user.
* @param configuration The RPC server configuration.
* @param listOps The server-side implementation of the RPC interfaces.
*/
fun <I : RPCOps> startRpcServer(
serverName: String = "driver-rpc-server-${random63BitValue()}",
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 5L * MAX_MESSAGE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
customPort: NetworkHostAndPort? = null,
listOps: List<I>
): CordaFuture<RpcServerHandle> {
return startRpcBroker(serverName, rpcUser, maxFileSize, maxBufferedBytesPerClient, customPort).map { broker ->
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, ops, broker)
startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, listOps, broker)
}
}
@ -477,14 +488,24 @@ data class RPCDriverDSL(
ops: I,
brokerHandle: RpcBrokerHandle,
queueDrainTimeout: Duration = 5.seconds
) = startRpcServerWithBrokerRunning(rpcUser, nodeLegalName, configuration, listOf(ops), brokerHandle, queueDrainTimeout)
private fun <I : RPCOps> startRpcServerWithBrokerRunning(
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
listOps: List<I>,
brokerHandle: RpcBrokerHandle,
queueDrainTimeout: Duration = 5.seconds
): RpcServerHandle {
val locator = ActiveMQClient.createServerLocatorWithoutHA(brokerHandle.clientTransportConfiguration).apply {
minLargeMessageSize = MAX_MESSAGE_SIZE
isUseGlobalPools = false
}
val rpcSecurityManager = RPCSecurityManagerImpl.fromUserList(users = listOf(InternalUser(rpcUser.username, rpcUser.password, rpcUser.permissions)), id = AuthServiceId("TEST_SECURITY_MANAGER"))
val rpcSecurityManager = RPCSecurityManagerImpl.fromUserList(users = listOf(InternalUser(rpcUser.username,
rpcUser.password, rpcUser.permissions)), id = AuthServiceId("TEST_SECURITY_MANAGER"))
val rpcServer = RPCServer(
ops,
listOps,
rpcUser.username,
rpcUser.password,
locator,