diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt index 926dae3e16..18d1a38cd3 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/BridgeIntegrationTest.kt @@ -473,7 +473,7 @@ class BridgeIntegrationTest { artemis.session.createQueue(BRIDGE_NOTIFY, RoutingType.ANYCAST, BRIDGE_NOTIFY, false) val controlConsumer = artemis.session.createConsumer(BRIDGE_NOTIFY) controlConsumer.setMessageHandler { msg -> - val outEntry = listOf(BridgeEntry(dummyOutQueue.toString(), listOf(NetworkHostAndPort("localhost", 7890)), listOf(DUMMY_BANK_A_NAME))) + val outEntry = listOf(BridgeEntry(dummyOutQueue.toString(), listOf(NetworkHostAndPort("localhost", 7890)), listOf(DUMMY_BANK_A_NAME), serviceAddress = false)) val bridgeControl = BridgeControl.NodeToBridgeSnapshot("Test", listOf(inboxAddress.toString()), outEntry) val controlPacket = bridgeControl.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes val artemisMessage = artemis.session.createMessage(false) diff --git a/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt b/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt index fb3b593776..cd81d6a46d 100644 --- a/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt +++ b/bridge/src/integration-test/kotlin/net/corda/bridge/SNIBridgeTest.kt @@ -169,11 +169,15 @@ class SNIBridgeTest { CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5) handle.returnValue.getOrThrow() + // Loopback flow test. + it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 5).returnValue.getOrThrow() } CordaRPCClient(b.rpcAddress).use(demoUser.username, demoUser.password) { val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5) handle.returnValue.getOrThrow() + // Loopback flow test. + it.proxy.startFlow(::Ping, a.nodeInfo.singleIdentity(), 5).returnValue.getOrThrow() } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt index 2b62929211..520962fcea 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/AMQPBridgeManager.kt @@ -42,12 +42,12 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP private val lock = ReentrantLock() private val queueNamesToBridgesMap = mutableMapOf>() - private class AMQPConfigurationImpl (override val keyStore: CertificateStore, - override val trustStore: CertificateStore, - override val socksProxyConfig: SocksProxyConfig?, - override val maxMessageSize: Int, - override val useOpenSsl: Boolean, - override val sourceX500Name: String? = null) : AMQPConfiguration { + private class AMQPConfigurationImpl(override val keyStore: CertificateStore, + override val trustStore: CertificateStore, + override val socksProxyConfig: SocksProxyConfig?, + override val maxMessageSize: Int, + override val useOpenSsl: Boolean, + override val sourceX500Name: String? = null) : AMQPConfiguration { constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(), config.trustStore.get(), socksProxyConfig, @@ -188,7 +188,6 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP artemisMessage.acknowledge() return } - val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) } val properties = HashMap() for (key in P2PMessagingHeaders.whitelistedHeaders) { if (artemisMessage.containsProperty(key)) { @@ -201,7 +200,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP } logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } val peerInbox = translateLocalQueueToInboxAddress(queueName) - val sendableMessage = amqpClient.createMessage(data, peerInbox, + val sendableMessage = amqpClient.createMessage(artemisMessage.payload(), peerInbox, legalNames.first().toString(), properties) sendableMessage.onComplete.then { @@ -224,7 +223,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP } override fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) { - val newBridge = lock.withLock { + lock.withLock { val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() } for (target in targets) { if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) { @@ -236,8 +235,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP bridges += newBridge bridgeMetricsService?.bridgeCreated(targets, legalNames) newBridge - } - newBridge.start() + }.start() } override fun destroyBridge(queueName: String, targets: List) { @@ -257,6 +255,14 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP } } + fun destroyAllBridge(queueName: String): Map { + val bridges = queueNamesToBridgesMap[queueName] + destroyBridge(queueName, queueNamesToBridgesMap[queueName]?.flatMap { it.targets } ?: emptyList()) + return bridges?.map { + it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.legalNames.toList(), serviceAddress = false) + }?.toMap() ?: emptyMap() + } + override fun start() { sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS) val artemis = artemisMessageClientFactory() diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt index 30999ec556..e7fd2a0328 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlListener.kt @@ -1,5 +1,6 @@ package net.corda.nodeapi.internal.bridging +import net.corda.core.identity.CordaX500Name import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize @@ -10,10 +11,11 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress import net.corda.nodeapi.internal.ArtemisSessionProvider +import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException -import net.corda.nodeapi.internal.config.MutualSslConfiguration import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientConsumer @@ -31,20 +33,17 @@ class BridgeControlListener(val config: MutualSslConfiguration, private val bridgeId: String = UUID.randomUUID().toString() private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId" private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId" - private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, - artemisMessageClientFactory, bridgeMetricsService) private val validInboundQueues = mutableSetOf() + private val bridgeManager = LoopbackBridgeManagerWrapper(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic) private var artemis: ArtemisSessionProvider? = null private var controlConsumer: ClientConsumer? = null private var notifyConsumer: ClientConsumer? = null constructor(config: MutualSslConfiguration, p2pAddress: NetworkHostAndPort, - maxMessageSize: Int, socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) }) - companion object { private val log = contextLogger() } @@ -163,6 +162,8 @@ class BridgeControlListener(val config: MutualSslConfiguration, } val wasActive = active validInboundQueues.addAll(controlMessage.inboxQueues) + log.info("Added inbox: ${controlMessage.inboxQueues}") + bridgeManager.inboxesAdded(controlMessage.inboxQueues) if (!wasActive && active) { _activeChange.onNext(true) } @@ -187,4 +188,53 @@ class BridgeControlListener(val config: MutualSslConfiguration, } } + private class LoopbackBridgeManagerWrapper(config: MutualSslConfiguration, + socksProxyConfig: SocksProxyConfig? = null, + maxMessageSize: Int, + artemisMessageClientFactory: () -> ArtemisSessionProvider, + bridgeMetricsService: BridgeMetricsService? = null, + private val isLocalInbox: (String) -> Boolean) : BridgeManager { + + private val bridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService) + private val loopbackBridgeManager = LoopbackBridgeManager(artemisMessageClientFactory, bridgeMetricsService) + + override fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) { + val inboxAddress = translateLocalQueueToInboxAddress(queueName) + if (isLocalInbox(inboxAddress)) { + log.info("Deploying loopback bridge for $queueName, source $sourceX500Name") + loopbackBridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames) + } else { + log.info("Deploying AMQP bridge for $queueName, source $sourceX500Name") + bridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames) + } + } + + override fun destroyBridge(queueName: String, targets: List) { + bridgeManager.destroyBridge(queueName, targets) + loopbackBridgeManager.destroyBridge(queueName, targets) + } + + override fun start() { + bridgeManager.start() + loopbackBridgeManager.start() + } + + override fun stop() { + bridgeManager.stop() + loopbackBridgeManager.stop() + } + + override fun close() = stop() + + /** + * Remove any AMQP bridge for the local inbox and create a loopback bridge for that queue. + */ + fun inboxesAdded(inboxes: List) { + for (inbox in inboxes) { + bridgeManager.destroyAllBridge(inbox).forEach { source, bridgeEntry -> + loopbackBridgeManager.deployBridge(source, bridgeEntry.queueName, bridgeEntry.targets, bridgeEntry.legalNames.toSet()) + } + } + } + } } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlMessages.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlMessages.kt index 6a2f30bcd4..fe8486e82e 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlMessages.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeControlMessages.kt @@ -11,7 +11,7 @@ import net.corda.core.utilities.NetworkHostAndPort * @property legalNames The list of acceptable [CordaX500Name] names that should be presented as subject of the validated peer TLS certificate. */ @CordaSerializable -data class BridgeEntry(val queueName: String, val targets: List, val legalNames: List) +data class BridgeEntry(val queueName: String, val targets: List, val legalNames: List, val serviceAddress: Boolean) sealed class BridgeControl { /** diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt index 322dc72f86..7bf1f150da 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/BridgeManager.kt @@ -3,6 +3,7 @@ package net.corda.nodeapi.internal.bridging import net.corda.core.identity.CordaX500Name import net.corda.core.internal.VisibleForTesting import net.corda.core.utilities.NetworkHostAndPort +import org.apache.activemq.artemis.api.core.client.ClientMessage /** * Provides an internal interface that the [BridgeControlListener] delegates to for Bridge activities. @@ -16,4 +17,6 @@ interface BridgeManager : AutoCloseable { fun start() fun stop() -} \ No newline at end of file +} + +fun ClientMessage.payload() = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) } \ No newline at end of file diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt new file mode 100644 index 0000000000..3ca3e3f5e6 --- /dev/null +++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/bridging/LoopbackBridgeManager.kt @@ -0,0 +1,169 @@ +package net.corda.nodeapi.internal.bridging + +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.ConcurrentBox +import net.corda.core.internal.VisibleForTesting +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.nodeapi.internal.ArtemisMessagingComponent +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER +import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress +import net.corda.nodeapi.internal.ArtemisSessionProvider +import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl +import org.apache.activemq.artemis.api.core.SimpleString +import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.slf4j.MDC + +/** + * The LoopbackBridgeManager holds the list of independent LoopbackBridge objects that actively loopback messages to local Artemis + * inboxes. + */ +@VisibleForTesting +class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> ArtemisSessionProvider, + private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager { + + private val queueNamesToBridgesMap = ConcurrentBox(mutableMapOf>()) + private var artemis: ArtemisSessionProvider? = null + + /** + * Each LoopbackBridge is an independent consumer of messages from the Artemis local queue per designated endpoint. + * It attempts to loopback these messages via ArtemisClient to the local inbox. + */ + private class LoopbackBridge(val sourceX500Name: String, + val queueName: String, + val targets: List, + val legalNames: Set, + artemis: ArtemisSessionProvider, + private val bridgeMetricsService: BridgeMetricsService?) { + companion object { + private val log = contextLogger() + } + + // TODO: refactor MDC support, duplicated in AMQPBridgeManager. + private fun withMDC(block: () -> Unit) { + val oldMDC = MDC.getCopyOfContextMap() + try { + MDC.put("queueName", queueName) + MDC.put("source", sourceX500Name) + MDC.put("targets", targets.joinToString(separator = ";") { it.toString() }) + MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() }) + MDC.put("bridgeType", "loopback") + block() + } finally { + MDC.setContextMap(oldMDC) + } + } + + private fun logDebugWithMDC(msg: () -> String) { + if (log.isDebugEnabled) { + withMDC { log.debug(msg()) } + } + } + + private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) } + + private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) } + + private val artemis = ConcurrentBox(artemis) + private var session: ClientSession? = null + private var consumer: ClientConsumer? = null + private var producer: ClientProducer? = null + + fun start() { + logInfoWithMDC("Create new Artemis loopback bridge") + artemis.exclusive { + logInfoWithMDC("Bridge Connected") + bridgeMetricsService?.bridgeConnected(targets, legalNames) + val sessionFactory = started!!.sessionFactory + val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) + this@LoopbackBridge.session = session + // Several producers (in the case of shared bridge) can put messages in the same outbound p2p queue. The consumers are created using the source x500 name as a filter + val consumer = session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '$sourceX500Name'") + this@LoopbackBridge.consumer = consumer + consumer.setMessageHandler(this@LoopbackBridge::clientArtemisMessageHandler) + this@LoopbackBridge.producer = session.createProducer() + session.start() + } + } + + fun stop() { + logInfoWithMDC("Stopping AMQP bridge") + artemis.exclusive { + consumer?.apply { if (!isClosed) close() } + consumer = null + session?.apply { if (!isClosed) stop() } + session = null + } + } + + private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { + logDebugWithMDC { "Loopback Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" } + val peerInbox = translateLocalQueueToInboxAddress(queueName) + producer?.send(SimpleString(peerInbox), artemisMessage) { artemisMessage.acknowledge() } + bridgeMetricsService?.let { metricsService -> + val properties = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.mapNotNull { key -> + if (artemisMessage.containsProperty(key)) { + key to artemisMessage.getObjectProperty(key).let { (it as? SimpleString)?.toString() ?: it } + } else { + null + } + }.toMap() + metricsService.packetAcceptedEvent(SendableMessageImpl(artemisMessage.payload(), peerInbox, legalNames.first().toString(), targets.first(), properties)) + } + } + } + + override fun deployBridge(sourceX500Name: String, queueName: String, targets: List, legalNames: Set) { + queueNamesToBridgesMap.exclusive { + val bridges = getOrPut(queueName) { mutableListOf() } + for (target in targets) { + if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) { + return + } + } + val newBridge = LoopbackBridge(sourceX500Name, queueName, targets, legalNames, artemis!!, bridgeMetricsService) + bridges += newBridge + bridgeMetricsService?.bridgeCreated(targets, legalNames) + newBridge + }.start() + } + + override fun destroyBridge(queueName: String, targets: List) { + queueNamesToBridgesMap.exclusive { + val bridges = this[queueName] ?: mutableListOf() + for (target in targets) { + val bridge = bridges.firstOrNull { it.targets.contains(target) } + if (bridge != null) { + bridges -= bridge + if (bridges.isEmpty()) { + remove(queueName) + } + bridge.stop() + bridgeMetricsService?.bridgeDestroyed(bridge.targets, bridge.legalNames) + } + } + } + } + + override fun start() { + val artemis = artemisMessageClientFactory() + this.artemis = artemis + artemis.start() + } + + override fun stop() = close() + + override fun close() { + queueNamesToBridgesMap.exclusive { + for (bridge in values.flatten()) { + bridge.stop() + } + clear() + artemis?.stop() + } + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt new file mode 100644 index 0000000000..7d76717db0 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/LoopbackBridgeTest.kt @@ -0,0 +1,218 @@ +package net.corda.node.amqp + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.whenever +import net.corda.core.crypto.toStringShort +import net.corda.core.internal.div +import net.corda.core.utilities.loggerFor +import net.corda.node.services.config.EnterpriseConfiguration +import net.corda.node.services.config.MutualExclusionConfiguration +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.configureWithDevSSLCertificate +import net.corda.node.services.messaging.ArtemisMessagingServer +import net.corda.nodeapi.internal.ArtemisMessagingClient +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders +import net.corda.nodeapi.internal.bridging.BridgeManager +import net.corda.nodeapi.internal.bridging.LoopbackBridgeManager +import net.corda.nodeapi.internal.bridging.payload +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.MAX_MESSAGE_SIZE +import net.corda.testing.core.TestIdentity +import net.corda.testing.driver.PortAllocation +import net.corda.testing.internal.rigorousMock +import net.corda.testing.internal.stubs.CertificateStoreStubs +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration +import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID +import org.apache.activemq.artemis.api.core.RoutingType +import org.apache.activemq.artemis.api.core.SimpleString +import org.junit.Assert.assertArrayEquals +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import java.util.* +import kotlin.test.assertEquals + +@RunWith(Parameterized::class) +class LoopbackBridgeTest(private val useOpenSsl: Boolean) { + companion object { + @JvmStatic + @Parameterized.Parameters(name = "useOpenSsl = {0}") + fun data(): Collection = listOf(false, true) + } + + @Rule + @JvmField + val temporaryFolder = TemporaryFolder() + private val log = loggerFor() + private val BOB = TestIdentity(BOB_NAME) + private val portAllocation = PortAllocation.Incremental(10000) + private val artemisAddress = portAllocation.nextHostAndPort() + private val amqpAddress = portAllocation.nextHostAndPort() + + private abstract class AbstractNodeConfiguration : NodeConfiguration + + @Test + fun `test acked and nacked messages`() { + // Create local queue + val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort() + val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName) + val artemis = artemisClient.started!! + + //Create target artemis inbox + val queueName = "p2p.inbound.${BOB.publicKey.toStringShort()}" + artemis.session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false, + ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), + ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), true, null) + + // Pre-populate local queue with 3 messages + for (i in 0 until 3) { + val artemisMessage = artemis.session.createMessage(true).apply { + putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString()) + putIntProperty(P2PMessagingHeaders.senderUUID, i) + writeBodyBufferBytes("Test$i".toByteArray()) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + artemis.producer.send(sourceQueueName, artemisMessage) + } + + var consumer = artemis.session.createConsumer(queueName) + + val dedupeSet = mutableSetOf() + val receivedSequence = mutableListOf() + val atNodeSequence = mutableListOf() + + fun formatMessage(expected: String, actual: Int, received: List): String { + return "Expected message with id $expected, got $actual, previous message receive sequence: $received." + } + + val received1 = consumer!!.receive(1000) + val messageID1 = received1.getIntProperty(P2PMessagingHeaders.senderUUID) + assertArrayEquals("Test$messageID1".toByteArray(), received1.payload()) + assertEquals(0, messageID1) + dedupeSet += received1.getStringProperty(HDR_DUPLICATE_DETECTION_ID) + received1.acknowledge() // Accept first message + receivedSequence += messageID1 + atNodeSequence += messageID1 + + val received2 = consumer.receive(1000) + val messageID2 = received2.getIntProperty(P2PMessagingHeaders.senderUUID) + assertArrayEquals("Test$messageID2".toByteArray(), received2.payload()) + assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence)) + consumer.close() // Reject message and don't add to dedupe + consumer = artemis.session.createConsumer(queueName) + receivedSequence += messageID2 // reflects actual sequence + + // drop things until we get back to the replay + while (true) { + val received3 = consumer!!.receive(1000) + val messageID3 = received3.getIntProperty(P2PMessagingHeaders.senderUUID) + assertArrayEquals("Test$messageID3".toByteArray(), received3.payload()) + receivedSequence += messageID3 + if (messageID3 != 1) { // keep rejecting any batched items following rejection + consumer.close() // Reject message and don't add to dedupe + consumer = artemis.session.createConsumer(queueName) + } else { // beginnings of replay so accept again + received3.acknowledge() + val messageId = received3.getStringProperty(HDR_DUPLICATE_DETECTION_ID) + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID3 + } + break + } + } + + // start receiving again, but discarding duplicates + while (true) { + val received4 = consumer!!.receive(1000) + val messageID4 = received4.getIntProperty(P2PMessagingHeaders.senderUUID) + assertArrayEquals("Test$messageID4".toByteArray(), received4.payload()) + receivedSequence += messageID4 + val messageId = received4.getStringProperty(HDR_DUPLICATE_DETECTION_ID) + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID4 + } + received4.acknowledge() + if (messageID4 == 2) { // started to replay messages after rejection point + break + } + } + + // Send a fresh item and check receive + val artemisMessage = artemis.session.createMessage(true).apply { + putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString()) + putIntProperty(P2PMessagingHeaders.senderUUID, 3) + writeBodyBufferBytes("Test3".toByteArray()) + // Use the magic deduplication property built into Artemis as our message identity too + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + } + artemis.producer.send(sourceQueueName, artemisMessage) + + + // start receiving again, discarding duplicates + while (true) { + val received5 = consumer.receive(1000) + val messageID5 = received5.getIntProperty(P2PMessagingHeaders.senderUUID) + assertArrayEquals("Test$messageID5".toByteArray(), received5.payload()) + receivedSequence += messageID5 + val messageId = received5.getStringProperty(HDR_DUPLICATE_DETECTION_ID) + if (messageId !in dedupeSet) { + dedupeSet += messageId + atNodeSequence += messageID5 + } + received5.acknowledge() + if (messageID5 == 3) { // reached our fresh message + break + } + } + + log.info("Message sequence: $receivedSequence") + log.info("Deduped sequence: $atNodeSequence") + assertEquals(listOf(0, 1, 2, 3), atNodeSequence) + consumer.close() + bridgeManager.stop() + artemisClient.stop() + artemisServer.stop() + } + + private fun createArtemis(sourceQueueName: String?): Triple { + val baseDir = temporaryFolder.root.toPath() / "artemis" + val certificatesDirectory = baseDir / "certificates" + val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory, useOpenSsl = useOpenSsl) + val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory) + val artemisConfig = rigorousMock().also { + doReturn(baseDir).whenever(it).baseDirectory + doReturn(ALICE_NAME).whenever(it).myLegalName + doReturn(certificatesDirectory).whenever(it).certificatesDirectory + doReturn(signingCertificateStore).whenever(it).signingCertificateStore + doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions + doReturn(true).whenever(it).crlCheckSoftFail + doReturn(artemisAddress).whenever(it).p2pAddress + doReturn(null).whenever(it).jmxMonitoringHttpPort + doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration + } + artemisConfig.configureWithDevSSLCertificate() + + val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE) + + val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) + + artemisServer.start() + artemisClient.start() + val bridgeManager = LoopbackBridgeManager({ ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) }) + bridgeManager.start() + + val artemis = artemisClient.started!! + if (sourceQueueName != null) { + // Local queue for outgoing messages + artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true) + bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(BOB.name)) + } + return Triple(artemisServer, artemisClient, bridgeManager) + } +} \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index b2d0a0f290..a400d4c324 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -31,7 +31,7 @@ import java.util.* import java.util.concurrent.atomic.AtomicBoolean class P2PMessagingTest : IntegrationTest() { - private companion object { + private companion object { @ClassRule @JvmField val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName(), "DistributedService_0", "DistributedService_1") @@ -41,18 +41,19 @@ class P2PMessagingTest : IntegrationTest() { @Test fun `communicating with a distributed service which we're part of`() { - startDriverWithDistributedService { distributedService -> - assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0]) + startDriverWithDistributedService { originatingNode, distributedService -> + assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, originatingNode) } } - private fun startDriverWithDistributedService(dsl: DriverDSL.(List) -> Unit) { + private fun startDriverWithDistributedService(dsl: DriverDSL.(InProcess, List) -> Unit) { driver(DriverParameters( startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.notary.raft"), notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2))) )) { - dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as InProcess) }) + val originatingNode = startNode().get() as InProcess + dsl(originatingNode, defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as InProcess) }) } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index e34996c9aa..b75e376e18 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -5,6 +5,7 @@ import com.codahale.metrics.Clock import com.codahale.metrics.MetricRegistry import net.corda.core.crypto.toStringShort import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party import net.corda.core.internal.ThreadBox import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients @@ -35,7 +36,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.bridging.BridgeControl @@ -225,7 +225,7 @@ class P2PMessagingClient(val config: NodeConfiguration, inboxes += RemoteInboxAddress(it).queueName } - inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) } + inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true, isServiceAddress = false) } p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents) @@ -282,10 +282,10 @@ class P2PMessagingClient(val config: NodeConfiguration, log.info("Updating bridges on network map change: ${change.node}") fun gatherAddresses(node: NodeInfo): Sequence { return state.locked { - node.legalIdentitiesAndCerts.map { + node.legalIdentitiesAndCerts.asSequence().map { val messagingAddress = NodeAddress(it.party.owningKey) - BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name }) - }.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence() + BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map(Party::name), serviceAddress = false) + }.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists } } } @@ -323,7 +323,7 @@ class P2PMessagingClient(val config: NodeConfiguration, val keyHash = queueName.substring(PEERS_PREFIX.length) val peers = networkMap.getNodesByOwningKeyIndex(keyHash) for (node in peers) { - val bridge = BridgeEntry(queueName.toString(), node.addresses, node.legalIdentities.map { it.name }) + val bridge = BridgeEntry(queueName.toString(), node.addresses, node.legalIdentities.map { it.name }, serviceAddress = false) requiredBridges += bridge knownQueues += queueName.toString() } @@ -377,7 +377,8 @@ class P2PMessagingClient(val config: NodeConfiguration, requireMessageSize(message.bodySize, maxMessageSize) val topic = message.required(P2PMessagingHeaders.topicProperty) { getStringProperty(it) } val user = requireNotNull(if (externalBridge) { - message.getStringProperty(P2PMessagingHeaders.bridgedCertificateSubject) ?: message.getStringProperty(HDR_VALIDATED_USER) + message.getStringProperty(P2PMessagingHeaders.bridgedCertificateSubject) + ?: message.getStringProperty(HDR_VALIDATED_USER) } else { message.getStringProperty(HDR_VALIDATED_USER) }) { "Message is not authenticated" } @@ -562,19 +563,20 @@ class P2PMessagingClient(val config: NodeConfiguration, val internalTargetQueue = (address as? ArtemisAddress)?.queueName ?: throw IllegalArgumentException("Not an Artemis address") state.locked { - createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false) + createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false, isServiceAddress = address is ServiceAddress) } internalTargetQueue } } /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ - private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean) { + private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean, isServiceAddress: Boolean) { fun sendBridgeCreateMessage() { val keyHash = queueName.substring(PEERS_PREFIX.length) val peers = networkMap.getNodesByOwningKeyIndex(keyHash) for (node in peers) { - val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name }) + + val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name }, isServiceAddress) val createBridgeMessage = BridgeControl.Create(config.myLegalName.toString(), bridge) sendBridgeControl(createBridgeMessage) }