Add raft notary integration tests, testing service addressing

This commit is contained in:
Andras Slemmer 2016-12-13 11:30:19 +00:00 committed by exfalso
parent 7ee88b6ec8
commit 6a796cef35
7 changed files with 235 additions and 44 deletions

View File

@ -8,7 +8,7 @@ import net.corda.core.random63BitValue
import net.corda.core.serialization.OpaqueBytes import net.corda.core.serialization.OpaqueBytes
import net.corda.flows.CashCommand import net.corda.flows.CashCommand
import net.corda.flows.CashFlow import net.corda.flows.CashFlow
import net.corda.node.driver.NodeInfoAndConfig import net.corda.node.driver.NodeHandle
import net.corda.node.driver.driver import net.corda.node.driver.driver
import net.corda.node.services.User import net.corda.node.services.User
import net.corda.node.services.config.configureTestSSL import net.corda.node.services.config.configureTestSSL
@ -30,7 +30,7 @@ class CordaRPCClientTest {
private val stopDriver = CountDownLatch(1) private val stopDriver = CountDownLatch(1)
private var driverThread: Thread? = null private var driverThread: Thread? = null
private lateinit var client: CordaRPCClient private lateinit var client: CordaRPCClient
private lateinit var driverInfo: NodeInfoAndConfig private lateinit var driverInfo: NodeHandle
@Before @Before
fun start() { fun start() {

View File

@ -23,8 +23,12 @@ data class StateMachineInfo(
) )
sealed class StateMachineUpdate(val id: StateMachineRunId) { sealed class StateMachineUpdate(val id: StateMachineRunId) {
class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id) class Added(val stateMachineInfo: StateMachineInfo) : StateMachineUpdate(stateMachineInfo.id) {
class Removed(id: StateMachineRunId) : StateMachineUpdate(id) override fun toString() = "Added($id, ${stateMachineInfo.flowLogicClassName})"
}
class Removed(id: StateMachineRunId) : StateMachineUpdate(id) {
override fun toString() = "Removed($id)"
}
} }
/** /**

View File

@ -0,0 +1,159 @@
package net.corda.node.services
import net.corda.core.bufferUntilSubscribed
import net.corda.core.contracts.POUNDS
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.CashFlowResult
import net.corda.node.driver.driver
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.messaging.CordaRPCClient
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.testing.expect
import net.corda.testing.expectEvents
import net.corda.testing.replicate
import org.junit.Test
import rx.Observable
import java.net.Inet6Address
import java.net.InetAddress
import java.util.*
import kotlin.test.assertEquals
class RaftValidatingNotaryServiceTests {
@Test
fun `notarisation requests are distributed evenly in raft cluster`() {
driver {
// Start Alice and 3 raft notaries
val clusterSize = 3
val testUser = User("test", "test", permissions = setOf(startFlowPermission<CashFlow>()))
val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser))
val notariesFuture = startNotaryCluster(
"Notary",
rpcUsers = listOf(testUser),
clusterSize = clusterSize,
type = RaftValidatingNotaryService.type
)
val alice = aliceFuture.get().nodeInfo
val (raftNotaryIdentity, notaries) = notariesFuture.get()
assertEquals(notaries.size, clusterSize)
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size)
// Connect to Alice and the notaries
fun connectRpc(node: NodeInfo): CordaRPCOps {
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL())
client.start("test", "test")
return client.proxy()
}
val aliceProxy = connectRpc(alice)
val notaryProxies = notaries.map { connectRpc(it.nodeInfo) }
val notaryStateMachines = Observable.from(notaryProxies.map { proxy ->
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
}).flatMap { it }
// Issue 100 pounds, then pay ourselves 50x2 pounds
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
for (i in 1 .. 50) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(2.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// The state machines added in the notaries should map one-to-one to notarisation requests
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(50) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
}
}
}
// The distribution of requests should be very close to sg like 16/17/17 as by default artemis does round robin
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
// We allow some leeway for artemis as it doesn't always produce perfect distribution
require(notarisationsPerNotary.values.all { it > 10 })
}
}
@Test
fun `cluster survives if a notary is killed`() {
driver {
// Start Alice and 3 raft notaries
val clusterSize = 3
val testUser = User("test", "test", permissions = setOf(startFlowPermission<CashFlow>()))
val aliceFuture = startNode("Alice", rpcUsers = listOf(testUser))
val notariesFuture = startNotaryCluster(
"Notary",
rpcUsers = listOf(testUser),
clusterSize = clusterSize,
type = RaftValidatingNotaryService.type
)
val alice = aliceFuture.get().nodeInfo
val (raftNotaryIdentity, notaries) = notariesFuture.get()
assertEquals(notaries.size, clusterSize)
assertEquals(notaries.size, notaries.map { it.nodeInfo.legalIdentity }.toSet().size)
// Connect to Alice and the notaries
fun connectRpc(node: NodeInfo): CordaRPCOps {
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(node.address), configureTestSSL())
client.start("test", "test")
return client.proxy()
}
val aliceProxy = connectRpc(alice)
val notaryProxies = notaries.map { connectRpc(it.nodeInfo) }
val notaryStateMachines = Observable.from(notaryProxies.map { proxy ->
proxy.stateMachinesAndUpdates().second.map { Pair(proxy.nodeIdentity(), it) }
}).flatMap { it.onErrorResumeNext(Observable.empty()) }.bufferUntilSubscribed()
// Issue 100 pounds, then pay ourselves 10x5 pounds
val issueHandle = aliceProxy.startFlow(::CashFlow, CashCommand.IssueCash(100.POUNDS, OpaqueBytes.of(0), alice.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
for (i in 1 .. 10) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// Now kill a notary
notaries[0].process.apply {
destroy()
waitFor()
}
// Pay ourselves another 10x5 pounds
for (i in 1 .. 10) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// Artemis still dispatches some requests to the dead notary but all others should go through.
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(15) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added
notarisationsPerNotary.compute(notary.legalIdentity) { _key, number -> number?.plus(1) ?: 1 }
}
}
}
println("Notarisation distribution: $notarisationsPerNotary")
require(notarisationsPerNotary.size == 3)
}
}
}

View File

@ -1,6 +1,7 @@
package net.corda.services.messaging package net.corda.services.messaging
import net.corda.node.driver.driver import net.corda.node.driver.driver
import net.corda.node.driver.getTimestampAsDirectoryName
import org.junit.Test import org.junit.Test
import java.nio.file.Paths import java.nio.file.Paths
import java.time.Instant import java.time.Instant
@ -22,8 +23,4 @@ class ArtemisMessagingServerTest {
arrayOf(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).forEach { it.get(5, TimeUnit.MINUTES) } arrayOf(startNode("NodeA"), startNode("NodeB"), startNode("Notary")).forEach { it.get(5, TimeUnit.MINUTES) }
} }
} }
private fun getTimestampAsDirectoryName(): String {
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC).format(Instant.now())
}
} }

View File

@ -37,17 +37,13 @@ import java.util.concurrent.Future
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
/** /**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests. * This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
* *
* The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first * The process the driver is run in behaves as an Artemis client and starts up other processes. Namely it first
* bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes. * bootstraps a network map service to allow the specified nodes to connect to, then starts up the actual nodes.
*
* TODO The driver actually starts up as an Artemis server now that may route traffic. Fix this once the client MessagingService is done.
* TODO The nodes are started up sequentially which is quite slow. Either speed up node startup or make startup parallel somehow.
* TODO The driver now polls the network map cache for info about newly started up nodes, this could be done asynchronously(?).
* TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought.
*/ */
private val log: Logger = loggerFor<DriverDSL>() private val log: Logger = loggerFor<DriverDSL>()
@ -68,7 +64,7 @@ interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null, fun startNode(providedName: String? = null,
advertisedServices: Set<ServiceInfo> = emptySet(), advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(), rpcUsers: List<User> = emptyList(),
customOverrides: Map<String, Any?> = emptyMap()): Future<NodeInfoAndConfig> customOverrides: Map<String, Any?> = emptyMap()): Future<NodeHandle>
/** /**
* Starts a distributed notary cluster. * Starts a distributed notary cluster.
@ -76,8 +72,13 @@ interface DriverDSLExposedInterface {
* @param notaryName The legal name of the advertised distributed notary service. * @param notaryName The legal name of the advertised distributed notary service.
* @param clusterSize Number of nodes to create for the cluster. * @param clusterSize Number of nodes to create for the cluster.
* @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type]. * @param type The advertised notary service type. Currently the only supported type is [RaftValidatingNotaryService.type].
* @return The [Party] identity of the distributed notary service, and the [NodeInfo]s of the notaries in the cluster.
*/ */
fun startNotaryCluster(notaryName: String, clusterSize: Int = 3, type: ServiceType = RaftValidatingNotaryService.type) fun startNotaryCluster(
notaryName: String,
clusterSize: Int = 3,
type: ServiceType = RaftValidatingNotaryService.type,
rpcUsers: List<User> = emptyList()): Future<Pair<Party, List<NodeHandle>>>
fun waitForAllNodesToFinish() fun waitForAllNodesToFinish()
} }
@ -87,7 +88,11 @@ interface DriverDSLInternalInterface : DriverDSLExposedInterface {
fun shutdown() fun shutdown()
} }
data class NodeInfoAndConfig(val nodeInfo: NodeInfo, val config: Config) data class NodeHandle(
val nodeInfo: NodeInfo,
val config: Config,
val process: Process
)
sealed class PortAllocation { sealed class PortAllocation {
abstract fun nextPort(): Int abstract fun nextPort(): Int
@ -120,7 +125,7 @@ sealed class PortAllocation {
* Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [Future] * Note that [DriverDSL.startNode] does not wait for the node to start up synchronously, but rather returns a [Future]
* of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service. * of the [NodeInfo] that may be waited on, which completes when the new node registered with the network map service.
* *
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache]. * The driver implicitly bootstraps a [NetworkMapService].
* *
* @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node * @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>" * directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
@ -176,6 +181,9 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
}) })
Runtime.getRuntime().addShutdownHook(shutdownHook) Runtime.getRuntime().addShutdownHook(shutdownHook)
return returnValue return returnValue
} catch (exception: Throwable) {
println("Driver shutting down because of exception $exception")
throw exception
} finally { } finally {
driverDsl.shutdown() driverDsl.shutdown()
if (shutdownHook != null) { if (shutdownHook != null) {
@ -184,7 +192,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
} }
} }
private fun getTimestampAsDirectoryName(): String { fun getTimestampAsDirectoryName(): String {
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now()) return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now())
} }
@ -313,7 +321,7 @@ open class DriverDSL(
} }
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>, override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>, customOverrides: Map<String, Any?>): Future<NodeInfoAndConfig> { rpcUsers: List<User>, customOverrides: Map<String, Any?>): Future<NodeHandle> {
val messagingAddress = portAllocation.nextHostAndPort() val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
@ -344,12 +352,18 @@ open class DriverDSL(
) )
return future { return future {
registerProcess(DriverDSL.startNode(FullNodeConfiguration(config), quasarJarPath, debugPort)) val process = DriverDSL.startNode(FullNodeConfiguration(config), quasarJarPath, debugPort)
NodeInfoAndConfig(queryNodeInfo(apiAddress)!!, config) registerProcess(process)
NodeHandle(queryNodeInfo(apiAddress)!!, config, process)
} }
} }
override fun startNotaryCluster(notaryName: String, clusterSize: Int, type: ServiceType) { override fun startNotaryCluster(
notaryName: String,
clusterSize: Int,
type: ServiceType,
rpcUsers: List<User>
): Future<Pair<Party, List<NodeHandle>>> {
val nodeNames = (1..clusterSize).map { "Notary Node $it" } val nodeNames = (1..clusterSize).map { "Notary Node $it" }
val paths = nodeNames.map { driverDirectory / it } val paths = nodeNames.map { driverDirectory / it }
ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName) ServiceIdentityGenerator.generateToDisk(paths, type.id, notaryName)
@ -359,12 +373,23 @@ open class DriverDSL(
val notaryClusterAddress = portAllocation.nextHostAndPort() val notaryClusterAddress = portAllocation.nextHostAndPort()
// Start the first node that will bootstrap the cluster // Start the first node that will bootstrap the cluster
startNode(nodeNames.first(), advertisedService, emptyList(), mapOf("notaryNodeAddress" to notaryClusterAddress.toString())) val firstNotaryFuture = startNode(nodeNames.first(), advertisedService, rpcUsers, mapOf("notaryNodeAddress" to notaryClusterAddress.toString()))
// All other nodes will join the cluster // All other nodes will join the cluster
nodeNames.drop(1).forEach { val restNotaryFutures = nodeNames.drop(1).map {
val nodeAddress = portAllocation.nextHostAndPort() val nodeAddress = portAllocation.nextHostAndPort()
val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString())) val configOverride = mapOf("notaryNodeAddress" to nodeAddress.toString(), "notaryClusterAddresses" to listOf(notaryClusterAddress.toString()))
startNode(it, advertisedService, emptyList(), configOverride) startNode(it, advertisedService, rpcUsers, configOverride)
}
return future {
val firstNotary = firstNotaryFuture.get()
val notaryParty = firstNotary.nodeInfo.notaryIdentity
val restNotaries = restNotaryFutures.map {
val notary = it.get()
assertEquals(notaryParty, notary.nodeInfo.notaryIdentity)
notary
}
Pair(notaryParty, listOf(firstNotary) + restNotaries)
} }
} }

