ENT-6286: OS: Upgrade version of Artemis to 2.19.1 (#6975)

Fixes DDoS attack mentioned on the Jira ticket.

PR upgrades Artemis library to version 2.19.1.
This is our own release of Apache Artemis library which has vulnerability fix for v2.20 applied.

**_Breaking changes discovered during Artemis upgrade:_**
1. When the queue is created as temporary, it needs to explicitly be specified as non-durable.
2. By default, Artemis Client performs Host DNS name check against the certificate presented by the server. Our TLS certificates fail this check and this verification has to be explicitly disabled, see use of: `TransportConstants.VERIFY_HOST_PROP_NAME`.
3. Artemis Server now caches login attempts, even unsuccessful ones. When we add RPC users dynamically via DB insert this may have an unexpected outcome if the user with the same `userName` and `password` was not available previously.
To workaround permissions changing dynamically, authorization and authentication caches had to be disabled.
4. When computing `maxMessageSize`, the size of the headers content is now taken into account as well.
5. Artemis handling of start-up errors has changed. E.g. when the port is already bound.
6. A number of deprecated APIs like: `createTemporaryQueue`, `failoverOnInitialAttempt`, `NullOutputStream`, `CoreQueueConfiguration`.
7. Log warning message is produced like: `AMQ212080: Using legacy SSL store provider value: JKS. Please use either 'keyStoreType' or 'trustStoreType' instead as appropriate.`
8. As reported by QA, Artemis now produces more audit logging more details [here](https://r3-cev.atlassian.net/browse/ENT-6540). Log configuration been adjusted to reduce such output.
This commit is contained in:
Viktor Kolomeyko
2022-01-21 09:18:14 +00:00
committed by GitHub
parent b17e4571bf
commit 835321bb70
30 changed files with 271 additions and 162 deletions

View File

@ -61,7 +61,7 @@ buildscript {
ext.capsule_version = '1.0.3' ext.capsule_version = '1.0.3'
ext.asm_version = '7.1' ext.asm_version = '7.1'
ext.artemis_version = '2.6.2' ext.artemis_version = '2.19.1'
// TODO Upgrade Jackson only when corda is using kotlin 1.3.10 // TODO Upgrade Jackson only when corda is using kotlin 1.3.10
ext.jackson_version = '2.9.7' ext.jackson_version = '2.9.7'
ext.jetty_version = '9.4.19.v20190610' ext.jetty_version = '9.4.19.v20190610'
@ -405,6 +405,7 @@ allprojects {
includeGroup 'org.crashub' includeGroup 'org.crashub'
includeGroup 'com.github.bft-smart' includeGroup 'com.github.bft-smart'
includeGroup 'com.github.detro' includeGroup 'com.github.detro'
includeGroup 'org.apache.activemq'
} }
} }
maven { maven {

View File

@ -30,6 +30,7 @@ import net.corda.testing.node.internal.rpcTestUser
import net.corda.testing.node.internal.startRandomRpcClient import net.corda.testing.node.internal.startRandomRpcClient
import net.corda.testing.node.internal.startRpcClient import net.corda.testing.node.internal.startRpcClient
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After import org.junit.After
import org.junit.Assert.assertEquals import org.junit.Assert.assertEquals
@ -551,7 +552,11 @@ class RPCStabilityTests {
// Construct an RPC session manually so that we can hang in the message handler // Construct an RPC session manually so that we can hang in the message handler
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
val session = startArtemisSession(server.broker.hostAndPort!!) val session = startArtemisSession(server.broker.hostAndPort!!)
session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue) session.createQueue(QueueConfiguration(myQueue)
.setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType())
.setAddress(myQueue)
.setTemporary(true)
.setDurable(false))
val consumer = session.createConsumer(myQueue, null, -1, -1, false) val consumer = session.createConsumer(myQueue, null, -1, -1, false)
consumer.setMessageHandler { consumer.setMessageHandler {
Thread.sleep(5000) // Needs to be slower than one per second to get kicked. Thread.sleep(5000) // Needs to be slower than one per second to get kicked.
@ -588,7 +593,11 @@ class RPCStabilityTests {
// Construct an RPC client session manually // Construct an RPC client session manually
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
val session = startArtemisSession(server.broker.hostAndPort!!) val session = startArtemisSession(server.broker.hostAndPort!!)
session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue) session.createQueue(QueueConfiguration(myQueue)
.setRoutingType(ActiveMQDefaultConfiguration.getDefaultRoutingType())
.setAddress(myQueue)
.setTemporary(true)
.setDurable(false))
val consumer = session.createConsumer(myQueue, null, -1, -1, false) val consumer = session.createConsumer(myQueue, null, -1, -1, false)
val replies = ArrayList<Any>() val replies = ArrayList<Any>()
consumer.setMessageHandler { consumer.setMessageHandler {

View File

@ -95,6 +95,8 @@ class RPCClient<I : RPCOps>(
// By default RoundRobinConnectionLoadBalancingPolicy is used that picks first endpoint from the pool // By default RoundRobinConnectionLoadBalancingPolicy is used that picks first endpoint from the pool
// at random. This may be undesired and non-deterministic. For more information, see [RoundRobinConnectionPolicy] // at random. This may be undesired and non-deterministic. For more information, see [RoundRobinConnectionPolicy]
connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName connectionLoadBalancingPolicyClassName = RoundRobinConnectionPolicy::class.java.canonicalName
// Without this any type of "send" time failures will not be delivered back to the client
isBlockOnNonDurableSend = true
} }
val sessionId = Trace.SessionId.newInstance() val sessionId = Trace.SessionId.newInstance()
val distributionMux = DistributionMux(listeners, username) val distributionMux = DistributionMux(listeners, username)

View File

@ -39,6 +39,7 @@ import net.corda.nodeapi.internal.rpc.client.RpcClientObservableDeSerializer
import net.corda.nodeapi.internal.rpc.client.RpcObservableMap import net.corda.nodeapi.internal.rpc.client.RpcObservableMap
import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
@ -60,6 +61,7 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -380,12 +382,19 @@ internal class RPCClientProxyHandler(
targetLegalIdentity?.let { targetLegalIdentity?.let {
artemisMessage.putStringProperty(RPCApi.RPC_TARGET_LEGAL_IDENTITY, it.toString()) artemisMessage.putStringProperty(RPCApi.RPC_TARGET_LEGAL_IDENTITY, it.toString())
} }
sendExecutor!!.submit { val future: Future<*> = sendExecutor!!.submit {
artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, deduplicationSequenceNumber.getAndIncrement()) artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, deduplicationSequenceNumber.getAndIncrement())
log.debug { "-> RPC -> $message" } log.debug { "-> RPC -> $message" }
rpcProducer!!.send(artemisMessage) rpcProducer!!.let {
if (!it.isClosed) {
it.send(artemisMessage)
} else {
log.info("Producer is already closed. Not sending: $message")
} }
} }
}
future.getOrThrow()
}
// The handler for Artemis messages. // The handler for Artemis messages.
private fun artemisMessageHandler(message: ClientMessage) { private fun artemisMessageHandler(message: ClientMessage) {
@ -570,7 +579,12 @@ internal class RPCClientProxyHandler(
} }
if (observableIds != null) { if (observableIds != null) {
log.debug { "Reaping ${observableIds.size} observables" } log.debug { "Reaping ${observableIds.size} observables" }
@Suppress("TooGenericExceptionCaught")
try {
sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds)) sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds))
} catch(ex: Exception) {
log.warn("Unable to close observables", ex)
}
} }
} }
@ -632,7 +646,8 @@ internal class RPCClientProxyHandler(
consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, 16384) consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, 16384)
clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}") clientAddress = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$rpcUsername.${random63BitValue()}")
log.debug { "Client address: $clientAddress" } log.debug { "Client address: $clientAddress" }
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) consumerSession!!.createQueue(QueueConfiguration(clientAddress).setAddress(clientAddress).setRoutingType(RoutingType.ANYCAST)
.setTemporary(true).setDurable(false))
rpcConsumer = consumerSession!!.createConsumer(clientAddress) rpcConsumer = consumerSession!!.createConsumer(clientAddress)
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
} }

View File

@ -40,7 +40,7 @@ import net.corda.nodeapi.internal.config.User
import net.corda.sleeping.SleepingFlow import net.corda.sleeping.SleepingFlow
import net.corda.smoketesting.NodeConfig import net.corda.smoketesting.NodeConfig
import net.corda.smoketesting.NodeProcess import net.corda.smoketesting.NodeProcess
import org.apache.commons.io.output.NullOutputStream import org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM
import org.hamcrest.text.MatchesPattern import org.hamcrest.text.MatchesPattern
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
@ -117,7 +117,7 @@ class StandaloneCordaRPClientTest {
assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash") assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash")
val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it -> val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it ->
it.copyTo(NullOutputStream()) it.copyTo(NULL_OUTPUT_STREAM)
SecureHash.SHA256(it.hash().asBytes()) SecureHash.SHA256(it.hash().asBytes())
} }
assertEquals(attachment.sha256, hash) assertEquals(attachment.sha256, hash)
@ -132,7 +132,7 @@ class StandaloneCordaRPClientTest {
assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash") assertEquals(attachment.sha256, id, "Attachment has incorrect SHA256 hash")
val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it -> val hash = HashingInputStream(Hashing.sha256(), rpcProxy.openAttachment(id)).use { it ->
it.copyTo(NullOutputStream()) it.copyTo(NULL_OUTPUT_STREAM)
SecureHash.SHA256(it.hash().asBytes()) SecureHash.SHA256(it.hash().asBytes())
} }
assertEquals(attachment.sha256, hash) assertEquals(attachment.sha256, hash)

