ENT-2489: Additional optional parameter to specify target for RPC invocation. (#3915)

This is a split-out change from an enterprise feature, but since we do have: `RPCApi.RPC_TARGET_LEGAL_IDENTITY` header constant even in OS, it would make sense to extend RPC client API to populate this.
This commit is contained in:
Viktor Kolomeyko 2018-09-11 16:32:17 +01:00 committed by GitHub
parent ce5cdc03f3
commit 49adf55397
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 5 deletions

View File

@ -4,6 +4,7 @@ import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
@ -329,6 +330,21 @@ class CordaRPCClient private constructor(
return start(username, password, null, null)
}
/**
* Logs in to the target server and returns an active connection. The returned connection is a [java.io.Closeable]
* and can be used with a try-with-resources statement. If you don't use that, you should use the
* [RPCConnection.notifyServerAndClose] or [RPCConnection.forceClose] methods to dispose of the connection object
* when done.
*
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, targetLegalIdentity: CordaX500Name): CordaRPCConnection {
return start(username, password, null, null, targetLegalIdentity)
}
/**
* Logs in to the target server and returns an active connection. The returned connection is a [java.io.Closeable]
* and can be used with a try-with-resources statement. If you don't use that, you should use the
@ -338,10 +354,28 @@ class CordaRPCClient private constructor(
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param externalTrace external [Trace] for correlation.
* @param impersonatedActor the actor on behalf of which all the invocations will be made.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?): CordaRPCConnection {
return CordaRPCConnection(getRpcClient().start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor))
return start(username, password, externalTrace, impersonatedActor, null)
}
/**
* Logs in to the target server and returns an active connection. The returned connection is a [java.io.Closeable]
* and can be used with a try-with-resources statement. If you don't use that, you should use the
* [RPCConnection.notifyServerAndClose] or [RPCConnection.forceClose] methods to dispose of the connection object
* when done.
*
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param externalTrace external [Trace] for correlation.
* @param impersonatedActor the actor on behalf of which all the invocations will be made.
* @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed.
* @throws RPCException if the server version is too low or if the server isn't reachable within a reasonable timeout.
*/
fun start(username: String, password: String, externalTrace: Trace?, impersonatedActor: Actor?, targetLegalIdentity: CordaX500Name?): CordaRPCConnection {
return CordaRPCConnection(getRpcClient().start(CordaRPCOps::class.java, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
}
/**

View File

@ -6,6 +6,7 @@ import net.corda.client.rpc.RPCException
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.logElapsedTime
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.ClientRpcSslOptions
@ -65,7 +66,8 @@ class RPCClient<I : RPCOps>(
username: String,
password: String,
externalTrace: Trace? = null,
impersonatedActor: Actor? = null
impersonatedActor: Actor? = null,
targetLegalIdentity: CordaX500Name? = null
): RPCConnection<I> {
return log.logElapsedTime("Startup") {
val clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.${random63BitValue()}")
@ -85,7 +87,8 @@ class RPCClient<I : RPCOps>(
isUseGlobalPools = nodeSerializationEnv != null
}
val sessionId = Trace.SessionId.newInstance()
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress, rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor)
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress,
rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor, targetLegalIdentity)
try {
proxyHandler.start()
val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler))

View File

@ -14,6 +14,7 @@ import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSer
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.*
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
@ -64,7 +65,7 @@ import kotlin.reflect.jvm.javaMethod
* automatically signal the server. This is done using a cache that holds weak references to the [UnicastSubject]s.
* The cleanup happens in batches using a dedicated reaper, scheduled on [reaperExecutor].
*
* The client will attempt to failover in case the server become unreachable. Depending on the [ServerLocataor] instance
* The client will attempt to failover in case the server become unreachable. Depending on the [ServerLocator] instance
* passed in the constructor, failover is either handle at Artemis level or client level. If only one transport
* was used to create the [ServerLocator], failover is handled by Artemis (retrying based on [CordaRPCClientConfiguration].
* If a list of transport configurations was used, failover is handled locally. Artemis is able to do it, however the
@ -80,7 +81,8 @@ class RPCClientProxyHandler(
serializationContext: SerializationContext,
private val sessionId: Trace.SessionId,
private val externalTrace: Trace?,
private val impersonatedActor: Actor?
private val impersonatedActor: Actor?,
private val targetLegalIdentity: CordaX500Name?
) : InvocationHandler {
private enum class State {
@ -274,6 +276,9 @@ class RPCClientProxyHandler(
private fun sendMessage(message: RPCApi.ClientToServer) {
val artemisMessage = producerSession!!.createMessage(false)
message.writeToClientMessage(artemisMessage)
targetLegalIdentity?.let {
artemisMessage.putStringProperty(RPCApi.RPC_TARGET_LEGAL_IDENTITY, it.toString())
}
sendExecutor!!.submit {
artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, deduplicationSequenceNumber.getAndIncrement())
log.debug { "-> RPC -> $message" }

View File

@ -7,6 +7,8 @@ release, see :doc:`upgrade-notes`.
Unreleased
----------
* New overload for ``CordaRPCClient.start()`` method allowing to specify target legal identity to use for RPC call.
* Case insensitive vault queries can be specified via a boolean on applicable SQL criteria builder operators. By default queries will be case sensitive.
* Getter added to ``CordaRPCOps`` for the node's network parameters.