mirror of
https://github.com/corda/corda.git
synced 2025-06-18 15:18:16 +00:00
Prevent bridge reconnection attempts on targets that present invalid/misconfigured/different certificates to protect nodes from dead identities. (#3225)
This commit is contained in:
@ -41,6 +41,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
private var localCert: X509Certificate? = null
|
private var localCert: X509Certificate? = null
|
||||||
private var remoteCert: X509Certificate? = null
|
private var remoteCert: X509Certificate? = null
|
||||||
private var eventProcessor: EventProcessor? = null
|
private var eventProcessor: EventProcessor? = null
|
||||||
|
private var badCert: Boolean = false
|
||||||
|
|
||||||
override fun channelActive(ctx: ChannelHandlerContext) {
|
override fun channelActive(ctx: ChannelHandlerContext) {
|
||||||
val ch = ctx.channel()
|
val ch = ctx.channel()
|
||||||
@ -72,7 +73,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
override fun channelInactive(ctx: ChannelHandlerContext) {
|
override fun channelInactive(ctx: ChannelHandlerContext) {
|
||||||
val ch = ctx.channel()
|
val ch = ctx.channel()
|
||||||
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
|
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
|
||||||
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false)))
|
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, remoteCert, false, badCert)))
|
||||||
eventProcessor?.close()
|
eventProcessor?.close()
|
||||||
ctx.fireChannelInactive()
|
ctx.fireChannelInactive()
|
||||||
}
|
}
|
||||||
@ -86,19 +87,22 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
|||||||
val remoteX500Name = try {
|
val remoteX500Name = try {
|
||||||
CordaX500Name.build(remoteCert!!.subjectX500Principal)
|
CordaX500Name.build(remoteCert!!.subjectX500Principal)
|
||||||
} catch (ex: IllegalArgumentException) {
|
} catch (ex: IllegalArgumentException) {
|
||||||
|
badCert = true
|
||||||
log.error("Certificate subject not a valid CordaX500Name", ex)
|
log.error("Certificate subject not a valid CordaX500Name", ex)
|
||||||
ctx.close()
|
ctx.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
|
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
|
||||||
|
badCert = true
|
||||||
log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
|
log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
|
||||||
ctx.close()
|
ctx.close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.info("Handshake completed with subject: $remoteX500Name")
|
log.info("Handshake completed with subject: $remoteX500Name")
|
||||||
createAMQPEngine(ctx)
|
createAMQPEngine(ctx)
|
||||||
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true)))
|
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
|
||||||
} else {
|
} else {
|
||||||
|
badCert = true
|
||||||
log.error("Handshake failure ${evt.cause().message}")
|
log.error("Handshake failure ${evt.cause().message}")
|
||||||
if (log.isTraceEnabled) {
|
if (log.isTraceEnabled) {
|
||||||
log.trace("Handshake failure", evt.cause())
|
log.trace("Handshake failure", evt.cause())
|
||||||
|
@ -51,7 +51,7 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
|||||||
|
|
||||||
val log = contextLogger()
|
val log = contextLogger()
|
||||||
const val MIN_RETRY_INTERVAL = 1000L
|
const val MIN_RETRY_INTERVAL = 1000L
|
||||||
const val MAX_RETRY_INTERVAL = 60000L
|
const val MAX_RETRY_INTERVAL = 300000L
|
||||||
const val BACKOFF_MULTIPLIER = 2L
|
const val BACKOFF_MULTIPLIER = 2L
|
||||||
const val NUM_CLIENT_THREADS = 2
|
const val NUM_CLIENT_THREADS = 2
|
||||||
}
|
}
|
||||||
@ -66,9 +66,22 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
|||||||
private var targetIndex = 0
|
private var targetIndex = 0
|
||||||
private var currentTarget: NetworkHostAndPort = targets.first()
|
private var currentTarget: NetworkHostAndPort = targets.first()
|
||||||
private var retryInterval = MIN_RETRY_INTERVAL
|
private var retryInterval = MIN_RETRY_INTERVAL
|
||||||
|
private val badCertTargets = mutableSetOf<NetworkHostAndPort>()
|
||||||
|
|
||||||
private fun nextTarget() {
|
private fun nextTarget() {
|
||||||
targetIndex = (targetIndex + 1).rem(targets.size)
|
val origIndex = targetIndex
|
||||||
|
targetIndex = -1
|
||||||
|
for (offset in 1..targets.size) {
|
||||||
|
val newTargetIndex = (origIndex + offset).rem(targets.size)
|
||||||
|
if (targets[newTargetIndex] !in badCertTargets) {
|
||||||
|
targetIndex = newTargetIndex
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (targetIndex == -1) {
|
||||||
|
log.error("No targets have presented acceptable certificates for $allowedRemoteLegalNames. Halting retries")
|
||||||
|
return
|
||||||
|
}
|
||||||
log.info("Retry connect to ${targets[targetIndex]}")
|
log.info("Retry connect to ${targets[targetIndex]}")
|
||||||
retryInterval = min(MAX_RETRY_INTERVAL, retryInterval * BACKOFF_MULTIPLIER)
|
retryInterval = min(MAX_RETRY_INTERVAL, retryInterval * BACKOFF_MULTIPLIER)
|
||||||
}
|
}
|
||||||
@ -116,7 +129,8 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
|||||||
|
|
||||||
override fun initChannel(ch: SocketChannel) {
|
override fun initChannel(ch: SocketChannel) {
|
||||||
val pipeline = ch.pipeline()
|
val pipeline = ch.pipeline()
|
||||||
val handler = createClientSslHelper(parent.currentTarget, keyManagerFactory, trustManagerFactory)
|
val target = parent.currentTarget
|
||||||
|
val handler = createClientSslHelper(target, keyManagerFactory, trustManagerFactory)
|
||||||
pipeline.addLast("sslHandler", handler)
|
pipeline.addLast("sslHandler", handler)
|
||||||
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
|
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
|
||||||
pipeline.addLast(AMQPChannelHandler(false,
|
pipeline.addLast(AMQPChannelHandler(false,
|
||||||
@ -128,7 +142,13 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
|||||||
parent.retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly
|
parent.retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly
|
||||||
parent._onConnection.onNext(it.second)
|
parent._onConnection.onNext(it.second)
|
||||||
},
|
},
|
||||||
{ parent._onConnection.onNext(it.second) },
|
{
|
||||||
|
parent._onConnection.onNext(it.second)
|
||||||
|
if (it.second.badCert) {
|
||||||
|
log.error("Blocking future connection attempts to $target due to bad certificate on endpoint")
|
||||||
|
parent.badCertTargets += target
|
||||||
|
}
|
||||||
|
},
|
||||||
{ rcv -> parent._onReceive.onNext(rcv) }))
|
{ rcv -> parent._onReceive.onNext(rcv) }))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -142,6 +162,9 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun restart() {
|
private fun restart() {
|
||||||
|
if (targetIndex == -1) {
|
||||||
|
return
|
||||||
|
}
|
||||||
val bootstrap = Bootstrap()
|
val bootstrap = Bootstrap()
|
||||||
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
|
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
|
||||||
bootstrap.group(workerGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this))
|
bootstrap.group(workerGroup).channel(NioSocketChannel::class.java).handler(ClientChannelInitializer(this))
|
||||||
|
@ -3,4 +3,4 @@ package net.corda.nodeapi.internal.protonwrapper.netty
|
|||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.security.cert.X509Certificate
|
import java.security.cert.X509Certificate
|
||||||
|
|
||||||
data class ConnectionChange(val remoteAddress: InetSocketAddress, val remoteCert: X509Certificate?, val connected: Boolean)
|
data class ConnectionChange(val remoteAddress: InetSocketAddress, val remoteCert: X509Certificate?, val connected: Boolean, val badCert: Boolean)
|
Reference in New Issue
Block a user