View File

@ -206,6 +206,10 @@
<AppenderRef ref="Console-ErrorCode-Selector"/> <AppenderRef ref="Console-ErrorCode-Selector"/>
<AppenderRef ref="RollingFile-ErrorCode-Appender"/> <AppenderRef ref="RollingFile-ErrorCode-Appender"/>
</Logger> </Logger>
<Logger name="org.apache.activemq.audit" level="error" additivity="false">
<AppenderRef ref="Console-ErrorCode-Selector"/>
<AppenderRef ref="RollingFile-ErrorCode-Appender"/>
</Logger>
<Logger name="org.jolokia" additivity="true" level="warn"> <Logger name="org.jolokia" additivity="true" level="warn">
<AppenderRef ref="Console-ErrorCode-Appender-Println"/> <AppenderRef ref="Console-ErrorCode-Appender-Println"/>
<AppenderRef ref="RollingFile-ErrorCode-Appender"/> <AppenderRef ref="RollingFile-ErrorCode-Appender"/>

View File

@ -1364,7 +1364,6 @@
<ID>ThrowsCount:AMQPTypeIdentifierParser.kt$AMQPTypeIdentifierParser$// Make sure our inputs aren't designed to blow things up. private fun validate(typeString: String)</ID> <ID>ThrowsCount:AMQPTypeIdentifierParser.kt$AMQPTypeIdentifierParser$// Make sure our inputs aren't designed to blow things up. private fun validate(typeString: String)</ID>
<ID>ThrowsCount:AbstractNode.kt$AbstractNode$private fun installCordaServices()</ID> <ID>ThrowsCount:AbstractNode.kt$AbstractNode$private fun installCordaServices()</ID>
<ID>ThrowsCount:ArtemisMessagingServer.kt$ArtemisMessagingServer$// TODO: Maybe wrap [IOException] on a key store load error so that it's clearly splitting key store loading from // Artemis IO errors @Throws(IOException::class, AddressBindingException::class, KeyStoreException::class) private fun configureAndStartServer()</ID> <ID>ThrowsCount:ArtemisMessagingServer.kt$ArtemisMessagingServer$// TODO: Maybe wrap [IOException] on a key store load error so that it's clearly splitting key store loading from // Artemis IO errors @Throws(IOException::class, AddressBindingException::class, KeyStoreException::class) private fun configureAndStartServer()</ID>
<ID>ThrowsCount:BrokerJaasLoginModule.kt$BaseBrokerJaasLoginModule$@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate protected fun getUsernamePasswordAndCerts(): Triple&lt;String, String, Array&lt;javax.security.cert.X509Certificate&gt;?&gt;</ID>
<ID>ThrowsCount:CheckpointVerifier.kt$CheckpointVerifier$ fun verifyCheckpointsCompatible( checkpointStorage: CheckpointStorage, currentCordapps: List&lt;Cordapp&gt;, platformVersion: Int, serviceHub: ServiceHub, tokenizableServices: List&lt;Any&gt; )</ID> <ID>ThrowsCount:CheckpointVerifier.kt$CheckpointVerifier$ fun verifyCheckpointsCompatible( checkpointStorage: CheckpointStorage, currentCordapps: List&lt;Cordapp&gt;, platformVersion: Int, serviceHub: ServiceHub, tokenizableServices: List&lt;Any&gt; )</ID>
<ID>ThrowsCount:CheckpointVerifier.kt$CheckpointVerifier$// Throws exception when the flow is incompatible private fun checkFlowCompatible(subFlow: SubFlow, currentCordappsByHash: Map&lt;SecureHash.SHA256, Cordapp&gt;, platformVersion: Int)</ID> <ID>ThrowsCount:CheckpointVerifier.kt$CheckpointVerifier$// Throws exception when the flow is incompatible private fun checkFlowCompatible(subFlow: SubFlow, currentCordappsByHash: Map&lt;SecureHash.SHA256, Cordapp&gt;, platformVersion: Int)</ID>
<ID>ThrowsCount:ClassCarpenter.kt$ClassCarpenterImpl$ private fun validateSchema(schema: Schema)</ID> <ID>ThrowsCount:ClassCarpenter.kt$ClassCarpenterImpl$ private fun validateSchema(schema: Schema)</ID>

View File

@ -66,7 +66,6 @@ class ArtemisMessagingClient(private val config: MutualSslConfiguration,
retryInterval = messagingServerConnectionConfig.retryInterval().toMillis() retryInterval = messagingServerConnectionConfig.retryInterval().toMillis()
retryIntervalMultiplier = messagingServerConnectionConfig.retryIntervalMultiplier() retryIntervalMultiplier = messagingServerConnectionConfig.retryIntervalMultiplier()
maxRetryInterval = messagingServerConnectionConfig.maxRetryInterval(isHA).toMillis() maxRetryInterval = messagingServerConnectionConfig.maxRetryInterval(isHA).toMillis()
isFailoverOnInitialConnection = messagingServerConnectionConfig.failoverOnInitialAttempt(isHA)
initialConnectAttempts = messagingServerConnectionConfig.initialConnectAttempts(isHA) initialConnectAttempts = messagingServerConnectionConfig.initialConnectAttempts(isHA)
} }
addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize)) addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))

View File

