diff --git a/.ci/dev/regression/Jenkinsfile b/.ci/dev/regression/Jenkinsfile
index b49d14bd73..abd5e0f408 100644
--- a/.ci/dev/regression/Jenkinsfile
+++ b/.ci/dev/regression/Jenkinsfile
@@ -18,6 +18,20 @@ killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
* Sense environment
*/
boolean isReleaseTag = (env.TAG_NAME =~ /^release-.*(?version-properties"
+ def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim()
+ def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim()
+ def artifactId = 'corda'
+ nexusAppId = "jenkins-${groupId}-${artifactId}-${version}"
+ }
+ nexusPolicyEvaluation (
+ failBuildOnNetworkError: false,
+ iqApplication: manualApplication(nexusAppId),
+ iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']],
+ iqStage: nexusIqStage
+ )
+ }
+ }
+
stage('Deploy Nodes') {
steps {
sh "./gradlew --no-daemon jar deployNodes"
diff --git a/detekt-baseline.xml b/detekt-baseline.xml
index 2c667858ab..9471d7b8c1 100644
--- a/detekt-baseline.xml
+++ b/detekt-baseline.xml
@@ -1617,6 +1617,7 @@
TooGenericExceptionCaught:TransformTypes.kt$TransformTypes.Companion$e: IndexOutOfBoundsException
TooGenericExceptionCaught:TransitionExecutorImpl.kt$TransitionExecutorImpl$exception: Exception
TooGenericExceptionCaught:Try.kt$Try.Companion$t: Throwable
+ TooGenericExceptionCaught:UserValidationPlugin.kt$UserValidationPlugin$e: Throwable
TooGenericExceptionCaught:Utils.kt$e: Exception
TooGenericExceptionCaught:V1NodeConfigurationSpec.kt$V1NodeConfigurationSpec$e: Exception
TooGenericExceptionCaught:ValidatingNotaryFlow.kt$ValidatingNotaryFlow$e: Exception
diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt
index c26aa74ada..5ce3db919c 100644
--- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt
+++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/protonwrapper/netty/AMQPChannelHandler.kt
@@ -10,6 +10,7 @@ import io.netty.handler.proxy.ProxyConnectionEvent
import io.netty.handler.ssl.SniCompletionEvent
import io.netty.handler.ssl.SslHandler
import io.netty.handler.ssl.SslHandshakeCompletionEvent
+import io.netty.handler.ssl.SslHandshakeTimeoutException
import io.netty.util.ReferenceCountUtil
import net.corda.core.identity.CordaX500Name
import net.corda.core.utilities.contextLogger
@@ -295,8 +296,8 @@ internal class AMQPChannelHandler(private val serverMode: Boolean,
// This happens when the peer node is closed during SSL establishment.
when {
cause is ClosedChannelException -> logWarnWithMDC("SSL Handshake closed early.")
+ cause is SslHandshakeTimeoutException -> logWarnWithMDC("SSL Handshake timed out")
// Sadly the exception thrown by Netty wrapper requires that we check the message.
- cause is SSLException && cause.message == "handshake timed out" -> logWarnWithMDC("SSL Handshake timed out")
cause is SSLException && (cause.message?.contains("close_notify") == true)
-> logWarnWithMDC("Received close_notify during handshake")
// io.netty.handler.ssl.SslHandler.setHandshakeFailureTransportFailure()
diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt
index 261532c1e3..923bb0d794 100644
--- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityAsNodeTest.kt
@@ -1,6 +1,7 @@
package net.corda.services.messaging
import net.corda.core.crypto.Crypto
+import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.createDirectories
import net.corda.core.internal.exists
@@ -14,6 +15,8 @@ import net.corda.nodeapi.internal.crypto.CertificateType
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.nodeapi.internal.loadDevCaTrustStore
import net.corda.coretesting.internal.stubs.CertificateStoreStubs
+import net.corda.nodeapi.internal.ArtemisMessagingComponent
+import net.corda.testing.core.singleIdentity
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
@@ -22,8 +25,10 @@ import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.bouncycastle.asn1.x509.GeneralName
import org.bouncycastle.asn1.x509.GeneralSubtree
import org.bouncycastle.asn1.x509.NameConstraints
+import org.junit.Ignore
import org.junit.Test
import java.nio.file.Files
+import kotlin.test.assertEquals
/**
* Runs the security tests with the attacker pretending to be a node on the network.
@@ -37,6 +42,7 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
attacker.start(PEER_USER, PEER_USER) // Login as a peer
}
+ @Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
@Test(timeout=300_000)
fun `send message to RPC requests address`() {
assertSendAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
@@ -117,4 +123,17 @@ class MQSecurityAsNodeTest : P2PMQSecurityTest() {
attacker.start(PEER_USER, PEER_USER)
}
}
+
+ @Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
+ override fun `send message to notifications address`() {
+ }
+
+ @Test(timeout=300_000)
+ fun `send message on core protocol`() {
+ val message = attacker.createMessage()
+ assertEquals(true, attacker.producer.isBlockOnNonDurableSend)
+ assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
+ attacker.producer.send("${ArtemisMessagingComponent.P2P_PREFIX}${alice.info.singleIdentity().owningKey.toStringShort()}", message)
+ }.withMessageContaining("CoreMessage").withMessageContaining("AMQPMessage")
+ }
}
diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt
index 2586a67ced..3fbaa29a37 100644
--- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt
@@ -79,7 +79,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
}
@Test(timeout=300_000)
- fun `send message to notifications address`() {
+ open fun `send message to notifications address`() {
assertSendAttackFails(NOTIFICATIONS_ADDRESS)
}
diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMQSecurityTest.kt
index 240ad1007d..068c521a6d 100644
--- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMQSecurityTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMQSecurityTest.kt
@@ -7,6 +7,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
+import org.junit.Ignore
import org.junit.Test
/**
@@ -25,6 +26,7 @@ abstract class P2PMQSecurityTest : MQSecurityTest() {
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}
+ @Ignore("Core protocol messages are no allowed for PEER_USER: need to switch to AMQP")
@Test(timeout=300_000)
fun `send message to address of peer which has been communicated with`() {
val bobParty = startBobAndCommunicateWithAlice()
diff --git a/node/src/main/kotlin/net/corda/node/internal/artemis/UserValidationPlugin.kt b/node/src/main/kotlin/net/corda/node/internal/artemis/UserValidationPlugin.kt
new file mode 100644
index 0000000000..963f5169a6
--- /dev/null
+++ b/node/src/main/kotlin/net/corda/node/internal/artemis/UserValidationPlugin.kt
@@ -0,0 +1,47 @@
+package net.corda.node.internal.artemis
+
+import net.corda.core.utilities.contextLogger
+import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
+import org.apache.activemq.artemis.api.core.Message
+import org.apache.activemq.artemis.core.server.ServerSession
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin
+import org.apache.activemq.artemis.core.transaction.Transaction
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
+
+/**
+ * Plugin to verify the user in the AMQP message header against the user in the authenticated session.
+ *
+ * In core protocol, Artemis Server automatically overwrites the _AMQ_VALIDATED_USER field in message header according to authentication
+ * of the session. However, this is not done for AMQP protocol, which is used by Corda. Hence, _AMQ_VALIDATED_USER in AMQP packet is
+ * delivered in the same form, as it was produced by counterpart. To prevent manipulations of this field by other peers, we should check
+ * message header against user in authenticated session.
+ *
+ * Note that AMQP message is immutable, so changing the header means rebuilding the whole message, which is expensive. Instead, the
+ * preferred option is to throw an exception.
+ */
+class UserValidationPlugin : ActiveMQServerPlugin {
+ companion object {
+ private val log = contextLogger()
+ }
+
+ override fun beforeSend(session: ServerSession, tx: Transaction?, message: Message, direct: Boolean, noAutoCreateQueue: Boolean) {
+ try {
+ if (session.username == PEER_USER) {
+ if (message !is AMQPMessage) {
+ throw ActiveMQSecurityException("Invalid message type: expected [${AMQPMessage::class.java.name}], got [${message.javaClass.name}]")
+ }
+ val user = message.getStringProperty(Message.HDR_VALIDATED_USER)
+ if (user != null && user != session.validatedUser) {
+ throw ActiveMQSecurityException("_AMQ_VALIDATED_USER mismatch: expected [${session.validatedUser}], got [${user}]")
+ }
+ }
+ } catch (e: ActiveMQSecurityException) {
+ throw e
+ } catch (e: Throwable) {
+ // Artemis swallows any exception except ActiveMQException
+ log.error("Message validation failed", e)
+ throw ActiveMQSecurityException("Message validation failed")
+ }
+ }
+}
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt
index ec2aafb1d8..277d51742b 100644
--- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt
+++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt
@@ -149,6 +149,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
isJMXManagementEnabled = true
isJMXUseBrokerName = true
}
+ // Validate user in AMQP message header against authenticated session
+ registerBrokerPlugin(UserValidationPlugin())
}.configureAddressSecurity()
diff --git a/node/src/test/kotlin/net/corda/node/internal/artemis/UserValidationPluginTest.kt b/node/src/test/kotlin/net/corda/node/internal/artemis/UserValidationPluginTest.kt
new file mode 100644
index 0000000000..3f316258e7
--- /dev/null
+++ b/node/src/test/kotlin/net/corda/node/internal/artemis/UserValidationPluginTest.kt
@@ -0,0 +1,89 @@
+package net.corda.node.internal.artemis
+
+import com.nhaarman.mockito_kotlin.doReturn
+import com.nhaarman.mockito_kotlin.whenever
+import net.corda.coretesting.internal.rigorousMock
+import net.corda.nodeapi.internal.ArtemisMessagingComponent
+import net.corda.testing.core.ALICE_NAME
+import net.corda.testing.core.BOB_NAME
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
+import org.apache.activemq.artemis.api.core.SimpleString
+import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl
+import org.apache.activemq.artemis.core.server.ServerSession
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
+import org.apache.activemq.artemis.protocol.amqp.converter.AMQPConverter
+import org.assertj.core.api.Assertions
+import org.junit.Test
+
+class UserValidationPluginTest {
+ private val plugin = UserValidationPlugin()
+ private val coreMessage = ClientMessageImpl(0, false, 0, System.currentTimeMillis(), 4.toByte(), 1024)
+ private val amqpMessage get() = AMQPConverter.getInstance().fromCore(coreMessage)
+ private val session = rigorousMock().also {
+ doReturn(ArtemisMessagingComponent.PEER_USER).whenever(it).username
+ doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
+ }
+
+ @Test(timeout = 300_000)
+ fun `accept AMQP message without user`() {
+ plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
+ }
+
+ @Test(timeout = 300_000)
+ fun `accept AMQP message with user`() {
+ coreMessage.putStringProperty("_AMQ_VALIDATED_USER", ALICE_NAME.toString())
+ plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
+ }
+
+ @Test(timeout = 300_000)
+ fun `reject AMQP message with different user`() {
+ coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
+ Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
+ plugin.beforeSend(session, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
+ }.withMessageContaining("_AMQ_VALIDATED_USER")
+ }
+
+ @Test(timeout = 300_000)
+ fun `accept AMQP message with different user on internal session`() {
+ val internalSession = rigorousMock().also {
+ doReturn(ArtemisMessagingComponent.NODE_P2P_USER).whenever(it).username
+ doReturn(ALICE_NAME.toString()).whenever(it).validatedUser
+ }
+ coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
+ plugin.beforeSend(internalSession, rigorousMock(), amqpMessage, direct = false, noAutoCreateQueue = false)
+ }
+
+ @Test(timeout = 300_000)
+ fun `reject core message`() {
+ Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
+ plugin.beforeSend(session, rigorousMock(), coreMessage, direct = false, noAutoCreateQueue = false)
+ }.withMessageContaining("message type")
+ }
+
+ @Test(timeout = 300_000)
+ fun `reject message with exception`() {
+ coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
+ val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
+ override fun getStringProperty(key: SimpleString?): String {
+ throw IllegalStateException("My exception")
+ }
+ }
+ // Artemis swallows all exceptions except ActiveMQException, so making sure that proper exception is thrown
+ Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
+ plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
+ }.withMessageContaining("Message validation failed")
+ }
+
+ @Test(timeout = 300_000)
+ fun `reject message with security exception`() {
+ coreMessage.putStringProperty("_AMQ_VALIDATED_USER", BOB_NAME.toString())
+ val messageWithException = object : AMQPMessage(0, amqpMessage.buffer.array(), null) {
+ override fun getStringProperty(key: SimpleString?): String {
+ throw ActiveMQSecurityException("My security exception")
+ }
+ }
+ Assertions.assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
+ plugin.beforeSend(session, rigorousMock(), messageWithException, direct = false, noAutoCreateQueue = false)
+ }.withMessageContaining("My security exception")
+ }
+}
\ No newline at end of file