mirror of
https://github.com/corda/corda.git
synced 2025-01-18 18:56:28 +00:00
CORDA-649: Improve stability of PersistentNetworkMapCacheTest (#1711)
Improve stability of the NetworkMap test by ensuring that cluster of nodes is in a stable state before performing testing
This commit is contained in:
parent
89ef4034c0
commit
d13bf77473
@ -27,6 +27,7 @@ buildscript {
|
||||
ext.jersey_version = '2.25'
|
||||
ext.jolokia_version = '2.0.0-M3'
|
||||
ext.assertj_version = '3.6.1'
|
||||
ext.kotlintest_version = '2.0.5'
|
||||
ext.slf4j_version = '1.7.25'
|
||||
ext.log4j_version = '2.7'
|
||||
ext.bouncycastle_version = constants.getProperty("bouncycastleVersion")
|
||||
|
@ -132,6 +132,7 @@ dependencies {
|
||||
// Unit testing helpers.
|
||||
testCompile "junit:junit:$junit_version"
|
||||
testCompile "org.assertj:assertj-core:${assertj_version}"
|
||||
testCompile "io.kotlintest:kotlintest:${kotlintest_version}"
|
||||
testCompile project(':test-utils')
|
||||
testCompile project(':client:jfx')
|
||||
testCompile project(':finance')
|
||||
|
@ -1,6 +1,8 @@
|
||||
package net.corda.node.services.network
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import io.kotlintest.eventually
|
||||
import io.kotlintest.milliseconds
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
@ -20,10 +22,12 @@ import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFails
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
private const val BRIDGE_RETRY_MS: Long = 100
|
||||
|
||||
class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB)
|
||||
val addressesMap: HashMap<CordaX500Name, NetworkHostAndPort> = HashMap()
|
||||
val infos: MutableSet<NodeInfo> = HashSet()
|
||||
private val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB)
|
||||
private val addressesMap: HashMap<CordaX500Name, NetworkHostAndPort> = HashMap()
|
||||
private val infos: MutableSet<NodeInfo> = HashSet()
|
||||
|
||||
companion object {
|
||||
val logger = loggerFor<PersistentNetworkMapCacheTest>()
|
||||
@ -48,7 +52,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
val res = netCache.getNodeByLegalIdentity(alice.info.chooseIdentity())
|
||||
assertEquals(alice.info, res)
|
||||
val res2 = netCache.getNodeByLegalName(DUMMY_NOTARY.name)
|
||||
assertEquals(infos.filter { DUMMY_NOTARY.name in it.legalIdentitiesAndCerts.map { it.name } }.singleOrNull(), res2)
|
||||
assertEquals(infos.singleOrNull { DUMMY_NOTARY.name in it.legalIdentitiesAndCerts.map { it.name } }, res2)
|
||||
}
|
||||
}
|
||||
|
||||
@ -111,19 +115,23 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
|
||||
@Test
|
||||
fun `new node joins network without network map started`() {
|
||||
|
||||
fun customNodesStart(parties: List<Party>): List<StartedNode<Node>> =
|
||||
startNodesWithPort(parties, noNetworkMap = false, customRetryIntervalMs = BRIDGE_RETRY_MS)
|
||||
|
||||
val parties = partiesList.subList(1, partiesList.size)
|
||||
// Start 2 nodes pointing at network map, but don't start network map service.
|
||||
val otherNodes = startNodesWithPort(parties, noNetworkMap = false)
|
||||
val otherNodes = customNodesStart(parties)
|
||||
otherNodes.forEach { node ->
|
||||
assertTrue(infos.any { it.legalIdentitiesAndCerts.toSet() == node.info.legalIdentitiesAndCerts.toSet() })
|
||||
}
|
||||
// Start node that is not in databases of other nodes. Point to NMS. Which has't started yet.
|
||||
val charlie = startNodesWithPort(listOf(CHARLIE), noNetworkMap = false)[0]
|
||||
val charlie = customNodesStart(listOf(CHARLIE)).single()
|
||||
otherNodes.forEach {
|
||||
assertThat(it.services.networkMapCache.allNodes).doesNotContain(charlie.info)
|
||||
}
|
||||
// Start Network Map and see that charlie node appears in caches.
|
||||
val nms = startNodesWithPort(listOf(DUMMY_NOTARY), noNetworkMap = false)[0]
|
||||
val nms = customNodesStart(listOf(DUMMY_NOTARY)).single()
|
||||
nms.internals.startupComplete.get()
|
||||
assertTrue(nms.inNodeNetworkMapService != NullNetworkMapService)
|
||||
assertTrue(infos.any { it.legalIdentities.toSet() == nms.info.legalIdentities.toSet() })
|
||||
@ -131,24 +139,34 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() {
|
||||
assertTrue(nms.info.chooseIdentity() in it.services.networkMapCache.allNodes.map { it.chooseIdentity() })
|
||||
}
|
||||
charlie.internals.nodeReadyFuture.get() // Finish registration.
|
||||
logger.info("Checking connectivity")
|
||||
checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS.
|
||||
logger.info("Loading caches")
|
||||
val cacheA = otherNodes[0].services.networkMapCache.allNodes
|
||||
val cacheB = otherNodes[1].services.networkMapCache.allNodes
|
||||
val cacheC = charlie.services.networkMapCache.allNodes
|
||||
logger.info("Performing verification")
|
||||
assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap
|
||||
assertThat(cacheB).contains(charlie.info)
|
||||
assertEquals(cacheA.toSet(), cacheB.toSet())
|
||||
assertEquals(cacheA.toSet(), cacheC.toSet())
|
||||
|
||||
val allTheStartedNodesPopulation = otherNodes.plus(charlie).plus(nms)
|
||||
|
||||
// This is prediction of the longest time it will take to get the cluster into a stable state such that further
|
||||
// testing can be performed upon it
|
||||
val maxInstabilityInterval = BRIDGE_RETRY_MS * allTheStartedNodesPopulation.size * 2
|
||||
|
||||
eventually(maxInstabilityInterval.milliseconds) {
|
||||
logger.info("Checking connectivity")
|
||||
checkConnectivity(listOf(otherNodes[0], nms)) // Checks connectivity from A to NMS.
|
||||
logger.info("Loading caches")
|
||||
val cacheA = otherNodes[0].services.networkMapCache.allNodes
|
||||
val cacheB = otherNodes[1].services.networkMapCache.allNodes
|
||||
val cacheC = charlie.services.networkMapCache.allNodes
|
||||
logger.info("Performing verification")
|
||||
assertEquals(4, cacheC.size) // Charlie fetched data from NetworkMap
|
||||
assertThat(cacheB).contains(charlie.info)
|
||||
assertEquals(cacheA.toSet(), cacheB.toSet())
|
||||
assertEquals(cacheA.toSet(), cacheC.toSet())
|
||||
}
|
||||
}
|
||||
|
||||
// HELPERS
|
||||
// Helper function to restart nodes with the same host and port.
|
||||
private fun startNodesWithPort(nodesToStart: List<Party>, noNetworkMap: Boolean = false): List<StartedNode<Node>> {
|
||||
private fun startNodesWithPort(nodesToStart: List<Party>, noNetworkMap: Boolean = false, customRetryIntervalMs: Long? = null): List<StartedNode<Node>> {
|
||||
return nodesToStart.map { party ->
|
||||
val configOverrides = addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()
|
||||
val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) +
|
||||
(customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap())
|
||||
if (party == DUMMY_NOTARY) {
|
||||
startNetworkMapNode(party.name, configOverrides = configOverrides)
|
||||
}
|
@ -9,6 +9,8 @@ import net.corda.core.internal.cert
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.internal.NetworkMapInfo
|
||||
import net.corda.node.services.config.ActiveMqServerConfiguration
|
||||
import net.corda.node.services.config.BridgeConfiguration
|
||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||
import net.corda.node.services.messaging.sendRequest
|
||||
import net.corda.node.services.network.NetworkMapService
|
||||
@ -60,6 +62,7 @@ class P2PSecurityTest : NodeBasedTest() {
|
||||
baseDirectory = baseDirectory(legalName),
|
||||
myLegalName = legalName).also {
|
||||
whenever(it.networkMapService).thenReturn(NetworkMapInfo(networkMapNode.internals.configuration.p2pAddress, networkMapNode.info.chooseIdentity().name))
|
||||
whenever(it.activeMQServer).thenReturn(ActiveMqServerConfiguration(BridgeConfiguration(1001, 2, 3.4)))
|
||||
}
|
||||
config.configureWithDevSSLCertificate() // This creates the node's TLS cert with the CN as the legal name
|
||||
return SimpleNode(config, trustRoot = trustRoot).apply { start() }
|
||||
|
@ -37,8 +37,17 @@ interface NodeConfiguration : NodeSSLConfiguration {
|
||||
val bftSMaRt: BFTSMaRtConfiguration
|
||||
val notaryNodeAddress: NetworkHostAndPort?
|
||||
val notaryClusterAddresses: List<NetworkHostAndPort>
|
||||
val activeMQServer: ActiveMqServerConfiguration
|
||||
}
|
||||
|
||||
data class BridgeConfiguration(
|
||||
val retryIntervalMs: Long,
|
||||
val maxRetryIntervalMin: Long,
|
||||
val retryIntervalMultiplier: Double
|
||||
)
|
||||
|
||||
data class ActiveMqServerConfiguration(val bridge: BridgeConfiguration)
|
||||
|
||||
data class FullNodeConfiguration(
|
||||
/** This is not retrieved from the config file but rather from a command line argument. */
|
||||
override val baseDirectory: Path,
|
||||
@ -68,7 +77,8 @@ data class FullNodeConfiguration(
|
||||
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,
|
||||
override val devMode: Boolean = false,
|
||||
val useTestClock: Boolean = false,
|
||||
val detectPublicIp: Boolean = true
|
||||
val detectPublicIp: Boolean = true,
|
||||
override val activeMQServer: ActiveMqServerConfiguration
|
||||
) : NodeConfiguration {
|
||||
override val exportJMXto: String get() = "http"
|
||||
|
||||
@ -104,7 +114,7 @@ enum class CertChainPolicyType {
|
||||
MustContainOneOf
|
||||
}
|
||||
|
||||
data class CertChainPolicyConfig(val role: String, val policy: CertChainPolicyType, val trustedAliases: Set<String>) {
|
||||
data class CertChainPolicyConfig(val role: String, private val policy: CertChainPolicyType, private val trustedAliases: Set<String>) {
|
||||
val certificateChainCheckPolicy: CertificateChainCheckPolicy get() {
|
||||
return when (policy) {
|
||||
CertChainPolicyType.Any -> CertificateChainCheckPolicy.Any
|
||||
|
@ -57,6 +57,7 @@ import java.math.BigInteger
|
||||
import java.security.KeyStore
|
||||
import java.security.KeyStoreException
|
||||
import java.security.Principal
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executor
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
@ -390,10 +391,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
|
||||
isUseDuplicateDetection = true // Enable the bridge's automatic deduplication logic
|
||||
// We keep trying until the network map deems the node unreachable and tells us it's been removed at which
|
||||
// point we destroy the bridge
|
||||
// TODO Give some thought to the retry settings
|
||||
retryInterval = 5.seconds.toMillis()
|
||||
retryIntervalMultiplier = 1.5 // Exponential backoff
|
||||
maxRetryInterval = 3.minutes.toMillis()
|
||||
retryInterval = config.activeMQServer.bridge.retryIntervalMs
|
||||
retryIntervalMultiplier = config.activeMQServer.bridge.retryIntervalMultiplier
|
||||
maxRetryInterval = Duration.ofMinutes(config.activeMQServer.bridge.maxRetryIntervalMin).toMillis()
|
||||
// As a peer of the target node we must connect to it using the peer user. Actual authentication is done using
|
||||
// our TLS certificate.
|
||||
user = PEER_USER
|
||||
|
@ -23,3 +23,10 @@ bftSMaRt = {
|
||||
replicaId = -1
|
||||
debug = false
|
||||
}
|
||||
activeMQServer = {
|
||||
bridge = {
|
||||
retryIntervalMs = 5000
|
||||
retryIntervalMultiplier = 1.5
|
||||
maxRetryIntervalMin = 3
|
||||
}
|
||||
}
|
||||
|
@ -34,7 +34,8 @@ class FullNodeConfigurationTest {
|
||||
notaryNodeAddress = null,
|
||||
notaryClusterAddresses = emptyList(),
|
||||
certificateChainCheckPolicies = emptyList(),
|
||||
devMode = true)
|
||||
devMode = true,
|
||||
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)))
|
||||
|
||||
fun configWithRPCUsername(username: String) {
|
||||
testConfiguration.copy(rpcUsers = listOf(User(username, "pass", emptySet())))
|
||||
|
Loading…
Reference in New Issue
Block a user