@ -73,27 +73,27 @@ class ArtemisTcpTransport {
private fun CertificateStore.toKeyStoreTransportOptions(path: Path) = mapOf( private fun CertificateStore.toKeyStoreTransportOptions(path: Path) = mapOf(
TransportConstants.SSL_ENABLED_PROP_NAME to true, TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS", TransportConstants.KEYSTORE_TYPE_PROP_NAME to "JKS",
TransportConstants.KEYSTORE_PATH_PROP_NAME to path, TransportConstants.KEYSTORE_PATH_PROP_NAME to path,
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to password, TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to password,
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true) TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true)
private fun CertificateStore.toTrustStoreTransportOptions(path: Path) = mapOf( private fun CertificateStore.toTrustStoreTransportOptions(path: Path) = mapOf(
TransportConstants.SSL_ENABLED_PROP_NAME to true, TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS", TransportConstants.TRUSTSTORE_TYPE_PROP_NAME to "JKS",
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to path, TransportConstants.TRUSTSTORE_PATH_PROP_NAME to path,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to password, TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to password,
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true) TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true)
private fun ClientRpcSslOptions.toTransportOptions() = mapOf( private fun ClientRpcSslOptions.toTransportOptions() = mapOf(
TransportConstants.SSL_ENABLED_PROP_NAME to true, TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to trustStoreProvider, TransportConstants.TRUSTSTORE_TYPE_PROP_NAME to trustStoreProvider,
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to trustStorePath, TransportConstants.TRUSTSTORE_PATH_PROP_NAME to trustStorePath,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to trustStorePassword) TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to trustStorePassword)
private fun BrokerRpcSslOptions.toTransportOptions() = mapOf( private fun BrokerRpcSslOptions.toTransportOptions() = mapOf(
TransportConstants.SSL_ENABLED_PROP_NAME to true, TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS", TransportConstants.KEYSTORE_TYPE_PROP_NAME to "JKS",
TransportConstants.KEYSTORE_PATH_PROP_NAME to keyStorePath, TransportConstants.KEYSTORE_PATH_PROP_NAME to keyStorePath,
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword, TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to keyStorePassword,
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false) TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to false)
@ -106,9 +106,9 @@ class ArtemisTcpTransport {
return p2pAcceptorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false) return p2pAcceptorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false)
} }
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreProvider: String? = null): TransportConfiguration { fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreType: String? = null): TransportConfiguration {
return p2pConnectorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false, keyStoreProvider = keyStoreProvider) return p2pConnectorTcpTransport(hostAndPort, config?.keyStore, config?.trustStore, enableSSL = enableSSL, useOpenSsl = config?.useOpenSsl ?: false, keyStoreType = keyStoreType)
} }
fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false): TransportConfiguration { fun p2pAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false): TransportConfiguration {
@ -124,20 +124,22 @@ class ArtemisTcpTransport {
} }
@Suppress("LongParameterList") @Suppress("LongParameterList")
fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreProvider: String? = null): TransportConfiguration { fun p2pConnectorTcpTransport(hostAndPort: NetworkHostAndPort, keyStore: FileBasedCertificateStoreSupplier?, trustStore: FileBasedCertificateStoreSupplier?, enableSSL: Boolean = true, useOpenSsl: Boolean = false, keyStoreType: String? = null): TransportConfiguration {
val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap() val options = defaultArtemisOptions(hostAndPort, P2P_PROTOCOLS).toMutableMap()
if (enableSSL) { if (enableSSL) {
options.putAll(defaultSSLOptions) options.putAll(defaultSSLOptions)
(keyStore to trustStore).addToTransportOptions(options) (keyStore to trustStore).addToTransportOptions(options)
options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER options[TransportConstants.SSL_PROVIDER] = if (useOpenSsl) TransportConstants.OPENSSL_PROVIDER else TransportConstants.DEFAULT_SSL_PROVIDER
keyStoreProvider?.let { options.put(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME, keyStoreProvider) } keyStoreType?.let { options.put(TransportConstants.KEYSTORE_TYPE_PROP_NAME, keyStoreType) }
// This is required to stop Client checking URL address vs. Server provided certificate
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
} }
return TransportConfiguration(connectorFactoryClassName, options) return TransportConfiguration(connectorFactoryClassName, options)
} }
fun p2pConnectorTcpTransportFromList(hostAndPortList: List<NetworkHostAndPort>, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreProvider: String? = null): List<TransportConfiguration> = hostAndPortList.map { fun p2pConnectorTcpTransportFromList(hostAndPortList: List<NetworkHostAndPort>, config: MutualSslConfiguration?, enableSSL: Boolean = true, keyStoreType: String? = null): List<TransportConfiguration> = hostAndPortList.map {
p2pConnectorTcpTransport(it, config, enableSSL, keyStoreProvider) p2pConnectorTcpTransport(it, config, enableSSL, keyStoreType)
} }
fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration { fun rpcAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: BrokerRpcSslOptions?, enableSSL: Boolean = true): TransportConfiguration {
@ -159,6 +161,8 @@ class ArtemisTcpTransport {
config.trustStorePath.requireOnDefaultFileSystem() config.trustStorePath.requireOnDefaultFileSystem()
options.putAll(config.toTransportOptions()) options.putAll(config.toTransportOptions())
options.putAll(defaultSSLOptions) options.putAll(defaultSSLOptions)
// This is required to stop Client checking URL address vs. Server provided certificate
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
} }
return TransportConfiguration(connectorFactoryClassName, options) return TransportConfiguration(connectorFactoryClassName, options)
} }
@ -167,17 +171,23 @@ class ArtemisTcpTransport {
rpcConnectorTcpTransport(it, config, enableSSL) rpcConnectorTcpTransport(it, config, enableSSL)
} }
fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { fun rpcInternalClientTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreType: String? = null): TransportConfiguration {
return TransportConfiguration(connectorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + config.toTransportOptions() + asMap(keyStoreProvider)) val options = defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS).toMutableMap()
options.putAll(defaultSSLOptions)
options.putAll(config.toTransportOptions())
options.putAll(asMap(keyStoreType))
// This is required to stop Client checking URL address vs. Server provided certificate
options[TransportConstants.VERIFY_HOST_PROP_NAME] = false
return TransportConfiguration(connectorFactoryClassName, options)
} }
fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreProvider: String? = null): TransportConfiguration { fun rpcInternalAcceptorTcpTransport(hostAndPort: NetworkHostAndPort, config: SslConfiguration, keyStoreType: String? = null): TransportConfiguration {
return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions + return TransportConfiguration(acceptorFactoryClassName, defaultArtemisOptions(hostAndPort, RPC_PROTOCOLS) + defaultSSLOptions +
config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreProvider)) config.toTransportOptions() + (TransportConstants.HANDSHAKE_TIMEOUT to 0) + asMap(keyStoreType))
} }
private fun asMap(keyStoreProvider: String?): Map<String, String> { private fun asMap(keyStoreType: String?): Map<String, String> {
return keyStoreProvider?.let {mutableMapOf(TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to it)} ?: emptyMap() return keyStoreType?.let { mutableMapOf(TransportConstants.KEYSTORE_TYPE_PROP_NAME to it) } ?: emptyMap()
} }
} }
} }

View File