View File

@ -52,17 +52,20 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
@JvmStatic @JvmStatic
@VisibleForTesting @VisibleForTesting
fun toHostAndPort(target: MessageRecipients): HostAndPort { fun toHostAndPort(target: MessageRecipients): HostAndPort {
val addr = target as? ArtemisMessagingComponent.ArtemisAddress ?: throw IllegalArgumentException("Not an Artemis address") val addr = target as? ArtemisMessagingComponent.ArtemisPeerAddress ?: throw IllegalArgumentException("Not an Artemis address")
return addr.hostAndPort return addr.hostAndPort
} }
} }
protected interface ArtemisAddress : SingleMessageRecipient { protected interface ArtemisAddress : MessageRecipients {
val queueName: SimpleString val queueName: SimpleString
}
protected interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
val hostAndPort: HostAndPort val hostAndPort: HostAndPort
} }
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress { data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress {
override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS
} }
@ -70,8 +73,11 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
* This is the class used to implement [SingleMessageRecipient], for now. Note that in future this class * This is the class used to implement [SingleMessageRecipient], for now. Note that in future this class
* may change or evolve and code that relies upon it being a simple host/port may not function correctly. * may change or evolve and code that relies upon it being a simple host/port may not function correctly.
* For instance it may contain onion routing data. * For instance it may contain onion routing data.
*
* @param queueName The name of the queue this address is associated with. This is either the direct peer queue or
* an advertised service queue.
*/ */
data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisAddress { data class NodeAddress(override val queueName: SimpleString, override val hostAndPort: HostAndPort) : ArtemisPeerAddress {
companion object { companion object {
fun asPeer(identity: CompositeKey, hostAndPort: HostAndPort) = fun asPeer(identity: CompositeKey, hostAndPort: HostAndPort) =
NodeAddress(SimpleString("$PEERS_PREFIX${identity.toBase58String()}"), hostAndPort) NodeAddress(SimpleString("$PEERS_PREFIX${identity.toBase58String()}"), hostAndPort)
@ -81,8 +87,8 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)" override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)"
} }
data class ServiceAddress(val identity: CompositeKey) : MessageRecipientGroup { data class ServiceAddress(val identity: CompositeKey) : ArtemisAddress, MessageRecipientGroup {
val queueName: SimpleString = SimpleString("$SERVICES_PREFIX${identity.toBase58String()}") override val queueName: SimpleString = SimpleString("$SERVICES_PREFIX${identity.toBase58String()}")
} }
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */ /** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */

View File

@ -122,16 +122,16 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service. * TODO : Create the bridge directly from the list of queues on start up when we have a persisted network map service.
*/ */
private fun destroyOrCreateBridges(change: MapChange) { private fun destroyOrCreateBridges(change: MapChange) {
fun addAddresses(node: NodeInfo, target: HashSet<ArtemisAddress>) { fun addAddresses(node: NodeInfo, target: HashSet<ArtemisPeerAddress>) {
val nodeAddress = node.address as ArtemisAddress val nodeAddress = node.address as ArtemisPeerAddress
target.add(nodeAddress) target.add(nodeAddress)
change.node.advertisedServices.forEach { change.node.advertisedServices.forEach {
target.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort)) target.add(NodeAddress.asService(it.identity.owningKey, nodeAddress.hostAndPort))
} }
} }
val addressesToCreateBridgesTo = HashSet<ArtemisAddress>() val addressesToCreateBridgesTo = HashSet<ArtemisPeerAddress>()
val addressesToRemoveBridgesTo = HashSet<ArtemisAddress>() val addressesToRemoveBridgesTo = HashSet<ArtemisPeerAddress>()
when (change) { when (change) {
is MapChange.Modified -> { is MapChange.Modified -> {
addAddresses(change.node, addressesToCreateBridgesTo) addAddresses(change.node, addressesToCreateBridgesTo)
@ -281,17 +281,17 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name) private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
private fun maybeDeployBridgeForAddress(address: ArtemisAddress) { private fun maybeDeployBridgeForAddress(peerAddress: ArtemisPeerAddress) {
if (!connectorExists(address.hostAndPort)) { if (!connectorExists(peerAddress.hostAndPort)) {
addConnector(address.hostAndPort) addConnector(peerAddress.hostAndPort)
} }
val bridgeName = bridgeNameForAddress(address) val bridgeName = bridgeNameForAddress(peerAddress)
if (!bridgeExists(bridgeName)) { if (!bridgeExists(bridgeName)) {
deployBridge(bridgeName, address) deployBridge(bridgeName, peerAddress)
} }
} }
private fun bridgeNameForAddress(address: ArtemisAddress) = "${address.queueName}-${address.hostAndPort}" private fun bridgeNameForAddress(peerAddress: ArtemisPeerAddress) = "${peerAddress.queueName}-${peerAddress.hostAndPort}"
/** /**
* All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving * All nodes are expected to have a public facing address called [ArtemisMessagingComponent.P2P_QUEUE] for receiving
@ -299,12 +299,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
* as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's * as defined by ArtemisAddress.queueName. A bridge is then created to forward messages from this queue to the node's
* P2P address. * P2P address.
*/ */
private fun deployBridge(bridgeName: String, address: ArtemisAddress) { private fun deployBridge(bridgeName: String, peerAddress: ArtemisPeerAddress) {
activeMQServer.deployBridge(BridgeConfiguration().apply { activeMQServer.deployBridge(BridgeConfiguration().apply {
name = bridgeName name = bridgeName
queueName = address.queueName.toString() queueName = peerAddress.queueName.toString()
forwardingAddress = P2P_QUEUE forwardingAddress = P2P_QUEUE
staticConnectors = listOf(address.hostAndPort.toString()) staticConnectors = listOf(peerAddress.hostAndPort.toString())
confirmationWindowSize = 100000 // a guess confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using // As a peer of the target node we must connect to it using the peer user. Actual authentication is done using