ENT-2669: Introduce option for HTTP proxy for outbound Bridge connectivity (#1537)

* ENT-2669: Introduce option for HTTP proxy for outbound Bridge connectivity

One of our customers currently using HTTP proxy without which outbound connection from Corda Node cannot be established.
Also, propagate `trace` setting correctly down the Bridge stack.

* ENT-2669: Compilation fixes.

* ENT-2669: Revert deleted constructor back.

* ENT-2669: First stub on HTTP Proxy integration test.

* ENT-2669: Minor changes.

* ENT-2669: Reduce test to bare minimum.

* ENT-2669: Attempt to write own HttpProxy.

* ENT-2669: Another attempt to make programmatic HttpProxy work.

* ENT-2697: Disable DNS resolution before sending requests to proxies.

* ENT-2669: Switch to use Jetty HttpProxy for integration testing.

* Adds a pipeline logger ahead of the proxy stage if trace is set. The logging is removed once the proxy completes.

Define a constant for pipeline stage.
This commit is contained in:
Viktor Kolomeyko 2018-11-08 09:04:36 +00:00 committed by GitHub
parent 90597e1736
commit 400346fff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 261 additions and 27 deletions

View File

@ -33,7 +33,8 @@ class DirectBridgeSenderService(val conf: FirewallConfiguration,
maxMessageSize,
conf.bridgeInnerConfig?.enableSNI ?: true,
{ ForwardingArtemisMessageClient(artemisConnectionService) },
BridgeAuditServiceAdaptor(auditService))
BridgeAuditServiceAdaptor(auditService),
conf.enableAMQPPacketTrace)
private class ForwardingArtemisMessageClient(val artemisConnectionService: BridgeArtemisConnectionService) : ArtemisSessionProvider {
override fun start(): ArtemisMessagingClient.Started {

View File

@ -164,4 +164,14 @@ class ConfigTest {
assertEquals("HelloCorda!", config.healthCheckPhrase)
assertEquals("proxyUser", config.outboundConfig?.proxyConfig?.userName)
}
@Test
fun `Load config with HTTP proxy support`() {
val configResource = "/net/corda/bridge/withhttpproxy/firewall.conf"
val config = createAndLoadConfigFromResource(tempFolder.root.toPath(), configResource)
assertEquals(ProxyVersion.HTTP, config.outboundConfig!!.proxyConfig!!.version)
assertEquals(NetworkHostAndPort("proxyHost", 12345), config.outboundConfig!!.proxyConfig!!.proxyAddress)
assertEquals("proxyUser", config.outboundConfig!!.proxyConfig!!.userName)
assertEquals("pwd", config.outboundConfig!!.proxyConfig!!.password)
}
}

View File

@ -0,0 +1,14 @@
firewallMode = SenderReceiver
outboundConfig : {
artemisBrokerAddress = "localhost:11005"
proxyConfig : {
version = HTTP
proxyAddress = "proxyHost:12345"
userName = "proxyUser"
password = "pwd"
}
}
inboundConfig : {
listeningAddress = "0.0.0.0:10005"
}
networkParametersPath = network-parameters

View File

@ -117,15 +117,15 @@ absolute path to the firewall's base directory.
:crlCheckSoftFail: If true (recommended setting) allows certificate checks to pass if the CRL(certificate revocation list) provider is unavailable.
:proxyConfig: This section is optionally present if outgoing peer connections should go via a SOCKS4, or SOCKS5 proxy:
:proxyConfig: This section is optionally present if outgoing peer connections should go via a SOCKS4, SOCKS5, or HTTP CONNECT tunnelling proxy:
:version: Either SOCKS4, or SOCKS5 to define the protocol version used in connecting to the SOCKS proxy.
:version: Either SOCKS4, SOCKS5, or HTTP to define the protocol version used in connecting to the SOCKS proxy.
:proxyAddress: Host and port of the SOCKS proxy.
:proxyAddress: Host and port of the proxy.
:userName: Optionally a user name that will be presented to the SOCKS proxy after connect.
:userName: Optionally a user name that will be presented to the proxy after connect.
:password: Optionally, a password to present to the SOCKS5 Proxy. It is not valid for SOCKS4 proxies and it should always be combined with [userName].
:password: Optionally, a password to present to the SOCKS5 or HTTP Proxy. It is not valid for SOCKS4 proxies and it should always be combined with [userName].
:inboundConfig: This section is used to configure the properties of the listening port. It is required for ``SenderReceiver`` and ``FloatOuter`` modes and must be absent for ``BridgeInner`` mode:

View File

@ -40,7 +40,8 @@ open class AMQPBridgeManager(config: MutualSslConfiguration,
maxMessageSize: Int,
enableSNI: Boolean,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager {
private val bridgeMetricsService: BridgeMetricsService? = null,
private val trace: Boolean) : BridgeManager {
private val lock = ReentrantLock()
private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>()
@ -51,20 +52,23 @@ open class AMQPBridgeManager(config: MutualSslConfiguration,
override val maxMessageSize: Int,
override val useOpenSsl: Boolean,
override val enableSNI: Boolean,
override val sourceX500Name: String? = null) : AMQPConfiguration {
constructor(config: MutualSslConfiguration, proxyConfig: ProxyConfig?, maxMessageSize: Int, enableSNI: Boolean) : this(config.keyStore.get(),
override val sourceX500Name: String? = null,
override val trace: Boolean) : AMQPConfiguration {
constructor(config: MutualSslConfiguration, proxyConfig: ProxyConfig?, maxMessageSize: Int, enableSNI: Boolean, trace: Boolean) : this(config.keyStore.get(),
config.trustStore.get(),
proxyConfig,
maxMessageSize,
config.useOpenSsl,
enableSNI)
enableSNI,
trace = trace)
}
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, proxyConfig, maxMessageSize, enableSNI)
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, proxyConfig, maxMessageSize, enableSNI, trace)
private var sharedEventLoopGroup: EventLoopGroup? = null
private var artemis: ArtemisSessionProvider? = null
constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, enableSNI: Boolean, proxyConfig: ProxyConfig? = null) : this(config, proxyConfig, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, enableSNI: Boolean, proxyConfig: ProxyConfig? = null, trace: Boolean = false)
: this(config, proxyConfig, maxMessageSize, enableSNI, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }, trace = trace)
companion object {
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
@ -239,7 +243,7 @@ open class AMQPBridgeManager(config: MutualSslConfiguration,
return
}
}
val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, useOpenSsl, enableSNI, sourceX500Name) }
val newAMQPConfig = with(amqpConfig) { AMQPConfigurationImpl(keyStore, trustStore, proxyConfig, maxMessageSize, useOpenSsl, enableSNI, sourceX500Name, trace) }
val newBridge = AMQPBridge(sourceX500Name, queueName, targets, legalNames, newAMQPConfig, sharedEventLoopGroup!!, artemis!!, bridgeMetricsService)
bridges += newBridge
bridgeMetricsService?.bridgeCreated(targets, legalNames)