@ -8,6 +8,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.MessagePac
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
import org.apache.qpid.proton.amqp.messaging.Data
class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<Packet>(maxMessageSize), Interceptor { class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<Packet>(maxMessageSize), Interceptor {
override fun getMessageSize(packet: Packet?): Int? { override fun getMessageSize(packet: Packet?): Int? {
@ -22,7 +23,7 @@ class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChec
} }
class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize), AmqpInterceptor { class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize), AmqpInterceptor {
override fun getMessageSize(packet: AMQPMessage?): Int? = packet?.encodeSize override fun getMessageSize(packet: AMQPMessage?): Int? = (packet?.protonMessage?.body as? Data)?.value?.length
} }
/** /**

View File

@ -20,6 +20,7 @@ import net.corda.nodeapi.internal.protonwrapper.netty.ProxyConfig
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientConsumer
@ -114,7 +115,9 @@ class BridgeControlListener(private val keyStore: CertificateStore,
private fun registerBridgeControlListener(artemisSession: ClientSession) { private fun registerBridgeControlListener(artemisSession: ClientSession) {
try { try {
artemisSession.createTemporaryQueue(BRIDGE_CONTROL, RoutingType.MULTICAST, bridgeControlQueue) artemisSession.createQueue(
QueueConfiguration(bridgeControlQueue).setAddress(BRIDGE_CONTROL).setRoutingType(RoutingType.MULTICAST)
.setTemporary(true).setDurable(false))
} catch (ex: ActiveMQQueueExistsException) { } catch (ex: ActiveMQQueueExistsException) {
// Ignore if there is a queue still not cleaned up // Ignore if there is a queue still not cleaned up
} }
@ -134,7 +137,9 @@ class BridgeControlListener(private val keyStore: CertificateStore,
private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) { private fun registerBridgeDuplicateChecker(artemisSession: ClientSession) {
try { try {
artemisSession.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) artemisSession.createQueue(
QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST)
.setTemporary(true).setDurable(false))
} catch (ex: ActiveMQQueueExistsException) { } catch (ex: ActiveMQQueueExistsException) {
// Ignore if there is a queue still not cleaned up // Ignore if there is a queue still not cleaned up
} }

View File

@ -15,7 +15,6 @@ import java.time.Duration
* *
* totalFailoverDuration = 5 + 5 * 1.5 + 5 * (1.5)^2 + 5 * (1.5)^3 + 5 * (1.5)^4 = ~66 seconds * totalFailoverDuration = 5 + 5 * 1.5 + 5 * (1.5)^2 + 5 * (1.5)^3 + 5 * (1.5)^4 = ~66 seconds
* *
* @param failoverOnInitialAttempt Determines whether failover is triggered if initial connection fails.
* @param initialConnectAttempts The number of reconnect attempts if failover is enabled for initial connection. A value * @param initialConnectAttempts The number of reconnect attempts if failover is enabled for initial connection. A value
* of -1 represents infinite attempts. * of -1 represents infinite attempts.
* @param reconnectAttempts The number of reconnect attempts for failover after initial connection is done. A value * @param reconnectAttempts The number of reconnect attempts for failover after initial connection is done. A value
@ -27,7 +26,6 @@ import java.time.Duration
enum class MessagingServerConnectionConfiguration { enum class MessagingServerConnectionConfiguration {
DEFAULT { DEFAULT {
override fun failoverOnInitialAttempt(isHa: Boolean) = true
override fun initialConnectAttempts(isHa: Boolean) = 5 override fun initialConnectAttempts(isHa: Boolean) = 5
override fun reconnectAttempts(isHa: Boolean) = 5 override fun reconnectAttempts(isHa: Boolean) = 5
override fun retryInterval() = 5.seconds override fun retryInterval() = 5.seconds
@ -36,7 +34,6 @@ enum class MessagingServerConnectionConfiguration {
}, },
FAIL_FAST { FAIL_FAST {
override fun failoverOnInitialAttempt(isHa: Boolean) = isHa
override fun initialConnectAttempts(isHa: Boolean) = 0 override fun initialConnectAttempts(isHa: Boolean) = 0
// Client die too fast during failover/failback, need a few reconnect attempts to allow new master to become active // Client die too fast during failover/failback, need a few reconnect attempts to allow new master to become active
override fun reconnectAttempts(isHa: Boolean) = if (isHa) 3 else 0 override fun reconnectAttempts(isHa: Boolean) = if (isHa) 3 else 0
@ -46,7 +43,6 @@ enum class MessagingServerConnectionConfiguration {
}, },
CONTINUOUS_RETRY { CONTINUOUS_RETRY {
override fun failoverOnInitialAttempt(isHa: Boolean) = true
override fun initialConnectAttempts(isHa: Boolean) = if (isHa) 0 else -1 override fun initialConnectAttempts(isHa: Boolean) = if (isHa) 0 else -1
override fun reconnectAttempts(isHa: Boolean) = -1 override fun reconnectAttempts(isHa: Boolean) = -1
override fun retryInterval() = 5.seconds override fun retryInterval() = 5.seconds
@ -54,7 +50,6 @@ enum class MessagingServerConnectionConfiguration {
override fun maxRetryInterval(isHa: Boolean) = if (isHa) 3.minutes else 5.minutes override fun maxRetryInterval(isHa: Boolean) = if (isHa) 3.minutes else 5.minutes
}; };
abstract fun failoverOnInitialAttempt(isHa: Boolean): Boolean
abstract fun initialConnectAttempts(isHa: Boolean): Int abstract fun initialConnectAttempts(isHa: Boolean): Int
abstract fun reconnectAttempts(isHa: Boolean): Int abstract fun reconnectAttempts(isHa: Boolean): Int
abstract fun retryInterval(): Duration abstract fun retryInterval(): Duration

View File

@ -3,6 +3,7 @@ package net.corda.node
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.PermissionException import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.RPCException
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
@ -151,7 +152,7 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) {
proxy.stateMachinesFeed() proxy.stateMachinesFeed()
assertFailsWith( assertFailsWith(
PermissionException::class, PermissionException::class,
"This user should not be authorized to call 'nodeInfo'") { "This user should not be authorized to call 'stateMachinesFeed'") {
proxy.nodeInfo() proxy.nodeInfo()
} }
} }
@ -185,7 +186,7 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) {
val proxy = it.proxy val proxy = it.proxy
assertFailsWith( assertFailsWith(
PermissionException::class, PermissionException::class,
"This user should not be authorized to call 'nodeInfo'") { "This user should not be authorized to call 'stateMachinesFeed'") {
proxy.stateMachinesFeed() proxy.stateMachinesFeed()
} }
db.addRoleToUser("user3", "default") db.addRoleToUser("user3", "default")
@ -207,8 +208,8 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) {
db.deleteUser("user4") db.deleteUser("user4")
Thread.sleep(1500) Thread.sleep(1500)
assertFailsWith( assertFailsWith(
PermissionException::class, RPCException::class,
"This user should not be authorized to call 'nodeInfo'") { "This user should not be authorized to call 'stateMachinesFeed'") {
proxy.stateMachinesFeed() proxy.stateMachinesFeed()
} }
} }

View File

@ -24,6 +24,7 @@ import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertArrayEquals
@ -222,7 +223,8 @@ class AMQPBridgeTest {
val artemis = artemisClient.started!! val artemis = artemisClient.started!!
if (sourceQueueName != null) { if (sourceQueueName != null) {
// Local queue for outgoing messages // Local queue for outgoing messages
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) artemis.session.createQueue(
QueueConfiguration(sourceQueueName).setRoutingType(RoutingType.ANYCAST).setAddress(sourceQueueName).setDurable(true))
bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(BOB.name)) bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(BOB.name))
} }
return Triple(artemisServer, artemisClient, bridgeManager) return Triple(artemisServer, artemisClient, bridgeManager)

View File

@ -32,6 +32,7 @@ import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.node.services.messaging.ArtemisMessagingServer import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
@ -487,7 +488,7 @@ class CertificateRevocationListNodeTests {
@Path("node.crl") @Path("node.crl")
@Produces("application/pkcs7-crl") @Produces("application/pkcs7-crl")
fun getNodeCRL(): Response { fun getNodeCRL(): Response {
return Response.ok(CertificateRevocationListNodeTests.createRevocationList( return Response.ok(createRevocationList(
server, server,
SIGNATURE_ALGORITHM, SIGNATURE_ALGORITHM,
INTERMEDIATE_CA.certificate, INTERMEDIATE_CA.certificate,
@ -663,7 +664,8 @@ class CertificateRevocationListNodeTests {
val queueName = P2P_PREFIX + "Test" val queueName = P2P_PREFIX + "Test"
val (artemisServer, artemisClient) = createArtemisServerAndClient(serverPort, crlCheckSoftFail, crlCheckArtemisServer) val (artemisServer, artemisClient) = createArtemisServerAndClient(serverPort, crlCheckSoftFail, crlCheckArtemisServer)
artemisServer.use { artemisServer.use {
artemisClient.started!!.session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) artemisClient.started!!.session.createQueue(
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName).setDurable(true))
val (amqpClient, nodeCert) = createClient(serverPort, true, nodeCrlDistPoint) val (amqpClient, nodeCert) = createClient(serverPort, true, nodeCrlDistPoint)
if (revokedNodeCert) { if (revokedNodeCert) {

View File

@ -34,6 +34,7 @@ import net.corda.testing.internal.createDevIntermediateCaCertPath
import net.corda.coretesting.internal.rigorousMock import net.corda.coretesting.internal.rigorousMock
import net.corda.coretesting.internal.stubs.CertificateStoreStubs import net.corda.coretesting.internal.stubs.CertificateStoreStubs
import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig import net.corda.nodeapi.internal.protonwrapper.netty.toRevocationConfig
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert.assertArrayEquals import org.junit.Assert.assertArrayEquals
@ -271,7 +272,8 @@ class ProtonWrapperTests {
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
val artemis = artemisClient.started!! val artemis = artemisClient.started!!
val sendAddress = P2P_PREFIX + "Test" val sendAddress = P2P_PREFIX + "Test"
artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true) artemis.session.createQueue(QueueConfiguration("queue")
.setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true))
val consumer = artemis.session.createConsumer("queue") val consumer = artemis.session.createConsumer("queue")
val testData = "Test".toByteArray() val testData = "Test".toByteArray()
val testProperty = mutableMapOf<String, Any?>() val testProperty = mutableMapOf<String, Any?>()
@ -298,7 +300,8 @@ class ProtonWrapperTests {
assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal)) assertEquals(CHARLIE_NAME, CordaX500Name.build(clientConnected.get().remoteCert!!.subjectX500Principal))
val artemis = artemisClient.started!! val artemis = artemisClient.started!!
val sendAddress = P2P_PREFIX + "Test" val sendAddress = P2P_PREFIX + "Test"
artemis.session.createQueue(sendAddress, RoutingType.ANYCAST, "queue", true) artemis.session.createQueue(QueueConfiguration("queue")
.setRoutingType(RoutingType.ANYCAST).setAddress(sendAddress).setDurable(true))
val consumer = artemis.session.createConsumer("queue") val consumer = artemis.session.createConsumer("queue")
val testProperty = mutableMapOf<String, Any?>() val testProperty = mutableMapOf<String, Any?>()
@ -313,7 +316,7 @@ class ProtonWrapperTests {
assertEquals("1", received.getStringProperty("TestProp")) assertEquals("1", received.getStringProperty("TestProp"))
assertArrayEquals(testData, ByteArray(received.bodySize).apply { received.bodyBuffer.readBytes(this) }) assertArrayEquals(testData, ByteArray(received.bodySize).apply { received.bodyBuffer.readBytes(this) })
// Send message larger then max message size. // Send message larger than max message size.
val largeData = ByteArray(maxMessageSize + 1) val largeData = ByteArray(maxMessageSize + 1)
// Create message will fail. // Create message will fail.
assertThatThrownBy { assertThatThrownBy {

View File

@ -184,6 +184,7 @@ class ArtemisMessagingTest {
messagingClient.send(tooLagerMessage, messagingClient.myAddress) messagingClient.send(tooLagerMessage, messagingClient.myAddress)
}.isInstanceOf(ActiveMQConnectionTimedOutException::class.java) }.isInstanceOf(ActiveMQConnectionTimedOutException::class.java)
assertNull(receivedMessages.poll(200, MILLISECONDS)) assertNull(receivedMessages.poll(200, MILLISECONDS))
this.messagingClient = null
} }
@Test(timeout=300_000) @Test(timeout=300_000)
@ -231,7 +232,9 @@ class ArtemisMessagingTest {
MetricRegistry(), MetricRegistry(),
TestingNamedCacheFactory(), TestingNamedCacheFactory(),
isDrainingModeOn = { false }, isDrainingModeOn = { false },
drainingModeWasChangedEvents = PublishSubject.create<Pair<Boolean, Boolean>>()).apply { drainingModeWasChangedEvents = PublishSubject.create<Pair<Boolean, Boolean>>(),
terminateOnConnectionError = false,
timeoutConfig = P2PMessagingClient.TimeoutConfig(10.seconds, 10.seconds, 10.seconds)).apply {
config.configureWithDevSSLCertificate() config.configureWithDevSSLCertificate()
messagingClient = this messagingClient = this
} }

View File

@ -28,6 +28,7 @@ import net.corda.testing.node.internal.NodeBasedTest
import net.corda.testing.node.internal.startFlow import net.corda.testing.node.internal.startFlow
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.assertj.core.api.Assertions.assertThatExceptionOfType
@ -130,7 +131,11 @@ abstract class MQSecurityTest : NodeBasedTest() {
fun assertTempQueueCreationAttackFails(queue: String) { fun assertTempQueueCreationAttackFails(queue: String) {
assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") { assertAttackFails(queue, "CREATE_NON_DURABLE_QUEUE") {
attacker.session.createTemporaryQueue(queue, RoutingType.MULTICAST, queue) attacker.session.createQueue(QueueConfiguration(queue)
.setRoutingType(RoutingType.MULTICAST)
.setAddress(queue)
.setTemporary(true)
.setDurable(false))
} }
// Double-check // Double-check
assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy { assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy {
@ -147,7 +152,8 @@ abstract class MQSecurityTest : NodeBasedTest() {
fun assertNonTempQueueCreationAttackFails(queue: String, durable: Boolean) { fun assertNonTempQueueCreationAttackFails(queue: String, durable: Boolean) {
val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE" val permission = if (durable) "CREATE_DURABLE_QUEUE" else "CREATE_NON_DURABLE_QUEUE"
assertAttackFails(queue, permission) { assertAttackFails(queue, permission) {
attacker.session.createQueue(queue, RoutingType.MULTICAST, queue, durable) attacker.session.createQueue(
QueueConfiguration(queue).setAddress(queue).setRoutingType(RoutingType.MULTICAST).setDurable(durable))
} }
// Double-check // Double-check
assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy { assertThatExceptionOfType(ActiveMQNonExistentQueueException::class.java).isThrownBy {

View File

@ -1,6 +1,5 @@
package net.corda.node.internal.artemis package net.corda.node.internal.artemis
import io.netty.channel.unix.Errors
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.LifecycleSupport
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
@ -18,4 +17,11 @@ data class BrokerAddresses(val primary: NetworkHostAndPort, private val adminArg
val admin = adminArg ?: primary val admin = adminArg ?: primary
} }
fun java.io.IOException.isBindingError() = this is BindException || this is Errors.NativeIoException && message?.contains("Address already in use") == true fun Throwable.isBindingError(): Boolean {
val addressAlreadyUsedMsg = "Address already in use"
// This is not an exact science here.
// Depending on the underlying OS it can be either [Errors.NativeIoException] on Linux or [BindException] on Windows
// and of course this is dependent on the version of Artemis library used.
return this is BindException ||
this is IllegalStateException && cause.let { it is BindException || it?.message?.contains(addressAlreadyUsedMsg) == true }
}

View File

@ -14,6 +14,7 @@ import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import java.io.IOException import java.io.IOException
import java.security.KeyStore import java.security.KeyStore
import java.security.Principal import java.security.Principal
import java.security.cert.X509Certificate
import java.util.* import java.util.*
import javax.security.auth.Subject import javax.security.auth.Subject
import javax.security.auth.callback.CallbackHandler import javax.security.auth.callback.CallbackHandler
@ -119,26 +120,25 @@ class BrokerJaasLoginModule : BaseBrokerJaasLoginModule() {
// The Main authentication logic, responsible for running all the configured checks for each user type // The Main authentication logic, responsible for running all the configured checks for each user type
// and return the actual User and principals // and return the actual User and principals
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate private fun authenticateAndAuthorise(username: String, certificates: Array<X509Certificate>, password: String): Pair<String, List<RolePrincipal>> {
private fun authenticateAndAuthorise(username: String, certificates: Array<javax.security.cert.X509Certificate>?, password: String): Pair<String, List<RolePrincipal>> { fun requireTls(certificates: Array<X509Certificate>?) = requireNotNull(certificates) { "No client certificates presented." }
fun requireTls(certificates: Array<javax.security.cert.X509Certificate>?) = requireNotNull(certificates) { "No client certificates presented." }
return when (username) { return when (username) {
ArtemisMessagingComponent.NODE_P2P_USER -> { ArtemisMessagingComponent.NODE_P2P_USER -> {
requireTls(certificates) requireTls(certificates)
CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates!!) CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates)
Pair(certificates.first().subjectDN.name, listOf(RolePrincipal(NODE_P2P_ROLE))) Pair(certificates.first().subjectDN.name, listOf(RolePrincipal(NODE_P2P_ROLE)))
} }
ArtemisMessagingComponent.NODE_RPC_USER -> { ArtemisMessagingComponent.NODE_RPC_USER -> {
requireTls(certificates) requireTls(certificates)
CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates!!) CertificateChainCheckPolicy.LeafMustMatch.createCheck(nodeJaasConfig.keyStore, nodeJaasConfig.trustStore).checkCertificateChain(certificates)
Pair(ArtemisMessagingComponent.NODE_RPC_USER, listOf(RolePrincipal(NODE_RPC_ROLE))) Pair(ArtemisMessagingComponent.NODE_RPC_USER, listOf(RolePrincipal(NODE_RPC_ROLE)))
} }
ArtemisMessagingComponent.PEER_USER -> { ArtemisMessagingComponent.PEER_USER -> {
requireNotNull(p2pJaasConfig) { "Attempted to connect as a peer to the rpc broker." } requireNotNull(p2pJaasConfig) { "Attempted to connect as a peer to the rpc broker." }
requireTls(certificates) requireTls(certificates)
// This check is redundant as it was performed already during the SSL handshake // This check is redundant as it was performed already during the SSL handshake
CertificateChainCheckPolicy.RootMustMatch.createCheck(p2pJaasConfig!!.keyStore, p2pJaasConfig!!.trustStore).checkCertificateChain(certificates!!) CertificateChainCheckPolicy.RootMustMatch.createCheck(p2pJaasConfig!!.keyStore, p2pJaasConfig!!.trustStore).checkCertificateChain(certificates)
CertificateChainCheckPolicy.RevocationCheck(p2pJaasConfig!!.revocationMode) CertificateChainCheckPolicy.RevocationCheck(p2pJaasConfig!!.revocationMode)
.createCheck(p2pJaasConfig!!.keyStore, p2pJaasConfig!!.trustStore).checkCertificateChain(certificates) .createCheck(p2pJaasConfig!!.keyStore, p2pJaasConfig!!.trustStore).checkCertificateChain(certificates)
Pair(certificates.first().subjectDN.name, listOf(RolePrincipal(PEER_ROLE))) Pair(certificates.first().subjectDN.name, listOf(RolePrincipal(PEER_ROLE)))
@ -176,8 +176,8 @@ abstract class BaseBrokerJaasLoginModule : LoginModule {
protected lateinit var callbackHandler: CallbackHandler protected lateinit var callbackHandler: CallbackHandler
protected val principals = ArrayList<Principal>() protected val principals = ArrayList<Principal>()
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate @Suppress("ThrowsCount")
protected fun getUsernamePasswordAndCerts(): Triple<String, String, Array<javax.security.cert.X509Certificate>?> { protected fun getUsernamePasswordAndCerts(): Triple<String, String, Array<X509Certificate>> {
val nameCallback = NameCallback("Username: ") val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false) val passwordCallback = PasswordCallback("Password: ", false)
val certificateCallback = CertificateCallback() val certificateCallback = CertificateCallback()

View File

@ -22,8 +22,7 @@ sealed class CertificateChainCheckPolicy {
@FunctionalInterface @FunctionalInterface
interface Check { interface Check {
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>)
fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>)
} }
abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check abstract fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check
@ -31,8 +30,7 @@ sealed class CertificateChainCheckPolicy {
object Any : CertificateChainCheckPolicy() { object Any : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
return object : Check { return object : Check {
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
// nothing to do here // nothing to do here
} }
} }
@ -44,8 +42,7 @@ sealed class CertificateChainCheckPolicy {
val rootAliases = trustStore.aliases().asSequence().filter { it.startsWith(X509Utilities.CORDA_ROOT_CA) } val rootAliases = trustStore.aliases().asSequence().filter { it.startsWith(X509Utilities.CORDA_ROOT_CA) }
val rootPublicKeys = rootAliases.map { trustStore.getCertificate(it).publicKey }.toSet() val rootPublicKeys = rootAliases.map { trustStore.getCertificate(it).publicKey }.toSet()
return object : Check { return object : Check {
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
val theirRoot = theirChain.last().publicKey val theirRoot = theirChain.last().publicKey
if (theirRoot !in rootPublicKeys) { if (theirRoot !in rootPublicKeys) {
throw CertificateException("Root certificate mismatch, their root = $theirRoot") throw CertificateException("Root certificate mismatch, their root = $theirRoot")
@ -59,8 +56,7 @@ sealed class CertificateChainCheckPolicy {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val ourPublicKey = keyStore.getCertificate(X509Utilities.CORDA_CLIENT_TLS).publicKey val ourPublicKey = keyStore.getCertificate(X509Utilities.CORDA_CLIENT_TLS).publicKey
return object : Check { return object : Check {
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
val theirLeaf = theirChain.first().publicKey val theirLeaf = theirChain.first().publicKey
if (ourPublicKey != theirLeaf) { if (ourPublicKey != theirLeaf) {
throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf") throw CertificateException("Leaf certificate mismatch, their leaf = $theirLeaf")
@ -74,8 +70,7 @@ sealed class CertificateChainCheckPolicy {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet() val trustedPublicKeys = trustedAliases.map { trustStore.getCertificate(it).publicKey }.toSet()
return object : Check { return object : Check {
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
if (!theirChain.any { it.publicKey in trustedPublicKeys }) { if (!theirChain.any { it.publicKey in trustedPublicKeys }) {
throw CertificateException("Their certificate chain contained none of the trusted ones") throw CertificateException("Their certificate chain contained none of the trusted ones")
} }
@ -92,8 +87,7 @@ sealed class CertificateChainCheckPolicy {
class UsernameMustMatchCommonNameCheck : Check { class UsernameMustMatchCommonNameCheck : Check {
lateinit var username: String lateinit var username: String
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
if (!theirChain.any { certificate -> CordaX500Name.parse(certificate.subjectDN.name).commonName == username }) { if (!theirChain.any { certificate -> CordaX500Name.parse(certificate.subjectDN.name).commonName == username }) {
throw CertificateException("Client certificate does not match login username.") throw CertificateException("Client certificate does not match login username.")
} }
@ -103,8 +97,7 @@ sealed class CertificateChainCheckPolicy {
class RevocationCheck(val revocationMode: RevocationConfig.Mode) : CertificateChainCheckPolicy() { class RevocationCheck(val revocationMode: RevocationConfig.Mode) : CertificateChainCheckPolicy() {
override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check { override fun createCheck(keyStore: KeyStore, trustStore: KeyStore): Check {
return object : Check { return object : Check {
@Suppress("DEPRECATION") // should use java.security.cert.X509Certificate override fun checkCertificateChain(theirChain: Array<java.security.cert.X509Certificate>) {
override fun checkCertificateChain(theirChain: Array<javax.security.cert.X509Certificate>) {
if (revocationMode == RevocationConfig.Mode.OFF) { if (revocationMode == RevocationConfig.Mode.OFF) {
return return
} }

View File

@ -11,6 +11,7 @@ import net.corda.node.internal.artemis.*
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_P2P_ROLE import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_P2P_ROLE
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.PEER_ROLE import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.PEER_ROLE
import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NodeConfiguration
import net.corda.node.utilities.artemis.startSynchronously
import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor
import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
@ -89,28 +90,26 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
override val started: Boolean override val started: Boolean
get() = activeMQServer.isStarted get() = activeMQServer.isStarted
// TODO: Maybe wrap [IOException] on a key store load error so that it's clearly splitting key store loading from
// Artemis IO errors
@Throws(IOException::class, AddressBindingException::class, KeyStoreException::class) @Throws(IOException::class, AddressBindingException::class, KeyStoreException::class)
private fun configureAndStartServer() { private fun configureAndStartServer() {
val artemisConfig = createArtemisConfig() val artemisConfig = createArtemisConfig()
val securityManager = createArtemisSecurityManager() val securityManager = createArtemisSecurityManager()
activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply { activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply {
// Throw any exceptions which are detected during startup
registerActivationFailureListener { exception -> throw exception }
// Some types of queue might need special preparation on our side, like dialling back or preparing // Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem. // a lazily initialised subsystem.
registerPostQueueCreationCallback { log.debug { "Queue Created: $it" } } registerPostQueueCreationCallback { log.debug { "Queue Created: $it" } }
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } } registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
} }
@Suppress("TooGenericExceptionCaught")
try { try {
activeMQServer.start() activeMQServer.startSynchronously()
} catch (e: IOException) { } catch (e: Throwable) {
log.error("Unable to start message broker", e) log.error("Unable to start message broker", e)
if (e.isBindingError()) { if (e.isBindingError()) {
throw AddressBindingException(config.p2pAddress) throw AddressBindingException(config.p2pAddress)
} else { } else {
log.error("Unexpected error starting message broker", e)
throw e throw e
} }
} }

View File

@ -48,13 +48,22 @@ import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Observable import rx.Observable
import rx.Subscription import rx.Subscription
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import java.security.PublicKey import java.security.PublicKey
import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@ -72,15 +81,17 @@ import kotlin.concurrent.timer
* executor through into Artemis and from there, back through to senders. * executor through into Artemis and from there, back through to senders.
* *
* An implementation of [CordaRPCOps] can be provided. If given, clients using the CordaMQClient RPC library can * An implementation of [CordaRPCOps] can be provided. If given, clients using the CordaMQClient RPC library can
* invoke methods on the provided implementation. There is more documentation on this in the docsite and the * invoke methods on the provided implementation. There is more documentation on this in the doc-site and the
* CordaRPCClient class. * CordaRPCClient class.
* *
* @param config The configuration of the node, which is used for controlling the message redelivery options. * @param config The configuration of the node, which is used for controlling the message redelivery options.
* @param versionInfo All messages from the node carry the version info and received messages are checked against this for compatibility. * @param versionInfo All messages from the node carry the version info and received messages are checked against this for compatibility.
* @param serverAddress The host and port of the Artemis broker. * @param serverAddress The host and port of the Artemis broker.
* @param nodeExecutor The received messages are marshalled onto the server executor to prevent Netty buffers leaking during fiber suspends. * @param nodeExecutor The received messages are marshalled onto the server executor to prevent Netty buffers leaking during fiber suspends.
* @param database The nodes database, which is used to deduplicate messages. * @param database The node's database, which is used to deduplicate messages.
* @param terminateOnConnectionError whether the process should be terminated forcibly if connection with the broker fails.
*/ */
@Suppress("LongParameterList")
@ThreadSafe @ThreadSafe
class P2PMessagingClient(val config: NodeConfiguration, class P2PMessagingClient(val config: NodeConfiguration,
private val versionInfo: VersionInfo, private val versionInfo: VersionInfo,
@ -93,7 +104,9 @@ class P2PMessagingClient(val config: NodeConfiguration,
cacheFactory: NamedCacheFactory, cacheFactory: NamedCacheFactory,
private val isDrainingModeOn: () -> Boolean, private val isDrainingModeOn: () -> Boolean,
private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>, private val drainingModeWasChangedEvents: Observable<Pair<Boolean, Boolean>>,
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log) private val stateHelper: ServiceStateHelper = ServiceStateHelper(log),
private val terminateOnConnectionError: Boolean = true,
private val timeoutConfig: TimeoutConfig = TimeoutConfig.default()
) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, ServiceStateSupport by stateHelper { ) : SingletonSerializeAsToken(), MessagingService, AddressToArtemisQueueResolver, ServiceStateSupport by stateHelper {
companion object { companion object {
private val log = contextLogger() private val log = contextLogger()
@ -126,6 +139,21 @@ class P2PMessagingClient(val config: NodeConfiguration,
fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message) fun sendMessage(address: String, message: ClientMessage) = producer!!.send(address, message)
} }
/**
* @property callTimeout the time a blocking call (e.g. message send) from a client waits for a response until it times out.
* @property serverConnectionTtl the time the server waits for a packet/heartbeat from a client before it announces the connection dead and cleans it up.
* @property clientConnectionTtl the time the client waits for a packet/heartbeat from a client before it announces the connection dead and cleans it up.
*/
data class TimeoutConfig(val callTimeout: Duration, val serverConnectionTtl: Duration, val clientConnectionTtl: Duration) {
companion object {
/**
* Some sensible defaults, aligned with defaults of Artemis
*/
@Suppress("MagicNumber")
fun default() = TimeoutConfig(30.seconds, 60.seconds, 30.seconds)
}
}
/** A registration to handle messages of different types */ /** A registration to handle messages of different types */
data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration data class HandlerRegistration(val topic: String, val callback: Any) : MessageHandlerRegistration
@ -168,15 +196,21 @@ class P2PMessagingClient(val config: NodeConfiguration,
locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply { locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport).apply {
// Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this // Never time out on our loopback Artemis connections. If we switch back to using the InVM transport this
// would be the default and the two lines below can be deleted. // would be the default and the two lines below can be deleted.
connectionTTL = 60000 callTimeout = timeoutConfig.callTimeout.toMillis()
clientFailureCheckPeriod = 30000 connectionTTL = timeoutConfig.serverConnectionTtl.toMillis()
clientFailureCheckPeriod = timeoutConfig.clientConnectionTtl.toMillis()
minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE minLargeMessageSize = maxMessageSize + JOURNAL_HEADER_SIZE
isUseGlobalPools = nodeSerializationEnv != null isUseGlobalPools = nodeSerializationEnv != null
} }
sessionFactory = locator!!.createSessionFactory().addFailoverListener(::failoverCallback)
sessionFactory = if (terminateOnConnectionError) {
locator!!.createSessionFactory().addFailoverListener(::failoverCallback)
} else {
locator!!.createSessionFactory()
}
// Login using the node username. The broker will authenticate us as its node (as opposed to another peer) // Login using the node username. The broker will authenticate us as its node (as opposed to another peer)
// using our TLS certificate. // using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer // Note that the acknowledgement of messages is not flushed to the Artemis journal until the default buffer
// size of 1MB is acknowledged. // size of 1MB is acknowledged.
val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) } val createNewSession = { sessionFactory!!.createSession(ArtemisMessagingComponent.NODE_P2P_USER, ArtemisMessagingComponent.NODE_P2P_USER, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE) }
@ -234,7 +268,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) { private fun InnerState.registerBridgeControl(session: ClientSession, inboxes: List<String>) {
val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}" val bridgeNotifyQueue = "$BRIDGE_NOTIFY.${myIdentity.toStringShort()}"
if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) { if (!session.queueQuery(SimpleString(bridgeNotifyQueue)).isExists) {
session.createTemporaryQueue(BRIDGE_NOTIFY, RoutingType.MULTICAST, bridgeNotifyQueue) session.createQueue(QueueConfiguration(bridgeNotifyQueue).setAddress(BRIDGE_NOTIFY).setRoutingType(RoutingType.MULTICAST)
.setTemporary(true).setDurable(false))
} }
val bridgeConsumer = session.createConsumer(bridgeNotifyQueue) val bridgeConsumer = session.createConsumer(bridgeNotifyQueue)
bridgeNotifyConsumer = bridgeConsumer bridgeNotifyConsumer = bridgeConsumer
@ -266,8 +301,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
log.info("Updating bridges on network map change: ${change::class.simpleName} ${change.node}") log.info("Updating bridges on network map change: ${change::class.simpleName} ${change.node}")
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> { fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
return state.locked { return state.locked {
node.legalIdentitiesAndCerts.map { node.legalIdentitiesAndCerts.map { partyAndCertificate ->
val messagingAddress = NodeAddress(it.party.owningKey) val messagingAddress = NodeAddress(partyAndCertificate.party.owningKey)
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }, serviceAddress = false) BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }, serviceAddress = false)
}.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence() }.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
} }
@ -464,8 +499,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
running = false running = false
stateHelper.active = false stateHelper.active = false
networkChangeSubscription?.unsubscribe() networkChangeSubscription?.unsubscribe()
require(p2pConsumer != null, { "stop can't be called twice" }) require(p2pConsumer != null) { "stop can't be called twice" }
require(producer != null, { "stop can't be called twice" }) require(producer != null) { "stop can't be called twice" }
close(p2pConsumer) close(p2pConsumer)
p2pConsumer = null p2pConsumer = null
@ -525,7 +560,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
// If we are sending to ourselves then route the message directly to our P2P queue. // If we are sending to ourselves then route the message directly to our P2P queue.
RemoteInboxAddress(myIdentity).queueName RemoteInboxAddress(myIdentity).queueName
} else { } else {
// Otherwise we send the message to an internal queue for the target residing on our broker. It's then the // Otherwise, we send the message to an internal queue for the target residing on our broker. It's then the
// broker's job to route the message to the target's P2P queue. // broker's job to route the message to the target's P2P queue.
val internalTargetQueue = (address as? ArtemisAddress)?.queueName val internalTargetQueue = (address as? ArtemisAddress)?.queueName
?: throw IllegalArgumentException("Not an Artemis address") ?: throw IllegalArgumentException("Not an Artemis address")
@ -557,9 +592,13 @@ class P2PMessagingClient(val config: NodeConfiguration,
val queueQuery = session.queueQuery(SimpleString(queueName)) val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) { if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address") log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, session.createQueue(QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST).setAddress(queueName)
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), .setDurable(true).setAutoCreated(false)
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), exclusive, null) .setMaxConsumers(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers())
.setPurgeOnNoConsumers(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers())
.setExclusive(exclusive)
.setLastValue(null)
)
sendBridgeCreateMessage() sendBridgeCreateMessage()
} }
} }
@ -568,7 +607,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
} }
override fun addMessageHandler(topic: String, callback: MessageHandler): MessageHandlerRegistration { override fun addMessageHandler(topic: String, callback: MessageHandler): MessageHandlerRegistration {
require(!topic.isBlank()) { "Topic must not be blank, as the empty topic is a special case." } require(topic.isNotBlank()) { "Topic must not be blank, as the empty topic is a special case." }
handlers.compute(topic) { _, handler -> handlers.compute(topic) { _, handler ->
if (handler != null) { if (handler != null) {
throw IllegalStateException("Cannot add another acking handler for $topic, there is already an acking one") throw IllegalStateException("Cannot add another acking handler for $topic, there is already an acking one")

View File

@ -8,6 +8,7 @@ import net.corda.node.internal.artemis.*
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_SECURITY_CONFIG import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_SECURITY_CONFIG
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.RPC_SECURITY_CONFIG import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.RPC_SECURITY_CONFIG
import net.corda.node.internal.security.RPCSecurityManager import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.utilities.artemis.startSynchronously
import net.corda.nodeapi.BrokerRpcSslOptions import net.corda.nodeapi.BrokerRpcSslOptions
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
@ -51,20 +52,19 @@ class ArtemisRpcBroker internal constructor(
} }
} }
@Suppress("TooGenericExceptionCaught")
override fun start() { override fun start() {
logger.debug { "Artemis RPC broker is starting for: $addresses" } logger.debug { "Artemis RPC broker is starting for: $addresses" }
try { try {
server.start() server.startSynchronously()
} catch (e: IOException) { } catch (e: Throwable) {
logger.error("Unable to start message broker", e) logger.error("Unable to start message broker", e)
if (e.isBindingError()) { if (e.isBindingError()) {
throw AddressBindingException(adminAddressOptional?.let { setOf(it, addresses.primary) } ?: setOf(addresses.primary)) throw AddressBindingException(adminAddressOptional?.let { setOf(it, addresses.primary) } ?: setOf(addresses.primary))
} else { } else {
logger.error("Unexpected error starting message broker", e)
throw e throw e
} }
} catch (th: Throwable) {
logger.error("Unexpected error starting message broker", th)
throw th
} }
logger.debug("Artemis RPC broker is started.") logger.debug("Artemis RPC broker is started.")
} }
@ -90,7 +90,6 @@ class ArtemisRpcBroker internal constructor(
val serverSecurityManager = createArtemisSecurityManager(serverConfiguration.loginListener) val serverSecurityManager = createArtemisSecurityManager(serverConfiguration.loginListener)
return ActiveMQServerImpl(serverConfiguration, serverSecurityManager).apply { return ActiveMQServerImpl(serverConfiguration, serverSecurityManager).apply {
registerActivationFailureListener { exception -> throw exception }
registerPostQueueDeletionCallback { address, qName -> logger.debug("Queue deleted: $qName for $address") } registerPostQueueDeletionCallback { address, qName -> logger.debug("Queue deleted: $qName for $address") }
} }
} }

View File

@ -12,8 +12,8 @@ import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcAcceptorTcpTr
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalAcceptorTcpTransport import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalAcceptorTcpTransport
import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.security.Role import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings import org.apache.activemq.artemis.core.settings.impl.AddressSettings
@ -37,14 +37,14 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
} }
acceptorConfigurations = acceptorConfigurationsSet acceptorConfigurations = acceptorConfigurationsSet
queueConfigurations = queueConfigurations() queueConfigs = queueConfigurations()
managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS) managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
addressesSettings = mapOf( addressesSettings = mapOf(
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply { "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
maxSizeBytes = 5L * maxMessageSize maxSizeBytes = 5L * maxMessageSize
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
pageSizeBytes = 1L * maxMessageSize pageSizeBytes = maxMessageSize
} }
) )
@ -76,7 +76,11 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
securityRoles["${ArtemisMessagingComponent.INTERNAL_PREFIX}#"] = setOf(nodeInternalRole) securityRoles["${ArtemisMessagingComponent.INTERNAL_PREFIX}#"] = setOf(nodeInternalRole)
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(BrokerJaasLoginModule.RPC_ROLE, send = true)) securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(BrokerJaasLoginModule.RPC_ROLE, send = true))
securitySettingPlugins.add(rolesAdderOnLogin) securitySettingPlugins.add(rolesAdderOnLogin)
securityInvalidationInterval = ArtemisMessagingComponent.SECURITY_INVALIDATION_INTERVAL
// Effectively disable security cache as permissions might change dynamically when e.g. DB is updated
authenticationCacheSize = 0
authorizationCacheSize = 0
securityInvalidationInterval = 0
} }
private fun enableJmx() { private fun enableJmx() {
@ -85,19 +89,19 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
} }
private fun initialiseSettings(maxMessageSize: Int, journalBufferTimeout: Int?) { private fun initialiseSettings(maxMessageSize: Int, journalBufferTimeout: Int?) {
// Enable built in message deduplication. Note we still have to do our own as the delayed commits // Enable built-in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates. // and our own definition of commit means that the built-in deduplication cannot remove all the duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true isPersistIDCache = true
isPopulateValidatedUser = true isPopulateValidatedUser = true
journalBufferSize_NIO = maxMessageSize // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store. journalBufferSize_NIO = maxMessageSize // Artemis default is 490 KB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio() journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio()
journalBufferSize_AIO = maxMessageSize // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store. journalBufferSize_AIO = maxMessageSize // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio() journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio()
journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10MiB. journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10 MB.
} }
private fun queueConfigurations(): List<CoreQueueConfiguration> { private fun queueConfigurations(): List<QueueConfiguration> {
return listOf( return listOf(
queueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false), queueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false),
queueConfiguration( queueConfiguration(
@ -122,15 +126,8 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
pagingDirectory = (baseDirectory / "paging").toString() pagingDirectory = (baseDirectory / "paging").toString()
} }
private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration { private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): QueueConfiguration {
val configuration = CoreQueueConfiguration() return QueueConfiguration(name).setAddress(address).setFilterString(filter).setDurable(durable)
configuration.name = name
configuration.address = address
configuration.filterString = filter
configuration.isDurable = durable
return configuration
} }
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,

View File

@ -0,0 +1,22 @@
package net.corda.node.utilities.artemis
import net.corda.core.utilities.getOrThrow
import org.apache.activemq.artemis.core.server.ActivateCallback
import org.apache.activemq.artemis.core.server.ActiveMQServer
import java.util.concurrent.CompletableFuture
fun ActiveMQServer.startSynchronously() {
val startupFuture = CompletableFuture<Unit>()
registerActivateCallback(object: ActivateCallback {
override fun activationComplete() {
startupFuture.complete(Unit)
}
})
registerActivationFailureListener {
startupFuture.completeExceptionally(it)
}
start()
startupFuture.getOrThrow()
}

View File

@ -61,9 +61,7 @@ class RevocationCheckTest(private val revocationMode: RevocationConfig.Mode) {
private lateinit var tlsCert: X509Certificate private lateinit var tlsCert: X509Certificate
private val chain private val chain
get() = listOf(tlsCert, nodeCACert, doormanCert, rootCert).map { get() = listOf(tlsCert, nodeCACert, doormanCert, rootCert).toTypedArray()
javax.security.cert.X509Certificate.getInstance(it.encoded)
}.toTypedArray()
@Before @Before
fun before() { fun before() {

View File

@ -1,24 +1,34 @@
package net.corda.node.internal.artemis package net.corda.node.internal.artemis
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.doReturn import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.doThrow
import com.nhaarman.mockito_kotlin.whenever import com.nhaarman.mockito_kotlin.whenever
import net.corda.coretesting.internal.rigorousMock import net.corda.coretesting.internal.rigorousMock
import net.corda.nodeapi.internal.ArtemisMessagingComponent import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME import net.corda.testing.core.BOB_NAME
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl
import org.apache.activemq.artemis.core.server.ServerSession import org.apache.activemq.artemis.core.server.ServerSession
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage
import org.assertj.core.api.Assertions import org.assertj.core.api.Assertions
import org.junit.Test import org.junit.Test
class UserValidationPluginTest { class UserValidationPluginTest {
private val plugin = UserValidationPlugin() private val plugin = UserValidationPlugin()
private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(), 4.toByte(), 1024) private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(),
private val amqpMessage get() = AMQPConverter.getInstance().fromCore(coreMessage) 4.toByte(), 1024)
private val amqpMessage: AMQPMessage
get() {
return rigorousMock<AMQPMessage>().also {
doReturn(coreMessage.validatedUserID).whenever(it).getStringProperty(Message.HDR_VALIDATED_USER)
}
}
private val session = rigorousMock<ServerSession>().also { private val session = rigorousMock<ServerSession>().also {
doReturn(ArtemisMessagingComponent.PEER_USER).whenever(it).username doReturn(ArtemisMessagingComponent.PEER_USER).whenever(it).username
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
@ -31,16 +41,17 @@ class UserValidationPluginTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `accept AMQP message with user`() { fun `accept AMQP message with user`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", ALICE_NAME.toString()) coreMessage.validatedUserID = ALICE_NAME.toString()
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false) plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `reject AMQP message with different user`() { fun `reject AMQP message with different user`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) coreMessage.validatedUserID = BOB_NAME.toString()
val localAmqpMessage = amqpMessage
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false) plugin.beforeSend(session, rigorousMock(), localAmqpMessage, direct = false, noAutoCreateQueue = false)
}.withMessageContaining("_AMQ_VALIDATED_USER") }.withMessageContaining(Message.HDR_VALIDATED_USER.toString())
} }
@Test(timeout = 300_000) @Test(timeout = 300_000)
@ -49,7 +60,7 @@ class UserValidationPluginTest {
doReturn(ArtemisMessagingComponent.NODE_P2P_USER).whenever(it).username doReturn(ArtemisMessagingComponent.NODE_P2P_USER).whenever(it).username
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
} }
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) coreMessage.validatedUserID = BOB_NAME.toString()
plugin.beforeSend(internalSession, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false) plugin.beforeSend(internalSession, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
} }
@ -62,11 +73,8 @@ class UserValidationPluginTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `reject message with exception`() { fun `reject message with exception`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) val messageWithException = rigorousMock<AMQPMessage>().also {
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) { doThrow(IllegalStateException("My exception")).whenever(it).getStringProperty(any<SimpleString>())
override fun getStringProperty(key: SimpleString?): String {
throw IllegalStateException("My exception")
}
} }
// Artemis swallows all exceptions except ActiveMQException, so making sure that proper exception is thrown // Artemis swallows all exceptions except ActiveMQException, so making sure that proper exception is thrown
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
@ -76,9 +84,8 @@ class UserValidationPluginTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `reject message with security exception`() { fun `reject message with security exception`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString()) val messageWithException = object : AMQPStandardMessage(0, ByteArray(0), null) {
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) { override fun getApplicationPropertiesMap(createIfAbsent: Boolean): MutableMap<String, Any> {
override fun getStringProperty(key: SimpleString?): String {
throw ActiveMQSecurityException("My security exception") throw ActiveMQSecurityException("My security exception")
} }
} }

