From fe313951eafe3650458403236bde70c6d4fb85bb Mon Sep 17 00:00:00 2001 From: Tommy Lillehagen Date: Wed, 13 Jun 2018 15:00:46 +0100 Subject: [PATCH] CORDA-1609 - Don't use reserved keyword as method name As reported in [CORDA-1609](https://r3-cev.atlassian.net/browse/CORDA-1609), `CordaRPCClientConfiguration.default` is not accessible from Java since `default` is a reserved keyword. As part of the refactor made in #2831, `CordaRPCClientConfiguration` went from being a data class to an interface with a backing implementation of type `CordaRPCClientConfigurationImpl`. This resulted in Java users having to rewrite code that was on the form: ```java final CordaRPCClient client = new CordaRPCClient( nodeAddress, CordaRPCClientConfiguration.DEFAULT ); ``` to something like this: ```java final CordaRPCClient client = new CordaRPCClient( nodeAddress, CordaRPCClientConfiguration.Companion.default() ); ``` However, this does not work. The user would get a compilation error because `default` is a reserved keyword in Java. Since `CordaRPCClientConfiguration` has been made an interface, there is no easy way of introducing a static final field on the interface from Kotlin. Consequently, I've changed this back to using a `class` with a static field named `DEFAULT` instead of the static method `default()`. It should be noted that `default()` / `DEFAULT` is currently only used internally to pass in default values in `CordaRPCClient.kt` and `CordaRPCClientUtils.kt`. That said, it is exposed as part of our API surface and consequently shouldn't be broken. The latter means that in the above example, the user would actually not have to provide the parameter at all: ```java final CordaRPCClient client = new CordaRPCClient(nodeAddress); ``` As can be seen from the definition of `CordaRPCClient`: ```kotlin class CordaRPCClient private constructor(...) { @JvmOverloads constructor( hostAndPort: NetworkHostAndPort, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ) : this(hostAndPort, configuration, null) ``` The mentioned [refactor](https://github.com/corda/corda/commit/7a077e76f0cdf457cd40033fb0da594805a5951e#diff-0948c125db93a22263eb81eaf3161c17R65) did not make it into the 3.1 release, so from an API-stability perspective, this change can be applied without affecting our commitment to a backwards compatible API.. --- .../client/jfx/model/NodeMonitorModel.kt | 20 +- .../corda/client/rpc/CordaRPCClientTest.kt | 6 +- .../net/corda/client/rpc/RPCStabilityTests.kt | 15 +- .../net/corda/client/rpc/CordaRPCClient.kt | 203 ++++++++++++++---- .../rpc/internal/CordaRPCClientUtils.kt | 4 +- .../corda/client/rpc/internal/RPCClient.kt | 43 +--- .../net/corda/client/rpc/AbstractRPCTest.kt | 5 +- .../corda/client/rpc/RPCConcurrencyTests.kt | 5 +- .../corda/client/rpc/RPCPerformanceTests.kt | 19 +- .../main/kotlin/net/corda/behave/node/Node.kt | 6 +- .../kotlin/net/corda/node/internal/Node.kt | 2 +- .../node/services/messaging/RPCServer.kt | 2 +- .../node/services/rpc/ArtemisRpcTests.kt | 2 +- .../corda/testing/node/internal/RPCDriver.kt | 20 +- .../net/corda/tools/shell/InteractiveShell.kt | 14 +- 15 files changed, 221 insertions(+), 145 deletions(-) diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index 13e0722b91..07cb4f3797 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -71,7 +71,7 @@ class NodeMonitorModel { // Only execute using "runLater()" if JavaFX been initialized. // It may not be initialized in the unit test. // Also if we are already in the JavaFX thread - perform direct invocation without postponing it. - if(initialized.value.get() && !Platform.isFxApplicationThread()) { + if (initialized.value.get() && !Platform.isFxApplicationThread()) { Platform.runLater(op) } else { op() @@ -98,7 +98,7 @@ class NodeMonitorModel { // Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity. proxyObservable.addListener { _, _, wrapper -> - if(wrapper != null) { + if (wrapper != null) { val proxy = wrapper.cordaRPCOps // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), @@ -134,7 +134,8 @@ class NodeMonitorModel { } val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate -> if (stateMachineUpdate is StateMachineUpdate.Added) { - ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty() + ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) + ?: Observable.empty() } else { Observable.empty() } @@ -183,29 +184,28 @@ class NodeMonitorModel { logger.info("Connecting to: $nodeHostAndPort") val client = CordaRPCClient( nodeHostAndPort, - object : CordaRPCClientConfiguration { - override val connectionMaxRetryInterval = retryInterval - } + CordaRPCClientConfiguration.DEFAULT.copy( + connectionMaxRetryInterval = retryInterval + ) ) val _connection = client.start(username, password) // Check connection is truly operational before returning it. val nodeInfo = _connection.proxy.nodeInfo() require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty()) _connection - } catch(secEx: ActiveMQException) { + } catch (secEx: ActiveMQException) { // Happens when: // * incorrect credentials provided; // * incorrect endpoint specified; // - no point to retry connecting. throw secEx - } - catch(th: Throwable) { + } catch (th: Throwable) { // Deliberately not logging full stack trace as it will be full of internal stacktraces. logger.info("Exception upon establishing connection: " + th.message) null } - if(connection != null) { + if (connection != null) { logger.info("Connection successfully established with: $nodeHostAndPort") return connection } diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index bd0d380cc7..8c00a925ee 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -51,9 +51,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C @Before fun setUp() { node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration { - override val maxReconnectAttempts = 5 - }) + client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 5 + )) identity = node.info.identityFromX500Name(ALICE_NAME) } diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index fd322a291f..69303e5f09 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -1,7 +1,6 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Trace import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork @@ -106,7 +105,7 @@ class RPCStabilityTests { Try.on { startRpcClient( server.get().broker.hostAndPort!!, - configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1) + configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1) ).get() } } @@ -129,7 +128,7 @@ class RPCStabilityTests { rpcDriver { fun startAndCloseServer(broker: RpcBrokerHandle) { startRpcServerWithBrokerRunning( - configuration = RPCServerConfiguration.default, + configuration = RPCServerConfiguration.DEFAULT, ops = DummyOps, brokerHandle = broker ).rpcServer.close() @@ -150,7 +149,7 @@ class RPCStabilityTests { @Test fun `rpc client close doesnt leak broker resources`() { rpcDriver { - val server = startRpcServer(configuration = RPCServerConfiguration.default, ops = DummyOps).get() + val server = startRpcServer(configuration = RPCServerConfiguration.DEFAULT, ops = DummyOps).get() RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close() val initial = server.broker.getStats() repeat(100) { @@ -241,7 +240,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds) + val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -266,7 +265,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) + val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -298,7 +297,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() - val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) + val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -453,7 +452,7 @@ class RPCStabilityTests { } } val server = startRpcServer( - configuration = RPCServerConfiguration.default.copy( + configuration = RPCServerConfiguration.DEFAULT.copy( reapInterval = 100.millis ), ops = trackSubscriberOpsImpl diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 7e03a0eda0..4892a4e94b 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -1,6 +1,5 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.context.Actor @@ -10,6 +9,9 @@ import net.corda.core.serialization.internal.effectiveSerializationEnv import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport import net.corda.core.messaging.ClientRpcSslOptions +import net.corda.core.utilities.days +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT import java.time.Duration @@ -24,46 +26,159 @@ class CordaRPCConnection internal constructor(connection: RPCConnection + */ + open val maxReconnectAttempts: Int = unlimitedReconnectAttempts, + + /** + * Maximum file size, in bytes. + */ + open val maxFileSize: Int = 10485760, + // 10 MiB maximum allowed file size for attachments, including message headers. + // TODO: acquire this value from Network Map when supported. + + /** + * The cache expiry of a deduplication watermark per client. + */ + open val deduplicationCacheExpiry: Duration = 1.days + +) { companion object { - fun default(): CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default + + private const val unlimitedReconnectAttempts = -1 + + @JvmField + val DEFAULT: CordaRPCClientConfiguration = CordaRPCClientConfiguration() + } + + /** + * Create a new copy of a configuration object with zero or more parameters modified. + */ + @JvmOverloads + fun copy( + minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion, + trackRpcCallSites: Boolean = this.trackRpcCallSites, + reapInterval: Duration = this.reapInterval, + observationExecutorPoolSize: Int = this.observationExecutorPoolSize, + cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel, + connectionRetryInterval: Duration = this.connectionRetryInterval, + connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier, + connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval, + maxReconnectAttempts: Int = this.maxReconnectAttempts, + maxFileSize: Int = this.maxFileSize, + deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry + ): CordaRPCClientConfiguration { + return CordaRPCClientConfiguration( + minimumServerProtocolVersion, + trackRpcCallSites, + reapInterval, + observationExecutorPoolSize, + cacheConcurrencyLevel, + connectionRetryInterval, + connectionRetryIntervalMultiplier, + connectionMaxRetryInterval, + maxReconnectAttempts, + maxFileSize, + deduplicationCacheExpiry + ) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as CordaRPCClientConfiguration + if (minimumServerProtocolVersion != other.minimumServerProtocolVersion) return false + if (trackRpcCallSites != other.trackRpcCallSites) return false + if (reapInterval != other.reapInterval) return false + if (observationExecutorPoolSize != other.observationExecutorPoolSize) return false + if (cacheConcurrencyLevel != other.cacheConcurrencyLevel) return false + if (connectionRetryInterval != other.connectionRetryInterval) return false + if (connectionRetryIntervalMultiplier != other.connectionRetryIntervalMultiplier) return false + if (connectionMaxRetryInterval != other.connectionMaxRetryInterval) return false + if (maxReconnectAttempts != other.maxReconnectAttempts) return false + if (maxFileSize != other.maxFileSize) return false + if (deduplicationCacheExpiry != other.deduplicationCacheExpiry) return false + + return true + } + + override fun hashCode(): Int { + var result = minimumServerProtocolVersion + result = 31 * result + trackRpcCallSites.hashCode() + result = 31 * result + reapInterval.hashCode() + result = 31 * result + observationExecutorPoolSize + result = 31 * result + cacheConcurrencyLevel + result = 31 * result + connectionRetryInterval.hashCode() + result = 31 * result + connectionRetryIntervalMultiplier.hashCode() + result = 31 * result + connectionMaxRetryInterval.hashCode() + result = 31 * result + maxReconnectAttempts + result = 31 * result + maxFileSize + result = 31 * result + deduplicationCacheExpiry.hashCode() + return result + } + + override fun toString(): String { + return "CordaRPCClientConfiguration(" + + "minimumServerProtocolVersion=$minimumServerProtocolVersion, trackRpcCallSites=$trackRpcCallSites, " + + "reapInterval=$reapInterval, observationExecutorPoolSize=$observationExecutorPoolSize, " + + "cacheConcurrencyLevel=$cacheConcurrencyLevel, connectionRetryInterval=$connectionRetryInterval, " + + "connectionRetryIntervalMultiplier=$connectionRetryIntervalMultiplier, " + + "connectionMaxRetryInterval=$connectionMaxRetryInterval, maxReconnectAttempts=$maxReconnectAttempts, " + + "maxFileSize=$maxFileSize, deduplicationCacheExpiry=$deduplicationCacheExpiry)" + } + } /** @@ -105,7 +220,7 @@ interface CordaRPCClientConfiguration { */ class CordaRPCClient private constructor( private val hostAndPort: NetworkHostAndPort, - private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, private val sslConfiguration: ClientRpcSslOptions? = null, private val nodeSslConfiguration: SSLConfiguration? = null, private val classLoader: ClassLoader? = null, @@ -114,7 +229,7 @@ class CordaRPCClient private constructor( ) { @JvmOverloads constructor(hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(hostAndPort, configuration, null) /** @@ -124,13 +239,13 @@ class CordaRPCClient private constructor( * @param configuration An optional configuration used to tweak client behaviour. */ @JvmOverloads - constructor(haAddressPool: List, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool) + constructor(haAddressPool: List, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool) companion object { fun createWithSsl( hostAndPort: NetworkHostAndPort, sslConfiguration: ClientRpcSslOptions, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default() + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaRPCClient { return CordaRPCClient(hostAndPort, configuration, sslConfiguration) } @@ -138,14 +253,14 @@ class CordaRPCClient private constructor( fun createWithSsl( haAddressPool: List, sslConfiguration: ClientRpcSslOptions, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default() + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaRPCClient { return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, haAddressPool = haAddressPool) } internal fun createWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null ): CordaRPCClient { @@ -154,7 +269,7 @@ class CordaRPCClient private constructor( internal fun createWithInternalSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: SSLConfiguration?, classLoader: ClassLoader? = null ): CordaRPCClient { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt index 927bf21411..7781d0f135 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt @@ -12,14 +12,14 @@ import rx.Observable /** Utility which exposes the internal Corda RPC constructor to other internal Corda components */ fun createCordaRPCClientWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null ) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) fun createCordaRPCClientWithInternalSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: SSLConfiguration? = null, classLoader: ClassLoader? = null ) = CordaRPCClient.createWithInternalSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 6f7786d4c2..8f9607554f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -23,69 +23,34 @@ import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.client.ActiveMQClient import java.lang.reflect.Proxy -import java.time.Duration - -/** - * This configuration may be used to tweak the internals of the RPC client. - */ -data class CordaRPCClientConfigurationImpl( - override val minimumServerProtocolVersion: Int, - override val trackRpcCallSites: Boolean, - override val reapInterval: Duration, - override val observationExecutorPoolSize: Int, - override val connectionRetryInterval: Duration, - override val connectionRetryIntervalMultiplier: Double, - override val connectionMaxRetryInterval: Duration, - override val maxReconnectAttempts: Int, - override val maxFileSize: Int, - override val deduplicationCacheExpiry: Duration -) : CordaRPCClientConfiguration { - companion object { - private const val unlimitedReconnectAttempts = -1 - @JvmStatic - val default = CordaRPCClientConfigurationImpl( - minimumServerProtocolVersion = 0, - trackRpcCallSites = false, - reapInterval = 1.seconds, - observationExecutorPoolSize = 4, - connectionRetryInterval = 5.seconds, - connectionRetryIntervalMultiplier = 1.5, - connectionMaxRetryInterval = 3.minutes, - maxReconnectAttempts = unlimitedReconnectAttempts, - /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ - maxFileSize = 10485760, - deduplicationCacheExpiry = 1.days - ) - } -} /** * This runs on the client JVM */ class RPCClient( val transport: TransportConfiguration, - val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT, val haPoolTransportConfigurations: List = emptyList() ) { constructor( hostAndPort: NetworkHostAndPort, sslConfiguration: ClientRpcSslOptions? = null, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(rpcConnectorTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext) constructor( hostAndPort: NetworkHostAndPort, sslConfiguration: SSLConfiguration, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(rpcInternalClientTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext) constructor( haAddressPool: List, sslConfiguration: ClientRpcSslOptions? = null, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration), configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration)) diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index aa542ec49f..014f852262 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -1,6 +1,5 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.map import net.corda.core.messaging.RPCOps @@ -44,8 +43,8 @@ open class AbstractRPCTest { inline fun RPCDriverDSL.testProxy( ops: I, rpcUser: User = rpcTestUser, - clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default, - serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default + clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, + serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT ): TestProxy { return when (mode) { RPCTestMode.InVm -> diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt index cc130604be..0b15cc0a5e 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt @@ -1,6 +1,5 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose @@ -90,10 +89,10 @@ class RPCConcurrencyTests : AbstractRPCTest() { private fun RPCDriverDSL.testProxy(): TestProxy { return testProxy( TestOpsImpl(pool), - clientConfiguration = CordaRPCClientConfigurationImpl.default.copy( + clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy( reapInterval = 100.millis ), - serverConfiguration = RPCServerConfiguration.default.copy( + serverConfiguration = RPCServerConfiguration.DEFAULT.copy( rpcThreadPoolSize = 4 ) ) diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt index 7175ea987f..f6e9a8aa83 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt @@ -1,7 +1,6 @@ package net.corda.client.rpc import com.google.common.base.Stopwatch -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.messaging.RPCOps import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds @@ -42,7 +41,7 @@ class RPCPerformanceTests : AbstractRPCTest() { } private fun RPCDriverDSL.testProxy( - clientConfiguration: CordaRPCClientConfigurationImpl, + clientConfiguration: CordaRPCClientConfiguration, serverConfiguration: RPCServerConfiguration ): TestProxy { return testProxy( @@ -55,8 +54,8 @@ class RPCPerformanceTests : AbstractRPCTest() { private fun warmup() { rpcDriver { val proxy = testProxy( - CordaRPCClientConfigurationImpl.default, - RPCServerConfiguration.default + CordaRPCClientConfiguration.DEFAULT, + RPCServerConfiguration.DEFAULT ) val executor = Executors.newFixedThreadPool(4) val N = 10000 @@ -85,10 +84,10 @@ class RPCPerformanceTests : AbstractRPCTest() { measure(inputOutputSizes, (1..5)) { inputOutputSize, _ -> rpcDriver { val proxy = testProxy( - CordaRPCClientConfigurationImpl.default.copy( + CordaRPCClientConfiguration.DEFAULT.copy( observationExecutorPoolSize = 2 ), - RPCServerConfiguration.default.copy( + RPCServerConfiguration.DEFAULT.copy( rpcThreadPoolSize = 8 ) ) @@ -124,10 +123,10 @@ class RPCPerformanceTests : AbstractRPCTest() { rpcDriver { val metricRegistry = startReporter(shutdownManager) val proxy = testProxy( - CordaRPCClientConfigurationImpl.default.copy( + CordaRPCClientConfiguration.DEFAULT.copy( reapInterval = 1.seconds ), - RPCServerConfiguration.default.copy( + RPCServerConfiguration.DEFAULT.copy( rpcThreadPoolSize = 8 ) ) @@ -156,8 +155,8 @@ class RPCPerformanceTests : AbstractRPCTest() { // TODO this hangs with more parallelism rpcDriver { val proxy = testProxy( - CordaRPCClientConfigurationImpl.default, - RPCServerConfiguration.default + CordaRPCClientConfiguration.DEFAULT, + RPCServerConfiguration.DEFAULT ) val numberOfMessages = 1000 val bigSize = 10_000_000 diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt index fc2e3ee552..b6b9672b37 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt @@ -160,9 +160,9 @@ class Node( val user = config.users.first() val address = config.nodeInterface val targetHost = NetworkHostAndPort(address.host, address.rpcPort) - val config = object : CordaRPCClientConfiguration { - override val connectionMaxRetryInterval = 10.seconds - } + val config = CordaRPCClientConfiguration.DEFAULT.copy( + connectionMaxRetryInterval = 10.seconds + ) log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...") CordaRPCClient(targetHost, config).use(user.username, user.password) { log.info("RPC connection to ${targetHost.host}:${targetHost.port} established") diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index e40eb5d243..7d45d4dd20 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -206,7 +206,7 @@ open class Node(configuration: NodeConfiguration, bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize) printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString()) - val rpcServerConfiguration = RPCServerConfiguration.default + val rpcServerConfiguration = RPCServerConfiguration.DEFAULT rpcServerAddresses?.let { internalRpcMessagingClient = InternalRPCMessagingClient(configuration, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal), rpcServerConfiguration) printBasicNodeInfo("RPC connection address", it.primary.toString()) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index b56cd8e5f9..a61c6619f5 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -71,7 +71,7 @@ data class RPCServerConfiguration( val deduplicationCacheExpiry: Duration ) { companion object { - val default = RPCServerConfiguration( + val DEFAULT = RPCServerConfiguration( rpcThreadPoolSize = 4, reapInterval = 1.seconds, deduplicationCacheExpiry = 1.days diff --git a/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt b/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt index 2937608c64..425d2108eb 100644 --- a/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt @@ -105,7 +105,7 @@ class ArtemisRpcTests { } artemisBroker.use { broker -> broker.start() - InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.default).use { server -> + InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.DEFAULT).use { server -> server.start(TestRpcOpsImpl(), securityManager, broker.serverControl) val client = RPCClient(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions)) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 4529edbe31..cc32932736 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -1,7 +1,7 @@ package net.corda.testing.node.internal import net.corda.client.mock.Generator -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl +import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture @@ -57,7 +57,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser inline fun RPCDriverDSL.startInVmRpcClient( username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ) = startInVmRpcClient(I::class.java, username, password, configuration) inline fun RPCDriverDSL.startRandomRpcClient( @@ -70,14 +70,14 @@ inline fun RPCDriverDSL.startRpcClient( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration) inline fun RPCDriverDSL.startRpcClient( haAddressPool: List, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ) = startRpcClient(I::class.java, haAddressPool, username, password, configuration) data class RpcBrokerHandle( @@ -239,7 +239,7 @@ data class RPCDriverDSL( nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, + configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I ): CordaFuture { return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> @@ -259,7 +259,7 @@ data class RPCDriverDSL( rpcOpsClass: Class, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(inVmClientTransportConfiguration, configuration) @@ -307,7 +307,7 @@ data class RPCDriverDSL( nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, + configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, customPort: NetworkHostAndPort? = null, ops: I ): CordaFuture { @@ -330,7 +330,7 @@ data class RPCDriverDSL( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), configuration) @@ -356,7 +356,7 @@ data class RPCDriverDSL( haAddressPool: List, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(haAddressPool, null, configuration) @@ -462,7 +462,7 @@ data class RPCDriverDSL( fun startRpcServerWithBrokerRunning( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, + configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, brokerHandle: RpcBrokerHandle ): RpcServerHandle { diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index 87b4ad4b8d..1afe13d0da 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -91,9 +91,9 @@ object InteractiveShell { fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { rpcOps = { username: String, credentials: String -> val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort, - configuration = object : CordaRPCClientConfiguration { - override val maxReconnectAttempts = 1 - }, + configuration = CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 1 + ), sslConfiguration = configuration.ssl, classLoader = classLoader) this.connection = client.start(username, credentials) @@ -109,9 +109,9 @@ object InteractiveShell { fun startShellInternal(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { rpcOps = { username: String, credentials: String -> val client = createCordaRPCClientWithInternalSslAndClassLoader(hostAndPort = configuration.hostAndPort, - configuration = object : CordaRPCClientConfiguration { - override val maxReconnectAttempts = 1 - }, + configuration = CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 1 + ), sslConfiguration = configuration.nodeSslConfig, classLoader = classLoader) this.connection = client.start(username, credentials) @@ -302,7 +302,7 @@ object InteractiveShell { } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) } catch (e: ExecutionException) { - // ignoring it as already logged by the progress handler subscriber + // ignoring it as already logged by the progress handler subscriber } finally { InputStreamDeserializer.closeAll() }