mirror of
https://github.com/corda/corda.git
synced 2025-06-13 20:58:19 +00:00
Support HA without load balancer (#3889)
Allow configuration in node for additional advertised addresses. fix logic error Use empty list as default config not null Allow multiple addresses in NodeInfo Describe new additionalP2PAddresses property in docs. Add integration test of additionalP2PAddress feature Fixup after rebase Address PR comment Address PR comments by removing unused element of NodeAddress
This commit is contained in:
@ -6,7 +6,6 @@ import net.corda.core.messaging.MessageRecipientGroup
|
||||
import net.corda.core.messaging.MessageRecipients
|
||||
import net.corda.core.messaging.SingleMessageRecipient
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import java.security.PublicKey
|
||||
@ -77,9 +76,7 @@ class ArtemisMessagingComponent {
|
||||
val queueName: String
|
||||
}
|
||||
|
||||
interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
|
||||
val hostAndPort: NetworkHostAndPort
|
||||
}
|
||||
interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient
|
||||
|
||||
/**
|
||||
* This is the class used to implement [SingleMessageRecipient], for now. Note that in future this class
|
||||
@ -90,12 +87,11 @@ class ArtemisMessagingComponent {
|
||||
* an advertised service's queue.
|
||||
*
|
||||
* @param queueName The name of the queue this address is associated with.
|
||||
* @param hostAndPort The address of the node.
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class NodeAddress(override val queueName: String, override val hostAndPort: NetworkHostAndPort) : ArtemisPeerAddress {
|
||||
constructor(peerIdentity: PublicKey, hostAndPort: NetworkHostAndPort) :
|
||||
this("$PEERS_PREFIX${peerIdentity.toStringShort()}", hostAndPort)
|
||||
data class NodeAddress(override val queueName: String) : ArtemisPeerAddress {
|
||||
constructor(peerIdentity: PublicKey) :
|
||||
this("$PEERS_PREFIX${peerIdentity.toStringShort()}")
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -4,16 +4,13 @@ import io.netty.channel.EventLoopGroup
|
||||
import io.netty.channel.nio.NioEventLoopGroup
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress.Companion.translateLocalQueueToInboxAddress
|
||||
import net.corda.nodeapi.internal.ArtemisSessionProvider
|
||||
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companion.getBridgeName
|
||||
import net.corda.nodeapi.internal.config.CertificateStore
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||
@ -40,7 +37,7 @@ import kotlin.concurrent.withLock
|
||||
class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||
|
||||
private val lock = ReentrantLock()
|
||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
||||
private val queueNamesToBridgesMap = mutableMapOf<String, MutableList<AMQPBridge>>()
|
||||
|
||||
private class AMQPConfigurationImpl private constructor(override val keyStore: CertificateStore,
|
||||
override val trustStore: CertificateStore,
|
||||
@ -66,14 +63,13 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
|
||||
* If the delivery fails the session is rolled back to prevent loss of the message. This may cause duplicate delivery,
|
||||
* however Artemis and the remote Corda instanced will deduplicate these messages.
|
||||
*/
|
||||
private class AMQPBridge(private val queueName: String,
|
||||
private val target: NetworkHostAndPort,
|
||||
private class AMQPBridge(val queueName: String,
|
||||
val targets: List<NetworkHostAndPort>,
|
||||
private val legalNames: Set<CordaX500Name>,
|
||||
private val amqpConfig: AMQPConfiguration,
|
||||
sharedEventGroup: EventLoopGroup,
|
||||
private val artemis: ArtemisSessionProvider) {
|
||||
companion object {
|
||||
fun getBridgeName(queueName: String, hostAndPort: NetworkHostAndPort): String = "$queueName -> $hostAndPort"
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
@ -81,8 +77,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
|
||||
val oldMDC = MDC.getCopyOfContextMap()
|
||||
try {
|
||||
MDC.put("queueName", queueName)
|
||||
MDC.put("target", target.toString())
|
||||
MDC.put("bridgeName", bridgeName)
|
||||
MDC.put("targets", targets.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("legalNames", legalNames.joinToString(separator = ";") { it.toString() })
|
||||
MDC.put("maxMessageSize", amqpConfig.maxMessageSize.toString())
|
||||
block()
|
||||
@ -101,8 +96,7 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
|
||||
|
||||
private fun logWarnWithMDC(msg: String) = withMDC { log.warn(msg) }
|
||||
|
||||
val amqpClient = AMQPClient(listOf(target), legalNames, amqpConfig, sharedThreadPool = sharedEventGroup)
|
||||
val bridgeName: String get() = getBridgeName(queueName, target)
|
||||
val amqpClient = AMQPClient(targets, legalNames, amqpConfig, sharedThreadPool = sharedEventGroup)
|
||||
private val lock = ReentrantLock() // lock to serialise session level access
|
||||
private var session: ClientSession? = null
|
||||
private var consumer: ClientConsumer? = null
|
||||
@ -194,39 +188,37 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
|
||||
}
|
||||
}
|
||||
|
||||
private fun gatherAddresses(node: NodeInfo): List<ArtemisMessagingComponent.NodeAddress> {
|
||||
return node.legalIdentitiesAndCerts.map { ArtemisMessagingComponent.NodeAddress(it.party.owningKey, node.addresses[0]) }
|
||||
}
|
||||
|
||||
override fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>) {
|
||||
if (bridgeExists(getBridgeName(queueName, target))) {
|
||||
return
|
||||
}
|
||||
val newBridge = AMQPBridge(queueName, target, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!)
|
||||
lock.withLock {
|
||||
bridgeNameToBridgeMap[newBridge.bridgeName] = newBridge
|
||||
override fun deployBridge(queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>) {
|
||||
val newBridge = lock.withLock {
|
||||
val bridges = queueNamesToBridgesMap.getOrPut(queueName) { mutableListOf() }
|
||||
for (target in targets) {
|
||||
if (bridges.any { it.targets.contains(target) }) {
|
||||
return
|
||||
}
|
||||
}
|
||||
val newBridge = AMQPBridge(queueName, targets, legalNames, amqpConfig, sharedEventLoopGroup!!, artemis!!)
|
||||
bridges += newBridge
|
||||
newBridge
|
||||
}
|
||||
newBridge.start()
|
||||
}
|
||||
|
||||
override fun destroyBridges(node: NodeInfo) {
|
||||
override fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>) {
|
||||
lock.withLock {
|
||||
gatherAddresses(node).forEach {
|
||||
val bridge = bridgeNameToBridgeMap.remove(getBridgeName(it.queueName, it.hostAndPort))
|
||||
bridge?.stop()
|
||||
val bridges = queueNamesToBridgesMap[queueName] ?: mutableListOf()
|
||||
for (target in targets) {
|
||||
val bridge = bridges.firstOrNull { it.targets.contains(target) }
|
||||
if (bridge != null) {
|
||||
bridges -= bridge
|
||||
if (bridges.isEmpty()) {
|
||||
queueNamesToBridgesMap.remove(queueName)
|
||||
}
|
||||
bridge.stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort) {
|
||||
lock.withLock {
|
||||
val bridge = bridgeNameToBridgeMap.remove(getBridgeName(queueName, hostAndPort))
|
||||
bridge?.stop()
|
||||
}
|
||||
}
|
||||
|
||||
override fun bridgeExists(bridgeName: String): Boolean = lock.withLock { bridgeNameToBridgeMap.containsKey(bridgeName) }
|
||||
|
||||
override fun start() {
|
||||
sharedEventLoopGroup = NioEventLoopGroup(NUM_BRIDGE_THREADS)
|
||||
val artemis = artemisMessageClientFactory()
|
||||
@ -238,13 +230,13 @@ class AMQPBridgeManager(config: MutualSslConfiguration, maxMessageSize: Int, pri
|
||||
|
||||
override fun close() {
|
||||
lock.withLock {
|
||||
for (bridge in bridgeNameToBridgeMap.values) {
|
||||
for (bridge in queueNamesToBridgesMap.values.flatten()) {
|
||||
bridge.stop()
|
||||
}
|
||||
sharedEventLoopGroup?.shutdownGracefully()
|
||||
sharedEventLoopGroup?.terminationFuture()?.sync()
|
||||
sharedEventLoopGroup = null
|
||||
bridgeNameToBridgeMap.clear()
|
||||
queueNamesToBridgesMap.clear()
|
||||
artemis?.stop()
|
||||
}
|
||||
}
|
||||
|
@ -98,7 +98,7 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
return
|
||||
}
|
||||
for (outQueue in controlMessage.sendQueues) {
|
||||
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets.first(), outQueue.legalNames.toSet())
|
||||
bridgeManager.deployBridge(outQueue.queueName, outQueue.targets, outQueue.legalNames.toSet())
|
||||
}
|
||||
validInboundQueues.addAll(controlMessage.inboxQueues)
|
||||
}
|
||||
@ -110,14 +110,14 @@ class BridgeControlListener(val config: MutualSslConfiguration,
|
||||
log.error("Invalid queue names in control message $controlMessage")
|
||||
return
|
||||
}
|
||||
bridgeManager.deployBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first(), controlMessage.bridgeInfo.legalNames.toSet())
|
||||
bridgeManager.deployBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets, controlMessage.bridgeInfo.legalNames.toSet())
|
||||
}
|
||||
is BridgeControl.Delete -> {
|
||||
if (!controlMessage.bridgeInfo.queueName.startsWith(PEERS_PREFIX)) {
|
||||
log.error("Invalid queue names in control message $controlMessage")
|
||||
return
|
||||
}
|
||||
bridgeManager.destroyBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets.first())
|
||||
bridgeManager.destroyBridge(controlMessage.bridgeInfo.queueName, controlMessage.bridgeInfo.targets)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package net.corda.nodeapi.internal.bridging
|
||||
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.VisibleForTesting
|
||||
import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
|
||||
/**
|
||||
@ -10,13 +9,9 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
*/
|
||||
@VisibleForTesting
|
||||
interface BridgeManager : AutoCloseable {
|
||||
fun deployBridge(queueName: String, target: NetworkHostAndPort, legalNames: Set<CordaX500Name>)
|
||||
fun deployBridge(queueName: String, targets: List<NetworkHostAndPort>, legalNames: Set<CordaX500Name>)
|
||||
|
||||
fun destroyBridges(node: NodeInfo)
|
||||
|
||||
fun destroyBridge(queueName: String, hostAndPort: NetworkHostAndPort)
|
||||
|
||||
fun bridgeExists(bridgeName: String): Boolean
|
||||
fun destroyBridge(queueName: String, targets: List<NetworkHostAndPort>)
|
||||
|
||||
fun start()
|
||||
|
||||
|
Reference in New Issue
Block a user