CORDA-3960: Port MultiRPCClient to OS (#6644)

* CORDA-3960: Port MultiRPCClient to OS

* CORDA-3960: Carefully restore serialisation environment in `MultiRpcClientTest` to eliminate side effects on other tests

* CORDA-3960: Move ThreadDumpUtils.kt to `core-utils`
This commit is contained in:
Viktor Kolomeyko 2020-08-18 10:35:19 +01:00 committed by GitHub
parent d55676b452
commit 9fc896beb3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1240 additions and 78 deletions

View File

@ -8467,9 +8467,71 @@ public class net.corda.client.rpc.RPCException extends net.corda.core.CordaRunti
public @interface net.corda.client.rpc.RPCSinceVersion
public abstract int version()
##
public class net.corda.client.rpc.UnrecoverableRPCException extends net.corda.client.rpc.RPCException
public <init>(String, Throwable)
public <init>(String, Throwable, int, kotlin.jvm.internal.DefaultConstructorMarker)
##
public final class net.corda.client.rpc.UtilsKt extends java.lang.Object
public static final void notUsed(rx.Observable<T>)
##
public final class net.corda.client.rpc.ext.MultiRPCClient extends java.lang.Object implements java.lang.AutoCloseable
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace, net.corda.core.context.Actor)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.identity.CordaX500Name)
public <init>(java.util.List, Class, String, String, java.util.Set, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.identity.CordaX500Name, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, net.corda.client.rpc.CordaRPCClientConfiguration)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions)
public <init>(java.util.List<net.corda.core.utilities.NetworkHostAndPort>, Class<I>, String, String, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader)
public <init>(java.util.List, Class, String, String, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, ClassLoader, net.corda.client.rpc.CordaRPCClientConfiguration)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class, String, String, ClassLoader, net.corda.client.rpc.CordaRPCClientConfiguration, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace, net.corda.core.context.Actor)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, java.util.Set<? extends net.corda.core.serialization.SerializationCustomSerializer<?, ?>>, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.identity.CordaX500Name)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class, String, String, java.util.Set, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, net.corda.core.context.Trace, net.corda.core.context.Actor, net.corda.core.identity.CordaX500Name, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, net.corda.client.rpc.CordaRPCClientConfiguration)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class, String, String, net.corda.client.rpc.CordaRPCClientConfiguration, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class, String, String, net.corda.client.rpc.CordaRPCClientConfiguration, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, int, kotlin.jvm.internal.DefaultConstructorMarker)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class<I>, String, String, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader)
public <init>(net.corda.core.utilities.NetworkHostAndPort, Class, String, String, net.corda.core.messaging.ClientRpcSslOptions, ClassLoader, int, kotlin.jvm.internal.DefaultConstructorMarker)
public final boolean addConnectionListener(net.corda.client.rpc.ext.RPCConnectionListener<I>)
public void close()
public final boolean removeConnectionListener(net.corda.client.rpc.ext.RPCConnectionListener<I>)
@NotNull
public final java.util.concurrent.CompletableFuture<net.corda.client.rpc.RPCConnection<I>> start()
public final void stop()
public static final net.corda.client.rpc.ext.MultiRPCClient$Companion Companion
##
public interface net.corda.client.rpc.ext.RPCConnectionListener
public abstract void onConnect(net.corda.client.rpc.ext.RPCConnectionListener$ConnectionContext<I>)
public abstract void onDisconnect(net.corda.client.rpc.ext.RPCConnectionListener$ConnectionContext<I>)
public abstract void onPermanentFailure(net.corda.client.rpc.ext.RPCConnectionListener$ConnectionContext<I>)
##
public static interface net.corda.client.rpc.ext.RPCConnectionListener$ConnectionContext
@Nullable
public abstract net.corda.client.rpc.RPCConnection<I> getConnectionOpt()
@Nullable
public abstract Throwable getThrowableOpt()
@NotNull
public abstract String getUserName()
##
public final class net.corda.client.rpc.reconnect.CouldNotStartFlowException extends net.corda.client.rpc.RPCException
public <init>()
public <init>(Throwable)
public <init>(Throwable, int, kotlin.jvm.internal.DefaultConstructorMarker)
##
public final class net.corda.finance.test.CashSchema extends java.lang.Object
public static final net.corda.finance.test.CashSchema INSTANCE
##

View File

