CORDA-1170: Define and whitelist the Artemis/AMQP application headers that are accepted by Corda (#2728)

* Whitelist headers copied across bridges

* Address PR comments
This commit is contained in:
Matthew Nesbit 2018-03-07 08:56:58 +00:00 committed by GitHub
parent f682503396
commit 519644ce0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 71 additions and 40 deletions

View File

@ -7,6 +7,8 @@ import net.corda.core.messaging.MessageRecipients
import net.corda.core.messaging.SingleMessageRecipient
import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.NetworkHostAndPort
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.SimpleString
import java.security.PublicKey
/**
@ -28,6 +30,49 @@ class ArtemisMessagingComponent {
const val BRIDGE_CONTROL = "${INTERNAL_PREFIX}bridge.control"
const val BRIDGE_NOTIFY = "${INTERNAL_PREFIX}bridge.notify"
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
/**
* In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header
* as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward
* the equivalent information from the Float.
*/
val bridgedCertificateSubject = SimpleString("sender-subject-name")
object P2PMessagingHeaders {
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
// confusion.
val topicProperty = SimpleString("platform-topic")
val cordaVendorProperty = SimpleString("corda-vendor")
val releaseVersionProperty = SimpleString("release-version")
val platformVersionProperty = SimpleString("platform-version")
val senderUUID = SimpleString("sender-uuid")
val senderSeqNo = SimpleString("send-seq-no")
/**
* In the operation mode where we have an out of process bridge we cannot correctly populate the Artemis validated user header
* as the TLS does not terminate directly onto Artemis. We therefore use this internal only header to forward
* the equivalent information from the Float.
*/
val bridgedCertificateSubject = SimpleString("sender-subject-name")
object Type {
const val KEY = "corda_p2p_message_type"
const val SESSION_INIT_VALUE = "session_init"
}
val whitelistedHeaders: Set<String> = setOf(topicProperty.toString(),
cordaVendorProperty.toString(),
releaseVersionProperty.toString(),
platformVersionProperty.toString(),
senderUUID.toString(),
senderSeqNo.toString(),
bridgedCertificateSubject.toString(),
Type.KEY,
Message.HDR_DUPLICATE_DETECTION_ID.toString(),
Message.HDR_VALIDATED_USER.toString())
}
}
interface ArtemisAddress : MessageRecipients {

View File

@ -10,6 +10,7 @@ import net.corda.core.utilities.debug
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
import net.corda.nodeapi.internal.ArtemisSessionProvider
@ -128,12 +129,14 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, val artemisMessageClientFa
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
val properties = HashMap<Any?, Any?>()
for (key in artemisMessage.propertyNames) {
var value = artemisMessage.getObjectProperty(key)
if (value is SimpleString) {
value = value.toString()
for (key in P2PMessagingHeaders.whitelistedHeaders) {
if (artemisMessage.containsProperty(key)) {
var value = artemisMessage.getObjectProperty(key)
if (value is SimpleString) {
value = value.toString()
}
properties[key] = value
}
properties[key.toString()] = value
}
log.debug { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
val peerInbox = translateLocalQueueToInboxAddress(queueName)

View File

@ -12,6 +12,7 @@ import net.corda.node.services.config.configureWithDevSSLCertificate
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
import net.corda.nodeapi.internal.bridging.BridgeManager
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
@ -56,7 +57,7 @@ class AMQPBridgeTest {
val artemis = artemisClient.started!!
for (i in 0 until 3) {
val artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", i)
putIntProperty(P2PMessagingHeaders.senderUUID, i)
writeBodyBufferBytes("Test$i".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
@ -80,7 +81,7 @@ class AMQPBridgeTest {
}
val received1 = receive.next()
val messageID1 = received1.applicationProperties["CountProp"] as Int
val messageID1 = received1.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload)
assertEquals(0, messageID1)
dedupeSet += received1.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
@ -89,7 +90,7 @@ class AMQPBridgeTest {
atNodeSequence += messageID1
val received2 = receive.next()
val messageID2 = received2.applicationProperties["CountProp"] as Int
val messageID2 = received2.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload)
assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
received2.complete(false) // Reject message and don't add to dedupe
@ -98,7 +99,7 @@ class AMQPBridgeTest {
// drop things until we get back to the replay
while (true) {
val received3 = receive.next()
val messageID3 = received3.applicationProperties["CountProp"] as Int
val messageID3 = received3.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload)
receivedSequence += messageID3
if (messageID3 != 1) { // keep rejecting any batched items following rejection
@ -117,7 +118,7 @@ class AMQPBridgeTest {
// start receiving again, but discarding duplicates
while (true) {
val received4 = receive.next()
val messageID4 = received4.applicationProperties["CountProp"] as Int
val messageID4 = received4.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload)
receivedSequence += messageID4
val messageId = received4.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String
@ -133,7 +134,7 @@ class AMQPBridgeTest {
// Send a fresh item and check receive
val artemisMessage = artemis.session.createMessage(true).apply {
putIntProperty("CountProp", 3)
putIntProperty(P2PMessagingHeaders.senderUUID, 3)
writeBodyBufferBytes("Test3".toByteArray())
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
@ -144,7 +145,7 @@ class AMQPBridgeTest {
// start receiving again, discarding duplicates
while (true) {
val received5 = receive.next()
val messageID5 = received5.applicationProperties["CountProp"] as Int
val messageID5 = received5.applicationProperties[P2PMessagingHeaders.senderUUID.toString()] as Int
assertArrayEquals("Test$messageID5".toByteArray(), received5.payload)
receivedSequence += messageID5
val messageId = received5.applicationProperties[HDR_DUPLICATE_DETECTION_ID.toString()] as String

View File

@ -147,10 +147,3 @@ object TopicStringValidator {
fun check(tag: String) = require(regex.matcher(tag).matches())
}
object P2PMessagingHeaders {
object Type {
const val KEY = "corda_p2p_message_type"
const val SESSION_INIT_VALUE = "session_init"
}
}

View File

@ -30,6 +30,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
@ -97,14 +98,6 @@ class P2PMessagingClient(private val config: NodeConfiguration,
) : SingletonSerializeAsToken(), MessagingService, AutoCloseable {
companion object {
private val log = contextLogger()
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".
// We should probably try to unify our notion of "topic" (really, just a string that identifies an endpoint
// that will handle messages, like a URL) with the terminology used by underlying MQ libraries, to avoid
// confusion.
private val topicProperty = SimpleString("platform-topic")
private val cordaVendorProperty = SimpleString("corda-vendor")
private val releaseVersionProperty = SimpleString("release-version")
private val platformVersionProperty = SimpleString("platform-version")
private val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt()
private const val messageMaxRetryCount: Int = 3
@ -392,9 +385,9 @@ class P2PMessagingClient(private val config: NodeConfiguration,
private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
try {
val topic = message.required(topicProperty) { getStringProperty(it) }
val topic = message.required(P2PMessagingHeaders.topicProperty) { getStringProperty(it) }
val user = requireNotNull(message.getStringProperty(HDR_VALIDATED_USER)) { "Message is not authenticated" }
val platformVersion = message.required(platformVersionProperty) { getIntProperty(it) }
val platformVersion = message.required(P2PMessagingHeaders.platformVersionProperty) { getIntProperty(it) }
// Use the magic deduplication property built into Artemis as our message identity too
val uuid = message.required(HDR_DUPLICATE_DETECTION_ID) { message.getStringProperty(it) }
log.info("Received message from: ${message.address} user: $user topic: $topic uuid: $uuid")
@ -523,13 +516,13 @@ class P2PMessagingClient(private val config: NodeConfiguration,
state.locked {
val mqAddress = getMQAddress(target)
val artemisMessage = producerSession!!.createMessage(true).apply {
putStringProperty(cordaVendorProperty, cordaVendor)
putStringProperty(releaseVersionProperty, releaseVersion)
putIntProperty(platformVersionProperty, versionInfo.platformVersion)
putStringProperty(topicProperty, SimpleString(message.topic))
putStringProperty(P2PMessagingHeaders.cordaVendorProperty, cordaVendor)
putStringProperty(P2PMessagingHeaders.releaseVersionProperty, releaseVersion)
putIntProperty(P2PMessagingHeaders.platformVersionProperty, versionInfo.platformVersion)
putStringProperty(P2PMessagingHeaders.topicProperty, SimpleString(message.topic))
writeBodyBufferBytes(message.data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId.toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(message.uniqueMessageId))
// For demo purposes - if set then add a delay to messages in order to demonstrate that the flows are doing as intended
if (amqDelayMillis > 0 && message.topic == StateMachineManagerImpl.sessionTopic) {

View File

@ -19,12 +19,8 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SerializationDefaults.CHECKPOINT_CONTEXT
import net.corda.core.serialization.SerializationDefaults.SERIALIZATION_FACTORY
@ -40,10 +36,10 @@ import net.corda.node.services.api.Checkpoint
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.P2PMessagingHeaders
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.newNamedSingleThreadExecutor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction