From 400346fff00598da1195a1b5ef934a07a59009f5 Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Thu, 8 Nov 2018 09:04:36 +0000 Subject: [PATCH] ENT-2669: Introduce option for HTTP proxy for outbound Bridge connectivity (#1537) * ENT-2669: Introduce option for HTTP proxy for outbound Bridge connectivity One of our customers currently using HTTP proxy without which outbound connection from Corda Node cannot be established. Also, propagate `trace` setting correctly down the Bridge stack. * ENT-2669: Compilation fixes. * ENT-2669: Revert deleted constructor back. * ENT-2669: First stub on HTTP Proxy integration test. * ENT-2669: Minor changes. * ENT-2669: Reduce test to bare minimum. * ENT-2669: Attempt to write own HttpProxy. * ENT-2669: Another attempt to make programmatic HttpProxy work. * ENT-2697: Disable DNS resolution before sending requests to proxies. * ENT-2669: Switch to use Jetty HttpProxy for integration testing. * Adds a pipeline logger ahead of the proxy stage if trace is set. The logging is removed once the proxy completes. Define a constant for pipeline stage. --- .../sender/DirectBridgeSenderService.kt | 3 +- .../kotlin/net/corda/bridge/ConfigTest.kt | 10 + .../corda/bridge/withhttpproxy/firewall.conf | 14 ++ docs/source/firewall-configuration-file.rst | 10 +- .../internal/bridging/AMQPBridgeManager.kt | 18 +- .../bridging/BridgeControlListener.kt | 7 +- .../bridging/LoopbackBridgeManager.kt | 3 +- .../protonwrapper/netty/AMQPChannelHandler.kt | 9 + .../protonwrapper/netty/AMQPClient.kt | 32 +++- node/build.gradle | 4 +- .../kotlin/net/corda/node/amqp/HttpTests.kt | 175 ++++++++++++++++++ .../net/corda/node/amqp/LoopbackBridgeTest.kt | 3 +- 12 files changed, 261 insertions(+), 27 deletions(-) create mode 100644 bridge/src/test/resources/net/corda/bridge/withhttpproxy/firewall.conf create mode 100644 node/src/integration-test/kotlin/net/corda/node/amqp/HttpTests.kt diff --git a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt index 985fff7544..ea5441ef24 100644 --- a/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt +++ b/bridge/src/main/kotlin/net/corda/bridge/services/sender/DirectBridgeSenderService.kt @@ -33,7 +33,8 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration, maxMessageSize, conf.bridgeInnerConfig?.enableSNI ?: true, { ForwardingArtemisMessageClient(artemisConnectionService) }, - BridgeAuditServiceAdaptor(auditService)) + BridgeAuditServiceAdaptor(auditService), + conf.enableAMQPPacketTrace) private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider { override fun start(): ArtemisMessagingClient.Started { diff --git a/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt b/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt index fe9c54451f..c37fd22113 100644 --- a/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt +++ b/bridge/src/test/kotlin/net/corda/bridge/ConfigTest.kt @@ -164,4 +164,14 @@ class ConfigTest { assertEquals("HelloCorda!", config.healthCheckPhrase) assertEquals("proxyUser", config.outboundConfig?.proxyConfig?.userName) } + + @Test + fun `Load config with HTTP proxy support`() { + val configResource = "/net/corda/bridge/withhttpproxy/firewall.conf" + val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource) + assertEquals(ProxyVersion.HTTP, config.outboundConfig!!.proxyConfig!!.version) + assertEquals(NetworkHostAndPort("proxyHost", 12345), config.outboundConfig!!.proxyConfig!!.proxyAddress) + assertEquals("proxyUser", config.outboundConfig!!.proxyConfig!!.userName) + assertEquals("pwd", config.outboundConfig!!.proxyConfig!!.password) + } } \ No newline at end of file diff --git a/bridge/src/test/resources/net/corda/bridge/withhttpproxy/firewall.conf b/bridge/src/test/resources/net/corda/bridge/withhttpproxy/firewall.conf new file mode 100644 index 0000000000..98b00a5864 --- /dev/null +++ b/bridge/src/test/resources/net/corda/bridge/withhttpproxy/firewall.conf @@ -0,0 +1,14 @@ +firewallMode = SenderReceiver +outboundConfig : { + artemisBrokerAddress = "localhost:11005" + proxyConfig : { + version = HTTP + proxyAddress = "proxyHost:12345" + userName = "proxyUser" + password = "pwd" + } +} +inboundConfig : { + listeningAddress = "0.0.0.0:10005" +} +networkParametersPath = network-parameters \ No newline at end of file diff --git a/docs/source/firewall-configuration-file.rst b/docs/source/firewall-configuration-file.rst index 9139e2b253..decffda399 100644 --- a/docs/source/firewall-configuration-file.rst +++ b/docs/source/firewall-configuration-file.rst @@ -117,15 +117,15 @@ absolute path to the firewall's base directory. :crlCheckSoftFail: If true (recommended setting) allows certificate checks to pass if the CRL(certificate revocation list) provider is unavailable. - :proxyConfig: This section is optionally present if outgoing peer connections should go via a SOCKS4, or SOCKS5 proxy: + :proxyConfig: This section is optionally present if outgoing peer connections should go via a SOCKS4, SOCKS5, or HTTP CONNECT tunnelling proxy: - :version: Either SOCKS4, or SOCKS5 to define the protocol version used in connecting to the SOCKS proxy. + :version: Either SOCKS4, SOCKS5, or HTTP to define the protocol version used in connecting to the SOCKS proxy. - :proxyAddress: Host and port of the SOCKS proxy. + :proxyAddress: Host and port of the proxy. - :userName: Optionally a user name that will be presented to the SOCKS proxy after connect. + :userName: Optionally a user name that will be presented to the proxy after connect. - :password: Optionally, a password to present to the SOCKS5 Proxy. It is not valid for SOCKS4 proxies and it should always be combined with [userName]. + :password: Optionally, a password to present to the SOCKS5 or HTTP Proxy. It is not valid for SOCKS4 proxies and it should always be combined with [userName]. :inboundConfig: This section is used to configure the properties of the listening port. It is required for ``SenderReceiver`` and ``FloatOuter`` modes and must be absent for ``BridgeInner`` mode: diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 42446edb3d..6006b5a544 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -40,7 +40,8 @@ open class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, enableSNI: Boolean, private val artemisMessageClientFactory: () -> ArtemisSessionProvider, - private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager { + private val bridgeMetricsService: BridgeMetricsService? = null, + private val trace: Boolean) : BridgeManager { private val lock = ReentrantLock() private val queueNamesToBridgesMap = mutableMapOf>() @@ -51,20 +52,23 @@ open class AMQPBridgeManager(config: MutualSslConfiguration, override val maxMessageSize: Int, override val useOpenSsl: Boolean, override val enableSNI: Boolean, - override val sourceX500Name: String? = null) : AMQPConfiguration { - constructor(config: MutualSslConfiguration, proxyConfig: ProxyConfig?, maxMessageSize: Int, enableSNI: Boolean) : this(config.keyStore.get(), + override val sourceX500Name: String? = null, + override val trace: Boolean) : AMQPConfiguration { + constructor(config: MutualSslConfiguration, proxyConfig: ProxyConfig?, maxMessageSize: Int, enableSNI: Boolean, trace: Boolean) : this(config.keyStore.get(), config.trustStore.get(), proxyConfig, maxMessageSize, config.useOpenSsl, - enableSNI) + enableSNI, + trace = trace) } - private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, proxyConfig, maxMessageSize, enableSNI) + private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, proxyConfig, maxMessageSize, enableSNI, trace) private var sharedEventLoopGroup: EventLoopGroup? = null private var artemis: ArtemisSessionProvider? = null - constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, enableSNI: Boolean, proxyConfig: ProxyConfig? = null) : this(config, proxyConfig, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) + constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, enableSNI: Boolean, proxyConfig: ProxyConfig? = null, trace: Boolean = false) + : this(config, proxyConfig, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }, trace = trace) companion object { private const val NUM_BRIDGE_THREADS = 0 // Default sized pool @@ -239,7 +243,7 @@ open class AMQPBridgeManager(config: MutualSslConfiguration, return } } - val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, useOpenSsl, enableSNI, sourceX500Name) } + val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, useOpenSsl, enableSNI, sourceX500Name, trace) } val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService) bridges += newBridge bridgeMetricsService?.bridgeCreated(targets, legalNames) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 7b7db809f3..251cfaf7c1 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -28,15 +28,16 @@ class BridgeControlListener(val config: MutualSslConfiguration, maxMessageSize: Int, enableSNI: Boolean, private val artemisMessageClientFactory: () -> ArtemisSessionProvider, - bridgeMetricsService: BridgeMetricsService? = null) : AutoCloseable { + bridgeMetricsService: BridgeMetricsService? = null, + trace: Boolean = false) : AutoCloseable { private val bridgeId: String = UUID.randomUUID().toString() private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId" private val validInboundQueues = mutableSetOf() private val bridgeManager = if (enableSNI) { - LoopbackBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic) + LoopbackBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic, trace) } else { - AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService) + AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, trace) } private var artemis: ArtemisSessionProvider? = null private var controlConsumer: ClientConsumer? = null diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt index 8cc70fd58f..5f7cd26f21 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt @@ -31,7 +31,8 @@ class LoopbackBridgeManager(config: MutualSslConfiguration, enableSNI: Boolean, private val artemisMessageClientFactory: () -> ArtemisSessionProvider, private val bridgeMetricsService: BridgeMetricsService? = null, - private val isLocalInbox: (String) -> Boolean) : AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService) { + private val isLocalInbox: (String) -> Boolean, + trace: Boolean) : AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, trace) { companion object { private val log = contextLogger() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt index fbe1536b26..239f051dc5 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt @@ -46,6 +46,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() { companion object { private val log = contextLogger() + const val PROXY_LOGGER_NAME = "preProxyLogger" } private lateinit var remoteAddress: InetSocketAddress @@ -123,6 +124,14 @@ internal class AMQPChannelHandler(private val serverMode: Boolean, override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) { when (evt) { is ProxyConnectionEvent -> { + if(trace) { + log.info("ProxyConnectionEvent received: $evt") + try { + ctx.pipeline().remove(PROXY_LOGGER_NAME) + } catch (ex: NoSuchElementException) { + // ignore + } + } // update address to the real target address remoteAddress = evt.destinationAddress() } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt index 1c28f01ae0..8162f8c7ac 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPClient.kt @@ -7,8 +7,10 @@ import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioSocketChannel import io.netty.handler.logging.LogLevel import io.netty.handler.logging.LoggingHandler +import io.netty.handler.proxy.HttpProxyHandler import io.netty.handler.proxy.Socks4ProxyHandler import io.netty.handler.proxy.Socks5ProxyHandler +import io.netty.resolver.NoopAddressResolverGroup import io.netty.util.internal.logging.InternalLoggerFactory import io.netty.util.internal.logging.Slf4JLoggerFactory import net.corda.core.identity.CordaX500Name @@ -17,6 +19,7 @@ import net.corda.core.utilities.contextLogger import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl +import net.corda.nodeapi.internal.protonwrapper.netty.AMQPChannelHandler.Companion.PROXY_LOGGER_NAME import net.corda.nodeapi.internal.requireMessageSize import rx.Observable import rx.subjects.PublishSubject @@ -30,7 +33,8 @@ import kotlin.concurrent.withLock enum class ProxyVersion { SOCKS4, - SOCKS5 + SOCKS5, + HTTP } data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostAndPort, val userName: String? = null, val password: String? = null) { @@ -136,18 +140,28 @@ class AMQPClient(val targets: List, override fun initChannel(ch: SocketChannel) { val pipeline = ch.pipeline() - val socksConfig = conf.proxyConfig - if (socksConfig != null) { - val proxyAddress = InetSocketAddress(socksConfig.proxyAddress.host, socksConfig.proxyAddress.port) + val proxyConfig = conf.proxyConfig + if (proxyConfig != null) { + if (conf.trace) pipeline.addLast(PROXY_LOGGER_NAME, LoggingHandler(LogLevel.INFO)) + val proxyAddress = InetSocketAddress(proxyConfig.proxyAddress.host, proxyConfig.proxyAddress.port) val proxy = when (conf.proxyConfig!!.version) { ProxyVersion.SOCKS4 -> { - Socks4ProxyHandler(proxyAddress, socksConfig.userName) + Socks4ProxyHandler(proxyAddress, proxyConfig.userName) } ProxyVersion.SOCKS5 -> { - Socks5ProxyHandler(proxyAddress, socksConfig.userName, socksConfig.password) + Socks5ProxyHandler(proxyAddress, proxyConfig.userName, proxyConfig.password) + } + ProxyVersion.HTTP -> { + val httpProxyHandler = if(proxyConfig.userName == null || proxyConfig.password == null) { + HttpProxyHandler(proxyAddress) + } else { + HttpProxyHandler(proxyAddress, proxyConfig.userName, proxyConfig.password) + } + //httpProxyHandler.setConnectTimeoutMillis(3600000) // 1hr for debugging purposes + httpProxyHandler } } - pipeline.addLast("SocksPoxy", proxy) + pipeline.addLast("Proxy", proxy) proxy.connectFuture().addListener { if (!it.isSuccess) { ch.disconnect() @@ -201,6 +215,10 @@ class AMQPClient(val targets: List, val bootstrap = Bootstrap() // TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux bootstrap.group(workerGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this)) + // Delegate DNS Resolution to the proxy side, if we are using proxy. + if (configuration.proxyConfig != null) { + bootstrap.resolver(NoopAddressResolverGroup.INSTANCE) + } currentTarget = targets[targetIndex] val clientFuture = bootstrap.connect(currentTarget.host, currentTarget.port) clientFuture.addListener(connectListener) diff --git a/node/build.gradle b/node/build.gradle index 86f1423035..b97ce1c67c 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -173,6 +173,7 @@ dependencies { // Web stuff: for HTTP[S] servlets testCompile "org.eclipse.jetty:jetty-servlet:${jetty_version}" testCompile "org.eclipse.jetty:jetty-webapp:${jetty_version}" + testCompile "org.eclipse.jetty:jetty-proxy:${jetty_version}" testCompile "javax.servlet:javax.servlet-api:3.1.0" // Jersey for JAX-RS implementation for use in Jetty @@ -210,10 +211,9 @@ dependencies { compile "com.palominolabs.metrics:metrics-new-relic:${metrics_new_relic_version}" // Allow access to simple SOCKS Server for integration testing - testCompile("io.netty:netty-example:$netty_version") { + integrationTestCompile("io.netty:netty-example:$netty_version") { exclude group: "io.netty", module: "netty-tcnative" exclude group: "ch.qos.logback", module: "logback-classic" - } // Adding native SSL library to allow using native SSL with Artemis and AMQP diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/HttpTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/HttpTests.kt new file mode 100644 index 0000000000..aa5c9696f8 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/HttpTests.kt @@ -0,0 +1,175 @@ +package net.corda.node.amqp + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.div +import net.corda.core.toFuture +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX +import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus +import net.corda.nodeapi.internal.protonwrapper.netty.* +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.MAX_MESSAGE_SIZE +import net.corda.testing.driver.PortAllocation +import net.corda.testing.internal.rigorousMock +import net.corda.testing.internal.stubs.CertificateStoreStubs +import org.junit.* +import org.junit.rules.TemporaryFolder +import kotlin.test.assertEquals +import org.eclipse.jetty.proxy.ProxyServlet +import org.eclipse.jetty.servlet.ServletContextHandler.SESSIONS +import org.eclipse.jetty.proxy.ConnectHandler +import org.eclipse.jetty.server.Connector +import org.eclipse.jetty.server.Server +import org.eclipse.jetty.server.ServerConnector +import org.eclipse.jetty.servlet.ServletContextHandler +import org.eclipse.jetty.servlet.ServletHolder + + +class HttpTests { + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + + private val portAllocator = PortAllocation.Incremental(10000) + private val httpProxyPort = portAllocator.nextPort() + private val serverPort = portAllocator.nextPort() + private val serverPort2 = portAllocator.nextPort() + private val artemisPort = portAllocator.nextPort() + + private abstract class AbstractNodeConfiguration : NodeConfiguration + + private val httpProxy: Server = reverseJettyProxy() + + private fun reverseJettyProxy() : Server { + val server = Server() + + val connector = ServerConnector(server) + connector.host = "localhost" + connector.port = httpProxyPort + + server.connectors = arrayOf(connector) + + // Setup proxy handler to handle CONNECT methods + val proxy = ConnectHandler() + server.handler = proxy + + // Setup proxy servlet + val context = ServletContextHandler(proxy, "/", SESSIONS) + val proxyServlet = ServletHolder(ProxyServlet.Transparent::class.java) + proxyServlet.setInitParameter("ProxyTo", "localhost:$serverPort") + proxyServlet.setInitParameter("Prefix", "/") + context.addServlet(proxyServlet, "/*") + + return server + } + + @Before + fun setup() { + httpProxy.start() + } + + @After + fun shutdown() { + httpProxy.stop() + } + + @Test + fun `Simple AMPQ Client to Server`() { + val amqpServer = createServer(serverPort) + amqpServer.use { + amqpServer.start() + val receiveSubs = amqpServer.onReceive.subscribe { + assertEquals(BOB_NAME.toString(), it.sourceLegalName) + assertEquals(P2P_PREFIX + "Test", it.topic) + assertEquals("Test", String(it.payload)) + it.complete(true) + } + val amqpClient = createClient() + amqpClient.use { + val serverConnected = amqpServer.onConnection.toFuture() + val clientConnected = amqpClient.onConnection.toFuture() + amqpClient.start() + val serverConnect = serverConnected.get() + assertEquals(true, serverConnect.connected) + assertEquals(BOB_NAME, CordaX500Name.build(serverConnect.remoteCert!!.subjectX500Principal)) + val clientConnect = clientConnected.get() + assertEquals(true, clientConnect.connected) + assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal)) + val msg = amqpClient.createMessage("Test".toByteArray(), + P2P_PREFIX + "Test", + ALICE_NAME.toString(), + emptyMap()) + amqpClient.write(msg) + assertEquals(MessageStatus.Acknowledged, msg.onComplete.get()) + receiveSubs.unsubscribe() + } + } + } + + private fun createClient(): AMQPClient { + val baseDirectory = temporaryFolder.root.toPath() / "client" + val certificatesDirectory = baseDirectory / "certificates" + val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) + val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) + + val clientConfig = rigorousMock().also { + doReturn(temporaryFolder.root.toPath() / "client").whenever(it).baseDirectory + doReturn(certificatesDirectory).whenever(it).certificatesDirectory + doReturn(BOB_NAME).whenever(it).myLegalName + doReturn(signingCertificateStore).whenever(it).signingCertificateStore + doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions + } + clientConfig.configureWithDevSSLCertificate() + + val clientTruststore = clientConfig.p2pSslOptions.trustStore.get() + val clientKeystore = clientConfig.p2pSslOptions.keyStore.get() + val amqpConfig = object : AMQPConfiguration { + override val keyStore = clientKeystore + override val trustStore = clientTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = MAX_MESSAGE_SIZE + override val proxyConfig: ProxyConfig? = ProxyConfig(ProxyVersion.HTTP, NetworkHostAndPort("127.0.0.1", httpProxyPort), null, null) + } + return AMQPClient( + listOf(NetworkHostAndPort("localhost", serverPort), + NetworkHostAndPort("localhost", serverPort2), + NetworkHostAndPort("localhost", artemisPort)), + setOf(ALICE_NAME, CHARLIE_NAME), + amqpConfig) + } + + private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer { + val baseDirectory = temporaryFolder.root.toPath() / "server" + val certificatesDirectory = baseDirectory / "certificates" + val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory) + val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) + + val serverConfig = rigorousMock().also { + doReturn(baseDirectory).whenever(it).baseDirectory + doReturn(certificatesDirectory).whenever(it).certificatesDirectory + doReturn(name).whenever(it).myLegalName + doReturn(signingCertificateStore).whenever(it).signingCertificateStore + doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions + } + serverConfig.configureWithDevSSLCertificate() + + val serverTruststore = serverConfig.p2pSslOptions.trustStore.get() + val serverKeystore = serverConfig.p2pSslOptions.keyStore.get() + val amqpConfig = object : AMQPConfiguration { + override val keyStore = serverKeystore + override val trustStore = serverTruststore + override val trace: Boolean = true + override val maxMessageSize: Int = MAX_MESSAGE_SIZE + } + return AMQPServer( + "0.0.0.0", + port, + amqpConfig) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt index c50a246d89..a74eabfe0a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt @@ -211,7 +211,8 @@ class LoopbackBridgeTest(private val useOpenSsl: Boolean) { artemisConfig.enableSNI, { ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) }, null, - { true }) + { true }, + false) bridgeManager.start() val artemis = artemisClient.started!!