From 7d9caa984b94de8edf94749d50a3007c07b6187f Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 1 Dec 2016 13:40:21 +0000 Subject: [PATCH] node: Driver network map starts parallel with other nodes, uses executor service, pre-create most artemis queues --- .../net/corda/node/driver/DriverTests.kt | 7 +- .../RaftValidatingNotaryServiceTests.kt | 7 +- .../services/messaging/MQSecurityTest.kt | 2 +- .../kotlin/net/corda/node/driver/Driver.kt | 98 +++++++++++-------- .../kotlin/net/corda/node/internal/Node.kt | 2 +- .../messaging/ArtemisMessagingComponent.kt | 9 +- .../messaging/ArtemisMessagingServer.kt | 41 ++++++-- .../services/messaging/NodeMessagingClient.kt | 10 -- 8 files changed, 104 insertions(+), 72 deletions(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt index 22fcacc56e..5cce89db62 100644 --- a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt @@ -7,20 +7,23 @@ import net.corda.node.services.api.RegulatorService import net.corda.node.services.messaging.ArtemisMessagingComponent import net.corda.node.services.transactions.SimpleNotaryService import org.junit.Test +import java.util.concurrent.Executors class DriverTests { companion object { + val executorService = Executors.newScheduledThreadPool(2) + fun nodeMustBeUp(nodeInfo: NodeInfo) { val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the port is bound - addressMustBeBound(hostAndPort) + addressMustBeBound(executorService, hostAndPort) } fun nodeMustBeDown(nodeInfo: NodeInfo) { val hostAndPort = ArtemisMessagingComponent.toHostAndPort(nodeInfo.address) // Check that the port is bound - addressMustNotBeBound(hostAndPort) + addressMustNotBeBound(executorService, hostAndPort) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt index f023887c11..288cc1a96e 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/RaftValidatingNotaryServiceTests.kt @@ -114,16 +114,15 @@ class RaftValidatingNotaryServiceTests : DriverBasedTest() { waitFor() } - // Pay ourselves another 10x5 pounds - for (i in 1..10) { + // Pay ourselves another 20x5 pounds + for (i in 1..20) { val payHandle = aliceProxy.startFlow(::CashFlow, CashCommand.PayCash(5.POUNDS.issuedBy(alice.legalIdentity.ref(0)), alice.legalIdentity)) require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success) } - // Artemis still dispatches some requests to the dead notary but all others should go through. val notarisationsPerNotary = HashMap() notaryStateMachines.expectEvents(isStrict = false) { - replicate>(15) { + replicate>(30) { expect(match = { it.second is StateMachineUpdate.Added }) { val (notary, update) = it update as StateMachineUpdate.Added diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index c9bcacdf1f..576c9f9329 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -17,9 +17,9 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NET import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.P2P_QUEUE import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEERS_PREFIX +import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE import net.corda.node.services.messaging.CordaRPCClientImpl -import net.corda.node.services.messaging.NodeMessagingClient.Companion.RPC_QUEUE_REMOVALS_QUEUE import net.corda.testing.messaging.SimpleMQClient import net.corda.testing.node.NodeBasedTest import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException diff --git a/node/src/main/kotlin/net/corda/node/driver/Driver.kt b/node/src/main/kotlin/net/corda/node/driver/Driver.kt index dc2ddc6ade..d17c4d8d18 100644 --- a/node/src/main/kotlin/net/corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/net/corda/node/driver/Driver.kt @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture import com.typesafe.config.Config import com.typesafe.config.ConfigRenderOptions import net.corda.core.* @@ -33,14 +34,13 @@ import java.time.Instant import java.time.ZoneOffset.UTC import java.time.format.DateTimeFormatter import java.util.* -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors import java.util.concurrent.Future +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger -import kotlin.concurrent.thread -import kotlin.test.assertEquals /** * This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests. @@ -67,7 +67,7 @@ interface DriverDSLExposedInterface { fun startNode(providedName: String? = null, advertisedServices: Set = emptySet(), rpcUsers: List = emptyList(), - customOverrides: Map = emptyMap()): Future + customOverrides: Map = emptyMap()): ListenableFuture /** * Starts a distributed notary cluster. @@ -198,8 +198,8 @@ fun getTimestampAsDirectoryName(): String { return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now()) } -fun addressMustBeBound(hostAndPort: HostAndPort) { - poll("address $hostAndPort to bind") { +fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture { + return poll(executorService, "address $hostAndPort to bind") { try { Socket(hostAndPort.hostText, hostAndPort.port).close() Unit @@ -209,8 +209,8 @@ fun addressMustBeBound(hostAndPort: HostAndPort) { } } -fun addressMustNotBeBound(hostAndPort: HostAndPort) { - poll("address $hostAndPort to unbind") { +fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture { + return poll(executorService, "address $hostAndPort to unbind") { try { Socket(hostAndPort.hostText, hostAndPort.port).close() null @@ -220,18 +220,36 @@ fun addressMustNotBeBound(hostAndPort: HostAndPort) { } } -fun poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120, f: () -> A?): A { +private fun poll( + executorService: ScheduledExecutorService, + pollName: String, + pollIntervalMs: Long = 500, + warnCount: Int = 120, + check: () -> A? +): ListenableFuture { + val initialResult = check() + val resultFuture = SettableFuture.create() + if (initialResult != null) { + resultFuture.set(initialResult) + return resultFuture + } var counter = 0 - var result = f() - while (result == null) { - if (counter == warnCount) { - log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...") - } - counter = (counter % warnCount) + 1 - Thread.sleep(pollIntervalMs) - result = f() + fun schedulePoll() { + executorService.schedule({ + counter++ + if (counter == warnCount) { + log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...") + } + val result = check() + if (result == null) { + schedulePoll() + } else { + resultFuture.set(result) + } + }, pollIntervalMs, MILLISECONDS) } - return result + schedulePoll() + return resultFuture } open class DriverDSL( @@ -241,13 +259,13 @@ open class DriverDSL( val useTestClock: Boolean, val isDebug: Boolean ) : DriverDSLInternalInterface { + private val executorService: ScheduledExecutorService = Executors.newScheduledThreadPool(2) private val networkMapName = "NetworkMapService" private val networkMapAddress = portAllocation.nextHostAndPort() class State { val registeredProcesses = LinkedList() val clients = LinkedList() - var localServer: ArtemisMessagingServer? = null } private val state = ThreadBox(State()) @@ -276,11 +294,10 @@ open class DriverDSL( clients.forEach { it.stop() } - localServer?.stop() registeredProcesses.forEach(Process::destroy) } /** Wait 5 seconds, then [Process.destroyForcibly] */ - val finishedFuture = future { + val finishedFuture = executorService.submit { waitForAllNodesToFinish() } try { @@ -295,10 +312,8 @@ open class DriverDSL( } // Check that we shut down properly - state.locked { - localServer?.run { addressMustNotBeBound(myHostPort) } - } - addressMustNotBeBound(networkMapAddress) + addressMustNotBeBound(executorService, networkMapAddress).get() + executorService.shutdown() } private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? { @@ -353,10 +368,9 @@ open class DriverDSL( configOverrides = configOverrides ) - return future { - val process = DriverDSL.startNode(FullNodeConfiguration(config), quasarJarPath, debugPort) - registerProcess(process) - NodeHandle(queryNodeInfo(apiAddress)!!, config, process) + return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map { + registerProcess(it) + NodeHandle(queryNodeInfo(apiAddress)!!, config, it) } } @@ -395,7 +409,7 @@ open class DriverDSL( startNetworkMapService() } - private fun startNetworkMapService() { + private fun startNetworkMapService(): ListenableFuture { val apiAddress = portAllocation.nextHostAndPort() val debugPort = if (isDebug) debugPortAllocation.nextPort() else null @@ -414,7 +428,9 @@ open class DriverDSL( ) log.info("Starting network-map-service") - registerProcess(startNode(FullNodeConfiguration(config), quasarJarPath, debugPort)) + return startNode(executorService, FullNodeConfiguration(config), quasarJarPath, debugPort).map { + registerProcess(it) + } } companion object { @@ -428,10 +444,11 @@ open class DriverDSL( fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] private fun startNode( + executorService: ScheduledExecutorService, nodeConf: FullNodeConfiguration, quasarJarPath: String, debugPort: Int? - ): Process { + ): ListenableFuture { // Write node.conf writeConfig(nodeConf.basedir, "node.conf", nodeConf.config) @@ -454,13 +471,13 @@ open class DriverDSL( builder.inheritIO() builder.directory(nodeConf.basedir.toFile()) val process = builder.start() - addressMustBeBound(nodeConf.artemisAddress) - // TODO There is a race condition here. Even though the messaging address is bound it may be the case that - // the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for - // the web api address to be bound as well, as that starts after the services. Needs rethinking. - addressMustBeBound(nodeConf.webAddress) - - return process + return Futures.allAsList( + addressMustBeBound(executorService, nodeConf.artemisAddress), + // TODO There is a race condition here. Even though the messaging address is bound it may be the case that + // the handlers for the advertised services are not yet registered. A hacky workaround is that we wait for + // the web api address to be bound as well, as that starts after the services. Needs rethinking. + addressMustBeBound(executorService, nodeConf.webAddress) + ).map { process } } } } @@ -469,4 +486,3 @@ fun writeConfig(path: Path, filename: String, config: Config) { path.toFile().mkdirs() File("$path/$filename").writeText(config.root().render(ConfigRenderOptions.concise())) } - diff --git a/node/src/main/kotlin/net/corda/node/internal/Node.kt b/node/src/main/kotlin/net/corda/node/internal/Node.kt index 48778eb305..7e6a466d03 100644 --- a/node/src/main/kotlin/net/corda/node/internal/Node.kt +++ b/node/src/main/kotlin/net/corda/node/internal/Node.kt @@ -142,7 +142,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: runOnStop += Runnable { messageBroker?.stop() } start() if (networkMapService is NetworkMapAddress) { - bridgeToNetworkMapService(networkMapService) + deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort) } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt index 4ce27f38de..275e89802f 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingComponent.kt @@ -39,10 +39,11 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { const val CLIENTS_PREFIX = "clients." const val P2P_QUEUE = "p2p.inbound" const val RPC_REQUESTS_QUEUE = "rpc.requests" + const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals" const val NOTIFICATIONS_ADDRESS = "${INTERNAL_PREFIX}activemq.notifications" @JvmStatic - val NETWORK_MAP_ADDRESS = SimpleString("${INTERNAL_PREFIX}networkmap") + val NETWORK_MAP_ADDRESS = "${INTERNAL_PREFIX}networkmap" /** * Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should @@ -57,16 +58,16 @@ abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() { } } - protected interface ArtemisAddress : MessageRecipients { + interface ArtemisAddress : MessageRecipients { val queueName: SimpleString } - protected interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient { + interface ArtemisPeerAddress : ArtemisAddress, SingleMessageRecipient { val hostAndPort: HostAndPort } data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisPeerAddress { - override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS + override val queueName = SimpleString(NETWORK_MAP_ADDRESS) } /** diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index d13ac023be..fc3cced9b0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -25,6 +25,7 @@ import net.corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule. import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration +import org.apache.activemq.artemis.core.config.CoreQueueConfiguration import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration import org.apache.activemq.artemis.core.security.Role @@ -105,14 +106,6 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, running = false } - fun bridgeToNetworkMapService(networkMapService: NetworkMapAddress) { - val query = activeMQServer.queueQuery(NETWORK_MAP_ADDRESS) - if (!query.isExists) { - activeMQServer.createQueue(NETWORK_MAP_ADDRESS, NETWORK_MAP_ADDRESS, null, true, false) - } - deployBridgeIfAbsent(networkMapService.queueName, networkMapService.hostAndPort) - } - /** * The bridge will be created automatically when the queues are created, however, this is not the case when the network map restarted. * The queues are restored from the journal, and because the queues are added before we register the callback handler, this method will never get called for existing queues. @@ -225,6 +218,36 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, // password is changed from the default (as warned in the docs). Since we don't need this feature we turn it off // by having its password be an unknown securely random 128-bit value. clusterPassword = BigInteger(128, newSecureRandom()).toString(16) + + queueConfigurations.addAll(listOf( + CoreQueueConfiguration().apply { + address = NETWORK_MAP_ADDRESS + name = NETWORK_MAP_ADDRESS + isDurable = true + }, + CoreQueueConfiguration().apply { + address = P2P_QUEUE + name = P2P_QUEUE + isDurable = true + }, + // Create an RPC queue: this will service locally connected clients only (not via a bridge) and those + // clients must have authenticated. We could use a single consumer for everything and perhaps we should, + // but these queues are not worth persisting. + CoreQueueConfiguration().apply { + name = RPC_REQUESTS_QUEUE + address = RPC_REQUESTS_QUEUE + isDurable = false + }, + // The custom name for the queue is intentional - we may wish other things to subscribe to the + // NOTIFICATIONS_ADDRESS with different filters in future + CoreQueueConfiguration().apply { + name = RPC_QUEUE_REMOVALS_QUEUE + address = NOTIFICATIONS_ADDRESS + isDurable = false + filterString = "_AMQ_NotifType = 1" + } + )) + configureAddressSecurity() } @@ -284,7 +307,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, private fun bridgeExists(name: String) = activeMQServer.clusterManager.bridges.containsKey(name) - private fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) { + fun deployBridgeIfAbsent(queueName: SimpleString, hostAndPort: HostAndPort) { if (!connectorExists(hostAndPort)) { addConnector(hostAndPort) } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 3b0a4ce786..2e4c9d9119 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -67,8 +67,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, const val TOPIC_PROPERTY = "platform-topic" const val SESSION_ID_PROPERTY = "session-id" - const val RPC_QUEUE_REMOVALS_QUEUE = "rpc.qremovals" - /** * This should be the only way to generate an ArtemisAddress and that only of the remote NetworkMapService node. * All other addresses come from the NetworkMapCache, or myAddress below. @@ -136,7 +134,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, producer = session.createProducer() // Create a queue, consumer and producer for handling P2P network messages. - createQueueIfAbsent(SimpleString(P2P_QUEUE)) p2pConsumer = makeP2PConsumer(session, true) networkMapRegistrationFuture.success { state.locked { @@ -150,13 +147,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - // Create an RPC queue and consumer: this will service locally connected clients only (not via a - // bridge) and those clients must have authenticated. We could use a single consumer for everything - // and perhaps we should, but these queues are not worth persisting. - session.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE) - // The custom name for the queue is intentional - we may wish other things to subscribe to the - // NOTIFICATIONS_ADDRESS with different filters in future - session.createTemporaryQueue(NOTIFICATIONS_ADDRESS, RPC_QUEUE_REMOVALS_QUEUE, "_AMQ_NotifType = 1") rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE) rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName)