Late start bridges (unless configured otherwise) if the queue is empty. (#3227)

This commit is contained in:
Matthew Nesbit 2018-05-24 16:54:09 +01:00 committed by GitHub
parent 7ff008d4e3
commit 2e72f784f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 29 deletions

View File

@ -10,9 +10,9 @@ import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.seconds
import net.corda.node.internal.artemis.CertificateChainCheckPolicy
import net.corda.node.services.config.rpc.NodeRpcOptions
import net.corda.nodeapi.internal.config.*
import net.corda.nodeapi.BrokerRpcSslOptions
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
import net.corda.nodeapi.internal.config.User
import net.corda.nodeapi.internal.config.parseAs
import net.corda.nodeapi.internal.persistence.DatabaseConfig
@ -47,6 +47,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val messagingServerExternal: Boolean
// TODO Move into DevModeOptions
val useTestClock: Boolean get() = false
val lazyBridgeStart: Boolean
val detectPublicIp: Boolean get() = true
val sshd: SSHDConfiguration?
val database: DatabaseConfig
@ -158,6 +159,7 @@ data class NodeConfigurationImpl(
override val noLocalShell: Boolean = false,
override val devModeOptions: DevModeOptions? = null,
override val useTestClock: Boolean = false,
override val lazyBridgeStart: Boolean = true,
override val detectPublicIp: Boolean = true,
// TODO See TODO above. Rename this to nodeInfoPollingFrequency and make it of type Duration
override val additionalNodeInfoPollingFrequencyMsec: Long = 5.seconds.toMillis(),

View File

@ -15,11 +15,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.nodeSerializationEnv
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ByteSequence
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.trace
import net.corda.core.utilities.*
import net.corda.node.VersionInfo
import net.corda.node.internal.LifecycleSupport
import net.corda.node.internal.artemis.ReactiveArtemisConsumer
@ -31,15 +27,12 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.PersistentMap
import net.corda.nodeapi.ArtemisTcpTransport.Companion.p2pConnectorTcpTransport
import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ArtemisAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.*
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_CONTROL
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.BRIDGE_NOTIFY
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.JOURNAL_HEADER_SIZE
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.NodeAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.RemoteInboxAddress
import net.corda.nodeapi.internal.ArtemisMessagingComponent.ServiceAddress
import net.corda.nodeapi.internal.bridging.BridgeControl
import net.corda.nodeapi.internal.bridging.BridgeEntry
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -50,12 +43,7 @@ import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.activemq.artemis.api.core.client.*
import org.apache.commons.lang.ArrayUtils.EMPTY_BYTE_ARRAY
import rx.Observable
import rx.Subscription
@ -173,6 +161,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
private val messageRedeliveryDelaySeconds = config.p2pMessagingRetry.messageRedeliveryDelay.seconds
private val state = ThreadBox(InnerState())
private val knownQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val delayStartQueues = Collections.newSetFromMap(ConcurrentHashMap<String, Boolean>())
private val handlers = ConcurrentHashMap<String, MessageHandler>()
@ -332,7 +321,12 @@ class P2PMessagingClient(val config: NodeConfiguration,
val queues = session.addressQuery(SimpleString("$PEERS_PREFIX#")).queueNames
for (queue in queues) {
val queueQuery = session.queueQuery(queue)
if (!config.lazyBridgeStart || queueQuery.messageCount > 0) {
createBridgeEntry(queue)
} else {
delayStartQueues += queue.toString()
}
}
val startupMessage = BridgeControl.NodeToBridgeSnapshot(myIdentity.toStringShort(), inboxes, requiredBridges)
sendBridgeControl(startupMessage)
@ -574,12 +568,7 @@ class P2PMessagingClient(val config: NodeConfiguration,
/** Attempts to create a durable queue on the broker which is bound to an address of the same name. */
private fun createQueueIfAbsent(queueName: String, session: ClientSession) {
if (!knownQueues.contains(queueName)) {
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
if (queueName.startsWith(PEERS_PREFIX)) {
fun sendBridgeCreateMessage() {
val keyHash = queueName.substring(PEERS_PREFIX.length)
val peers = networkMap.getNodesByOwningKeyIndex(keyHash)
for (node in peers) {
@ -588,6 +577,18 @@ class P2PMessagingClient(val config: NodeConfiguration,
sendBridgeControl(createBridgeMessage)
}
}
if (!knownQueues.contains(queueName)) {
if (delayStartQueues.contains(queueName)) {
log.info("Start bridge for previously empty queue $queueName")
sendBridgeCreateMessage()
delayStartQueues -= queueName
} else {
val queueQuery = session.queueQuery(SimpleString(queueName))
if (!queueQuery.isExists) {
log.info("Create fresh queue $queueName bound on same address")
session.createQueue(queueName, RoutingType.ANYCAST, queueName, true)
sendBridgeCreateMessage()
}
}
knownQueues += queueName
}

View File

@ -3,6 +3,7 @@ emailAddress = "admin@company.com"
keyStorePassword = "cordacadevpass"
trustStorePassword = "trustpass"
crlCheckSoftFail = true
lazyBridgeStart = true
dataSourceProperties = {
dataSourceClassName = org.h2.jdbcx.JdbcDataSource
dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100;AUTO_SERVER_PORT="${h2port}