diff --git a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt index 13e0722b91..07cb4f3797 100644 --- a/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt +++ b/client/jfx/src/main/kotlin/net/corda/client/jfx/model/NodeMonitorModel.kt @@ -71,7 +71,7 @@ class NodeMonitorModel { // Only execute using "runLater()" if JavaFX been initialized. // It may not be initialized in the unit test. // Also if we are already in the JavaFX thread - perform direct invocation without postponing it. - if(initialized.value.get() && !Platform.isFxApplicationThread()) { + if (initialized.value.get() && !Platform.isFxApplicationThread()) { Platform.runLater(op) } else { op() @@ -98,7 +98,7 @@ class NodeMonitorModel { // Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity. proxyObservable.addListener { _, _, wrapper -> - if(wrapper != null) { + if (wrapper != null) { val proxy = wrapper.cordaRPCOps // Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL), @@ -134,7 +134,8 @@ class NodeMonitorModel { } val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate -> if (stateMachineUpdate is StateMachineUpdate.Added) { - ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty() + ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) + ?: Observable.empty() } else { Observable.empty() } @@ -183,29 +184,28 @@ class NodeMonitorModel { logger.info("Connecting to: $nodeHostAndPort") val client = CordaRPCClient( nodeHostAndPort, - object : CordaRPCClientConfiguration { - override val connectionMaxRetryInterval = retryInterval - } + CordaRPCClientConfiguration.DEFAULT.copy( + connectionMaxRetryInterval = retryInterval + ) ) val _connection = client.start(username, password) // Check connection is truly operational before returning it. val nodeInfo = _connection.proxy.nodeInfo() require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty()) _connection - } catch(secEx: ActiveMQException) { + } catch (secEx: ActiveMQException) { // Happens when: // * incorrect credentials provided; // * incorrect endpoint specified; // - no point to retry connecting. throw secEx - } - catch(th: Throwable) { + } catch (th: Throwable) { // Deliberately not logging full stack trace as it will be full of internal stacktraces. logger.info("Exception upon establishing connection: " + th.message) null } - if(connection != null) { + if (connection != null) { logger.info("Connection successfully established with: $nodeHostAndPort") return connection } diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index bd0d380cc7..8c00a925ee 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -51,9 +51,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C @Before fun setUp() { node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser)) - client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration { - override val maxReconnectAttempts = 5 - }) + client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 5 + )) identity = node.info.identityFromX500Name(ALICE_NAME) } diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index fd322a291f..69303e5f09 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -1,7 +1,6 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.RPCClient -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.context.Trace import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork @@ -106,7 +105,7 @@ class RPCStabilityTests { Try.on { startRpcClient( server.get().broker.hostAndPort!!, - configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1) + configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1) ).get() } } @@ -129,7 +128,7 @@ class RPCStabilityTests { rpcDriver { fun startAndCloseServer(broker: RpcBrokerHandle) { startRpcServerWithBrokerRunning( - configuration = RPCServerConfiguration.default, + configuration = RPCServerConfiguration.DEFAULT, ops = DummyOps, brokerHandle = broker ).rpcServer.close() @@ -150,7 +149,7 @@ class RPCStabilityTests { @Test fun `rpc client close doesnt leak broker resources`() { rpcDriver { - val server = startRpcServer(configuration = RPCServerConfiguration.default, ops = DummyOps).get() + val server = startRpcServer(configuration = RPCServerConfiguration.DEFAULT, ops = DummyOps).get() RPCClient(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close() val initial = server.broker.getStats() repeat(100) { @@ -241,7 +240,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds) + val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -266,7 +265,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() // Set retry interval to 1s to reduce test duration - val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) + val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -298,7 +297,7 @@ class RPCStabilityTests { val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() - val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) + val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1) val clientFollower = shutdownManager.follower() val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() clientFollower.unfollow() @@ -453,7 +452,7 @@ class RPCStabilityTests { } } val server = startRpcServer( - configuration = RPCServerConfiguration.default.copy( + configuration = RPCServerConfiguration.DEFAULT.copy( reapInterval = 100.millis ), ops = trackSubscriberOpsImpl diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt index 7e03a0eda0..4892a4e94b 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/CordaRPCClient.kt @@ -1,6 +1,5 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.context.Actor @@ -10,6 +9,9 @@ import net.corda.core.serialization.internal.effectiveSerializationEnv import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport import net.corda.core.messaging.ClientRpcSslOptions +import net.corda.core.utilities.days +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT import java.time.Duration @@ -24,46 +26,159 @@ class CordaRPCConnection internal constructor(connection: RPCConnection + */ + open val maxReconnectAttempts: Int = unlimitedReconnectAttempts, + + /** + * Maximum file size, in bytes. + */ + open val maxFileSize: Int = 10485760, + // 10 MiB maximum allowed file size for attachments, including message headers. + // TODO: acquire this value from Network Map when supported. + + /** + * The cache expiry of a deduplication watermark per client. + */ + open val deduplicationCacheExpiry: Duration = 1.days + +) { companion object { - fun default(): CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default + + private const val unlimitedReconnectAttempts = -1 + + @JvmField + val DEFAULT: CordaRPCClientConfiguration = CordaRPCClientConfiguration() + } + + /** + * Create a new copy of a configuration object with zero or more parameters modified. + */ + @JvmOverloads + fun copy( + minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion, + trackRpcCallSites: Boolean = this.trackRpcCallSites, + reapInterval: Duration = this.reapInterval, + observationExecutorPoolSize: Int = this.observationExecutorPoolSize, + cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel, + connectionRetryInterval: Duration = this.connectionRetryInterval, + connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier, + connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval, + maxReconnectAttempts: Int = this.maxReconnectAttempts, + maxFileSize: Int = this.maxFileSize, + deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry + ): CordaRPCClientConfiguration { + return CordaRPCClientConfiguration( + minimumServerProtocolVersion, + trackRpcCallSites, + reapInterval, + observationExecutorPoolSize, + cacheConcurrencyLevel, + connectionRetryInterval, + connectionRetryIntervalMultiplier, + connectionMaxRetryInterval, + maxReconnectAttempts, + maxFileSize, + deduplicationCacheExpiry + ) + } + + override fun equals(other: Any?): Boolean { + if (this === other) return true + if (javaClass != other?.javaClass) return false + + other as CordaRPCClientConfiguration + if (minimumServerProtocolVersion != other.minimumServerProtocolVersion) return false + if (trackRpcCallSites != other.trackRpcCallSites) return false + if (reapInterval != other.reapInterval) return false + if (observationExecutorPoolSize != other.observationExecutorPoolSize) return false + if (cacheConcurrencyLevel != other.cacheConcurrencyLevel) return false + if (connectionRetryInterval != other.connectionRetryInterval) return false + if (connectionRetryIntervalMultiplier != other.connectionRetryIntervalMultiplier) return false + if (connectionMaxRetryInterval != other.connectionMaxRetryInterval) return false + if (maxReconnectAttempts != other.maxReconnectAttempts) return false + if (maxFileSize != other.maxFileSize) return false + if (deduplicationCacheExpiry != other.deduplicationCacheExpiry) return false + + return true + } + + override fun hashCode(): Int { + var result = minimumServerProtocolVersion + result = 31 * result + trackRpcCallSites.hashCode() + result = 31 * result + reapInterval.hashCode() + result = 31 * result + observationExecutorPoolSize + result = 31 * result + cacheConcurrencyLevel + result = 31 * result + connectionRetryInterval.hashCode() + result = 31 * result + connectionRetryIntervalMultiplier.hashCode() + result = 31 * result + connectionMaxRetryInterval.hashCode() + result = 31 * result + maxReconnectAttempts + result = 31 * result + maxFileSize + result = 31 * result + deduplicationCacheExpiry.hashCode() + return result + } + + override fun toString(): String { + return "CordaRPCClientConfiguration(" + + "minimumServerProtocolVersion=$minimumServerProtocolVersion, trackRpcCallSites=$trackRpcCallSites, " + + "reapInterval=$reapInterval, observationExecutorPoolSize=$observationExecutorPoolSize, " + + "cacheConcurrencyLevel=$cacheConcurrencyLevel, connectionRetryInterval=$connectionRetryInterval, " + + "connectionRetryIntervalMultiplier=$connectionRetryIntervalMultiplier, " + + "connectionMaxRetryInterval=$connectionMaxRetryInterval, maxReconnectAttempts=$maxReconnectAttempts, " + + "maxFileSize=$maxFileSize, deduplicationCacheExpiry=$deduplicationCacheExpiry)" + } + } /** @@ -105,7 +220,7 @@ interface CordaRPCClientConfiguration { */ class CordaRPCClient private constructor( private val hostAndPort: NetworkHostAndPort, - private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, private val sslConfiguration: ClientRpcSslOptions? = null, private val nodeSslConfiguration: SSLConfiguration? = null, private val classLoader: ClassLoader? = null, @@ -114,7 +229,7 @@ class CordaRPCClient private constructor( ) { @JvmOverloads constructor(hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(hostAndPort, configuration, null) /** @@ -124,13 +239,13 @@ class CordaRPCClient private constructor( * @param configuration An optional configuration used to tweak client behaviour. */ @JvmOverloads - constructor(haAddressPool: List, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool) + constructor(haAddressPool: List, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool) companion object { fun createWithSsl( hostAndPort: NetworkHostAndPort, sslConfiguration: ClientRpcSslOptions, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default() + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaRPCClient { return CordaRPCClient(hostAndPort, configuration, sslConfiguration) } @@ -138,14 +253,14 @@ class CordaRPCClient private constructor( fun createWithSsl( haAddressPool: List, sslConfiguration: ClientRpcSslOptions, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default() + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaRPCClient { return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, haAddressPool = haAddressPool) } internal fun createWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null ): CordaRPCClient { @@ -154,7 +269,7 @@ class CordaRPCClient private constructor( internal fun createWithInternalSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: SSLConfiguration?, classLoader: ClassLoader? = null ): CordaRPCClient { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt index 927bf21411..7781d0f135 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/CordaRPCClientUtils.kt @@ -12,14 +12,14 @@ import rx.Observable /** Utility which exposes the internal Corda RPC constructor to other internal Corda components */ fun createCordaRPCClientWithSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: ClientRpcSslOptions? = null, classLoader: ClassLoader? = null ) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) fun createCordaRPCClientWithInternalSslAndClassLoader( hostAndPort: NetworkHostAndPort, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(), + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, sslConfiguration: SSLConfiguration? = null, classLoader: ClassLoader? = null ) = CordaRPCClient.createWithInternalSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 6f7786d4c2..8f9607554f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -23,69 +23,34 @@ import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.client.ActiveMQClient import java.lang.reflect.Proxy -import java.time.Duration - -/** - * This configuration may be used to tweak the internals of the RPC client. - */ -data class CordaRPCClientConfigurationImpl( - override val minimumServerProtocolVersion: Int, - override val trackRpcCallSites: Boolean, - override val reapInterval: Duration, - override val observationExecutorPoolSize: Int, - override val connectionRetryInterval: Duration, - override val connectionRetryIntervalMultiplier: Double, - override val connectionMaxRetryInterval: Duration, - override val maxReconnectAttempts: Int, - override val maxFileSize: Int, - override val deduplicationCacheExpiry: Duration -) : CordaRPCClientConfiguration { - companion object { - private const val unlimitedReconnectAttempts = -1 - @JvmStatic - val default = CordaRPCClientConfigurationImpl( - minimumServerProtocolVersion = 0, - trackRpcCallSites = false, - reapInterval = 1.seconds, - observationExecutorPoolSize = 4, - connectionRetryInterval = 5.seconds, - connectionRetryIntervalMultiplier = 1.5, - connectionMaxRetryInterval = 3.minutes, - maxReconnectAttempts = unlimitedReconnectAttempts, - /** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */ - maxFileSize = 10485760, - deduplicationCacheExpiry = 1.days - ) - } -} /** * This runs on the client JVM */ class RPCClient( val transport: TransportConfiguration, - val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT, val haPoolTransportConfigurations: List = emptyList() ) { constructor( hostAndPort: NetworkHostAndPort, sslConfiguration: ClientRpcSslOptions? = null, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(rpcConnectorTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext) constructor( hostAndPort: NetworkHostAndPort, sslConfiguration: SSLConfiguration, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(rpcInternalClientTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext) constructor( haAddressPool: List, sslConfiguration: ClientRpcSslOptions? = null, - configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default, + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT ) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration), configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration)) diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index aa542ec49f..014f852262 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -1,6 +1,5 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.map import net.corda.core.messaging.RPCOps @@ -44,8 +43,8 @@ open class AbstractRPCTest { inline fun RPCDriverDSL.testProxy( ops: I, rpcUser: User = rpcTestUser, - clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default, - serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default + clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT, + serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT ): TestProxy { return when (mode) { RPCTestMode.InVm -> diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt index cc130604be..0b15cc0a5e 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt @@ -1,6 +1,5 @@ package net.corda.client.rpc -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose @@ -90,10 +89,10 @@ class RPCConcurrencyTests : AbstractRPCTest() { private fun RPCDriverDSL.testProxy(): TestProxy { return testProxy( TestOpsImpl(pool), - clientConfiguration = CordaRPCClientConfigurationImpl.default.copy( + clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy( reapInterval = 100.millis ), - serverConfiguration = RPCServerConfiguration.default.copy( + serverConfiguration = RPCServerConfiguration.DEFAULT.copy( rpcThreadPoolSize = 4 ) ) diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt index 7175ea987f..f6e9a8aa83 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt @@ -1,7 +1,6 @@ package net.corda.client.rpc import com.google.common.base.Stopwatch -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl import net.corda.core.messaging.RPCOps import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds @@ -42,7 +41,7 @@ class RPCPerformanceTests : AbstractRPCTest() { } private fun RPCDriverDSL.testProxy( - clientConfiguration: CordaRPCClientConfigurationImpl, + clientConfiguration: CordaRPCClientConfiguration, serverConfiguration: RPCServerConfiguration ): TestProxy { return testProxy( @@ -55,8 +54,8 @@ class RPCPerformanceTests : AbstractRPCTest() { private fun warmup() { rpcDriver { val proxy = testProxy( - CordaRPCClientConfigurationImpl.default, - RPCServerConfiguration.default + CordaRPCClientConfiguration.DEFAULT, + RPCServerConfiguration.DEFAULT ) val executor = Executors.newFixedThreadPool(4) val N = 10000 @@ -85,10 +84,10 @@ class RPCPerformanceTests : AbstractRPCTest() { measure(inputOutputSizes, (1..5)) { inputOutputSize, _ -> rpcDriver { val proxy = testProxy( - CordaRPCClientConfigurationImpl.default.copy( + CordaRPCClientConfiguration.DEFAULT.copy( observationExecutorPoolSize = 2 ), - RPCServerConfiguration.default.copy( + RPCServerConfiguration.DEFAULT.copy( rpcThreadPoolSize = 8 ) ) @@ -124,10 +123,10 @@ class RPCPerformanceTests : AbstractRPCTest() { rpcDriver { val metricRegistry = startReporter(shutdownManager) val proxy = testProxy( - CordaRPCClientConfigurationImpl.default.copy( + CordaRPCClientConfiguration.DEFAULT.copy( reapInterval = 1.seconds ), - RPCServerConfiguration.default.copy( + RPCServerConfiguration.DEFAULT.copy( rpcThreadPoolSize = 8 ) ) @@ -156,8 +155,8 @@ class RPCPerformanceTests : AbstractRPCTest() { // TODO this hangs with more parallelism rpcDriver { val proxy = testProxy( - CordaRPCClientConfigurationImpl.default, - RPCServerConfiguration.default + CordaRPCClientConfiguration.DEFAULT, + RPCServerConfiguration.DEFAULT ) val numberOfMessages = 1000 val bigSize = 10_000_000 diff --git a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt index fc2e3ee552..b6b9672b37 100644 --- a/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt +++ b/experimental/behave/src/main/kotlin/net/corda/behave/node/Node.kt @@ -160,9 +160,9 @@ class Node( val user = config.users.first() val address = config.nodeInterface val targetHost = NetworkHostAndPort(address.host, address.rpcPort) - val config = object : CordaRPCClientConfiguration { - override val connectionMaxRetryInterval = 10.seconds - } + val config = CordaRPCClientConfiguration.DEFAULT.copy( + connectionMaxRetryInterval = 10.seconds + ) log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...") CordaRPCClient(targetHost, config).use(user.username, user.password) { log.info("RPC connection to ${targetHost.host}:${targetHost.port} established") diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index e40eb5d243..7d45d4dd20 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -206,7 +206,7 @@ open class Node(configuration: NodeConfiguration, bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize) printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString()) - val rpcServerConfiguration = RPCServerConfiguration.default + val rpcServerConfiguration = RPCServerConfiguration.DEFAULT rpcServerAddresses?.let { internalRpcMessagingClient = InternalRPCMessagingClient(configuration, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal), rpcServerConfiguration) printBasicNodeInfo("RPC connection address", it.primary.toString()) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index b56cd8e5f9..a61c6619f5 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -71,7 +71,7 @@ data class RPCServerConfiguration( val deduplicationCacheExpiry: Duration ) { companion object { - val default = RPCServerConfiguration( + val DEFAULT = RPCServerConfiguration( rpcThreadPoolSize = 4, reapInterval = 1.seconds, deduplicationCacheExpiry = 1.days diff --git a/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt b/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt index 2937608c64..425d2108eb 100644 --- a/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt @@ -105,7 +105,7 @@ class ArtemisRpcTests { } artemisBroker.use { broker -> broker.start() - InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.default).use { server -> + InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.DEFAULT).use { server -> server.start(TestRpcOpsImpl(), securityManager, broker.serverControl) val client = RPCClient(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions)) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 4529edbe31..cc32932736 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -1,7 +1,7 @@ package net.corda.testing.node.internal import net.corda.client.mock.Generator -import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl +import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.internal.RPCClient import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme import net.corda.core.concurrent.CordaFuture @@ -57,7 +57,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser inline fun RPCDriverDSL.startInVmRpcClient( username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ) = startInVmRpcClient(I::class.java, username, password, configuration) inline fun RPCDriverDSL.startRandomRpcClient( @@ -70,14 +70,14 @@ inline fun RPCDriverDSL.startRpcClient( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration) inline fun RPCDriverDSL.startRpcClient( haAddressPool: List, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ) = startRpcClient(I::class.java, haAddressPool, username, password, configuration) data class RpcBrokerHandle( @@ -239,7 +239,7 @@ data class RPCDriverDSL( nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, + configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I ): CordaFuture { return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> @@ -259,7 +259,7 @@ data class RPCDriverDSL( rpcOpsClass: Class, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(inVmClientTransportConfiguration, configuration) @@ -307,7 +307,7 @@ data class RPCDriverDSL( nodeLegalName: CordaX500Name = fakeNodeLegalName, maxFileSize: Int = MAX_MESSAGE_SIZE, maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, + configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, customPort: NetworkHostAndPort? = null, ops: I ): CordaFuture { @@ -330,7 +330,7 @@ data class RPCDriverDSL( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), configuration) @@ -356,7 +356,7 @@ data class RPCDriverDSL( haAddressPool: List, username: String = rpcTestUser.username, password: String = rpcTestUser.password, - configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default + configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(haAddressPool, null, configuration) @@ -462,7 +462,7 @@ data class RPCDriverDSL( fun startRpcServerWithBrokerRunning( rpcUser: User = rpcTestUser, nodeLegalName: CordaX500Name = fakeNodeLegalName, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, + configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT, ops: I, brokerHandle: RpcBrokerHandle ): RpcServerHandle { diff --git a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt index 87b4ad4b8d..1afe13d0da 100644 --- a/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt +++ b/tools/shell/src/main/kotlin/net/corda/tools/shell/InteractiveShell.kt @@ -91,9 +91,9 @@ object InteractiveShell { fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { rpcOps = { username: String, credentials: String -> val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort, - configuration = object : CordaRPCClientConfiguration { - override val maxReconnectAttempts = 1 - }, + configuration = CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 1 + ), sslConfiguration = configuration.ssl, classLoader = classLoader) this.connection = client.start(username, credentials) @@ -109,9 +109,9 @@ object InteractiveShell { fun startShellInternal(configuration: ShellConfiguration, classLoader: ClassLoader? = null) { rpcOps = { username: String, credentials: String -> val client = createCordaRPCClientWithInternalSslAndClassLoader(hostAndPort = configuration.hostAndPort, - configuration = object : CordaRPCClientConfiguration { - override val maxReconnectAttempts = 1 - }, + configuration = CordaRPCClientConfiguration.DEFAULT.copy( + maxReconnectAttempts = 1 + ), sslConfiguration = configuration.nodeSslConfig, classLoader = classLoader) this.connection = client.start(username, credentials) @@ -302,7 +302,7 @@ object InteractiveShell { } catch (e: PermissionException) { output.println(e.message ?: "Access denied", Color.red) } catch (e: ExecutionException) { - // ignoring it as already logged by the progress handler subscriber + // ignoring it as already logged by the progress handler subscriber } finally { InputStreamDeserializer.closeAll() }