View File

@ -35,6 +35,7 @@ import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.fromUserList import net.corda.testing.internal.fromUserList
import net.corda.testing.node.NotarySpec import net.corda.testing.node.NotarySpec
import net.corda.testing.node.User import net.corda.testing.node.User
import org.apache.activemq.artemis.api.core.QueueConfiguration
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ActiveMQClient
@ -42,7 +43,6 @@ import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BA
import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.core.config.Configuration import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
@ -201,30 +201,18 @@ data class RPCDriverDSL(
journalBufferSize_NIO = maxFileSize journalBufferSize_NIO = maxFileSize
journalBufferSize_AIO = maxFileSize journalBufferSize_AIO = maxFileSize
journalFileSize = maxFileSize journalFileSize = maxFileSize
queueConfigurations = listOf( queueConfigs = listOf(
CoreQueueConfiguration().apply { QueueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME).setAddress(RPCApi.RPC_SERVER_QUEUE_NAME).setDurable(false),
name = RPCApi.RPC_SERVER_QUEUE_NAME QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_REMOVALS).setAddress(notificationAddress)
address = RPCApi.RPC_SERVER_QUEUE_NAME .setFilterString(RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION).setDurable(false),
isDurable = false QueueConfiguration(RPCApi.RPC_CLIENT_BINDING_ADDITIONS).setAddress(notificationAddress)
}, .setFilterString(RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION).setDurable(false)
CoreQueueConfiguration().apply {
name = RPCApi.RPC_CLIENT_BINDING_REMOVALS
address = notificationAddress
filterString = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION
isDurable = false
},
CoreQueueConfiguration().apply {
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS
address = notificationAddress
filterString = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION
isDurable = false
}
) )
addressesSettings = mapOf( addressesSettings = mapOf(
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply { "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
maxSizeBytes = maxBufferedBytesPerClient maxSizeBytes = maxBufferedBytesPerClient
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
pageSizeBytes = maxSizeBytes / 10 pageSizeBytes = maxSizeBytes.toInt() / 10
} }
) )
} }
@ -259,7 +247,7 @@ data class RPCDriverDSL(
* Starts an In-VM RPC server. Note that only a single one may be started. * Starts an In-VM RPC server. Note that only a single one may be started.
* *
* @param rpcUser The single user who can access the server through RPC, and their permissions. * @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user. * @param nodeLegalName The legal name of the node to check against to authenticate a superuser.
* @param configuration The RPC server configuration. * @param configuration The RPC server configuration.
* @param ops The server-side implementation of the RPC interface. * @param ops The server-side implementation of the RPC interface.
*/ */
@ -338,7 +326,7 @@ data class RPCDriverDSL(
* *
* @param serverName The name of the server, to be used for the folder created for Artemis files. * @param serverName The name of the server, to be used for the folder created for Artemis files.
* @param rpcUser The single user who can access the server through RPC, and their permissions. * @param rpcUser The single user who can access the server through RPC, and their permissions.
* @param nodeLegalName The legal name of the node to check against to authenticate a super user. * @param nodeLegalName The legal name of the node to check against to authenticate a superuser.
* @param configuration The RPC server configuration. * @param configuration The RPC server configuration.
* @param listOps The server-side implementation of the RPC interfaces. * @param listOps The server-side implementation of the RPC interfaces.
*/ */

View File

@ -94,6 +94,10 @@
<AppenderRef ref="Console-ErrorCode-Appender"/> <AppenderRef ref="Console-ErrorCode-Appender"/>
<AppenderRef ref="RollingFile-ErrorCode-Appender"/> <AppenderRef ref="RollingFile-ErrorCode-Appender"/>
</Logger> </Logger>
<Logger name="org.apache.activemq.audit" level="error" additivity="false">
<AppenderRef ref="Console-ErrorCode-Appender"/>
<AppenderRef ref="RollingFile-ErrorCode-Appender"/>
</Logger>
<Logger name="org.jolokia" additivity="true" level="warn"> <Logger name="org.jolokia" additivity="true" level="warn">
<AppenderRef ref="Console-ErrorCode-Appender-Println"/> <AppenderRef ref="Console-ErrorCode-Appender-Println"/>
<AppenderRef ref="RollingFile-ErrorCode-Appender"/> <AppenderRef ref="RollingFile-ErrorCode-Appender"/>