View File

@ -28,15 +28,16 @@ class BridgeControlListener(val config: MutualSslConfiguration,
maxMessageSize: Int,
enableSNI: Boolean,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
bridgeMetricsService: BridgeMetricsService? = null) : AutoCloseable {
bridgeMetricsService: BridgeMetricsService? = null,
trace: Boolean = false) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId"
private val validInboundQueues = mutableSetOf<String>()
private val bridgeManager = if (enableSNI) {
LoopbackBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic)
LoopbackBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic, trace)
} else {
AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService)
AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, trace)
}
private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null

View File

@ -31,7 +31,8 @@ class LoopbackBridgeManager(config: MutualSslConfiguration,
enableSNI: Boolean,
private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
private val bridgeMetricsService: BridgeMetricsService? = null,
private val isLocalInbox: (String) -> Boolean) : AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService) {
private val isLocalInbox: (String) -> Boolean,
trace: Boolean) : AMQPBridgeManager(config, proxyConfig, maxMessageSize, enableSNI, artemisMessageClientFactory, bridgeMetricsService, trace) {
companion object {
private val log = contextLogger()

View File

@ -46,6 +46,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private val onReceive: (ReceivedMessage) -> Unit) : ChannelDuplexHandler() {
companion object {
private val log = contextLogger()
const val PROXY_LOGGER_NAME = "preProxyLogger"
}
private lateinit var remoteAddress: InetSocketAddress
@ -123,6 +124,14 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
when (evt) {
is ProxyConnectionEvent -> {
if(trace) {
log.info("ProxyConnectionEvent received: $evt")
try {
ctx.pipeline().remove(PROXY_LOGGER_NAME)
} catch (ex: NoSuchElementException) {
// ignore
}
}
// update address to the real target address
remoteAddress = evt.destinationAddress()
}

View File

@ -7,8 +7,10 @@ import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.handler.proxy.HttpProxyHandler
import io.netty.handler.proxy.Socks4ProxyHandler
import io.netty.handler.proxy.Socks5ProxyHandler
import io.netty.resolver.NoopAddressResolverGroup
import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.identity.CordaX500Name
@ -17,6 +19,7 @@ import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPChannelHandler.Companion.PROXY_LOGGER_NAME
import net.corda.nodeapi.internal.requireMessageSize
import rx.Observable
import rx.subjects.PublishSubject
@ -30,7 +33,8 @@ import kotlin.concurrent.withLock
enum class ProxyVersion {
SOCKS4,
SOCKS5
SOCKS5,
HTTP
}
data class ProxyConfig(val version: ProxyVersion, val proxyAddress: NetworkHostAndPort, val userName: String? = null, val password: String? = null) {
@ -136,18 +140,28 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
val socksConfig = conf.proxyConfig
if (socksConfig != null) {
val proxyAddress = InetSocketAddress(socksConfig.proxyAddress.host, socksConfig.proxyAddress.port)
val proxyConfig = conf.proxyConfig
if (proxyConfig != null) {
if (conf.trace) pipeline.addLast(PROXY_LOGGER_NAME, LoggingHandler(LogLevel.INFO))
val proxyAddress = InetSocketAddress(proxyConfig.proxyAddress.host, proxyConfig.proxyAddress.port)
val proxy = when (conf.proxyConfig!!.version) {
ProxyVersion.SOCKS4 -> {
Socks4ProxyHandler(proxyAddress, socksConfig.userName)
Socks4ProxyHandler(proxyAddress, proxyConfig.userName)
}
ProxyVersion.SOCKS5 -> {
Socks5ProxyHandler(proxyAddress, socksConfig.userName, socksConfig.password)
Socks5ProxyHandler(proxyAddress, proxyConfig.userName, proxyConfig.password)
}
ProxyVersion.HTTP -> {
val httpProxyHandler = if(proxyConfig.userName == null || proxyConfig.password == null) {
HttpProxyHandler(proxyAddress)
} else {
HttpProxyHandler(proxyAddress, proxyConfig.userName, proxyConfig.password)
}
//httpProxyHandler.setConnectTimeoutMillis(3600000) // 1hr for debugging purposes
httpProxyHandler
}
}
pipeline.addLast("SocksPoxy", proxy)
pipeline.addLast("Proxy", proxy)
proxy.connectFuture().addListener {
if (!it.isSuccess) {
ch.disconnect()
@ -201,6 +215,10 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
val bootstrap = Bootstrap()
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
bootstrap.group(workerGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this))
// Delegate DNS Resolution to the proxy side, if we are using proxy.
if (configuration.proxyConfig != null) {
bootstrap.resolver(NoopAddressResolverGroup.INSTANCE)
}
currentTarget = targets[targetIndex]
val clientFuture = bootstrap.connect(currentTarget.host, currentTarget.port)
clientFuture.addListener(connectListener)

View File

@ -173,6 +173,7 @@ dependencies {
// Web stuff: for HTTP[S] servlets
testCompile "org.eclipse.jetty:jetty-servlet:${jetty_version}"
testCompile "org.eclipse.jetty:jetty-webapp:${jetty_version}"
testCompile "org.eclipse.jetty:jetty-proxy:${jetty_version}"
testCompile "javax.servlet:javax.servlet-api:3.1.0"
// Jersey for JAX-RS implementation for use in Jetty
@ -210,10 +211,9 @@ dependencies {
compile "com.palominolabs.metrics:metrics-new-relic:${metrics_new_relic_version}"
// Allow access to simple SOCKS Server for integration testing
testCompile("io.netty:netty-example:$netty_version") {
integrationTestCompile("io.netty:netty-example:$netty_version") {
exclude group: "io.netty", module: "netty-tcnative"
exclude group: "ch.qos.logback", module: "logback-classic"
}
// Adding native SSL library to allow using native SSL with Artemis and AMQP

View File

@ -0,0 +1,175 @@
package net.corda.node.amqp
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.div
import net.corda.core.toFuture
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs
import org.junit.*
import org.junit.rules.TemporaryFolder
import kotlin.test.assertEquals
import org.eclipse.jetty.proxy.ProxyServlet
import org.eclipse.jetty.servlet.ServletContextHandler.SESSIONS
import org.eclipse.jetty.proxy.ConnectHandler
import org.eclipse.jetty.server.Connector
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.ServerConnector
import org.eclipse.jetty.servlet.ServletContextHandler
import org.eclipse.jetty.servlet.ServletHolder
class HttpTests {
@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
private val portAllocator = PortAllocation.Incremental(10000)
private val httpProxyPort = portAllocator.nextPort()
private val serverPort = portAllocator.nextPort()
private val serverPort2 = portAllocator.nextPort()
private val artemisPort = portAllocator.nextPort()
private abstract class AbstractNodeConfiguration : NodeConfiguration
private val httpProxy: Server = reverseJettyProxy()
private fun reverseJettyProxy() : Server {
val server = Server()
val connector = ServerConnector(server)
connector.host = "localhost"
connector.port = httpProxyPort
server.connectors = arrayOf<Connector>(connector)
// Setup proxy handler to handle CONNECT methods
val proxy = ConnectHandler()
server.handler = proxy
// Setup proxy servlet
val context = ServletContextHandler(proxy, "/", SESSIONS)
val proxyServlet = ServletHolder(ProxyServlet.Transparent::class.java)
proxyServlet.setInitParameter("ProxyTo", "localhost:$serverPort")
proxyServlet.setInitParameter("Prefix", "/")
context.addServlet(proxyServlet, "/*")
return server
}
@Before
fun setup() {
httpProxy.start()
}
@After
fun shutdown() {
httpProxy.stop()
}
@Test
fun `Simple AMPQ Client to Server`() {
val amqpServer = createServer(serverPort)
amqpServer.use {
amqpServer.start()
val receiveSubs = amqpServer.onReceive.subscribe {
assertEquals(BOB_NAME.toString(), it.sourceLegalName)
assertEquals(P2P_PREFIX + "Test", it.topic)
assertEquals("Test", String(it.payload))
it.complete(true)
}
val amqpClient = createClient()
amqpClient.use {
val serverConnected = amqpServer.onConnection.toFuture()
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val serverConnect = serverConnected.get()
assertEquals(true, serverConnect.connected)
assertEquals(BOB_NAME, CordaX500Name.build(serverConnect.remoteCert!!.subjectX500Principal))
val clientConnect = clientConnected.get()
assertEquals(true, clientConnect.connected)
assertEquals(ALICE_NAME, CordaX500Name.build(clientConnect.remoteCert!!.subjectX500Principal))
val msg = amqpClient.createMessage("Test".toByteArray(),
P2P_PREFIX + "Test",
ALICE_NAME.toString(),
emptyMap())
amqpClient.write(msg)
assertEquals(MessageStatus.Acknowledged, msg.onComplete.get())
receiveSubs.unsubscribe()
}
}
}
private fun createClient(): AMQPClient {
val baseDirectory = temporaryFolder.root.toPath() / "client"
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val clientConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(temporaryFolder.root.toPath() / "client").whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(BOB_NAME).whenever(it).myLegalName
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
}
clientConfig.configureWithDevSSLCertificate()
val clientTruststore = clientConfig.p2pSslOptions.trustStore.get()
val clientKeystore = clientConfig.p2pSslOptions.keyStore.get()
val amqpConfig = object : AMQPConfiguration {
override val keyStore = clientKeystore
override val trustStore = clientTruststore
override val trace: Boolean = true
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
override val proxyConfig: ProxyConfig? = ProxyConfig(ProxyVersion.HTTP, NetworkHostAndPort("127.0.0.1", httpProxyPort), null, null)
}
return AMQPClient(
listOf(NetworkHostAndPort("localhost", serverPort),
NetworkHostAndPort("localhost", serverPort2),
NetworkHostAndPort("localhost", artemisPort)),
setOf(ALICE_NAME, CHARLIE_NAME),
amqpConfig)
}
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer {
val baseDirectory = temporaryFolder.root.toPath() / "server"
val certificatesDirectory = baseDirectory / "certificates"
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory)
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
val serverConfig = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(baseDirectory).whenever(it).baseDirectory
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
doReturn(name).whenever(it).myLegalName
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
}
serverConfig.configureWithDevSSLCertificate()
val serverTruststore = serverConfig.p2pSslOptions.trustStore.get()
val serverKeystore = serverConfig.p2pSslOptions.keyStore.get()
val amqpConfig = object : AMQPConfiguration {
override val keyStore = serverKeystore
override val trustStore = serverTruststore
override val trace: Boolean = true
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
}
return AMQPServer(
"0.0.0.0",
port,
amqpConfig)
}
}

View File

@ -211,7 +211,8 @@ class LoopbackBridgeTest(private val useOpenSsl: Boolean) {
artemisConfig.enableSNI,
{ ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) },
null,
{ true })
{ true },
false)
bridgeManager.start()
val artemis = artemisClient.started!!