mirror of
https://github.com/corda/corda.git
synced 2025-04-19 08:36:39 +00:00
ENT-2573 Add feedback logic for messages (#1475)
* Message loopback for nodes sharing the same bridge * address PR issue
This commit is contained in:
parent
39878e1966
commit
873b1f2fcd
@ -473,7 +473,7 @@ class BridgeIntegrationTest {
|
||||
artemis.session.createQueue(BRIDGE_NOTIFY, RoutingType.ANYCAST, BRIDGE_NOTIFY, false)
|
||||
val controlConsumer = artemis.session.createConsumer(BRIDGE_NOTIFY)
|
||||
controlConsumer.setMessageHandler { msg ->
|
||||
val outEntry = listOf(BridgeEntry(dummyOutQueue.toString(), listOf(NetworkHostAndPort("localhost", 7890)), listOf(DUMMY_BANK_A_NAME)))
|
||||
val outEntry = listOf(BridgeEntry(dummyOutQueue.toString(), listOf(NetworkHostAndPort("localhost", 7890)), listOf(DUMMY_BANK_A_NAME), serviceAddress = false))
|
||||
val bridgeControl = BridgeControl.NodeToBridgeSnapshot("Test", listOf(inboxAddress.toString()), outEntry)
|
||||
val controlPacket = bridgeControl.serialize(context = SerializationDefaults.P2P_CONTEXT).bytes
|
||||
val artemisMessage = artemis.session.createMessage(false)
|
||||
|
@ -169,11 +169,15 @@ class SNIBridgeTest {
|
||||
CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5)
|
||||
handle.returnValue.getOrThrow()
|
||||
// Loopback flow test.
|
||||
it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 5).returnValue.getOrThrow()
|
||||
}
|
||||
|
||||
CordaRPCClient(b.rpcAddress).use(demoUser.username, demoUser.password) {
|
||||
val handle = it.proxy.startFlow(::Ping, c.nodeInfo.singleIdentity(), 5)
|
||||
handle.returnValue.getOrThrow()
|
||||
// Loopback flow test.
|
||||
it.proxy.startFlow(::Ping, a.nodeInfo.singleIdentity(), 5).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -42,12 +42,12 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
private val lock = ReentrantLock()
|
||||
private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>()
|
||||
|
||||
private class AMQPConfigurationImpl (override val keyStore: CertificateStore,
|
||||
override val trustStore: CertificateStore,
|
||||
override val socksProxyConfig: SocksProxyConfig?,
|
||||
override val maxMessageSize: Int,
|
||||
override val useOpenSsl: Boolean,
|
||||
override val sourceX500Name: String? = null) : AMQPConfiguration {
|
||||
private class AMQPConfigurationImpl(override val keyStore: CertificateStore,
|
||||
override val trustStore: CertificateStore,
|
||||
override val socksProxyConfig: SocksProxyConfig?,
|
||||
override val maxMessageSize: Int,
|
||||
override val useOpenSsl: Boolean,
|
||||
override val sourceX500Name: String? = null) : AMQPConfiguration {
|
||||
constructor(config: MutualSslConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.keyStore.get(),
|
||||
config.trustStore.get(),
|
||||
socksProxyConfig,
|
||||
@ -188,7 +188,6 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
artemisMessage.acknowledge()
|
||||
return
|
||||
}
|
||||
val data = ByteArray(artemisMessage.bodySize).apply { artemisMessage.bodyBuffer.readBytes(this) }
|
||||
val properties = HashMap<String, Any?>()
|
||||
for (key in P2PMessagingHeaders.whitelistedHeaders) {
|
||||
if (artemisMessage.containsProperty(key)) {
|
||||
@ -201,7 +200,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
}
|
||||
logDebugWithMDC { "Bridged Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
val sendableMessage = amqpClient.createMessage(data, peerInbox,
|
||||
val sendableMessage = amqpClient.createMessage(artemisMessage.payload(), peerInbox,
|
||||
legalNames.first().toString(),
|
||||
properties)
|
||||
sendableMessage.onComplete.then {
|
||||
@ -224,7 +223,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
}
|
||||
|
||||
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
|
||||
val newBridge = lock.withLock {
|
||||
lock.withLock {
|
||||
val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() }
|
||||
for (target in targets) {
|
||||
if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) {
|
||||
@ -236,8 +235,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
bridges += newBridge
|
||||
bridgeMetricsService?.bridgeCreated(targets, legalNames)
|
||||
newBridge
|
||||
}
|
||||
newBridge.start()
|
||||
}.start()
|
||||
}
|
||||
|
||||
override fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>) {
|
||||
@ -257,6 +255,14 @@ class AMQPBridgeManager(config: MutualSslConfiguration, socksProxyConfig: SocksP
|
||||
}
|
||||
}
|
||||
|
||||
fun destroyAllBridge(queueName: String): Map<String, BridgeEntry> {
|
||||
val bridges = queueNamesToBridgesMap[queueName]
|
||||
destroyBridge(queueName, queueNamesToBridgesMap[queueName]?.flatMap { it.targets } ?: emptyList())
|
||||
return bridges?.map {
|
||||
it.sourceX500Name to BridgeEntry(it.queueName, it.targets, it.legalNames.toList(), serviceAddress = false)
|
||||
}?.toMap() ?: emptyMap()
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS)
|
||||
val artemis = artemisMessageClientFactory()
|
||||
|
@ -1,5 +1,6 @@
|
||||
package net.corda.nodeapi.internal.bridging
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
@ -10,10 +11,11 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
@ -31,20 +33,17 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
private val bridgeId: String = UUID.randomUUID().toString()
|
||||
private val bridgeControlQueue = "$BRIDGE_CONTROL.$bridgeId"
|
||||
private val bridgeNotifyQueue = "$BRIDGE_NOTIFY.$bridgeId"
|
||||
private val bridgeManager: BridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize,
|
||||
artemisMessageClientFactory, bridgeMetricsService)
|
||||
private val validInboundQueues = mutableSetOf<String>()
|
||||
private val bridgeManager = LoopbackBridgeManagerWrapper(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService, this::validateReceiveTopic)
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
private var controlConsumer: ClientConsumer? = null
|
||||
private var notifyConsumer: ClientConsumer? = null
|
||||
|
||||
constructor(config: MutualSslConfiguration,
|
||||
p2pAddress: NetworkHostAndPort,
|
||||
|
||||
maxMessageSize: Int,
|
||||
socksProxy: SocksProxyConfig? = null) : this(config, socksProxy, maxMessageSize, { ArtemisMessagingClient(config, p2pAddress, maxMessageSize) })
|
||||
|
||||
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
@ -163,6 +162,8 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
}
|
||||
val wasActive = active
|
||||
validInboundQueues.addAll(controlMessage.inboxQueues)
|
||||
log.info("Added inbox: ${controlMessage.inboxQueues}")
|
||||
bridgeManager.inboxesAdded(controlMessage.inboxQueues)
|
||||
if (!wasActive && active) {
|
||||
_activeChange.onNext(true)
|
||||
}
|
||||
@ -187,4 +188,53 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
}
|
||||
}
|
||||
|
||||
private class LoopbackBridgeManagerWrapper(config: MutualSslConfiguration,
|
||||
socksProxyConfig: SocksProxyConfig? = null,
|
||||
maxMessageSize: Int,
|
||||
artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
||||
bridgeMetricsService: BridgeMetricsService? = null,
|
||||
private val isLocalInbox: (String) -> Boolean) : BridgeManager {
|
||||
|
||||
private val bridgeManager = AMQPBridgeManager(config, socksProxyConfig, maxMessageSize, artemisMessageClientFactory, bridgeMetricsService)
|
||||
private val loopbackBridgeManager = LoopbackBridgeManager(artemisMessageClientFactory, bridgeMetricsService)
|
||||
|
||||
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
|
||||
val inboxAddress = translateLocalQueueToInboxAddress(queueName)
|
||||
if (isLocalInbox(inboxAddress)) {
|
||||
log.info("Deploying loopback bridge for $queueName, source $sourceX500Name")
|
||||
loopbackBridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames)
|
||||
} else {
|
||||
log.info("Deploying AMQP bridge for $queueName, source $sourceX500Name")
|
||||
bridgeManager.deployBridge(sourceX500Name, queueName, targets, legalNames)
|
||||
}
|
||||
}
|
||||
|
||||
override fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>) {
|
||||
bridgeManager.destroyBridge(queueName, targets)
|
||||
loopbackBridgeManager.destroyBridge(queueName, targets)
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
bridgeManager.start()
|
||||
loopbackBridgeManager.start()
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
bridgeManager.stop()
|
||||
loopbackBridgeManager.stop()
|
||||
}
|
||||
|
||||
override fun close() = stop()
|
||||
|
||||
/**
|
||||
* Remove any AMQP bridge for the local inbox and create a loopback bridge for that queue.
|
||||
*/
|
||||
fun inboxesAdded(inboxes: List<String>) {
|
||||
for (inbox in inboxes) {
|
||||
bridgeManager.destroyAllBridge(inbox).forEach { source, bridgeEntry ->
|
||||
loopbackBridgeManager.deployBridge(source, bridgeEntry.queueName, bridgeEntry.targets, bridgeEntry.legalNames.toSet())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -11,7 +11,7 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
* @property legalNames The list of acceptable [CordaX500Name] names that should be presented as subject of the validated peer TLS certificate.
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class BridgeEntry(val queueName: String, val targets: List<NetworkHostAndPort>, val legalNames: List<CordaX500Name>)
|
||||
data class BridgeEntry(val queueName: String, val targets: List<NetworkHostAndPort>, val legalNames: List<CordaX500Name>, val serviceAddress: Boolean)
|
||||
|
||||
sealed class BridgeControl {
|
||||
/**
|
||||
|
@ -3,6 +3,7 @@ package net.corda.nodeapi.internal.bridging
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
|
||||
/**
|
||||
* Provides an internal interface that the [BridgeControlListener] delegates to for Bridge activities.
|
||||
@ -16,4 +17,6 @@ interface BridgeManager : AutoCloseable {
|
||||
fun start()
|
||||
|
||||
fun stop()
|
||||
}
|
||||
}
|
||||
|
||||
fun ClientMessage.payload() = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
|
@ -0,0 +1,169 @@
|
||||
package net.corda.nodeapi.internal.bridging
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.ConcurrentBox
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImpl
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.slf4j.MDC
|
||||
|
||||
/**
|
||||
* The LoopbackBridgeManager holds the list of independent LoopbackBridge objects that actively loopback messages to local Artemis
|
||||
* inboxes.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
class LoopbackBridgeManager(private val artemisMessageClientFactory: () -> ArtemisSessionProvider,
|
||||
private val bridgeMetricsService: BridgeMetricsService? = null) : BridgeManager {
|
||||
|
||||
private val queueNamesToBridgesMap = ConcurrentBox(mutableMapOf<String, MutableList<LoopbackBridge>>())
|
||||
private var artemis: ArtemisSessionProvider? = null
|
||||
|
||||
/**
|
||||
* Each LoopbackBridge is an independent consumer of messages from the Artemis local queue per designated endpoint.
|
||||
* It attempts to loopback these messages via ArtemisClient to the local inbox.
|
||||
*/
|
||||
private class LoopbackBridge(val sourceX500Name: String,
|
||||
val queueName: String,
|
||||
val targets: List<NetworkHostAndPort>,
|
||||
val legalNames: Set<CordaX500Name>,
|
||||
artemis: ArtemisSessionProvider,
|
||||
private val bridgeMetricsService: BridgeMetricsService?) {
|
||||
companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
// TODO: refactor MDC support, duplicated in AMQPBridgeManager.
|
||||
private fun withMDC(block: () -> Unit) {
|
||||
val oldMDC = MDC.getCopyOfContextMap()
|
||||
try {
|
||||
MDC.put("queueName", queueName)
|
||||
MDC.put("source", sourceX500Name)
|
||||
MDC.put("targets", targets.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("bridgeType", "loopback")
|
||||
block()
|
||||
} finally {
|
||||
MDC.setContextMap(oldMDC)
|
||||
}
|
||||
}
|
||||
|
||||
private fun logDebugWithMDC(msg: () -> String) {
|
||||
if (log.isDebugEnabled) {
|
||||
withMDC { log.debug(msg()) }
|
||||
}
|
||||
}
|
||||
|
||||
private fun logInfoWithMDC(msg: String) = withMDC { log.info(msg) }
|
||||
|
||||
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||
|
||||
private val artemis = ConcurrentBox(artemis)
|
||||
private var session: ClientSession? = null
|
||||
private var consumer: ClientConsumer? = null
|
||||
private var producer: ClientProducer? = null
|
||||
|
||||
fun start() {
|
||||
logInfoWithMDC("Create new Artemis loopback bridge")
|
||||
artemis.exclusive {
|
||||
logInfoWithMDC("Bridge Connected")
|
||||
bridgeMetricsService?.bridgeConnected(targets, legalNames)
|
||||
val sessionFactory = started!!.sessionFactory
|
||||
val session = sessionFactory.createSession(NODE_P2P_USER, NODE_P2P_USER, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
this@LoopbackBridge.session = session
|
||||
// Several producers (in the case of shared bridge) can put messages in the same outbound p2p queue. The consumers are created using the source x500 name as a filter
|
||||
val consumer = session.createConsumer(queueName, "hyphenated_props:sender-subject-name = '$sourceX500Name'")
|
||||
this@LoopbackBridge.consumer = consumer
|
||||
consumer.setMessageHandler(this@LoopbackBridge::clientArtemisMessageHandler)
|
||||
this@LoopbackBridge.producer = session.createProducer()
|
||||
session.start()
|
||||
}
|
||||
}
|
||||
|
||||
fun stop() {
|
||||
logInfoWithMDC("Stopping AMQP bridge")
|
||||
artemis.exclusive {
|
||||
consumer?.apply { if (!isClosed) close() }
|
||||
consumer = null
|
||||
session?.apply { if (!isClosed) stop() }
|
||||
session = null
|
||||
}
|
||||
}
|
||||
|
||||
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
|
||||
logDebugWithMDC { "Loopback Send to ${legalNames.first()} uuid: ${artemisMessage.getObjectProperty("_AMQ_DUPL_ID")}" }
|
||||
val peerInbox = translateLocalQueueToInboxAddress(queueName)
|
||||
producer?.send(SimpleString(peerInbox), artemisMessage) { artemisMessage.acknowledge() }
|
||||
bridgeMetricsService?.let { metricsService ->
|
||||
val properties = ArtemisMessagingComponent.Companion.P2PMessagingHeaders.whitelistedHeaders.mapNotNull { key ->
|
||||
if (artemisMessage.containsProperty(key)) {
|
||||
key to artemisMessage.getObjectProperty(key).let { (it as? SimpleString)?.toString() ?: it }
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}.toMap()
|
||||
metricsService.packetAcceptedEvent(SendableMessageImpl(artemisMessage.payload(), peerInbox, legalNames.first().toString(), targets.first(), properties))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun deployBridge(sourceX500Name: String, queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
|
||||
queueNamesToBridgesMap.exclusive {
|
||||
val bridges = getOrPut(queueName) { mutableListOf() }
|
||||
for (target in targets) {
|
||||
if (bridges.any { it.targets.contains(target) && it.sourceX500Name == sourceX500Name }) {
|
||||
return
|
||||
}
|
||||
}
|
||||
val newBridge = LoopbackBridge(sourceX500Name, queueName, targets, legalNames, artemis!!, bridgeMetricsService)
|
||||
bridges += newBridge
|
||||
bridgeMetricsService?.bridgeCreated(targets, legalNames)
|
||||
newBridge
|
||||
}.start()
|
||||
}
|
||||
|
||||
override fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>) {
|
||||
queueNamesToBridgesMap.exclusive {
|
||||
val bridges = this[queueName] ?: mutableListOf()
|
||||
for (target in targets) {
|
||||
val bridge = bridges.firstOrNull { it.targets.contains(target) }
|
||||
if (bridge != null) {
|
||||
bridges -= bridge
|
||||
if (bridges.isEmpty()) {
|
||||
remove(queueName)
|
||||
}
|
||||
bridge.stop()
|
||||
bridgeMetricsService?.bridgeDestroyed(bridge.targets, bridge.legalNames)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
val artemis = artemisMessageClientFactory()
|
||||
this.artemis = artemis
|
||||
artemis.start()
|
||||
}
|
||||
|
||||
override fun stop() = close()
|
||||
|
||||
override fun close() {
|
||||
queueNamesToBridgesMap.exclusive {
|
||||
for (bridge in values.flatten()) {
|
||||
bridge.stop()
|
||||
}
|
||||
clear()
|
||||
artemis?.stop()
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,218 @@
|
||||
package net.corda.node.amqp
|
||||
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.internal.div
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.node.services.config.EnterpriseConfiguration
|
||||
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.bridging.BridgeManager
|
||||
import net.corda.nodeapi.internal.bridging.LoopbackBridgeManager
|
||||
import net.corda.nodeapi.internal.bridging.payload
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.MAX_MESSAGE_SIZE
|
||||
import net.corda.testing.core.TestIdentity
|
||||
import net.corda.testing.driver.PortAllocation
|
||||
import net.corda.testing.internal.rigorousMock
|
||||
import net.corda.testing.internal.stubs.CertificateStoreStubs
|
||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.junit.Assert.assertArrayEquals
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import org.junit.rules.TemporaryFolder
|
||||
import org.junit.runner.RunWith
|
||||
import org.junit.runners.Parameterized
|
||||
import java.util.*
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
@RunWith(Parameterized::class)
|
||||
class LoopbackBridgeTest(private val useOpenSsl: Boolean) {
|
||||
companion object {
|
||||
@JvmStatic
|
||||
@Parameterized.Parameters(name = "useOpenSsl = {0}")
|
||||
fun data(): Collection<Boolean> = listOf(false, true)
|
||||
}
|
||||
|
||||
@Rule
|
||||
@JvmField
|
||||
val temporaryFolder = TemporaryFolder()
|
||||
private val log = loggerFor<LoopbackBridgeTest>()
|
||||
private val BOB = TestIdentity(BOB_NAME)
|
||||
private val portAllocation = PortAllocation.Incremental(10000)
|
||||
private val artemisAddress = portAllocation.nextHostAndPort()
|
||||
private val amqpAddress = portAllocation.nextHostAndPort()
|
||||
|
||||
private abstract class AbstractNodeConfiguration : NodeConfiguration
|
||||
|
||||
@Test
|
||||
fun `test acked and nacked messages`() {
|
||||
// Create local queue
|
||||
val sourceQueueName = "internal.peers." + BOB.publicKey.toStringShort()
|
||||
val (artemisServer, artemisClient, bridgeManager) = createArtemis(sourceQueueName)
|
||||
val artemis = artemisClient.started!!
|
||||
|
||||
//Create target artemis inbox
|
||||
val queueName = "p2p.inbound.${BOB.publicKey.toStringShort()}"
|
||||
artemis.session.createQueue(queueName, RoutingType.ANYCAST, queueName, null, true, false,
|
||||
ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(),
|
||||
ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), true, null)
|
||||
|
||||
// Pre-populate local queue with 3 messages
|
||||
for (i in 0 until 3) {
|
||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
||||
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString())
|
||||
putIntProperty(P2PMessagingHeaders.senderUUID, i)
|
||||
writeBodyBufferBytes("Test$i".toByteArray())
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||
}
|
||||
|
||||
var consumer = artemis.session.createConsumer(queueName)
|
||||
|
||||
val dedupeSet = mutableSetOf<String>()
|
||||
val receivedSequence = mutableListOf<Int>()
|
||||
val atNodeSequence = mutableListOf<Int>()
|
||||
|
||||
fun formatMessage(expected: String, actual: Int, received: List<Int>): String {
|
||||
return "Expected message with id $expected, got $actual, previous message receive sequence: $received."
|
||||
}
|
||||
|
||||
val received1 = consumer!!.receive(1000)
|
||||
val messageID1 = received1.getIntProperty(P2PMessagingHeaders.senderUUID)
|
||||
assertArrayEquals("Test$messageID1".toByteArray(), received1.payload())
|
||||
assertEquals(0, messageID1)
|
||||
dedupeSet += received1.getStringProperty(HDR_DUPLICATE_DETECTION_ID)
|
||||
received1.acknowledge() // Accept first message
|
||||
receivedSequence += messageID1
|
||||
atNodeSequence += messageID1
|
||||
|
||||
val received2 = consumer.receive(1000)
|
||||
val messageID2 = received2.getIntProperty(P2PMessagingHeaders.senderUUID)
|
||||
assertArrayEquals("Test$messageID2".toByteArray(), received2.payload())
|
||||
assertEquals(1, messageID2, formatMessage("1", messageID2, receivedSequence))
|
||||
consumer.close() // Reject message and don't add to dedupe
|
||||
consumer = artemis.session.createConsumer(queueName)
|
||||
receivedSequence += messageID2 // reflects actual sequence
|
||||
|
||||
// drop things until we get back to the replay
|
||||
while (true) {
|
||||
val received3 = consumer!!.receive(1000)
|
||||
val messageID3 = received3.getIntProperty(P2PMessagingHeaders.senderUUID)
|
||||
assertArrayEquals("Test$messageID3".toByteArray(), received3.payload())
|
||||
receivedSequence += messageID3
|
||||
if (messageID3 != 1) { // keep rejecting any batched items following rejection
|
||||
consumer.close() // Reject message and don't add to dedupe
|
||||
consumer = artemis.session.createConsumer(queueName)
|
||||
} else { // beginnings of replay so accept again
|
||||
received3.acknowledge()
|
||||
val messageId = received3.getStringProperty(HDR_DUPLICATE_DETECTION_ID)
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID3
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// start receiving again, but discarding duplicates
|
||||
while (true) {
|
||||
val received4 = consumer!!.receive(1000)
|
||||
val messageID4 = received4.getIntProperty(P2PMessagingHeaders.senderUUID)
|
||||
assertArrayEquals("Test$messageID4".toByteArray(), received4.payload())
|
||||
receivedSequence += messageID4
|
||||
val messageId = received4.getStringProperty(HDR_DUPLICATE_DETECTION_ID)
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID4
|
||||
}
|
||||
received4.acknowledge()
|
||||
if (messageID4 == 2) { // started to replay messages after rejection point
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Send a fresh item and check receive
|
||||
val artemisMessage = artemis.session.createMessage(true).apply {
|
||||
putStringProperty(P2PMessagingHeaders.bridgedCertificateSubject, ALICE_NAME.toString())
|
||||
putIntProperty(P2PMessagingHeaders.senderUUID, 3)
|
||||
writeBodyBufferBytes("Test3".toByteArray())
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
artemis.producer.send(sourceQueueName, artemisMessage)
|
||||
|
||||
|
||||
// start receiving again, discarding duplicates
|
||||
while (true) {
|
||||
val received5 = consumer.receive(1000)
|
||||
val messageID5 = received5.getIntProperty(P2PMessagingHeaders.senderUUID)
|
||||
assertArrayEquals("Test$messageID5".toByteArray(), received5.payload())
|
||||
receivedSequence += messageID5
|
||||
val messageId = received5.getStringProperty(HDR_DUPLICATE_DETECTION_ID)
|
||||
if (messageId !in dedupeSet) {
|
||||
dedupeSet += messageId
|
||||
atNodeSequence += messageID5
|
||||
}
|
||||
received5.acknowledge()
|
||||
if (messageID5 == 3) { // reached our fresh message
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Message sequence: $receivedSequence")
|
||||
log.info("Deduped sequence: $atNodeSequence")
|
||||
assertEquals(listOf(0, 1, 2, 3), atNodeSequence)
|
||||
consumer.close()
|
||||
bridgeManager.stop()
|
||||
artemisClient.stop()
|
||||
artemisServer.stop()
|
||||
}
|
||||
|
||||
private fun createArtemis(sourceQueueName: String?): Triple<ArtemisMessagingServer, ArtemisMessagingClient, BridgeManager> {
|
||||
val baseDir = temporaryFolder.root.toPath() / "artemis"
|
||||
val certificatesDirectory = baseDir / "certificates"
|
||||
val p2pSslConfiguration = CertificateStoreStubs.P2P.withCertificatesDirectory(certificatesDirectory, useOpenSsl = useOpenSsl)
|
||||
val signingCertificateStore = CertificateStoreStubs.Signing.withCertificatesDirectory(certificatesDirectory)
|
||||
val artemisConfig = rigorousMock<AbstractNodeConfiguration>().also {
|
||||
doReturn(baseDir).whenever(it).baseDirectory
|
||||
doReturn(ALICE_NAME).whenever(it).myLegalName
|
||||
doReturn(certificatesDirectory).whenever(it).certificatesDirectory
|
||||
doReturn(signingCertificateStore).whenever(it).signingCertificateStore
|
||||
doReturn(p2pSslConfiguration).whenever(it).p2pSslOptions
|
||||
doReturn(true).whenever(it).crlCheckSoftFail
|
||||
doReturn(artemisAddress).whenever(it).p2pAddress
|
||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||
}
|
||||
artemisConfig.configureWithDevSSLCertificate()
|
||||
|
||||
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE)
|
||||
|
||||
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize)
|
||||
|
||||
artemisServer.start()
|
||||
artemisClient.start()
|
||||
val bridgeManager = LoopbackBridgeManager({ ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE, confirmationWindowSize = artemisConfig.enterpriseConfiguration.tuning.p2pConfirmationWindowSize) })
|
||||
bridgeManager.start()
|
||||
|
||||
val artemis = artemisClient.started!!
|
||||
if (sourceQueueName != null) {
|
||||
// Local queue for outgoing messages
|
||||
artemis.session.createQueue(sourceQueueName, RoutingType.ANYCAST, sourceQueueName, true)
|
||||
bridgeManager.deployBridge(ALICE_NAME.toString(), sourceQueueName, listOf(amqpAddress), setOf(BOB.name))
|
||||
}
|
||||
return Triple(artemisServer, artemisClient, bridgeManager)
|
||||
}
|
||||
}
|
@ -31,7 +31,7 @@ import java.util.*
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
|
||||
class P2PMessagingTest : IntegrationTest() {
|
||||
private companion object {
|
||||
private companion object {
|
||||
@ClassRule
|
||||
@JvmField
|
||||
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME.toDatabaseSchemaName(), "DistributedService_0", "DistributedService_1")
|
||||
@ -41,18 +41,19 @@ class P2PMessagingTest : IntegrationTest() {
|
||||
|
||||
@Test
|
||||
fun `communicating with a distributed service which we're part of`() {
|
||||
startDriverWithDistributedService { distributedService ->
|
||||
assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0])
|
||||
startDriverWithDistributedService { originatingNode, distributedService ->
|
||||
assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, originatingNode)
|
||||
}
|
||||
}
|
||||
|
||||
private fun startDriverWithDistributedService(dsl: DriverDSL.(List<InProcess>) -> Unit) {
|
||||
private fun startDriverWithDistributedService(dsl: DriverDSL.(InProcess, List<InProcess>) -> Unit) {
|
||||
driver(DriverParameters(
|
||||
startNodesInProcess = true,
|
||||
extraCordappPackagesToScan = listOf("net.corda.notary.raft"),
|
||||
notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2)))
|
||||
)) {
|
||||
dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as InProcess) })
|
||||
val originatingNode = startNode().get() as InProcess
|
||||
dsl(originatingNode, defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as InProcess) })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,7 @@ import com.codahale.metrics.Clock
|
||||
import com.codahale.metrics.MetricRegistry
|
||||
import net.corda.core.crypto.toStringShort
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.ThreadBox
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
@ -35,7 +36,6 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CON
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
|
||||
import net.corda.nodeapi.internal.bridging.BridgeControl
|
||||
@ -225,7 +225,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
inboxes += RemoteInboxAddress(it).queueName
|
||||
}
|
||||
|
||||
inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true) }
|
||||
inboxes.forEach { createQueueIfAbsent(it, producerSession!!, exclusive = true, isServiceAddress = false) }
|
||||
|
||||
p2pConsumer = P2PMessagingConsumer(inboxes, createNewSession, isDrainingModeOn, drainingModeWasChangedEvents)
|
||||
|
||||
@ -282,10 +282,10 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
log.info("Updating bridges on network map change: ${change.node}")
|
||||
fun gatherAddresses(node: NodeInfo): Sequence<BridgeEntry> {
|
||||
return state.locked {
|
||||
node.legalIdentitiesAndCerts.map {
|
||||
node.legalIdentitiesAndCerts.asSequence().map {
|
||||
val messagingAddress = NodeAddress(it.party.owningKey)
|
||||
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map { it.name })
|
||||
}.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }.asSequence()
|
||||
BridgeEntry(messagingAddress.queueName, node.addresses, node.legalIdentities.map(Party::name), serviceAddress = false)
|
||||
}.filter { producerSession!!.queueQuery(SimpleString(it.queueName)).isExists }
|
||||
}
|
||||
}
|
||||
|
||||
@ -323,7 +323,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
val keyHash = queueName.substring(PEERS_PREFIX.length)
|
||||
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
|
||||
for (node in peers) {
|
||||
val bridge = BridgeEntry(queueName.toString(), node.addresses, node.legalIdentities.map { it.name })
|
||||
val bridge = BridgeEntry(queueName.toString(), node.addresses, node.legalIdentities.map { it.name }, serviceAddress = false)
|
||||
requiredBridges += bridge
|
||||
knownQueues += queueName.toString()
|
||||
}
|
||||
@ -377,7 +377,8 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
requireMessageSize(message.bodySize, maxMessageSize)
|
||||
val topic = message.required(P2PMessagingHeaders.topicProperty) { getStringProperty(it) }
|
||||
val user = requireNotNull(if (externalBridge) {
|
||||
message.getStringProperty(P2PMessagingHeaders.bridgedCertificateSubject) ?: message.getStringProperty(HDR_VALIDATED_USER)
|
||||
message.getStringProperty(P2PMessagingHeaders.bridgedCertificateSubject)
|
||||
?: message.getStringProperty(HDR_VALIDATED_USER)
|
||||
} else {
|
||||
message.getStringProperty(HDR_VALIDATED_USER)
|
||||
}) { "Message is not authenticated" }
|
||||
@ -562,19 +563,20 @@ class P2PMessagingClient(val config: NodeConfiguration,
|
||||
val internalTargetQueue = (address as? ArtemisAddress)?.queueName
|
||||
?: throw IllegalArgumentException("Not an Artemis address")
|
||||
state.locked {
|
||||
createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false)
|
||||
createQueueIfAbsent(internalTargetQueue, producerSession!!, exclusive = false, isServiceAddress = address is ServiceAddress)
|
||||
}
|
||||
internalTargetQueue
|
||||
}
|
||||
}
|
||||
|
||||
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
|
||||
private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean) {
|
||||
private fun createQueueIfAbsent(queueName: String, session: ClientSession, exclusive: Boolean, isServiceAddress: Boolean) {
|
||||
fun sendBridgeCreateMessage() {
|
||||
val keyHash = queueName.substring(PEERS_PREFIX.length)
|
||||
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
|
||||
for (node in peers) {
|
||||
val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name })
|
||||
|
||||
val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name }, isServiceAddress)
|
||||
val createBridgeMessage = BridgeControl.Create(config.myLegalName.toString(), bridge)
|
||||
sendBridgeControl(createBridgeMessage)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user