diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt index d3122c9dc8..cc3bf585eb 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt @@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.SslConfiguration import org.apache.activemq.artemis.api.core.TransportConfiguration +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants import java.nio.file.Path @@ -23,23 +24,25 @@ class ArtemisTcpTransport { val TLS_VERSIONS = listOf("TLSv1.2") - internal fun defaultArtemisOptions(hostAndPort: NetworkHostAndPort) = mapOf( + // Turn on AMQP support, which needs the protocol jar on the classpath. + // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop. + // It does not use AMQP messages for its own messages e.g. topology and heartbeats. + private const val P2P_PROTOCOLS = "CORE,AMQP" + + private const val RPC_PROTOCOLS = "CORE" + + private fun defaultArtemisOptions(hostAndPort: NetworkHostAndPort, protocols: String) = mapOf( // Basic TCP target details. TransportConstants.HOST_PROP_NAME to hostAndPort.host, TransportConstants.PORT_PROP_NAME to hostAndPort.port, - - // Turn on AMQP support, which needs the protocol jar on the classpath. - // Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop. - // It does not use AMQP messages for its own messages e.g. topology and heartbeats. - // TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications. - TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP", + TransportConstants.PROTOCOLS_PROP_NAME to protocols, TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME to (nodeSerializationEnv != null), TransportConstants.REMOTING_THREADS_PROPNAME to (if (nodeSerializationEnv != null) -1 else 1), // turn off direct delivery in Artemis - this is latency optimisation that can lead to //hick-ups under high load (CORDA-1336) TransportConstants.DIRECT_DELIVER to false) - internal val defaultSSLOptions = mapOf( + private val defaultSSLOptions = mapOf( TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","), TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(",")) @@ -95,8 +98,8 @@ class ArtemisTcpTransport { TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword, TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false) - internal val acceptorFactoryClassName = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory" - internal val connectorFactoryClassName = NettyConnectorFactory::class.java.name + private val acceptorFactoryClassName = NettyAcceptorFactory::class.java.name + private val connectorFactoryClassName = NettyConnectorFactory::class.java.name fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true): TransportConfiguration { @@ -110,7 +113,7 @@ class ArtemisTcpTransport { fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false): TransportConfiguration { - val options = defaultArtemisOptions(hostAndPort).toMutableMap() + val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap() if (enableSSL) { options.putAll(defaultSSLOptions) (keyStore to trustStore).addToTransportOptions(options) @@ -123,7 +126,7 @@ class ArtemisTcpTransport { @Suppress("LongParameterList") fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreProvider: String? = null): TransportConfiguration { - val options = defaultArtemisOptions(hostAndPort).toMutableMap() + val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap() if (enableSSL) { options.putAll(defaultSSLOptions) (keyStore to trustStore).addToTransportOptions(options) @@ -138,7 +141,7 @@ class ArtemisTcpTransport { } fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { - val options = defaultArtemisOptions(hostAndPort).toMutableMap() + val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap() if (config != null && enableSSL) { config.keyStorePath.requireOnDefaultFileSystem() @@ -150,7 +153,7 @@ class ArtemisTcpTransport { } fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { - val options = defaultArtemisOptions(hostAndPort).toMutableMap() + val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap() if (config != null && enableSSL) { config.trustStorePath.requireOnDefaultFileSystem() @@ -165,11 +168,11 @@ class ArtemisTcpTransport { } fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { - return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions() + asMap(keyStoreProvider)) + return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + config.toTransportOptions() + asMap(keyStoreProvider)) } fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { - return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + + return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreProvider)) }