node: Driver network map starts in parallel with other nodes, uses executor service, pre-creates most Artemis queues

Andras Slemmer 2016-12-01 13:40:21 +00:00
parent eec16a41cf
commit 7d9caa984b
8 changed files with 104 additions and 72 deletions

View File

@@ -7,20 +7,23 @@ import net.corda.node.services.api.RegulatorService
import net.corda.node.services.messaging.ArtemisMessagingComponent
import net.corda.node.services.transactions.SimpleNotaryService
import org.junit.Test
import java.util.concurrent.Executors
class DriverTests {
companion object {
val executorService = Executors.newScheduledThreadPool(2)
fun nodeMustBeUp(nodeInfo: NodeInfo) {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the port is bound
addressMustBeBound(hostAndPort)
addressMustBeBound(executorService, hostAndPort)
}
fun nodeMustBeDown(nodeInfo: NodeInfo) {
val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address)
// Check that the port is not bound
addressMustNotBeBound(hostAndPort)
addressMustNotBeBound(executorService, hostAndPort)
}
}
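Editor's note: the helpers above no longer sleep on the calling thread; they take a shared ScheduledExecutorService and return a future. A minimal sketch of the calling pattern (not from the commit), assuming the driver helpers defined in Driver.kt below are in scope; the port is hypothetical and blocking with get() is only for illustration:

import com.google.common.net.HostAndPort
import java.util.concurrent.Executors

fun main() {
    val executorService = Executors.newScheduledThreadPool(2)
    // Hypothetical address to probe; addressMustBeBound now returns a
    // ListenableFuture<Unit> instead of blocking internally.
    val address = HostAndPort.fromParts("localhost", 10004)
    addressMustBeBound(executorService, address).get() // block until bound
    executorService.shutdown()
}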

View File

@@ -114,16 +114,15 @@ class RaftValidatingNotaryServiceTests : DriverBasedTest() {
waitFor()
}
// Pay ourselves another 10x5 pounds
for (i in 1..10) {
// Pay ourselves another 20x5 pounds
for (i in 1..20) {
val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
}
// Artemis still dispatches some requests to the dead notary but all others should go through.
val notarisationsPerNotary = HashMap<Party, Int>()
notaryStateMachines.expectEvents(isStrict = false) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(15) {
replicate<Pair<NodeInfo, StateMachineUpdate>>(30) {
expect(match = { it.second is StateMachineUpdate.Added }) {
val (notary, update) = it
update as StateMachineUpdate.Added

View File

@@ -17,9 +17,9 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NET
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.node.services.messaging.CordaRPCClientImpl
import net.corda.node.services.messaging.NodeMessagingClient.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.testing.messaging.SimpleMQClient
import net.corda.testing.node.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException

View File

@@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.core.*
@@ -33,14 +34,13 @@ import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.assertEquals
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
@@ -67,7 +67,7 @@ interface DriverDSLExposedInterface {
fun startNode(providedName: String? = null,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList(),
customOverrides: Map<String, Any?> = emptyMap()): Future<NodeHandle>
customOverrides: Map<String, Any?> = emptyMap()): ListenableFuture<NodeHandle>
/**
* Starts a distributed notary cluster.
@@ -198,8 +198,8 @@ fun getTimestampAsDirectoryName(): String {
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now())
}
fun addressMustBeBound(hostAndPort: HostAndPort) {
poll("address $hostAndPort to bind") {
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture<Unit> {
return poll(executorService, "address $hostAndPort to bind") {
try {
Socket(hostAndPort.hostText, hostAndPort.port).close()
Unit
@@ -209,8 +209,8 @@ fun addressMustBeBound(hostAndPort: HostAndPort) {
}
}
fun addressMustNotBeBound(hostAndPort: HostAndPort) {
poll("address $hostAndPort to unbind") {
fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture<Unit> {
return poll(executorService, "address $hostAndPort to unbind") {
try {
Socket(hostAndPort.hostText, hostAndPort.port).close()
null
@@ -220,18 +220,36 @@ fun addressMustNotBeBound(hostAndPort: HostAndPort) {
}
}
fun <A> poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120, f: () -> A?): A {
private fun <A> poll(
executorService: ScheduledExecutorService,
pollName: String,
pollIntervalMs: Long = 500,
warnCount: Int = 120,
check: () -> A?
): ListenableFuture<A> {
val initialResult = check()
val resultFuture = SettableFuture.create<A>()
if (initialResult != null) {
resultFuture.set(initialResult)
return resultFuture
}
var counter = 0
var result = f()
while (result == null) {
if (counter == warnCount) {
log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...")
}
counter = (counter % warnCount) + 1
Thread.sleep(pollIntervalMs)
result = f()
fun schedulePoll() {
executorService.schedule({
counter++
if (counter == warnCount) {
log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...")
}
val result = check()
if (result == null) {
schedulePoll()
} else {
resultFuture.set(result)
}
}, pollIntervalMs, MILLISECONDS)
}
return result
schedulePoll()
return resultFuture
}
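Editor's note: the rewritten poll checks once synchronously, then reschedules itself on the executor instead of looping with Thread.sleep, completing a SettableFuture when the check returns non-null. A self-contained sketch of the same schedule-until-non-null pattern (names are illustrative; only Guava is assumed):

import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit.MILLISECONDS

fun <A> pollUntil(
        executor: ScheduledExecutorService,
        intervalMs: Long = 500,
        check: () -> A?
): ListenableFuture<A> {
    val result = SettableFuture.create<A>()
    fun attempt() {
        val value = check()
        if (value != null) {
            result.set(value)  // done: complete the future
        } else {
            executor.schedule({ attempt() }, intervalMs, MILLISECONDS)  // retry later
        }
    }
    attempt()  // first check runs synchronously, as in the commit
    return result
}

Because callers now receive a future instead of being blocked, several binds can be awaited concurrently and combined, which is what lets the network map start in parallel with the other nodes.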
open class DriverDSL(
@@ -241,13 +259,13 @@ open class DriverDSL(
val useTestClock: Boolean,
val isDebug: Boolean
) : DriverDSLInternalInterface {
private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2)
private val networkMapName = "NetworkMapService"
private val networkMapAddress = portAllocation.nextHostAndPort()
class State {
val registeredProcesses = LinkedList<Process>()
val clients = LinkedList<NodeMessagingClient>()
var localServer: ArtemisMessagingServer? = null
}
private val state = ThreadBox(State())
@@ -276,11 +294,10 @@ open class DriverDSL(
clients.forEach {
it.stop()
}
localServer?.stop()
registeredProcesses.forEach(Process::destroy)
}
/** Wait 5 seconds, then [Process.destroyForcibly] */
val finishedFuture = future {
val finishedFuture = executorService.submit {
waitForAllNodesToFinish()
}
try {
@@ -295,10 +312,8 @@ open class DriverDSL(
}
// Check that we shut down properly
state.locked {
localServer?.run { addressMustNotBeBound(myHostPort) }
}
addressMustNotBeBound(networkMapAddress)
addressMustNotBeBound(executorService, networkMapAddress).get()
executorService.shutdown()
}
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
@@ -353,10 +368,9 @@ open class DriverDSL(
configOverrides = configOverrides
)
return future {
val process = DriverDSL.startNode(FullNodeConfiguration(config), quasarJarPath, debugPort)
registerProcess(process)
NodeHandle(queryNodeInfo(apiAddress)!!, config, process)
return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map {
registerProcess(it)
NodeHandle(queryNodeInfo(apiAddress)!!, config, it)
}
}
@@ -395,7 +409,7 @@ open class DriverDSL(
startNetworkMapService()
}
private fun startNetworkMapService() {
private fun startNetworkMapService(): ListenableFuture<Unit> {
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
@@ -414,7 +428,9 @@ open class DriverDSL(
)
log.info("Starting network-map-service")
registerProcess(startNode(FullNodeConfiguration(config), quasarJarPath, debugPort))
return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map {
registerProcess(it)
}
}
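Editor's note: since startNetworkMapService() now returns a future rather than blocking until the process is up, the driver can overlap it with other node startups. A hypothetical sketch of the idea inside the DSL (the node name is illustrative):

// Kick off both starts, then join once everything is on its way up.
val networkMapFuture = startNetworkMapService()
val aliceFuture = startNode(providedName = "Alice")
Futures.allAsList(networkMapFuture, aliceFuture).get()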
companion object {
@@ -428,10 +444,11 @@ open class DriverDSL(
fun <A> pickA(array: Array<A>): A = array[Math.abs(Random().nextInt()) % array.size]
private fun startNode(
executorService: ScheduledExecutorService,
nodeConf: FullNodeConfiguration,
quasarJarPath: String,
debugPort: Int?
): Process {
): ListenableFuture<Process> {
// Write node.conf
writeConfig(nodeConf.basedir, "node.conf", nodeConf.config)
@@ -454,13 +471,13 @@ open class DriverDSL(
builder.inheritIO()
builder.directory(nodeConf.basedir.toFile())
val process = builder.start()
addressMustBeBound(nodeConf.artemisAddress)
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
// the web api address to be bound as well, as that starts after the services. Needs rethinking.
addressMustBeBound(nodeConf.webAddress)
return process
return Futures.allAsList(
addressMustBeBound(executorService, nodeConf.artemisAddress),
// TODO There is a race condition here. Even though the messaging address is bound it may be the case that
// the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for
// the web api address to be bound as well, as that starts after the services. Needs rethinking.
addressMustBeBound(executorService, nodeConf.webAddress)
).map { process }
}
}
}
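Editor's note: the .map { process } above comes from Corda's ListenableFuture extensions (pulled in via net.corda.core.*). An equivalent can be sketched with plain Guava, assuming a Guava version whose Futures.transform(future, Function) overload does not yet require an executor argument:

import com.google.common.base.Function
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture

// Sketch of a map-style helper over ListenableFuture.
fun <A, B> ListenableFuture<A>.mapped(transform: (A) -> B): ListenableFuture<B> =
        Futures.transform(this, Function { input: A -> transform(input) })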
@@ -469,4 +486,3 @@ fun writeConfig(path: Path, filename: String, config: Config) {
path.toFile().mkdirs()
File("$path/$filename").writeText(config.root().render(ConfigRenderOptions.concise()))
}

View File

@@ -142,7 +142,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
runOnStop += Runnable { messageBroker?.stop() }
start()
if (networkMapService is NetworkMapAddress) {
bridgeToNetworkMapService(networkMapService)
deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort)
}
}

View File

@@ -39,10 +39,11 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
const val CLIENTS_PREFIX = "clients."
const val P2P_QUEUE = "p2p.inbound"
const val RPC_REQUESTS_QUEUE = "rpc.requests"
const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals"
const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications"
@JvmStatic
val NETWORK_MAP_ADDRESS = SimpleString("${INTERNAL_PREFIX}networkmap")
val NETWORK_MAP_ADDRESS = "${INTERNAL_PREFIX}networkmap"
/**
* Assuming the passed-in target address is actually an ArtemisAddress, this will extract the host and port of the node. This should
@@ -57,16 +58,16 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
}
}
protected interface ArtemisAddress : MessageRecipients {
interface ArtemisAddress : MessageRecipients {
val queueName: SimpleString
}
protected interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient {
val hostAndPort: HostAndPort
}
data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress {
override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS
override val queueName = SimpleString(NETWORK_MAP_ADDRESS)
}
/**

View File

@@ -25,6 +25,7 @@ import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.security.Role
@@ -105,14 +106,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
running = false
}
fun bridgeToNetworkMapService(networkMapService: NetworkMapAddress) {
val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS)
if (!query.isExists) {
activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false)
}
deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort)
}
/**
* The bridge will be created automatically when the queues are created; however, this is not the case when the network map is restarted.
* The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues.
@@ -225,6 +218,36 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// password is changed from the default (as warned in the docs). Since we don't need this feature we turn it off
// by having its password be an unknown securely random 128-bit value.
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
queueConfigurations.addAll(listOf(
CoreQueueConfiguration().apply {
address = NETWORK_MAP_ADDRESS
name = NETWORK_MAP_ADDRESS
isDurable = true
},
CoreQueueConfiguration().apply {
address = P2P_QUEUE
name = P2P_QUEUE
isDurable = true
},
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
// but these queues are not worth persisting.
CoreQueueConfiguration().apply {
name = RPC_REQUESTS_QUEUE
address = RPC_REQUESTS_QUEUE
isDurable = false
},
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
CoreQueueConfiguration().apply {
name = RPC_QUEUE_REMOVALS_QUEUE
address = NOTIFICATIONS_ADDRESS
isDurable = false
filterString = "_AMQ_NotifType = 1"
}
))
configureAddressSecurity()
}
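Editor's note: these CoreQueueConfiguration entries replace queue creation that previously happened lazily from the client (see the last file below). For reference, a self-contained sketch of declaring one such pre-created queue on an embedded broker configuration; the address and queue names here are illustrative, not from the commit:

import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl

val brokerConfig = ConfigurationImpl().apply {
    queueConfigurations.add(CoreQueueConfiguration().apply {
        address = "example.notifications"    // hypothetical address
        name = "example.qremovals"           // hypothetical queue name
        isDurable = false                    // not worth persisting
        filterString = "_AMQ_NotifType = 1"  // same filter as the commit uses
    })
}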
@@ -284,7 +307,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name)
private fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) {
fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) {
if (!connectorExists(hostAndPort)) {
addConnector(hostAndPort)
}

View File

@@ -67,8 +67,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
const val TOPIC_PROPERTY = "platform-topic"
const val SESSION_ID_PROPERTY = "session-id"
const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals"
/**
* This should be the only way to generate an ArtemisAddress, and then only for the remote NetworkMapService node.
* All other addresses come from the NetworkMapCache, or myAddress below.
@@ -136,7 +134,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
producer = session.createProducer()
// Create a queue, consumer and producer for handling P2P network messages.
createQueueIfAbsent(SimpleString(P2P_QUEUE))
p2pConsumer = makeP2PConsumer(session, true)
networkMapRegistrationFuture.success {
state.locked {
@@ -150,13 +147,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
// Create an RPC queue and consumer: this will service locally connected clients only (not via a
// bridge) and those clients must have authenticated. We could use a single consumer for everything
// and perhaps we should, but these queues are not worth persisting.
session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
session.createTemporaryQueue(NOTIFICATIONS_ADDRESS, RPC_QUEUE_REMOVALS_QUEUE, "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE)
rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName)
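Editor's note: with the RPC queues now pre-created broker-side, the client no longer creates temporary queues at startup; only dynamically named queues (such as per-peer queues) still go through a create-if-absent path. A sketch of that idiom against the Artemis client API, assuming an open ClientSession:

import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientSession

fun createQueueIfAbsent(session: ClientSession, queueName: SimpleString) {
    val queueQuery = session.queueQuery(queueName)
    if (!queueQuery.isExists) {
        // Address and queue share the name; durable so messages survive restarts.
        session.createQueue(queueName, queueName, true)
    }
}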