Initial cut of SOCKS proxy support

Correct the reconnect logic when SOCKS proxy is in the pipeline

Add integration tests and adjust handling of reconnect

Rename parameter
This commit is contained in:
Matthew Nesbit
2018-03-12 17:39:44 +00:00
parent a26c5c1483
commit 6885661b66
9 changed files with 440 additions and 10 deletions

View File

@ -28,6 +28,7 @@ import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companio
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientConsumer
@ -47,7 +48,7 @@ import kotlin.concurrent.withLock
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
*/
@VisibleForTesting
class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
class AMQPBridgeManager(config: NodeSSLConfiguration, private val socksProxyConfig: SocksProxyConfig? = null, val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
private val lock = ReentrantLock()
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
@ -57,7 +58,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
private val trustStore = config.loadTrustStore().internal
private var artemis: ArtemisSessionProvider? = null
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
constructor(config: NodeSSLConfiguration, p2pAddress: NetworkHostAndPort, maxMessageSize: Int, socksProxyConfig: SocksProxyConfig? = null) : this(config, socksProxyConfig, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object {
private const val NUM_BRIDGE_THREADS = 0 // Default sized pool
@ -78,6 +79,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
keyStorePrivateKeyPassword: String,
trustStore: KeyStore,
sharedEventGroup: EventLoopGroup,
socksProxyConfig: SocksProxyConfig?,
private val artemis: ArtemisSessionProvider) {
companion object {
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
@ -85,7 +87,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
private val log = LoggerFactory.getLogger("$bridgeName:${legalNames.first()}")
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, sharedThreadPool = sharedEventGroup)
val amqpClient = AMQPClient(listOf(target), legalNames, PEER_USER, PEER_USER, keyStore, keyStorePrivateKeyPassword, trustStore, sharedThreadPool = sharedEventGroup, socksProxyConfig = socksProxyConfig)
val bridgeName: String get() = getBridgeName(queueName, target)
private val lock = ReentrantLock() // lock to serialise session level access
private var session: ClientSession? = null
@ -179,7 +181,7 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
if (bridgeExists(getBridgeName(queueName, target))) {
return
}
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, sharedEventLoopGroup!!, artemis!!)
val newBridge = AMQPBridge(queueName, target, legalNames, keyStore, keyStorePrivateKeyPassword, trustStore, sharedEventLoopGroup!!, socksProxyConfig, artemis!!)
lock.withLock {
bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge
}

View File

@ -22,6 +22,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisSessionProvider
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
@ -29,16 +30,18 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import java.util.*
class BridgeControlListener(val config: NodeSSLConfiguration,
socksProxyConfig: SocksProxyConfig? = null,
val artemisMessageClientFactory: () -> ArtemisSessionProvider) : AutoCloseable {
private val bridgeId: String = UUID.randomUUID().toString()
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, artemisMessageClientFactory)
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, artemisMessageClientFactory)
private val validInboundQueues = mutableSetOf<String>()
private var artemis: ArtemisSessionProvider? = null
private var controlConsumer: ClientConsumer? = null
constructor(config: NodeSSLConfiguration,
p2pAddress: NetworkHostAndPort,
maxMessageSize: Int) : this(config, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
maxMessageSize: Int,
socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
companion object {
private val log = contextLogger()

View File

@ -15,6 +15,8 @@ import io.netty.channel.ChannelDuplexHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelPromise
import io.netty.channel.socket.SocketChannel
import io.netty.handler.proxy.ProxyConnectException
import io.netty.handler.proxy.ProxyConnectionEvent
import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent
import io.netty.util.ReferenceCountUtil
@ -51,6 +53,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private var localCert: X509Certificate? = null
private var remoteCert: X509Certificate? = null
private var eventProcessor: EventProcessor? = null
private var suppressClose: Boolean = false
override fun channelActive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
@ -82,12 +85,17 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
override fun channelInactive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false)))
if (!suppressClose) {
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false)))
}
eventProcessor?.close()
ctx.fireChannelInactive()
}
override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
if (evt is ProxyConnectionEvent) {
remoteAddress = evt.destinationAddress() // update address to teh real target address
}
if (evt is SslHandshakeCompletionEvent) {
if (evt.isSuccess) {
val sslHandler = ctx.pipeline().get(SslHandler::class.java)
@ -111,6 +119,15 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
}
}
@Suppress("OverridingDeprecatedMember")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
if (cause is ProxyConnectException) {
log.warn("Proxy connection failed ${cause.message}")
suppressClose = true // The pipeline gets marked as active on connection to the proxy rather than to the target, which causes excess close events
}
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
try {
log.debug { "Received $msg" }

View File

@ -17,6 +17,8 @@ import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import io.netty.handler.proxy.Socks4ProxyHandler
import io.netty.handler.proxy.Socks5ProxyHandler
import io.netty.util.internal.logging.InternalLoggerFactory
import io.netty.util.internal.logging.Slf4JLoggerFactory
import net.corda.core.identity.CordaX500Name
@ -27,6 +29,7 @@ import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import rx.Observable
import rx.subjects.PublishSubject
import java.net.InetSocketAddress
import java.security.KeyStore
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
@ -34,6 +37,19 @@ import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.TrustManagerFactory
import kotlin.concurrent.withLock
enum class SocksProxyVersion {
SOCKS4,
SOCKS5
}
data class SocksProxyConfig(val version: SocksProxyVersion, val proxyAddress: NetworkHostAndPort, val userName: String? = null, val password: String? = null) {
init {
if (version == SocksProxyVersion.SOCKS4) {
require(password == null) { "SOCKS4 does not support a password" }
}
}
}
/**
* The AMQPClient creates a connection initiator that will try to connect in a round-robin fashion
* to the first open SSL socket. It will keep retrying until it is stopped.
@ -49,7 +65,8 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
private val keyStorePrivateKeyPassword: String,
private val trustStore: KeyStore,
private val trace: Boolean = false,
private val sharedThreadPool: EventLoopGroup? = null) : AutoCloseable {
private val sharedThreadPool: EventLoopGroup? = null,
private val socksProxyConfig: SocksProxyConfig? = null) : AutoCloseable {
companion object {
init {
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE)
@ -117,6 +134,25 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
val socksConfig = parent.socksProxyConfig
if (socksConfig != null) {
val proxyAddress = InetSocketAddress(socksConfig.proxyAddress.host, socksConfig.proxyAddress.port)
val proxy = when (parent.socksProxyConfig!!.version) {
SocksProxyVersion.SOCKS4 -> {
Socks4ProxyHandler(proxyAddress, socksConfig.userName)
}
SocksProxyVersion.SOCKS5 -> {
Socks5ProxyHandler(proxyAddress, socksConfig.userName, socksConfig.password)
}
}
pipeline.addLast("SocksPoxy", proxy)
proxy.connectFuture().addListener {
if (!it.isSuccess) {
ch.disconnect()
}
}
}
val handler = createClientSslHelper(parent.currentTarget, keyManagerFactory, trustManagerFactory)
pipeline.addLast("sslHandler", handler)
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))