@ -0,0 +1,341 @@
package net.corda.client.rpc
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.atLeastOnce
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.never
import com.nhaarman.mockito_kotlin.times
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.ext.RPCConnectionListener
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.millis
import net.corda.core.utilities.seconds
import net.corda.core.utilities.threadDumpAsString
import net.corda.testing.common.internal.eventually
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.node.internal.rpcDriver
import net.corda.testing.node.internal.rpcTestUser
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert.assertEquals
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import java.util.concurrent.Callable
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertNull
import kotlin.test.assertSame
import kotlin.test.assertTrue
@RunWith(Parameterized::class)
class RPCConnectionListenerTest(@Suppress("unused") private val iteration: Int) {
companion object {
private val logger = contextLogger()
@JvmStatic
@Parameterized.Parameters(name = "iteration = {0}")
fun iterations(): Iterable<Array<Int>> {
// It is possible to change this value to a greater number
// to ensure that the test is not flaking when executed on CI
val repsCount = 1
return (1..repsCount).map { arrayOf(it) }
}
}
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
interface StringRPCOps : RPCOps {
fun stringTestMethod(): String
}
private object StringRPCOpsImpl : StringRPCOps {
const val testPhrase = "I work with Strings."
override val protocolVersion = 1000
override fun stringTestMethod(): String = testPhrase
}
@Test(timeout = 300_000)
fun `basic listener scenario`() {
rpcDriver {
val server = startRpcServer(listOps = listOf(StringRPCOpsImpl)).get()
val listener = mock<RPCConnectionListener<StringRPCOps>>()
// Establish connection and exchange some calls
val (rpcConnection, _) = startRpcClient(StringRPCOps::class.java, server.broker.hostAndPort!!,
listeners = listOf(listener)).get()
verify(listener, times(1)).onConnect(any())
val proxy = rpcConnection.proxy
assertEquals(StringRPCOpsImpl.testPhrase, proxy.stringTestMethod())
whenever(listener.onDisconnect(any())).then {
@Suppress("unchecked_cast")
val context = it.arguments[0] as RPCConnectionListener.ConnectionContext<StringRPCOps>
assertSame(rpcConnection, context.connectionOpt)
}
// Shutdown server
server.shutdown()
eventually(duration = 30.seconds) {
verify(listener, times(1)).onDisconnect(any())
assertThatThrownBy { proxy.stringTestMethod() }.isInstanceOf(RPCException::class.java)
}
verify(listener, never()).onPermanentFailure(any())
}
}
@Test(timeout = 300_000)
fun `wrong credentials`() {
rpcDriver {
val server = startRpcServer(listOps = listOf(StringRPCOpsImpl)).get()
val listener = mock<RPCConnectionListener<StringRPCOps>>()
whenever(listener.onPermanentFailure(any())).then {
@Suppress("unchecked_cast")
val context = it.arguments[0] as RPCConnectionListener.ConnectionContext<StringRPCOps>
assertNull(context.connectionOpt)
assertThat(context.throwableOpt).isInstanceOf(ActiveMQSecurityException::class.java)
}
// Start client with wrong credentials
assertThatThrownBy {
startRpcClient(StringRPCOps::class.java,
server.broker.hostAndPort!!,
username = "wrongOne",
listeners = listOf(listener)).get()
}.hasCauseInstanceOf(ActiveMQSecurityException::class.java)
verify(listener, never()).onConnect(any())
verify(listener, never()).onDisconnect(any())
verify(listener, times(1)).onPermanentFailure(any())
server.rpcServer.close()
}
}
@Test(timeout = 300_000)
fun `failover listener scenario`() {
rpcDriver {
val primary = startRpcServer(listOps = listOf(StringRPCOpsImpl)).get()
val secondary = startRpcServer(listOps = listOf(StringRPCOpsImpl)).get()
val listener = mock<RPCConnectionListener<StringRPCOps>>()
// Establish connection with HA pool passed-in and exchange some calls
val haAddressPool = listOf(primary, secondary).map { it.broker.hostAndPort!! }
logger.info("HA address pool: $haAddressPool")
val (rpcConnection, _) = startRpcClient(StringRPCOps::class.java,
haAddressPool,
listeners = listOf(listener)).get()
verify(listener, times(1)).onConnect(any())
val proxy = rpcConnection.proxy
assertEquals(StringRPCOpsImpl.testPhrase, proxy.stringTestMethod())
whenever(listener.onDisconnect(any())).then {
@Suppress("unchecked_cast")
val context = it.arguments[0] as RPCConnectionListener.ConnectionContext<StringRPCOps>
assertSame(rpcConnection, context.connectionOpt)
}
// Shutdown primary
primary.shutdown()
eventually(duration = 30.seconds) {
// First disconnect must happen
verify(listener, times(1)).onDisconnect(any())
// Followed by connect to secondary
verify(listener, times(2)).onConnect(any())
// Then functionality should start to work again
assertEquals(StringRPCOpsImpl.testPhrase, proxy.stringTestMethod())
}
// Shutdown secondary
secondary.shutdown()
eventually(duration = 30.seconds) {
// Disconnect from secondary happened
verify(listener, times(2)).onDisconnect(any())
// Subsequent calls throw
assertThatThrownBy { proxy.stringTestMethod() }.isInstanceOf(RPCException::class.java)
}
verify(listener, never()).onPermanentFailure(any())
}
}
@Test(timeout = 300_000)
fun `exceed number of retries scenario`() {
rpcDriver {
val primary = startRpcServer(listOps = listOf(StringRPCOpsImpl)).get()
val secondary = startRpcServer(listOps = listOf(StringRPCOpsImpl)).get()
val listener = mock<RPCConnectionListener<StringRPCOps>>()
// Setup client for having a finite number of quick retries
val fake = NetworkHostAndPort("localhost", nextPort())
val haAddressPool = listOf(fake) + listOf(primary, secondary).map { it.broker.hostAndPort!! }
logger.info("HA address pool: $haAddressPool")
val (rpcConnection, _) = startRpcClient(StringRPCOps::class.java,
haAddressPool,
listeners = listOf(listener),
configuration = CordaRPCClientConfiguration(maxReconnectAttempts = 3, connectionRetryInterval = 1.millis)
).get()
verify(listener, times(1)).onConnect(any())
val proxy = rpcConnection.proxy
assertEquals(StringRPCOpsImpl.testPhrase, proxy.stringTestMethod())
whenever(listener.onDisconnect(any())).then {
@Suppress("unchecked_cast")
val context = it.arguments[0] as RPCConnectionListener.ConnectionContext<StringRPCOps>
assertSame(rpcConnection, context.connectionOpt)
}
// Shutdown primary
primary.shutdown()
eventually(duration = 30.seconds) {
// Followed by connect to secondary
verify(listener, times(2)).onConnect(any())
// Then functionality should start to work again
assertEquals(StringRPCOpsImpl.testPhrase, proxy.stringTestMethod())
}
// Shutdown secondary
secondary.shutdown()
eventually(duration = 30.seconds) {
// Disconnect from secondary happened
verify(listener, times(2)).onDisconnect(any())
// Subsequent calls throw
assertThatThrownBy { proxy.stringTestMethod() }.isInstanceOf(RPCException::class.java)
// Having attempted to connect multiple times - we will give up and declare the state of permanent failure
verify(listener, times(1)).onPermanentFailure(any())
}
}
}
private class KickAndReconnectCallable constructor(private val serverControl: ActiveMQServerControl,
private val client: RPCClient<StringRPCOps>,
private val proxy: StringRPCOps) : Callable<Unit> {
override fun call() {
val latch = CountDownLatch(1)
val reConnectListener = object : RPCConnectionListener<StringRPCOps> {
override fun onConnect(context: RPCConnectionListener.ConnectionContext<StringRPCOps>) {
latch.countDown()
}
override fun onDisconnect(context: RPCConnectionListener.ConnectionContext<StringRPCOps>) {
logger.warn("Unexpected disconnect")
}
override fun onPermanentFailure(context: RPCConnectionListener.ConnectionContext<StringRPCOps>) {
logger.warn("Unexpected permanent failure")
}
}
client.addConnectionListener(reConnectListener)
logger.info("Kicking user out")
serverControl.closeConnectionsForUser(rpcTestUser.username)
assertTrue("Failed to re-connect. " + threadDumpAsString()) { latch.await(60, TimeUnit.SECONDS) }
client.removeConnectionListener(reConnectListener)
eventually(duration = 30.seconds) {
val result = proxy.stringTestMethod()
assertEquals(StringRPCOpsImpl.testPhrase, result)
}
logger.info("Ensured re-connected back")
}
}
@Test(timeout = 300_000)
fun `multi-threaded scenario`() {
rpcDriver {
val server = startRpcServer(listOps = listOf(StringRPCOpsImpl)).get()
val permanentListener = mock<RPCConnectionListener<StringRPCOps>>()
val repsCount = 100
val temporaryListeners = (1..repsCount).map { mock<RPCConnectionListener<StringRPCOps>>() }
// Establish connection and exchange some calls
// NB: Connection setup with retry
val (rpcConnection, client) = startRpcClient(StringRPCOps::class.java,
(1..2).map { server.broker.hostAndPort!! },
listeners = listOf(permanentListener)).get()
verify(permanentListener, times(1)).onConnect(any())
val proxy = rpcConnection.proxy
assertEquals(StringRPCOpsImpl.testPhrase, proxy.stringTestMethod())
var addUserIndex = 0
val addListener = Runnable {
repeat(repsCount) {
logger.debug { "Adding listener #$addUserIndex" }
client.addConnectionListener(temporaryListeners[addUserIndex++ % temporaryListeners.size])
}
}
val kickAndReconnectUser = KickAndReconnectCallable(server.broker.serverControl, client, proxy)
var removerUserIndex = 0
val removeListener = Runnable {
repeat(repsCount) {
logger.debug { "Removing listener #$removerUserIndex" }
client.removeConnectionListener(temporaryListeners[removerUserIndex++ % temporaryListeners.size])
Thread.sleep(10)
}
}
val scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors())
listOf(addListener, removeListener).map { scheduledExecutor.scheduleAtFixedRate(it, 100, 100, TimeUnit.MILLISECONDS) }
val kickAndReconnectExecutor = Executors.newSingleThreadExecutor()
val kickUserSubmissions = (1..repsCount).map {
kickAndReconnectExecutor.submit(kickAndReconnectUser)
}
kickUserSubmissions.forEach {
try {
it.get(60, TimeUnit.SECONDS)
} catch (ex : TimeoutException) {
logger.warn("Timed out waiting for Future completion. " + threadDumpAsString())
throw ex
}
}
scheduledExecutor.shutdown()
kickAndReconnectExecutor.shutdown()
verify(permanentListener, never()).onPermanentFailure(any())
verify(permanentListener, times(repsCount)).onDisconnect(any())
verify(permanentListener, times(repsCount + 1)).onConnect(any())
temporaryListeners.forEach { tmpListener ->
verify(tmpListener, never()).onPermanentFailure(any())
verify(tmpListener, atLeastOnce()).onDisconnect(any())
verify(tmpListener, atLeastOnce()).onConnect(any())
}
}
}
}

View File

