From 2e72f784f11930e2efd1fdd1a727229c03504a74 Mon Sep 17 00:00:00 2001 From: Matthew Nesbit Date: Thu, 24 May 2018 16:54:09 +0100 Subject: [PATCH] Late start bridges (unless configured otherwise) if the queue is empty. (#3227) --- .../node/services/config/NodeConfiguration.kt | 4 +- .../services/messaging/P2PMessagingClient.kt | 57 ++++++++++--------- node/src/main/resources/reference.conf | 1 + 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index bedb27badd..465c0b7e7d 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -10,9 +10,9 @@ import net.corda.core.utilities.loggerFor import net.corda.core.utilities.seconds import net.corda.node.internal.artemis.CertificateChainCheckPolicy import net.corda.node.services.config.rpc.NodeRpcOptions -import net.corda.nodeapi.internal.config.* import net.corda.nodeapi.BrokerRpcSslOptions import net.corda.nodeapi.internal.config.NodeSSLConfiguration +import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy import net.corda.nodeapi.internal.config.User import net.corda.nodeapi.internal.config.parseAs import net.corda.nodeapi.internal.persistence.DatabaseConfig @@ -47,6 +47,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val messagingServerExternal: Boolean // TODO Move into DevModeOptions val useTestClock: Boolean get() = false + val lazyBridgeStart: Boolean val detectPublicIp: Boolean get() = true val sshd: SSHDConfiguration? val database: DatabaseConfig @@ -158,6 +159,7 @@ data class NodeConfigurationImpl( override val noLocalShell: Boolean = false, override val devModeOptions: DevModeOptions? = null, override val useTestClock: Boolean = false, + override val lazyBridgeStart: Boolean = true, override val detectPublicIp: Boolean = true, // TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(), diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt index aebc8b5832..c8e65c7c21 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/P2PMessagingClient.kt @@ -15,11 +15,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.serialization.deserialize import net.corda.core.serialization.internal.nodeSerializationEnv import net.corda.core.serialization.serialize -import net.corda.core.utilities.ByteSequence -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.OpaqueBytes -import net.corda.core.utilities.contextLogger -import net.corda.core.utilities.trace +import net.corda.core.utilities.* import net.corda.node.VersionInfo import net.corda.node.internal.LifecycleSupport import net.corda.node.internal.artemis.ReactiveArtemisConsumer @@ -31,15 +27,12 @@ import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.PersistentMap import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport import net.corda.nodeapi.internal.ArtemisMessagingComponent -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress +import net.corda.nodeapi.internal.ArtemisMessagingComponent.* import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX -import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress -import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress import net.corda.nodeapi.internal.bridging.BridgeControl import net.corda.nodeapi.internal.bridging.BridgeEntry import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -50,12 +43,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.ActiveMQClient -import org.apache.activemq.artemis.api.core.client.ClientConsumer -import org.apache.activemq.artemis.api.core.client.ClientMessage -import org.apache.activemq.artemis.api.core.client.ClientProducer -import org.apache.activemq.artemis.api.core.client.ClientSession -import org.apache.activemq.artemis.api.core.client.ServerLocator +import org.apache.activemq.artemis.api.core.client.* import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY import rx.Observable import rx.Subscription @@ -173,6 +161,7 @@ class P2PMessagingClient(val config: NodeConfiguration, private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds private val state = ThreadBox(InnerState()) private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap()) + private val delayStartQueues = Collections.newSetFromMap(ConcurrentHashMap()) private val handlers = ConcurrentHashMap() @@ -332,7 +321,12 @@ class P2PMessagingClient(val config: NodeConfiguration, val queues = session.addressQuery(SimpleString("$PEERS_PREFIX#")).queueNames for (queue in queues) { - createBridgeEntry(queue) + val queueQuery = session.queueQuery(queue) + if (!config.lazyBridgeStart || queueQuery.messageCount > 0) { + createBridgeEntry(queue) + } else { + delayStartQueues += queue.toString() + } } val startupMessage = BridgeControl.NodeToBridgeSnapshot(myIdentity.toStringShort(), inboxes, requiredBridges) sendBridgeControl(startupMessage) @@ -574,19 +568,26 @@ class P2PMessagingClient(val config: NodeConfiguration, /** Attempts to create a durable queue on the broker which is bound to an address of the same name. */ private fun createQueueIfAbsent(queueName: String, session: ClientSession) { + fun sendBridgeCreateMessage() { + val keyHash = queueName.substring(PEERS_PREFIX.length) + val peers = networkMap.getNodesByOwningKeyIndex(keyHash) + for (node in peers) { + val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name }) + val createBridgeMessage = BridgeControl.Create(myIdentity.toStringShort(), bridge) + sendBridgeControl(createBridgeMessage) + } + } if (!knownQueues.contains(queueName)) { - val queueQuery = session.queueQuery(SimpleString(queueName)) - if (!queueQuery.isExists) { - log.info("Create fresh queue $queueName bound on same address") - session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) - if (queueName.startsWith(PEERS_PREFIX)) { - val keyHash = queueName.substring(PEERS_PREFIX.length) - val peers = networkMap.getNodesByOwningKeyIndex(keyHash) - for (node in peers) { - val bridge = BridgeEntry(queueName, node.addresses, node.legalIdentities.map { it.name }) - val createBridgeMessage = BridgeControl.Create(myIdentity.toStringShort(), bridge) - sendBridgeControl(createBridgeMessage) - } + if (delayStartQueues.contains(queueName)) { + log.info("Start bridge for previously empty queue $queueName") + sendBridgeCreateMessage() + delayStartQueues -= queueName + } else { + val queueQuery = session.queueQuery(SimpleString(queueName)) + if (!queueQuery.isExists) { + log.info("Create fresh queue $queueName bound on same address") + session.createQueue(queueName, RoutingType.ANYCAST, queueName, true) + sendBridgeCreateMessage() } } knownQueues += queueName diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 808b1e1cee..9dea25e930 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -3,6 +3,7 @@ emailAddress = "admin@company.com" keyStorePassword = "cordacadevpass" trustStorePassword = "trustpass" crlCheckSoftFail = true +lazyBridgeStart = true dataSourceProperties = { dataSourceClassName = org.h2.jdbcx.JdbcDataSource dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100;AUTO_SERVER_PORT="${h2port}