mirror of
synced 2025-02-15 15:12:46 +00:00
add a shared memory port allocator to allow multiple processes to sha… (#5223)
* add a shared memory port allocator to allow multiple processes to share a single allocation pool * remove dangerous reset function on port allocator * set forkCount = 2 in node integration test * only allow one build of a cordapp at any given time for Driver tests * make all portallocation requests use same starting point * globally set forks to 6 * tweak forking parameters to allow parallel builds * tweak unit test parallelism * 2 workers for integrationTest * some more tweaks for parallel builds * some more tweaks for parallel builds * seems that 49K is not the start of ephemeral ports on all kernels * tweak parallel settings * try fix RPC shutdown test in parallel env * add some logging for RPC shutdown test * added some logging around PortAllocation tests - try figure out where they are getting stuck * added some logging around PortAllocation tests - try figure out where they are getting stuck * fix api-scanner tests * minimize api changes * revert to complying with existing API * add the AtomicInteger for api compatibility reasons * make sizing script executable * address review comments pt1 * address review comments pt2 * fix compile errors after review comments * return to using home dir as temp dir seemed to interact badly with gradle
This commit is contained in:
Normal file
Normal file
@ -0,0 +1,11 @@
# Corda Build
## Build Environment Variables
CORDA_CORE_TESTING_FORKS : Number of JVMS to fork for running unit tests in :core
CORDA_NODE_INT_TESTING_FORKS : Number of JVMS to fork for running integration tests in :node
CORDA_NODE_TESTING_FORKS : Number of JVMS to fork for running unit tests in :node
CORDA_INT_TESTING_FORKS : Global number of JVMS to fork for running integration tests
CORDA_TESTING_FORKS : Global number of JVMS to fork for running unit tests
@ -232,6 +232,7 @@ allprojects {
tasks.withType(Test) {
forkEvery = 10
failFast = project.hasProperty('tests.failFast') ? project.property('tests.failFast').toBoolean() : false
// Prevent the project from creating temporary files outside of the build directory.
@ -239,15 +240,6 @@ allprojects {
maxHeapSize = "1g"
if (project.hasProperty('test.parallel') && project.property('test.parallel').toBoolean()) {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) as int ?: 1
if (System.getProperty("test.maxParallelForks") != null) {
maxParallelForks = Integer.getInteger('test.maxParallelForks')
logger.debug("System property test.maxParallelForks found - setting max parallel forks to $maxParallelForks for $project")
if (project.path.startsWith(':experimental') && System.getProperty("experimental.test.enable") == null) {
enabled = false
@ -257,6 +249,16 @@ allprojects {
extensions.configure(TypeOf.typeOf(JacocoTaskExtension)) { ex ->
ex.append = false
maxParallelForks = (System.env.CORDA_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_TESTING_FORKS".toInteger()
systemProperty 'java.security.egd', 'file:/dev/./urandom'
if (name.contains("integrationTest")){
maxParallelForks = (System.env.CORDA_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_INT_TESTING_FORKS".toInteger()
group 'net.corda'
@ -10,15 +10,16 @@ import net.corda.core.internal.toPath
import net.corda.core.messaging.*
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.finance.DOLLARS
import net.corda.finance.POUNDS
import net.corda.finance.USD
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalances
import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.finance.workflows.getCashBalance
import net.corda.finance.workflows.getCashBalances
import net.corda.node.internal.NodeWithInfo
import net.corda.node.services.Permissions.Companion.all
import net.corda.testing.common.internal.checkNotOnClasspath
@ -47,6 +48,7 @@ import kotlin.test.assertTrue
class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = listOf(DUMMY_NOTARY_NAME)) {
companion object {
val rpcUser = User("user1", "test", permissions = setOf(all()))
val log = contextLogger()
private lateinit var node: NodeWithInfo
@ -97,13 +99,13 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
val nodeIsShut: PublishSubject<Unit> = PublishSubject.create()
val latch = CountDownLatch(1)
var successful = false
val maxCount = 20
val maxCount = 120
var count = 0
CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler ->
val task = scheduler.scheduleAtFixedRate({
try {
println("Checking whether node is still running...")
log.info("Checking whether node is still running...")
client.start(rpcUser.username, rpcUser.password).use {
println("... node is still running.")
if (count == maxCount) {
@ -112,7 +114,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
} catch (e: RPCException) {
println("... node is not running.")
log.info("... node is not running.")
} catch (e: ActiveMQSecurityException) {
// nothing here - this happens if trying to connect before the node is started
@ -122,7 +124,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries =
}, 1, 1, TimeUnit.SECONDS)
nodeIsShut.doOnError { error ->
log.error("FAILED TO SHUT DOWN NODE DUE TO", error)
successful = false
@ -41,7 +41,7 @@ class RPCStabilityTests {
val testSerialization = SerializationEnvironmentRule(true)
private val pool = Executors.newFixedThreadPool(10, testThreadFactory())
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
fun shutdown() {
@ -140,9 +140,9 @@ configurations {
testArtifacts.extendsFrom testRuntimeClasspath
tasks.withType(Test) {
// fork a new test process for every test class
forkEvery = 10
maxParallelForks = (System.env.CORDA_CORE_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_CORE_TESTING_FORKS".toInteger()
task testJar(type: Jar) {
@ -207,18 +207,15 @@ tasks.withType(JavaCompile) {
options.compilerArgs << '-proc:none'
tasks.withType(Test) {
test {
maxHeapSize = "2g"
// fork a new test process for every test class
forkEvery = 10
maxParallelForks = (System.env.CORDA_NODE_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_TESTING_FORKS".toInteger()
task integrationTest(type: Test) {
testClassesDirs = sourceSets.integrationTest.output.classesDirs
classpath = sourceSets.integrationTest.runtimeClasspath
systemProperty 'testing.global.port.allocation.enabled', true
systemProperty 'testing.global.port.allocation.starting.port', 10000
maxParallelForks = (System.env.CORDA_NODE_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_INT_TESTING_FORKS".toInteger()
// quasar exclusions upon agent code instrumentation at run-time
@ -17,7 +17,7 @@ import java.net.ServerSocket
class AddressBindingFailureTests {
companion object {
private val portAllocation = incrementalPortAllocation(20_000)
private val portAllocation = incrementalPortAllocation()
@ -41,7 +41,7 @@ class AMQPBridgeTest {
private val BOB = TestIdentity(BOB_NAME)
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val artemisAddress = portAllocation.nextHostAndPort()
private val amqpAddress = portAllocation.nextHostAndPort()
@ -72,7 +72,7 @@ class CertificateRevocationListNodeTests {
private val ROOT_CA = DEV_ROOT_CA
private lateinit var INTERMEDIATE_CA: CertificateAndKeyPair
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val serverPort = portAllocation.nextPort()
private lateinit var server: CrlServer
@ -49,7 +49,7 @@ class ProtonWrapperTests {
val temporaryFolder = TemporaryFolder()
private val portAllocation = incrementalPortAllocation(15000) // use 15000 to move us out of harms way
private val portAllocation = incrementalPortAllocation() // use 15000 to move us out of harms way
private val serverPort = portAllocation.nextPort()
private val serverPort2 = portAllocation.nextPort()
private val artemisPort = portAllocation.nextPort()
@ -33,7 +33,7 @@ import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
class FlowsDrainingModeContentionTest {
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val user = User("mark", "dadada", setOf(all()))
private val users = listOf(user)
@ -7,9 +7,7 @@ import net.corda.core.internal.concurrent.map
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.logging.logFile
import net.corda.node.services.Permissions
import net.corda.nodeapi.internal.hasCancelledDrainingShutdown
import net.corda.testing.core.ALICE_NAME
@ -21,7 +19,6 @@ import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.internal.chooseIdentity
import net.corda.testing.node.User
import net.corda.testing.node.internal.waitForShutdown
import org.assertj.core.api.Assertions
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.After
import org.junit.Before
@ -37,7 +34,7 @@ class P2PFlowsDrainingModeTest {
private val logger = contextLogger()
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)
@ -17,7 +17,7 @@ import org.junit.Test
class RpcFlowsDrainingModeTest {
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val user = User("mark", "dadada", setOf(Permissions.all()))
private val users = listOf(user)
@ -22,7 +22,7 @@ import kotlin.test.assertTrue
class H2SecurityTests {
companion object {
private val port = incrementalPortAllocation(21_000)
private val port = incrementalPortAllocation()
private fun getFreePort() = port.nextPort()
private const val h2AddressKey = "h2Settings.address"
private const val dbPasswordKey = "dataSourceProperties.dataSource.password"
@ -58,7 +58,7 @@ class ArtemisMessagingTest {
val temporaryFolder = TemporaryFolder()
// THe
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private val serverPort = portAllocation.nextPort()
private val identity = generateKeyPair()
@ -40,7 +40,7 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
val testSerialization = SerializationEnvironmentRule(true)
private val cacheTimeout = 1.seconds
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private lateinit var networkMapServer: NetworkMapServer
private lateinit var compatibilityZone: CompatibilityZoneParams
@ -36,7 +36,7 @@ import java.nio.file.Path
import javax.security.auth.x500.X500Principal
class ArtemisRpcTests {
private val ports: PortAllocation = incrementalPortAllocation(10000)
private val ports: PortAllocation = incrementalPortAllocation()
private val user = User("mark", "dadada", setOf(all()))
private val users = listOf(user)
@ -17,10 +17,8 @@ import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueAndPaymentFlow
import net.corda.finance.schemas.CashSchemaV1
import net.corda.node.services.Permissions
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.core.DUMMY_BANK_B_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
@ -28,7 +26,6 @@ import net.corda.testing.driver.internal.OutOfProcessImpl
import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.User
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.junit.ClassRule
import org.junit.Test
import java.util.*
import java.util.concurrent.CountDownLatch
@ -47,7 +44,7 @@ class RpcReconnectTests {
private val log = contextLogger()
private val portAllocator = incrementalPortAllocation(20006)
private val portAllocator = incrementalPortAllocation()
* This test showcases and stress tests the demo [ReconnectingCordaRPCOps].
@ -63,7 +63,7 @@ class HardRestartTest {
fun restartShortPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
@ -101,7 +101,7 @@ class HardRestartTest {
fun restartLongPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
@ -139,7 +139,7 @@ class HardRestartTest {
fun softRestartLongPingPongFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<Ping>(), Permissions.all()))
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
@ -221,7 +221,7 @@ class HardRestartTest {
fun restartRecursiveFlowRandomly() {
val demoUser = User("demo", "demo", setOf(Permissions.startFlow<RecursiveA>(), Permissions.all()))
portAllocation = incrementalPortAllocation(10000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = false,
inMemoryDB = false,
notarySpecs = emptyList(),
@ -53,7 +53,7 @@ class NodeRegistrationTest {
val testSerialization = SerializationEnvironmentRule(true)
private val portAllocation = incrementalPortAllocation(13000)
private val portAllocation = incrementalPortAllocation()
private val registrationHandler = RegistrationHandler(DEV_ROOT_CA)
private lateinit var server: NetworkMapServer
private lateinit var serverHostAndPort: NetworkHostAndPort
@ -25,7 +25,7 @@ import org.junit.Test
import java.util.*
class AdditionP2PAddressModeTest {
private val portAllocation = incrementalPortAllocation(27182)
private val portAllocation = incrementalPortAllocation()
fun `runs nodes with one configured to use additionalP2PAddresses`() {
val testUser = User("test", "test", setOf(all()))
@ -109,7 +109,9 @@ class CordaRPCOpsImplTest {
fun cleanUp() {
if (::mockNet.isInitialized) {
@ -9,11 +9,13 @@ import net.corda.testing.core.singleIdentityAndCert
import net.corda.testing.node.MockServices
import net.corda.testing.node.makeTestIdentityService
import org.bouncycastle.asn1.DEROctetString
import org.junit.Ignore
import org.junit.Test
import kotlin.test.assertEquals
class KMSUtilsTests {
fun `should generate certificates with the correct role`() {
val aliceKey = generateKeyPair()
val alice = getTestPartyAndCertificate(ALICE_NAME, aliceKey.public)
@ -17,7 +17,6 @@ import net.corda.core.utilities.getOrThrow
import net.corda.node.services.schema.NodeSchemaService
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.notary.experimental.raft.RaftNotarySchemaV1
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.internal.incrementalPortAllocation
@ -45,7 +44,7 @@ class RaftTransactionCommitLogTests {
val testSerialization = SerializationEnvironmentRule(true)
private val databases: MutableList<CordaPersistence> = mutableListOf()
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
private lateinit var cluster: List<Member>
@ -19,7 +19,7 @@ class AttachmentDemoTest {
fun `attachment demo using a 10MB zip file`() {
val numOfExpectedBytes = 10_000_000
portAllocation = incrementalPortAllocation(20000),
portAllocation = incrementalPortAllocation(),
startNodesInProcess = true,
cordappsForAllNodes = listOf(findCordapp("net.corda.attachmentdemo.contracts"), findCordapp("net.corda.attachmentdemo.workflows")))
) {
Executable file
Executable file
@ -0,0 +1,33 @@
#!/usr/bin/env bash
echo "Running in shell $(ps -ef | grep $$ | grep -v grep)"
if test ${NUM_CPU} -le 8 ; then
elif test ${NUM_CPU} -gt 8 && test ${NUM_CPU} -le 16 ; then
elif test ${NUM_CPU} -gt 16 && test ${NUM_CPU} -le 32 ; then
echo "CORDA_TESTING_FORKS=${CORDA_TESTING_FORKS}" >> /etc/environment
@ -26,6 +26,12 @@ import net.corda.testing.node.internal.genericDriver
import net.corda.testing.node.internal.getTimestampAsDirectoryName
import net.corda.testing.node.internal.newContext
import rx.Observable
import sun.misc.Unsafe
import sun.nio.ch.DirectBuffer
import java.io.File
import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
@ -66,7 +72,6 @@ interface NodeHandle : AutoCloseable {
fun stop()
/** Interface which represents an out of process node and exposes its process handle. **/
interface OutOfProcess : NodeHandle {
@ -104,41 +109,85 @@ data class WebserverHandle(
val process: Process
* An abstract helper class which is used within the driver to allocate unused ports for testing.
abstract class PortAllocation {
/** Get the next available port **/
abstract fun nextPort(): Int
companion object {
val defaultAllocator: PortAllocation = SharedMemoryIncremental.INSTANCE
const val DEFAULT_START_PORT = 10_000
const val FIRST_EPHEMERAL_PORT = 30_000
/** Get the next available port via [nextPort] and then return a [NetworkHostAndPort] **/
fun nextHostAndPort() = NetworkHostAndPort("localhost", nextPort())
fun nextHostAndPort(): NetworkHostAndPort = NetworkHostAndPort("localhost", nextPort())
* An implementation of [PortAllocation] which allocates ports sequentially
abstract fun nextPort(): Int
@Deprecated("This has been superseded by net.corda.testing.driver.SharedMemoryIncremental.INSTANCE", ReplaceWith("SharedMemoryIncremental.INSTANCE"))
open class Incremental(private val startingPort: Int) : PortAllocation() {
private companion object {
private const val FIRST_EPHEMERAL_PORT = 49152
/** The backing [AtomicInteger] used to keep track of the currently allocated port */
val portCounter = AtomicInteger(startingPort)
@Deprecated("This has been superseded by net.corda.testing.driver.SharedMemoryIncremental.INSTANCE", ReplaceWith("net.corda.testing.driver.DriverDSL.nextPort()"))
val portCounter: AtomicInteger = AtomicInteger()
@Deprecated("This has been superseded by net.corda.testing.driver.SharedMemoryIncremental.INSTANCE", ReplaceWith("net.corda.testing.driver.DriverDSL.nextPort()"))
override fun nextPort(): Int {
return SharedMemoryIncremental.INSTANCE.nextPort()
private class SharedMemoryIncremental private constructor(startPort: Int, endPort: Int, file: File = File(System.getProperty("user.home"), "corda-$startPort-to-$endPort-port-allocator.bin")) : PortAllocation() {
private val startingPoint: Int = startPort
private val endPoint: Int = endPort
private val backingFile: RandomAccessFile = RandomAccessFile(file, "rw")
private val mb: MappedByteBuffer
private val startingAddress: Long
* An implementation of [PortAllocation] which allocates ports sequentially
companion object {
private val UNSAFE: Unsafe = getUnsafe()
private fun getUnsafe(): Unsafe {
val f = Unsafe::class.java.getDeclaredField("theUnsafe")
f.isAccessible = true
return f.get(null) as Unsafe
override fun nextPort(): Int {
return portCounter.getAndUpdate { i ->
val next = i + 1
var oldValue: Long
var newValue: Long
do {
oldValue = UNSAFE.getLongVolatile(null, startingAddress)
newValue = if (oldValue + 1 >= endPoint || oldValue < startingPoint) {
} else {
(oldValue + 1)
} while (!UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue))
return newValue.toInt()
init {
mb = backingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 16)
startingAddress = (mb as DirectBuffer).address()
* A class containing configuration information for Jolokia JMX, to be used when creating a node via the [driver].
@ -152,7 +201,7 @@ data class JmxPolicy
@Deprecated("Use the constructor that just takes in the jmxHttpServerPortAllocation or use JmxPolicy.defaultEnabled()")
val startJmxHttpServer: Boolean = false,
val jmxHttpServerPortAllocation: PortAllocation = incrementalPortAllocation(7005)
val jmxHttpServerPortAllocation: PortAllocation = incrementalPortAllocation()
) {
@Deprecated("The default constructor does not turn on monitoring. Simply leave the jmxPolicy parameter unspecified if you wish to not " +
"have monitoring turned on.")
@ -245,9 +294,9 @@ fun <A> driver(defaultParameters: DriverParameters = DriverParameters(), dsl: Dr
data class DriverParameters(
val isDebug: Boolean = false,
val driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(),
val portAllocation: PortAllocation = incrementalPortAllocation(10000),
val debugPortAllocation: PortAllocation = incrementalPortAllocation(5005),
val driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(),
val portAllocation: PortAllocation = incrementalPortAllocation(),
val debugPortAllocation: PortAllocation = incrementalPortAllocation(),
val systemProperties: Map<String, String> = emptyMap(),
val useTestClock: Boolean = false,
val startNodesInProcess: Boolean = false,
@ -266,9 +315,9 @@ data class DriverParameters(
isDebug: Boolean = false,
driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(),
portAllocation: PortAllocation = incrementalPortAllocation(10000),
debugPortAllocation: PortAllocation = incrementalPortAllocation(5005),
driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(),
portAllocation: PortAllocation = incrementalPortAllocation(),
debugPortAllocation: PortAllocation = incrementalPortAllocation(),
systemProperties: Map<String, String> = emptyMap(),
useTestClock: Boolean = false,
startNodesInProcess: Boolean = false,
@ -372,6 +421,7 @@ data class DriverParameters(
@Deprecated("extraCordappPackagesToScan does not preserve the original CorDapp's versioning and metadata, which may lead to " +
"misleading results in tests. Use withCordappsForAllNodes instead.")
fun withExtraCordappPackagesToScan(extraCordappPackagesToScan: List<String>): DriverParameters = copy(extraCordappPackagesToScan = extraCordappPackagesToScan)
fun withJmxPolicy(jmxPolicy: JmxPolicy): DriverParameters = copy(jmxPolicy = jmxPolicy)
fun withNetworkParameters(networkParameters: NetworkParameters): DriverParameters = copy(networkParameters = networkParameters)
fun withNotaryCustomOverrides(notaryCustomOverrides: Map<String, Any?>): DriverParameters = copy(notaryCustomOverrides = notaryCustomOverrides)
@ -437,4 +487,4 @@ data class DriverParameters(
notaryCustomOverrides = emptyMap(),
cordappsForAllNodes = cordappsForAllNodes
@ -171,4 +171,9 @@ interface DriverDSL {
* is needed before the node is started.
fun baseDirectory(nodeName: CordaX500Name): Path
* Returns the next port to use when instantiating test processes that must not conflict on the same machine
fun nextPort() = PortAllocation.defaultAllocator.nextPort()
@ -30,7 +30,7 @@ data class NodeParameters(
val verifierType: VerifierType = VerifierType.InMemory,
val customOverrides: Map<String, Any?> = emptyMap(),
val startInSameProcess: Boolean? = null,
val maximumHeapSize: String = "512m",
val maximumHeapSize: String = System.getenv("DRIVER_NODE_MEMORY") ?: "512m",
val additionalCordapps: Collection<TestCordapp> = emptySet(),
val flowOverrides: Map<out Class<out FlowLogic<*>>, Class<out FlowLogic<*>>> = emptyMap(),
val logLevelOverride : String? = null
@ -2,24 +2,7 @@ package net.corda.testing.driver.internal
import net.corda.testing.driver.PortAllocation
fun incrementalPortAllocation(startingPortIfNoEnv: Int): PortAllocation {
return when {
System.getProperty(enablingSystemProperty)?.toBoolean() ?: System.getenv(enablingEnvVar)?.toBoolean() == true -> GlobalTestPortAllocation
else -> PortAllocation.Incremental(startingPortIfNoEnv)
fun incrementalPortAllocation(): PortAllocation {
return PortAllocation.defaultAllocator
private object GlobalTestPortAllocation : PortAllocation.Incremental(startingPort = startingPort())
private const val enablingEnvVar = "TESTING_GLOBAL_PORT_ALLOCATION_ENABLED"
private const val startingPortEnvVariable = "TESTING_GLOBAL_PORT_ALLOCATION_STARTING_PORT"
private val enablingSystemProperty = enablingEnvVar.toLowerCase().replace("_", ".")
private val startingPortSystemProperty = startingPortEnvVariable.toLowerCase().replace("_", ".")
private const val startingPortDefaultValue = 5000
private fun startingPort(): Int {
return System.getProperty(startingPortSystemProperty)?.toIntOrNull() ?: System.getenv(startingPortEnvVariable)?.toIntOrNull() ?: startingPortDefaultValue
@ -56,7 +56,7 @@ constructor(private val cordappPackages: List<String> = emptyList(), private val
private lateinit var defaultNetworkParameters: NetworkParametersCopier
protected val notaryNodes = mutableListOf<NodeWithInfo>()
private val nodes = mutableListOf<NodeWithInfo>()
private val portAllocation = incrementalPortAllocation(10000)
private val portAllocation = incrementalPortAllocation()
init {
System.setProperty("consoleLogLevel", Level.DEBUG.name().toLowerCase())
@ -101,8 +101,8 @@ val rpcTestUser = User("user1", "test", permissions = emptySet())
val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality = "Nowhere", country = "GB")
// Use a global pool so that we can run RPC tests in parallel
private val globalPortAllocation = incrementalPortAllocation(10000)
private val globalDebugPortAllocation = incrementalPortAllocation(5005)
private val globalPortAllocation = incrementalPortAllocation()
private val globalDebugPortAllocation = incrementalPortAllocation()
fun <A> rpcDriver(
isDebug: Boolean = false,
@ -6,6 +6,8 @@ import net.corda.core.utilities.contextLogger
import net.corda.testing.node.TestCordapp
import org.gradle.tooling.GradleConnector
import org.gradle.tooling.ProgressEvent
import java.io.File
import java.io.RandomAccessFile
import java.nio.file.Path
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@ -77,13 +79,20 @@ data class TestCordappImpl(val scanPackage: String, override val config: Map<Str
private fun buildCordappJar(projectRoot: Path): Path {
return projectRootToBuiltJar.computeIfAbsent(projectRoot) {
log.info("Generating CorDapp jar from local project in $projectRoot ...")
val gradleLockFile = RandomAccessFile(File(System.getProperty("user.home"), "corda-gradle.lock"), "rw")
return gradleLockFile.use {
val lock = gradleLockFile.channel.lock()
lock.use {
projectRootToBuiltJar.computeIfAbsent(projectRoot) {
log.info("Generating CorDapp jar from local project in $projectRoot ...")
val libs = projectRoot / "build" / "libs"
val jars = libs.list { it.filter { it.toString().endsWith(".jar") }.toList() }.sortedBy { it.attributes().creationTime() }
checkNotNull(jars.lastOrNull()) { "No jars were built in $libs" }
val libs = projectRoot / "build" / "libs"
val jars = libs.list { it.filter { it.toString().endsWith(".jar") }.toList() }
.sortedBy { it.attributes().creationTime() }
checkNotNull(jars.lastOrNull()) { "No jars were built in $libs" }
@ -0,0 +1,44 @@
package net.corda.testing.node;
import net.corda.testing.driver.PortAllocation;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
public class PortAllocationRunner {
public static void main(@NotNull String[] args) throws IOException {
* each JVM will be launched with [spinnerFile, reportingIndex]
int reportingIndex = Integer.parseInt(args[1]);
RandomAccessFile spinnerBackingFile = new RandomAccessFile(args[0], "rw");
MappedByteBuffer spinnerBuffer = spinnerBackingFile.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 16);
* notify back to launching process that we are ready to start using the reporting index we were allocated on launch
spinnerBuffer.putShort(reportingIndex, ((short) 10));
* wait for parent process to notify us that all waiting processes are good to go
* do not Thread.sleep as we want to ensure there is as much of an overlap between the ports selected by the spawned processes
* and by sleeping, its frequently the case that one will complete selection before another wakes up resulting in sequential ranges rather than overlapping
while (spinnerBuffer.getShort(0) != 8) {
* allocate ports and print out for later consumption by the spawning test
PortAllocation pa = PortAllocation.getDefaultAllocator();
for (int i = 0; i < 10000; i++) {
@ -0,0 +1,96 @@
package net.corda.testing.driver
import net.corda.testing.node.PortAllocationRunner
import org.assertj.core.util.Files
import org.hamcrest.CoreMatchers.`is`
import org.hamcrest.core.IsNot.not
import org.hamcrest.number.OrderingComparison
import org.junit.Assert
import org.junit.Test
import java.io.RandomAccessFile
import java.nio.channels.FileChannel
import java.util.concurrent.TimeUnit
class PortAllocationTest {
fun `should allocate a port whilst cycling back round if exceeding start of ephemeral range`() {
val startingPoint = PortAllocation.DEFAULT_START_PORT
val portAllocator = PortAllocation.defaultAllocator
var previous = portAllocator.nextPort()
(0 until 1000000).forEach { _ ->
val next = portAllocator.nextPort()
Assert.assertThat(next, `is`(not(previous)))
Assert.assertThat(next, `is`(OrderingComparison.lessThan(PortAllocation.FIRST_EPHEMERAL_PORT)))
if (next == startingPoint) {
Assert.assertThat(previous, `is`(PortAllocation.FIRST_EPHEMERAL_PORT - 1))
} else {
Assert.assertThat(next, `is`(previous + 1))
previous = next
@Test(timeout = 120_000)
fun `should support multiprocess port allocation`() {
println("Starting multiprocess port allocation test")
val spinnerFile = Files.newTemporaryFile().also { it.deleteOnExit() }.absolutePath
val process1 = buildJvmProcess(spinnerFile, 1)
val process2 = buildJvmProcess(spinnerFile, 2)
println("Started child processes")
val processes = listOf(process1, process2)
val spinnerBackingFile = RandomAccessFile(spinnerFile, "rw")
println("Mapped spinner file")
val spinnerBuffer = spinnerBackingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 512)
println("Created spinner buffer")
var timeWaited = 0L
while (spinnerBuffer.getShort(1) != 10.toShort() && spinnerBuffer.getShort(2) != 10.toShort() && timeWaited < 60_000) {
println("Waiting to childProcesses to report back. waited ${timeWaited}ms")
timeWaited += 1000
println("Instructing child processes to start allocating ports")
spinnerBuffer.putShort(0, 8)
println("Waiting for child processes to terminate")
processes.forEach { it.waitFor(1, TimeUnit.MINUTES) }
println("child processes terminated")
val process1Output = process1.inputStream.reader().readLines().toSet()
val process2Output = process2.inputStream.reader().readLines().toSet()
println("child process out captured")
Assert.assertThat(process1Output.size, `is`(10_000))
Assert.assertThat(process2Output.size, `is`(10_000))
//there should be no overlap between the outputs as each process should have been allocated a unique set of ports
Assert.assertThat(process1Output.intersect(process2Output), `is`(emptySet()))
private fun buildJvmProcess(spinnerFile: String, reportingIndex: Int): Process {
val separator = System.getProperty("file.separator")
val classpath = System.getProperty("java.class.path")
val path = (System.getProperty("java.home")
+ separator + "bin" + separator + "java")
val processBuilder = ProcessBuilder(path, "-cp",
return processBuilder.start()
@ -170,7 +170,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List<Pair<LoadTest
connectToNodes(remoteNodes, incrementalPortAllocation(configuration.localTunnelStartingPort)) { connections ->
connectToNodes(remoteNodes, incrementalPortAllocation()) { connections ->
log.info("Connected to all nodes!")
val hostNodeMap = ConcurrentHashMap<String, NodeConnection>()
connections.parallelStream().forEach { connection ->
Reference in New Issue
Block a user