mirror of
https://github.com/corda/corda.git
synced 2024-12-21 22:07:55 +00:00
Merge pull request #6410 from corda/denis/CORDA-3856-amqp-header-os-4.5
OS 4.4 to 4.5 merge
This commit is contained in:
commit
5228817fb4
33
.ci/dev/regression/Jenkinsfile
vendored
33
.ci/dev/regression/Jenkinsfile
vendored
@ -18,6 +18,20 @@ killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
|
|||||||
* Sense environment
|
* Sense environment
|
||||||
*/
|
*/
|
||||||
boolean isReleaseTag = (env.TAG_NAME =~ /^release-.*(?<!_JDK11)$/)
|
boolean isReleaseTag = (env.TAG_NAME =~ /^release-.*(?<!_JDK11)$/)
|
||||||
|
/*
|
||||||
|
** calculate the stage for NexusIQ evaluation
|
||||||
|
** * build for snapshots
|
||||||
|
** * stage-release: for release candidates and for health checks
|
||||||
|
** * operate: for final release
|
||||||
|
*/
|
||||||
|
def nexusIqStage = "build"
|
||||||
|
if (isReleaseTag) {
|
||||||
|
switch (env.TAG_NAME) {
|
||||||
|
case ~/.*-RC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
|
||||||
|
case ~/.*-HC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
|
||||||
|
default: nexusIqStage = "operate"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pipeline {
|
pipeline {
|
||||||
agent { label 'k8s' }
|
agent { label 'k8s' }
|
||||||
@ -36,6 +50,25 @@ pipeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
stages {
|
stages {
|
||||||
|
stage('Sonatype Check') {
|
||||||
|
steps {
|
||||||
|
sh "./gradlew --no-daemon clean jar"
|
||||||
|
script {
|
||||||
|
sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties"
|
||||||
|
def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim()
|
||||||
|
def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim()
|
||||||
|
def artifactId = 'corda'
|
||||||
|
nexusAppId = "jenkins-${groupId}-${artifactId}-${version}"
|
||||||
|
}
|
||||||
|
nexusPolicyEvaluation (
|
||||||
|
failBuildOnNetworkError: false,
|
||||||
|
iqApplication: manualApplication(nexusAppId),
|
||||||
|
iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']],
|
||||||
|
iqStage: nexusIqStage
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
stage('Deploy Nodes') {
|
stage('Deploy Nodes') {
|
||||||
steps {
|
steps {
|
||||||
sh "./gradlew --no-daemon jar deployNodes"
|
sh "./gradlew --no-daemon jar deployNodes"
|
||||||
|
@ -1617,6 +1617,7 @@
|
|||||||
<ID>TooGenericExceptionCaught:TransformTypes.kt$TransformTypes.Companion$e: IndexOutOfBoundsException</ID>
|
<ID>TooGenericExceptionCaught:TransformTypes.kt$TransformTypes.Companion$e: IndexOutOfBoundsException</ID>
|
||||||
<ID>TooGenericExceptionCaught:TransitionExecutorImpl.kt$TransitionExecutorImpl$exception: Exception</ID>
|
<ID>TooGenericExceptionCaught:TransitionExecutorImpl.kt$TransitionExecutorImpl$exception: Exception</ID>
|
||||||
<ID>TooGenericExceptionCaught:Try.kt$Try.Companion$t: Throwable</ID>
|
<ID>TooGenericExceptionCaught:Try.kt$Try.Companion$t: Throwable</ID>
|
||||||
|
<ID>TooGenericExceptionCaught:UserValidationPlugin.kt$UserValidationPlugin$e: Throwable</ID>
|
||||||
<ID>TooGenericExceptionCaught:Utils.kt$e: Exception</ID>
|
<ID>TooGenericExceptionCaught:Utils.kt$e: Exception</ID>
|
||||||
<ID>TooGenericExceptionCaught:V1NodeConfigurationSpec.kt$V1NodeConfigurationSpec$e: Exception</ID>
|
<ID>TooGenericExceptionCaught:V1NodeConfigurationSpec.kt$V1NodeConfigurationSpec$e: Exception</ID>
|
||||||
<ID>TooGenericExceptionCaught:ValidatingNotaryFlow.kt$ValidatingNotaryFlow$e: Exception</ID>
|
<ID>TooGenericExceptionCaught:ValidatingNotaryFlow.kt$ValidatingNotaryFlow$e: Exception</ID>
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package net.corda.services.messaging
|
package net.corda.services.messaging
|
||||||
|
|
||||||
import net.corda.core.crypto.Crypto
|
import net.corda.core.crypto.Crypto
|
||||||
|
import net.corda.core.crypto.toStringShort
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.createDirectories
|
import net.corda.core.internal.createDirectories
|
||||||
import net.corda.core.internal.exists
|
import net.corda.core.internal.exists
|
||||||
@ -14,6 +15,8 @@ import net.corda.nodeapi.internal.crypto.CertificateType
|
|||||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||||
import net.corda.nodeapi.internal.loadDevCaTrustStore
|
import net.corda.nodeapi.internal.loadDevCaTrustStore
|
||||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
|
import net.corda.testing.core.singleIdentity
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
|
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
||||||
@ -22,8 +25,10 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
|||||||
import org.bouncycastle.asn1.x509.GeneralName
|
import org.bouncycastle.asn1.x509.GeneralName
|
||||||
import org.bouncycastle.asn1.x509.GeneralSubtree
|
import org.bouncycastle.asn1.x509.GeneralSubtree
|
||||||
import org.bouncycastle.asn1.x509.NameConstraints
|
import org.bouncycastle.asn1.x509.NameConstraints
|
||||||
|
import org.junit.Ignore
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.nio.file.Files
|
import java.nio.file.Files
|
||||||
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Runs the security tests with the attacker pretending to be a node on the network.
|
* Runs the security tests with the attacker pretending to be a node on the network.
|
||||||
@ -37,6 +42,7 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
|
|||||||
attacker.start(PEER_USER, PEER_USER) // Login as a peer
|
attacker.start(PEER_USER, PEER_USER) // Login as a peer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `send message to RPC requests address`() {
|
fun `send message to RPC requests address`() {
|
||||||
assertSendAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
|
assertSendAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||||
@ -117,4 +123,17 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
|
|||||||
attacker.start(PEER_USER, PEER_USER)
|
attacker.start(PEER_USER, PEER_USER)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||||
|
override fun `send message to notifications address`() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300_000)
|
||||||
|
fun `send message on core protocol`() {
|
||||||
|
val message = attacker.createMessage()
|
||||||
|
assertEquals(true, attacker.producer.isBlockOnNonDurableSend)
|
||||||
|
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||||
|
attacker.producer.send("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}", message)
|
||||||
|
}.withMessageContaining("CoreMessage").withMessageContaining("AMQPMessage")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `send message to notifications address`() {
|
open fun `send message to notifications address`() {
|
||||||
assertSendAttackFails(NOTIFICATIONS_ADDRESS)
|
assertSendAttackFails(NOTIFICATIONS_ADDRESS)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,6 +7,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
|||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
import net.corda.testing.core.BOB_NAME
|
import net.corda.testing.core.BOB_NAME
|
||||||
import net.corda.testing.core.singleIdentity
|
import net.corda.testing.core.singleIdentity
|
||||||
|
import org.junit.Ignore
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -25,6 +26,7 @@ abstract class P2PMQSecurityTest : MQSecurityTest() {
|
|||||||
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||||
@Test(timeout=300_000)
|
@Test(timeout=300_000)
|
||||||
fun `send message to address of peer which has been communicated with`() {
|
fun `send message to address of peer which has been communicated with`() {
|
||||||
val bobParty = startBobAndCommunicateWithAlice()
|
val bobParty = startBobAndCommunicateWithAlice()
|
||||||
|
@ -0,0 +1,47 @@
|
|||||||
|
package net.corda.node.internal.artemis
|
||||||
|
|
||||||
|
import net.corda.core.utilities.contextLogger
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||||
|
import org.apache.activemq.artemis.api.core.Message
|
||||||
|
import org.apache.activemq.artemis.core.server.ServerSession
|
||||||
|
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin
|
||||||
|
import org.apache.activemq.artemis.core.transaction.Transaction
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Plugin to verify the user in the AMQP message header against the user in the authenticated session.
|
||||||
|
*
|
||||||
|
* In core protocol, Artemis Server automatically overwrites the _AMQ_VALIDATED_USER field in message header according to authentication
|
||||||
|
* of the session. However, this is not done for AMQP protocol, which is used by Corda. Hence, _AMQ_VALIDATED_USER in AMQP packet is
|
||||||
|
* delivered in the same form, as it was produced by counterpart. To prevent manipulations of this field by other peers, we should check
|
||||||
|
* message header against user in authenticated session.
|
||||||
|
*
|
||||||
|
* Note that AMQP message is immutable, so changing the header means rebuilding the whole message, which is expensive. Instead, the
|
||||||
|
* preferred option is to throw an exception.
|
||||||
|
*/
|
||||||
|
class UserValidationPlugin : ActiveMQServerPlugin {
|
||||||
|
companion object {
|
||||||
|
private val log = contextLogger()
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun beforeSend(session: ServerSession, tx: Transaction?, message: Message, direct: Boolean, noAutoCreateQueue: Boolean) {
|
||||||
|
try {
|
||||||
|
if (session.username == PEER_USER) {
|
||||||
|
if (message !is AMQPMessage) {
|
||||||
|
throw ActiveMQSecurityException("Invalid message type: expected [${AMQPMessage::class.java.name}], got [${message.javaClass.name}]")
|
||||||
|
}
|
||||||
|
val user = message.getStringProperty(Message.HDR_VALIDATED_USER)
|
||||||
|
if (user != null && user != session.validatedUser) {
|
||||||
|
throw ActiveMQSecurityException("_AMQ_VALIDATED_USER mismatch: expected [${session.validatedUser}], got [${user}]")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e: ActiveMQSecurityException) {
|
||||||
|
throw e
|
||||||
|
} catch (e: Throwable) {
|
||||||
|
// Artemis swallows any exception except ActiveMQException
|
||||||
|
log.error("Message validation failed", e)
|
||||||
|
throw ActiveMQSecurityException("Message validation failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -149,6 +149,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
|||||||
isJMXManagementEnabled = true
|
isJMXManagementEnabled = true
|
||||||
isJMXUseBrokerName = true
|
isJMXUseBrokerName = true
|
||||||
}
|
}
|
||||||
|
// Validate user in AMQP message header against authenticated session
|
||||||
|
registerBrokerPlugin(UserValidationPlugin())
|
||||||
|
|
||||||
}.configureAddressSecurity()
|
}.configureAddressSecurity()
|
||||||
|
|
||||||
|
@ -0,0 +1,89 @@
|
|||||||
|
package net.corda.node.internal.artemis
|
||||||
|
|
||||||
|
import com.nhaarman.mockito_kotlin.doReturn
|
||||||
|
import com.nhaarman.mockito_kotlin.whenever
|
||||||
|
import net.corda.coretesting.internal.rigorousMock
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
|
import net.corda.testing.core.ALICE_NAME
|
||||||
|
import net.corda.testing.core.BOB_NAME
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
|
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl
|
||||||
|
import org.apache.activemq.artemis.core.server.ServerSession
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
|
||||||
|
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter
|
||||||
|
import org.assertj.core.api.Assertions
|
||||||
|
import org.junit.Test
|
||||||
|
|
||||||
|
class UserValidationPluginTest {
|
||||||
|
private val plugin = UserValidationPlugin()
|
||||||
|
private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(), 4.toByte(), 1024)
|
||||||
|
private val amqpMessage get() = AMQPConverter.getInstance().fromCore(coreMessage)
|
||||||
|
private val session = rigorousMock<ServerSession>().also {
|
||||||
|
doReturn(ArtemisMessagingComponent.PEER_USER).whenever(it).username
|
||||||
|
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300_000)
|
||||||
|
fun `accept AMQP message without user`() {
|
||||||
|
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300_000)
|
||||||
|
fun `accept AMQP message with user`() {
|
||||||
|
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", ALICE_NAME.toString())
|
||||||
|
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300_000)
|
||||||
|
fun `reject AMQP message with different user`() {
|
||||||
|
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||||
|
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||||
|
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||||
|
}.withMessageContaining("_AMQ_VALIDATED_USER")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300_000)
|
||||||
|
fun `accept AMQP message with different user on internal session`() {
|
||||||
|
val internalSession = rigorousMock<ServerSession>().also {
|
||||||
|
doReturn(ArtemisMessagingComponent.NODE_P2P_USER).whenever(it).username
|
||||||
|
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
|
||||||
|
}
|
||||||
|
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||||
|
plugin.beforeSend(internalSession, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300_000)
|
||||||
|
fun `reject core message`() {
|
||||||
|
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||||
|
plugin.beforeSend(session, rigorousMock(), coreMessage, direct = false, noAutoCreateQueue = false)
|
||||||
|
}.withMessageContaining("message type")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300_000)
|
||||||
|
fun `reject message with exception`() {
|
||||||
|
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||||
|
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
|
||||||
|
override fun getStringProperty(key: SimpleString?): String {
|
||||||
|
throw IllegalStateException("My exception")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Artemis swallows all exceptions except ActiveMQException, so making sure that proper exception is thrown
|
||||||
|
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||||
|
plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
|
||||||
|
}.withMessageContaining("Message validation failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300_000)
|
||||||
|
fun `reject message with security exception`() {
|
||||||
|
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||||
|
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
|
||||||
|
override fun getStringProperty(key: SimpleString?): String {
|
||||||
|
throw ActiveMQSecurityException("My security exception")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||||
|
plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
|
||||||
|
}.withMessageContaining("My security exception")
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user