CORDA-2071 - Log file filling with Artemis messages and rolling over quickly (#4034)

* ENT-1850: Improve reporting of connection problems (#3124)

* Add nicer logging for SSL handshake problems

* Just in case let people see the horrid netty exception traces at trace level

(cherry picked from commit 3c005789c0)

* fixup after cherrypick

* Prevent bridge reconnection attempts on targets that present invalid/misconfigured/different certificates to protect nodes from dead identities. (#3225)

(cherry picked from commit 7ff008d4e3)

* fixup after cherrypick

* add extra error handling (copied from master)
This commit is contained in:
Patrick Kuo 2018-10-05 12:58:36 +01:00 committed by Katelyn Baker
parent 04b8606374
commit 262427fc12
5 changed files with 275 additions and 25 deletions

View File

@ -21,7 +21,9 @@ import org.apache.qpid.proton.engine.impl.ProtocolTracer
import org.apache.qpid.proton.framing.TransportFrame
import org.slf4j.LoggerFactory
import java.net.InetSocketAddress
import java.nio.channels.ClosedChannelException
import java.security.cert.X509Certificate
import javax.net.ssl.SSLException
/**
* An instance of AMQPChannelHandler sits inside the netty pipeline and controls the socket level lifecycle.
@ -41,6 +43,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
private lateinit var localCert: X509Certificate
private lateinit var remoteCert: X509Certificate
private var eventProcessor: EventProcessor? = null
private var badCert: Boolean = false
override fun channelActive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
@ -72,7 +75,7 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
override fun channelInactive(ctx: ChannelHandlerContext) {
val ch = ctx.channel()
log.info("Closed client connection ${ch.id()} from $remoteAddress to ${ch.localAddress()}")
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, null, false)))
onClose(Pair(ch as SocketChannel, ConnectionChange(remoteAddress, null, false, badCert)))
eventProcessor?.close()
ctx.fireChannelInactive()
}
@ -83,24 +86,51 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
val sslHandler = ctx.pipeline().get(SslHandler::class.java)
localCert = sslHandler.engine().session.localCertificates[0].x509
remoteCert = sslHandler.engine().session.peerCertificates[0].x509
try {
val remoteX500Name = CordaX500Name.build(remoteCert.subjectX500Principal)
require(allowedRemoteLegalNames == null || remoteX500Name in allowedRemoteLegalNames)
log.info("handshake completed subject: $remoteX500Name")
val remoteX500Name = try {
CordaX500Name.build(remoteCert.subjectX500Principal)
} catch (ex: IllegalArgumentException) {
log.error("Invalid certificate subject", ex)
badCert = true
log.error("Certificate subject not a valid CordaX500Name", ex)
ctx.close()
return
}
if (allowedRemoteLegalNames != null && remoteX500Name !in allowedRemoteLegalNames) {
badCert = true
log.error("Provided certificate subject $remoteX500Name not in expected set $allowedRemoteLegalNames")
ctx.close()
return
}
log.info("Handshake completed with subject: $remoteX500Name")
createAMQPEngine(ctx)
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true)))
onOpen(Pair(ctx.channel() as SocketChannel, ConnectionChange(remoteAddress, remoteCert, true, false)))
} else {
log.error("Handshake failure $evt")
val cause = evt.cause()
// This happens when the peer node is closed during SSL establishment.
if (cause is ClosedChannelException) {
log.warn("SSL Handshake closed early.")
} else if (cause is SSLException && cause.message == "handshake timed out") { // Sadly the exception thrown by Netty wrapper requires that we check the message.
log.warn("SSL Handshake timed out")
} else {
badCert = true
}
log.error("Handshake failure ${evt.cause().message}")
if (log.isTraceEnabled) {
log.trace("Handshake failure", evt.cause())
}
ctx.close()
}
}
}
@Suppress("OverridingDeprecatedMember")
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
log.warn("Closing channel due to nonrecoverable exception ${cause.message}")
if (log.isTraceEnabled) {
log.trace("Pipeline uncaught exception", cause)
}
ctx.close()
}
override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
try {
log.debug { "Received $msg" }

View File

@ -17,6 +17,7 @@ import net.corda.nodeapi.internal.protonwrapper.messages.SendableMessage
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
import rx.Observable
import rx.subjects.PublishSubject
import java.lang.Long.min
import java.security.KeyStore
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
@ -46,7 +47,9 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
}
val log = contextLogger()
const val RETRY_INTERVAL = 1000L
const val MIN_RETRY_INTERVAL = 1000L
const val MAX_RETRY_INTERVAL = 300000L
const val BACKOFF_MULTIPLIER = 2L
const val NUM_CLIENT_THREADS = 2
}
@ -59,6 +62,26 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
// Offset into the list of targets, so that we can implement round-robin reconnect logic.
private var targetIndex = 0
private var currentTarget: NetworkHostAndPort = targets.first()
private var retryInterval = MIN_RETRY_INTERVAL
private val badCertTargets = mutableSetOf<NetworkHostAndPort>()
private fun nextTarget() {
val origIndex = targetIndex
targetIndex = -1
for (offset in 1..targets.size) {
val newTargetIndex = (origIndex + offset).rem(targets.size)
if (targets[newTargetIndex] !in badCertTargets) {
targetIndex = newTargetIndex
break
}
}
if (targetIndex == -1) {
log.error("No targets have presented acceptable certificates for $allowedRemoteLegalNames. Halting retries")
return
}
log.info("Retry connect to ${targets[targetIndex]}")
retryInterval = min(MAX_RETRY_INTERVAL, retryInterval * BACKOFF_MULTIPLIER)
}
private val connectListener = object : ChannelFutureListener {
override fun operationComplete(future: ChannelFuture) {
@ -67,10 +90,9 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
if (!stopping) {
workerGroup?.schedule({
log.info("Retry connect to $currentTarget")
targetIndex = (targetIndex + 1).rem(targets.size)
nextTarget()
restart()
}, RETRY_INTERVAL, TimeUnit.MILLISECONDS)
}, retryInterval, TimeUnit.MILLISECONDS)
}
} else {
log.info("Connected to $currentTarget")
@ -88,10 +110,9 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
clientChannel = null
if (!stopping) {
workerGroup?.schedule({
log.info("Retry connect")
targetIndex = (targetIndex + 1).rem(targets.size)
nextTarget()
restart()
}, RETRY_INTERVAL, TimeUnit.MILLISECONDS)
}, retryInterval, TimeUnit.MILLISECONDS)
}
}
}
@ -107,7 +128,8 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
override fun initChannel(ch: SocketChannel) {
val pipeline = ch.pipeline()
val handler = createClientSslHelper(parent.currentTarget, parent.allowedRemoteLegalNames, keyManagerFactory, trustManagerFactory)
val target = parent.currentTarget
val handler = createClientSslHelper(target, parent.allowedRemoteLegalNames, keyManagerFactory, trustManagerFactory)
pipeline.addLast("sslHandler", handler)
if (parent.trace) pipeline.addLast("logger", LoggingHandler(LogLevel.INFO))
pipeline.addLast(AMQPChannelHandler(false,
@ -115,8 +137,17 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
parent.userName,
parent.password,
parent.trace,
{ parent._onConnection.onNext(it.second) },
{ parent._onConnection.onNext(it.second) },
{
parent.retryInterval = MIN_RETRY_INTERVAL // reset to fast reconnect if we connect properly
parent._onConnection.onNext(it.second)
},
{
parent._onConnection.onNext(it.second)
if (it.second.badCert) {
log.error("Blocking future connection attempts to $target due to bad certificate on endpoint")
parent.badCertTargets += target
}
},
{ rcv -> parent._onReceive.onNext(rcv) }))
}
}
@ -130,6 +161,9 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
}
private fun restart() {
if (targetIndex == -1) {
return
}
val bootstrap = Bootstrap()
// TODO Needs more configuration control when we profile. e.g. to use EPOLL on Linux
bootstrap.group(workerGroup).

View File

@ -3,4 +3,4 @@ package net.corda.nodeapi.internal.protonwrapper.netty
import java.net.InetSocketAddress
import java.security.cert.X509Certificate
data class ConnectionChange(val remoteAddress: InetSocketAddress, val remoteCert: X509Certificate?, val connected: Boolean)
data class ConnectionChange(val remoteAddress: InetSocketAddress, val remoteCert: X509Certificate?, val connected: Boolean, val badCert: Boolean)

View File

@ -4,22 +4,114 @@ import io.netty.handler.ssl.SslHandler
import net.corda.core.crypto.SecureHash
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.toHex
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.internal.crypto.toBc
import org.bouncycastle.asn1.x509.AuthorityKeyIdentifier
import org.bouncycastle.asn1.x509.Extension
import org.bouncycastle.asn1.x509.SubjectKeyIdentifier
import java.net.Socket
import java.security.KeyStore
import java.security.SecureRandom
import javax.net.ssl.KeyManagerFactory
import javax.net.ssl.SNIHostName
import javax.net.ssl.SSLContext
import javax.net.ssl.TrustManagerFactory
import java.security.cert.*
import java.util.*
import javax.net.ssl.*
private const val HOSTNAME_FORMAT = "%s.corda.net"
internal class LoggingTrustManagerWrapper(val wrapped: X509ExtendedTrustManager) : X509ExtendedTrustManager() {
companion object {
val log = contextLogger()
}
private fun certPathToString(certPath: Array<out X509Certificate>?): String {
if (certPath == null) {
return "<empty certpath>"
}
val certs = certPath.map {
val bcCert = it.toBc()
val subject = bcCert.subject.toString()
val issuer = bcCert.issuer.toString()
val keyIdentifier = try {
SubjectKeyIdentifier.getInstance(bcCert.getExtension(Extension.subjectKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (ex: Exception) {
"null"
}
val authorityKeyIdentifier = try {
AuthorityKeyIdentifier.getInstance(bcCert.getExtension(Extension.authorityKeyIdentifier).parsedValue).keyIdentifier.toHex()
} catch (ex: Exception) {
"null"
}
" $subject[$keyIdentifier] issued by $issuer[$authorityKeyIdentifier]"
}
return certs.joinToString("\r\n")
}
private fun certPathToStringFull(chain: Array<out X509Certificate>?): String {
if (chain == null) {
return "<empty certpath>"
}
return chain.map { it.toString() }.joinToString(", ")
}
private fun logErrors(chain: Array<out X509Certificate>?, block: () -> Unit) {
try {
block()
} catch (ex: CertificateException) {
log.error("Bad certificate path ${ex.message}:\r\n${certPathToStringFull(chain)}")
throw ex
}
}
@Throws(CertificateException::class)
override fun checkClientTrusted(chain: Array<out X509Certificate>?, authType: String?, socket: Socket?) {
log.info("Check Client Certpath:\r\n${certPathToString(chain)}")
logErrors(chain) { wrapped.checkClientTrusted(chain, authType, socket) }
}
@Throws(CertificateException::class)
override fun checkClientTrusted(chain: Array<out X509Certificate>?, authType: String?, engine: SSLEngine?) {
log.info("Check Client Certpath:\r\n${certPathToString(chain)}")
logErrors(chain) { wrapped.checkClientTrusted(chain, authType, engine) }
}
@Throws(CertificateException::class)
override fun checkClientTrusted(chain: Array<out X509Certificate>?, authType: String?) {
log.info("Check Client Certpath:\r\n${certPathToString(chain)}")
logErrors(chain) { wrapped.checkClientTrusted(chain, authType) }
}
@Throws(CertificateException::class)
override fun checkServerTrusted(chain: Array<out X509Certificate>?, authType: String?, socket: Socket?) {
log.info("Check Server Certpath:\r\n${certPathToString(chain)}")
logErrors(chain) { wrapped.checkServerTrusted(chain, authType, socket) }
}
@Throws(CertificateException::class)
override fun checkServerTrusted(chain: Array<out X509Certificate>?, authType: String?, engine: SSLEngine?) {
log.info("Check Server Certpath:\r\n${certPathToString(chain)}")
logErrors(chain) { wrapped.checkServerTrusted(chain, authType, engine) }
}
@Throws(CertificateException::class)
override fun checkServerTrusted(chain: Array<out X509Certificate>?, authType: String?) {
log.info("Check Server Certpath:\r\n${certPathToString(chain)}")
logErrors(chain) { wrapped.checkServerTrusted(chain, authType) }
}
override fun getAcceptedIssuers(): Array<X509Certificate> = wrapped.acceptedIssuers
}
internal fun createClientSslHelper(target: NetworkHostAndPort,
expectedRemoteLegalNames: Set<CordaX500Name>,
keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, SecureRandom())
val sslEngine = sslContext.createSSLEngine(target.host, target.port)
sslEngine.useClientMode = true
@ -38,7 +130,7 @@ internal fun createServerSslHelper(keyManagerFactory: KeyManagerFactory,
trustManagerFactory: TrustManagerFactory): SslHandler {
val sslContext = SSLContext.getInstance("TLS")
val keyManagers = keyManagerFactory.keyManagers
val trustManagers = trustManagerFactory.trustManagers
val trustManagers = trustManagerFactory.trustManagers.filterIsInstance(X509ExtendedTrustManager::class.java).map { LoggingTrustManagerWrapper(it) }.toTypedArray()
sslContext.init(keyManagers, trustManagers, SecureRandom())
val sslEngine = sslContext.createSSLEngine()
sslEngine.useClientMode = false

View File

@ -12,20 +12,30 @@ import net.corda.node.services.config.CertChainPolicyConfig
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.ArtemisTcpTransport.Companion.CIPHER_SUITES
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.config.SSLConfiguration
import net.corda.nodeapi.internal.createDevKeyStores
import net.corda.nodeapi.internal.crypto.*
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.testing.core.*
import net.corda.testing.internal.createDevIntermediateCaCertPath
import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.RoutingType
import org.junit.Assert.assertArrayEquals
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl.*
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class ProtonWrapperTests {
@Rule
@ -86,6 +96,90 @@ class ProtonWrapperTests {
}
}
private fun SSLConfiguration.createTrustStore(rootCert: X509Certificate) {
val trustStore = loadOrCreateKeyStore(trustStoreFile, trustStorePassword)
trustStore.addOrReplaceCertificate(X509Utilities.CORDA_ROOT_CA, rootCert)
trustStore.save(trustStoreFile, trustStorePassword)
}
@Test
fun `Test AMQP Client with invalid root certificate`() {
val sslConfig = object : SSLConfiguration {
override val certificatesDirectory = temporaryFolder.root.toPath()
override val keyStorePassword = "serverstorepass"
override val trustStorePassword = "trustpass"
}
val (rootCa, intermediateCa) = createDevIntermediateCaCertPath()
// Generate server cert and private key and populate another keystore suitable for SSL
sslConfig.createDevKeyStores(ALICE_NAME, rootCa.certificate, intermediateCa)
sslConfig.createTrustStore(rootCa.certificate)
val keyStore = loadKeyStore(sslConfig.sslKeystore, sslConfig.keyStorePassword)
val trustStore = loadKeyStore(sslConfig.trustStoreFile, sslConfig.trustStorePassword)
val context = SSLContext.getInstance("TLS")
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm())
keyManagerFactory.init(keyStore, sslConfig.keyStorePassword.toCharArray())
val keyManagers = keyManagerFactory.keyManagers
val trustMgrFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm())
trustMgrFactory.init(trustStore)
val trustManagers = trustMgrFactory.trustManagers
context.init(keyManagers, trustManagers, SecureRandom())
val serverSocketFactory = context.serverSocketFactory
val serverSocket = serverSocketFactory.createServerSocket(serverPort) as SSLServerSocket
val serverParams = SSLParameters(CIPHER_SUITES.toTypedArray(),
arrayOf("TLSv1.2"))
serverParams.wantClientAuth = true
serverParams.needClientAuth = true
serverParams.endpointIdentificationAlgorithm = null // Reconfirm default no server name indication, use our own validator.
serverSocket.sslParameters = serverParams
serverSocket.useClientMode = false
val lock = Object()
var done = false
var handshakeError = false
val serverThread = thread {
try {
val sslServerSocket = serverSocket.accept() as SSLSocket
sslServerSocket.addHandshakeCompletedListener {
done = true
}
sslServerSocket.startHandshake()
synchronized(lock) {
while (!done) {
lock.wait(1000)
}
}
sslServerSocket.close()
} catch (ex: SSLHandshakeException) {
handshakeError = true
}
}
val amqpClient = createClient()
amqpClient.use {
val clientConnected = amqpClient.onConnection.toFuture()
amqpClient.start()
val clientConnect = clientConnected.get()
assertEquals(false, clientConnect.connected)
synchronized(lock) {
done = true
lock.notifyAll()
}
}
serverThread.join(1000)
assertTrue(handshakeError)
serverSocket.close()
assertTrue(done)
}
@Test
fun `Client Failover for multiple IP`() {
val amqpServer = createServer(serverPort)