mirror of
https://github.com/corda/corda.git
synced 2025-02-14 22:52:22 +00:00
Merge pull request #999 from corda/tlil-os-merge-20180613
OS->ENT merge 2018-06-13
This commit is contained in:
commit
f3fb03a455
@ -6527,27 +6527,22 @@ public static final class net.corda.client.rpc.CordaRPCClient$Companion extends
|
||||
@NotNull
|
||||
public final net.corda.client.rpc.CordaRPCClient createWithSsl(net.corda.core.utilities.NetworkHostAndPort, net.corda.core.messaging.ClientRpcSslOptions, net.corda.client.rpc.CordaRPCClientConfiguration)
|
||||
##
|
||||
public interface net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
public abstract int getCacheConcurrencyLevel()
|
||||
public class net.corda.client.rpc.CordaRPCClientConfiguration extends java.lang.Object
|
||||
public <init>(java.time.Duration)
|
||||
@NotNull
|
||||
public abstract java.time.Duration getConnectionMaxRetryInterval()
|
||||
public final java.time.Duration component1()
|
||||
@NotNull
|
||||
public abstract java.time.Duration getConnectionRetryInterval()
|
||||
public abstract double getConnectionRetryIntervalMultiplier()
|
||||
public final net.corda.client.rpc.CordaRPCClientConfiguration copy(java.time.Duration)
|
||||
public boolean equals(Object)
|
||||
@NotNull
|
||||
public abstract java.time.Duration getDeduplicationCacheExpiry()
|
||||
public abstract int getMaxFileSize()
|
||||
public abstract int getMaxReconnectAttempts()
|
||||
public abstract int getMinimumServerProtocolVersion()
|
||||
public abstract int getObservationExecutorPoolSize()
|
||||
@NotNull
|
||||
public abstract java.time.Duration getReapInterval()
|
||||
public abstract boolean getTrackRpcCallSites()
|
||||
public java.time.Duration getConnectionMaxRetryInterval()
|
||||
public int hashCode()
|
||||
public String toString()
|
||||
public static final net.corda.client.rpc.CordaRPCClientConfiguration$Companion Companion
|
||||
@NotNull
|
||||
public static final net.corda.client.rpc.CordaRPCClientConfiguration DEFAULT
|
||||
##
|
||||
public static final class net.corda.client.rpc.CordaRPCClientConfiguration$Companion extends java.lang.Object
|
||||
@NotNull
|
||||
public final net.corda.client.rpc.CordaRPCClientConfiguration default()
|
||||
##
|
||||
@DoNotImplement
|
||||
public final class net.corda.client.rpc.CordaRPCConnection extends java.lang.Object implements net.corda.client.rpc.RPCConnection
|
||||
|
@ -81,7 +81,7 @@ class NodeMonitorModel {
|
||||
// Only execute using "runLater()" if JavaFX been initialized.
|
||||
// It may not be initialized in the unit test.
|
||||
// Also if we are already in the JavaFX thread - perform direct invocation without postponing it.
|
||||
if(initialized.value.get() && !Platform.isFxApplicationThread()) {
|
||||
if (initialized.value.get() && !Platform.isFxApplicationThread()) {
|
||||
Platform.runLater(op)
|
||||
} else {
|
||||
op()
|
||||
@ -108,7 +108,7 @@ class NodeMonitorModel {
|
||||
|
||||
// Proxy may change during re-connect, ensure that subject wiring accurately reacts to this activity.
|
||||
proxyObservable.addListener { _, _, wrapper ->
|
||||
if(wrapper != null) {
|
||||
if (wrapper != null) {
|
||||
val proxy = wrapper.cordaRPCOps
|
||||
// Vault snapshot (force single page load with MAX_PAGE_SIZE) + updates
|
||||
val (statesSnapshot, vaultUpdates) = proxy.vaultTrackBy<ContractState>(QueryCriteria.VaultQueryCriteria(Vault.StateStatus.ALL),
|
||||
@ -144,7 +144,8 @@ class NodeMonitorModel {
|
||||
}
|
||||
val futureProgressTrackerUpdates = stateMachineUpdatesSubject.map { stateMachineUpdate ->
|
||||
if (stateMachineUpdate is StateMachineUpdate.Added) {
|
||||
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo) ?: Observable.empty<ProgressTrackingEvent>()
|
||||
ProgressTrackingEvent.createStreamFromStateMachineInfo(stateMachineUpdate.stateMachineInfo)
|
||||
?: Observable.empty<ProgressTrackingEvent>()
|
||||
} else {
|
||||
Observable.empty<ProgressTrackingEvent>()
|
||||
}
|
||||
@ -193,29 +194,28 @@ class NodeMonitorModel {
|
||||
logger.info("Connecting to: $nodeHostAndPort")
|
||||
val client = CordaRPCClient(
|
||||
nodeHostAndPort,
|
||||
object : CordaRPCClientConfiguration {
|
||||
override val connectionMaxRetryInterval = retryInterval
|
||||
}
|
||||
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
connectionMaxRetryInterval = retryInterval
|
||||
)
|
||||
)
|
||||
val _connection = client.start(username, password)
|
||||
// Check connection is truly operational before returning it.
|
||||
val nodeInfo = _connection.proxy.nodeInfo()
|
||||
require(nodeInfo.legalIdentitiesAndCerts.isNotEmpty())
|
||||
_connection
|
||||
} catch(secEx: ActiveMQException) {
|
||||
} catch (secEx: ActiveMQException) {
|
||||
// Happens when:
|
||||
// * incorrect credentials provided;
|
||||
// * incorrect endpoint specified;
|
||||
// - no point to retry connecting.
|
||||
throw secEx
|
||||
}
|
||||
catch(th: Throwable) {
|
||||
} catch (th: Throwable) {
|
||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||
logger.info("Exception upon establishing connection: " + th.message)
|
||||
null
|
||||
}
|
||||
|
||||
if(connection != null) {
|
||||
if (connection != null) {
|
||||
logger.info("Connection successfully established with: $nodeHostAndPort")
|
||||
return connection
|
||||
}
|
||||
|
@ -70,9 +70,9 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C
|
||||
override fun setUp() {
|
||||
super.setUp()
|
||||
node = startNode(ALICE_NAME, rpcUsers = listOf(rpcUser))
|
||||
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, object : CordaRPCClientConfiguration {
|
||||
override val maxReconnectAttempts = 5
|
||||
})
|
||||
client = CordaRPCClient(node.internals.configuration.rpcOptions.address!!, CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
maxReconnectAttempts = 5
|
||||
))
|
||||
identity = node.info.identityFromX500Name(ALICE_NAME)
|
||||
}
|
||||
|
||||
|
@ -11,7 +11,6 @@
|
||||
package net.corda.client.rpc
|
||||
|
||||
import net.corda.client.rpc.internal.RPCClient
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.internal.concurrent.fork
|
||||
@ -116,7 +115,7 @@ class RPCStabilityTests {
|
||||
Try.on {
|
||||
startRpcClient<RPCOps>(
|
||||
server.get().broker.hostAndPort!!,
|
||||
configuration = CordaRPCClientConfigurationImpl.default.copy(minimumServerProtocolVersion = 1)
|
||||
configuration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = 1)
|
||||
).get()
|
||||
}
|
||||
}
|
||||
@ -139,7 +138,7 @@ class RPCStabilityTests {
|
||||
rpcDriver {
|
||||
fun startAndCloseServer(broker: RpcBrokerHandle) {
|
||||
startRpcServerWithBrokerRunning(
|
||||
configuration = RPCServerConfiguration.default,
|
||||
configuration = RPCServerConfiguration.DEFAULT,
|
||||
ops = DummyOps,
|
||||
brokerHandle = broker
|
||||
).rpcServer.close()
|
||||
@ -160,7 +159,7 @@ class RPCStabilityTests {
|
||||
@Test
|
||||
fun `rpc client close doesnt leak broker resources`() {
|
||||
rpcDriver {
|
||||
val server = startRpcServer(configuration = RPCServerConfiguration.default, ops = DummyOps).get()
|
||||
val server = startRpcServer(configuration = RPCServerConfiguration.DEFAULT, ops = DummyOps).get()
|
||||
RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close()
|
||||
val initial = server.broker.getStats()
|
||||
repeat(100) {
|
||||
@ -251,7 +250,7 @@ class RPCStabilityTests {
|
||||
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||
serverFollower.unfollow()
|
||||
// Set retry interval to 1s to reduce test duration
|
||||
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds)
|
||||
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds)
|
||||
val clientFollower = shutdownManager.follower()
|
||||
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||
clientFollower.unfollow()
|
||||
@ -276,7 +275,7 @@ class RPCStabilityTests {
|
||||
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||
serverFollower.unfollow()
|
||||
// Set retry interval to 1s to reduce test duration
|
||||
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
|
||||
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
|
||||
val clientFollower = shutdownManager.follower()
|
||||
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||
clientFollower.unfollow()
|
||||
@ -308,7 +307,7 @@ class RPCStabilityTests {
|
||||
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||
serverFollower.unfollow()
|
||||
|
||||
val clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
|
||||
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
|
||||
val clientFollower = shutdownManager.follower()
|
||||
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||
clientFollower.unfollow()
|
||||
@ -463,7 +462,7 @@ class RPCStabilityTests {
|
||||
}
|
||||
}
|
||||
val server = startRpcServer<TrackSubscriberOps>(
|
||||
configuration = RPCServerConfiguration.default.copy(
|
||||
configuration = RPCServerConfiguration.DEFAULT.copy(
|
||||
reapInterval = 100.millis
|
||||
),
|
||||
ops = trackSubscriberOpsImpl
|
||||
|
@ -10,7 +10,6 @@
|
||||
|
||||
package net.corda.client.rpc
|
||||
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.client.rpc.internal.RPCClient
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.core.context.Actor
|
||||
@ -20,6 +19,9 @@ import net.corda.core.serialization.internal.effectiveSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
|
||||
import java.time.Duration
|
||||
@ -34,46 +36,163 @@ class CordaRPCConnection internal constructor(connection: RPCConnection<CordaRPC
|
||||
/**
|
||||
* Can be used to configure the RPC client connection.
|
||||
*/
|
||||
interface CordaRPCClientConfiguration {
|
||||
open class CordaRPCClientConfiguration @JvmOverloads constructor(
|
||||
|
||||
/** The minimum protocol version required from the server */
|
||||
val minimumServerProtocolVersion: Int get() = default().minimumServerProtocolVersion
|
||||
/**
|
||||
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
|
||||
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
|
||||
* constructing call stacks is a moderately expensive operation.
|
||||
*/
|
||||
val trackRpcCallSites: Boolean get() = default().trackRpcCallSites
|
||||
/**
|
||||
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
|
||||
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
|
||||
* duration. If set too low it wastes client side cycles.
|
||||
*/
|
||||
val reapInterval: Duration get() = default().reapInterval
|
||||
/** The number of threads to use for observations (for executing [Observable.onNext]) */
|
||||
val observationExecutorPoolSize: Int get() = default().observationExecutorPoolSize
|
||||
/**
|
||||
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
|
||||
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
|
||||
* See the implementation of [com.google.common.cache.LocalCache] for details.
|
||||
*/
|
||||
val cacheConcurrencyLevel: Int get() = default().cacheConcurrencyLevel
|
||||
/** The retry interval of artemis connections in milliseconds */
|
||||
val connectionRetryInterval: Duration get() = default().connectionRetryInterval
|
||||
/** The retry interval multiplier for exponential backoff */
|
||||
val connectionRetryIntervalMultiplier: Double get() = default().connectionRetryIntervalMultiplier
|
||||
/** Maximum retry interval */
|
||||
val connectionMaxRetryInterval: Duration get() = default().connectionMaxRetryInterval
|
||||
/** Maximum reconnect attempts on failover */
|
||||
val maxReconnectAttempts: Int get() = default().maxReconnectAttempts
|
||||
/** Maximum file size */
|
||||
val maxFileSize: Int get() = default().maxFileSize
|
||||
/** The cache expiry of a deduplication watermark per client. */
|
||||
val deduplicationCacheExpiry: Duration get() = default().deduplicationCacheExpiry
|
||||
/**
|
||||
* Maximum retry interval.
|
||||
*/
|
||||
open val connectionMaxRetryInterval: Duration = 3.minutes,
|
||||
|
||||
/**
|
||||
* The minimum protocol version required from the server.
|
||||
*/
|
||||
open val minimumServerProtocolVersion: Int = 0,
|
||||
|
||||
/**
|
||||
* If set to true the client will track RPC call sites. If an error occurs subsequently during the RPC or in a
|
||||
* returned Observable stream the stack trace of the originating RPC will be shown as well. Note that
|
||||
* constructing call stacks is a moderately expensive operation.
|
||||
*/
|
||||
open val trackRpcCallSites: Boolean = false,
|
||||
|
||||
/**
|
||||
* The interval of unused observable reaping. Leaked Observables (unused ones) are detected using weak references
|
||||
* and are cleaned up in batches in this interval. If set too large it will waste server side resources for this
|
||||
* duration. If set too low it wastes client side cycles.
|
||||
*/
|
||||
open val reapInterval: Duration = 1.seconds,
|
||||
|
||||
/**
|
||||
* The number of threads to use for observations (for executing [Observable.onNext]).
|
||||
*/
|
||||
open val observationExecutorPoolSize: Int = 4,
|
||||
|
||||
/**
|
||||
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
|
||||
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
|
||||
* See the implementation of [com.google.common.cache.LocalCache] for details.
|
||||
*/
|
||||
open val cacheConcurrencyLevel: Int = 1,
|
||||
|
||||
/**
|
||||
* The retry interval of Artemis connections in milliseconds.
|
||||
*/
|
||||
open val connectionRetryInterval: Duration = 5.seconds,
|
||||
|
||||
/**
|
||||
* The retry interval multiplier for exponential backoff.
|
||||
*/
|
||||
open val connectionRetryIntervalMultiplier: Double = 1.5,
|
||||
|
||||
/**
|
||||
* Maximum reconnect attempts on failover>
|
||||
*/
|
||||
open val maxReconnectAttempts: Int = unlimitedReconnectAttempts,
|
||||
|
||||
/**
|
||||
* Maximum file size, in bytes.
|
||||
*/
|
||||
open val maxFileSize: Int = 10485760,
|
||||
// 10 MiB maximum allowed file size for attachments, including message headers.
|
||||
// TODO: acquire this value from Network Map when supported.
|
||||
|
||||
/**
|
||||
* The cache expiry of a deduplication watermark per client.
|
||||
*/
|
||||
open val deduplicationCacheExpiry: Duration = 1.days
|
||||
|
||||
) {
|
||||
|
||||
companion object {
|
||||
fun default(): CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default
|
||||
|
||||
private const val unlimitedReconnectAttempts = -1
|
||||
|
||||
@JvmField
|
||||
val DEFAULT: CordaRPCClientConfiguration = CordaRPCClientConfiguration()
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new copy of a configuration object with zero or more parameters modified.
|
||||
*/
|
||||
@JvmOverloads
|
||||
fun copy(
|
||||
connectionMaxRetryInterval: Duration = this.connectionMaxRetryInterval,
|
||||
minimumServerProtocolVersion: Int = this.minimumServerProtocolVersion,
|
||||
trackRpcCallSites: Boolean = this.trackRpcCallSites,
|
||||
reapInterval: Duration = this.reapInterval,
|
||||
observationExecutorPoolSize: Int = this.observationExecutorPoolSize,
|
||||
cacheConcurrencyLevel: Int = this.cacheConcurrencyLevel,
|
||||
connectionRetryInterval: Duration = this.connectionRetryInterval,
|
||||
connectionRetryIntervalMultiplier: Double = this.connectionRetryIntervalMultiplier,
|
||||
maxReconnectAttempts: Int = this.maxReconnectAttempts,
|
||||
maxFileSize: Int = this.maxFileSize,
|
||||
deduplicationCacheExpiry: Duration = this.deduplicationCacheExpiry
|
||||
): CordaRPCClientConfiguration {
|
||||
return CordaRPCClientConfiguration(
|
||||
connectionMaxRetryInterval,
|
||||
minimumServerProtocolVersion,
|
||||
trackRpcCallSites,
|
||||
reapInterval,
|
||||
observationExecutorPoolSize,
|
||||
cacheConcurrencyLevel,
|
||||
connectionRetryInterval,
|
||||
connectionRetryIntervalMultiplier,
|
||||
maxReconnectAttempts,
|
||||
maxFileSize,
|
||||
deduplicationCacheExpiry
|
||||
)
|
||||
}
|
||||
|
||||
override fun equals(other: Any?): Boolean {
|
||||
if (this === other) return true
|
||||
if (javaClass != other?.javaClass) return false
|
||||
|
||||
other as CordaRPCClientConfiguration
|
||||
if (connectionMaxRetryInterval != other.connectionMaxRetryInterval) return false
|
||||
if (minimumServerProtocolVersion != other.minimumServerProtocolVersion) return false
|
||||
if (trackRpcCallSites != other.trackRpcCallSites) return false
|
||||
if (reapInterval != other.reapInterval) return false
|
||||
if (observationExecutorPoolSize != other.observationExecutorPoolSize) return false
|
||||
if (cacheConcurrencyLevel != other.cacheConcurrencyLevel) return false
|
||||
if (connectionRetryInterval != other.connectionRetryInterval) return false
|
||||
if (connectionRetryIntervalMultiplier != other.connectionRetryIntervalMultiplier) return false
|
||||
if (maxReconnectAttempts != other.maxReconnectAttempts) return false
|
||||
if (maxFileSize != other.maxFileSize) return false
|
||||
if (deduplicationCacheExpiry != other.deduplicationCacheExpiry) return false
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
override fun hashCode(): Int {
|
||||
var result = minimumServerProtocolVersion
|
||||
result = 31 * result + connectionMaxRetryInterval.hashCode()
|
||||
result = 31 * result + trackRpcCallSites.hashCode()
|
||||
result = 31 * result + reapInterval.hashCode()
|
||||
result = 31 * result + observationExecutorPoolSize
|
||||
result = 31 * result + cacheConcurrencyLevel
|
||||
result = 31 * result + connectionRetryInterval.hashCode()
|
||||
result = 31 * result + connectionRetryIntervalMultiplier.hashCode()
|
||||
result = 31 * result + maxReconnectAttempts
|
||||
result = 31 * result + maxFileSize
|
||||
result = 31 * result + deduplicationCacheExpiry.hashCode()
|
||||
return result
|
||||
}
|
||||
|
||||
override fun toString(): String {
|
||||
return "CordaRPCClientConfiguration(" +
|
||||
"connectionMaxRetryInterval=$connectionMaxRetryInterval, " +
|
||||
"minimumServerProtocolVersion=$minimumServerProtocolVersion, trackRpcCallSites=$trackRpcCallSites, " +
|
||||
"reapInterval=$reapInterval, observationExecutorPoolSize=$observationExecutorPoolSize, " +
|
||||
"cacheConcurrencyLevel=$cacheConcurrencyLevel, connectionRetryInterval=$connectionRetryInterval, " +
|
||||
"connectionRetryIntervalMultiplier=$connectionRetryIntervalMultiplier, " +
|
||||
"maxReconnectAttempts=$maxReconnectAttempts, maxFileSize=$maxFileSize, " +
|
||||
"deduplicationCacheExpiry=$deduplicationCacheExpiry)"
|
||||
}
|
||||
|
||||
// Left is for backwards compatibility with version 3.1
|
||||
operator fun component1() = connectionMaxRetryInterval
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
@ -115,7 +234,7 @@ interface CordaRPCClientConfiguration {
|
||||
*/
|
||||
class CordaRPCClient private constructor(
|
||||
private val hostAndPort: NetworkHostAndPort,
|
||||
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||
private val configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
private val sslConfiguration: ClientRpcSslOptions? = null,
|
||||
private val nodeSslConfiguration: SSLConfiguration? = null,
|
||||
private val classLoader: ClassLoader? = null,
|
||||
@ -124,7 +243,7 @@ class CordaRPCClient private constructor(
|
||||
) {
|
||||
@JvmOverloads
|
||||
constructor(hostAndPort: NetworkHostAndPort,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default())
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT)
|
||||
: this(hostAndPort, configuration, null)
|
||||
|
||||
/**
|
||||
@ -134,13 +253,13 @@ class CordaRPCClient private constructor(
|
||||
* @param configuration An optional configuration used to tweak client behaviour.
|
||||
*/
|
||||
@JvmOverloads
|
||||
constructor(haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool)
|
||||
constructor(haAddressPool: List<NetworkHostAndPort>, configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT) : this(haAddressPool.first(), configuration, null, null, null, haAddressPool)
|
||||
|
||||
companion object {
|
||||
fun createWithSsl(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
sslConfiguration: ClientRpcSslOptions,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
): CordaRPCClient {
|
||||
return CordaRPCClient(hostAndPort, configuration, sslConfiguration)
|
||||
}
|
||||
@ -148,14 +267,14 @@ class CordaRPCClient private constructor(
|
||||
fun createWithSsl(
|
||||
haAddressPool: List<NetworkHostAndPort>,
|
||||
sslConfiguration: ClientRpcSslOptions,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default()
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
): CordaRPCClient {
|
||||
return CordaRPCClient(haAddressPool.first(), configuration, sslConfiguration, haAddressPool = haAddressPool)
|
||||
}
|
||||
|
||||
internal fun createWithSslAndClassLoader(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
): CordaRPCClient {
|
||||
@ -164,7 +283,7 @@ class CordaRPCClient private constructor(
|
||||
|
||||
internal fun createWithInternalSslAndClassLoader(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: SSLConfiguration?,
|
||||
classLoader: ClassLoader? = null
|
||||
): CordaRPCClient {
|
||||
@ -173,7 +292,7 @@ class CordaRPCClient private constructor(
|
||||
|
||||
internal fun createWithSslAndClassLoader(
|
||||
haAddressPool: List<NetworkHostAndPort>,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
): CordaRPCClient {
|
||||
|
@ -22,21 +22,21 @@ import rx.Observable
|
||||
/** Utility which exposes the internal Corda RPC constructor to other internal Corda components */
|
||||
fun createCordaRPCClientWithSslAndClassLoader(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
) = CordaRPCClient.createWithSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
|
||||
|
||||
fun createCordaRPCClientWithInternalSslAndClassLoader(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: SSLConfiguration? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
) = CordaRPCClient.createWithInternalSslAndClassLoader(hostAndPort, configuration, sslConfiguration, classLoader)
|
||||
|
||||
fun createCordaRPCClientWithSslAndClassLoader(
|
||||
haAddressPool: List<NetworkHostAndPort>,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.default(),
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
classLoader: ClassLoader? = null
|
||||
) = CordaRPCClient.createWithSslAndClassLoader(haAddressPool, configuration, sslConfiguration, classLoader)
|
||||
|
@ -33,69 +33,34 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import java.lang.reflect.Proxy
|
||||
import java.time.Duration
|
||||
|
||||
/**
|
||||
* This configuration may be used to tweak the internals of the RPC client.
|
||||
*/
|
||||
data class CordaRPCClientConfigurationImpl(
|
||||
override val minimumServerProtocolVersion: Int,
|
||||
override val trackRpcCallSites: Boolean,
|
||||
override val reapInterval: Duration,
|
||||
override val observationExecutorPoolSize: Int,
|
||||
override val connectionRetryInterval: Duration,
|
||||
override val connectionRetryIntervalMultiplier: Double,
|
||||
override val connectionMaxRetryInterval: Duration,
|
||||
override val maxReconnectAttempts: Int,
|
||||
override val maxFileSize: Int,
|
||||
override val deduplicationCacheExpiry: Duration
|
||||
) : CordaRPCClientConfiguration {
|
||||
companion object {
|
||||
private const val unlimitedReconnectAttempts = -1
|
||||
@JvmStatic
|
||||
val default = CordaRPCClientConfigurationImpl(
|
||||
minimumServerProtocolVersion = 0,
|
||||
trackRpcCallSites = false,
|
||||
reapInterval = 1.seconds,
|
||||
observationExecutorPoolSize = 4,
|
||||
connectionRetryInterval = 5.seconds,
|
||||
connectionRetryIntervalMultiplier = 1.5,
|
||||
connectionMaxRetryInterval = 3.minutes,
|
||||
maxReconnectAttempts = unlimitedReconnectAttempts,
|
||||
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
|
||||
maxFileSize = 10485760,
|
||||
deduplicationCacheExpiry = 1.days
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This runs on the client JVM
|
||||
*/
|
||||
class RPCClient<I : RPCOps>(
|
||||
val transport: TransportConfiguration,
|
||||
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
||||
val rpcConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
val serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT,
|
||||
val haPoolTransportConfigurations: List<TransportConfiguration> = emptyList()
|
||||
) {
|
||||
constructor(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
||||
) : this(rpcConnectorTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
|
||||
|
||||
constructor(
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
sslConfiguration: SSLConfiguration,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
||||
) : this(rpcInternalClientTcpTransport(hostAndPort, sslConfiguration), configuration, serializationContext)
|
||||
|
||||
constructor(
|
||||
haAddressPool: List<NetworkHostAndPort>,
|
||||
sslConfiguration: ClientRpcSslOptions? = null,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfigurationImpl.default,
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
serializationContext: SerializationContext = SerializationDefaults.RPC_CLIENT_CONTEXT
|
||||
) : this(rpcConnectorTcpTransport(haAddressPool.first(), sslConfiguration),
|
||||
configuration, serializationContext, rpcConnectorTcpTransportsFromList(haAddressPool, sslConfiguration))
|
||||
|
@ -10,7 +10,6 @@
|
||||
|
||||
package net.corda.client.rpc
|
||||
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.core.internal.concurrent.flatMap
|
||||
import net.corda.core.internal.concurrent.map
|
||||
import net.corda.core.messaging.RPCOps
|
||||
@ -54,8 +53,8 @@ open class AbstractRPCTest {
|
||||
inline fun <reified I : RPCOps> RPCDriverDSL.testProxy(
|
||||
ops: I,
|
||||
rpcUser: User = rpcTestUser,
|
||||
clientConfiguration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default,
|
||||
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.default
|
||||
clientConfiguration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT,
|
||||
serverConfiguration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT
|
||||
): TestProxy<I> {
|
||||
return when (mode) {
|
||||
RPCTestMode.InVm ->
|
||||
|
@ -10,7 +10,6 @@
|
||||
|
||||
package net.corda.client.rpc
|
||||
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.core.crypto.random63BitValue
|
||||
import net.corda.core.internal.concurrent.fork
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
@ -100,10 +99,10 @@ class RPCConcurrencyTests : AbstractRPCTest() {
|
||||
private fun RPCDriverDSL.testProxy(): TestProxy<TestOps> {
|
||||
return testProxy<TestOps>(
|
||||
TestOpsImpl(pool),
|
||||
clientConfiguration = CordaRPCClientConfigurationImpl.default.copy(
|
||||
clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
reapInterval = 100.millis
|
||||
),
|
||||
serverConfiguration = RPCServerConfiguration.default.copy(
|
||||
serverConfiguration = RPCServerConfiguration.DEFAULT.copy(
|
||||
rpcThreadPoolSize = 4
|
||||
)
|
||||
)
|
||||
|
@ -11,7 +11,6 @@
|
||||
package net.corda.client.rpc
|
||||
|
||||
import com.google.common.base.Stopwatch
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.core.internal.concurrent.doneFuture
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.utilities.minutes
|
||||
@ -53,7 +52,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
}
|
||||
|
||||
private fun RPCDriverDSL.testProxy(
|
||||
clientConfiguration: CordaRPCClientConfigurationImpl,
|
||||
clientConfiguration: CordaRPCClientConfiguration,
|
||||
serverConfiguration: RPCServerConfiguration
|
||||
): TestProxy<TestOps> {
|
||||
return testProxy<TestOps>(
|
||||
@ -66,8 +65,8 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
private fun warmup() {
|
||||
rpcDriver {
|
||||
val proxy = testProxy(
|
||||
CordaRPCClientConfigurationImpl.default,
|
||||
RPCServerConfiguration.default
|
||||
CordaRPCClientConfiguration.DEFAULT,
|
||||
RPCServerConfiguration.DEFAULT
|
||||
)
|
||||
val executor = Executors.newFixedThreadPool(4)
|
||||
val N = 10000
|
||||
@ -96,10 +95,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
measure(inputOutputSizes, (1..5)) { inputOutputSize, _ ->
|
||||
rpcDriver {
|
||||
val proxy = testProxy(
|
||||
CordaRPCClientConfigurationImpl.default.copy(
|
||||
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
observationExecutorPoolSize = 2
|
||||
),
|
||||
RPCServerConfiguration.default.copy(
|
||||
RPCServerConfiguration.DEFAULT.copy(
|
||||
rpcThreadPoolSize = 8
|
||||
)
|
||||
)
|
||||
@ -135,10 +134,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
rpcDriver {
|
||||
val metricRegistry = startReporter(shutdownManager)
|
||||
val proxy = testProxy(
|
||||
CordaRPCClientConfigurationImpl.default.copy(
|
||||
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
reapInterval = 1.seconds
|
||||
),
|
||||
RPCServerConfiguration.default.copy(
|
||||
RPCServerConfiguration.DEFAULT.copy(
|
||||
rpcThreadPoolSize = 8
|
||||
)
|
||||
)
|
||||
@ -168,8 +167,8 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
// TODO this hangs with more parallelism
|
||||
rpcDriver {
|
||||
val proxy = testProxy(
|
||||
CordaRPCClientConfigurationImpl.default,
|
||||
RPCServerConfiguration.default
|
||||
CordaRPCClientConfiguration.DEFAULT,
|
||||
RPCServerConfiguration.DEFAULT
|
||||
)
|
||||
val numberOfMessages = 1000
|
||||
val bigSize = 10_000_000
|
||||
|
@ -8,6 +8,11 @@ Unreleased
|
||||
==========
|
||||
* Introduced a hierarchy of ``DatabaseMigrationException``s, allowing ``NodeStartup`` to gracefully inform users of problems related to database migrations before exiting with a non-zero code.
|
||||
|
||||
* H2 database changes:
|
||||
* The node's H2 database now listens on ``localhost`` by default.
|
||||
* The database server address must also be enabled in the node configuration.
|
||||
* A new ``h2Settings`` configuration block supercedes the ``h2Port`` option.
|
||||
|
||||
* Improved documentation PDF quality. Building the documentation now requires ``LaTex`` to be installed on the OS.
|
||||
|
||||
* Add ``devModeOptions.allowCompatibilityZone`` to re-enable the use of a compatibility zone and ``devMode``
|
||||
@ -48,6 +53,8 @@ Unreleased
|
||||
* The node's configuration is only printed on startup if ``devMode`` is ``true``, avoiding the risk of printing passwords
|
||||
in a production setup.
|
||||
|
||||
* ``NodeStartup`` will now only print node's configuration if ``devMode`` is ``true``, avoiding the risk of printing passwords in a production setup.
|
||||
|
||||
* SLF4J's MDC will now only be printed to the console if not empty. No more log lines ending with "{}".
|
||||
|
||||
* ``WireTransaction.Companion.createComponentGroups`` has been marked as ``@CordaInternal``. It was never intended to be
|
||||
|
@ -143,8 +143,26 @@ The webserver JAR will be copied into the node's ``build`` folder with the name
|
||||
The Dockerform task
|
||||
-------------------
|
||||
|
||||
The ``Dockerform`` is a sister task of ``Cordform``. It has nearly the same syntax and produces very
|
||||
similar results - enhanced by an extra file to enable easy spin up of nodes using ``docker-compose``.
|
||||
The ``Dockerform`` is a sister task of ``Cordform`` that provides an extra file allowing you to easily spin up
|
||||
nodes using ``docker-compose``. It supports the following configuration options for each node:
|
||||
|
||||
* ``name``
|
||||
* ``notary``
|
||||
* ``cordapps``
|
||||
* ``rpcUsers``
|
||||
* ``useTestClock``
|
||||
|
||||
There is no need to specify the nodes' ports, as every node has a separate container, so no ports conflict will occur.
|
||||
Every node will expose port ``10003`` for RPC connections.
|
||||
|
||||
The nodes' webservers will not be started. Instead, you should interact with each node via its shell over SSH
|
||||
(see the :doc:`node configuration options <corda-configuration-file>`). You have to enable the shell by adding the
|
||||
following line to each node's ``node.conf`` file:
|
||||
|
||||
``sshd { port = 2222 }``
|
||||
|
||||
Where ``2222`` is the port you want to open to SSH into the shell.
|
||||
|
||||
Below you can find the example task from the `IRS Demo <https://github.com/corda/corda/blob/release-V3.0/samples/irs-demo/cordapp/build.gradle#L111>`_ included in the samples directory of main Corda GitHub repository:
|
||||
|
||||
.. sourcecode:: groovy
|
||||
@ -194,13 +212,6 @@ Below you can find the example task from the `IRS Demo <https://github.com/corda
|
||||
}
|
||||
}
|
||||
|
||||
There is no need to specify the ports, as every node is a separated container, so no ports conflict will occur. Every
|
||||
node by default will expose port 10003 which is the default port for RPC connections.
|
||||
|
||||
.. warning:: The node webserver is not supported by this task!
|
||||
|
||||
.. warning:: Nodes are run without the local shell enabled!
|
||||
|
||||
Running the Cordform/Dockerform tasks
|
||||
-------------------------------------
|
||||
To create the nodes defined in our ``deployNodes`` task, run the following command in a terminal window from the root
|
||||
|
@ -59,11 +59,10 @@ Node can be configured to run SSH server. See :doc:`shell` for details.
|
||||
|
||||
Database access
|
||||
---------------
|
||||
|
||||
The node exposes its internal database over a socket which can be browsed using any tool that can use JDBC drivers.
|
||||
The node can be configured to expose its internal database over socket which can be browsed using any tool that can use JDBC drivers.
|
||||
The JDBC URL is printed during node startup to the log and will typically look like this:
|
||||
|
||||
``jdbc:h2:tcp://192.168.0.31:31339/node``
|
||||
``jdbc:h2:tcp://localhost:31339/node``
|
||||
|
||||
The username and password can be altered in the :doc:`corda-configuration-file` but default to username "sa" and a blank
|
||||
password.
|
||||
@ -72,7 +71,18 @@ Any database browsing tool that supports JDBC can be used, but if you have Intel
|
||||
a tool integrated with your IDE. Just open the database window and add an H2 data source with the above details.
|
||||
You will now be able to browse the tables and row data within them.
|
||||
|
||||
.. _jolokia_ref:
|
||||
By default the node will expose its database on the localhost network interface. This behaviour can be
|
||||
overridden by specifying the full network address (interface and port), using the new h2Settings
|
||||
syntax in the node configuration:
|
||||
|
||||
.. sourcecode:: groovy
|
||||
h2Settings {
|
||||
address: "localhost:12345"
|
||||
}
|
||||
|
||||
The configuration above will restrict the H2 service to run on localhost. If remote access is required, the address
|
||||
can be changed to 0.0.0.0. However it is recommended to change the default username and password
|
||||
before doing so.
|
||||
|
||||
Monitoring your node
|
||||
--------------------
|
||||
|
@ -6,6 +6,13 @@ Default in-memory database
|
||||
By default, nodes store their data in an H2 database. You can connect directly to a running node's database to see its
|
||||
stored states, transactions and attachments as follows:
|
||||
|
||||
* Enable the H2 database access in the node configuration using the following syntax:
|
||||
|
||||
.. sourcecode:: groovy
|
||||
h2Settings {
|
||||
address: "localhost:0"
|
||||
}
|
||||
|
||||
* Download the **last stable** `h2 platform-independent zip <http://www.h2database.com/html/download.html>`_, unzip the zip, and
|
||||
navigate in a terminal window to the unzipped folder
|
||||
* Change directories to the bin folder: ``cd h2/bin``
|
||||
@ -25,6 +32,10 @@ stored states, transactions and attachments as follows:
|
||||
You will be presented with a web interface that shows the contents of your node's storage and vault, and provides an
|
||||
interface for you to query them using SQL.
|
||||
|
||||
The default behaviour is to expose the H2 database on localhost. This can be overridden in the
|
||||
node configuration using ``h2Settings.address`` and specifying the address of the network interface to listen on,
|
||||
or simply using ``0.0.0.0:0`` to listen on all interfaces.
|
||||
|
||||
.. _standalone_database_config_examples_ref:
|
||||
|
||||
Standalone database
|
||||
|
@ -200,9 +200,9 @@ class Node(
|
||||
val user = config.users.first()
|
||||
val address = config.nodeInterface
|
||||
val targetHost = NetworkHostAndPort(address.host, address.rpcPort)
|
||||
val config = object : CordaRPCClientConfiguration {
|
||||
override val connectionMaxRetryInterval = 10.seconds
|
||||
}
|
||||
val config = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
connectionMaxRetryInterval = 10.seconds
|
||||
)
|
||||
log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...")
|
||||
CordaRPCClient(targetHost, config).use(user.username, user.password) {
|
||||
log.info("RPC connection to ${targetHost.host}:${targetHost.port} established")
|
||||
|
@ -24,9 +24,9 @@ abstract class AbstractScenarioRunner(options: OptionSet) {
|
||||
val retryInterval = 5.seconds
|
||||
|
||||
val client = CordaRPCClient(endpoint,
|
||||
object : CordaRPCClientConfiguration {
|
||||
override val connectionMaxRetryInterval = retryInterval
|
||||
}
|
||||
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
connectionMaxRetryInterval = retryInterval
|
||||
)
|
||||
)
|
||||
val connection = client.start(user, password)
|
||||
return connection.proxy
|
||||
|
@ -224,7 +224,7 @@ open class Node(configuration: NodeConfiguration,
|
||||
|
||||
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
|
||||
|
||||
val rpcServerConfiguration = RPCServerConfiguration.default.copy(
|
||||
val rpcServerConfiguration = RPCServerConfiguration.DEFAULT.copy(
|
||||
rpcThreadPoolSize = configuration.enterpriseConfiguration.tuning.rpcThreadPoolSize
|
||||
)
|
||||
rpcServerAddresses?.let {
|
||||
@ -362,15 +362,19 @@ open class Node(configuration: NodeConfiguration,
|
||||
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?): CordaPersistence {
|
||||
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
|
||||
val h2Prefix = "jdbc:h2:file:"
|
||||
|
||||
if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) {
|
||||
val h2Port = databaseUrl.substringAfter(";AUTO_SERVER_PORT=", "").substringBefore(';')
|
||||
if (h2Port.isNotBlank()) {
|
||||
val effectiveH2Settings = configuration.effectiveH2Settings
|
||||
|
||||
if(effectiveH2Settings != null && effectiveH2Settings.address != null) {
|
||||
val databaseName = databaseUrl.removePrefix(h2Prefix).substringBefore(';')
|
||||
val server = org.h2.tools.Server.createTcpServer(
|
||||
"-tcpPort", h2Port,
|
||||
"-tcpPort", effectiveH2Settings.address.port.toString(),
|
||||
"-tcpAllowOthers",
|
||||
"-tcpDaemon",
|
||||
"-key", "node", databaseName)
|
||||
// override interface that createTcpServer listens on (which is always 0.0.0.0)
|
||||
System.setProperty("h2.bindAddress", effectiveH2Settings.address.host)
|
||||
runOnStop += server::stop
|
||||
val url = server.start().url
|
||||
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
|
||||
|
@ -76,7 +76,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val extraNetworkMapKeys: List<UUID>
|
||||
val tlsCertCrlDistPoint: URL?
|
||||
val tlsCertCrlIssuer: String?
|
||||
|
||||
val effectiveH2Settings: NodeH2Settings?
|
||||
fun validate(): List<String>
|
||||
|
||||
companion object {
|
||||
@ -249,12 +249,14 @@ data class NodeConfigurationImpl(
|
||||
override val graphiteOptions: GraphiteOptions? = null,
|
||||
override val extraNetworkMapKeys: List<UUID> = emptyList(),
|
||||
// do not use or remove (breaks DemoBench together with rejection of unknown configuration keys during parsing)
|
||||
private val h2port: Int = 0,
|
||||
private val h2port: Int? = null,
|
||||
private val h2Settings: NodeH2Settings? = null,
|
||||
// do not use or remove (used by Capsule)
|
||||
private val jarDirs: List<String> = emptyList()
|
||||
) : NodeConfiguration {
|
||||
companion object {
|
||||
private val logger = loggerFor<NodeConfigurationImpl>()
|
||||
|
||||
}
|
||||
|
||||
override val rpcOptions: NodeRpcOptions = initialiseRpcOptions(rpcAddress, rpcSettings, BrokerRpcSslOptions(baseDirectory / "certificates" / "nodekeystore.jks", keyStorePassword))
|
||||
@ -274,6 +276,7 @@ data class NodeConfigurationImpl(
|
||||
}.asOptions(fallbackSslOptions)
|
||||
}
|
||||
|
||||
|
||||
private fun validateTlsCertCrlConfig(): List<String> {
|
||||
val errors = mutableListOf<String>()
|
||||
if (tlsCertCrlIssuer != null) {
|
||||
@ -298,6 +301,15 @@ data class NodeConfigurationImpl(
|
||||
errors += validateRpcOptions(rpcOptions)
|
||||
errors += validateTlsCertCrlConfig()
|
||||
errors += validateNetworkServices()
|
||||
errors += validateH2Settings()
|
||||
return errors
|
||||
}
|
||||
|
||||
private fun validateH2Settings(): List<String> {
|
||||
val errors = mutableListOf<String>()
|
||||
if (h2port != null && h2Settings != null) {
|
||||
errors += "Cannot specify both 'h2port' and 'h2Settings' in configuration"
|
||||
}
|
||||
return errors
|
||||
}
|
||||
|
||||
@ -345,6 +357,11 @@ data class NodeConfigurationImpl(
|
||||
override val attachmentContentCacheSizeBytes: Long
|
||||
get() = attachmentContentCacheSizeMegaBytes?.MB ?: super.attachmentContentCacheSizeBytes
|
||||
|
||||
override val effectiveH2Settings: NodeH2Settings?
|
||||
get() = when {
|
||||
h2port != null -> NodeH2Settings(address = NetworkHostAndPort(host="localhost", port=h2port))
|
||||
else -> h2Settings
|
||||
}
|
||||
|
||||
init {
|
||||
// This is a sanity feature do not remove.
|
||||
@ -387,9 +404,12 @@ data class NodeConfigurationImpl(
|
||||
if (compatibilityZoneURL != null && networkServices == null) {
|
||||
networkServices = NetworkServicesConfig(compatibilityZoneURL, compatibilityZoneURL, true)
|
||||
}
|
||||
require(h2port == null || h2Settings == null) { "Cannot specify both 'h2port' and 'h2Settings' in configuration" }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
data class NodeRpcSettings(
|
||||
val address: NetworkHostAndPort?,
|
||||
val adminAddress: NetworkHostAndPort?,
|
||||
@ -412,6 +432,10 @@ data class NodeRpcSettings(
|
||||
}
|
||||
}
|
||||
|
||||
data class NodeH2Settings(
|
||||
val address: NetworkHostAndPort?
|
||||
)
|
||||
|
||||
enum class VerifierType {
|
||||
InMemory,
|
||||
OutOfProcess
|
||||
|
@ -81,7 +81,7 @@ data class RPCServerConfiguration(
|
||||
val deduplicationCacheExpiry: Duration
|
||||
) {
|
||||
companion object {
|
||||
val default = RPCServerConfiguration(
|
||||
val DEFAULT = RPCServerConfiguration(
|
||||
rpcThreadPoolSize = 4,
|
||||
reapInterval = 1.seconds,
|
||||
deduplicationCacheExpiry = 1.days
|
||||
|
@ -14,7 +14,7 @@ crlCheckSoftFail = true
|
||||
lazyBridgeStart = true
|
||||
dataSourceProperties = {
|
||||
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
|
||||
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000;AUTO_SERVER_PORT="${h2port}
|
||||
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000"
|
||||
dataSource.user = sa
|
||||
dataSource.password = ""
|
||||
}
|
||||
@ -22,7 +22,7 @@ database = {
|
||||
transactionIsolationLevel = "REPEATABLE_READ"
|
||||
exportHibernateJMXStatistics = "false"
|
||||
}
|
||||
h2port = 0
|
||||
|
||||
useTestClock = false
|
||||
verifierType = InMemory
|
||||
enterpriseConfiguration = {
|
||||
@ -46,6 +46,6 @@ rpcSettings = {
|
||||
}
|
||||
flowTimeout {
|
||||
timeout = 30 seconds
|
||||
maxRestartCount = 3
|
||||
backoffBase = 2.0
|
||||
maxRestartCount = 5
|
||||
backoffBase = 1.8
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ class ArtemisRpcTests {
|
||||
}
|
||||
artemisBroker.use { broker ->
|
||||
broker.start()
|
||||
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.default).use { server ->
|
||||
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.DEFAULT).use { server ->
|
||||
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
|
||||
|
||||
val client = RPCClient<TestRpcOps>(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions))
|
||||
|
@ -11,7 +11,7 @@
|
||||
package net.corda.testing.node.internal
|
||||
|
||||
import net.corda.client.mock.Generator
|
||||
import net.corda.client.rpc.internal.CordaRPCClientConfigurationImpl
|
||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||
import net.corda.client.rpc.internal.RPCClient
|
||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||
import net.corda.core.concurrent.CordaFuture
|
||||
@ -67,7 +67,7 @@ import net.corda.nodeapi.internal.config.User as InternalUser
|
||||
inline fun <reified I : RPCOps> RPCDriverDSL.startInVmRpcClient(
|
||||
username: String = rpcTestUser.username,
|
||||
password: String = rpcTestUser.password,
|
||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
) = startInVmRpcClient(I::class.java, username, password, configuration)
|
||||
|
||||
inline fun <reified I : RPCOps> RPCDriverDSL.startRandomRpcClient(
|
||||
@ -80,14 +80,14 @@ inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
|
||||
rpcAddress: NetworkHostAndPort,
|
||||
username: String = rpcTestUser.username,
|
||||
password: String = rpcTestUser.password,
|
||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
) = startRpcClient(I::class.java, rpcAddress, username, password, configuration)
|
||||
|
||||
inline fun <reified I : RPCOps> RPCDriverDSL.startRpcClient(
|
||||
haAddressPool: List<NetworkHostAndPort>,
|
||||
username: String = rpcTestUser.username,
|
||||
password: String = rpcTestUser.password,
|
||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
) = startRpcClient(I::class.java, haAddressPool, username, password, configuration)
|
||||
|
||||
data class RpcBrokerHandle(
|
||||
@ -249,7 +249,7 @@ data class RPCDriverDSL(
|
||||
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
||||
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
||||
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||
ops: I
|
||||
): CordaFuture<RpcServerHandle> {
|
||||
return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker ->
|
||||
@ -269,7 +269,7 @@ data class RPCDriverDSL(
|
||||
rpcOpsClass: Class<I>,
|
||||
username: String = rpcTestUser.username,
|
||||
password: String = rpcTestUser.password,
|
||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
): CordaFuture<I> {
|
||||
return driverDSL.executorService.fork {
|
||||
val client = RPCClient<I>(inVmClientTransportConfiguration, configuration)
|
||||
@ -317,7 +317,7 @@ data class RPCDriverDSL(
|
||||
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
||||
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
||||
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||
customPort: NetworkHostAndPort? = null,
|
||||
ops: I
|
||||
): CordaFuture<RpcServerHandle> {
|
||||
@ -340,7 +340,7 @@ data class RPCDriverDSL(
|
||||
rpcAddress: NetworkHostAndPort,
|
||||
username: String = rpcTestUser.username,
|
||||
password: String = rpcTestUser.password,
|
||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
): CordaFuture<I> {
|
||||
return driverDSL.executorService.fork {
|
||||
val client = RPCClient<I>(ArtemisTcpTransport.rpcConnectorTcpTransport(rpcAddress, null), configuration)
|
||||
@ -366,7 +366,7 @@ data class RPCDriverDSL(
|
||||
haAddressPool: List<NetworkHostAndPort>,
|
||||
username: String = rpcTestUser.username,
|
||||
password: String = rpcTestUser.password,
|
||||
configuration: CordaRPCClientConfigurationImpl = CordaRPCClientConfigurationImpl.default
|
||||
configuration: CordaRPCClientConfiguration = CordaRPCClientConfiguration.DEFAULT
|
||||
): CordaFuture<I> {
|
||||
return driverDSL.executorService.fork {
|
||||
val client = RPCClient<I>(haAddressPool, null, configuration)
|
||||
@ -472,7 +472,7 @@ data class RPCDriverDSL(
|
||||
fun <I : RPCOps> startRpcServerWithBrokerRunning(
|
||||
rpcUser: User = rpcTestUser,
|
||||
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.default,
|
||||
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||
ops: I,
|
||||
brokerHandle: RpcBrokerHandle
|
||||
): RpcServerHandle {
|
||||
|
@ -134,9 +134,9 @@ class RPCProxyWebService(targetHostAndPort: NetworkHostAndPort) {
|
||||
|
||||
private fun <T> use(action: (CordaRPCOps) -> T): Response {
|
||||
val targetHost = NetworkHostAndPort("localhost", targetPort)
|
||||
val config = object : CordaRPCClientConfiguration {
|
||||
override val connectionMaxRetryInterval = 10.seconds
|
||||
}
|
||||
val config = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
connectionMaxRetryInterval = 10.seconds
|
||||
)
|
||||
log.info("Establishing RPC connection to ${targetHost.host} on port ${targetHost.port} ...")
|
||||
return try {
|
||||
CordaRPCClient(targetHost, config).use("corda", DEFAULT_PASSWORD) {
|
||||
|
@ -56,7 +56,6 @@ class NodeConfigTest {
|
||||
assertEquals(localPort(40002), fullConfig.rpcOptions.address)
|
||||
assertEquals(localPort(10001), fullConfig.p2pAddress)
|
||||
assertEquals(listOf(user("jenny")), fullConfig.rpcUsers)
|
||||
assertThat(fullConfig.dataSourceProperties[DataSourceConfigTag.DATA_SOURCE_URL] as String).contains("AUTO_SERVER_PORT=30001")
|
||||
assertTrue(fullConfig.useTestClock)
|
||||
assertFalse(fullConfig.detectPublicIp)
|
||||
}
|
||||
|
@ -101,9 +101,9 @@ object InteractiveShell {
|
||||
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
|
||||
rpcOps = { username: String, credentials: String ->
|
||||
val client = createCordaRPCClientWithSslAndClassLoader(hostAndPort = configuration.hostAndPort,
|
||||
configuration = object : CordaRPCClientConfiguration {
|
||||
override val maxReconnectAttempts = 1
|
||||
},
|
||||
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
maxReconnectAttempts = 1
|
||||
),
|
||||
sslConfiguration = configuration.ssl,
|
||||
classLoader = classLoader)
|
||||
this.connection = client.start(username, credentials)
|
||||
@ -119,9 +119,9 @@ object InteractiveShell {
|
||||
fun startShellInternal(configuration: ShellConfiguration, classLoader: ClassLoader? = null) {
|
||||
rpcOps = { username: String, credentials: String ->
|
||||
val client = createCordaRPCClientWithInternalSslAndClassLoader(hostAndPort = configuration.hostAndPort,
|
||||
configuration = object : CordaRPCClientConfiguration {
|
||||
override val maxReconnectAttempts = 1
|
||||
},
|
||||
configuration = CordaRPCClientConfiguration.DEFAULT.copy(
|
||||
maxReconnectAttempts = 1
|
||||
),
|
||||
sslConfiguration = configuration.nodeSslConfig,
|
||||
classLoader = classLoader)
|
||||
this.connection = client.start(username, credentials)
|
||||
@ -312,7 +312,7 @@ object InteractiveShell {
|
||||
} catch (e: PermissionException) {
|
||||
output.println(e.message ?: "Access denied", Color.red)
|
||||
} catch (e: ExecutionException) {
|
||||
// ignoring it as already logged by the progress handler subscriber
|
||||
// ignoring it as already logged by the progress handler subscriber
|
||||
} finally {
|
||||
InputStreamDeserializer.closeAll()
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user