Merge pull request #3359 from corda/tlil/CORDA-1609/fix-rpc-config-api-break

CORDA-1609 - Don't use reserved keyword as method name
This commit is contained in:
Tommy Lillehagen 2018-06-13 18:25:23 +01:00 committed by GitHub
commit 18cfcb887d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 235 additions and 160 deletions

View File

@ -6527,27 +6527,22 @@ public static final class net.corda.client.rpc.CordaRPCClient$Companion extends
@NotNull
public final net.corda.client.rpc.CordaRPCClient createWithSsl(net.corda.core.utilities.NetworkHostAndPort, net.corda.core.messaging.ClientRpcSslOptions, net.corda.client.rpc.CordaRPCClientConfiguration)
##
public interface net.corda.client.rpc.CordaRPCClientConfiguration
public abstract int getCacheConcurrencyLevel()
public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.Object
public <init>(java.time.Duration)
@NotNull
public abstract java.time.Duration getConnectionMaxRetryInterval()
public final java.time.Duration component1()
@NotNull
public abstract java.time.Duration getConnectionRetryInterval()
public abstract double getConnectionRetryIntervalMultiplier()
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration)
public boolean equals(Object)
@NotNull
public abstract java.time.Duration getDeduplicationCacheExpiry()
public abstract int getMaxFileSize()
public abstract int getMaxReconnectAttempts()
public abstract int getMinimumServerProtocolVersion()
public abstract int getObservationExecutorPoolSize()
@NotNull
public abstract java.time.Duration getReapInterval()
public abstract boolean getTrackRpcCallSites()
public java.time.Duration getConnectionMaxRetryInterval()
public int hashCode()
public String toString()
public static final net.corda.client.rpc.CordaRPCClientConfiguration$Companion Companion
@NotNull
public static final net.corda.client.rpc.CordaRPCClientConfiguration DEFAULT
##
public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object
@NotNull
public final net.corda.client.rpc.CordaRPCClientConfiguration default()
##
@DoNotImplement
public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection

View File

