CORDA-3856: Add Artemis plugin for validating AMQP message header and type (#6407)

This commit is contained in:
Denis Rekalov 2020-06-29 09:23:29 +01:00 committed by GitHub
parent b21a3c33cd
commit 3f03de6fbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 161 additions and 1 deletions

View File

@ -1639,6 +1639,7 @@
<ID>TooGenericExceptionCaught:TransformTypes.kt$TransformTypes.Companion$e: IndexOutOfBoundsException</ID>
<ID>TooGenericExceptionCaught:TransitionExecutorImpl.kt$TransitionExecutorImpl$exception: Exception</ID>
<ID>TooGenericExceptionCaught:Try.kt$Try.Companion$t: Throwable</ID>
<ID>TooGenericExceptionCaught:UserValidationPlugin.kt$UserValidationPlugin$e: Throwable</ID>
<ID>TooGenericExceptionCaught:Utils.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:V1NodeConfigurationSpec.kt$V1NodeConfigurationSpec$e: Exception</ID>
<ID>TooGenericExceptionCaught:ValidatingNotaryFlow.kt$ValidatingNotaryFlow$e: Exception</ID>

View File

@ -1,6 +1,7 @@
package net.corda.services.messaging
import net.corda.core.crypto.Crypto
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.createDirectories
import net.corda.core.internal.exists
@ -14,6 +15,8 @@ import net.corda.nodeapi.internal.crypto.CertificateType
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.loadDevCaTrustStore
import net.corda.testing.internal.stubs.CertificateStoreStubs
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.testing.core.singleIdentity
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
@ -22,8 +25,10 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralSubtree
import org.bouncycastle.asn1.x509.NameConstraints
import org.junit.Ignore
import org.junit.Test
import java.nio.file.Files
import kotlin.test.assertEquals
/**
* Runs the security tests with the attacker pretending to be a node on the network.
@ -37,6 +42,7 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
attacker.start(PEER_USER, PEER_USER) // Login as a peer
}
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
@Test(timeout=300_000)
fun `send message to RPC requests address`() {
assertSendAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
@ -117,4 +123,17 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
attacker.start(PEER_USER, PEER_USER)
}
}
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
override fun `send message to notifications address`() {
}
@Test(timeout=300_000)
fun `send message on core protocol`() {
val message = attacker.createMessage()
assertEquals(true, attacker.producer.isBlockOnNonDurableSend)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.producer.send("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}", message)
}.withMessageContaining("CoreMessage").withMessageContaining("AMQPMessage")
}
}

View File

@ -79,7 +79,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
@Test(timeout=300_000)
fun `send message to notifications address`() {
open fun `send message to notifications address`() {
assertSendAttackFails(NOTIFICATIONS_ADDRESS)
}

View File

@ -7,6 +7,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Ignore
import org.junit.Test
/**
@ -25,6 +26,7 @@ abstract class P2PMQSecurityTest : MQSecurityTest() {
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
@Test(timeout=300_000)
fun `send message to address of peer which has been communicated with`() {
val bobParty = startBobAndCommunicateWithAlice()

View File

@ -0,0 +1,47 @@
package net.corda.node.internal.artemis
import net.corda.core.utilities.contextLogger
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.core.server.ServerSession
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin
import org.apache.activemq.artemis.core.transaction.Transaction
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
/**
* Plugin to verify the user in the AMQP message header against the user in the authenticated session.
*
* In core protocol, Artemis Server automatically overwrites the _AMQ_VALIDATED_USER field in message header according to authentication
* of the session. However, this is not done for AMQP protocol, which is used by Corda. Hence, _AMQ_VALIDATED_USER in AMQP packet is
* delivered in the same form, as it was produced by counterpart. To prevent manipulations of this field by other peers, we should check
* message header against user in authenticated session.
*
* Note that AMQP message is immutable, so changing the header means rebuilding the whole message, which is expensive. Instead, the
* preferred option is to throw an exception.
*/
class UserValidationPlugin : ActiveMQServerPlugin {
companion object {
private val log = contextLogger()
}
override fun beforeSend(session: ServerSession, tx: Transaction?, message: Message, direct: Boolean, noAutoCreateQueue: Boolean) {
try {
if (session.username == PEER_USER) {
if (message !is AMQPMessage) {
throw ActiveMQSecurityException("Invalid message type: expected [${AMQPMessage::class.java.name}], got [${message.javaClass.name}]")
}
val user = message.getStringProperty(Message.HDR_VALIDATED_USER)
if (user != null && user != session.validatedUser) {
throw ActiveMQSecurityException("_AMQ_VALIDATED_USER mismatch: expected [${session.validatedUser}], got [${user}]")
}
}
} catch (e: ActiveMQSecurityException) {
throw e
} catch (e: Throwable) {
// Artemis swallows any exception except ActiveMQException
log.error("Message validation failed", e)
throw ActiveMQSecurityException("Message validation failed")
}
}
}

View File

@ -142,6 +142,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
isJMXManagementEnabled = true
isJMXUseBrokerName = true
}
// Validate user in AMQP message header against authenticated session
registerBrokerPlugin(UserValidationPlugin())
}.configureAddressSecurity()

View File

@ -0,0 +1,89 @@
package net.corda.node.internal.artemis
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl
import org.apache.activemq.artemis.core.server.ServerSession
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter
import org.assertj.core.api.Assertions
import org.junit.Test
class UserValidationPluginTest {
private val plugin = UserValidationPlugin()
private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(), 4.toByte(), 1024)
private val amqpMessage get() = AMQPConverter.getInstance().fromCore(coreMessage)
private val session = rigorousMock<ServerSession>().also {
doReturn(ArtemisMessagingComponent.PEER_USER).whenever(it).username
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
}
@Test(timeout = 300_000)
fun `accept AMQP message without user`() {
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
}
@Test(timeout = 300_000)
fun `accept AMQP message with user`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", ALICE_NAME.toString())
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
}
@Test(timeout = 300_000)
fun `reject AMQP message with different user`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
}.withMessageContaining("_AMQ_VALIDATED_USER")
}
@Test(timeout = 300_000)
fun `accept AMQP message with different user on internal session`() {
val internalSession = rigorousMock<ServerSession>().also {
doReturn(ArtemisMessagingComponent.NODE_P2P_USER).whenever(it).username
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
}
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
plugin.beforeSend(internalSession, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
}
@Test(timeout = 300_000)
fun `reject core message`() {
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
plugin.beforeSend(session, rigorousMock(), coreMessage, direct = false, noAutoCreateQueue = false)
}.withMessageContaining("message type")
}
@Test(timeout = 300_000)
fun `reject message with exception`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
override fun getStringProperty(key: SimpleString?): String {
throw IllegalStateException("My exception")
}
}
// Artemis swallows all exceptions except ActiveMQException, so making sure that proper exception is thrown
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
}.withMessageContaining("Message validation failed")
}
@Test(timeout = 300_000)
fun `reject message with security exception`() {
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
override fun getStringProperty(key: SimpleString?): String {
throw ActiveMQSecurityException("My security exception")
}
}
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
}.withMessageContaining("My security exception")
}
}