mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
CORDA-3867: Add tests for AMQ_VALIDATED_USER (#6418)
* CORDA-3867: Add tests for AMQ_VALIDATED_USER * CORDA-3867: detekt
This commit is contained in:
parent
9f12e6bbc5
commit
adc0879e8e
@ -1598,6 +1598,7 @@
|
||||
<ID>TooGenericExceptionCaught:ScheduledFlowIntegrationTests.kt$ScheduledFlowIntegrationTests$ex: Exception</ID>
|
||||
<ID>TooGenericExceptionCaught:SerializationOutputTests.kt$SerializationOutputTests$t: Throwable</ID>
|
||||
<ID>TooGenericExceptionCaught:ShutdownManager.kt$ShutdownManager$t: Throwable</ID>
|
||||
<ID>TooGenericExceptionCaught:SimpleAMQPClient.kt$SimpleAMQPClient$e: Exception</ID>
|
||||
<ID>TooGenericExceptionCaught:SimpleMQClient.kt$SimpleMQClient$e: Exception</ID>
|
||||
<ID>TooGenericExceptionCaught:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager$e: Exception</ID>
|
||||
<ID>TooGenericExceptionCaught:SingleThreadedStateMachineManager.kt$SingleThreadedStateMachineManager$ex: Exception</ID>
|
||||
|
@ -191,7 +191,8 @@ dependencies {
|
||||
// Integration test helpers
|
||||
integrationTestCompile "junit:junit:$junit_version"
|
||||
integrationTestCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
|
||||
integrationTestCompile "org.apache.qpid:qpid-jms-client:${protonj_version}"
|
||||
|
||||
// BFT-Smart dependencies
|
||||
compile 'com.github.bft-smart:library:master-v1.1-beta-g6215ec8-87'
|
||||
compile 'commons-codec:commons-codec:1.13'
|
||||
|
@ -16,6 +16,7 @@ import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.loadDevCaTrustStore
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.services.messaging.SimpleAMQPClient.Companion.sendAndVerify
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
|
||||
@ -25,9 +26,9 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.bouncycastle.asn1.x509.GeneralName
|
||||
import org.bouncycastle.asn1.x509.GeneralSubtree
|
||||
import org.bouncycastle.asn1.x509.NameConstraints
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.nio.file.Files
|
||||
import javax.jms.JMSSecurityException
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
/**
|
||||
@ -42,10 +43,9 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
|
||||
attacker.start(PEER_USER, PEER_USER) // Login as a peer
|
||||
}
|
||||
|
||||
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||
@Test(timeout=300_000)
|
||||
fun `send message to RPC requests address`() {
|
||||
assertSendAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
assertProducerQueueCreationAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@ -124,16 +124,52 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
|
||||
}
|
||||
}
|
||||
|
||||
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||
override fun `send message to notifications address`() {
|
||||
assertProducerQueueCreationAttackFails(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `send message on core protocol`() {
|
||||
val attacker = clientTo(alice.node.configuration.p2pAddress)
|
||||
attacker.start(PEER_USER, PEER_USER)
|
||||
val message = attacker.createMessage()
|
||||
assertEquals(true, attacker.producer.isBlockOnNonDurableSend)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.producer.send("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}", message)
|
||||
}.withMessageContaining("CoreMessage").withMessageContaining("AMQPMessage")
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `send AMQP message with correct validated user in header`() {
|
||||
val attacker = amqpClientTo(alice.node.configuration.p2pAddress)
|
||||
val session = attacker.start(PEER_USER, PEER_USER)
|
||||
val message = session.createMessage()
|
||||
message.setStringProperty("_AMQ_VALIDATED_USER", "O=MegaCorp, L=London, C=GB")
|
||||
val queue = session.createQueue("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}")
|
||||
val producer = session.createProducer(queue)
|
||||
producer.sendAndVerify(message)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `send AMQP message with incorrect validated user in header`() {
|
||||
val attacker = amqpClientTo(alice.node.configuration.p2pAddress)
|
||||
val session = attacker.start(PEER_USER, PEER_USER)
|
||||
val message = session.createMessage()
|
||||
message.setStringProperty("_AMQ_VALIDATED_USER", "O=Bob, L=New York, C=US")
|
||||
val queue = session.createQueue("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}")
|
||||
val producer = session.createProducer(queue)
|
||||
assertThatExceptionOfType(JMSSecurityException::class.java).isThrownBy {
|
||||
producer.sendAndVerify(message)
|
||||
}.withMessageContaining("_AMQ_VALIDATED_USER mismatch")
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `send AMQP message without header`() {
|
||||
val attacker = amqpClientTo(alice.node.configuration.p2pAddress)
|
||||
val session = attacker.start(PEER_USER, PEER_USER)
|
||||
val message = session.createMessage()
|
||||
val queue = session.createQueue("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}")
|
||||
val producer = session.createProducer(queue)
|
||||
producer.sendAndVerify(message)
|
||||
}
|
||||
}
|
||||
|
@ -45,7 +45,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
private val rpcUser = User("user1", "pass", permissions = emptySet())
|
||||
lateinit var alice: NodeWithInfo
|
||||
lateinit var attacker: SimpleMQClient
|
||||
private val clients = ArrayList<SimpleMQClient>()
|
||||
private val runOnStop = ArrayList<() -> Any?>()
|
||||
|
||||
@Before
|
||||
override fun setUp() {
|
||||
@ -62,8 +62,8 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
abstract fun startAttacker(attacker: SimpleMQClient)
|
||||
|
||||
@After
|
||||
fun stopClients() {
|
||||
clients.forEach { it.stop() }
|
||||
fun tearDown() {
|
||||
runOnStop.forEach { it() }
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
@ -97,18 +97,21 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
|
||||
fun clientTo(target: NetworkHostAndPort, sslConfiguration: MutualSslConfiguration? = configureTestSSL(CordaX500Name("MegaCorp", "London", "GB"))): SimpleMQClient {
|
||||
val client = SimpleMQClient(target, sslConfiguration)
|
||||
clients += client
|
||||
runOnStop += client::stop
|
||||
return client
|
||||
}
|
||||
|
||||
fun amqpClientTo(target: NetworkHostAndPort,
|
||||
sslConfiguration: MutualSslConfiguration = configureTestSSL(CordaX500Name("MegaCorp", "London", "GB"))
|
||||
): SimpleAMQPClient {
|
||||
val client = SimpleAMQPClient(target, sslConfiguration)
|
||||
runOnStop += client::stop
|
||||
return client
|
||||
}
|
||||
|
||||
private val rpcConnections = mutableListOf<CordaRPCConnection>()
|
||||
private fun loginToRPC(target: NetworkHostAndPort, rpcUser: User): CordaRPCOps {
|
||||
return CordaRPCClient(target).start(rpcUser.username, rpcUser.password).also { rpcConnections.add(it) }.proxy
|
||||
}
|
||||
|
||||
@After
|
||||
fun closeRPCConnections() {
|
||||
rpcConnections.forEach { it.forceClose() }
|
||||
return CordaRPCClient(target).start(rpcUser.username, rpcUser.password).also { runOnStop += it::forceClose }.proxy
|
||||
}
|
||||
|
||||
fun loginToRPCAndGetClientQueue(): String {
|
||||
@ -152,7 +155,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
}
|
||||
}
|
||||
|
||||
fun assertSendAttackFails(address: String) {
|
||||
open fun assertSendAttackFails(address: String) {
|
||||
val message = attacker.createMessage()
|
||||
assertEquals(true, attacker.producer.isBlockOnNonDurableSend)
|
||||
assertAttackFails(address, "SEND") {
|
||||
|
@ -3,18 +3,43 @@ package net.corda.services.messaging
|
||||
import net.corda.core.crypto.generateKeyPair
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.services.messaging.SimpleAMQPClient.Companion.sendAndVerify
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import org.junit.Ignore
|
||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.junit.Test
|
||||
import javax.jms.JMSException
|
||||
|
||||
/**
|
||||
* Runs a series of MQ-related attacks against a node. Subclasses need to call [startAttacker] to connect
|
||||
* the attacker to [alice].
|
||||
*/
|
||||
abstract class P2PMQSecurityTest : MQSecurityTest() {
|
||||
override fun assertSendAttackFails(address: String) {
|
||||
val attacker = amqpClientTo(alice.node.configuration.p2pAddress)
|
||||
val session = attacker.start(ArtemisMessagingComponent.PEER_USER, ArtemisMessagingComponent.PEER_USER)
|
||||
val message = session.createMessage()
|
||||
message.setStringProperty("_AMQ_VALIDATED_USER", "O=MegaCorp, L=London, C=GB")
|
||||
val queue = session.createQueue(address)
|
||||
assertThatExceptionOfType(JMSException::class.java).isThrownBy {
|
||||
session.createProducer(queue).sendAndVerify(message)
|
||||
}.withMessageContaining(address).withMessageContaining("SEND")
|
||||
}
|
||||
|
||||
fun assertProducerQueueCreationAttackFails(address: String) {
|
||||
val attacker = amqpClientTo(alice.node.configuration.p2pAddress)
|
||||
val session = attacker.start(ArtemisMessagingComponent.PEER_USER, ArtemisMessagingComponent.PEER_USER)
|
||||
val message = session.createMessage()
|
||||
message.setStringProperty("_AMQ_VALIDATED_USER", "O=MegaCorp, L=London, C=GB")
|
||||
val queue = session.createQueue(address)
|
||||
assertThatExceptionOfType(JMSException::class.java).isThrownBy {
|
||||
session.createProducer(queue)
|
||||
}.withMessageContaining(address).withMessageContaining("CREATE_DURABLE_QUEUE")
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `consume message from P2P queue`() {
|
||||
assertConsumeAttackFails("$P2P_PREFIX${alice.info.singleIdentity().owningKey.toStringShort()}")
|
||||
@ -26,7 +51,6 @@ abstract class P2PMQSecurityTest : MQSecurityTest() {
|
||||
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||
@Test(timeout=300_000)
|
||||
fun `send message to address of peer which has been communicated with`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
|
@ -0,0 +1,141 @@
|
||||
package net.corda.services.messaging
|
||||
|
||||
import net.corda.core.internal.concurrent.openFuture
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import org.apache.qpid.jms.JmsConnectionFactory
|
||||
import org.apache.qpid.jms.meta.JmsConnectionInfo
|
||||
import org.apache.qpid.jms.provider.Provider
|
||||
import org.apache.qpid.jms.provider.ProviderFuture
|
||||
import org.apache.qpid.jms.provider.amqp.AmqpProvider
|
||||
import org.apache.qpid.jms.provider.amqp.AmqpSaslAuthenticator
|
||||
import org.apache.qpid.jms.sasl.PlainMechanism
|
||||
import org.apache.qpid.jms.transports.TransportOptions
|
||||
import org.apache.qpid.jms.transports.netty.NettyTcpTransport
|
||||
import org.apache.qpid.proton.engine.Sasl
|
||||
import org.apache.qpid.proton.engine.SaslListener
|
||||
import org.apache.qpid.proton.engine.Transport
|
||||
import java.net.URI
|
||||
import java.security.SecureRandom
|
||||
import java.util.concurrent.ExecutionException
|
||||
import java.util.concurrent.TimeUnit
|
||||
import javax.jms.CompletionListener
|
||||
import javax.jms.Connection
|
||||
import javax.jms.Message
|
||||
import javax.jms.MessageProducer
|
||||
import javax.jms.Session
|
||||
import javax.net.ssl.KeyManagerFactory
|
||||
import javax.net.ssl.SSLContext
|
||||
import javax.net.ssl.TrustManagerFactory
|
||||
|
||||
/**
|
||||
* Simple AMQP client connecting to broker using JMS.
|
||||
*/
|
||||
class SimpleAMQPClient(private val target: NetworkHostAndPort, private val config: MutualSslConfiguration) {
|
||||
companion object {
|
||||
/**
|
||||
* Send message and wait for completion.
|
||||
* @throws Exception on failure
|
||||
*/
|
||||
fun MessageProducer.sendAndVerify(message: Message) {
|
||||
val request = openFuture<Unit>()
|
||||
send(message, object : CompletionListener {
|
||||
override fun onException(message: Message, exception: Exception) {
|
||||
request.setException(exception)
|
||||
}
|
||||
|
||||
override fun onCompletion(message: Message) {
|
||||
request.set(Unit)
|
||||
}
|
||||
})
|
||||
try {
|
||||
request.get(10, TimeUnit.SECONDS)
|
||||
} catch (e: ExecutionException) {
|
||||
throw e.cause!!
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private lateinit var connection: Connection
|
||||
|
||||
private fun sslContext(): SSLContext {
|
||||
val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()).apply {
|
||||
init(config.keyStore.get().value.internal, config.keyStore.entryPassword.toCharArray())
|
||||
}
|
||||
val trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()).apply {
|
||||
init(config.trustStore.get().value.internal)
|
||||
}
|
||||
val sslContext = SSLContext.getInstance("TLS")
|
||||
val keyManagers = keyManagerFactory.keyManagers
|
||||
val trustManagers = trustManagerFactory.trustManagers
|
||||
sslContext.init(keyManagers, trustManagers, SecureRandom())
|
||||
return sslContext
|
||||
}
|
||||
|
||||
fun start(username: String, password: String): Session {
|
||||
val connectionFactory = TestJmsConnectionFactory("amqps://${target.host}:${target.port}", username, password)
|
||||
connectionFactory.setSslContext(sslContext())
|
||||
connection = connectionFactory.createConnection()
|
||||
connection.start()
|
||||
return connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
try {
|
||||
connection.close()
|
||||
} catch (e: Exception) {
|
||||
// connection might not have initialised.
|
||||
}
|
||||
}
|
||||
|
||||
private class TestJmsConnectionFactory(uri: String, private val user: String, private val pwd: String) : JmsConnectionFactory(uri) {
|
||||
override fun createProvider(remoteURI: URI): Provider {
|
||||
val transportOptions = TransportOptions().apply {
|
||||
// Disable SNI check for server certificate
|
||||
isVerifyHost = false
|
||||
}
|
||||
val transport = NettyTcpTransport(remoteURI, transportOptions, true)
|
||||
|
||||
// Manually override SASL negotiations to accept failure in SASL-OUTCOME, which is produced by node Artemis server
|
||||
return object : AmqpProvider(remoteURI, transport) {
|
||||
override fun connect(connectionInfo: JmsConnectionInfo?) {
|
||||
super.connect(connectionInfo)
|
||||
val sasl = protonTransport.sasl()
|
||||
sasl.client()
|
||||
sasl.setRemoteHostname(remoteURI.host)
|
||||
val authenticator = AmqpSaslAuthenticator {
|
||||
PlainMechanism().apply {
|
||||
username = user
|
||||
password = pwd
|
||||
}
|
||||
}
|
||||
val saslRequest = ProviderFuture()
|
||||
sasl.setListener(object : SaslListener {
|
||||
override fun onSaslMechanisms(sasl: Sasl, transport: Transport) {
|
||||
authenticator.handleSaslMechanisms(sasl, transport)
|
||||
}
|
||||
|
||||
override fun onSaslChallenge(sasl: Sasl, transport: Transport) {
|
||||
authenticator.handleSaslChallenge(sasl, transport)
|
||||
}
|
||||
|
||||
override fun onSaslOutcome(sasl: Sasl, transport: Transport) {
|
||||
authenticator.handleSaslOutcome(sasl, transport)
|
||||
saslRequest.onSuccess()
|
||||
}
|
||||
|
||||
override fun onSaslInit(sasl: Sasl, transport: Transport) {
|
||||
}
|
||||
|
||||
override fun onSaslResponse(sasl: Sasl, transport: Transport) {
|
||||
}
|
||||
})
|
||||
pumpToProtonTransport()
|
||||
saslRequest.sync()
|
||||
}
|
||||
}.apply {
|
||||
isSaslLayer = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user