[CORDA-2890] - Close security manager after broker is shut down (#5425)

This commit is contained in:
Dimos Raptis 2019-09-04 14:06:46 +01:00 committed by Shams Asari
parent a842740c9e
commit 1ac7715e28
4 changed files with 20 additions and 25 deletions

View File

@ -27,6 +27,7 @@ import net.corda.testing.core.*
import net.corda.testing.node.User import net.corda.testing.node.User
import net.corda.testing.node.internal.NodeBasedTest import net.corda.testing.node.internal.NodeBasedTest
import net.corda.testing.node.internal.ProcessUtilities import net.corda.testing.node.internal.ProcessUtilities
import net.corda.testing.node.internal.poll
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.assertj.core.api.Assertions.assertThatExceptionOfType
@ -37,10 +38,7 @@ import rx.subjects.PublishSubject
import java.net.URLClassLoader import java.net.URLClassLoader
import java.nio.file.Paths import java.nio.file.Paths
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch import java.util.concurrent.*
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFalse import kotlin.test.assertFalse
import kotlin.test.assertTrue import kotlin.test.assertTrue
@ -101,7 +99,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
var successful = false var successful = false
val maxCount = 120 val maxCount = 120
var count = 0 var count = 0
CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler -> CloseableExecutor(Executors.newScheduledThreadPool(2)).use { scheduler ->
val task = scheduler.scheduleAtFixedRate({ val task = scheduler.scheduleAtFixedRate({
try { try {
@ -116,8 +114,6 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
} catch (e: RPCException) { } catch (e: RPCException) {
log.info("... node is not running.") log.info("... node is not running.")
nodeIsShut.onCompleted() nodeIsShut.onCompleted()
} catch (e: ActiveMQSecurityException) {
// nothing here - this happens if trying to connect before the node is started
} catch (e: Exception) { } catch (e: Exception) {
nodeIsShut.onError(e) nodeIsShut.onError(e)
} }
@ -126,11 +122,17 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
nodeIsShut.doOnError { error -> nodeIsShut.doOnError { error ->
log.error("FAILED TO SHUT DOWN NODE DUE TO", error) log.error("FAILED TO SHUT DOWN NODE DUE TO", error)
successful = false successful = false
task.cancel(true) task.cancel(false)
latch.countDown() latch.countDown()
}.doOnCompleted { }.doOnCompleted {
successful = (node.node.started == null) val nodeTerminated = try {
task.cancel(true) poll(scheduler, pollName = "node's started state", check = { if (node.node.started == null) true else null })
.get(10, TimeUnit.SECONDS)
} catch (e: Exception) {
false
}
successful = nodeTerminated
task.cancel(false)
latch.countDown() latch.countDown()
}.subscribe() }.subscribe()

View File

@ -71,9 +71,6 @@ class ReconnectingCordaRPCOps private constructor(
observersPool ?: Executors.newCachedThreadPool(), observersPool ?: Executors.newCachedThreadPool(),
observersPool != null) observersPool != null)
private companion object { private companion object {
// See https://r3-cev.atlassian.net/browse/CORDA-2890.
// TODO Once the bug is fixed, this retry logic should be removed.
const val MAX_RETRY_ATTEMPTS_ON_AUTH_ERROR = 3
private val log = contextLogger() private val log = contextLogger()
private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): InternalCordaRPCOps { private fun proxy(reconnectingRPCConnection: ReconnectingRPCConnection, observersPool: ExecutorService): InternalCordaRPCOps {
return Proxy.newProxyInstance( return Proxy.newProxyInstance(
@ -159,8 +156,7 @@ class ReconnectingCordaRPCOps private constructor(
return currentRPCConnection!! return currentRPCConnection!!
} }
private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, currentAuthenticationRetries: Int = 0, roundRobinIndex: Int = 0): CordaRPCConnection { private tailrec fun establishConnectionWithRetry(retryInterval: Duration = 1.seconds, roundRobinIndex: Int = 0): CordaRPCConnection {
var _currentAuthenticationRetries = currentAuthenticationRetries
val attemptedAddress = nodeHostAndPorts[roundRobinIndex] val attemptedAddress = nodeHostAndPorts[roundRobinIndex]
log.info("Connecting to: $attemptedAddress") log.info("Connecting to: $attemptedAddress")
try { try {
@ -176,13 +172,9 @@ class ReconnectingCordaRPCOps private constructor(
} catch (ex: Exception) { } catch (ex: Exception) {
when (ex) { when (ex) {
is ActiveMQSecurityException -> { is ActiveMQSecurityException -> {
// Happens when incorrect credentials provided.
// It can happen at startup as well when the credentials are correct.
if (_currentAuthenticationRetries++ > MAX_RETRY_ATTEMPTS_ON_AUTH_ERROR) {
log.error("Failed to login to node.", ex) log.error("Failed to login to node.", ex)
throw ex throw ex
} }
}
is RPCException -> { is RPCException -> {
// Deliberately not logging full stack trace as it will be full of internal stacktraces. // Deliberately not logging full stack trace as it will be full of internal stacktraces.
log.debug { "Exception upon establishing connection: ${ex.message}" } log.debug { "Exception upon establishing connection: ${ex.message}" }
@ -204,7 +196,7 @@ class ReconnectingCordaRPCOps private constructor(
Thread.sleep(retryInterval.toMillis()) Thread.sleep(retryInterval.toMillis())
// TODO - make the exponential retry factor configurable. // TODO - make the exponential retry factor configurable.
val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size val nextRoundRobinIndex = (roundRobinIndex + 1) % nodeHostAndPorts.size
return establishConnectionWithRetry((retryInterval * 10) / 9, _currentAuthenticationRetries, nextRoundRobinIndex) return establishConnectionWithRetry((retryInterval * 10) / 9, nextRoundRobinIndex)
} }
override val proxy: CordaRPCOps override val proxy: CordaRPCOps
get() = current.proxy get() = current.proxy

View File

@ -15,6 +15,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import java.io.IOException import java.io.IOException
import java.lang.RuntimeException
import java.nio.file.Path import java.nio.file.Path
import java.security.KeyStoreException import java.security.KeyStoreException
import javax.security.auth.login.AppConfigurationEntry import javax.security.auth.login.AppConfigurationEntry
@ -60,6 +61,7 @@ class ArtemisRpcBroker internal constructor(
override fun stop() { override fun stop() {
logger.debug("Artemis RPC broker is stopping.") logger.debug("Artemis RPC broker is stopping.")
server.stop(true) server.stop(true)
securityManager.close()
logger.debug("Artemis RPC broker is stopped.") logger.debug("Artemis RPC broker is stopped.")
} }

View File

@ -243,7 +243,6 @@ class RPCServer(
reaperScheduledFuture?.cancel(false) reaperScheduledFuture?.cancel(false)
rpcExecutor?.shutdownNow() rpcExecutor?.shutdownNow()
reaperExecutor?.shutdownNow() reaperExecutor?.shutdownNow()
securityManager.close()
sessionFactory?.close() sessionFactory?.close()
observableMap.invalidateAll() observableMap.invalidateAll()
reapSubscriptions() reapSubscriptions()