ENT-6331: Disable AMQP protocol for Artemis RPC broker (#6956)

Also tidy-up visibility scope of internal constants.
This commit is contained in:
Viktor Kolomeyko 2021-09-14 10:58:02 +01:00 committed by GitHub
parent 943cf0b32f
commit e50f508c2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.SslConfiguration
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.Path
@ -23,23 +24,25 @@ class ArtemisTcpTransport {
val TLS_VERSIONS = listOf("TLSv1.2")
internal fun defaultArtemisOptions(hostAndPort: NetworkHostAndPort) = mapOf(
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
// It does not use AMQP messages for its own messages e.g. topology and heartbeats.
private const val P2P_PROTOCOLS = "CORE,AMQP"
private const val RPC_PROTOCOLS = "CORE"
private fun defaultArtemisOptions(hostAndPort: NetworkHostAndPort, protocols: String) = mapOf(
// Basic TCP target details.
TransportConstants.HOST_PROP_NAME to hostAndPort.host,
TransportConstants.PORT_PROP_NAME to hostAndPort.port,
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop.
// It does not use AMQP messages for its own messages e.g. topology and heartbeats.
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications.
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
TransportConstants.PROTOCOLS_PROP_NAME to protocols,
TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME to (nodeSerializationEnv != null),
TransportConstants.REMOTING_THREADS_PROPNAME to (if (nodeSerializationEnv != null) -1 else 1),
// turn off direct delivery in Artemis - this is latency optimisation that can lead to
//hick-ups under high load (CORDA-1336)
TransportConstants.DIRECT_DELIVER to false)
internal val defaultSSLOptions = mapOf(
private val defaultSSLOptions = mapOf(
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to TLS_VERSIONS.joinToString(","))
@ -95,8 +98,8 @@ class ArtemisTcpTransport {
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword,
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false)
internal val acceptorFactoryClassName = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory"
internal val connectorFactoryClassName = NettyConnectorFactory::class.java.name
private val acceptorFactoryClassName = NettyAcceptorFactory::class.java.name
private val connectorFactoryClassName = NettyConnectorFactory::class.java.name
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true): TransportConfiguration {
@ -110,7 +113,7 @@ class ArtemisTcpTransport {
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap()
if (enableSSL) {
options.putAll(defaultSSLOptions)
(keyStore to trustStore).addToTransportOptions(options)
@ -123,7 +126,7 @@ class ArtemisTcpTransport {
@Suppress("LongParameterList")
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreProvider: String? = null): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap()
if (enableSSL) {
options.putAll(defaultSSLOptions)
(keyStore to trustStore).addToTransportOptions(options)
@ -138,7 +141,7 @@ class ArtemisTcpTransport {
}
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap()
if (config != null && enableSSL) {
config.keyStorePath.requireOnDefaultFileSystem()
@ -150,7 +153,7 @@ class ArtemisTcpTransport {
}
fun rpcConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: ClientRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort).toMutableMap()
val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap()
if (config != null && enableSSL) {
config.trustStorePath.requireOnDefaultFileSystem()
@ -165,11 +168,11 @@ class ArtemisTcpTransport {
}
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration {
return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions + config.toTransportOptions() + asMap(keyStoreProvider))
return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + config.toTransportOptions() + asMap(keyStoreProvider))
}
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration {
return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort) + defaultSSLOptions +
return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions +
config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreProvider))
}