Deprecated freeLocalHostAndPort, freePort and getFreeLocalPorts of TestUtils (#3630)

They're prone to cause flaky tests due to the "allocated" ports already being taken by the system when eventually needed. Replaced usages with PortAllocation.Incremental.

Affected unit tests made into integration tests to avoid any issues in the future when unit tests are made to run in parallel.
This commit is contained in:
Shams Asari 2018-07-17 14:42:30 +01:00 committed by GitHub
parent e879de70f3
commit 921b132658
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 71 additions and 40 deletions

View File

@ -13,7 +13,7 @@ import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.eventually
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.freePort
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.internal.*
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
@ -37,7 +37,10 @@ class RPCStabilityTests {
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val pool = Executors.newFixedThreadPool(10, testThreadFactory())
private val portAllocation = PortAllocation.Incremental(10000)
@After
fun shutdown() {
pool.shutdown()
@ -77,7 +80,6 @@ class RPCStabilityTests {
private fun runBlockAndCheckThreads(block: () -> Unit) {
val executor = Executors.newScheduledThreadPool(1)
try {
// Warm-up so that all the thread pools & co. created
block()
@ -90,7 +92,7 @@ class RPCStabilityTests {
// This is a less than check because threads from other tests may be shutting down while this test is running.
// This is therefore a "best effort" check. When this test is run on its own this should be a strict equality.
// In case of failure we output the threads along with their stacktraces to get an idea what was running at a time.
require(threadsBefore.keys.size >= threadsAfter.keys.size, { "threadsBefore: $threadsBefore\nthreadsAfter: $threadsAfter" })
require(threadsBefore.keys.size >= threadsAfter.keys.size) { "threadsBefore: $threadsBefore\nthreadsAfter: $threadsAfter" }
} finally {
executor.shutdownNow()
}
@ -327,7 +329,7 @@ class RPCStabilityTests {
@Test
fun `client throws RPCException after initial connection attempt fails`() {
val client = CordaRPCClient(NetworkHostAndPort("localhost", freePort()))
val client = CordaRPCClient(portAllocation.nextHostAndPort())
var exceptionMessage: String? = null
try {
client.start("user", "pass").proxy

View File

@ -6,6 +6,10 @@ release, see :doc:`upgrade-notes`.
Unreleased
----------
* ``freeLocalHostAndPort``, ``freePort``, and ``getFreeLocalPorts`` from ``TestUtils`` have been deprecated as they
don't provide any guarantee the returned port will be available which can result in flaky tests. Use ``PortAllocation.Incremental``
instead.
* Docs for IdentityService. assertOwnership updated to correctly state that an UnknownAnonymousPartyException is thrown
rather than IllegalStateException.

View File

@ -4,7 +4,6 @@ import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.crypto.toStringShort
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
@ -15,7 +14,11 @@ import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
import net.corda.nodeapi.internal.bridging.BridgeManager
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.TestIdentity
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.RoutingType
@ -37,11 +40,9 @@ class AMQPBridgeTest {
private val BOB = TestIdentity(BOB_NAME)
private val artemisPort = freePort()
private val artemisPort2 = freePort()
private val amqpPort = freePort()
private val artemisAddress = NetworkHostAndPort("localhost", artemisPort)
private val amqpAddress = NetworkHostAndPort("localhost", amqpPort)
private val portAllocation = PortAllocation.Incremental(10000)
private val artemisAddress = portAllocation.nextHostAndPort()
private val amqpAddress = portAllocation.nextHostAndPort()
private abstract class AbstractNodeConfiguration : NodeConfiguration
@ -177,7 +178,7 @@ class AMQPBridgeTest {
doReturn(null).whenever(it).jmxMonitoringHttpPort
}
artemisConfig.configureWithDevSSLCertificate()
val artemisServer = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), MAX_MESSAGE_SIZE)
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE)
val artemisClient = ArtemisMessagingClient(artemisConfig, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()
@ -209,7 +210,7 @@ class AMQPBridgeTest {
override val maxMessageSize: Int = maxMessageSize
}
return AMQPServer("0.0.0.0",
amqpPort,
amqpAddress.port,
amqpConfig
)
}

View File

@ -19,7 +19,11 @@ import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.DEV_INTERMEDIATE_CA
import net.corda.testing.internal.DEV_ROOT_CA
import net.corda.testing.internal.rigorousMock
@ -66,7 +70,8 @@ class CertificateRevocationListNodeTests {
private val ROOT_CA = DEV_ROOT_CA
private lateinit var INTERMEDIATE_CA: CertificateAndKeyPair
private val serverPort = freePort()
private val portAllocation = PortAllocation.Incremental(10000)
private val serverPort = portAllocation.nextPort()
private lateinit var server: CrlServer

View File

@ -22,7 +22,11 @@ import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
import net.corda.testing.core.*
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.createDevIntermediateCaCertPath
import net.corda.testing.internal.rigorousMock
import org.apache.activemq.artemis.api.core.RoutingType
@ -32,7 +36,6 @@ import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.security.KeyStore
import java.security.SecureRandom
import java.security.cert.X509Certificate
import javax.net.ssl.*
import kotlin.concurrent.thread
@ -44,9 +47,10 @@ class ProtonWrapperTests {
@JvmField
val temporaryFolder = TemporaryFolder()
private val serverPort = freePort()
private val serverPort2 = freePort()
private val artemisPort = freePort()
private val portAllocation = PortAllocation.Incremental(10000)
private val serverPort = portAllocation.nextPort()
private val serverPort2 = portAllocation.nextPort()
private val artemisPort = portAllocation.nextPort()
private abstract class AbstractNodeConfiguration : NodeConfiguration

View File

@ -16,6 +16,7 @@ import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -51,7 +52,9 @@ class ArtemisMessagingTest {
@JvmField
val temporaryFolder = TemporaryFolder()
private val serverPort = freePort()
// THe
private val portAllocation = PortAllocation.Incremental(10000)
private val serverPort = portAllocation.nextPort()
private val identity = generateKeyPair()
private lateinit var config: NodeConfiguration
@ -96,7 +99,7 @@ class ArtemisMessagingTest {
@Test
fun `client should connect to remote server`() {
val remoteServerAddress = freeLocalHostAndPort()
val remoteServerAddress = portAllocation.nextHostAndPort()
createMessagingServer(remoteServerAddress.port).start()
createMessagingClient(server = remoteServerAddress)
@ -105,8 +108,8 @@ class ArtemisMessagingTest {
@Test
fun `client should throw if remote server not found`() {
val serverAddress = freeLocalHostAndPort()
val invalidServerAddress = freeLocalHostAndPort()
val serverAddress = portAllocation.nextHostAndPort()
val invalidServerAddress = portAllocation.nextHostAndPort()
createMessagingServer(serverAddress.port).start()

View File

@ -20,12 +20,15 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.freeLocalHostAndPort
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.hamcrest.Matchers.instanceOf
import org.junit.*
import org.junit.After
import org.junit.Assert.assertThat
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.time.Clock
import java.time.Instant
import java.util.concurrent.CompletableFuture
@ -39,8 +42,10 @@ class RaftTransactionCommitLogTests {
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private lateinit var cluster: List<Member>
private val databases: MutableList<CordaPersistence> = mutableListOf()
private val portAllocation = PortAllocation.Incremental(10000)
private lateinit var cluster: List<Member>
@Before
fun setup() {
@ -139,9 +144,9 @@ class RaftTransactionCommitLogTests {
}
private fun setUpCluster(nodeCount: Int = 3): List<Member> {
val clusterAddress = freeLocalHostAndPort()
val clusterAddress = portAllocation.nextHostAndPort()
val cluster = mutableListOf(createReplica(clusterAddress))
for (i in 1..nodeCount) cluster.add(createReplica(freeLocalHostAndPort(), clusterAddress))
for (i in 1..nodeCount) cluster.add(createReplica(portAllocation.nextHostAndPort(), clusterAddress))
return cluster.map { it.getOrThrow() }
}

View File

@ -1,12 +1,11 @@
package net.corda.node.services.rpc
import io.netty.channel.unix.Errors
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.artemis.*
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_SECURITY_CONFIG
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.RPC_SECURITY_CONFIG
import net.corda.core.internal.errors.AddressBindingException
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.nodeapi.BrokerRpcSslOptions
import net.corda.nodeapi.internal.config.SSLConfiguration
@ -20,7 +19,7 @@ import java.nio.file.Path
import java.security.KeyStoreException
import javax.security.auth.login.AppConfigurationEntry
internal class ArtemisRpcBroker internal constructor(
class ArtemisRpcBroker internal constructor(
address: NetworkHostAndPort,
private val adminAddressOptional: NetworkHostAndPort?,
private val sslOptions: BrokerRpcSslOptions?,

View File

@ -13,12 +13,12 @@ import net.corda.node.internal.StartedNode
import net.corda.node.internal.cordapp.CordappLoader
import net.corda.node.services.config.*
import net.corda.nodeapi.internal.config.toConfig
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.nodeapi.internal.network.NetworkParametersCopier
import net.corda.testing.node.User
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.getFreeLocalPorts
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.User
import org.apache.logging.log4j.Level
import org.junit.After
import org.junit.Before
@ -45,6 +45,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
private lateinit var defaultNetworkParameters: NetworkParametersCopier
private val nodes = mutableListOf<StartedNode<Node>>()
private val nodeInfos = mutableListOf<NodeInfo>()
private val portAllocation = PortAllocation.Incremental(10000)
init {
System.setProperty("consoleLogLevel", Level.DEBUG.name().toLowerCase())
@ -84,8 +85,7 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
rpcUsers: List<User> = emptyList(),
configOverrides: Map<String, Any> = emptyMap()): StartedNode<Node> {
val baseDirectory = baseDirectory(legalName).createDirectories()
val localPort = getFreeLocalPorts("localhost", 3)
val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString()
val p2pAddress = configOverrides["p2pAddress"] ?: portAllocation.nextHostAndPort().toString()
val config = ConfigHelper.loadConfig(
baseDirectory = baseDirectory,
allowMissingConfig = true,
@ -93,8 +93,8 @@ abstract class NodeBasedTest(private val cordappPackages: List<String> = emptyLi
"myLegalName" to legalName.toString(),
"p2pAddress" to p2pAddress,
"devMode" to true,
"rpcSettings.address" to localPort[1].toString(),
"rpcSettings.adminAddress" to localPort[2].toString(),
"rpcSettings.address" to portAllocation.nextHostAndPort().toString(),
"rpcSettings.adminAddress" to portAllocation.nextHostAndPort().toString(),
"rpcUsers" to rpcUsers.map { it.toConfig().root().unwrapped() }
) + configOverrides
)

View File

@ -49,13 +49,17 @@ import java.util.concurrent.atomic.AtomicInteger
fun generateStateRef(): StateRef = StateRef(SecureHash.randomSHA256(), 0)
private val freePortCounter = AtomicInteger(30000)
/**
* Returns a localhost address with a free port.
*
* Unsafe for getting multiple ports!
* Use [getFreeLocalPorts] for getting multiple ports.
*/
fun freeLocalHostAndPort() = NetworkHostAndPort("localhost", freePort())
@Suppress("DEPRECATION")
@Deprecated("Returned port is not guaranteed to be free when used, which can result in flaky tests. Instead use a port " +
"range that's unlikely to be used by the rest of the system, such as PortAllocation.Incremental(10000).")
fun freeLocalHostAndPort(): NetworkHostAndPort = NetworkHostAndPort("localhost", freePort())
/**
* Returns a free port.
@ -63,6 +67,8 @@ fun freeLocalHostAndPort() = NetworkHostAndPort("localhost", freePort())
* Unsafe for getting multiple ports!
* Use [getFreeLocalPorts] for getting multiple ports.
*/
@Deprecated("Returned port is not guaranteed to be free when used, which can result in flaky tests. Instead use a port " +
"range that's unlikely to be used by the rest of the system, such as PortAllocation.Incremental(10000).")
fun freePort(): Int = freePortCounter.getAndAccumulate(0) { prev, _ -> 30000 + (prev - 30000 + 1) % 10000 }
/**
@ -71,6 +77,8 @@ fun freePort(): Int = freePortCounter.getAndAccumulate(0) { prev, _ -> 30000 + (
* Unlikely, but in the time between running this function and handing the ports
* to the Node, some other process else could allocate the returned ports.
*/
@Deprecated("Returned port is not guaranteed to be free when used, which can result in flaky tests. Instead use a port " +
"range that's unlikely to be used by the rest of the system, such as PortAllocation.Incremental(10000).")
fun getFreeLocalPorts(hostName: String, numberToAlloc: Int): List<NetworkHostAndPort> {
val freePort = freePortCounter.getAndAccumulate(0) { prev, _ -> 30000 + (prev - 30000 + numberToAlloc) % 10000 }
return (0 until numberToAlloc).map { NetworkHostAndPort(hostName, freePort + it) }