mirror of
https://github.com/corda/corda.git
synced 2025-06-16 22:28:15 +00:00
CORDA-1609 - Don't use reserved keyword as method name
As reported in [CORDA-1609](https://r3-cev.atlassian.net/browse/CORDA-1609),
`CordaRPCClientConfiguration.default` is not accessible from Java since
`default` is a reserved keyword.
As part of the refactor made in #2831, `CordaRPCClientConfiguration` went
from being a data class to an interface with a backing implementation of
type `CordaRPCClientConfigurationImpl`.
This resulted in Java users having to rewrite code that was on the form:
```java
final CordaRPCClient client = new CordaRPCClient(
nodeAddress, CordaRPCClientConfiguration.DEFAULT
);
```
to something like this:
```java
final CordaRPCClient client = new CordaRPCClient(
nodeAddress, CordaRPCClientConfiguration.Companion.default()
);
```
However, this does not work. The user would get a compilation error because
`default` is a reserved keyword in Java.
Since `CordaRPCClientConfiguration` has been made an interface, there is no
easy way of introducing a static final field on the interface from Kotlin.
Consequently, I've changed this back to using a `class` with a static field
named `DEFAULT` instead of the static method `default()`.
It should be noted that `default()` / `DEFAULT` is currently only used
internally to pass in default values in `CordaRPCClient.kt` and
`CordaRPCClientUtils.kt`. That said, it is exposed as part of our API
surface and consequently shouldn't be broken.
The latter means that in the above example, the user would actually not
have to provide the parameter at all:
```java
final CordaRPCClient client = new CordaRPCClient(nodeAddress);
```
As can be seen from the definition of `CordaRPCClient`:
```kotlin
class CordaRPCClient private constructor(...) {
@JvmOverloads
constructor(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) : this(hostAndPort, configuration, null)
```
The mentioned [refactor](7a077e76f0 (diff-0948c125db93a22263eb81eaf3161c17R65)
)
did not make it into the 3.1 release, so from an API-stability perspective,
this change can be applied without affecting our commitment to a
backwards compatible API..
This commit is contained in:
@ -71,7 +71,7 @@ class NodeMonitorModel {
|
|||||||
// Only execute using "runLater()" if JavaFX been initialized.
|
// Only execute using "runLater()" if JavaFX been initialized.
|
||||||
// It may not be initialized in the unit test.
|
// It may not be initialized in the unit test.
|
||||||
// Also if we are already in the JavaFX thread - perform direct invocation without postponing it.
|
// Also if we are already in the JavaFX thread - perform direct invocation without postponing it.
|
||||||
if(initialized.value.get() && !Platform.isFxApplicationThread()) {
|
if (initialized.value.get() && !Platform.isFxApplicationThread()) {
|
||||||
Platform.runLater(op)
|
Platform.runLater(op)
|
||||||
} else {
|
} else {
|
||||||
op()
|
op()
|
||||||
@ -98,7 +98,7 @@ class NodeMonitorModel {
|
|||||||
|
|
||||||
// Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity.
|
// Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity.
|
||||||
proxyObservable.addListener { _, _, wrapper ->
|
proxyObservable.addListener { _, _, wrapper ->
|
||||||
if(wrapper != null) {
|
if (wrapper != null) {
|
||||||
val proxy = wrapper.cordaRPCOps
|
val proxy = wrapper.cordaRPCOps
|
||||||
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
|
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
|
||||||
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
|
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
|
||||||
@ -134,7 +134,8 @@ class NodeMonitorModel {
|
|||||||
}
|
}
|
||||||
val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate ->
|
val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate ->
|
||||||
if (stateMachineUpdate is StateMachineUpdate.Added) {
|
if (stateMachineUpdate is StateMachineUpdate.Added) {
|
||||||
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty<ProgressTrackingEvent>()
|
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo)
|
||||||
|
?: Observable.empty<ProgressTrackingEvent>()
|
||||||
} else {
|
} else {
|
||||||
Observable.empty<ProgressTrackingEvent>()
|
Observable.empty<ProgressTrackingEvent>()
|
||||||
}
|
}
|
||||||
@ -183,29 +184,28 @@ class NodeMonitorModel {
|
|||||||
logger.info("Connecting to: $nodeHostAndPort")
|
logger.info("Connecting to: $nodeHostAndPort")
|
||||||
val client = CordaRPCClient(
|
val client = CordaRPCClient(
|
||||||
nodeHostAndPort,
|
nodeHostAndPort,
|
||||||
object : CordaRPCClientConfiguration {
|
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
override val connectionMaxRetryInterval = retryInterval
|
connectionMaxRetryInterval = retryInterval
|
||||||
}
|
)
|
||||||
)
|
)
|
||||||
val _connection = client.start(username, password)
|
val _connection = client.start(username, password)
|
||||||
// Check connection is truly operational before returning it.
|
// Check connection is truly operational before returning it.
|
||||||
val nodeInfo = _connection.proxy.nodeInfo()
|
val nodeInfo = _connection.proxy.nodeInfo()
|
||||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||||
_connection
|
_connection
|
||||||
} catch(secEx: ActiveMQException) {
|
} catch (secEx: ActiveMQException) {
|
||||||
// Happens when:
|
// Happens when:
|
||||||
// * incorrect credentials provided;
|
// * incorrect credentials provided;
|
||||||
// * incorrect endpoint specified;
|
// * incorrect endpoint specified;
|
||||||
// - no point to retry connecting.
|
// - no point to retry connecting.
|
||||||
throw secEx
|
throw secEx
|
||||||
}
|
} catch (th: Throwable) {
|
||||||
catch(th: Throwable) {
|
|
||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||||
logger.info("Exception upon establishing connection: " + th.message)
|
logger.info("Exception upon establishing connection: " + th.message)
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
|
|
||||||
if(connection != null) {
|
if (connection != null) {
|
||||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||||
return connection
|
return connection
|
||||||
}
|
}
|
||||||
|
@ -51,9 +51,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
|
|||||||
@Before
|
@Before
|
||||||
fun setUp() {
|
fun setUp() {
|
||||||
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
|
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
|
||||||
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration {
|
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
override val maxReconnectAttempts = 5
|
maxReconnectAttempts = 5
|
||||||
})
|
))
|
||||||
identity = node.info.identityFromX500Name(ALICE_NAME)
|
identity = node.info.identityFromX500Name(ALICE_NAME)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
|
||||||
import net.corda.core.context.Trace
|
import net.corda.core.context.Trace
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
import net.corda.core.internal.concurrent.fork
|
import net.corda.core.internal.concurrent.fork
|
||||||
@ -106,7 +105,7 @@ class RPCStabilityTests {
|
|||||||
Try.on {
|
Try.on {
|
||||||
startRpcClient<RPCOps>(
|
startRpcClient<RPCOps>(
|
||||||
server.get().broker.hostAndPort!!,
|
server.get().broker.hostAndPort!!,
|
||||||
configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1)
|
configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1)
|
||||||
).get()
|
).get()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -129,7 +128,7 @@ class RPCStabilityTests {
|
|||||||
rpcDriver {
|
rpcDriver {
|
||||||
fun startAndCloseServer(broker: RpcBrokerHandle) {
|
fun startAndCloseServer(broker: RpcBrokerHandle) {
|
||||||
startRpcServerWithBrokerRunning(
|
startRpcServerWithBrokerRunning(
|
||||||
configuration = RPCServerConfiguration.default,
|
configuration = RPCServerConfiguration.DEFAULT,
|
||||||
ops = DummyOps,
|
ops = DummyOps,
|
||||||
brokerHandle = broker
|
brokerHandle = broker
|
||||||
).rpcServer.close()
|
).rpcServer.close()
|
||||||
@ -150,7 +149,7 @@ class RPCStabilityTests {
|
|||||||
@Test
|
@Test
|
||||||
fun `rpc client close doesnt leak broker resources`() {
|
fun `rpc client close doesnt leak broker resources`() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val server = startRpcServer(configuration = RPCServerConfiguration.default, ops = DummyOps).get()
|
val server = startRpcServer(configuration = RPCServerConfiguration.DEFAULT, ops = DummyOps).get()
|
||||||
RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close()
|
RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close()
|
||||||
val initial = server.broker.getStats()
|
val initial = server.broker.getStats()
|
||||||
repeat(100) {
|
repeat(100) {
|
||||||
@ -241,7 +240,7 @@ class RPCStabilityTests {
|
|||||||
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||||
serverFollower.unfollow()
|
serverFollower.unfollow()
|
||||||
// Set retry interval to 1s to reduce test duration
|
// Set retry interval to 1s to reduce test duration
|
||||||
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds)
|
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds)
|
||||||
val clientFollower = shutdownManager.follower()
|
val clientFollower = shutdownManager.follower()
|
||||||
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||||
clientFollower.unfollow()
|
clientFollower.unfollow()
|
||||||
@ -266,7 +265,7 @@ class RPCStabilityTests {
|
|||||||
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||||
serverFollower.unfollow()
|
serverFollower.unfollow()
|
||||||
// Set retry interval to 1s to reduce test duration
|
// Set retry interval to 1s to reduce test duration
|
||||||
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
|
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
|
||||||
val clientFollower = shutdownManager.follower()
|
val clientFollower = shutdownManager.follower()
|
||||||
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||||
clientFollower.unfollow()
|
clientFollower.unfollow()
|
||||||
@ -298,7 +297,7 @@ class RPCStabilityTests {
|
|||||||
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||||
serverFollower.unfollow()
|
serverFollower.unfollow()
|
||||||
|
|
||||||
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
|
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
|
||||||
val clientFollower = shutdownManager.follower()
|
val clientFollower = shutdownManager.follower()
|
||||||
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||||
clientFollower.unfollow()
|
clientFollower.unfollow()
|
||||||
@ -453,7 +452,7 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
val server = startRpcServer<TrackSubscriberOps>(
|
val server = startRpcServer<TrackSubscriberOps>(
|
||||||
configuration = RPCServerConfiguration.default.copy(
|
configuration = RPCServerConfiguration.DEFAULT.copy(
|
||||||
reapInterval = 100.millis
|
reapInterval = 100.millis
|
||||||
),
|
),
|
||||||
ops = trackSubscriberOpsImpl
|
ops = trackSubscriberOpsImpl
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.core.context.Actor
|
import net.corda.core.context.Actor
|
||||||
@ -10,6 +9,9 @@ import net.corda.core.serialization.internal.effectiveSerializationEnv
|
|||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
|
||||||
import net.corda.core.messaging.ClientRpcSslOptions
|
import net.corda.core.messaging.ClientRpcSslOptions
|
||||||
|
import net.corda.core.utilities.days
|
||||||
|
import net.corda.core.utilities.minutes
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||||
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
|
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
@ -24,46 +26,159 @@ class CordaRPCConnection internal constructor(connection: RPCConnection<CordaRPC
|
|||||||
/**
|
/**
|
||||||
* Can be used to configure the RPC client connection.
|
* Can be used to configure the RPC client connection.
|
||||||
*/
|
*/
|
||||||
interface CordaRPCClientConfiguration {
|
open class CordaRPCClientConfiguration @JvmOverloads constructor(
|
||||||
|
|
||||||
/** The minimum protocol version required from the server */
|
/**
|
||||||
val minimumServerProtocolVersion: Int get() = default().minimumServerProtocolVersion
|
* The minimum protocol version required from the server.
|
||||||
/**
|
*/
|
||||||
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
|
open val minimumServerProtocolVersion: Int = 0,
|
||||||
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
|
|
||||||
* constructing call stacks is a moderately expensive operation.
|
/**
|
||||||
*/
|
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
|
||||||
val trackRpcCallSites: Boolean get() = default().trackRpcCallSites
|
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
|
||||||
/**
|
* constructing call stacks is a moderately expensive operation.
|
||||||
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
|
*/
|
||||||
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
|
open val trackRpcCallSites: Boolean = false,
|
||||||
* duration. If set too low it wastes client side cycles.
|
|
||||||
*/
|
/**
|
||||||
val reapInterval: Duration get() = default().reapInterval
|
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
|
||||||
/** The number of threads to use for observations (for executing [Observable.onNext]) */
|
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
|
||||||
val observationExecutorPoolSize: Int get() = default().observationExecutorPoolSize
|
* duration. If set too low it wastes client side cycles.
|
||||||
/**
|
*/
|
||||||
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
|
open val reapInterval: Duration = 1.seconds,
|
||||||
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
|
|
||||||
* See the implementation of [com.google.common.cache.LocalCache] for details.
|
/**
|
||||||
*/
|
* The number of threads to use for observations (for executing [Observable.onNext]).
|
||||||
val cacheConcurrencyLevel: Int get() = default().cacheConcurrencyLevel
|
*/
|
||||||
/** The retry interval of artemis connections in milliseconds */
|
open val observationExecutorPoolSize: Int = 4,
|
||||||
val connectionRetryInterval: Duration get() = default().connectionRetryInterval
|
|
||||||
/** The retry interval multiplier for exponential backoff */
|
/**
|
||||||
val connectionRetryIntervalMultiplier: Double get() = default().connectionRetryIntervalMultiplier
|
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
|
||||||
/** Maximum retry interval */
|
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
|
||||||
val connectionMaxRetryInterval: Duration get() = default().connectionMaxRetryInterval
|
* See the implementation of [com.google.common.cache.LocalCache] for details.
|
||||||
/** Maximum reconnect attempts on failover */
|
*/
|
||||||
val maxReconnectAttempts: Int get() = default().maxReconnectAttempts
|
open val cacheConcurrencyLevel: Int = 1,
|
||||||
/** Maximum file size */
|
|
||||||
val maxFileSize: Int get() = default().maxFileSize
|
/**
|
||||||
/** The cache expiry of a deduplication watermark per client. */
|
* The retry interval of Artemis connections in milliseconds.
|
||||||
val deduplicationCacheExpiry: Duration get() = default().deduplicationCacheExpiry
|
*/
|
||||||
|
open val connectionRetryInterval: Duration = 5.seconds,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The retry interval multiplier for exponential backoff.
|
||||||
|
*/
|
||||||
|
open val connectionRetryIntervalMultiplier: Double = 1.5,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum retry interval.
|
||||||
|
*/
|
||||||
|
open val connectionMaxRetryInterval: Duration = 3.minutes,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum reconnect attempts on failover>
|
||||||
|
*/
|
||||||
|
open val maxReconnectAttempts: Int = unlimitedReconnectAttempts,
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maximum file size, in bytes.
|
||||||
|
*/
|
||||||
|
open val maxFileSize: Int = 10485760,
|
||||||
|
// 10 MiB maximum allowed file size for attachments, including message headers.
|
||||||
|
// TODO: acquire this value from Network Map when supported.
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The cache expiry of a deduplication watermark per client.
|
||||||
|
*/
|
||||||
|
open val deduplicationCacheExpiry: Duration = 1.days
|
||||||
|
|
||||||
|
) {
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
fun default(): CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default
|
|
||||||
|
private const val unlimitedReconnectAttempts = -1
|
||||||
|
|
||||||
|
@JvmField
|
||||||
|
val DEFAULT: CordaRPCClientConfiguration = CordaRPCClientConfiguration()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new copy of a configuration object with zero or more parameters modified.
|
||||||
|
*/
|
||||||
|
@JvmOverloads
|
||||||
|
fun copy(
|
||||||
|
minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion,
|
||||||
|
trackRpcCallSites: Boolean = this.trackRpcCallSites,
|
||||||
|
reapInterval: Duration = this.reapInterval,
|
||||||
|
observationExecutorPoolSize: Int = this.observationExecutorPoolSize,
|
||||||
|
cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel,
|
||||||
|
connectionRetryInterval: Duration = this.connectionRetryInterval,
|
||||||
|
connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier,
|
||||||
|
connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval,
|
||||||
|
maxReconnectAttempts: Int = this.maxReconnectAttempts,
|
||||||
|
maxFileSize: Int = this.maxFileSize,
|
||||||
|
deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry
|
||||||
|
): CordaRPCClientConfiguration {
|
||||||
|
return CordaRPCClientConfiguration(
|
||||||
|
minimumServerProtocolVersion,
|
||||||
|
trackRpcCallSites,
|
||||||
|
reapInterval,
|
||||||
|
observationExecutorPoolSize,
|
||||||
|
cacheConcurrencyLevel,
|
||||||
|
connectionRetryInterval,
|
||||||
|
connectionRetryIntervalMultiplier,
|
||||||
|
connectionMaxRetryInterval,
|
||||||
|
maxReconnectAttempts,
|
||||||
|
maxFileSize,
|
||||||
|
deduplicationCacheExpiry
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun equals(other: Any?): Boolean {
|
||||||
|
if (this === other) return true
|
||||||
|
if (javaClass != other?.javaClass) return false
|
||||||
|
|
||||||
|
other as CordaRPCClientConfiguration
|
||||||
|
if (minimumServerProtocolVersion != other.minimumServerProtocolVersion) return false
|
||||||
|
if (trackRpcCallSites != other.trackRpcCallSites) return false
|
||||||
|
if (reapInterval != other.reapInterval) return false
|
||||||
|
if (observationExecutorPoolSize != other.observationExecutorPoolSize) return false
|
||||||
|
if (cacheConcurrencyLevel != other.cacheConcurrencyLevel) return false
|
||||||
|
if (connectionRetryInterval != other.connectionRetryInterval) return false
|
||||||
|
if (connectionRetryIntervalMultiplier != other.connectionRetryIntervalMultiplier) return false
|
||||||
|
if (connectionMaxRetryInterval != other.connectionMaxRetryInterval) return false
|
||||||
|
if (maxReconnectAttempts != other.maxReconnectAttempts) return false
|
||||||
|
if (maxFileSize != other.maxFileSize) return false
|
||||||
|
if (deduplicationCacheExpiry != other.deduplicationCacheExpiry) return false
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun hashCode(): Int {
|
||||||
|
var result = minimumServerProtocolVersion
|
||||||
|
result = 31 * result + trackRpcCallSites.hashCode()
|
||||||
|
result = 31 * result + reapInterval.hashCode()
|
||||||
|
result = 31 * result + observationExecutorPoolSize
|
||||||
|
result = 31 * result + cacheConcurrencyLevel
|
||||||
|
result = 31 * result + connectionRetryInterval.hashCode()
|
||||||
|
result = 31 * result + connectionRetryIntervalMultiplier.hashCode()
|
||||||
|
result = 31 * result + connectionMaxRetryInterval.hashCode()
|
||||||
|
result = 31 * result + maxReconnectAttempts
|
||||||
|
result = 31 * result + maxFileSize
|
||||||
|
result = 31 * result + deduplicationCacheExpiry.hashCode()
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun toString(): String {
|
||||||
|
return "CordaRPCClientConfiguration(" +
|
||||||
|
"minimumServerProtocolVersion=$minimumServerProtocolVersion, trackRpcCallSites=$trackRpcCallSites, " +
|
||||||
|
"reapInterval=$reapInterval, observationExecutorPoolSize=$observationExecutorPoolSize, " +
|
||||||
|
"cacheConcurrencyLevel=$cacheConcurrencyLevel, connectionRetryInterval=$connectionRetryInterval, " +
|
||||||
|
"connectionRetryIntervalMultiplier=$connectionRetryIntervalMultiplier, " +
|
||||||
|
"connectionMaxRetryInterval=$connectionMaxRetryInterval, maxReconnectAttempts=$maxReconnectAttempts, " +
|
||||||
|
"maxFileSize=$maxFileSize, deduplicationCacheExpiry=$deduplicationCacheExpiry)"
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -105,7 +220,7 @@ interface CordaRPCClientConfiguration {
|
|||||||
*/
|
*/
|
||||||
class CordaRPCClient private constructor(
|
class CordaRPCClient private constructor(
|
||||||
private val hostAndPort: NetworkHostAndPort,
|
private val hostAndPort: NetworkHostAndPort,
|
||||||
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
private val sslConfiguration: ClientRpcSslOptions? = null,
|
private val sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
private val nodeSslConfiguration: SSLConfiguration? = null,
|
private val nodeSslConfiguration: SSLConfiguration? = null,
|
||||||
private val classLoader: ClassLoader? = null,
|
private val classLoader: ClassLoader? = null,
|
||||||
@ -114,7 +229,7 @@ class CordaRPCClient private constructor(
|
|||||||
) {
|
) {
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
constructor(hostAndPort: NetworkHostAndPort,
|
constructor(hostAndPort: NetworkHostAndPort,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default())
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT)
|
||||||
: this(hostAndPort, configuration, null)
|
: this(hostAndPort, configuration, null)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -124,13 +239,13 @@ class CordaRPCClient private constructor(
|
|||||||
* @param configuration An optional configuration used to tweak client behaviour.
|
* @param configuration An optional configuration used to tweak client behaviour.
|
||||||
*/
|
*/
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
constructor(haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool)
|
constructor(haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool)
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
fun createWithSsl(
|
fun createWithSsl(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
sslConfiguration: ClientRpcSslOptions,
|
sslConfiguration: ClientRpcSslOptions,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
): CordaRPCClient {
|
): CordaRPCClient {
|
||||||
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
|
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
|
||||||
}
|
}
|
||||||
@ -138,14 +253,14 @@ class CordaRPCClient private constructor(
|
|||||||
fun createWithSsl(
|
fun createWithSsl(
|
||||||
haAddressPool: List<NetworkHostAndPort>,
|
haAddressPool: List<NetworkHostAndPort>,
|
||||||
sslConfiguration: ClientRpcSslOptions,
|
sslConfiguration: ClientRpcSslOptions,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
): CordaRPCClient {
|
): CordaRPCClient {
|
||||||
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, haAddressPool = haAddressPool)
|
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, haAddressPool = haAddressPool)
|
||||||
}
|
}
|
||||||
|
|
||||||
internal fun createWithSslAndClassLoader(
|
internal fun createWithSslAndClassLoader(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
sslConfiguration: ClientRpcSslOptions? = null,
|
sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
classLoader: ClassLoader? = null
|
classLoader: ClassLoader? = null
|
||||||
): CordaRPCClient {
|
): CordaRPCClient {
|
||||||
@ -154,7 +269,7 @@ class CordaRPCClient private constructor(
|
|||||||
|
|
||||||
internal fun createWithInternalSslAndClassLoader(
|
internal fun createWithInternalSslAndClassLoader(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
sslConfiguration: SSLConfiguration?,
|
sslConfiguration: SSLConfiguration?,
|
||||||
classLoader: ClassLoader? = null
|
classLoader: ClassLoader? = null
|
||||||
): CordaRPCClient {
|
): CordaRPCClient {
|
||||||
|
@ -12,14 +12,14 @@ import rx.Observable
|
|||||||
/** Utility which exposes the internal Corda RPC constructor to other internal Corda components */
|
/** Utility which exposes the internal Corda RPC constructor to other internal Corda components */
|
||||||
fun createCordaRPCClientWithSslAndClassLoader(
|
fun createCordaRPCClientWithSslAndClassLoader(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
sslConfiguration: ClientRpcSslOptions? = null,
|
sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
classLoader: ClassLoader? = null
|
classLoader: ClassLoader? = null
|
||||||
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
|
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
|
||||||
|
|
||||||
fun createCordaRPCClientWithInternalSslAndClassLoader(
|
fun createCordaRPCClientWithInternalSslAndClassLoader(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
sslConfiguration: SSLConfiguration? = null,
|
sslConfiguration: SSLConfiguration? = null,
|
||||||
classLoader: ClassLoader? = null
|
classLoader: ClassLoader? = null
|
||||||
) = CordaRPCClient.createWithInternalSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
|
) = CordaRPCClient.createWithInternalSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
|
||||||
|
@ -23,69 +23,34 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
|||||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||||
import java.lang.reflect.Proxy
|
import java.lang.reflect.Proxy
|
||||||
import java.time.Duration
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This configuration may be used to tweak the internals of the RPC client.
|
|
||||||
*/
|
|
||||||
data class CordaRPCClientConfigurationImpl(
|
|
||||||
override val minimumServerProtocolVersion: Int,
|
|
||||||
override val trackRpcCallSites: Boolean,
|
|
||||||
override val reapInterval: Duration,
|
|
||||||
override val observationExecutorPoolSize: Int,
|
|
||||||
override val connectionRetryInterval: Duration,
|
|
||||||
override val connectionRetryIntervalMultiplier: Double,
|
|
||||||
override val connectionMaxRetryInterval: Duration,
|
|
||||||
override val maxReconnectAttempts: Int,
|
|
||||||
override val maxFileSize: Int,
|
|
||||||
override val deduplicationCacheExpiry: Duration
|
|
||||||
) : CordaRPCClientConfiguration {
|
|
||||||
companion object {
|
|
||||||
private const val unlimitedReconnectAttempts = -1
|
|
||||||
@JvmStatic
|
|
||||||
val default = CordaRPCClientConfigurationImpl(
|
|
||||||
minimumServerProtocolVersion = 0,
|
|
||||||
trackRpcCallSites = false,
|
|
||||||
reapInterval = 1.seconds,
|
|
||||||
observationExecutorPoolSize = 4,
|
|
||||||
connectionRetryInterval = 5.seconds,
|
|
||||||
connectionRetryIntervalMultiplier = 1.5,
|
|
||||||
connectionMaxRetryInterval = 3.minutes,
|
|
||||||
maxReconnectAttempts = unlimitedReconnectAttempts,
|
|
||||||
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
|
|
||||||
maxFileSize = 10485760,
|
|
||||||
deduplicationCacheExpiry = 1.days
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This runs on the client JVM
|
* This runs on the client JVM
|
||||||
*/
|
*/
|
||||||
class RPCClient<I : RPCOps>(
|
class RPCClient<I : RPCOps>(
|
||||||
val transport: TransportConfiguration,
|
val transport: TransportConfiguration,
|
||||||
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT,
|
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT,
|
||||||
val haPoolTransportConfigurations: List<TransportConfiguration> = emptyList()
|
val haPoolTransportConfigurations: List<TransportConfiguration> = emptyList()
|
||||||
) {
|
) {
|
||||||
constructor(
|
constructor(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
sslConfiguration: ClientRpcSslOptions? = null,
|
sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
||||||
) : this(rpcConnectorTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
|
) : this(rpcConnectorTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
hostAndPort: NetworkHostAndPort,
|
hostAndPort: NetworkHostAndPort,
|
||||||
sslConfiguration: SSLConfiguration,
|
sslConfiguration: SSLConfiguration,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
||||||
) : this(rpcInternalClientTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
|
) : this(rpcInternalClientTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
haAddressPool: List<NetworkHostAndPort>,
|
haAddressPool: List<NetworkHostAndPort>,
|
||||||
sslConfiguration: ClientRpcSslOptions? = null,
|
sslConfiguration: ClientRpcSslOptions? = null,
|
||||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
||||||
) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration),
|
) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration),
|
||||||
configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration))
|
configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration))
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
|
||||||
import net.corda.core.internal.concurrent.flatMap
|
import net.corda.core.internal.concurrent.flatMap
|
||||||
import net.corda.core.internal.concurrent.map
|
import net.corda.core.internal.concurrent.map
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
@ -44,8 +43,8 @@ open class AbstractRPCTest {
|
|||||||
inline fun <reified I : RPCOps> RPCDriverDSL.testProxy(
|
inline fun <reified I : RPCOps> RPCDriverDSL.testProxy(
|
||||||
ops: I,
|
ops: I,
|
||||||
rpcUser: User = rpcTestUser,
|
rpcUser: User = rpcTestUser,
|
||||||
clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default,
|
clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||||
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default
|
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT
|
||||||
): TestProxy<I> {
|
): TestProxy<I> {
|
||||||
return when (mode) {
|
return when (mode) {
|
||||||
RPCTestMode.InVm ->
|
RPCTestMode.InVm ->
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
import net.corda.core.internal.concurrent.fork
|
import net.corda.core.internal.concurrent.fork
|
||||||
import net.corda.core.internal.concurrent.transpose
|
import net.corda.core.internal.concurrent.transpose
|
||||||
@ -90,10 +89,10 @@ class RPCConcurrencyTests : AbstractRPCTest() {
|
|||||||
private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> {
|
private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> {
|
||||||
return testProxy<TestOps>(
|
return testProxy<TestOps>(
|
||||||
TestOpsImpl(pool),
|
TestOpsImpl(pool),
|
||||||
clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(
|
clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
reapInterval = 100.millis
|
reapInterval = 100.millis
|
||||||
),
|
),
|
||||||
serverConfiguration = RPCServerConfiguration.default.copy(
|
serverConfiguration = RPCServerConfiguration.DEFAULT.copy(
|
||||||
rpcThreadPoolSize = 4
|
rpcThreadPoolSize = 4
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package net.corda.client.rpc
|
package net.corda.client.rpc
|
||||||
|
|
||||||
import com.google.common.base.Stopwatch
|
import com.google.common.base.Stopwatch
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.utilities.minutes
|
import net.corda.core.utilities.minutes
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
@ -42,7 +41,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun RPCDriverDSL.testProxy(
|
private fun RPCDriverDSL.testProxy(
|
||||||
clientConfiguration: CordaRPCClientConfigurationImpl,
|
clientConfiguration: CordaRPCClientConfiguration,
|
||||||
serverConfiguration: RPCServerConfiguration
|
serverConfiguration: RPCServerConfiguration
|
||||||
): TestProxy<TestOps> {
|
): TestProxy<TestOps> {
|
||||||
return testProxy<TestOps>(
|
return testProxy<TestOps>(
|
||||||
@ -55,8 +54,8 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
|||||||
private fun warmup() {
|
private fun warmup() {
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val proxy = testProxy(
|
val proxy = testProxy(
|
||||||
CordaRPCClientConfigurationImpl.default,
|
CordaRPCClientConfiguration.DEFAULT,
|
||||||
RPCServerConfiguration.default
|
RPCServerConfiguration.DEFAULT
|
||||||
)
|
)
|
||||||
val executor = Executors.newFixedThreadPool(4)
|
val executor = Executors.newFixedThreadPool(4)
|
||||||
val N = 10000
|
val N = 10000
|
||||||
@ -85,10 +84,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
|||||||
measure(inputOutputSizes, (1..5)) { inputOutputSize, _ ->
|
measure(inputOutputSizes, (1..5)) { inputOutputSize, _ ->
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val proxy = testProxy(
|
val proxy = testProxy(
|
||||||
CordaRPCClientConfigurationImpl.default.copy(
|
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
observationExecutorPoolSize = 2
|
observationExecutorPoolSize = 2
|
||||||
),
|
),
|
||||||
RPCServerConfiguration.default.copy(
|
RPCServerConfiguration.DEFAULT.copy(
|
||||||
rpcThreadPoolSize = 8
|
rpcThreadPoolSize = 8
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -124,10 +123,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
|||||||
rpcDriver {
|
rpcDriver {
|
||||||
val metricRegistry = startReporter(shutdownManager)
|
val metricRegistry = startReporter(shutdownManager)
|
||||||
val proxy = testProxy(
|
val proxy = testProxy(
|
||||||
CordaRPCClientConfigurationImpl.default.copy(
|
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
reapInterval = 1.seconds
|
reapInterval = 1.seconds
|
||||||
),
|
),
|
||||||
RPCServerConfiguration.default.copy(
|
RPCServerConfiguration.DEFAULT.copy(
|
||||||
rpcThreadPoolSize = 8
|
rpcThreadPoolSize = 8
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
@ -156,8 +155,8 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
|||||||
// TODO this hangs with more parallelism
|
// TODO this hangs with more parallelism
|
||||||
rpcDriver {
|
rpcDriver {
|
||||||
val proxy = testProxy(
|
val proxy = testProxy(
|
||||||
CordaRPCClientConfigurationImpl.default,
|
CordaRPCClientConfiguration.DEFAULT,
|
||||||
RPCServerConfiguration.default
|
RPCServerConfiguration.DEFAULT
|
||||||
)
|
)
|
||||||
val numberOfMessages = 1000
|
val numberOfMessages = 1000
|
||||||
val bigSize = 10_000_000
|
val bigSize = 10_000_000
|
||||||
|
@ -160,9 +160,9 @@ class Node(
|
|||||||
val user = config.users.first()
|
val user = config.users.first()
|
||||||
val address = config.nodeInterface
|
val address = config.nodeInterface
|
||||||
val targetHost = NetworkHostAndPort(address.host, address.rpcPort)
|
val targetHost = NetworkHostAndPort(address.host, address.rpcPort)
|
||||||
val config = object : CordaRPCClientConfiguration {
|
val config = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
override val connectionMaxRetryInterval = 10.seconds
|
connectionMaxRetryInterval = 10.seconds
|
||||||
}
|
)
|
||||||
log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...")
|
log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...")
|
||||||
CordaRPCClient(targetHost, config).use(user.username, user.password) {
|
CordaRPCClient(targetHost, config).use(user.username, user.password) {
|
||||||
log.info("RPC connection to ${targetHost.host}:${targetHost.port} established")
|
log.info("RPC connection to ${targetHost.host}:${targetHost.port} established")
|
||||||
|
@ -206,7 +206,7 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
|
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
|
||||||
|
|
||||||
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
|
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
|
||||||
val rpcServerConfiguration = RPCServerConfiguration.default
|
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT
|
||||||
rpcServerAddresses?.let {
|
rpcServerAddresses?.let {
|
||||||
internalRpcMessagingClient = InternalRPCMessagingClient(configuration, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal), rpcServerConfiguration)
|
internalRpcMessagingClient = InternalRPCMessagingClient(configuration, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal), rpcServerConfiguration)
|
||||||
printBasicNodeInfo("RPC connection address", it.primary.toString())
|
printBasicNodeInfo("RPC connection address", it.primary.toString())
|
||||||
|
@ -71,7 +71,7 @@ data class RPCServerConfiguration(
|
|||||||
val deduplicationCacheExpiry: Duration
|
val deduplicationCacheExpiry: Duration
|
||||||
) {
|
) {
|
||||||
companion object {
|
companion object {
|
||||||
val default = RPCServerConfiguration(
|
val DEFAULT = RPCServerConfiguration(
|
||||||
rpcThreadPoolSize = 4,
|
rpcThreadPoolSize = 4,
|
||||||
reapInterval = 1.seconds,
|
reapInterval = 1.seconds,
|
||||||
deduplicationCacheExpiry = 1.days
|
deduplicationCacheExpiry = 1.days
|
||||||
|
@ -105,7 +105,7 @@ class ArtemisRpcTests {
|
|||||||
}
|
}
|
||||||
artemisBroker.use { broker ->
|
artemisBroker.use { broker ->
|
||||||
broker.start()
|
broker.start()
|
||||||
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.default).use { server ->
|
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.DEFAULT).use { server ->
|
||||||
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
|
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
|
||||||
|
|
||||||
val client = RPCClient<TestRpcOps>(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions))
|
val client = RPCClient<TestRpcOps>(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions))
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
package net.corda.testing.node.internal
|
package net.corda.testing.node.internal
|
||||||
|
|
||||||
import net.corda.client.mock.Generator
|
import net.corda.client.mock.Generator
|
||||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
@ -57,7 +57,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser
|
|||||||
inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient(
|
inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient(
|
||||||
username: String = rpcTestUser.username,
|
username: String = rpcTestUser.username,
|
||||||
password: String = rpcTestUser.password,
|
password: String = rpcTestUser.password,
|
||||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
) = startInVmRpcClient(I::class.java, username, password, configuration)
|
) = startInVmRpcClient(I::class.java, username, password, configuration)
|
||||||
|
|
||||||
inline fun <reified I : RPCOps> RPCDriverDSL.startRandomRpcClient(
|
inline fun <reified I : RPCOps> RPCDriverDSL.startRandomRpcClient(
|
||||||
@ -70,14 +70,14 @@ inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
|
|||||||
rpcAddress: NetworkHostAndPort,
|
rpcAddress: NetworkHostAndPort,
|
||||||
username: String = rpcTestUser.username,
|
username: String = rpcTestUser.username,
|
||||||
password: String = rpcTestUser.password,
|
password: String = rpcTestUser.password,
|
||||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
|
) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
|
||||||
|
|
||||||
inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
|
inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
|
||||||
haAddressPool: List<NetworkHostAndPort>,
|
haAddressPool: List<NetworkHostAndPort>,
|
||||||
username: String = rpcTestUser.username,
|
username: String = rpcTestUser.username,
|
||||||
password: String = rpcTestUser.password,
|
password: String = rpcTestUser.password,
|
||||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
) = startRpcClient(I::class.java, haAddressPool, username, password, configuration)
|
) = startRpcClient(I::class.java, haAddressPool, username, password, configuration)
|
||||||
|
|
||||||
data class RpcBrokerHandle(
|
data class RpcBrokerHandle(
|
||||||
@ -239,7 +239,7 @@ data class RPCDriverDSL(
|
|||||||
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
||||||
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
||||||
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
|
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
|
||||||
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||||
ops: I
|
ops: I
|
||||||
): CordaFuture<RpcServerHandle> {
|
): CordaFuture<RpcServerHandle> {
|
||||||
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
|
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
|
||||||
@ -259,7 +259,7 @@ data class RPCDriverDSL(
|
|||||||
rpcOpsClass: Class<I>,
|
rpcOpsClass: Class<I>,
|
||||||
username: String = rpcTestUser.username,
|
username: String = rpcTestUser.username,
|
||||||
password: String = rpcTestUser.password,
|
password: String = rpcTestUser.password,
|
||||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
): CordaFuture<I> {
|
): CordaFuture<I> {
|
||||||
return driverDSL.executorService.fork {
|
return driverDSL.executorService.fork {
|
||||||
val client = RPCClient<I>(inVmClientTransportConfiguration, configuration)
|
val client = RPCClient<I>(inVmClientTransportConfiguration, configuration)
|
||||||
@ -307,7 +307,7 @@ data class RPCDriverDSL(
|
|||||||
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
||||||
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
||||||
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
|
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
|
||||||
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||||
customPort: NetworkHostAndPort? = null,
|
customPort: NetworkHostAndPort? = null,
|
||||||
ops: I
|
ops: I
|
||||||
): CordaFuture<RpcServerHandle> {
|
): CordaFuture<RpcServerHandle> {
|
||||||
@ -330,7 +330,7 @@ data class RPCDriverDSL(
|
|||||||
rpcAddress: NetworkHostAndPort,
|
rpcAddress: NetworkHostAndPort,
|
||||||
username: String = rpcTestUser.username,
|
username: String = rpcTestUser.username,
|
||||||
password: String = rpcTestUser.password,
|
password: String = rpcTestUser.password,
|
||||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
): CordaFuture<I> {
|
): CordaFuture<I> {
|
||||||
return driverDSL.executorService.fork {
|
return driverDSL.executorService.fork {
|
||||||
val client = RPCClient<I>(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), configuration)
|
val client = RPCClient<I>(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), configuration)
|
||||||
@ -356,7 +356,7 @@ data class RPCDriverDSL(
|
|||||||
haAddressPool: List<NetworkHostAndPort>,
|
haAddressPool: List<NetworkHostAndPort>,
|
||||||
username: String = rpcTestUser.username,
|
username: String = rpcTestUser.username,
|
||||||
password: String = rpcTestUser.password,
|
password: String = rpcTestUser.password,
|
||||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||||
): CordaFuture<I> {
|
): CordaFuture<I> {
|
||||||
return driverDSL.executorService.fork {
|
return driverDSL.executorService.fork {
|
||||||
val client = RPCClient<I>(haAddressPool, null, configuration)
|
val client = RPCClient<I>(haAddressPool, null, configuration)
|
||||||
@ -462,7 +462,7 @@ data class RPCDriverDSL(
|
|||||||
fun <I : RPCOps> startRpcServerWithBrokerRunning(
|
fun <I : RPCOps> startRpcServerWithBrokerRunning(
|
||||||
rpcUser: User = rpcTestUser,
|
rpcUser: User = rpcTestUser,
|
||||||
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
||||||
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||||
ops: I,
|
ops: I,
|
||||||
brokerHandle: RpcBrokerHandle
|
brokerHandle: RpcBrokerHandle
|
||||||
): RpcServerHandle {
|
): RpcServerHandle {
|
||||||
|
@ -91,9 +91,9 @@ object InteractiveShell {
|
|||||||
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
|
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
|
||||||
rpcOps = { username: String, credentials: String ->
|
rpcOps = { username: String, credentials: String ->
|
||||||
val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort,
|
val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort,
|
||||||
configuration = object : CordaRPCClientConfiguration {
|
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
override val maxReconnectAttempts = 1
|
maxReconnectAttempts = 1
|
||||||
},
|
),
|
||||||
sslConfiguration = configuration.ssl,
|
sslConfiguration = configuration.ssl,
|
||||||
classLoader = classLoader)
|
classLoader = classLoader)
|
||||||
this.connection = client.start(username, credentials)
|
this.connection = client.start(username, credentials)
|
||||||
@ -109,9 +109,9 @@ object InteractiveShell {
|
|||||||
fun startShellInternal(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
|
fun startShellInternal(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
|
||||||
rpcOps = { username: String, credentials: String ->
|
rpcOps = { username: String, credentials: String ->
|
||||||
val client = createCordaRPCClientWithInternalSslAndClassLoader(hostAndPort = configuration.hostAndPort,
|
val client = createCordaRPCClientWithInternalSslAndClassLoader(hostAndPort = configuration.hostAndPort,
|
||||||
configuration = object : CordaRPCClientConfiguration {
|
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
override val maxReconnectAttempts = 1
|
maxReconnectAttempts = 1
|
||||||
},
|
),
|
||||||
sslConfiguration = configuration.nodeSslConfig,
|
sslConfiguration = configuration.nodeSslConfig,
|
||||||
classLoader = classLoader)
|
classLoader = classLoader)
|
||||||
this.connection = client.start(username, credentials)
|
this.connection = client.start(username, credentials)
|
||||||
@ -302,7 +302,7 @@ object InteractiveShell {
|
|||||||
} catch (e: PermissionException) {
|
} catch (e: PermissionException) {
|
||||||
output.println(e.message ?: "Access denied", Color.red)
|
output.println(e.message ?: "Access denied", Color.red)
|
||||||
} catch (e: ExecutionException) {
|
} catch (e: ExecutionException) {
|
||||||
// ignoring it as already logged by the progress handler subscriber
|
// ignoring it as already logged by the progress handler subscriber
|
||||||
} finally {
|
} finally {
|
||||||
InputStreamDeserializer.closeAll()
|
InputStreamDeserializer.closeAll()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user