mirror of
https://github.com/corda/corda.git
synced 2025-01-19 11:16:54 +00:00
Merge pull request #6412 from corda/denis/CORDA-3856-amqp-header-os-4.6
NOTICK: OS 4.5 to 4.6 merge
This commit is contained in:
commit
650a66b053
33
.ci/dev/regression/Jenkinsfile
vendored
33
.ci/dev/regression/Jenkinsfile
vendored
@ -18,6 +18,20 @@ killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
|
||||
* Sense environment
|
||||
*/
|
||||
boolean isReleaseTag = (env.TAG_NAME =~ /^release-.*(?<!_JDK11)$/)
|
||||
/*
|
||||
** calculate the stage for NexusIQ evaluation
|
||||
** * build for snapshots
|
||||
** * stage-release: for release candidates and for health checks
|
||||
** * operate: for final release
|
||||
*/
|
||||
def nexusIqStage = "build"
|
||||
if (isReleaseTag) {
|
||||
switch (env.TAG_NAME) {
|
||||
case ~/.*-RC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
|
||||
case ~/.*-HC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
|
||||
default: nexusIqStage = "operate"
|
||||
}
|
||||
}
|
||||
|
||||
pipeline {
|
||||
agent { label 'k8s' }
|
||||
@ -37,6 +51,25 @@ pipeline {
|
||||
}
|
||||
|
||||
stages {
|
||||
stage('Sonatype Check') {
|
||||
steps {
|
||||
sh "./gradlew --no-daemon clean jar"
|
||||
script {
|
||||
sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties"
|
||||
def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim()
|
||||
def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim()
|
||||
def artifactId = 'corda'
|
||||
nexusAppId = "jenkins-${groupId}-${artifactId}-${version}"
|
||||
}
|
||||
nexusPolicyEvaluation (
|
||||
failBuildOnNetworkError: false,
|
||||
iqApplication: manualApplication(nexusAppId),
|
||||
iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']],
|
||||
iqStage: nexusIqStage
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
stage('Deploy Nodes') {
|
||||
steps {
|
||||
sh "./gradlew --no-daemon jar deployNodes"
|
||||
|
@ -1617,6 +1617,7 @@
|
||||
<ID>TooGenericExceptionCaught:TransformTypes.kt$TransformTypes.Companion$e: IndexOutOfBoundsException</ID>
|
||||
<ID>TooGenericExceptionCaught:TransitionExecutorImpl.kt$TransitionExecutorImpl$exception: Exception</ID>
|
||||
<ID>TooGenericExceptionCaught:Try.kt$Try.Companion$t: Throwable</ID>
|
||||
<ID>TooGenericExceptionCaught:UserValidationPlugin.kt$UserValidationPlugin$e: Throwable</ID>
|
||||
<ID>TooGenericExceptionCaught:Utils.kt$e: Exception</ID>
|
||||
<ID>TooGenericExceptionCaught:V1NodeConfigurationSpec.kt$V1NodeConfigurationSpec$e: Exception</ID>
|
||||
<ID>TooGenericExceptionCaught:ValidatingNotaryFlow.kt$ValidatingNotaryFlow$e: Exception</ID>
|
||||
|
@ -10,6 +10,7 @@ import io.netty.handler.proxy.ProxyConnectionEvent
|
||||
import io.netty.handler.ssl.SniCompletionEvent
|
||||
import io.netty.handler.ssl.SslHandler
|
||||
import io.netty.handler.ssl.SslHandshakeCompletionEvent
|
||||
import io.netty.handler.ssl.SslHandshakeTimeoutException
|
||||
import io.netty.util.ReferenceCountUtil
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.utilities.contextLogger
|
||||
@ -295,8 +296,8 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
|
||||
// This happens when the peer node is closed during SSL establishment.
|
||||
when {
|
||||
cause is ClosedChannelException -> logWarnWithMDC("SSL Handshake closed early.")
|
||||
cause is SslHandshakeTimeoutException -> logWarnWithMDC("SSL Handshake timed out")
|
||||
// Sadly the exception thrown by Netty wrapper requires that we check the message.
|
||||
cause is SSLException && cause.message == "handshake timed out" -> logWarnWithMDC("SSL Handshake timed out")
|
||||
cause is SSLException && (cause.message?.contains("close_notify") == true)
|
||||
-> logWarnWithMDC("Received close_notify during handshake")
|
||||
// io.netty.handler.ssl.SslHandler.setHandshakeFailureTransportFailure()
|
||||
|
@ -1,6 +1,7 @@
|
||||
package net.corda.services.messaging
|
||||
|
||||
import net.corda.core.crypto.Crypto
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.createDirectories
|
||||
import net.corda.core.internal.exists
|
||||
@ -14,6 +15,8 @@ import net.corda.nodeapi.internal.crypto.CertificateType
|
||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||
import net.corda.nodeapi.internal.loadDevCaTrustStore
|
||||
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
||||
@ -22,8 +25,10 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||
import org.bouncycastle.asn1.x509.GeneralName
|
||||
import org.bouncycastle.asn1.x509.GeneralSubtree
|
||||
import org.bouncycastle.asn1.x509.NameConstraints
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
import java.nio.file.Files
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
/**
|
||||
* Runs the security tests with the attacker pretending to be a node on the network.
|
||||
@ -37,6 +42,7 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
|
||||
attacker.start(PEER_USER, PEER_USER) // Login as a peer
|
||||
}
|
||||
|
||||
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||
@Test(timeout=300_000)
|
||||
fun `send message to RPC requests address`() {
|
||||
assertSendAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
@ -117,4 +123,17 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
|
||||
attacker.start(PEER_USER, PEER_USER)
|
||||
}
|
||||
}
|
||||
|
||||
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||
override fun `send message to notifications address`() {
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `send message on core protocol`() {
|
||||
val message = attacker.createMessage()
|
||||
assertEquals(true, attacker.producer.isBlockOnNonDurableSend)
|
||||
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
attacker.producer.send("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}", message)
|
||||
}.withMessageContaining("CoreMessage").withMessageContaining("AMQPMessage")
|
||||
}
|
||||
}
|
||||
|
@ -79,7 +79,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `send message to notifications address`() {
|
||||
open fun `send message to notifications address`() {
|
||||
assertSendAttackFails(NOTIFICATIONS_ADDRESS)
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import org.junit.Ignore
|
||||
import org.junit.Test
|
||||
|
||||
/**
|
||||
@ -25,6 +26,7 @@ abstract class P2PMQSecurityTest : MQSecurityTest() {
|
||||
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
|
||||
}
|
||||
|
||||
@Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
|
||||
@Test(timeout=300_000)
|
||||
fun `send message to address of peer which has been communicated with`() {
|
||||
val bobParty = startBobAndCommunicateWithAlice()
|
||||
|
@ -0,0 +1,47 @@
|
||||
package net.corda.node.internal.artemis
|
||||
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.core.server.ServerSession
|
||||
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin
|
||||
import org.apache.activemq.artemis.core.transaction.Transaction
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
|
||||
|
||||
/**
|
||||
* Plugin to verify the user in the AMQP message header against the user in the authenticated session.
|
||||
*
|
||||
* In core protocol, Artemis Server automatically overwrites the _AMQ_VALIDATED_USER field in message header according to authentication
|
||||
* of the session. However, this is not done for AMQP protocol, which is used by Corda. Hence, _AMQ_VALIDATED_USER in AMQP packet is
|
||||
* delivered in the same form, as it was produced by counterpart. To prevent manipulations of this field by other peers, we should check
|
||||
* message header against user in authenticated session.
|
||||
*
|
||||
* Note that AMQP message is immutable, so changing the header means rebuilding the whole message, which is expensive. Instead, the
|
||||
* preferred option is to throw an exception.
|
||||
*/
|
||||
class UserValidationPlugin : ActiveMQServerPlugin {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
override fun beforeSend(session: ServerSession, tx: Transaction?, message: Message, direct: Boolean, noAutoCreateQueue: Boolean) {
|
||||
try {
|
||||
if (session.username == PEER_USER) {
|
||||
if (message !is AMQPMessage) {
|
||||
throw ActiveMQSecurityException("Invalid message type: expected [${AMQPMessage::class.java.name}], got [${message.javaClass.name}]")
|
||||
}
|
||||
val user = message.getStringProperty(Message.HDR_VALIDATED_USER)
|
||||
if (user != null && user != session.validatedUser) {
|
||||
throw ActiveMQSecurityException("_AMQ_VALIDATED_USER mismatch: expected [${session.validatedUser}], got [${user}]")
|
||||
}
|
||||
}
|
||||
} catch (e: ActiveMQSecurityException) {
|
||||
throw e
|
||||
} catch (e: Throwable) {
|
||||
// Artemis swallows any exception except ActiveMQException
|
||||
log.error("Message validation failed", e)
|
||||
throw ActiveMQSecurityException("Message validation failed")
|
||||
}
|
||||
}
|
||||
}
|
@ -149,6 +149,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
||||
isJMXManagementEnabled = true
|
||||
isJMXUseBrokerName = true
|
||||
}
|
||||
// Validate user in AMQP message header against authenticated session
|
||||
registerBrokerPlugin(UserValidationPlugin())
|
||||
|
||||
}.configureAddressSecurity()
|
||||
|
||||
|
@ -0,0 +1,89 @@
|
||||
package net.corda.node.internal.artemis
|
||||
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.coretesting.internal.rigorousMock
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl
|
||||
import org.apache.activemq.artemis.core.server.ServerSession
|
||||
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
|
||||
import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter
|
||||
import org.assertj.core.api.Assertions
|
||||
import org.junit.Test
|
||||
|
||||
class UserValidationPluginTest {
|
||||
private val plugin = UserValidationPlugin()
|
||||
private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(), 4.toByte(), 1024)
|
||||
private val amqpMessage get() = AMQPConverter.getInstance().fromCore(coreMessage)
|
||||
private val session = rigorousMock<ServerSession>().also {
|
||||
doReturn(ArtemisMessagingComponent.PEER_USER).whenever(it).username
|
||||
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `accept AMQP message without user`() {
|
||||
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `accept AMQP message with user`() {
|
||||
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", ALICE_NAME.toString())
|
||||
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reject AMQP message with different user`() {
|
||||
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||
}.withMessageContaining("_AMQ_VALIDATED_USER")
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `accept AMQP message with different user on internal session`() {
|
||||
val internalSession = rigorousMock<ServerSession>().also {
|
||||
doReturn(ArtemisMessagingComponent.NODE_P2P_USER).whenever(it).username
|
||||
doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
|
||||
}
|
||||
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||
plugin.beforeSend(internalSession, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reject core message`() {
|
||||
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
plugin.beforeSend(session, rigorousMock(), coreMessage, direct = false, noAutoCreateQueue = false)
|
||||
}.withMessageContaining("message type")
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reject message with exception`() {
|
||||
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
|
||||
override fun getStringProperty(key: SimpleString?): String {
|
||||
throw IllegalStateException("My exception")
|
||||
}
|
||||
}
|
||||
// Artemis swallows all exceptions except ActiveMQException, so making sure that proper exception is thrown
|
||||
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
|
||||
}.withMessageContaining("Message validation failed")
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `reject message with security exception`() {
|
||||
coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
|
||||
val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
|
||||
override fun getStringProperty(key: SimpleString?): String {
|
||||
throw ActiveMQSecurityException("My security exception")
|
||||
}
|
||||
}
|
||||
Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
|
||||
plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
|
||||
}.withMessageContaining("My security exception")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user