diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index d942cb9ec8..73bd89fdec 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -21,7 +21,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisSessionProvider import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName @@ -29,6 +28,7 @@ import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ClientConsumer @@ -48,17 +48,24 @@ import kotlin.concurrent.withLock * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. */ @VisibleForTesting -class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConfig: SocksProxyConfig? = null, - private val maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager { +class AMQPBridgeManager(config: NodeSSLConfiguration, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager { private val lock = ReentrantLock() private val bridgeNameToBridgeMap = mutableMapOf() + + private class AMQPConfigurationImpl private constructor(override val keyStore: KeyStore, + override val keyStorePrivateKeyPassword: CharArray, + override val trustStore: KeyStore, + override val maxMessageSize: Int) : AMQPConfiguration { + constructor(config: NodeSSLConfiguration, maxMessageSize: Int) : this(config.loadSslKeyStore().internal, + config.keyStorePassword.toCharArray(), + config.loadTrustStore().internal, + maxMessageSize) + } + + private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, maxMessageSize) private var sharedEventLoopGroup: EventLoopGroup? = null - private val keyStore = config.loadSslKeyStore().internal - private val keyStorePrivateKeyPassword: String = config.keyStorePassword - private val trustStore = config.loadTrustStore().internal private var artemis: ArtemisSessionProvider? = null - private val crlCheckSoftFail: Boolean = config.crlCheckSoftFail constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) @@ -78,27 +85,26 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf private class AMQPBridge(private val queueName: String, private val target: NetworkHostAndPort, private val legalNames: Set, - keyStore: KeyStore, - keyStorePrivateKeyPassword: String, - trustStore: KeyStore, - crlCheckSoftFail: Boolean, + private val amqpConfig: AMQPConfiguration, sharedEventGroup: EventLoopGroup, - socksProxyConfig: SocksProxyConfig?, - private val artemis: ArtemisSessionProvider, - private val maxMessageSize: Int) { + private val artemis: ArtemisSessionProvider) { companion object { fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort" private val log = contextLogger() } private fun withMDC(block: () -> Unit) { - MDC.put("queueName", queueName) - MDC.put("target", target.toString()) - MDC.put("bridgeName", bridgeName) - MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() }) - MDC.put("maxMessageSize", maxMessageSize.toString()) - block() - MDC.clear() + val oldMDC = MDC.getCopyOfContextMap() + try { + MDC.put("queueName", queueName) + MDC.put("target", target.toString()) + MDC.put("bridgeName", bridgeName) + MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() }) + MDC.put("maxMessageSize", amqpConfig.maxMessageSize.toString()) + block() + } finally { + MDC.setContextMap(oldMDC) + } } private fun logDebugWithMDC(msg: () -> String) { @@ -111,8 +117,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) } - val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, - sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig, maxMessageSize = maxMessageSize) + val amqpClient = AMQPClient(listOf(target), legalNames, amqpConfig, sharedThreadPool = sharedEventGroup) val bridgeName: String get() = getBridgeName(queueName, target) private val lock = ReentrantLock() // lock to serialise session level access private var session: ClientSession? = null @@ -180,8 +185,8 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf } private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { - if (artemisMessage.bodySize > maxMessageSize) { - logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [${artemisMessage.bodySize}], " + + if (artemisMessage.bodySize > amqpConfig.maxMessageSize) { + logWarnWithMDC("Message exceeds maxMessageSize network parameter, maxMessageSize: [${amqpConfig.maxMessageSize}], message size: [${artemisMessage.bodySize}], " + "dropping message, uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}") // Ack the message to prevent same message being sent to us again. artemisMessage.acknowledge() @@ -229,9 +234,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConf if (bridgeExists(getBridgeName(queueName, target))) { return } - - val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, crlCheckSoftFail, sharedEventLoopGroup!!, socksProxyConfig, artemis!!, maxMessageSize) - + val newBridge = AMQPBridge(queueName, target, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!) lock.withLock { bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt index 5f848c616c..d06e63dea4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/ConnectionStateMachine.kt @@ -57,11 +57,15 @@ internal class ConnectionStateMachine(private val serverMode: Boolean, } private fun withMDC(block: () -> Unit) { - MDC.put("serverMode", serverMode.toString()) - MDC.put("localLegalName", localLegalName) - MDC.put("remoteLegalName", remoteLegalName) - block() - MDC.clear() + val oldMDC = MDC.getCopyOfContextMap() + try { + MDC.put("serverMode", serverMode.toString()) + MDC.put("localLegalName", localLegalName) + MDC.put("remoteLegalName", remoteLegalName) + block() + } finally { + MDC.setContextMap(oldMDC) + } } private fun logDebugWithMDC(msg: () -> String) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt index fc41b54eec..229e8703eb 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/engine/EventProcessor.kt @@ -51,11 +51,15 @@ internal class EventProcessor(channel: Channel, } private fun withMDC(block: () -> Unit) { - MDC.put("serverMode", serverMode.toString()) - MDC.put("localLegalName", localLegalName) - MDC.put("remoteLegalName", remoteLegalName) - block() - MDC.clear() + val oldMDC = MDC.getCopyOfContextMap() + try { + MDC.put("serverMode", serverMode.toString()) + MDC.put("localLegalName", localLegalName) + MDC.put("remoteLegalName", remoteLegalName) + block() + } finally { + MDC.setContextMap(oldMDC) + } } private fun logDebugWithMDC(msg: () -> String) { diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index fcac36487e..71406eda47 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -35,6 +35,7 @@ import org.slf4j.MDC import java.net.InetSocketAddress import java.nio.channels.ClosedChannelException import java.security.cert.X509Certificate +import javax.net.ssl.SSLException /** * An instance of AMQPChannelHandler sits inside the netty pipeline and controls the socket level lifecycle. @@ -61,13 +62,17 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, private var badCert: Boolean = false private fun withMDC(block: () -> Unit) { - MDC.put("serverMode", serverMode.toString()) - MDC.put("remoteAddress", remoteAddress.toString()) - MDC.put("localCert", localCert?.subjectDN?.toString()) - MDC.put("remoteCert", remoteCert?.subjectDN?.toString()) - MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() }) - block() - MDC.clear() + val oldMDC = MDC.getCopyOfContextMap() + try { + MDC.put("serverMode", serverMode.toString()) + MDC.put("remoteAddress", remoteAddress.toString()) + MDC.put("localCert", localCert?.subjectDN?.toString()) + MDC.put("remoteCert", remoteCert?.subjectDN?.toString()) + MDC.put("allowedRemoteLegalNames", allowedRemoteLegalNames?.joinToString(separator = ";") { it.toString() }) + block() + } finally { + MDC.setContextMap(oldMDC) + } } private fun logDebugWithMDC(msg: () -> String) { @@ -147,9 +152,12 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, createAMQPEngine(ctx) onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false))) } else { + val cause = evt.cause() // This happens when the peer node is closed during SSL establishment. - if (evt.cause() is ClosedChannelException) { + if (cause is ClosedChannelException) { logWarnWithMDC("SSL Handshake closed early.") + } else if (cause is SSLException && cause.message == "handshake timed out") { // Sadly the exception thrown by Netty wrapper requires that we check the message. + logWarnWithMDC("SSL Handshake timed out") } else { badCert = true } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index 9b20ae76ff..f3762ed10d 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -32,7 +32,6 @@ import rx.Observable import rx.subjects.PublishSubject import java.net.InetSocketAddress import java.lang.Long.min -import java.security.KeyStore import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.KeyManagerFactory @@ -61,16 +60,8 @@ data class SocksProxyConfig(val version: SocksProxyVersion, val proxyAddress: Ne */ class AMQPClient(val targets: List, val allowedRemoteLegalNames: Set, - private val userName: String?, - private val password: String?, - private val keyStore: KeyStore, - private val keyStorePrivateKeyPassword: String, - private val trustStore: KeyStore, - private val crlCheckSoftFail: Boolean, - private val maxMessageSize: Int, - private val trace: Boolean = false, - private val sharedThreadPool: EventLoopGroup? = null, - private val socksProxyConfig: SocksProxyConfig? = null) : AutoCloseable { + private val configuration: AMQPConfiguration, + private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable { companion object { init { InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE) @@ -148,10 +139,11 @@ class AMQPClient(val targets: List, private class ClientChannelInitializer(val parent: AMQPClient) : ChannelInitializer() { private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) + private val conf = parent.configuration init { - keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword.toCharArray()) - trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(parent.trustStore, parent.crlCheckSoftFail)) + keyManagerFactory.init(conf.keyStore, conf.keyStorePrivateKeyPassword) + trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.crlCheckSoftFail)) } override fun initChannel(ch: SocketChannel) { @@ -178,12 +170,12 @@ class AMQPClient(val targets: List, val target = parent.currentTarget val handler = createClientSslHelper(target, keyManagerFactory, trustManagerFactory) pipeline.addLast("sslHandler", handler) - if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) + if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) pipeline.addLast(AMQPChannelHandler(false, parent.allowedRemoteLegalNames, - parent.userName, - parent.password, - parent.trace, + conf.userName, + conf.password, + conf.trace, { parent.retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly parent._onConnection.onNext(it.second) @@ -251,7 +243,7 @@ class AMQPClient(val targets: List, topic: String, destinationLegalName: String, properties: Map): SendableMessage { - requireMessageSize(payload.size, maxMessageSize) + requireMessageSize(payload.size, configuration.maxMessageSize) return SendableMessageImpl(payload, topic, destinationLegalName, currentTarget, properties) } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt new file mode 100644 index 0000000000..c2ba54286a --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt @@ -0,0 +1,59 @@ +package net.corda.nodeapi.internal.protonwrapper.netty + +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import java.security.KeyStore + +interface AMQPConfiguration { + /** + * SASL User name presented during protocol handshake. No SASL login if NULL. + * For legacy interoperability with Artemis authorisation we typically require this to be "PEER_USER" + */ + @JvmDefault + val userName: String? + get() = ArtemisMessagingComponent.PEER_USER + + /** + * SASL plain text password presented during protocol handshake. No SASL login if NULL. + * For legacy interoperability with Artemis authorisation we typically require this to be "PEER_USER" + */ + @JvmDefault + val password: String? + get() = ArtemisMessagingComponent.PEER_USER + + /** + * The keystore used for TLS connections + */ + val keyStore: KeyStore + + /** + * Password used to unlock TLS private keys in the KeyStore. + */ + val keyStorePrivateKeyPassword: CharArray + + /** + * The trust root KeyStore to validate the peer certificates against + */ + val trustStore: KeyStore + + /** + * Setting crlCheckSoftFail to true allows certificate paths where some leaf certificates do not contain cRLDistributionPoints + * and also allows validation to continue if the CRL distribution server is not contactable. + */ + @JvmDefault + val crlCheckSoftFail: Boolean + get() = true + + /** + * Enables full debug tracing of all netty and AMQP level packets. This logs aat very high volume and is only for developers. + */ + @JvmDefault + val trace: Boolean + get() = false + + /** + * The maximum allowed size for packets, which will be dropped ahead of send. In future may also be enforced on receive, + * but currently that is deferred to Artemis and the bridge code. + */ + val maxMessageSize: Int +} + diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt index 1d2c4f2adb..fd54a8a1d4 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPServer.kt @@ -33,7 +33,6 @@ import rx.Observable import rx.subjects.PublishSubject import java.net.BindException import java.net.InetSocketAddress -import java.security.KeyStore import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.KeyManagerFactory @@ -46,14 +45,7 @@ import kotlin.concurrent.withLock */ class AMQPServer(val hostName: String, val port: Int, - private val userName: String?, - private val password: String?, - private val keyStore: KeyStore, - private val keyStorePrivateKeyPassword: CharArray, - private val trustStore: KeyStore, - private val crlCheckSoftFail: Boolean, - private val maxMessageSize: Int, - private val trace: Boolean = false) : AutoCloseable { + private val configuration: AMQPConfiguration) : AutoCloseable { companion object { init { @@ -72,36 +64,26 @@ class AMQPServer(val hostName: String, private var serverChannel: Channel? = null private val clientChannels = ConcurrentHashMap() - constructor(hostName: String, - port: Int, - userName: String?, - password: String?, - keyStore: KeyStore, - keyStorePrivateKeyPassword: String, - trustStore: KeyStore, - crlCheckSoftFail: Boolean, - maxMessageSize: Int, - trace: Boolean = false) : this(hostName, port, userName, password, keyStore, keyStorePrivateKeyPassword.toCharArray(), trustStore, crlCheckSoftFail, maxMessageSize, trace) - private class ServerChannelInitializer(val parent: AMQPServer) : ChannelInitializer() { private val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) private val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()) + private val conf = parent.configuration init { - keyManagerFactory.init(parent.keyStore, parent.keyStorePrivateKeyPassword) - trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(parent.trustStore, parent.crlCheckSoftFail)) + keyManagerFactory.init(conf.keyStore, conf.keyStorePrivateKeyPassword) + trustManagerFactory.init(initialiseTrustStoreAndEnableCrlChecking(conf.trustStore, conf.crlCheckSoftFail)) } override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() val handler = createServerSslHelper(keyManagerFactory, trustManagerFactory) pipeline.addLast("sslHandler", handler) - if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) + if (conf.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO)) pipeline.addLast(AMQPChannelHandler(true, null, - parent.userName, - parent.password, - parent.trace, + conf.userName, + conf.password, + conf.trace, { parent.clientChannels[it.first.remoteAddress()] = it.first parent._onConnection.onNext(it.second) @@ -169,7 +151,7 @@ class AMQPServer(val hostName: String, destinationLegalName: String, destinationLink: NetworkHostAndPort, properties: Map): SendableMessage { - requireMessageSize(payload.size, maxMessageSize) + requireMessageSize(payload.size, configuration.maxMessageSize) val dest = InetSocketAddress(destinationLink.host, destinationLink.port) require(dest in clientChannels.keys) { "Destination not available" diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 3b6c8d1bfd..ebfba87e19 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -16,13 +16,14 @@ import net.corda.core.crypto.toStringShort import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor -import net.corda.node.services.config.* +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient -import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.bridging.AMQPBridgeManager import net.corda.nodeapi.internal.bridging.BridgeManager +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.testing.core.* import net.corda.testing.internal.rigorousMock @@ -35,6 +36,7 @@ import org.junit.Ignore import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder +import java.security.KeyStore import java.util.* import kotlin.system.measureNanoTime import kotlin.system.measureTimeMillis @@ -302,16 +304,16 @@ class AMQPBridgeTest { } serverConfig.configureWithDevSSLCertificate() + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = serverConfig.loadSslKeyStore().internal + override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = serverConfig.loadTrustStore().internal + override val trace: Boolean = true + override val maxMessageSize: Int = maxMessageSize + } return AMQPServer("0.0.0.0", amqpPort, - ArtemisMessagingComponent.PEER_USER, - ArtemisMessagingComponent.PEER_USER, - serverConfig.loadSslKeyStore().internal, - serverConfig.keyStorePassword, - serverConfig.loadTrustStore().internal, - crlCheckSoftFail = true, - trace = true, - maxMessageSize = maxMessageSize + amqpConfig ) } } \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index 10e9c2c6d9..0f7acfc0b8 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -13,11 +13,11 @@ import net.corda.core.utilities.seconds import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.crypto.* import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.testing.core.* import net.corda.testing.internal.DEV_INTERMEDIATE_CA @@ -46,6 +46,7 @@ import java.io.Closeable import java.math.BigInteger import java.net.InetSocketAddress import java.security.KeyPair +import java.security.KeyStore import java.security.PrivateKey import java.security.Security import java.security.cert.X509CRL @@ -244,7 +245,7 @@ class CertificateRevocationListNodeTests { @Test fun `AMPQ Client to Server connection succeeds when CRL cannot be obtained and soft fail is enabled`() { val crlCheckSoftFail = true - val (amqpServer, serverCert) = createServer( + val (amqpServer, _) = createServer( serverPort, crlCheckSoftFail = crlCheckSoftFail, nodeCrlDistPoint = "http://${server.hostAndPort}/crl/invalid.crl") @@ -335,16 +336,18 @@ class CertificateRevocationListNodeTests { val nodeCert = clientConfig.recreateNodeCaAndTlsCertificates(nodeCrlDistPoint, tlsCrlDistPoint) val clientTruststore = clientConfig.loadTrustStore().internal val clientKeystore = clientConfig.loadSslKeyStore().internal + + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = clientKeystore + override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = clientTruststore + override val crlCheckSoftFail: Boolean = crlCheckSoftFail + override val maxMessageSize: Int = maxMessageSize + } return Pair(AMQPClient( listOf(NetworkHostAndPort("localhost", targetPort)), setOf(ALICE_NAME, CHARLIE_NAME), - PEER_USER, - PEER_USER, - clientKeystore, - clientConfig.keyStorePassword, - clientTruststore, - crlCheckSoftFail, - maxMessageSize = maxMessageSize), nodeCert) + amqpConfig), nodeCert) } private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME, @@ -363,16 +366,17 @@ class CertificateRevocationListNodeTests { val nodeCert = serverConfig.recreateNodeCaAndTlsCertificates(nodeCrlDistPoint, tlsCrlDistPoint) val serverTruststore = serverConfig.loadTrustStore().internal val serverKeystore = serverConfig.loadSslKeyStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = serverKeystore + override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = serverTruststore + override val crlCheckSoftFail: Boolean = crlCheckSoftFail + override val maxMessageSize: Int = maxMessageSize + } return Pair(AMQPServer( "0.0.0.0", port, - PEER_USER, - PEER_USER, - serverKeystore, - serverConfig.keyStorePassword, - serverTruststore, - crlCheckSoftFail, - maxMessageSize = maxMessageSize), nodeCert) + amqpConfig), nodeCert) } private fun SSLConfiguration.recreateNodeCaAndTlsCertificates(nodeCaCrlDistPoint: String, tlsCrlDistPoint: String?): X509Certificate { diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index e15d3c4c6c..6d75850db7 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -18,17 +18,18 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.div import net.corda.core.toFuture import net.corda.core.utilities.NetworkHostAndPort -import net.corda.node.services.config.* +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.ArtemisTcpTransport.Companion.CIPHER_SUITES import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.createDevKeyStores import net.corda.nodeapi.internal.crypto.* import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.testing.core.* import net.corda.testing.internal.createDevIntermediateCaCertPath @@ -39,6 +40,7 @@ import org.junit.Assert.assertArrayEquals import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder +import java.security.KeyStore import java.security.SecureRandom import java.security.cert.X509Certificate import javax.net.ssl.* @@ -413,18 +415,19 @@ class ProtonWrapperTests { val clientTruststore = clientConfig.loadTrustStore().internal val clientKeystore = clientConfig.loadSslKeyStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = clientKeystore + override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = clientTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = maxMessageSize + } return AMQPClient( listOf(NetworkHostAndPort("localhost", serverPort), NetworkHostAndPort("localhost", serverPort2), NetworkHostAndPort("localhost", artemisPort)), setOf(ALICE_NAME, CHARLIE_NAME), - PEER_USER, - PEER_USER, - clientKeystore, - clientConfig.keyStorePassword, - clientTruststore, - true, - maxMessageSize = maxMessageSize) + amqpConfig) } private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int, maxMessageSize: Int = MAX_MESSAGE_SIZE): AMQPClient { @@ -439,17 +442,18 @@ class ProtonWrapperTests { val clientTruststore = clientConfig.loadTrustStore().internal val clientKeystore = clientConfig.loadSslKeyStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = clientKeystore + override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = clientTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = maxMessageSize + } return AMQPClient( listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), - PEER_USER, - PEER_USER, - clientKeystore, - clientConfig.keyStorePassword, - clientTruststore, - true, - sharedThreadPool = sharedEventGroup, - maxMessageSize = maxMessageSize) + amqpConfig, + sharedThreadPool = sharedEventGroup) } @@ -467,15 +471,16 @@ class ProtonWrapperTests { val serverTruststore = serverConfig.loadTrustStore().internal val serverKeystore = serverConfig.loadSslKeyStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = serverKeystore + override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = serverTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = maxMessageSize + } return AMQPServer( "0.0.0.0", port, - PEER_USER, - PEER_USER, - serverKeystore, - serverConfig.keyStorePassword, - serverTruststore, - crlCheckSoftFail = true, - maxMessageSize = maxMessageSize) + amqpConfig) } }