@ -1,8 +1,8 @@
package net.corda.client.rpc
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.client.rpc.internal.SerializationEnvironmentHelper
import net.corda.nodeapi.internal.rpc.client.AMQPClientSerializationScheme
import net.corda.client.rpc.reconnect.CouldNotStartFlowException
import net.corda.core.CordaInternal
@ -11,7 +11,6 @@ import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.createInstancesOfClassesImplementing
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.serialization.SerializationCustomSerializer
@ -24,8 +23,6 @@ import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import net.corda.serialization.internal.SerializationFactoryImpl
import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
import net.corda.serialization.internal.amqp.SerializerFactory
import java.time.Duration
import java.util.*
import java.util.concurrent.ExecutorService
@ -33,7 +30,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
/**
* This class is essentially just a wrapper for an RPCConnection<CordaRPCOps> and can be treated identically.
* This class is essentially just a wrapper for an `RPCConnection<CordaRPCOps>` and can be treated identically.
*
* @see RPCConnection
*/
@ -460,37 +457,7 @@ class CordaRPCClient private constructor(
}
init {
try {
effectiveSerializationEnv
} catch (e: IllegalStateException) {
try {
val cache = Caffeine.newBuilder().maximumSize(128)
.build<SerializationFactoryCacheKey, SerializerFactory>().asMap()
// If the client has explicitly provided a classloader use this one to scan for custom serializers,
// otherwise use the current one.
val serializationClassLoader = this.classLoader ?: this.javaClass.classLoader
// If the client has explicitly provided a set of custom serializers, avoid performing any scanning and use these instead.
val discoveredCustomSerializers = customSerializers ?: createInstancesOfClassesImplementing(
serializationClassLoader,
SerializationCustomSerializer::class.java
)
val serializationWhitelists = ServiceLoader.load(
SerializationWhitelist::class.java,
serializationClassLoader
).toSet()
AMQPClientSerializationScheme.initialiseSerialization(
serializationClassLoader,
discoveredCustomSerializers,
serializationWhitelists,
cache
)
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}
}
SerializationEnvironmentHelper.ensureEffectiveSerializationEnvSet(classLoader, customSerializers)
}
private fun getRpcClient(): RPCClient<CordaRPCOps> {

View File

@ -0,0 +1,312 @@
package net.corda.client.rpc.ext
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.SerializationEnvironmentHelper
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.identity.CordaX500Name
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisTcpTransport
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
/**
* An RPC client connects to the specified server and allows to make calls using specified remote interface to the server that perform various
* useful tasks. Please see the Client RPC section of [Corda Documentation](http://docs.corda.net) to learn more about how this API works.
* Only a brief description is provided here.
*
* Calling [start] returns an [RPCConnection] containing a proxy that allows making RPCs calls to the server.
* This is a blocking communication, and if the server throws an exception then it will be rethrown on the client. Proxies are thread safe and
* may be used to invoke multiple RPCs in parallel.
*
* RPC sends and receives are logged on the `net.corda.rpc` logger.
*
* In case of loss of connection to the server, the client will try to reconnect using the settings provided via
* [CordaRPCClientConfiguration]. If the client was created using a list of hosts via [haAddressPool], automatic failover will occur
* (the servers have to be started in HA mode). While attempting failover, current and future RPC calls will throw
* [RPCException].
*
* It is also possible to add [RPCConnectionListener]s event before connection is started to be notified about connection lifecycle.
* Please see documentation on [RPCConnectionListener] for more details.
*
* @param hostAndPort The network address to connect to.
* @param haAddressPool A list of [NetworkHostAndPort] representing the addresses of servers in HA mode.
* The client will attempt to connect to a live server by trying each address in the list. If the servers are not in
* HA mode, the client will round-robin from the beginning of the list and try all servers.
* @param rpcOpsClass [Class] instance of the [RPCOps] remote interface that will be used for communication.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration An optional configuration used to tweak client behaviour.
* @param sslConfiguration An optional [ClientRpcSslOptions] used to enable secure communication with the server.
* @param classLoader a classloader, which will be used (if provided) to discover available [SerializationCustomSerializer]s
* and [SerializationWhitelist]s. If no classloader is provided, the classloader of the current class will be used by default
* for the aforementioned discovery process.
* @param customSerializers a set of [SerializationCustomSerializer]s to be used. If this parameter is specified, then no classpath scanning
* will be performed for custom serializers, the provided ones will be used instead. This parameter serves as a more user-friendly option
* to specify your serializers and disable the classpath scanning (e.g. for performance reasons).
* @param externalTrace external [Trace] for correlation.
* @param impersonatedActor the actor on behalf of which all the invocations will be made.
* @param targetLegalIdentity in case of multi-identity RPC endpoint specific legal identity to which the calls must be addressed.
*/
class MultiRPCClient<I : RPCOps> private constructor(
private val hostAndPort: NetworkHostAndPort?,
private val haAddressPool: List<NetworkHostAndPort>,
private val rpcOpsClass: Class<I>,
private val username: String,
private val password: String,
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
private val sslConfiguration: ClientRpcSslOptions? = null,
private val classLoader: ClassLoader? = null,
private val customSerializers: Set<SerializationCustomSerializer<*, *>>? = null,
private val externalTrace: Trace? = null,
private val impersonatedActor: Actor? = null,
private val targetLegalIdentity: CordaX500Name? = null
) : AutoCloseable {
private companion object {
private val logger = contextLogger()
}
@JvmOverloads
constructor(
hostAndPort: NetworkHostAndPort,
rpcOpsClass: Class<I>,
username: String,
password: String,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
rpcOpsClass = rpcOpsClass,
username = username,
password = password,
configuration = configuration
)
constructor(
hostAndPort: NetworkHostAndPort,
rpcOpsClass: Class<I>,
username: String,
password: String,
classLoader: ClassLoader,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) : this(
hostAndPort = hostAndPort,
rpcOpsClass = rpcOpsClass,
username = username,
password = password,
configuration = configuration,
sslConfiguration = null,
classLoader = classLoader
)
constructor(
hostAndPort: NetworkHostAndPort,
rpcOpsClass: Class<I>,
username: String,
password: String,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
rpcOpsClass = rpcOpsClass,
username = username,
password = password,
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
@JvmOverloads
constructor(
hostAndPort: NetworkHostAndPort,
rpcOpsClass: Class<I>,
username: String,
password: String,
configuration: CordaRPCClientConfiguration,
sslConfiguration: ClientRpcSslOptions?,
classLoader: ClassLoader? = null
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
rpcOpsClass = rpcOpsClass,
username = username,
password = password,
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
@JvmOverloads
constructor(
haAddressPool: List<NetworkHostAndPort>,
rpcOpsClass: Class<I>,
username: String,
password: String,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) : this(
hostAndPort = null,
haAddressPool = haAddressPool,
rpcOpsClass = rpcOpsClass,
username = username,
password = password,
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader
)
@JvmOverloads
constructor(
hostAndPort: NetworkHostAndPort,
rpcOpsClass: Class<I>,
username: String,
password: String,
customSerializers: Set<SerializationCustomSerializer<*, *>>?,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
externalTrace: Trace? = null,
impersonatedActor: Actor? = null,
targetLegalIdentity: CordaX500Name? = null
) : this(
hostAndPort = hostAndPort,
haAddressPool = emptyList(),
rpcOpsClass = rpcOpsClass,
username = username,
password = password,
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader,
customSerializers = customSerializers,
externalTrace = externalTrace,
impersonatedActor = impersonatedActor,
targetLegalIdentity = targetLegalIdentity
)
@JvmOverloads
constructor(
haAddressPool: List<NetworkHostAndPort>,
rpcOpsClass: Class<I>,
username: String,
password: String,
customSerializers: Set<SerializationCustomSerializer<*, *>>?,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null,
externalTrace: Trace? = null,
impersonatedActor: Actor? = null,
targetLegalIdentity: CordaX500Name? = null
) : this(
hostAndPort = null,
haAddressPool = haAddressPool,
rpcOpsClass = rpcOpsClass,
username = username,
password = password,
configuration = configuration,
sslConfiguration = sslConfiguration,
classLoader = classLoader,
customSerializers = customSerializers,
externalTrace = externalTrace,
impersonatedActor = impersonatedActor,
targetLegalIdentity = targetLegalIdentity
)
init {
SerializationEnvironmentHelper.ensureEffectiveSerializationEnvSet(classLoader, customSerializers)
}
private val endpointString: String get() = hostAndPort?.toString() ?: haAddressPool.toString()
private val internalImpl: RPCClient<I> = createInternalRpcClient()
private val connectionFuture = CompletableFuture<RPCConnection<I>>()
private val connectionStarterThread = Executors.newSingleThreadExecutor(
ThreadFactoryBuilder().setNameFormat("RPCConnectionStarter-$username@$endpointString").build())
private val connectionStarted = AtomicBoolean(false)
private fun createInternalRpcClient(): RPCClient<I> {
val serializationContext = if (classLoader != null) {
AMQP_RPC_CLIENT_CONTEXT.withClassLoader(classLoader)
} else {
AMQP_RPC_CLIENT_CONTEXT
}
return when {
haAddressPool.isEmpty() -> RPCClient(
ArtemisTcpTransport.rpcConnectorTcpTransport(hostAndPort!!, config = sslConfiguration),
configuration,
serializationContext)
else -> {
RPCClient(haAddressPool, sslConfiguration, configuration, serializationContext)
}
}
}
/**
* Adds [RPCConnectionListener] to this [MultiRPCClient] to be informed about important connectivity events.
* @return `true` if the element has been added, `false` when listener is already contained in the set of listeners.
*/
fun addConnectionListener(listener: RPCConnectionListener<I>): Boolean {
return internalImpl.addConnectionListener(listener)
}
/**
* Removes [RPCConnectionListener] from this [MultiRPCClient].
*
* @return `true` if the element has been successfully removed; `false` if it was not present in the set of listeners.
*/
fun removeConnectionListener(listener: RPCConnectionListener<I>): Boolean {
return internalImpl.removeConnectionListener(listener)
}
/**
* Logs in to the target server and returns an active connection.
*
* It only makes sense to this method once. If it is called repeatedly it will return the same by reference [CompletableFuture]
*
* @return [CompletableFuture] containing [RPCConnection] or throwing [RPCException] if the server version is too low or if the server is not
* reachable within a reasonable timeout or if login credentials provided are incorrect.
*/
fun start(): CompletableFuture<RPCConnection<I>> {
if(connectionStarted.compareAndSet(false, true)) {
connectionStarterThread.submit {
try {
connectionFuture.complete(internalImpl.start(
rpcOpsClass, username, password, externalTrace, impersonatedActor, targetLegalIdentity))
} catch (ex: Throwable) {
logger.warn("Unable to start RPC connection", ex)
connectionFuture.completeExceptionally(ex)
}
// Do not wait for close, release the thread as soon as
connectionStarterThread.shutdown()
}
}
return connectionFuture
}
/**
* Stops the client and closes [RPCConnection] if it has been previously established
*/
fun stop() = close()
override fun close() {
connectionStarterThread.shutdownNow()
// Close connection if future is ready and was successful
if(connectionFuture.isDone && !connectionFuture.isCompletedExceptionally) {
connectionFuture.get().notifyServerAndClose()
}
}
}

View File

@ -0,0 +1,44 @@
package net.corda.client.rpc.ext
import net.corda.client.rpc.RPCConnection
import net.corda.core.messaging.RPCOps
/**
* A listener that can be attached to [MultiRPCClient] to be notified about important RPC connectivity events.
*/
interface RPCConnectionListener<I : RPCOps> {
/**
* Defines context information for events distributed.
*/
interface ConnectionContext<I : RPCOps> {
val userName: String
val connectionOpt: RPCConnection<I>?
val throwableOpt: Throwable?
}
/**
* This method will be called to inform that RPC connection is established. [ConnectionContext.connectionOpt] will not be `null`.
*
* If connection is lost RPC client will attempt to re-connect and if this is successful then this method will be called
* again with the **same** reference of [ConnectionContext.connectionOpt] as during initial connect. I.e. it is possible to say that once
* established [ConnectionContext.connectionOpt] stays constant during [onConnect]/[onDisconnect] cycles.
*/
fun onConnect(context: ConnectionContext<I>)
/**
* This method will be called to inform about connection loss. Since given RPC client may produce multiple [RPCConnection]s,
* [ConnectionContext.connectionOpt] will specify which connection is interrupted.
*/
fun onDisconnect(context: ConnectionContext<I>)
/**
* This is a terminal notification to inform that:
* - it has never been possible to connect due to incorrect credentials or endpoints addresses supplied. In this case
* [ConnectionContext.connectionOpt] will be `null`;
* or
* - no further reconnection will be performed as maximum number of attempts has been reached. In this case
* [ConnectionContext.connectionOpt] may not be `null`.
*/
fun onPermanentFailure(context: ConnectionContext<I>)
}

View File

@ -0,0 +1,57 @@
package net.corda.client.rpc.internal
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.ext.RPCConnectionListener
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
/**
* Internal helper class for distributing connectivity events to multiple [RPCConnectionListener]s.
* It retains some state to simplify construction of [RPCConnectionListener.ConnectionContext].
*/
internal class DistributionMux<I : RPCOps>(private val listeners: Iterable<RPCConnectionListener<I>>, private val userName: String) {
companion object {
private val logger = contextLogger()
private data class ConnectionContextImpl<I : RPCOps>(override val userName: String,
override val connectionOpt: RPCConnection<I>?,
override val throwableOpt: Throwable? = null) : RPCConnectionListener.ConnectionContext<I>
}
@Volatile
internal var connectionOpt: RPCConnection<I>? = null
internal fun onConnect() {
safeForEachListener {
onConnect(ConnectionContextImpl(userName, connectionOpt))
}
}
internal fun onDisconnect(throwableOpt: Throwable?) {
if (connectionOpt != null) {
safeForEachListener {
onDisconnect(ConnectionContextImpl(userName, connectionOpt, throwableOpt))
}
} else {
logger.debug { "Not distributing onDisconnect as connection never been established" }
}
}
internal fun onPermanentFailure(throwableOpt: Throwable?) {
safeForEachListener {
onPermanentFailure(ConnectionContextImpl(userName, connectionOpt, throwableOpt))
}
}
private fun safeForEachListener(action: RPCConnectionListener<I>.() -> Unit) {
listeners.forEach {
try {
it.action()
} catch (ex: Exception) {
logger.error("Exception during distribution to: $it", ex)
}
}
}
}

View File

@ -3,9 +3,9 @@ package net.corda.client.rpc.internal
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.UnrecoverableRPCException
import net.corda.client.rpc.ext.RPCConnectionListener
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.logElapsedTime
import net.corda.core.internal.uncheckedCast
@ -16,24 +16,28 @@ import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcConnectorTcpTransportsFromList
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalClientTcpTransport
import net.corda.nodeapi.internal.RoundRobinConnectionPolicy
import net.corda.nodeapi.internal.config.SslConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import java.lang.reflect.Proxy
import java.util.concurrent.CopyOnWriteArraySet
/**
* This runs on the client JVM
* [RPCClient] is meant to run outside of Corda Node JVM and provide connectivity to a node using RPC protocol.
* Since Corda Node can expose multiple RPC interfaces, it is possible to specify which [RPCOps] interface should be used.
*
* When `haAddressPool` [RPCClient] will perform connectivity failover using parameters specified by [CordaRPCClientConfiguration].
* Whenever status of connection changes registered [RPCConnectionListener] will be informed about those events.
*/
class RPCClient<I : RPCOps>(
val transport: TransportConfiguration,
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT,
val haPoolTransportConfigurations: List<TransportConfiguration> = emptyList()
private val transport: TransportConfiguration,
private val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
private val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT,
private val haPoolTransportConfigurations: List<TransportConfiguration> = emptyList()
) {
constructor(
hostAndPort: NetworkHostAndPort,
@ -49,6 +53,9 @@ class RPCClient<I : RPCOps>(
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(rpcInternalClientTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
/**
* A way to create RPC connections to a pool of RPC addresses for resiliency
*/
constructor(
haAddressPool: List<NetworkHostAndPort>,
sslConfiguration: ClientRpcSslOptions? = null,
@ -61,6 +68,8 @@ class RPCClient<I : RPCOps>(
private val log = contextLogger()
}
private val listeners: MutableSet<RPCConnectionListener<I>> = CopyOnWriteArraySet()
fun start(
rpcOpsClass: Class<I>,
username: String,
@ -70,8 +79,6 @@ class RPCClient<I : RPCOps>(
targetLegalIdentity: CordaX500Name? = null
): RPCConnection<I> {
return log.logElapsedTime("Startup") {
val clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.${random63BitValue()}")
val serverLocator = (if (haPoolTransportConfigurations.isEmpty()) {
ActiveMQClient.createServerLocatorWithoutHA(transport)
} else {
@ -85,10 +92,14 @@ class RPCClient<I : RPCOps>(
reconnectAttempts = if (haPoolTransportConfigurations.isEmpty()) rpcConfiguration.maxReconnectAttempts else 0
minLargeMessageSize = rpcConfiguration.maxFileSize
isUseGlobalPools = nodeSerializationEnv != null
// By default RoundRobinConnectionLoadBalancingPolicy is used that picks first endpoint from the pool
// at random. This may be undesired and non-deterministic. For more information, see [RoundRobinConnectionPolicy]
connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName
}
val sessionId = Trace.SessionId.newInstance()
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator, clientAddress,
rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor, targetLegalIdentity)
val distributionMux = DistributionMux(listeners, username)
val proxyHandler = RPCClientProxyHandler(rpcConfiguration, username, password, serverLocator,
rpcOpsClass, serializationContext, sessionId, externalTrace, impersonatedActor, targetLegalIdentity, distributionMux)
try {
proxyHandler.start()
val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler))
@ -101,7 +112,7 @@ class RPCClient<I : RPCOps>(
proxyHandler.setServerProtocolVersion(serverProtocolVersion)
log.debug("RPC connected, returning proxy")
object : RPCConnection<I> {
val connection = object : RPCConnection<I> {
override val proxy = ops
override val serverProtocolVersion = serverProtocolVersion
@ -122,12 +133,32 @@ class RPCClient<I : RPCOps>(
close(false)
}
}
} catch (exception: Throwable) {
distributionMux.connectionOpt = connection
distributionMux.onConnect()
connection
} catch (throwable: Throwable) {
proxyHandler.notifyServerAndClose()
serverLocator.close()
throw exception
distributionMux.onPermanentFailure(throwable)
throw throwable
}
}
}
}
/**
* Adds [RPCConnectionListener] to this [RPCClient] to be informed about important connectivity events.
* @return `true` if the element has been added, `false` when listener is already contained in the set of listeners.
*/
fun addConnectionListener(listener: RPCConnectionListener<I>) : Boolean {
return listeners.add(listener)
}
/**
* Removes [RPCConnectionListener] from this [RPCClient].
*
* @return `true` if the element has been successfully removed; `false` if it was not present in the set of listeners.
*/
fun removeConnectionListener(listener: RPCConnectionListener<I>) : Boolean {
return listeners.remove(listener)
}
}

View File

@ -14,6 +14,7 @@ import net.corda.client.rpc.internal.RPCUtils.isShutdownCmd
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.crypto.random63BitValue
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle
@ -100,13 +101,13 @@ internal class RPCClientProxyHandler(
private val rpcUsername: String,
private val rpcPassword: String,
private val serverLocator: ServerLocator,
private val clientAddress: SimpleString,
private val rpcOpsClass: Class<out RPCOps>,
serializationContext: SerializationContext,
private val sessionId: Trace.SessionId,
private val externalTrace: Trace?,
private val impersonatedActor: Actor?,
private val targetLegalIdentity: CordaX500Name?,
private val notificationDistributionMux: DistributionMux<out RPCOps>,
private val cacheFactory: NamedCacheFactory = ClientCacheFactory()
) : InvocationHandler {
@ -222,6 +223,7 @@ internal class RPCClientProxyHandler(
)
}
private var clientAddress: SimpleString? = null
private var sessionFactory: ClientSessionFactory? = null
private var producerSession: ClientSession? = null
private var consumerSession: ClientSession? = null
@ -329,7 +331,7 @@ internal class RPCClientProxyHandler(
try {
val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext)
val request = RPCApi.ClientToServer.RpcRequest(
clientAddress,
clientAddress!!,
methodFqn,
serialisedArguments,
replyId,
@ -519,6 +521,7 @@ internal class RPCClientProxyHandler(
// leak borrowed executors.
val observationExecutors = observationExecutorPool.close()
observationExecutors.forEach { it.shutdownNow() }
notificationDistributionMux.onDisconnect(null)
lifeCycle.justTransition(State.FINISHED)
}
@ -610,6 +613,7 @@ internal class RPCClientProxyHandler(
initSessions()
startSessions()
sendingEnabled.set(true)
notificationDistributionMux.onConnect()
break
}
@ -618,6 +622,7 @@ internal class RPCClientProxyHandler(
val errMessage = "Could not reconnect to the RPC server after trying $reconnectAttempt times." +
if (sessionFactory != null) "" else " It was never possible to to establish connection with any of the endpoints."
log.error(errMessage)
notificationDistributionMux.onPermanentFailure(IllegalStateException(errMessage))
}
}
@ -625,6 +630,8 @@ internal class RPCClientProxyHandler(
producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, 16384)
clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}")
log.debug { "Client address: $clientAddress" }
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
rpcConsumer = consumerSession!!.createConsumer(clientAddress)
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
@ -655,8 +662,11 @@ internal class RPCClientProxyHandler(
replyFuture.setException(connectionFailureException)
}
log.debug { "rpcReplyMap size before clear: ${rpcReplyMap.size}" }
rpcReplyMap.clear()
log.debug { "callSiteMap size before clear: ${callSiteMap?.size}" }
callSiteMap?.clear()
notificationDistributionMux.onDisconnect(connectionFailureException)
}
override fun equals(other: Any?): Boolean {
@ -666,7 +676,6 @@ internal class RPCClientProxyHandler(
other as RPCClientProxyHandler
if (rpcUsername != other.rpcUsername) return false
if (clientAddress != other.clientAddress) return false
if (sessionId != other.sessionId) return false
if (targetLegalIdentity != other.targetLegalIdentity) return false
@ -675,7 +684,6 @@ internal class RPCClientProxyHandler(
override fun hashCode(): Int {
var result = rpcUsername.hashCode()
result = 31 * result + clientAddress.hashCode()
result = 31 * result + sessionId.hashCode()
result = 31 * result + (targetLegalIdentity?.hashCode() ?: 0)
return result

View File

@ -0,0 +1,49 @@
package net.corda.client.rpc.internal
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.createInstancesOfClassesImplementing
import net.corda.core.serialization.SerializationCustomSerializer
import net.corda.core.serialization.SerializationWhitelist
import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.nodeapi.internal.rpc.client.AMQPClientSerializationScheme
import net.corda.serialization.internal.amqp.SerializationFactoryCacheKey
import net.corda.serialization.internal.amqp.SerializerFactory
import java.util.ServiceLoader
internal object SerializationEnvironmentHelper {
internal fun ensureEffectiveSerializationEnvSet(classLoader: ClassLoader?,
customSerializers: Set<SerializationCustomSerializer<*, *>>?) {
try {
effectiveSerializationEnv
} catch (e: IllegalStateException) {
try {
val cache = Caffeine.newBuilder().maximumSize(128)
.build<SerializationFactoryCacheKey, SerializerFactory>().asMap()
// If the client has explicitly provided a classloader use this one to scan for custom serializers,
// otherwise use the current one.
val serializationClassLoader = classLoader ?: this.javaClass.classLoader
// If the client has explicitly provided a set of custom serializers, avoid performing any scanning and use these instead.
val discoveredCustomSerializers = customSerializers ?: createInstancesOfClassesImplementing(
serializationClassLoader,
SerializationCustomSerializer::class.java
)
val serializationWhitelists = ServiceLoader.load(
SerializationWhitelist::class.java,
serializationClassLoader
).toSet()
AMQPClientSerializationScheme.initialiseSerialization(
serializationClassLoader,
discoveredCustomSerializers,
serializationWhitelists,
cache
)
} catch (e: IllegalStateException) {
// Race e.g. two of these constructed in parallel, ignore.
}
}
}
}

View File

@ -0,0 +1,91 @@
package net.corda.core.utilities
import java.lang.management.LockInfo
import java.lang.management.ManagementFactory
import java.lang.management.ThreadInfo
fun threadDumpAsString(): String {
val mxBean = ManagementFactory.getThreadMXBean()
val threadInfos: Array<ThreadInfo?> = mxBean.getThreadInfo(mxBean.allThreadIds, Integer.MAX_VALUE)
return "Thread Dump:\n" + threadInfos.filterNotNull().joinToString(separator = "\n") { ti -> ti.asString() }
}
/**
* Inspired by `ThreadInfo.toString`
*
* Returns a string representation of this thread info.
* The format of this string depends on the implementation.
* The returned string will typically include
* the [thread name][.getThreadName],
* the [thread ID][.getThreadId],
* its [state][.getThreadState],
* and a [stack trace][.getStackTrace] if any.
*
* @return a string representation of this thread info.
*/
fun ThreadInfo.asString(maxFrames : Int = 256): String {
val sb = StringBuilder("\"" + threadName + "\"" +
" Id=" + threadId + " " +
threadState)
if (lockName != null) {
sb.append(" on $lockName")
}
if (lockOwnerName != null) {
sb.append(" owned by \"" + lockOwnerName +
"\" Id=" + lockOwnerId)
}
if (isSuspended) {
sb.append(" (suspended)")
}
if (isInNative) {
sb.append(" (in native)")
}
sb.append('\n')
var i = 0
while (i < stackTrace.size && i < maxFrames) {
val ste: StackTraceElement = stackTrace.get(i)
sb.append("\tat $ste")
sb.append('\n')
if (i == 0 && lockInfo != null) {
when (threadState) {
Thread.State.BLOCKED -> {
sb.append("\t- blocked on $lockInfo")
sb.append('\n')
}
Thread.State.WAITING -> {
sb.append("\t- waiting on $lockInfo")
sb.append('\n')
}
Thread.State.TIMED_WAITING -> {
sb.append("\t- waiting on $lockInfo")
sb.append('\n')
}
else -> {
}
}
}
for (mi in lockedMonitors) {
if (mi.lockedStackDepth == i) {
sb.append("\t- locked $mi")
sb.append('\n')
}
}
i++
}
if (i < stackTrace.size) {
sb.append("\t...")
sb.append('\n')
}
val locks: Array<LockInfo> = getLockedSynchronizers()
if (locks.isNotEmpty()) {
sb.append("""
Number of locked synchronizers = ${locks.size}""")
sb.append('\n')
for (li in locks) {
sb.append("\t- $li")
sb.append('\n')
}
}
sb.append('\n')
return sb.toString()
}

View File

@ -170,6 +170,7 @@
<ID>ComplexMethod:StartedFlowTransition.kt$StartedFlowTransition$override fun transition(): TransitionResult</ID>
<ID>ComplexMethod:StatusTransitions.kt$StatusTransitions$ fun verify(tx: LedgerTransaction)</ID>
<ID>ComplexMethod:StringToMethodCallParser.kt$StringToMethodCallParser$ @Throws(UnparseableCallException::class) fun parse(target: T?, command: String): ParsedMethodCall</ID>
<ID>ComplexMethod:ThreadDumpUtils.kt$ fun ThreadInfo.asString(maxFrames : Int = 256): String</ID>
<ID>ComplexMethod:TlsDiffAlgorithmsTest.kt$TlsDiffAlgorithmsTest$@Test(timeout=300_000) fun testClientServerTlsExchange()</ID>
<ID>ComplexMethod:TlsDiffProtocolsTest.kt$TlsDiffProtocolsTest$@Test(timeout=300_000) fun testClientServerTlsExchange()</ID>
<ID>ComplexMethod:TransactionUtils.kt$ fun createComponentGroups(inputs: List&lt;StateRef&gt;, outputs: List&lt;TransactionState&lt;ContractState&gt;&gt;, commands: List&lt;Command&lt;*&gt;&gt;, attachments: List&lt;SecureHash&gt;, notary: Party?, timeWindow: TimeWindow?, references: List&lt;StateRef&gt;, networkParametersHash: SecureHash?): List&lt;ComponentGroup&gt;</ID>
@ -677,6 +678,8 @@
<ID>LongParameterList:QueryCriteria.kt$QueryCriteria.VaultQueryCriteria$( status: Vault.StateStatus = Vault.StateStatus.UNCONSUMED, contractStateTypes: Set&lt;Class&lt;out ContractState&gt;&gt;? = null, stateRefs: List&lt;StateRef&gt;? = null, notary: List&lt;AbstractParty&gt;? = null, softLockingCondition: SoftLockingCondition? = null, timeCondition: TimeCondition? = null, relevancyStatus: Vault.RelevancyStatus = Vault.RelevancyStatus.ALL, constraintTypes: Set&lt;Vault.ConstraintInfo.Type&gt; = emptySet(), constraints: Set&lt;Vault.ConstraintInfo&gt; = emptySet(), participants: List&lt;AbstractParty&gt;? = null, externalIds: List&lt;UUID&gt; = emptyList() )</ID>
<ID>LongParameterList:QueryCriteria.kt$QueryCriteria.VaultQueryCriteria$( status: Vault.StateStatus = this.status, contractStateTypes: Set&lt;Class&lt;out ContractState&gt;&gt;? = this.contractStateTypes, stateRefs: List&lt;StateRef&gt;? = this.stateRefs, notary: List&lt;AbstractParty&gt;? = this.notary, softLockingCondition: SoftLockingCondition? = this.softLockingCondition, timeCondition: TimeCondition? = this.timeCondition )</ID>
<ID>LongParameterList:RPCClient.kt$RPCClient$( rpcOpsClass: Class&lt;I&gt;, username: String, password: String, externalTrace: Trace? = null, impersonatedActor: Actor? = null, targetLegalIdentity: CordaX500Name? = null )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcOpsClass: Class&lt;I&gt;, haAddressPool: List&lt;NetworkHostAndPort&gt;, username: String = rpcTestUser.username, password: String = rpcTestUser.password, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, listeners: Iterable&lt;RPCConnectionListener&lt;I&gt;&gt; = emptyList() )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcOpsClass: Class&lt;I&gt;, rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, listeners: Iterable&lt;RPCConnectionListener&lt;I&gt;&gt; = emptyList() )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, listOps: List&lt;I&gt;, brokerHandle: RpcBrokerHandle, queueDrainTimeout: Duration = 5.seconds )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, brokerHandle: RpcBrokerHandle, queueDrainTimeout: Duration = 5.seconds )</ID>
<ID>LongParameterList:RPCDriver.kt$RPCDriverDSL$( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, queueDrainTimeout: Duration = 5.seconds )</ID>
@ -759,7 +762,6 @@
<ID>MagicNumber:ConnectionStateMachine.kt$ConnectionStateMachine$1500</ID>
<ID>MagicNumber:ConnectionStateMachine.kt$ConnectionStateMachine$4</ID>
<ID>MagicNumber:CordaPersistence.kt$DatabaseConfig.Defaults$100L</ID>
<ID>MagicNumber:CordaRPCClient.kt$CordaRPCClient$128</ID>
<ID>MagicNumber:CordaRPCClient.kt$CordaRPCClientConfiguration$3</ID>
<ID>MagicNumber:CordaRPCClient.kt$CordaRPCClientConfiguration$5</ID>
<ID>MagicNumber:CordaSSHAuthInfo.kt$CordaSSHAuthInfo$10</ID>
@ -1099,6 +1101,7 @@
<ID>MagicNumber:SecureArtemisConfiguration.kt$SecureArtemisConfiguration$16</ID>
<ID>MagicNumber:SecureHash.kt$SecureHash.Companion$32</ID>
<ID>MagicNumber:SecureHash.kt$SecureHash.SHA256$32</ID>
<ID>MagicNumber:SerializationEnvironmentHelper.kt$SerializationEnvironmentHelper$128</ID>
<ID>MagicNumber:ShutdownManager.kt$ShutdownManager$5</ID>
<ID>MagicNumber:ShutdownManager.kt$ShutdownManager$60</ID>
<ID>MagicNumber:SimmFlow.kt$100</ID>
@ -1446,6 +1449,7 @@
<ID>TooGenericExceptionCaught:DemoBenchView.kt$DemoBenchView$e: Exception</ID>
<ID>TooGenericExceptionCaught:DeserializationInput.kt$DeserializationInput$e: Exception</ID>
<ID>TooGenericExceptionCaught:DeserializeSimpleTypesTests.kt$DeserializeSimpleTypesTests$e: Exception</ID>
<ID>TooGenericExceptionCaught:DistributionMux.kt$DistributionMux$ex: Exception</ID>
<ID>TooGenericExceptionCaught:DockerInstantiator.kt$DockerInstantiator$e: Exception</ID>
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$DriverDSLImpl$e: Exception</ID>
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$DriverDSLImpl.Companion$th: Throwable</ID>
@ -1494,6 +1498,7 @@
<ID>TooGenericExceptionCaught:MockAttachmentStorage.kt$MockAttachmentStorage$e: Exception</ID>
<ID>TooGenericExceptionCaught:MockCryptoService.kt$MockCryptoService$e: Exception</ID>
<ID>TooGenericExceptionCaught:MockNodeMessagingService.kt$MockNodeMessagingService$e: Exception</ID>
<ID>TooGenericExceptionCaught:MultiRPCClient.kt$MultiRPCClient$ex: Throwable</ID>
<ID>TooGenericExceptionCaught:MyCustomNotaryService.kt$MyValidatingNotaryFlow$e: Exception</ID>
<ID>TooGenericExceptionCaught:NamedCacheTest.kt$NamedCacheTest$e: Exception</ID>
<ID>TooGenericExceptionCaught:NettyTestHandler.kt$NettyTestHandler$e: Throwable</ID>
@ -1527,7 +1532,7 @@
<ID>TooGenericExceptionCaught:QuasarInstrumentationHook.kt$QuasarInstrumentationHook$throwable: Throwable</ID>
<ID>TooGenericExceptionCaught:R3Pty.kt$R3Pty$e: Exception</ID>
<ID>TooGenericExceptionCaught:RPCApi.kt$RPCApi.ServerToClient.Companion$e: Exception</ID>
<ID>TooGenericExceptionCaught:RPCClient.kt$RPCClient$exception: Throwable</ID>
<ID>TooGenericExceptionCaught:RPCClient.kt$RPCClient$throwable: Throwable</ID>
<ID>TooGenericExceptionCaught:RPCClientProxyHandler.kt$RPCClientProxyHandler$e: Exception</ID>
<ID>TooGenericExceptionCaught:RPCClientProxyHandler.kt$RPCClientProxyHandler$e: RuntimeException</ID>
<ID>TooGenericExceptionCaught:RPCPermissionResolver.kt$RPCPermissionResolver.InterfaceMethodMapCacheLoader$ex: Exception</ID>
@ -1659,6 +1664,7 @@
<ID>TooManyFunctions:QueryCriteriaUtils.kt$Builder</ID>
<ID>TooManyFunctions:RPCApi.kt$net.corda.nodeapi.RPCApi.kt</ID>
<ID>TooManyFunctions:RPCClientProxyHandler.kt$RPCClientProxyHandler : InvocationHandler</ID>
<ID>TooManyFunctions:RPCDriver.kt$RPCDriverDSL : InternalDriverDSL</ID>
<ID>TooManyFunctions:RPCServer.kt$RPCServer</ID>
<ID>TooManyFunctions:SSLHelper.kt$net.corda.nodeapi.internal.protonwrapper.netty.SSLHelper.kt</ID>
<ID>TooManyFunctions:SerializationHelper.kt$net.corda.serialization.internal.amqp.SerializationHelper.kt</ID>

View File

@ -0,0 +1,131 @@
package net.corda.node.multiRpc
import com.nhaarman.mockito_kotlin.argThat
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.times
import com.nhaarman.mockito_kotlin.verify
import net.corda.client.rpc.ConnectionFailureException
import net.corda.client.rpc.ext.MultiRPCClient
import net.corda.client.rpc.ext.RPCConnectionListener
import net.corda.core.internal.messaging.AttachmentTrustInfoRPCOps
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.services.NetworkMapCache
import net.corda.core.serialization.internal.SerializationEnvironment
import net.corda.core.serialization.internal._rpcClientSerializationEnv
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions.Companion.all
import net.corda.testing.common.internal.eventually
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.User
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observer
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertSame
class MultiRpcClientTest {
companion object {
private fun ensureSerialisationEnvNull() {
// Ensure that RPC client serialisation environment is definitely not set
if (_rpcClientSerializationEnv.get() != null) {
_rpcClientSerializationEnv.set(null)
}
}
}
private var prevRpcClientSerializationEnv: SerializationEnvironment? = null
@Before
fun setup() {
prevRpcClientSerializationEnv = _rpcClientSerializationEnv.get()
ensureSerialisationEnvNull()
}
@After
fun after() {
ensureSerialisationEnvNull()
// Restore something that was changed during setup
prevRpcClientSerializationEnv?.let { _rpcClientSerializationEnv.set(prevRpcClientSerializationEnv) }
}
@Test(timeout = 300_000)
fun `can connect to custom RPC interface`() {
// Allocate named port to be used for RPC interaction
val rpcAddress = incrementalPortAllocation().nextHostAndPort()
// Create a specific RPC user
val rpcUser = User("MultiRpcClientTest", "MultiRpcClientTestPwd", setOf(all()))
// Create client with RPC address specified
val client = MultiRPCClient(rpcAddress, AttachmentTrustInfoRPCOps::class.java, rpcUser.username, rpcUser.password)
// Ensure that RPC client definitely sets serialisation environment
assertNotNull(_rpcClientSerializationEnv.get())
// Right from the start attach a listener such that it will be informed of all the activity happening for this RPC client
val listener = mock<RPCConnectionListener<AttachmentTrustInfoRPCOps>>()
client.addConnectionListener(listener)
client.use {
// Starting node out-of-process to ensure it is completely independent from RPC client
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = false)) {
startNode(providedName = ALICE_NAME,
defaultParameters = NodeParameters(rpcAddress = rpcAddress, rpcUsers = listOf(rpcUser))).getOrThrow()
val connFuture = client.start()
eventually(duration = 60.seconds) {
verify(listener, times(1)).onConnect(argThat { connectionOpt === connFuture.get() })
}
val conn = connFuture.get()
conn.use {
assertNotNull(it.proxy.attachmentTrustInfos)
}
verify(listener, times(1)).onDisconnect(argThat { connectionOpt === conn && throwableOpt == null })
// Ensuring that calling start even after close will result in the same future
assertSame(connFuture, client.start())
}
}
}
@Test(timeout = 300_000)
fun `ensure onError populated on disconnect`() {
// Allocate named port to be used for RPC interaction
val rpcAddress = incrementalPortAllocation().nextHostAndPort()
// Create a specific RPC user
val rpcUser = User("MultiRpcClientTest2", "MultiRpcClientTestPwd2", setOf(all()))
// Create client with RPC address specified
val client = MultiRPCClient(rpcAddress, CordaRPCOps::class.java, rpcUser.username, rpcUser.password)
val observer = mock<Observer<NetworkMapCache.MapChange>>()
client.use {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = false)) {
startNode(providedName = ALICE_NAME,
defaultParameters = NodeParameters(rpcAddress = rpcAddress, rpcUsers = listOf(rpcUser))).getOrThrow()
val connFuture = client.start()
val conn = connFuture.get()
val nmFeed = conn.proxy.networkMapFeed()
assertEquals(ALICE_NAME, nmFeed.snapshot.single().legalIdentities.single().name)
nmFeed.updates.subscribe(observer)
}
}
eventually {
verify(observer, times(1)).onError(argThat { this as? ConnectionFailureException != null })
}
}
}

View File

@ -3,6 +3,7 @@ package net.corda.testing.driver
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
@ -22,6 +23,7 @@ import net.corda.testing.node.User
* managed by the [DriverDSL].
* @property logLevelOverride log level to be passed as parameter to an out of process node. ERROR, WARN, INFO, DEBUG, TRACE. This overrides debug port
* log level argument.
* @property rpcAddress optional override for RPC address on which node will be accepting RPC connections from the clients. Port provided must be vacant.
*/
@Suppress("unused")
data class NodeParameters(
@ -33,7 +35,8 @@ data class NodeParameters(
val maximumHeapSize: String = System.getenv("DRIVER_NODE_MEMORY") ?: "512m",
val additionalCordapps: Collection<TestCordapp> = emptySet(),
val flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>> = emptyMap(),
val logLevelOverride: String? = null
val logLevelOverride: String? = null,
val rpcAddress: NetworkHostAndPort? = null
) {
/**
* Create a new node parameters object with default values. Each parameter can be specified with its wither method which returns a copy

View File

@ -227,10 +227,6 @@ class DriverDSLImpl(
}
}
/**
* @param pollInterval the interval to wait between attempting to connect, if
* a connection attempt fails.
*/
private fun establishRpc(config: NodeConfig,
processDeathFuture: CordaFuture<out Process>): CordaFuture<CordaRPCOps> {
val rpcAddress = config.corda.rpcOptions.address
@ -297,7 +293,7 @@ class DriverDSLImpl(
p2pAddress: NetworkHostAndPort = portAllocation.nextHostAndPort()
): NodeConfig {
val baseDirectory = baseDirectory(providedName).createDirectories()
val rpcAddress = portAllocation.nextHostAndPort()
val rpcAddress = parameters.rpcAddress ?: portAllocation.nextHostAndPort()
val rpcAdminAddress = portAllocation.nextHostAndPort()
val users = parameters.rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) }
val czUrlConfig = when (compatibilityZone) {

View File

@ -2,7 +2,9 @@ package net.corda.testing.node.internal
import net.corda.client.mock.Generator
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.ext.RPCConnectionListener
import net.corda.nodeapi.internal.rpc.client.AMQPClientSerializationScheme
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.AuthServiceId
@ -89,15 +91,25 @@ data class RpcBrokerHandle(
val hostAndPort: NetworkHostAndPort?,
/** null if this is an InVM broker */
val clientTransportConfiguration: TransportConfiguration,
val serverControl: ActiveMQServerControl
val serverControl: ActiveMQServerControl,
val shutdown: () -> Unit
)
data class RpcServerHandle(
val broker: RpcBrokerHandle,
val rpcServer: RPCServer
)
) {
fun shutdown() {
rpcServer.close()
broker.shutdown()
}
}
val rpcTestUser = User("user1", "test", permissions = emptySet())
// A separate user for RPC server is necessary as there are scenarios that call `ActiveMQServerControl.closeConnectionsForUser`
// to test disconnect/failover. If there is only a single Artemis broker user, `ActiveMQServerControl.closeConnectionsForUser` will do
// damage to the internals of `RPCServer` rendering it unusable.
val rpcServerUser = User("rpcServer", "rpcServerPassword", permissions = emptySet())
val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality = "Nowhere", country = "GB")
// Use a global pool so that we can run RPC tests in parallel
@ -156,7 +168,7 @@ fun <A> rpcDriver(
)
}
private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 {
private class UserSetSecurityManager(val userSet: Set<User>) : ActiveMQSecurityManager3 {
override fun validateUser(user: String?, password: String?) = isValid(user, password)
override fun validateUserAndRole(user: String?, password: String?, roles: MutableSet<Role>?, checkType: CheckType?) = isValid(user, password)
override fun validateUser(user: String?, password: String?, connection: RemotingConnection?): String? {
@ -168,7 +180,7 @@ private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityMan
}
private fun isValid(user: String?, password: String?): Boolean {
return rpcUser.username == user && rpcUser.password == password
return userSet.any { it.username == user && it.password == password }
}
private fun validate(user: String?, password: String?): String? {
@ -361,13 +373,37 @@ data class RPCDriverDSL(
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
): CordaFuture<I> {
return startRpcClient(rpcOpsClass, rpcAddress, username, password, configuration, emptyList()).map { it.first.proxy }
}
/**
* Starts a Netty RPC client.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param rpcAddress The address of the RPC server to connect to.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration The RPC client configuration.
* @param listeners [RPCConnectionListener]s to be attached to the [RPCClient]
*/
fun <I : RPCOps> startRpcClient(
rpcOpsClass: Class<I>,
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
listeners: Iterable<RPCConnectionListener<I>> = emptyList()
): CordaFuture<Pair<RPCConnection<I>, RPCClient<I>>> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), configuration)
listeners.forEach {
client.addConnectionListener(it)
}
val connection = client.start(rpcOpsClass, username, password, externalTrace)
driverDSL.shutdownManager.registerShutdown {
connection.close()
}
connection.proxy
connection to client
}
}
@ -387,13 +423,37 @@ data class RPCDriverDSL(
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
): CordaFuture<I> {
return startRpcClient(rpcOpsClass, haAddressPool, username, password, configuration, emptyList()).map { it.first.proxy }
}
/**
* Starts a Netty RPC client.
*
* @param rpcOpsClass The [Class] of the RPC interface.
* @param haAddressPool The addresses of the RPC servers(configured in HA mode) to connect to.
* @param username The username to authenticate with.
* @param password The password to authenticate with.
* @param configuration The RPC client configuration.
* @param listeners listeners to be attached upon creation
*/
fun <I : RPCOps> startRpcClient(
rpcOpsClass: Class<I>,
haAddressPool: List<NetworkHostAndPort>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
listeners: Iterable<RPCConnectionListener<I>> = emptyList()
): CordaFuture<Pair<RPCConnection<I>, RPCClient<I>>> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(haAddressPool, null, configuration)
listeners.forEach {
client.addConnectionListener(it)
}
val connection = client.start(rpcOpsClass, username, password, externalTrace)
driverDSL.shutdownManager.registerShutdown {
connection.close()
}
connection.proxy
connection to client
}
}
@ -451,7 +511,7 @@ data class RPCDriverDSL(
addressMustNotBeBound(driverDSL.executorService, hostAndPort)
return driverDSL.executorService.fork {
val artemisConfig = createRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient, driverDSL.driverDirectory / serverName, hostAndPort)
val server = ActiveMQServerImpl(artemisConfig, SingleUserSecurityManager(rpcUser))
val server = ActiveMQServerImpl(artemisConfig, UserSetSecurityManager(setOf(rpcUser, rpcServerUser)))
server.start()
driverDSL.shutdownManager.registerShutdown {
server.stop()
@ -460,7 +520,8 @@ data class RPCDriverDSL(
RpcBrokerHandle(
hostAndPort = hostAndPort,
clientTransportConfiguration = createNettyClientTransportConfiguration(hostAndPort),
serverControl = server.activeMQServerControl
serverControl = server.activeMQServerControl,
shutdown = { server.stop() }
)
}
}
@ -474,7 +535,7 @@ data class RPCDriverDSL(
val artemisConfig = createInVmRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient)
val server = EmbeddedActiveMQ()
server.setConfiguration(artemisConfig)
server.setSecurityManager(SingleUserSecurityManager(rpcUser))
server.setSecurityManager(UserSetSecurityManager(setOf(rpcUser, rpcServerUser)))
server.start()
driverDSL.shutdownManager.registerShutdown {
server.activeMQServer.stop()
@ -483,7 +544,8 @@ data class RPCDriverDSL(
RpcBrokerHandle(
hostAndPort = null,
clientTransportConfiguration = inVmClientTransportConfiguration,
serverControl = server.activeMQServer.activeMQServerControl
serverControl = server.activeMQServer.activeMQServerControl,
shutdown = { server.stop() }
)
}
}
@ -509,12 +571,14 @@ data class RPCDriverDSL(
minLargeMessageSize = MAX_MESSAGE_SIZE
isUseGlobalPools = false
}
val rpcSecurityManager = RPCSecurityManagerImpl.fromUserList(users = listOf(InternalUser(rpcUser.username,
rpcUser.password, rpcUser.permissions)), id = AuthServiceId("TEST_SECURITY_MANAGER"))
val rpcSecurityManager = RPCSecurityManagerImpl.fromUserList(users = listOf(
InternalUser(rpcUser.username, rpcUser.password, rpcUser.permissions),
InternalUser(rpcServerUser.username, rpcServerUser.password, rpcServerUser.permissions)),
id = AuthServiceId("TEST_SECURITY_MANAGER"))
val rpcServer = RPCServer(
listOps,
rpcUser.username,
rpcUser.password,
rpcServerUser.username,
rpcServerUser.password,
locator,
rpcSecurityManager,
nodeLegalName,