diff --git a/build.gradle b/build.gradle index 85bffde2e5..3964ac44ad 100644 --- a/build.gradle +++ b/build.gradle @@ -61,7 +61,7 @@ buildscript { ext.capsule_version = '1.0.3' ext.asm_version = '7.1' - ext.artemis_version = '2.6.2' + ext.artemis_version = '2.19.1' // TODO Upgrade Jackson only when corda is using kotlin 1.3.10 ext.jackson_version = '2.9.7' ext.jetty_version = '9.4.19.v20190610' @@ -405,6 +405,7 @@ allprojects { includeGroup 'org.crashub' includeGroup 'com.github.bft-smart' includeGroup 'com.github.detro' + includeGroup 'org.apache.activemq' } } maven { diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index e476e0e581..94effe653a 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -30,6 +30,7 @@ import net.corda.testing.node.internal.rpcTestUser import net.corda.testing.node.internal.startRandomRpcClient import net.corda.testing.node.internal.startRpcClient import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.SimpleString import org.junit.After import org.junit.Assert.assertEquals @@ -551,7 +552,11 @@ class RPCStabilityTests { // Construct an RPC session manually so that we can hang in the message handler val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val session = startArtemisSession(server.broker.hostAndPort!!) - session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue) + session.createQueue(QueueConfiguration(myQueue) + .setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType()) + .setAddress(myQueue) + .setTemporary(true) + .setDurable(false)) val consumer = session.createConsumer(myQueue, null, -1, -1, false) consumer.setMessageHandler { Thread.sleep(5000) // Needs to be slower than one per second to get kicked. @@ -588,7 +593,11 @@ class RPCStabilityTests { // Construct an RPC client session manually val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val session = startArtemisSession(server.broker.hostAndPort!!) - session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue) + session.createQueue(QueueConfiguration(myQueue) + .setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType()) + .setAddress(myQueue) + .setTemporary(true) + .setDurable(false)) val consumer = session.createConsumer(myQueue, null, -1, -1, false) val replies = ArrayList() consumer.setMessageHandler { diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 2e4f2c529b..8458b4fb3f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -95,6 +95,8 @@ class RPCClient( // By default RoundRobinConnectionLoadBalancingPolicy is used that picks first endpoint from the pool // at random. This may be undesired and non-deterministic. For more information, see [RoundRobinConnectionPolicy] connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName + // Without this any type of "send" time failures will not be delivered back to the client + isBlockOnNonDurableSend = true } val sessionId = Trace.SessionId.newInstance() val distributionMux = DistributionMux(listeners, username) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 82ac3cd579..c13266264b 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -39,6 +39,7 @@ import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer import net.corda.nodeapi.internal.rpc.client.RpcObservableMap import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE @@ -60,6 +61,7 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.Future import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledFuture import java.util.concurrent.TimeUnit @@ -380,11 +382,18 @@ internal class RPCClientProxyHandler( targetLegalIdentity?.let { artemisMessage.putStringProperty(RPCApi.RPC_TARGET_LEGAL_IDENTITY, it.toString()) } - sendExecutor!!.submit { + val future: Future<*> = sendExecutor!!.submit { artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, deduplicationSequenceNumber.getAndIncrement()) log.debug { "-> RPC -> $message" } - rpcProducer!!.send(artemisMessage) + rpcProducer!!.let { + if (!it.isClosed) { + it.send(artemisMessage) + } else { + log.info("Producer is already closed. Not sending: $message") + } + } } + future.getOrThrow() } // The handler for Artemis messages. @@ -570,7 +579,12 @@ internal class RPCClientProxyHandler( } if (observableIds != null) { log.debug { "Reaping ${observableIds.size} observables" } - sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds)) + @Suppress("TooGenericExceptionCaught") + try { + sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds)) + } catch(ex: Exception) { + log.warn("Unable to close observables", ex) + } } } @@ -632,7 +646,8 @@ internal class RPCClientProxyHandler( consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, 16384) clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}") log.debug { "Client address: $clientAddress" } - consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) + consumerSession!!.createQueue(QueueConfiguration(clientAddress).setAddress(clientAddress).setRoutingType(RoutingType.ANYCAST) + .setTemporary(true).setDurable(false)) rpcConsumer = consumerSession!!.createConsumer(clientAddress) rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) } diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt index dea9797cd2..49d3680e54 100644 --- a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt +++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt @@ -40,7 +40,7 @@ import net.corda.nodeapi.internal.config.User import net.corda.sleeping.SleepingFlow import net.corda.smoketesting.NodeConfig import net.corda.smoketesting.NodeProcess -import org.apache.commons.io.output.NullOutputStream +import org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM import org.hamcrest.text.MatchesPattern import org.junit.After import org.junit.Before @@ -117,7 +117,7 @@ class StandaloneCordaRPClientTest { assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash") val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it -> - it.copyTo(NullOutputStream()) + it.copyTo(NULL_OUTPUT_STREAM) SecureHash.SHA256(it.hash().asBytes()) } assertEquals(attachment.sha256, hash) @@ -132,7 +132,7 @@ class StandaloneCordaRPClientTest { assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash") val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it -> - it.copyTo(NullOutputStream()) + it.copyTo(NULL_OUTPUT_STREAM) SecureHash.SHA256(it.hash().asBytes()) } assertEquals(attachment.sha256, hash) diff --git a/config/dev/log4j2.xml b/config/dev/log4j2.xml index 02aa604cf4..171832c040 100644 --- a/config/dev/log4j2.xml +++ b/config/dev/log4j2.xml @@ -206,6 +206,10 @@ + + + + diff --git a/detekt-baseline.xml b/detekt-baseline.xml index d6fdc13c8c..046c21e008 100644 --- a/detekt-baseline.xml +++ b/detekt-baseline.xml @@ -1364,7 +1364,6 @@ ThrowsCount:AMQPTypeIdentifierParser.kt$AMQPTypeIdentifierParser$// Make sure our inputs aren't designed to blow things up. private fun validate(typeString: String) ThrowsCount:AbstractNode.kt$AbstractNode$private fun installCordaServices() ThrowsCount:ArtemisMessagingServer.kt$ArtemisMessagingServer$// TODO: Maybe wrap [IOException] on a key store load error so that it's clearly splitting key store loading from // Artemis IO errors @Throws(IOException::class, AddressBindingException::class, KeyStoreException::class) private fun configureAndStartServer() - ThrowsCount:BrokerJaasLoginModule.kt$BaseBrokerJaasLoginModule$@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate protected fun getUsernamePasswordAndCerts(): Triple<String, String, Array<javax.security.cert.X509Certificate>?> ThrowsCount:CheckpointVerifier.kt$CheckpointVerifier$ fun verifyCheckpointsCompatible( checkpointStorage: CheckpointStorage, currentCordapps: List<Cordapp>, platformVersion: Int, serviceHub: ServiceHub, tokenizableServices: List<Any> ) ThrowsCount:CheckpointVerifier.kt$CheckpointVerifier$// Throws exception when the flow is incompatible private fun checkFlowCompatible(subFlow: SubFlow, currentCordappsByHash: Map<SecureHash.SHA256, Cordapp>, platformVersion: Int) ThrowsCount:ClassCarpenter.kt$ClassCarpenterImpl$ private fun validateSchema(schema: Schema) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt index 1206cbe8ec..4a381c21ab 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisMessagingClient.kt @@ -66,7 +66,6 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration, retryInterval = messagingServerConnectionConfig.retryInterval().toMillis() retryIntervalMultiplier = messagingServerConnectionConfig.retryIntervalMultiplier() maxRetryInterval = messagingServerConnectionConfig.maxRetryInterval(isHA).toMillis() - isFailoverOnInitialConnection = messagingServerConnectionConfig.failoverOnInitialAttempt(isHA) initialConnectAttempts = messagingServerConnectionConfig.initialConnectAttempts(isHA) } addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize)) diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt index cc3bf585eb..cdf5ea8cc9 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/ArtemisTcpTransport.kt @@ -73,27 +73,27 @@ class ArtemisTcpTransport { private fun CertificateStore.toKeyStoreTransportOptions(path: Path) = mapOf( TransportConstants.SSL_ENABLED_PROP_NAME to true, - TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS", + TransportConstants.KEYSTORE_TYPE_PROP_NAME to "JKS", TransportConstants.KEYSTORE_PATH_PROP_NAME to path, TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to password, TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true) private fun CertificateStore.toTrustStoreTransportOptions(path: Path) = mapOf( TransportConstants.SSL_ENABLED_PROP_NAME to true, - TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS", + TransportConstants.TRUSTSTORE_TYPE_PROP_NAME to "JKS", TransportConstants.TRUSTSTORE_PATH_PROP_NAME to path, TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to password, TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true) private fun ClientRpcSslOptions.toTransportOptions() = mapOf( TransportConstants.SSL_ENABLED_PROP_NAME to true, - TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to trustStoreProvider, + TransportConstants.TRUSTSTORE_TYPE_PROP_NAME to trustStoreProvider, TransportConstants.TRUSTSTORE_PATH_PROP_NAME to trustStorePath, TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to trustStorePassword) private fun BrokerRpcSslOptions.toTransportOptions() = mapOf( TransportConstants.SSL_ENABLED_PROP_NAME to true, - TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS", + TransportConstants.KEYSTORE_TYPE_PROP_NAME to "JKS", TransportConstants.KEYSTORE_PATH_PROP_NAME to keyStorePath, TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword, TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false) @@ -106,9 +106,9 @@ class ArtemisTcpTransport { return p2pAcceptorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false) } - fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreProvider: String? = null): TransportConfiguration { + fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreType: String? = null): TransportConfiguration { - return p2pConnectorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false, keyStoreProvider = keyStoreProvider) + return p2pConnectorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false, keyStoreType = keyStoreType) } fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false): TransportConfiguration { @@ -124,20 +124,22 @@ class ArtemisTcpTransport { } @Suppress("LongParameterList") - fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreProvider: String? = null): TransportConfiguration { + fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreType: String? = null): TransportConfiguration { val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap() if (enableSSL) { options.putAll(defaultSSLOptions) (keyStore to trustStore).addToTransportOptions(options) options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER - keyStoreProvider?.let { options.put(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME, keyStoreProvider) } + keyStoreType?.let { options.put(TransportConstants.KEYSTORE_TYPE_PROP_NAME, keyStoreType) } + // This is required to stop Client checking URL address vs. Server provided certificate + options[TransportConstants.VERIFY_HOST_PROP_NAME] = false } return TransportConfiguration(connectorFactoryClassName, options) } - fun p2pConnectorTcpTransportFromList(hostAndPortList: List, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreProvider: String? = null): List = hostAndPortList.map { - p2pConnectorTcpTransport(it, config, enableSSL, keyStoreProvider) + fun p2pConnectorTcpTransportFromList(hostAndPortList: List, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreType: String? = null): List = hostAndPortList.map { + p2pConnectorTcpTransport(it, config, enableSSL, keyStoreType) } fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { @@ -159,6 +161,8 @@ class ArtemisTcpTransport { config.trustStorePath.requireOnDefaultFileSystem() options.putAll(config.toTransportOptions()) options.putAll(defaultSSLOptions) + // This is required to stop Client checking URL address vs. Server provided certificate + options[TransportConstants.VERIFY_HOST_PROP_NAME] = false } return TransportConfiguration(connectorFactoryClassName, options) } @@ -167,17 +171,23 @@ class ArtemisTcpTransport { rpcConnectorTcpTransport(it, config, enableSSL) } - fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { - return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + config.toTransportOptions() + asMap(keyStoreProvider)) + fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreType: String? = null): TransportConfiguration { + val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap() + options.putAll(defaultSSLOptions) + options.putAll(config.toTransportOptions()) + options.putAll(asMap(keyStoreType)) + // This is required to stop Client checking URL address vs. Server provided certificate + options[TransportConstants.VERIFY_HOST_PROP_NAME] = false + return TransportConfiguration(connectorFactoryClassName, options) } - fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { + fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreType: String? = null): TransportConfiguration { return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + - config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreProvider)) + config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreType)) } - private fun asMap(keyStoreProvider: String?): Map { - return keyStoreProvider?.let {mutableMapOf(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to it)} ?: emptyMap() + private fun asMap(keyStoreType: String?): Map { + return keyStoreType?.let { mutableMapOf(TransportConstants.KEYSTORE_TYPE_PROP_NAME to it) } ?: emptyMap() } } } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/MessageSizeChecksInterceptor.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/MessageSizeChecksInterceptor.kt index 5b813bff0a..9f789ac655 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/MessageSizeChecksInterceptor.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/MessageSizeChecksInterceptor.kt @@ -8,6 +8,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePac import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection +import org.apache.qpid.proton.amqp.messaging.Data class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor(maxMessageSize), Interceptor { override fun getMessageSize(packet: Packet?): Int? { @@ -22,7 +23,7 @@ class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChec } class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor(maxMessageSize), AmqpInterceptor { - override fun getMessageSize(packet: AMQPMessage?): Int? = packet?.encodeSize + override fun getMessageSize(packet: AMQPMessage?): Int? = (packet?.protonMessage?.body as? Data)?.value?.length } /** diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 2a37649667..422ecc1c7b 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -20,6 +20,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientConsumer @@ -114,7 +115,9 @@ class BridgeControlListener(private val keyStore: CertificateStore, private fun registerBridgeControlListener(artemisSession: ClientSession) { try { - artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue) + artemisSession.createQueue( + QueueConfiguration(bridgeControlQueue).setAddress(BRIDGE_CONTROL).setRoutingType(RoutingType.MULTICAST) + .setTemporary(true).setDurable(false)) } catch (ex: ActiveMQQueueExistsException) { // Ignore if there is a queue still not cleaned up } @@ -134,7 +137,9 @@ class BridgeControlListener(private val keyStore: CertificateStore, private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) { try { - artemisSession.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) + artemisSession.createQueue( + QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST) + .setTemporary(true).setDurable(false)) } catch (ex: ActiveMQQueueExistsException) { // Ignore if there is a queue still not cleaned up } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/MessagingServerConnectionConfiguration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/MessagingServerConnectionConfiguration.kt index 2b1bf7829a..1f58a79219 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/MessagingServerConnectionConfiguration.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/config/MessagingServerConnectionConfiguration.kt @@ -15,7 +15,6 @@ import java.time.Duration * * totalFailoverDuration = 5 + 5 * 1.5 + 5 * (1.5)^2 + 5 * (1.5)^3 + 5 * (1.5)^4 = ~66 seconds * - * @param failoverOnInitialAttempt Determines whether failover is triggered if initial connection fails. * @param initialConnectAttempts The number of reconnect attempts if failover is enabled for initial connection. A value * of -1 represents infinite attempts. * @param reconnectAttempts The number of reconnect attempts for failover after initial connection is done. A value @@ -27,7 +26,6 @@ import java.time.Duration enum class MessagingServerConnectionConfiguration { DEFAULT { - override fun failoverOnInitialAttempt(isHa: Boolean) = true override fun initialConnectAttempts(isHa: Boolean) = 5 override fun reconnectAttempts(isHa: Boolean) = 5 override fun retryInterval() = 5.seconds @@ -36,7 +34,6 @@ enum class MessagingServerConnectionConfiguration { }, FAIL_FAST { - override fun failoverOnInitialAttempt(isHa: Boolean) = isHa override fun initialConnectAttempts(isHa: Boolean) = 0 // Client die too fast during failover/failback, need a few reconnect attempts to allow new master to become active override fun reconnectAttempts(isHa: Boolean) = if (isHa) 3 else 0 @@ -46,7 +43,6 @@ enum class MessagingServerConnectionConfiguration { }, CONTINUOUS_RETRY { - override fun failoverOnInitialAttempt(isHa: Boolean) = true override fun initialConnectAttempts(isHa: Boolean) = if (isHa) 0 else -1 override fun reconnectAttempts(isHa: Boolean) = -1 override fun retryInterval() = 5.seconds @@ -54,7 +50,6 @@ enum class MessagingServerConnectionConfiguration { override fun maxRetryInterval(isHa: Boolean) = if (isHa) 3.minutes else 5.minutes }; - abstract fun failoverOnInitialAttempt(isHa: Boolean): Boolean abstract fun initialConnectAttempts(isHa: Boolean): Int abstract fun reconnectAttempts(isHa: Boolean): Int abstract fun retryInterval(): Duration diff --git a/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt b/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt index b1afd23f0a..5687bfa9d3 100644 --- a/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/AuthDBTests.kt @@ -3,6 +3,7 @@ package net.corda.node import co.paralleluniverse.fibers.Suspendable import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.PermissionException +import net.corda.client.rpc.RPCException import net.corda.core.flows.FlowLogic import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.StartableByRPC @@ -151,7 +152,7 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) { proxy.stateMachinesFeed() assertFailsWith( PermissionException::class, - "This user should not be authorized to call 'nodeInfo'") { + "This user should not be authorized to call 'stateMachinesFeed'") { proxy.nodeInfo() } } @@ -185,7 +186,7 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) { val proxy = it.proxy assertFailsWith( PermissionException::class, - "This user should not be authorized to call 'nodeInfo'") { + "This user should not be authorized to call 'stateMachinesFeed'") { proxy.stateMachinesFeed() } db.addRoleToUser("user3", "default") @@ -207,8 +208,8 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) { db.deleteUser("user4") Thread.sleep(1500) assertFailsWith( - PermissionException::class, - "This user should not be authorized to call 'nodeInfo'") { + RPCException::class, + "This user should not be authorized to call 'stateMachinesFeed'") { proxy.stateMachinesFeed() } } diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 72d2c1bd47..2c96dfb238 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -24,6 +24,7 @@ import net.corda.coretesting.internal.rigorousMock import net.corda.coretesting.internal.stubs.CertificateStoreStubs import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.junit.Assert.assertArrayEquals @@ -222,7 +223,8 @@ class AMQPBridgeTest { val artemis = artemisClient.started!! if (sourceQueueName != null) { // Local queue for outgoing messages - artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) + artemis.session.createQueue( + QueueConfiguration(sourceQueueName).setRoutingType(RoutingType.ANYCAST).setAddress(sourceQueueName).setDurable(true)) bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(BOB.name)) } return Triple(artemisServer, artemisClient, bridgeManager) diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index e941a78aea..97a2795df6 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -32,6 +32,7 @@ import net.corda.coretesting.internal.stubs.CertificateStoreStubs import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.assertj.core.api.Assertions.assertThatIllegalArgumentException import org.bouncycastle.asn1.x500.X500Name @@ -487,7 +488,7 @@ class CertificateRevocationListNodeTests { @Path("node.crl") @Produces("application/pkcs7-crl") fun getNodeCRL(): Response { - return Response.ok(CertificateRevocationListNodeTests.createRevocationList( + return Response.ok(createRevocationList( server, SIGNATURE_ALGORITHM, INTERMEDIATE_CA.certificate, @@ -663,7 +664,8 @@ class CertificateRevocationListNodeTests { val queueName = P2P_PREFIX + "Test" val (artemisServer, artemisClient) = createArtemisServerAndClient(serverPort, crlCheckSoftFail, crlCheckArtemisServer) artemisServer.use { - artemisClient.started!!.session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) + artemisClient.started!!.session.createQueue( + QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true)) val (amqpClient, nodeCert) = createClient(serverPort, true, nodeCrlDistPoint) if (revokedNodeCert) { diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 21c4567e0b..dcbf40c145 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -34,6 +34,7 @@ import net.corda.testing.internal.createDevIntermediateCaCertPath import net.corda.coretesting.internal.rigorousMock import net.corda.coretesting.internal.stubs.CertificateStoreStubs import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.assertj.core.api.Assertions.assertThatThrownBy import org.junit.Assert.assertArrayEquals @@ -271,7 +272,8 @@ class ProtonWrapperTests { assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) val artemis = artemisClient.started!! val sendAddress = P2P_PREFIX + "Test" - artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true) + artemis.session.createQueue(QueueConfiguration("queue") + .setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true)) val consumer = artemis.session.createConsumer("queue") val testData = "Test".toByteArray() val testProperty = mutableMapOf() @@ -298,7 +300,8 @@ class ProtonWrapperTests { assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) val artemis = artemisClient.started!! val sendAddress = P2P_PREFIX + "Test" - artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true) + artemis.session.createQueue(QueueConfiguration("queue") + .setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true)) val consumer = artemis.session.createConsumer("queue") val testProperty = mutableMapOf() @@ -313,7 +316,7 @@ class ProtonWrapperTests { assertEquals("1", received.getStringProperty("TestProp")) assertArrayEquals(testData, ByteArray(received.bodySize).apply { received.bodyBuffer.readBytes(this) }) - // Send message larger then max message size. + // Send message larger than max message size. val largeData = ByteArray(maxMessageSize + 1) // Create message will fail. assertThatThrownBy { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 5d650942a8..95b0b76662 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -184,6 +184,7 @@ class ArtemisMessagingTest { messagingClient.send(tooLagerMessage, messagingClient.myAddress) }.isInstanceOf(ActiveMQConnectionTimedOutException::class.java) assertNull(receivedMessages.poll(200, MILLISECONDS)) + this.messagingClient = null } @Test(timeout=300_000) @@ -231,7 +232,9 @@ class ArtemisMessagingTest { MetricRegistry(), TestingNamedCacheFactory(), isDrainingModeOn = { false }, - drainingModeWasChangedEvents = PublishSubject.create>()).apply { + drainingModeWasChangedEvents = PublishSubject.create>(), + terminateOnConnectionError = false, + timeoutConfig = P2PMessagingClient.TimeoutConfig(10.seconds, 10.seconds, 10.seconds)).apply { config.configureWithDevSSLCertificate() messagingClient = this } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index 82c9804b8f..3b012b7672 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -28,6 +28,7 @@ import net.corda.testing.node.internal.NodeBasedTest import net.corda.testing.node.internal.startFlow import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.assertj.core.api.Assertions.assertThatExceptionOfType @@ -130,7 +131,11 @@ abstract class MQSecurityTest : NodeBasedTest() { fun assertTempQueueCreationAttackFails(queue: String) { assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") { - attacker.session.createTemporaryQueue(queue, RoutingType.MULTICAST, queue) + attacker.session.createQueue(QueueConfiguration(queue) + .setRoutingType(RoutingType.MULTICAST) + .setAddress(queue) + .setTemporary(true) + .setDurable(false)) } // Double-check assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy { @@ -147,7 +152,8 @@ abstract class MQSecurityTest : NodeBasedTest() { fun assertNonTempQueueCreationAttackFails(queue: String, durable: Boolean) { val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE" assertAttackFails(queue, permission) { - attacker.session.createQueue(queue, RoutingType.MULTICAST, queue, durable) + attacker.session.createQueue( + QueueConfiguration(queue).setAddress(queue).setRoutingType(RoutingType.MULTICAST).setDurable(durable)) } // Double-check assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy { diff --git a/node/src/main/kotlin/net/corda/node/internal/artemis/ArtemisBroker.kt b/node/src/main/kotlin/net/corda/node/internal/artemis/ArtemisBroker.kt index 5dc7e2c4ea..ccea39accf 100644 --- a/node/src/main/kotlin/net/corda/node/internal/artemis/ArtemisBroker.kt +++ b/node/src/main/kotlin/net/corda/node/internal/artemis/ArtemisBroker.kt @@ -1,6 +1,5 @@ package net.corda.node.internal.artemis -import io.netty.channel.unix.Errors import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.LifecycleSupport import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl @@ -18,4 +17,11 @@ data class BrokerAddresses(val primary: NetworkHostAndPort, private val adminArg val admin = adminArg ?: primary } -fun java.io.IOException.isBindingError() = this is BindException || this is Errors.NativeIoException && message?.contains("Address already in use") == true \ No newline at end of file +fun Throwable.isBindingError(): Boolean { + val addressAlreadyUsedMsg = "Address already in use" + // This is not an exact science here. + // Depending on the underlying OS it can be either [Errors.NativeIoException] on Linux or [BindException] on Windows + // and of course this is dependent on the version of Artemis library used. + return this is BindException || + this is IllegalStateException && cause.let { it is BindException || it?.message?.contains(addressAlreadyUsedMsg) == true } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/internal/artemis/BrokerJaasLoginModule.kt b/node/src/main/kotlin/net/corda/node/internal/artemis/BrokerJaasLoginModule.kt index c146629364..a58373e8dd 100644 --- a/node/src/main/kotlin/net/corda/node/internal/artemis/BrokerJaasLoginModule.kt +++ b/node/src/main/kotlin/net/corda/node/internal/artemis/BrokerJaasLoginModule.kt @@ -14,6 +14,7 @@ import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal import java.io.IOException import java.security.KeyStore import java.security.Principal +import java.security.cert.X509Certificate import java.util.* import javax.security.auth.Subject import javax.security.auth.callback.CallbackHandler @@ -119,26 +120,25 @@ class BrokerJaasLoginModule : BaseBrokerJaasLoginModule() { // The Main authentication logic, responsible for running all the configured checks for each user type // and return the actual User and principals - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - private fun authenticateAndAuthorise(username: String, certificates: Array?, password: String): Pair> { - fun requireTls(certificates: Array?) = requireNotNull(certificates) { "No client certificates presented." } + private fun authenticateAndAuthorise(username: String, certificates: Array, password: String): Pair> { + fun requireTls(certificates: Array?) = requireNotNull(certificates) { "No client certificates presented." } return when (username) { ArtemisMessagingComponent.NODE_P2P_USER -> { requireTls(certificates) - CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates!!) + CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates) Pair(certificates.first().subjectDN.name, listOf(RolePrincipal(NODE_P2P_ROLE))) } ArtemisMessagingComponent.NODE_RPC_USER -> { requireTls(certificates) - CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates!!) + CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates) Pair(ArtemisMessagingComponent.NODE_RPC_USER, listOf(RolePrincipal(NODE_RPC_ROLE))) } ArtemisMessagingComponent.PEER_USER -> { requireNotNull(p2pJaasConfig) { "Attempted to connect as a peer to the rpc broker." } requireTls(certificates) // This check is redundant as it was performed already during the SSL handshake - CertificateChainCheckPolicy.RootMustMatch.createCheck(p2pJaasConfig!!.keyStore, p2pJaasConfig!!.trustStore).checkCertificateChain(certificates!!) + CertificateChainCheckPolicy.RootMustMatch.createCheck(p2pJaasConfig!!.keyStore, p2pJaasConfig!!.trustStore).checkCertificateChain(certificates) CertificateChainCheckPolicy.RevocationCheck(p2pJaasConfig!!.revocationMode) .createCheck(p2pJaasConfig!!.keyStore, p2pJaasConfig!!.trustStore).checkCertificateChain(certificates) Pair(certificates.first().subjectDN.name, listOf(RolePrincipal(PEER_ROLE))) @@ -176,8 +176,8 @@ abstract class BaseBrokerJaasLoginModule : LoginModule { protected lateinit var callbackHandler: CallbackHandler protected val principals = ArrayList() - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - protected fun getUsernamePasswordAndCerts(): Triple?> { + @Suppress("ThrowsCount") + protected fun getUsernamePasswordAndCerts(): Triple> { val nameCallback = NameCallback("Username: ") val passwordCallback = PasswordCallback("Password: ", false) val certificateCallback = CertificateCallback() diff --git a/node/src/main/kotlin/net/corda/node/internal/artemis/CertificateChainCheckPolicy.kt b/node/src/main/kotlin/net/corda/node/internal/artemis/CertificateChainCheckPolicy.kt index 90a44f9c55..e8ef09a3c0 100644 --- a/node/src/main/kotlin/net/corda/node/internal/artemis/CertificateChainCheckPolicy.kt +++ b/node/src/main/kotlin/net/corda/node/internal/artemis/CertificateChainCheckPolicy.kt @@ -22,8 +22,7 @@ sealed class CertificateChainCheckPolicy { @FunctionalInterface interface Check { - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - fun checkCertificateChain(theirChain: Array) + fun checkCertificateChain(theirChain: Array) } abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check @@ -31,8 +30,7 @@ sealed class CertificateChainCheckPolicy { object Any : CertificateChainCheckPolicy() { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { return object : Check { - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - override fun checkCertificateChain(theirChain: Array) { + override fun checkCertificateChain(theirChain: Array) { // nothing to do here } } @@ -44,8 +42,7 @@ sealed class CertificateChainCheckPolicy { val rootAliases = trustStore.aliases().asSequence().filter { it.startsWith(X509Utilities.CORDA_ROOT_CA) } val rootPublicKeys = rootAliases.map { trustStore.getCertificate(it).publicKey }.toSet() return object : Check { - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - override fun checkCertificateChain(theirChain: Array) { + override fun checkCertificateChain(theirChain: Array) { val theirRoot = theirChain.last().publicKey if (theirRoot !in rootPublicKeys) { throw CertificateException("Root certificate mismatch, their root = $theirRoot") @@ -59,8 +56,7 @@ sealed class CertificateChainCheckPolicy { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { val ourPublicKey = keyStore.getCertificate(X509Utilities.CORDA_CLIENT_TLS).publicKey return object : Check { - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - override fun checkCertificateChain(theirChain: Array) { + override fun checkCertificateChain(theirChain: Array) { val theirLeaf = theirChain.first().publicKey if (ourPublicKey != theirLeaf) { throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf") @@ -74,8 +70,7 @@ sealed class CertificateChainCheckPolicy { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet() return object : Check { - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - override fun checkCertificateChain(theirChain: Array) { + override fun checkCertificateChain(theirChain: Array) { if (!theirChain.any { it.publicKey in trustedPublicKeys }) { throw CertificateException("Their certificate chain contained none of the trusted ones") } @@ -92,8 +87,7 @@ sealed class CertificateChainCheckPolicy { class UsernameMustMatchCommonNameCheck : Check { lateinit var username: String - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - override fun checkCertificateChain(theirChain: Array) { + override fun checkCertificateChain(theirChain: Array) { if (!theirChain.any { certificate -> CordaX500Name.parse(certificate.subjectDN.name).commonName == username }) { throw CertificateException("Client certificate does not match login username.") } @@ -103,8 +97,7 @@ sealed class CertificateChainCheckPolicy { class RevocationCheck(val revocationMode: RevocationConfig.Mode) : CertificateChainCheckPolicy() { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { return object : Check { - @Suppress("DEPRECATION") // should use java.security.cert.X509Certificate - override fun checkCertificateChain(theirChain: Array) { + override fun checkCertificateChain(theirChain: Array) { if (revocationMode == RevocationConfig.Mode.OFF) { return } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 277d51742b..1cb28178a8 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -11,6 +11,7 @@ import net.corda.node.internal.artemis.* import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_P2P_ROLE import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.PEER_ROLE import net.corda.node.services.config.NodeConfiguration +import net.corda.node.utilities.artemis.startSynchronously import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX @@ -89,28 +90,26 @@ class ArtemisMessagingServer(private val config: NodeConfiguration, override val started: Boolean get() = activeMQServer.isStarted - // TODO: Maybe wrap [IOException] on a key store load error so that it's clearly splitting key store loading from - // Artemis IO errors @Throws(IOException::class, AddressBindingException::class, KeyStoreException::class) private fun configureAndStartServer() { val artemisConfig = createArtemisConfig() val securityManager = createArtemisSecurityManager() activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply { - // Throw any exceptions which are detected during startup - registerActivationFailureListener { exception -> throw exception } // Some types of queue might need special preparation on our side, like dialling back or preparing // a lazily initialised subsystem. registerPostQueueCreationCallback { log.debug { "Queue Created: $it" } } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } } + @Suppress("TooGenericExceptionCaught") try { - activeMQServer.start() - } catch (e: IOException) { + activeMQServer.startSynchronously() + } catch (e: Throwable) { log.error("Unable to start message broker", e) if (e.isBindingError()) { throw AddressBindingException(config.p2pAddress) } else { + log.error("Unexpected error starting message broker", e) throw e } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index 10a22f5aed..7e0aa7dd02 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -48,13 +48,22 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.* +import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory +import org.apache.activemq.artemis.api.core.client.FailoverEventType +import org.apache.activemq.artemis.api.core.client.ServerLocator import rx.Observable import rx.Subscription import rx.subjects.PublishSubject import java.security.PublicKey +import java.time.Duration import java.time.Instant import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -72,15 +81,17 @@ import kotlin.concurrent.timer * executor through into Artemis and from there, back through to senders. * * An implementation of [CordaRPCOps] can be provided. If given, clients using the CordaMQClient RPC library can - * invoke methods on the provided implementation. There is more documentation on this in the docsite and the + * invoke methods on the provided implementation. There is more documentation on this in the doc-site and the * CordaRPCClient class. * * @param config The configuration of the node, which is used for controlling the message redelivery options. * @param versionInfo All messages from the node carry the version info and received messages are checked against this for compatibility. * @param serverAddress The host and port of the Artemis broker. * @param nodeExecutor The received messages are marshalled onto the server executor to prevent Netty buffers leaking during fiber suspends. - * @param database The nodes database, which is used to deduplicate messages. + * @param database The node's database, which is used to deduplicate messages. + * @param terminateOnConnectionError whether the process should be terminated forcibly if connection with the broker fails. */ +@Suppress("LongParameterList") @ThreadSafe class P2PMessagingClient(val config: NodeConfiguration, private val versionInfo: VersionInfo, @@ -93,7 +104,9 @@ class P2PMessagingClient(val config: NodeConfiguration, cacheFactory: NamedCacheFactory, private val isDrainingModeOn: () -> Boolean, private val drainingModeWasChangedEvents: Observable>, - private val stateHelper: ServiceStateHelper = ServiceStateHelper(log) + private val stateHelper: ServiceStateHelper = ServiceStateHelper(log), + private val terminateOnConnectionError: Boolean = true, + private val timeoutConfig: TimeoutConfig = TimeoutConfig.default() ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, ServiceStateSupport by stateHelper { companion object { private val log = contextLogger() @@ -126,6 +139,21 @@ class P2PMessagingClient(val config: NodeConfiguration, fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message) } + /** + * @property callTimeout the time a blocking call (e.g. message send) from a client waits for a response until it times out. + * @property serverConnectionTtl the time the server waits for a packet/heartbeat from a client before it announces the connection dead and cleans it up. + * @property clientConnectionTtl the time the client waits for a packet/heartbeat from a client before it announces the connection dead and cleans it up. + */ + data class TimeoutConfig(val callTimeout: Duration, val serverConnectionTtl: Duration, val clientConnectionTtl: Duration) { + companion object { + /** + * Some sensible defaults, aligned with defaults of Artemis + */ + @Suppress("MagicNumber") + fun default() = TimeoutConfig(30.seconds, 60.seconds, 30.seconds) + } + } + /** A registration to handle messages of different types */ data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration @@ -168,15 +196,21 @@ class P2PMessagingClient(val config: NodeConfiguration, locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // would be the default and the two lines below can be deleted. - connectionTTL = 60000 - clientFailureCheckPeriod = 30000 + callTimeout = timeoutConfig.callTimeout.toMillis() + connectionTTL = timeoutConfig.serverConnectionTtl.toMillis() + clientFailureCheckPeriod = timeoutConfig.clientConnectionTtl.toMillis() minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE isUseGlobalPools = nodeSerializationEnv != null } - sessionFactory = locator!!.createSessionFactory().addFailoverListener(::failoverCallback) + + sessionFactory = if (terminateOnConnectionError) { + locator!!.createSessionFactory().addFailoverListener(::failoverCallback) + } else { + locator!!.createSessionFactory() + } // Login using the node username. The broker will authenticate us as its node (as opposed to another peer) // using our TLS certificate. - // Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer + // Note that the acknowledgement of messages is not flushed to the Artemis journal until the default buffer // size of 1MB is acknowledged. val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) } @@ -234,7 +268,8 @@ class P2PMessagingClient(val config: NodeConfiguration, private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List) { val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) { - session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) + session.createQueue(QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST) + .setTemporary(true).setDurable(false)) } val bridgeConsumer = session.createConsumer(bridgeNotifyQueue) bridgeNotifyConsumer = bridgeConsumer @@ -266,8 +301,8 @@ class P2PMessagingClient(val config: NodeConfiguration, log.info("Updating bridges on network map change: ${change::class.simpleName} ${change.node}") fun gatherAddresses(node: NodeInfo): Sequence { return state.locked { - node.legalIdentitiesAndCerts.map { - val messagingAddress = NodeAddress(it.party.owningKey) + node.legalIdentitiesAndCerts.map { partyAndCertificate -> + val messagingAddress = NodeAddress(partyAndCertificate.party.owningKey) BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }, serviceAddress = false) }.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence() } @@ -464,8 +499,8 @@ class P2PMessagingClient(val config: NodeConfiguration, running = false stateHelper.active = false networkChangeSubscription?.unsubscribe() - require(p2pConsumer != null, { "stop can't be called twice" }) - require(producer != null, { "stop can't be called twice" }) + require(p2pConsumer != null) { "stop can't be called twice" } + require(producer != null) { "stop can't be called twice" } close(p2pConsumer) p2pConsumer = null @@ -525,7 +560,7 @@ class P2PMessagingClient(val config: NodeConfiguration, // If we are sending to ourselves then route the message directly to our P2P queue. RemoteInboxAddress(myIdentity).queueName } else { - // Otherwise we send the message to an internal queue for the target residing on our broker. It's then the + // Otherwise, we send the message to an internal queue for the target residing on our broker. It's then the // broker's job to route the message to the target's P2P queue. val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") @@ -557,9 +592,13 @@ class P2PMessagingClient(val config: NodeConfiguration, val queueQuery = session.queueQuery(SimpleString(queueName)) if (!queueQuery.isExists) { log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, - ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), - ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null) + session.createQueue(QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName) + .setDurable(true).setAutoCreated(false) + .setMaxConsumers(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()) + .setPurgeOnNoConsumers(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers()) + .setExclusive(exclusive) + .setLastValue(null) + ) sendBridgeCreateMessage() } } @@ -568,7 +607,7 @@ class P2PMessagingClient(val config: NodeConfiguration, } override fun addMessageHandler(topic: String, callback: MessageHandler): MessageHandlerRegistration { - require(!topic.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } + require(topic.isNotBlank()) { "Topic must not be blank, as the empty topic is a special case." } handlers.compute(topic) { _, handler -> if (handler != null) { throw IllegalStateException("Cannot add another acking handler for $topic, there is already an acking one") diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/ArtemisRpcBroker.kt b/node/src/main/kotlin/net/corda/node/services/rpc/ArtemisRpcBroker.kt index 818923697d..6ae79d378e 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/ArtemisRpcBroker.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/ArtemisRpcBroker.kt @@ -8,6 +8,7 @@ import net.corda.node.internal.artemis.* import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_SECURITY_CONFIG import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.RPC_SECURITY_CONFIG import net.corda.node.internal.security.RPCSecurityManager +import net.corda.node.utilities.artemis.startSynchronously import net.corda.nodeapi.BrokerRpcSslOptions import net.corda.nodeapi.internal.config.MutualSslConfiguration import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl @@ -51,20 +52,19 @@ class ArtemisRpcBroker internal constructor( } } + @Suppress("TooGenericExceptionCaught") override fun start() { logger.debug { "Artemis RPC broker is starting for: $addresses" } try { - server.start() - } catch (e: IOException) { + server.startSynchronously() + } catch (e: Throwable) { logger.error("Unable to start message broker", e) if (e.isBindingError()) { throw AddressBindingException(adminAddressOptional?.let { setOf(it, addresses.primary) } ?: setOf(addresses.primary)) } else { + logger.error("Unexpected error starting message broker", e) throw e } - } catch (th: Throwable) { - logger.error("Unexpected error starting message broker", th) - throw th } logger.debug("Artemis RPC broker is started.") } @@ -90,7 +90,6 @@ class ArtemisRpcBroker internal constructor( val serverSecurityManager = createArtemisSecurityManager(serverConfiguration.loginListener) return ActiveMQServerImpl(serverConfiguration, serverSecurityManager).apply { - registerActivationFailureListener { exception -> throw exception } registerPostQueueDeletionCallback { address, qName -> logger.debug("Queue deleted: $qName for $address") } } } diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt index d8c320cab4..57ea80aadd 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/RpcBrokerConfiguration.kt @@ -12,8 +12,8 @@ import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcAcceptorTcpTr import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalAcceptorTcpTransport import net.corda.nodeapi.internal.config.MutualSslConfiguration import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.core.config.CoreQueueConfiguration import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy import org.apache.activemq.artemis.core.settings.impl.AddressSettings @@ -37,14 +37,14 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, } acceptorConfigurations = acceptorConfigurationsSet - queueConfigurations = queueConfigurations() + queueConfigs = queueConfigurations() managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS) addressesSettings = mapOf( "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply { maxSizeBytes = 5L * maxMessageSize addressFullMessagePolicy = AddressFullMessagePolicy.PAGE - pageSizeBytes = 1L * maxMessageSize + pageSizeBytes = maxMessageSize } ) @@ -76,7 +76,11 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, securityRoles["${ArtemisMessagingComponent.INTERNAL_PREFIX}#"] = setOf(nodeInternalRole) securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(BrokerJaasLoginModule.RPC_ROLE, send = true)) securitySettingPlugins.add(rolesAdderOnLogin) - securityInvalidationInterval = ArtemisMessagingComponent.SECURITY_INVALIDATION_INTERVAL + + // Effectively disable security cache as permissions might change dynamically when e.g. DB is updated + authenticationCacheSize = 0 + authorizationCacheSize = 0 + securityInvalidationInterval = 0 } private fun enableJmx() { @@ -85,19 +89,19 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, } private fun initialiseSettings(maxMessageSize: Int, journalBufferTimeout: Int?) { - // Enable built in message deduplication. Note we still have to do our own as the delayed commits - // and our own definition of commit mean that the built in deduplication cannot remove all duplicates. + // Enable built-in message deduplication. Note we still have to do our own as the delayed commits + // and our own definition of commit means that the built-in deduplication cannot remove all the duplicates. idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess isPersistIDCache = true isPopulateValidatedUser = true - journalBufferSize_NIO = maxMessageSize // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store. + journalBufferSize_NIO = maxMessageSize // Artemis default is 490 KB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store. journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio() journalBufferSize_AIO = maxMessageSize // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store. journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio() - journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10MiB. + journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10 MB. } - private fun queueConfigurations(): List { + private fun queueConfigurations(): List { return listOf( queueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false), queueConfiguration( @@ -122,15 +126,8 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, pagingDirectory = (baseDirectory / "paging").toString() } - private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration { - val configuration = CoreQueueConfiguration() - - configuration.name = name - configuration.address = address - configuration.filterString = filter - configuration.isDurable = durable - - return configuration + private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): QueueConfiguration { + return QueueConfiguration(name).setAddress(address).setFilterString(filter).setDurable(durable) } private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, diff --git a/node/src/main/kotlin/net/corda/node/utilities/artemis/ArtemisStartupUtil.kt b/node/src/main/kotlin/net/corda/node/utilities/artemis/ArtemisStartupUtil.kt new file mode 100644 index 0000000000..1f8675255a --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/utilities/artemis/ArtemisStartupUtil.kt @@ -0,0 +1,22 @@ +package net.corda.node.utilities.artemis + +import net.corda.core.utilities.getOrThrow +import org.apache.activemq.artemis.core.server.ActivateCallback +import org.apache.activemq.artemis.core.server.ActiveMQServer +import java.util.concurrent.CompletableFuture + +fun ActiveMQServer.startSynchronously() { + val startupFuture = CompletableFuture() + registerActivateCallback(object: ActivateCallback { + override fun activationComplete() { + startupFuture.complete(Unit) + } + }) + registerActivationFailureListener { + startupFuture.completeExceptionally(it) + } + + start() + + startupFuture.getOrThrow() +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/internal/artemis/RevocationCheckTest.kt b/node/src/test/kotlin/net/corda/node/internal/artemis/RevocationCheckTest.kt index 2e984eb3b5..2495fb370e 100644 --- a/node/src/test/kotlin/net/corda/node/internal/artemis/RevocationCheckTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/artemis/RevocationCheckTest.kt @@ -61,9 +61,7 @@ class RevocationCheckTest(private val revocationMode: RevocationConfig.Mode) { private lateinit var tlsCert: X509Certificate private val chain - get() = listOf(tlsCert, nodeCACert, doormanCert, rootCert).map { - javax.security.cert.X509Certificate.getInstance(it.encoded) - }.toTypedArray() + get() = listOf(tlsCert, nodeCACert, doormanCert, rootCert).toTypedArray() @Before fun before() { diff --git a/node/src/test/kotlin/net/corda/node/internal/artemis/UserValidationPluginTest.kt b/node/src/test/kotlin/net/corda/node/internal/artemis/UserValidationPluginTest.kt index 3f316258e7..b6b9288884 100644 --- a/node/src/test/kotlin/net/corda/node/internal/artemis/UserValidationPluginTest.kt +++ b/node/src/test/kotlin/net/corda/node/internal/artemis/UserValidationPluginTest.kt @@ -1,24 +1,34 @@ package net.corda.node.internal.artemis +import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.doThrow import com.nhaarman.mockito_kotlin.whenever import net.corda.coretesting.internal.rigorousMock import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import org.apache.activemq.artemis.api.core.ActiveMQSecurityException +import org.apache.activemq.artemis.api.core.Message import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl import org.apache.activemq.artemis.core.server.ServerSession import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage -import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage import org.assertj.core.api.Assertions import org.junit.Test class UserValidationPluginTest { private val plugin = UserValidationPlugin() - private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(), 4.toByte(), 1024) - private val amqpMessage get() = AMQPConverter.getInstance().fromCore(coreMessage) + private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(), + 4.toByte(), 1024) + private val amqpMessage: AMQPMessage + get() { + return rigorousMock().also { + doReturn(coreMessage.validatedUserID).whenever(it).getStringProperty(Message.HDR_VALIDATED_USER) + } + } + private val session = rigorousMock().also { doReturn(ArtemisMessagingComponent.PEER_USER).whenever(it).username doReturn(ALICE_NAME.toString()).whenever(it).validatedUser @@ -31,16 +41,17 @@ class UserValidationPluginTest { @Test(timeout = 300_000) fun `accept AMQP message with user`() { - coreMessage.putStringProperty("_AMQ_VALIDATED_USER", ALICE_NAME.toString()) + coreMessage.validatedUserID = ALICE_NAME.toString() plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false) } @Test(timeout = 300_000) fun `reject AMQP message with different user`() { - coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) + coreMessage.validatedUserID = BOB_NAME.toString() + val localAmqpMessage = amqpMessage Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { - plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false) - }.withMessageContaining("_AMQ_VALIDATED_USER") + plugin.beforeSend(session, rigorousMock(), localAmqpMessage, direct = false, noAutoCreateQueue = false) + }.withMessageContaining(Message.HDR_VALIDATED_USER.toString()) } @Test(timeout = 300_000) @@ -49,7 +60,7 @@ class UserValidationPluginTest { doReturn(ArtemisMessagingComponent.NODE_P2P_USER).whenever(it).username doReturn(ALICE_NAME.toString()).whenever(it).validatedUser } - coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) + coreMessage.validatedUserID = BOB_NAME.toString() plugin.beforeSend(internalSession, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false) } @@ -62,11 +73,8 @@ class UserValidationPluginTest { @Test(timeout = 300_000) fun `reject message with exception`() { - coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) - val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) { - override fun getStringProperty(key: SimpleString?): String { - throw IllegalStateException("My exception") - } + val messageWithException = rigorousMock().also { + doThrow(IllegalStateException("My exception")).whenever(it).getStringProperty(any()) } // Artemis swallows all exceptions except ActiveMQException, so making sure that proper exception is thrown Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { @@ -76,9 +84,8 @@ class UserValidationPluginTest { @Test(timeout = 300_000) fun `reject message with security exception`() { - coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) - val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) { - override fun getStringProperty(key: SimpleString?): String { + val messageWithException = object : AMQPStandardMessage(0, ByteArray(0), null) { + override fun getApplicationPropertiesMap(createIfAbsent: Boolean): MutableMap { throw ActiveMQSecurityException("My security exception") } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 73cb5dbc83..1d8106993d 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -35,6 +35,7 @@ import net.corda.testing.internal.TestingNamedCacheFactory import net.corda.testing.internal.fromUserList import net.corda.testing.node.NotarySpec import net.corda.testing.node.User +import org.apache.activemq.artemis.api.core.QueueConfiguration import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.client.ActiveMQClient @@ -42,7 +43,6 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.core.config.Configuration -import org.apache.activemq.artemis.core.config.CoreQueueConfiguration import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory @@ -201,30 +201,18 @@ data class RPCDriverDSL( journalBufferSize_NIO = maxFileSize journalBufferSize_AIO = maxFileSize journalFileSize = maxFileSize - queueConfigurations = listOf( - CoreQueueConfiguration().apply { - name = RPCApi.RPC_SERVER_QUEUE_NAME - address = RPCApi.RPC_SERVER_QUEUE_NAME - isDurable = false - }, - CoreQueueConfiguration().apply { - name = RPCApi.RPC_CLIENT_BINDING_REMOVALS - address = notificationAddress - filterString = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION - isDurable = false - }, - CoreQueueConfiguration().apply { - name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS - address = notificationAddress - filterString = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION - isDurable = false - } + queueConfigs = listOf( + QueueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME).setAddress(RPCApi.RPC_SERVER_QUEUE_NAME).setDurable(false), + QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_REMOVALS).setAddress(notificationAddress) + .setFilterString(RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION).setDurable(false), + QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_ADDITIONS).setAddress(notificationAddress) + .setFilterString(RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION).setDurable(false) ) addressesSettings = mapOf( "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply { maxSizeBytes = maxBufferedBytesPerClient addressFullMessagePolicy = AddressFullMessagePolicy.PAGE - pageSizeBytes = maxSizeBytes / 10 + pageSizeBytes = maxSizeBytes.toInt() / 10 } ) } @@ -259,7 +247,7 @@ data class RPCDriverDSL( * Starts an In-VM RPC server. Note that only a single one may be started. * * @param rpcUser The single user who can access the server through RPC, and their permissions. - * @param nodeLegalName The legal name of the node to check against to authenticate a super user. + * @param nodeLegalName The legal name of the node to check against to authenticate a superuser. * @param configuration The RPC server configuration. * @param ops The server-side implementation of the RPC interface. */ @@ -338,7 +326,7 @@ data class RPCDriverDSL( * * @param serverName The name of the server, to be used for the folder created for Artemis files. * @param rpcUser The single user who can access the server through RPC, and their permissions. - * @param nodeLegalName The legal name of the node to check against to authenticate a super user. + * @param nodeLegalName The legal name of the node to check against to authenticate a superuser. * @param configuration The RPC server configuration. * @param listOps The server-side implementation of the RPC interfaces. */ diff --git a/testing/test-common/src/main/resources/log4j2-test.xml b/testing/test-common/src/main/resources/log4j2-test.xml index 45910c8ca5..5edab5035f 100644 --- a/testing/test-common/src/main/resources/log4j2-test.xml +++ b/testing/test-common/src/main/resources/log4j2-test.xml @@ -94,6 +94,10 @@ + + + +