diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt index a682be54d8..ab95195a0f 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/services/AMQPListenerTest.kt @@ -22,11 +22,11 @@ import net.corda.core.internal.div import net.corda.core.internal.readAll import net.corda.core.utilities.NetworkHostAndPort import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.crypto.X509KeyStore import net.corda.nodeapi.internal.crypto.X509Utilities import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.DUMMY_BANK_B_NAME @@ -35,6 +35,7 @@ import org.junit.Assert.assertArrayEquals import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder +import java.security.KeyStore import kotlin.test.assertEquals class AMQPListenerTest { @@ -87,16 +88,17 @@ class AMQPListenerTest { clientConfig.createBridgeKeyStores(DUMMY_BANK_B_NAME) val clientKeyStore = clientConfig.loadSslKeyStore().internal val clientTrustStore = clientConfig.loadTrustStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = clientKeyStore + override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = clientTrustStore + override val maxMessageSize: Int = maxMessageSize + override val trace: Boolean = true + } // create and connect a real client val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)), setOf(DUMMY_BANK_A_NAME), - PEER_USER, - PEER_USER, - clientKeyStore, - clientConfig.keyStorePassword, - clientTrustStore, - true, - maxMessageSize = maxMessageSize) + amqpConfig) amqpClient.start() // Should see events to show we got a valid connection @@ -158,16 +160,17 @@ class AMQPListenerTest { clientKeyStore.setPrivateKey("TLS_CERT", clientKeys.private, listOf(clientCert)) val clientTrustStore = X509KeyStore("password") clientTrustStore.setCertificate("TLS_ROOT", clientCert) + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = clientKeyStore.internal + override val keyStorePrivateKeyPassword: CharArray = "password".toCharArray() + override val trustStore: KeyStore = clientTrustStore.internal + override val maxMessageSize: Int = maxMessageSize + override val trace: Boolean = true + } // create and connect a real client val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)), setOf(DUMMY_BANK_A_NAME), - PEER_USER, - PEER_USER, - clientKeyStore.internal, - "password", - clientTrustStore.internal, - true, - maxMessageSize = maxMessageSize) + amqpConfig) amqpClient.start() val connectionEvent = connectionFollower.next() assertEquals(false, connectionEvent.connected) diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt index 32a8d0ee23..f25054e05c 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/BridgeAMQPListenerServiceImpl.kt @@ -17,9 +17,9 @@ import net.corda.bridge.services.api.ServiceStateSupport import net.corda.bridge.services.util.ServiceStateCombiner import net.corda.bridge.services.util.ServiceStateHelper import net.corda.core.utilities.contextLogger -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.crypto.KEYSTORE_TYPE import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange import org.slf4j.LoggerFactory @@ -31,7 +31,7 @@ import java.security.KeyStore import java.util.* class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration, - val maxMessageSize: Int, + val maximumMessageSize: Int, val auditService: BridgeAuditService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAMQPListenerService, ServiceStateSupport by stateHelper { companion object { @@ -61,16 +61,17 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration, val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword) val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword) val bindAddress = conf.inboundConfig!!.listeningAddress + val amqpConfiguration = object : AMQPConfiguration { + override val keyStore: KeyStore = keyStore + override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword + override val trustStore: KeyStore = trustStore + override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail + override val maxMessageSize: Int = maximumMessageSize + override val trace: Boolean = conf.enableAMQPPacketTrace + } val server = AMQPServer(bindAddress.host, bindAddress.port, - PEER_USER, - PEER_USER, - keyStore, - keyStorePrivateKeyPassword, - trustStore, - conf.crlCheckSoftFail, - maxMessageSize, - conf.enableAMQPPacketTrace) + amqpConfiguration) onConnectSubscription = server.onConnection.subscribe(_onConnection) onConnectAuditSubscription = server.onConnection.subscribe({ if (it.connected) { diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt index fdd9e85643..383aba90fd 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/FloatControlListenerService.kt @@ -24,6 +24,7 @@ import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange import rx.Subscription @@ -33,7 +34,7 @@ import kotlin.concurrent.withLock class FloatControlListenerService(val conf: BridgeConfiguration, - val maxMessageSize: Int, + val maximumMessageSize: Int, val auditService: BridgeAuditService, val amqpListener: BridgeAMQPListenerService, private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper { @@ -49,9 +50,6 @@ class FloatControlListenerService(val conf: BridgeConfiguration, private var receiveSubscriber: Subscription? = null private var amqpControlServer: AMQPServer? = null private val sslConfiguration: BridgeSSLConfiguration - private val keyStore: KeyStore - private val keyStorePrivateKeyPassword: String - private val trustStore: KeyStore private val floatControlAddress = conf.floatOuterConfig!!.floatAddress private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject private var activeConnectionInfo: ConnectionChange? = null @@ -61,9 +59,6 @@ class FloatControlListenerService(val conf: BridgeConfiguration, init { statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener)) sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: BridgeSSLConfigurationImpl(conf) - keyStore = sslConfiguration.loadSslKeyStore().internal - keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword - trustStore = sslConfiguration.loadTrustStore().internal } @@ -83,16 +78,22 @@ class FloatControlListenerService(val conf: BridgeConfiguration, private fun startControlListener() { lock.withLock { + val keyStore = sslConfiguration.loadSslKeyStore().internal + val keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword + val trustStore = sslConfiguration.loadTrustStore().internal + val amqpConfig = object : AMQPConfiguration { + override val userName: String? = null + override val password: String? = null + override val keyStore: KeyStore = keyStore + override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword.toCharArray() + override val trustStore: KeyStore = trustStore + override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail + override val maxMessageSize: Int = maximumMessageSize + override val trace: Boolean = conf.enableAMQPPacketTrace + } val controlServer = AMQPServer(floatControlAddress.host, floatControlAddress.port, - null, - null, - keyStore, - keyStorePrivateKeyPassword, - trustStore, - conf.crlCheckSoftFail, - maxMessageSize, - conf.enableAMQPPacketTrace) + amqpConfig) connectSubscriber = controlServer.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) }) receiveSubscriber = controlServer.onReceive.subscribe({ onControlMessage(it) }, { log.error("Receive event error", it) }) amqpControlServer = controlServer diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt index 2b315b3d0c..52c137ccdc 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/receiver/TunnelingBridgeReceiverService.kt @@ -27,6 +27,7 @@ import net.corda.nodeapi.internal.config.SSLConfiguration import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange import rx.Subscription import java.io.ByteArrayOutputStream @@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException class TunnelingBridgeReceiverService(val conf: BridgeConfiguration, - val maxMessageSize: Int, + val maximumMessageSize: Int, val auditService: BridgeAuditService, haService: BridgeMasterService, val filterService: IncomingMessageFilterService, @@ -52,9 +53,6 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration, private var amqpControlClient: AMQPClient? = null private val controlLinkSSLConfiguration: SSLConfiguration private val floatListenerSSLConfiguration: SSLConfiguration - private val controlLinkKeyStore: KeyStore - private val controLinkKeyStorePrivateKeyPassword: String - private val controlLinkTrustStore: KeyStore private val expectedCertificateSubject: CordaX500Name private val secureRandom: SecureRandom = newSecureRandom() @@ -62,9 +60,6 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration, statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService)) controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf - controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal - controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword - controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal expectedCertificateSubject = conf.bridgeInnerConfig!!.expectedCertificateSubject } @@ -73,16 +68,23 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration, statusSubscriber = statusFollower.activeChange.subscribe({ if (it) { val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses + val controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal + val controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword + val controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal + val amqpConfig = object : AMQPConfiguration { + override val userName: String? = null + override val password: String? = null + override val keyStore: KeyStore = controlLinkKeyStore + override val keyStorePrivateKeyPassword: CharArray = controLinkKeyStorePrivateKeyPassword.toCharArray() + override val trustStore: KeyStore = controlLinkTrustStore + override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail + override val maxMessageSize: Int = maximumMessageSize + override val trace: Boolean = conf.enableAMQPPacketTrace + + } val controlClient = AMQPClient(floatAddresses, setOf(expectedCertificateSubject), - null, - null, - controlLinkKeyStore, - controLinkKeyStorePrivateKeyPassword, - controlLinkTrustStore, - conf.crlCheckSoftFail, - maxMessageSize, - conf.enableAMQPPacketTrace) + amqpConfig) connectSubscriber = controlClient.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) }) receiveSubscriber = controlClient.onReceive.subscribe({ onFloatMessage(it) }, { log.error("Receive event error", it) }) amqpControlClient = controlClient diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 73bd89fdec..b7239beb01 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -27,8 +27,8 @@ import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companio import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient -import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration +import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ClientConsumer @@ -48,7 +48,8 @@ import kotlin.concurrent.withLock * The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager. */ @VisibleForTesting -class AMQPBridgeManager(config: NodeSSLConfiguration, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager { +class AMQPBridgeManager(config: NodeSSLConfiguration, socksProxyConfig: SocksProxyConfig? = null, + maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager { private val lock = ReentrantLock() private val bridgeNameToBridgeMap = mutableMapOf() @@ -56,14 +57,16 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, maxMessageSize: Int, priva private class AMQPConfigurationImpl private constructor(override val keyStore: KeyStore, override val keyStorePrivateKeyPassword: CharArray, override val trustStore: KeyStore, + override val socksProxyConfig: SocksProxyConfig?, override val maxMessageSize: Int) : AMQPConfiguration { - constructor(config: NodeSSLConfiguration, maxMessageSize: Int) : this(config.loadSslKeyStore().internal, + constructor(config: NodeSSLConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.loadSslKeyStore().internal, config.keyStorePassword.toCharArray(), config.loadTrustStore().internal, + socksProxyConfig, maxMessageSize) } - private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, maxMessageSize) + private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize) private var sharedEventLoopGroup: EventLoopGroup? = null private var artemis: ArtemisSessionProvider? = null diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index f3762ed10d..7809ab0374 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -30,8 +30,8 @@ import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImp import net.corda.nodeapi.internal.requireMessageSize import rx.Observable import rx.subjects.PublishSubject -import java.net.InetSocketAddress import java.lang.Long.min +import java.net.InetSocketAddress import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import javax.net.ssl.KeyManagerFactory @@ -148,10 +148,10 @@ class AMQPClient(val targets: List, override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() - val socksConfig = parent.socksProxyConfig + val socksConfig = conf.socksProxyConfig if (socksConfig != null) { val proxyAddress = InetSocketAddress(socksConfig.proxyAddress.host, socksConfig.proxyAddress.port) - val proxy = when (parent.socksProxyConfig!!.version) { + val proxy = when (conf.socksProxyConfig!!.version) { SocksProxyVersion.SOCKS4 -> { Socks4ProxyHandler(proxyAddress, socksConfig.userName) } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt index c2ba54286a..4e873cdbaa 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPConfiguration.kt @@ -55,5 +55,9 @@ interface AMQPConfiguration { * but currently that is deferred to Artemis and the bridge code. */ val maxMessageSize: Int + + @JvmDefault + val socksProxyConfig: SocksProxyConfig? + get() = null } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index ebfba87e19..01046e5400 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -16,10 +16,13 @@ import net.corda.core.crypto.toStringShort import net.corda.core.internal.div import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor +import net.corda.node.services.config.EnterpriseConfiguration +import net.corda.node.services.config.MutualExclusionConfiguration import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient +import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.bridging.AMQPBridgeManager import net.corda.nodeapi.internal.bridging.BridgeManager @@ -282,7 +285,6 @@ class AMQPBridgeTest { doReturn("cordacadevpass").whenever(it).keyStorePassword doReturn(targetAdress).whenever(it).p2pAddress doReturn("").whenever(it).jmxMonitoringHttpPort - doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration } artemisConfig.configureWithDevSSLCertificate() diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 6d75850db7..0c5ec11aca 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -18,6 +18,8 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.div import net.corda.core.toFuture import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.services.config.EnterpriseConfiguration +import net.corda.node.services.config.MutualExclusionConfiguration import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/SocksTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/SocksTests.kt index 94428f6a94..8c1d0b0da5 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/SocksTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/SocksTests.kt @@ -24,16 +24,15 @@ import net.corda.core.identity.CordaX500Name import net.corda.core.internal.div import net.corda.core.toFuture import net.corda.core.utilities.NetworkHostAndPort -import net.corda.node.services.config.* +import net.corda.node.services.config.EnterpriseConfiguration +import net.corda.node.services.config.MutualExclusionConfiguration +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus -import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient -import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer -import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig -import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyVersion +import net.corda.nodeapi.internal.protonwrapper.netty.* import net.corda.testing.core.* import net.corda.testing.internal.rigorousMock import org.apache.activemq.artemis.api.core.RoutingType @@ -43,6 +42,7 @@ import org.junit.Before import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder +import java.security.KeyStore import kotlin.test.assertEquals class SocksTests { @@ -281,7 +281,6 @@ class SocksTests { doReturn("cordacadevpass").whenever(it).keyStorePassword doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress doReturn(null).whenever(it).jmxMonitoringHttpPort - doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration } artemisConfig.configureWithDevSSLCertificate() @@ -304,20 +303,20 @@ class SocksTests { val clientTruststore = clientConfig.loadTrustStore().internal val clientKeystore = clientConfig.loadSslKeyStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = clientKeystore + override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = clientTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = MAX_MESSAGE_SIZE + override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null) + } return AMQPClient( listOf(NetworkHostAndPort("localhost", serverPort), NetworkHostAndPort("localhost", serverPort2), NetworkHostAndPort("localhost", artemisPort)), setOf(ALICE_NAME, CHARLIE_NAME), - PEER_USER, - PEER_USER, - clientKeystore, - clientConfig.keyStorePassword, - clientTruststore, true, - MAX_MESSAGE_SIZE, - socksProxyConfig = SocksProxyConfig(SocksProxyVersion.SOCKS5, - NetworkHostAndPort("127.0.0.1", socksPort), null, null) - ) + amqpConfig) } private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int): AMQPClient { @@ -331,21 +330,20 @@ class SocksTests { val clientTruststore = clientConfig.loadTrustStore().internal val clientKeystore = clientConfig.loadSslKeyStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = clientKeystore + override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = clientTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = MAX_MESSAGE_SIZE + override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null) + } + return AMQPClient( listOf(NetworkHostAndPort("localhost", serverPort)), setOf(ALICE_NAME), - PEER_USER, - PEER_USER, - clientKeystore, - clientConfig.keyStorePassword, - clientTruststore, - true, - MAX_MESSAGE_SIZE, - true, - sharedEventGroup, - socksProxyConfig = SocksProxyConfig(SocksProxyVersion.SOCKS5, - NetworkHostAndPort("127.0.0.1", socksPort), null, null) - ) + amqpConfig, + sharedEventGroup) } private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer { @@ -359,15 +357,16 @@ class SocksTests { val serverTruststore = serverConfig.loadTrustStore().internal val serverKeystore = serverConfig.loadSslKeyStore().internal + val amqpConfig = object : AMQPConfiguration { + override val keyStore: KeyStore = serverKeystore + override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray() + override val trustStore: KeyStore = serverTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = MAX_MESSAGE_SIZE + } return AMQPServer( "0.0.0.0", port, - PEER_USER, - PEER_USER, - serverKeystore, - serverConfig.keyStorePassword, - serverTruststore, - true, - MAX_MESSAGE_SIZE) + amqpConfig) } }