mirror of
https://github.com/corda/corda.git
synced 2025-06-12 20:28:18 +00:00
CORDA-1343 Make the RPCClient ssl constructors public. Clean up broke… (#3039)
* CORDA-1343 Make the RPCClient ssl constructors public. Clean up broker authentication logic * CORDA-1343 small fix * CORDA-1343 cleanup * CORDA-1343 fixed api changes script * CORDA-1343 fixed merge * CORDA-1343 removed unused property * CORDA-1343 add separate p2p and rpc node users * CORDA-1343 remove test configuration * CORDA-1343 fix tests * CORDA-1343 address core review comments * CORDA-1343 some documentation and adding createWithSsl method for a haAddressPool * CORDA-1343 clean up the CordaRPCClient interface * CORDA-1343 add internal shell test * CORDA-1343 address code review comments * CORDA-1343 split the internalShell user from the System Rpc user * CORDA-1343 fix test * CORDA-1343 Add warning when certificateChainCheckPolicies is being configured * CORDA-1343 Address code review changes * CORDA-1343 fix merge * CORDA-1343 added test, docs, clarify comments * CORDA-1343 clean up docs * CORDA-1343 fix api * CORDA-1343 fix merge * CORDA-1343 fix merge * CORDA-1343 fix merge * CORDA-1343 fix merge
This commit is contained in:
@ -1,6 +1,6 @@
|
||||
package net.corda.nodeapi
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.messaging.ClientRpcSslOptions
|
||||
import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
@ -8,20 +8,11 @@ import net.corda.nodeapi.internal.requireOnDefaultFileSystem
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
|
||||
|
||||
sealed class ConnectionDirection {
|
||||
data class Inbound(val acceptorFactoryClassName: String) : ConnectionDirection()
|
||||
data class Outbound(
|
||||
val expectedCommonNames: Set<CordaX500Name> = emptySet(), // TODO SNI? Or we need a notion of node's network identity?
|
||||
val connectorFactoryClassName: String = NettyConnectorFactory::class.java.name
|
||||
) : ConnectionDirection()
|
||||
}
|
||||
import java.nio.file.Path
|
||||
|
||||
/** Class to set Artemis TCP configuration options. */
|
||||
class ArtemisTcpTransport {
|
||||
companion object {
|
||||
const val VERIFY_PEER_LEGAL_NAME = "corda.verifyPeerCommonName"
|
||||
|
||||
/**
|
||||
* Corda supported TLS schemes.
|
||||
* <p><ul>
|
||||
@ -47,69 +38,114 @@ class ArtemisTcpTransport {
|
||||
/** Supported TLS versions, currently TLSv1.2 only. */
|
||||
val TLS_VERSIONS = listOf("TLSv1.2")
|
||||
|
||||
/** Specify [TransportConfiguration] for TCP communication. */
|
||||
fun tcpTransport(
|
||||
direction: ConnectionDirection,
|
||||
hostAndPort: NetworkHostAndPort,
|
||||
config: SSLConfiguration?,
|
||||
enableSSL: Boolean = true
|
||||
): TransportConfiguration {
|
||||
val options = mutableMapOf<String, Any?>(
|
||||
// Basic TCP target details.
|
||||
TransportConstants.HOST_PROP_NAME to hostAndPort.host,
|
||||
TransportConstants.PORT_PROP_NAME to hostAndPort.port,
|
||||
private fun defaultArtemisOptions(hostAndPort: NetworkHostAndPort) = mapOf(
|
||||
// Basic TCP target details.
|
||||
TransportConstants.HOST_PROP_NAME to hostAndPort.host,
|
||||
TransportConstants.PORT_PROP_NAME to hostAndPort.port,
|
||||
|
||||
// Turn on AMQP support, which needs the protocol jar on the classpath.
|
||||
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
|
||||
// It does not use AMQP messages for its own messages e.g. topology and heartbeats.
|
||||
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications.
|
||||
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
|
||||
TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME to (nodeSerializationEnv != null),
|
||||
TransportConstants.REMOTING_THREADS_PROPNAME to (if (nodeSerializationEnv != null) -1 else 1),
|
||||
// turn off direct delivery in Artemis - this is latency optimisation that can lead to
|
||||
//hick-ups under high load (CORDA-1336)
|
||||
TransportConstants.DIRECT_DELIVER to false
|
||||
)
|
||||
// Turn on AMQP support, which needs the protocol jar on the classpath.
|
||||
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
|
||||
// It does not use AMQP messages for its own messages e.g. topology and heartbeats.
|
||||
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications.
|
||||
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
|
||||
TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME to (nodeSerializationEnv != null),
|
||||
TransportConstants.REMOTING_THREADS_PROPNAME to (if (nodeSerializationEnv != null) -1 else 1),
|
||||
// turn off direct delivery in Artemis - this is latency optimisation that can lead to
|
||||
//hick-ups under high load (CORDA-1336)
|
||||
TransportConstants.DIRECT_DELIVER to false)
|
||||
|
||||
private val defaultSSLOptions = mapOf(
|
||||
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
|
||||
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","))
|
||||
|
||||
private fun SSLConfiguration.toTransportOptions() = mapOf(
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.KEYSTORE_PATH_PROP_NAME to sslKeystore,
|
||||
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword,
|
||||
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to trustStoreFile,
|
||||
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to trustStorePassword,
|
||||
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true)
|
||||
|
||||
private fun ClientRpcSslOptions.toTransportOptions() = mapOf(
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to trustStoreProvider,
|
||||
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to trustStorePath,
|
||||
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to trustStorePassword)
|
||||
|
||||
private fun BrokerRpcSslOptions.toTransportOptions() = mapOf(
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.KEYSTORE_PATH_PROP_NAME to keyStorePath,
|
||||
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword,
|
||||
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false)
|
||||
|
||||
private val acceptorFactoryClassName = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory"
|
||||
private val connectorFactoryClassName = NettyConnectorFactory::class.java.name
|
||||
|
||||
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SSLConfiguration?, enableSSL: Boolean = true): TransportConfiguration {
|
||||
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
|
||||
|
||||
if (config != null && enableSSL) {
|
||||
config.sslKeystore.requireOnDefaultFileSystem()
|
||||
config.trustStoreFile.requireOnDefaultFileSystem()
|
||||
val tlsOptions = mapOf(
|
||||
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
|
||||
// and AES encryption.
|
||||
TransportConstants.SSL_ENABLED_PROP_NAME to true,
|
||||
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.KEYSTORE_PATH_PROP_NAME to config.sslKeystore,
|
||||
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password.
|
||||
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
|
||||
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStoreFile,
|
||||
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
|
||||
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
|
||||
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","),
|
||||
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true,
|
||||
VERIFY_PEER_LEGAL_NAME to (direction as? ConnectionDirection.Outbound)?.expectedCommonNames
|
||||
)
|
||||
options.putAll(tlsOptions)
|
||||
options.putAll(defaultSSLOptions)
|
||||
options.putAll(config.toTransportOptions())
|
||||
}
|
||||
val factoryName = when (direction) {
|
||||
is ConnectionDirection.Inbound -> direction.acceptorFactoryClassName
|
||||
is ConnectionDirection.Outbound -> direction.connectorFactoryClassName
|
||||
return TransportConfiguration(acceptorFactoryClassName, options)
|
||||
}
|
||||
|
||||
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: SSLConfiguration?, enableSSL: Boolean = true): TransportConfiguration {
|
||||
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
|
||||
|
||||
if (config != null && enableSSL) {
|
||||
config.sslKeystore.requireOnDefaultFileSystem()
|
||||
config.trustStoreFile.requireOnDefaultFileSystem()
|
||||
options.putAll(defaultSSLOptions)
|
||||
options.putAll(config.toTransportOptions())
|
||||
}
|
||||
return TransportConfiguration(factoryName, options)
|
||||
return TransportConfiguration(connectorFactoryClassName, options)
|
||||
}
|
||||
|
||||
/** [TransportConfiguration] for RPC TCP communication - server side. */
|
||||
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
|
||||
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
|
||||
|
||||
if (config != null && enableSSL) {
|
||||
config.keyStorePath.requireOnDefaultFileSystem()
|
||||
options.putAll(config.toTransportOptions())
|
||||
options.putAll(defaultSSLOptions)
|
||||
}
|
||||
return TransportConfiguration(acceptorFactoryClassName, options)
|
||||
}
|
||||
|
||||
/** [TransportConfiguration] for RPC TCP communication
|
||||
* This is the Transport that connects the client JVM to the broker. */
|
||||
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
|
||||
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
|
||||
|
||||
if (config != null && enableSSL) {
|
||||
config.trustStorePath.requireOnDefaultFileSystem()
|
||||
options.putAll(config.toTransportOptions())
|
||||
options.putAll(defaultSSLOptions)
|
||||
}
|
||||
return TransportConfiguration(connectorFactoryClassName, options)
|
||||
}
|
||||
|
||||
/** Create as list of [TransportConfiguration]. **/
|
||||
fun tcpTransportsFromList(
|
||||
direction: ConnectionDirection,
|
||||
hostAndPortList: List<NetworkHostAndPort>,
|
||||
config: SSLConfiguration?,
|
||||
enableSSL: Boolean = true): List<TransportConfiguration> {
|
||||
val tcpTransports = ArrayList<TransportConfiguration>(hostAndPortList.size)
|
||||
hostAndPortList.forEach {
|
||||
tcpTransports.add(tcpTransport(direction, it, config, enableSSL))
|
||||
}
|
||||
fun rpcConnectorTcpTransportsFromList(hostAndPortList: List<NetworkHostAndPort>, config: ClientRpcSslOptions?, enableSSL: Boolean = true): List<TransportConfiguration> = hostAndPortList.map {
|
||||
rpcConnectorTcpTransport(it, config, enableSSL)
|
||||
}
|
||||
|
||||
return tcpTransports
|
||||
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SSLConfiguration): TransportConfiguration {
|
||||
return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions())
|
||||
}
|
||||
|
||||
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SSLConfiguration): TransportConfiguration {
|
||||
return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data class BrokerRpcSslOptions(val keyStorePath: Path, val keyStorePassword: String)
|
||||
|
@ -4,8 +4,7 @@ import net.corda.core.serialization.internal.nodeSerializationEnv
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.nodeapi.ArtemisTcpTransport
|
||||
import net.corda.nodeapi.ConnectionDirection
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
@ -35,7 +34,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration,
|
||||
check(started == null) { "start can't be called twice" }
|
||||
log.info("Connecting to message broker: $serverAddress")
|
||||
// TODO Add broker CN to config for host verification in case the embedded broker isn't used
|
||||
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverAddress, config)
|
||||
val tcpTransport = ArtemisTcpTransport.p2pConnectorTcpTransport(serverAddress, config)
|
||||
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
|
||||
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
|
||||
// would be the default and the two lines below can be deleted.
|
||||
@ -50,7 +49,7 @@ class ArtemisMessagingClient(private val config: SSLConfiguration,
|
||||
// using our TLS certificate.
|
||||
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
|
||||
// size of 1MB is acknowledged.
|
||||
val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
|
||||
val session = sessionFactory!!.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
|
||||
session.start()
|
||||
// Create a general purpose producer.
|
||||
val producer = session.createProducer()
|
||||
|
@ -22,8 +22,12 @@ class ArtemisMessagingComponent {
|
||||
|
||||
// System users must contain an invalid RPC username character to prevent any chance of name clash which in this
|
||||
// case is a forward slash
|
||||
const val NODE_USER = "SystemUsers/Node"
|
||||
const val NODE_P2P_USER = "SystemUsers/Node"
|
||||
const val NODE_RPC_USER = "SystemUsers/NodeRPC"
|
||||
const val PEER_USER = "SystemUsers/Peer"
|
||||
// User used only in devMode when nodes have a shell attached by default.
|
||||
const val INTERNAL_SHELL_USER = "internalShell"
|
||||
|
||||
const val INTERNAL_PREFIX = "internal."
|
||||
const val PEERS_PREFIX = "${INTERNAL_PREFIX}peers." //TODO Come up with better name for common peers/services queue
|
||||
const val P2P_PREFIX = "p2p.inbound."
|
||||
|
@ -9,7 +9,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
@ -112,7 +112,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val maxMessageSize
|
||||
if (connected) {
|
||||
log.info("Bridge Connected")
|
||||
val sessionFactory = artemis.started!!.sessionFactory
|
||||
val session = sessionFactory.createSession(NODE_USER, NODE_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
this.session = session
|
||||
val consumer = session.createConsumer(queueName)
|
||||
this.consumer = consumer
|
||||
|
Reference in New Issue
Block a user