@ -71,7 +71,7 @@ class NodeMonitorModel {
// Only execute using "runLater()" if JavaFX been initialized.
// It may not be initialized in the unit test.
// Also if we are already in the JavaFX thread - perform direct invocation without postponing it.
if(initialized.value.get() && !Platform.isFxApplicationThread()) {
if (initialized.value.get() && !Platform.isFxApplicationThread()) {
Platform.runLater(op)
} else {
op()
@ -98,7 +98,7 @@ class NodeMonitorModel {
// Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity.
proxyObservable.addListener { _, _, wrapper ->
if(wrapper != null) {
if (wrapper != null) {
val proxy = wrapper.cordaRPCOps
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
@ -134,7 +134,8 @@ class NodeMonitorModel {
}
val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate ->
if (stateMachineUpdate is StateMachineUpdate.Added) {
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty<ProgressTrackingEvent>()
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo)
?: Observable.empty<ProgressTrackingEvent>()
} else {
Observable.empty<ProgressTrackingEvent>()
}
@ -183,29 +184,28 @@ class NodeMonitorModel {
logger.info("Connecting to: $nodeHostAndPort")
val client = CordaRPCClient(
nodeHostAndPort,
object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = retryInterval
}
CordaRPCClientConfiguration.DEFAULT.copy(
connectionMaxRetryInterval = retryInterval
)
)
val _connection = client.start(username, password)
// Check connection is truly operational before returning it.
val nodeInfo = _connection.proxy.nodeInfo()
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
_connection
} catch(secEx: ActiveMQException) {
} catch (secEx: ActiveMQException) {
// Happens when:
// * incorrect credentials provided;
// * incorrect endpoint specified;
// - no point to retry connecting.
throw secEx
}
catch(th: Throwable) {
} catch (th: Throwable) {
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
logger.info("Exception upon establishing connection: " + th.message)
null
}
if(connection != null) {
if (connection != null) {
logger.info("Connection successfully established with: $nodeHostAndPort")
return connection
}

View File

@ -51,9 +51,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
@Before
fun setUp() {
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration {
override val maxReconnectAttempts = 5
})
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 5
))
identity = node.info.identityFromX500Name(ALICE_NAME)
}

View File

@ -1,7 +1,6 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.context.Trace
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
@ -106,7 +105,7 @@ class RPCStabilityTests {
Try.on {
startRpcClient<RPCOps>(
server.get().broker.hostAndPort!!,
configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1)
configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1)
).get()
}
}
@ -129,7 +128,7 @@ class RPCStabilityTests {
rpcDriver {
fun startAndCloseServer(broker: RpcBrokerHandle) {
startRpcServerWithBrokerRunning(
configuration = RPCServerConfiguration.default,
configuration = RPCServerConfiguration.DEFAULT,
ops = DummyOps,
brokerHandle = broker
).rpcServer.close()
@ -150,7 +149,7 @@ class RPCStabilityTests {
@Test
fun `rpc client close doesnt leak broker resources`() {
rpcDriver {
val server = startRpcServer(configuration = RPCServerConfiguration.default, ops = DummyOps).get()
val server = startRpcServer(configuration = RPCServerConfiguration.DEFAULT, ops = DummyOps).get()
RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close()
val initial = server.broker.getStats()
repeat(100) {
@ -241,7 +240,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds)
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
@ -266,7 +265,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
@ -298,7 +297,7 @@ class RPCStabilityTests {
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
@ -453,7 +452,7 @@ class RPCStabilityTests {
}
}
val server = startRpcServer<TrackSubscriberOps>(
configuration = RPCServerConfiguration.default.copy(
configuration = RPCServerConfiguration.DEFAULT.copy(
reapInterval = 100.millis
),
ops = trackSubscriberOpsImpl

View File

@ -1,6 +1,5 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.context.Actor
@ -10,6 +9,9 @@ import net.corda.core.serialization.internal.effectiveSerializationEnv
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
import net.corda.core.messaging.ClientRpcSslOptions
import net.corda.core.utilities.days
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
import java.time.Duration
@ -24,46 +26,163 @@ class CordaRPCConnection internal constructor(connection: RPCConnection<CordaRPC
/**
* Can be used to configure the RPC client connection.
*/
interface CordaRPCClientConfiguration {
open class CordaRPCClientConfiguration @JvmOverloads constructor(
/** The minimum protocol version required from the server */
val minimumServerProtocolVersion: Int get() = default().minimumServerProtocolVersion
/**
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
* constructing call stacks is a moderately expensive operation.
*/
val trackRpcCallSites: Boolean get() = default().trackRpcCallSites
/**
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
* duration. If set too low it wastes client side cycles.
*/
val reapInterval: Duration get() = default().reapInterval
/** The number of threads to use for observations (for executing [Observable.onNext]) */
val observationExecutorPoolSize: Int get() = default().observationExecutorPoolSize
/**
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
* See the implementation of [com.google.common.cache.LocalCache] for details.
*/
val cacheConcurrencyLevel: Int get() = default().cacheConcurrencyLevel
/** The retry interval of artemis connections in milliseconds */
val connectionRetryInterval: Duration get() = default().connectionRetryInterval
/** The retry interval multiplier for exponential backoff */
val connectionRetryIntervalMultiplier: Double get() = default().connectionRetryIntervalMultiplier
/** Maximum retry interval */
val connectionMaxRetryInterval: Duration get() = default().connectionMaxRetryInterval
/** Maximum reconnect attempts on failover */
val maxReconnectAttempts: Int get() = default().maxReconnectAttempts
/** Maximum file size */
val maxFileSize: Int get() = default().maxFileSize
/** The cache expiry of a deduplication watermark per client. */
val deduplicationCacheExpiry: Duration get() = default().deduplicationCacheExpiry
/**
* Maximum retry interval.
*/
open val connectionMaxRetryInterval: Duration = 3.minutes,
/**
* The minimum protocol version required from the server.
*/
open val minimumServerProtocolVersion: Int = 0,
/**
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
* constructing call stacks is a moderately expensive operation.
*/
open val trackRpcCallSites: Boolean = false,
/**
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
* duration. If set too low it wastes client side cycles.
*/
open val reapInterval: Duration = 1.seconds,
/**
* The number of threads to use for observations (for executing [Observable.onNext]).
*/
open val observationExecutorPoolSize: Int = 4,
/**
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
* See the implementation of [com.google.common.cache.LocalCache] for details.
*/
open val cacheConcurrencyLevel: Int = 1,
/**
* The retry interval of Artemis connections in milliseconds.
*/
open val connectionRetryInterval: Duration = 5.seconds,
/**
* The retry interval multiplier for exponential backoff.
*/
open val connectionRetryIntervalMultiplier: Double = 1.5,
/**
* Maximum reconnect attempts on failover>
*/
open val maxReconnectAttempts: Int = unlimitedReconnectAttempts,
/**
* Maximum file size, in bytes.
*/
open val maxFileSize: Int = 10485760,
// 10 MiB maximum allowed file size for attachments, including message headers.
// TODO: acquire this value from Network Map when supported.
/**
* The cache expiry of a deduplication watermark per client.
*/
open val deduplicationCacheExpiry: Duration = 1.days
) {
companion object {
fun default(): CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default
private const val unlimitedReconnectAttempts = -1
@JvmField
val DEFAULT: CordaRPCClientConfiguration = CordaRPCClientConfiguration()
}
/**
* Create a new copy of a configuration object with zero or more parameters modified.
*/
@JvmOverloads
fun copy(
connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval,
minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion,
trackRpcCallSites: Boolean = this.trackRpcCallSites,
reapInterval: Duration = this.reapInterval,
observationExecutorPoolSize: Int = this.observationExecutorPoolSize,
cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel,
connectionRetryInterval: Duration = this.connectionRetryInterval,
connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier,
maxReconnectAttempts: Int = this.maxReconnectAttempts,
maxFileSize: Int = this.maxFileSize,
deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry
): CordaRPCClientConfiguration {
return CordaRPCClientConfiguration(
connectionMaxRetryInterval,
minimumServerProtocolVersion,
trackRpcCallSites,
reapInterval,
observationExecutorPoolSize,
cacheConcurrencyLevel,
connectionRetryInterval,
connectionRetryIntervalMultiplier,
maxReconnectAttempts,
maxFileSize,
deduplicationCacheExpiry
)
}
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false
other as CordaRPCClientConfiguration
if (connectionMaxRetryInterval != other.connectionMaxRetryInterval) return false
if (minimumServerProtocolVersion != other.minimumServerProtocolVersion) return false
if (trackRpcCallSites != other.trackRpcCallSites) return false
if (reapInterval != other.reapInterval) return false
if (observationExecutorPoolSize != other.observationExecutorPoolSize) return false
if (cacheConcurrencyLevel != other.cacheConcurrencyLevel) return false
if (connectionRetryInterval != other.connectionRetryInterval) return false
if (connectionRetryIntervalMultiplier != other.connectionRetryIntervalMultiplier) return false
if (maxReconnectAttempts != other.maxReconnectAttempts) return false
if (maxFileSize != other.maxFileSize) return false
if (deduplicationCacheExpiry != other.deduplicationCacheExpiry) return false
return true
}
override fun hashCode(): Int {
var result = minimumServerProtocolVersion
result = 31 * result + connectionMaxRetryInterval.hashCode()
result = 31 * result + trackRpcCallSites.hashCode()
result = 31 * result + reapInterval.hashCode()
result = 31 * result + observationExecutorPoolSize
result = 31 * result + cacheConcurrencyLevel
result = 31 * result + connectionRetryInterval.hashCode()
result = 31 * result + connectionRetryIntervalMultiplier.hashCode()
result = 31 * result + maxReconnectAttempts
result = 31 * result + maxFileSize
result = 31 * result + deduplicationCacheExpiry.hashCode()
return result
}
override fun toString(): String {
return "CordaRPCClientConfiguration(" +
"connectionMaxRetryInterval=$connectionMaxRetryInterval, " +
"minimumServerProtocolVersion=$minimumServerProtocolVersion, trackRpcCallSites=$trackRpcCallSites, " +
"reapInterval=$reapInterval, observationExecutorPoolSize=$observationExecutorPoolSize, " +
"cacheConcurrencyLevel=$cacheConcurrencyLevel, connectionRetryInterval=$connectionRetryInterval, " +
"connectionRetryIntervalMultiplier=$connectionRetryIntervalMultiplier, " +
"maxReconnectAttempts=$maxReconnectAttempts, maxFileSize=$maxFileSize, " +
"deduplicationCacheExpiry=$deduplicationCacheExpiry)"
}
// Left is for backwards compatibility with version 3.1
operator fun component1() = connectionMaxRetryInterval
}
/**
@ -105,7 +224,7 @@ interface CordaRPCClientConfiguration {
*/
class CordaRPCClient private constructor(
private val hostAndPort: NetworkHostAndPort,
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
private val sslConfiguration: ClientRpcSslOptions? = null,
private val nodeSslConfiguration: SSLConfiguration? = null,
private val classLoader: ClassLoader? = null,
@ -114,7 +233,7 @@ class CordaRPCClient private constructor(
) {
@JvmOverloads
constructor(hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default())
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT)
: this(hostAndPort, configuration, null)
/**
@ -124,13 +243,13 @@ class CordaRPCClient private constructor(
* @param configuration An optional configuration used to tweak client behaviour.
*/
@JvmOverloads
constructor(haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool)
constructor(haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool)
companion object {
fun createWithSsl(
hostAndPort: NetworkHostAndPort,
sslConfiguration: ClientRpcSslOptions,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
): CordaRPCClient {
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
}
@ -138,14 +257,14 @@ class CordaRPCClient private constructor(
fun createWithSsl(
haAddressPool: List<NetworkHostAndPort>,
sslConfiguration: ClientRpcSslOptions,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
): CordaRPCClient {
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, haAddressPool = haAddressPool)
}
internal fun createWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
): CordaRPCClient {
@ -154,7 +273,7 @@ class CordaRPCClient private constructor(
internal fun createWithInternalSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: SSLConfiguration?,
classLoader: ClassLoader? = null
): CordaRPCClient {

View File

@ -12,14 +12,14 @@ import rx.Observable
/** Utility which exposes the internal Corda RPC constructor to other internal Corda components */
fun createCordaRPCClientWithSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: ClientRpcSslOptions? = null,
classLoader: ClassLoader? = null
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
fun createCordaRPCClientWithInternalSslAndClassLoader(
hostAndPort: NetworkHostAndPort,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
sslConfiguration: SSLConfiguration? = null,
classLoader: ClassLoader? = null
) = CordaRPCClient.createWithInternalSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)

View File

@ -23,69 +23,34 @@ import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import java.lang.reflect.Proxy
import java.time.Duration
/**
* This configuration may be used to tweak the internals of the RPC client.
*/
data class CordaRPCClientConfigurationImpl(
override val minimumServerProtocolVersion: Int,
override val trackRpcCallSites: Boolean,
override val reapInterval: Duration,
override val observationExecutorPoolSize: Int,
override val connectionRetryInterval: Duration,
override val connectionRetryIntervalMultiplier: Double,
override val connectionMaxRetryInterval: Duration,
override val maxReconnectAttempts: Int,
override val maxFileSize: Int,
override val deduplicationCacheExpiry: Duration
) : CordaRPCClientConfiguration {
companion object {
private const val unlimitedReconnectAttempts = -1
@JvmStatic
val default = CordaRPCClientConfigurationImpl(
minimumServerProtocolVersion = 0,
trackRpcCallSites = false,
reapInterval = 1.seconds,
observationExecutorPoolSize = 4,
connectionRetryInterval = 5.seconds,
connectionRetryIntervalMultiplier = 1.5,
connectionMaxRetryInterval = 3.minutes,
maxReconnectAttempts = unlimitedReconnectAttempts,
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
maxFileSize = 10485760,
deduplicationCacheExpiry = 1.days
)
}
}
/**
* This runs on the client JVM
*/
class RPCClient<I : RPCOps>(
val transport: TransportConfiguration,
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT,
val haPoolTransportConfigurations: List<TransportConfiguration> = emptyList()
) {
constructor(
hostAndPort: NetworkHostAndPort,
sslConfiguration: ClientRpcSslOptions? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(rpcConnectorTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
constructor(
hostAndPort: NetworkHostAndPort,
sslConfiguration: SSLConfiguration,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(rpcInternalClientTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
constructor(
haAddressPool: List<NetworkHostAndPort>,
sslConfiguration: ClientRpcSslOptions? = null,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration),
configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration))

View File

@ -1,6 +1,5 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.RPCOps
@ -44,8 +43,8 @@ open class AbstractRPCTest {
inline fun <reified I : RPCOps> RPCDriverDSL.testProxy(
ops: I,
rpcUser: User = rpcTestUser,
clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default,
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default
clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT
): TestProxy<I> {
return when (mode) {
RPCTestMode.InVm ->

View File

@ -1,6 +1,5 @@
package net.corda.client.rpc
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose
@ -90,10 +89,10 @@ class RPCConcurrencyTests : AbstractRPCTest() {
private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> {
return testProxy<TestOps>(
TestOpsImpl(pool),
clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(
clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(
reapInterval = 100.millis
),
serverConfiguration = RPCServerConfiguration.default.copy(
serverConfiguration = RPCServerConfiguration.DEFAULT.copy(
rpcThreadPoolSize = 4
)
)

View File

@ -1,7 +1,6 @@
package net.corda.client.rpc
import com.google.common.base.Stopwatch
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.core.messaging.RPCOps
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
@ -42,7 +41,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
}
private fun RPCDriverDSL.testProxy(
clientConfiguration: CordaRPCClientConfigurationImpl,
clientConfiguration: CordaRPCClientConfiguration,
serverConfiguration: RPCServerConfiguration
): TestProxy<TestOps> {
return testProxy<TestOps>(
@ -55,8 +54,8 @@ class RPCPerformanceTests : AbstractRPCTest() {
private fun warmup() {
rpcDriver {
val proxy = testProxy(
CordaRPCClientConfigurationImpl.default,
RPCServerConfiguration.default
CordaRPCClientConfiguration.DEFAULT,
RPCServerConfiguration.DEFAULT
)
val executor = Executors.newFixedThreadPool(4)
val N = 10000
@ -85,10 +84,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
measure(inputOutputSizes, (1..5)) { inputOutputSize, _ ->
rpcDriver {
val proxy = testProxy(
CordaRPCClientConfigurationImpl.default.copy(
CordaRPCClientConfiguration.DEFAULT.copy(
observationExecutorPoolSize = 2
),
RPCServerConfiguration.default.copy(
RPCServerConfiguration.DEFAULT.copy(
rpcThreadPoolSize = 8
)
)
@ -124,10 +123,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
rpcDriver {
val metricRegistry = startReporter(shutdownManager)
val proxy = testProxy(
CordaRPCClientConfigurationImpl.default.copy(
CordaRPCClientConfiguration.DEFAULT.copy(
reapInterval = 1.seconds
),
RPCServerConfiguration.default.copy(
RPCServerConfiguration.DEFAULT.copy(
rpcThreadPoolSize = 8
)
)
@ -156,8 +155,8 @@ class RPCPerformanceTests : AbstractRPCTest() {
// TODO this hangs with more parallelism
rpcDriver {
val proxy = testProxy(
CordaRPCClientConfigurationImpl.default,
RPCServerConfiguration.default
CordaRPCClientConfiguration.DEFAULT,
RPCServerConfiguration.DEFAULT
)
val numberOfMessages = 1000
val bigSize = 10_000_000

View File

@ -160,9 +160,9 @@ class Node(
val user = config.users.first()
val address = config.nodeInterface
val targetHost = NetworkHostAndPort(address.host, address.rpcPort)
val config = object : CordaRPCClientConfiguration {
override val connectionMaxRetryInterval = 10.seconds
}
val config = CordaRPCClientConfiguration.DEFAULT.copy(
connectionMaxRetryInterval = 10.seconds
)
log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...")
CordaRPCClient(targetHost, config).use(user.username, user.password) {
log.info("RPC connection to ${targetHost.host}:${targetHost.port} established")

View File

@ -206,7 +206,7 @@ open class Node(configuration: NodeConfiguration,
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
val rpcServerConfiguration = RPCServerConfiguration.default
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT
rpcServerAddresses?.let {
internalRpcMessagingClient = InternalRPCMessagingClient(configuration, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal), rpcServerConfiguration)
printBasicNodeInfo("RPC connection address", it.primary.toString())

View File

@ -71,7 +71,7 @@ data class RPCServerConfiguration(
val deduplicationCacheExpiry: Duration
) {
companion object {
val default = RPCServerConfiguration(
val DEFAULT = RPCServerConfiguration(
rpcThreadPoolSize = 4,
reapInterval = 1.seconds,
deduplicationCacheExpiry = 1.days

View File

@ -105,7 +105,7 @@ class ArtemisRpcTests {
}
artemisBroker.use { broker ->
broker.start()
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.default).use { server ->
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.DEFAULT).use { server ->
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
val client = RPCClient<TestRpcOps>(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions))

View File

@ -1,7 +1,7 @@
package net.corda.testing.node.internal
import net.corda.client.mock.Generator
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
import net.corda.client.rpc.CordaRPCClientConfiguration
import net.corda.client.rpc.internal.RPCClient
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
import net.corda.core.concurrent.CordaFuture
@ -57,7 +57,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser
inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient(
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) = startInVmRpcClient(I::class.java, username, password, configuration)
inline fun <reified I : RPCOps> RPCDriverDSL.startRandomRpcClient(
@ -70,14 +70,14 @@ inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
haAddressPool: List<NetworkHostAndPort>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
) = startRpcClient(I::class.java, haAddressPool, username, password, configuration)
data class RpcBrokerHandle(
@ -239,7 +239,7 @@ data class RPCDriverDSL(
nodeLegalName: CordaX500Name = fakeNodeLegalName,
maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I
): CordaFuture<RpcServerHandle> {
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
@ -259,7 +259,7 @@ data class RPCDriverDSL(
rpcOpsClass: Class<I>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
): CordaFuture<I> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(inVmClientTransportConfiguration, configuration)
@ -307,7 +307,7 @@ data class RPCDriverDSL(
nodeLegalName: CordaX500Name = fakeNodeLegalName,
maxFileSize: Int = MAX_MESSAGE_SIZE,
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
customPort: NetworkHostAndPort? = null,
ops: I
): CordaFuture<RpcServerHandle> {
@ -330,7 +330,7 @@ data class RPCDriverDSL(
rpcAddress: NetworkHostAndPort,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
): CordaFuture<I> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), configuration)
@ -356,7 +356,7 @@ data class RPCDriverDSL(
haAddressPool: List<NetworkHostAndPort>,
username: String = rpcTestUser.username,
password: String = rpcTestUser.password,
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
): CordaFuture<I> {
return driverDSL.executorService.fork {
val client = RPCClient<I>(haAddressPool, null, configuration)
@ -462,7 +462,7 @@ data class RPCDriverDSL(
fun <I : RPCOps> startRpcServerWithBrokerRunning(
rpcUser: User = rpcTestUser,
nodeLegalName: CordaX500Name = fakeNodeLegalName,
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
ops: I,
brokerHandle: RpcBrokerHandle
): RpcServerHandle {

View File

@ -91,9 +91,9 @@ object InteractiveShell {
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
rpcOps = { username: String, credentials: String ->
val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort,
configuration = object : CordaRPCClientConfiguration {
override val maxReconnectAttempts = 1
},
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 1
),
sslConfiguration = configuration.ssl,
classLoader = classLoader)
this.connection = client.start(username, credentials)
@ -109,9 +109,9 @@ object InteractiveShell {
fun startShellInternal(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
rpcOps = { username: String, credentials: String ->
val client = createCordaRPCClientWithInternalSslAndClassLoader(hostAndPort = configuration.hostAndPort,
configuration = object : CordaRPCClientConfiguration {
override val maxReconnectAttempts = 1
},
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
maxReconnectAttempts = 1
),
sslConfiguration = configuration.nodeSslConfig,
classLoader = classLoader)
this.connection = client.start(username, credentials)
@ -302,7 +302,7 @@ object InteractiveShell {
} catch (e: PermissionException) {
output.println(e.message ?: "Access denied", Color.red)
} catch (e: ExecutionException) {
// ignoring it as already logged by the progress handler subscriber
// ignoring it as already logged by the progress handler subscriber
} finally {
InputStreamDeserializer.closeAll()
}