From d13bf77473f06151659e5f8ab1f787475c0c3f3c Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Thu, 28 Sep 2017 17:46:36 +0100 Subject: [PATCH] CORDA-649: Improve stability of PersistentNetworkMapCacheTest (#1711) Improve stability of the NetworkMap test by ensuring that the cluster of nodes is in a stable state before testing is performed --- build.gradle | 1 + node/build.gradle | 1 + .../network/PersistentNetworkMapCacheTest.kt | 58 ++++++++++++------- .../services/messaging/P2PSecurityTest.kt | 3 + .../node/services/config/NodeConfiguration.kt | 14 ++++- .../messaging/ArtemisMessagingServer.kt | 8 +-- node/src/main/resources/reference.conf | 7 +++ .../config/FullNodeConfigurationTest.kt | 3 +- 8 files changed, 68 insertions(+), 27 deletions(-) rename node/src/{test => integration-test}/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt (77%) diff --git a/build.gradle b/build.gradle index 97a02e8c18..1a624d147a 100644 --- a/build.gradle +++ b/build.gradle @@ -27,6 +27,7 @@ buildscript { ext.jersey_version = '2.25' ext.jolokia_version = '2.0.0-M3' ext.assertj_version = '3.6.1' + ext.kotlintest_version = '2.0.5' ext.slf4j_version = '1.7.25' ext.log4j_version = '2.7' ext.bouncycastle_version = constants.getProperty("bouncycastleVersion") diff --git a/node/build.gradle b/node/build.gradle index d6b76f8b45..f793837c13 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -132,6 +132,7 @@ dependencies { // Unit testing helpers. 
testCompile "junit:junit:$junit_version" testCompile "org.assertj:assertj-core:${assertj_version}" + testCompile "io.kotlintest:kotlintest:${kotlintest_version}" testCompile project(':test-utils') testCompile project(':client:jfx') testCompile project(':finance') diff --git a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt similarity index 77% rename from node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt rename to node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt index 0d34b7d142..0e69839461 100644 --- a/node/src/test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -1,6 +1,8 @@ package net.corda.node.services.network import co.paralleluniverse.fibers.Suspendable +import io.kotlintest.eventually +import io.kotlintest.milliseconds import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession import net.corda.core.flows.InitiatedBy @@ -20,10 +22,12 @@ import kotlin.test.assertEquals import kotlin.test.assertFails import kotlin.test.assertTrue +private const val BRIDGE_RETRY_MS: Long = 100 + class PersistentNetworkMapCacheTest : NodeBasedTest() { - val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB) - val addressesMap: HashMap = HashMap() - val infos: MutableSet = HashSet() + private val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB) + private val addressesMap: HashMap = HashMap() + private val infos: MutableSet = HashSet() companion object { val logger = loggerFor() @@ -48,7 +52,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { val res = netCache.getNodeByLegalIdentity(alice.info.chooseIdentity()) assertEquals(alice.info, res) val res2 = netCache.getNodeByLegalName(DUMMY_NOTARY.name) - 
assertEquals(infos.filter { DUMMY_NOTARY.name in it.legalIdentitiesAndCerts.map { it.name } }.singleOrNull(), res2) + assertEquals(infos.singleOrNull { DUMMY_NOTARY.name in it.legalIdentitiesAndCerts.map { it.name } }, res2) } } @@ -111,19 +115,23 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { @Test fun `new node joins network without network map started`() { + + fun customNodesStart(parties: List): List> = + startNodesWithPort(parties, noNetworkMap = false, customRetryIntervalMs = BRIDGE_RETRY_MS) + val parties = partiesList.subList(1, partiesList.size) // Start 2 nodes pointing at network map, but don't start network map service. - val otherNodes = startNodesWithPort(parties, noNetworkMap = false) + val otherNodes = customNodesStart(parties) otherNodes.forEach { node -> assertTrue(infos.any { it.legalIdentitiesAndCerts.toSet() == node.info.legalIdentitiesAndCerts.toSet() }) } // Start node that is not in databases of other nodes. Point to NMS. Which has't started yet. - val charlie = startNodesWithPort(listOf(CHARLIE), noNetworkMap = false)[0] + val charlie = customNodesStart(listOf(CHARLIE)).single() otherNodes.forEach { assertThat(it.services.networkMapCache.allNodes).doesNotContain(charlie.info) } // Start Network Map and see that charlie node appears in caches. - val nms = startNodesWithPort(listOf(DUMMY_NOTARY), noNetworkMap = false)[0] + val nms = customNodesStart(listOf(DUMMY_NOTARY)).single() nms.internals.startupComplete.get() assertTrue(nms.inNodeNetworkMapService != NullNetworkMapService) assertTrue(infos.any { it.legalIdentities.toSet() == nms.info.legalIdentities.toSet() }) @@ -131,24 +139,34 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { assertTrue(nms.info.chooseIdentity() in it.services.networkMapCache.allNodes.map { it.chooseIdentity() }) } charlie.internals.nodeReadyFuture.get() // Finish registration. 
- logger.info("Checking connectivity") - checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS. - logger.info("Loading caches") - val cacheA = otherNodes[0].services.networkMapCache.allNodes - val cacheB = otherNodes[1].services.networkMapCache.allNodes - val cacheC = charlie.services.networkMapCache.allNodes - logger.info("Performing verification") - assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap - assertThat(cacheB).contains(charlie.info) - assertEquals(cacheA.toSet(), cacheB.toSet()) - assertEquals(cacheA.toSet(), cacheC.toSet()) + + val allTheStartedNodesPopulation = otherNodes.plus(charlie).plus(nms) + + // This is a prediction of the longest time it will take to get the cluster into a stable state such that further + // testing can be performed upon it + val maxInstabilityInterval = BRIDGE_RETRY_MS * allTheStartedNodesPopulation.size * 2 + + eventually(maxInstabilityInterval.milliseconds) { + logger.info("Checking connectivity") + checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS. + logger.info("Loading caches") + val cacheA = otherNodes[0].services.networkMapCache.allNodes + val cacheB = otherNodes[1].services.networkMapCache.allNodes + val cacheC = charlie.services.networkMapCache.allNodes + logger.info("Performing verification") + assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap + assertThat(cacheB).contains(charlie.info) + assertEquals(cacheA.toSet(), cacheB.toSet()) + assertEquals(cacheA.toSet(), cacheC.toSet()) + } } // HELPERS // Helper function to restart nodes with the same host and port. - private fun startNodesWithPort(nodesToStart: List, noNetworkMap: Boolean = false): List> { + private fun startNodesWithPort(nodesToStart: List, noNetworkMap: Boolean = false, customRetryIntervalMs: Long? 
= null): List> { return nodesToStart.map { party -> - val configOverrides = addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap() + val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) + + (customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap()) if (party == DUMMY_NOTARY) { startNetworkMapNode(party.name, configOverrides = configOverrides) } diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt index 8d4b6fab55..317252e531 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PSecurityTest.kt @@ -9,6 +9,8 @@ import net.corda.core.internal.cert import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.internal.NetworkMapInfo +import net.corda.node.services.config.ActiveMqServerConfiguration +import net.corda.node.services.config.BridgeConfiguration import net.corda.node.services.config.configureWithDevSSLCertificate import net.corda.node.services.messaging.sendRequest import net.corda.node.services.network.NetworkMapService @@ -60,6 +62,7 @@ class P2PSecurityTest : NodeBasedTest() { baseDirectory = baseDirectory(legalName), myLegalName = legalName).also { whenever(it.networkMapService).thenReturn(NetworkMapInfo(networkMapNode.internals.configuration.p2pAddress, networkMapNode.info.chooseIdentity().name)) + whenever(it.activeMQServer).thenReturn(ActiveMqServerConfiguration(BridgeConfiguration(1001, 2, 3.4))) } config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name return SimpleNode(config, trustRoot = trustRoot).apply { start() } diff --git 
a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index d078c1df48..7d500b2202 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -37,8 +37,17 @@ interface NodeConfiguration : NodeSSLConfiguration { val bftSMaRt: BFTSMaRtConfiguration val notaryNodeAddress: NetworkHostAndPort? val notaryClusterAddresses: List + val activeMQServer: ActiveMqServerConfiguration } +data class BridgeConfiguration( + val retryIntervalMs: Long, + val maxRetryIntervalMin: Long, + val retryIntervalMultiplier: Double +) + +data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration) + data class FullNodeConfiguration( /** This is not retrieved from the config file but rather from a command line argument. */ override val baseDirectory: Path, @@ -68,7 +77,8 @@ data class FullNodeConfiguration( override val certificateChainCheckPolicies: List, override val devMode: Boolean = false, val useTestClock: Boolean = false, - val detectPublicIp: Boolean = true + val detectPublicIp: Boolean = true, + override val activeMQServer: ActiveMqServerConfiguration ) : NodeConfiguration { override val exportJMXto: String get() = "http" @@ -104,7 +114,7 @@ enum class CertChainPolicyType { MustContainOneOf } -data class CertChainPolicyConfig(val role: String, val policy: CertChainPolicyType, val trustedAliases: Set) { +data class CertChainPolicyConfig(val role: String, private val policy: CertChainPolicyType, private val trustedAliases: Set) { val certificateChainCheckPolicy: CertificateChainCheckPolicy get() { return when (policy) { CertChainPolicyType.Any -> CertificateChainCheckPolicy.Any diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 
32c866da19..b648721633 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -57,6 +57,7 @@ import java.math.BigInteger import java.security.KeyStore import java.security.KeyStoreException import java.security.Principal +import java.time.Duration import java.util.* import java.util.concurrent.Executor import java.util.concurrent.ScheduledExecutorService @@ -390,10 +391,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic // We keep trying until the network map deems the node unreachable and tells us it's been removed at which // point we destroy the bridge - // TODO Give some thought to the retry settings - retryInterval = 5.seconds.toMillis() - retryIntervalMultiplier = 1.5 // Exponential backoff - maxRetryInterval = 3.minutes.toMillis() + retryInterval = config.activeMQServer.bridge.retryIntervalMs + retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier + maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis() // As a peer of the target node we must connect to it using the peer user. Actual authentication is done using // our TLS certificate. 
user = PEER_USER diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 1b3711c8c4..37a91c5e5e 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -23,3 +23,10 @@ bftSMaRt = { replicaId = -1 debug = false } +activeMQServer = { + bridge = { + retryIntervalMs = 5000 + retryIntervalMultiplier = 1.5 + maxRetryIntervalMin = 3 + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/config/FullNodeConfigurationTest.kt b/node/src/test/kotlin/net/corda/node/services/config/FullNodeConfigurationTest.kt index ef4e7128e4..b4cce7d46f 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/FullNodeConfigurationTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/FullNodeConfigurationTest.kt @@ -34,7 +34,8 @@ class FullNodeConfigurationTest { notaryNodeAddress = null, notaryClusterAddresses = emptyList(), certificateChainCheckPolicies = emptyList(), - devMode = true) + devMode = true, + activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0))) fun configWithRPCUsername(username: String) { testConfiguration.copy(rpcUsers = listOf(User(username, "pass", emptySet())))