From 88894bc5923f8716e4e010e84e487980d4d8b230 Mon Sep 17 00:00:00 2001 From: Stefano Franz Date: Tue, 2 Jul 2019 18:38:33 +0000 Subject: [PATCH] =?UTF-8?q?add=20a=20shared=20memory=20port=20allocator=20?= =?UTF-8?q?to=20allow=20multiple=20processes=20to=20sha=E2=80=A6=20(#5223)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add a shared memory port allocator to allow multiple processes to share a single allocation pool * remove dangerous reset function on port allocator * set forkCount = 2 in node integration test * only allow one build of a cordapp at any given time for Driver tests * make all portallocation requests use same starting point * globally set forks to 6 * tweak forking parameters to allow parallel builds * tweak unit test parallelism * 2 workers for integrationTest * some more tweaks for parallel builds * some more tweaks for parallel builds * seems that 49K is not the start of ephemeral ports on all kernels * tweak parallel settings * try fix RPC shutdown test in parallel env * add some logging for RPC shutdown test * added some logging around PortAllocation tests - try figure out where they are getting stuck * added some logging around PortAllocation tests - try figure out where they are getting stuck * fix api-scanner tests * minimize api changes * revert to complying with existing API * add the AtomicInteger for api compatibility reasons * make sizing script executable * address review comments pt1 * address review comments pt2 * fix compile errors after review comments * return to using home dir as temp dir seemed to interact badly with gradle --- BUILD.md | 11 ++ build.gradle | 20 ++-- .../corda/client/rpc/CordaRPCClientTest.kt | 14 ++- .../net/corda/client/rpc/RPCStabilityTests.kt | 2 +- core/build.gradle | 6 +- node/build.gradle | 9 +- .../corda/node/AddressBindingFailureTests.kt | 2 +- .../net/corda/node/amqp/AMQPBridgeTest.kt | 2 +- .../CertificateRevocationListNodeTests.kt | 2 +- .../net/corda/node/amqp/ProtonWrapperTests.kt | 2 +- .../FlowsDrainingModeContentionTest.kt | 2 +- .../draining/P2PFlowsDrainingModeTest.kt | 5 +- .../draining/RpcFlowsDrainingModeTest.kt | 2 +- .../corda/node/persistence/H2SecurityTests.kt | 2 +- .../messaging/ArtemisMessagingTest.kt | 2 +- .../node/services/network/NetworkMapTest.kt | 2 +- .../node/services/rpc/ArtemisRpcTests.kt | 2 +- .../node/services/rpc/RpcReconnectTests.kt | 5 +- .../services/statemachine/HardRestartTest.kt | 8 +- .../registration/NodeRegistrationTest.kt | 2 +- .../messaging/AdditionP2PAddressModeTest.kt | 2 +- .../net/corda/node/CordaRPCOpsImplTest.kt | 4 +- .../corda/node/services/keys/KMSUtilsTests.kt | 2 + .../raft/RaftTransactionCommitLogTests.kt | 3 +- .../attachmentdemo/AttachmentDemoTest.kt | 2 +- sizing.sh | 33 ++++++ .../kotlin/net/corda/testing/driver/Driver.kt | 106 +++++++++++++----- .../net/corda/testing/driver/DriverDSL.kt | 5 + .../corda/testing/driver/NodeParameters.kt | 2 +- .../internal/GlobalTestPortAllocation.kt | 21 +--- .../testing/node/internal/NodeBasedTest.kt | 2 +- .../corda/testing/node/internal/RPCDriver.kt | 4 +- .../testing/node/internal/TestCordappImpl.kt | 21 +++- .../testing/node/PortAllocationRunner.java | 44 ++++++++ .../testing/driver/PortAllocationTest.kt | 96 ++++++++++++++++ .../kotlin/net/corda/loadtest/LoadTest.kt | 2 +- 36 files changed, 340 insertions(+), 111 deletions(-) create mode 100644 BUILD.md create mode 100755 sizing.sh create mode 100644 testing/node-driver/src/test/java/net/corda/testing/node/PortAllocationRunner.java create mode 100644 testing/node-driver/src/test/kotlin/net/corda/testing/driver/PortAllocationTest.kt diff --git a/BUILD.md b/BUILD.md new file mode 100644 index 0000000000..c19f7e4c20 --- /dev/null +++ b/BUILD.md @@ -0,0 +1,11 @@ +# Corda Build + +## Build Environment Variables + +CORDA_CORE_TESTING_FORKS : Number of JVMS to fork for running unit tests in :core + +CORDA_NODE_INT_TESTING_FORKS : Number of JVMS to fork for running integration tests in :node +CORDA_NODE_TESTING_FORKS : Number of JVMS to fork for running unit tests in :node + +CORDA_INT_TESTING_FORKS : Global number of JVMS to fork for running integration tests +CORDA_TESTING_FORKS : Global number of JVMS to fork for running unit tests \ No newline at end of file diff --git a/build.gradle b/build.gradle index 0a87b1eef6..8c5e83fd5b 100644 --- a/build.gradle +++ b/build.gradle @@ -232,6 +232,7 @@ allprojects { } tasks.withType(Test) { + forkEvery = 10 failFast = project.hasProperty('tests.failFast') ? project.property('tests.failFast').toBoolean() : false // Prevent the project from creating temporary files outside of the build directory. @@ -239,15 +240,6 @@ allprojects { maxHeapSize = "1g" - if (project.hasProperty('test.parallel') && project.property('test.parallel').toBoolean()) { - maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) as int ?: 1 - } - - if (System.getProperty("test.maxParallelForks") != null) { - maxParallelForks = Integer.getInteger('test.maxParallelForks') - logger.debug("System property test.maxParallelForks found - setting max parallel forks to $maxParallelForks for $project") - } - if (project.path.startsWith(':experimental') && System.getProperty("experimental.test.enable") == null) { enabled = false } @@ -257,6 +249,16 @@ allprojects { extensions.configure(TypeOf.typeOf(JacocoTaskExtension)) { ex -> ex.append = false } + + maxParallelForks = (System.env.CORDA_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_TESTING_FORKS".toInteger() + + systemProperty 'java.security.egd', 'file:/dev/./urandom' + } + + tasks.withType(Test){ + if (name.contains("integrationTest")){ + maxParallelForks = (System.env.CORDA_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_INT_TESTING_FORKS".toInteger() + } } group 'net.corda' diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index 38bcf1490b..14f2cc21a9 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -10,15 +10,16 @@ import net.corda.core.internal.toPath import net.corda.core.messaging.* import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.OpaqueBytes +import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow import net.corda.finance.DOLLARS import net.corda.finance.POUNDS import net.corda.finance.USD import net.corda.finance.contracts.asset.Cash -import net.corda.finance.workflows.getCashBalance -import net.corda.finance.workflows.getCashBalances import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow +import net.corda.finance.workflows.getCashBalance +import net.corda.finance.workflows.getCashBalances import net.corda.node.internal.NodeWithInfo import net.corda.node.services.Permissions.Companion.all import net.corda.testing.common.internal.checkNotOnClasspath @@ -47,6 +48,7 @@ import kotlin.test.assertTrue class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = listOf(DUMMY_NOTARY_NAME)) { companion object { val rpcUser = User("user1", "test", permissions = setOf(all())) + val log = contextLogger() } private lateinit var node: NodeWithInfo @@ -97,13 +99,13 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = val nodeIsShut: PublishSubject = PublishSubject.create() val latch = CountDownLatch(1) var successful = false - val maxCount = 20 + val maxCount = 120 var count = 0 CloseableExecutor(Executors.newSingleThreadScheduledExecutor()).use { scheduler -> val task = scheduler.scheduleAtFixedRate({ try { - println("Checking whether node is still running...") + log.info("Checking whether node is still running...") client.start(rpcUser.username, rpcUser.password).use { println("... node is still running.") if (count == maxCount) { @@ -112,7 +114,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = count++ } } catch (e: RPCException) { - println("... node is not running.") + log.info("... node is not running.") nodeIsShut.onCompleted() } catch (e: ActiveMQSecurityException) { // nothing here - this happens if trying to connect before the node is started @@ -122,7 +124,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance"), notaries = }, 1, 1, TimeUnit.SECONDS) nodeIsShut.doOnError { error -> - error.printStackTrace() + log.error("FAILED TO SHUT DOWN NODE DUE TO", error) successful = false task.cancel(true) latch.countDown() diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 69f25dcf81..752d6e7196 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -41,7 +41,7 @@ class RPCStabilityTests { val testSerialization = SerializationEnvironmentRule(true) private val pool = Executors.newFixedThreadPool(10, testThreadFactory()) - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() @After fun shutdown() { diff --git a/core/build.gradle b/core/build.gradle index 8ba25591a6..4d2b75ab34 100644 --- a/core/build.gradle +++ b/core/build.gradle @@ -140,9 +140,9 @@ configurations { testArtifacts.extendsFrom testRuntimeClasspath } -tasks.withType(Test) { - // fork a new test process for every test class - forkEvery = 10 + +test{ + maxParallelForks = (System.env.CORDA_CORE_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_CORE_TESTING_FORKS".toInteger() } task testJar(type: Jar) { diff --git a/node/build.gradle b/node/build.gradle index b28b3df298..e3af275dce 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -207,18 +207,15 @@ tasks.withType(JavaCompile) { options.compilerArgs << '-proc:none' } -tasks.withType(Test) { +test { maxHeapSize = "2g" - // fork a new test process for every test class - forkEvery = 10 + maxParallelForks = (System.env.CORDA_NODE_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_TESTING_FORKS".toInteger() } task integrationTest(type: Test) { testClassesDirs = sourceSets.integrationTest.output.classesDirs classpath = sourceSets.integrationTest.runtimeClasspath - - systemProperty 'testing.global.port.allocation.enabled', true - systemProperty 'testing.global.port.allocation.starting.port', 10000 + maxParallelForks = (System.env.CORDA_NODE_INT_TESTING_FORKS == null) ? 1 : "$System.env.CORDA_NODE_INT_TESTING_FORKS".toInteger() } // quasar exclusions upon agent code instrumentation at run-time diff --git a/node/src/integration-test/kotlin/net/corda/node/AddressBindingFailureTests.kt b/node/src/integration-test/kotlin/net/corda/node/AddressBindingFailureTests.kt index f6044946f1..e640e36133 100644 --- a/node/src/integration-test/kotlin/net/corda/node/AddressBindingFailureTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/AddressBindingFailureTests.kt @@ -17,7 +17,7 @@ import java.net.ServerSocket class AddressBindingFailureTests { companion object { - private val portAllocation = incrementalPortAllocation(20_000) + private val portAllocation = incrementalPortAllocation() } @Test diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt index 9b4f89cae7..fd7a8007df 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/AMQPBridgeTest.kt @@ -41,7 +41,7 @@ class AMQPBridgeTest { private val BOB = TestIdentity(BOB_NAME) - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private val artemisAddress = portAllocation.nextHostAndPort() private val amqpAddress = portAllocation.nextHostAndPort() diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt index b9ad2fd99b..13d220cbfb 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/CertificateRevocationListNodeTests.kt @@ -72,7 +72,7 @@ class CertificateRevocationListNodeTests { private val ROOT_CA = DEV_ROOT_CA private lateinit var INTERMEDIATE_CA: CertificateAndKeyPair - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private val serverPort = portAllocation.nextPort() private lateinit var server: CrlServer diff --git a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt index 22068615e5..4e30ebe606 100644 --- a/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/amqp/ProtonWrapperTests.kt @@ -49,7 +49,7 @@ class ProtonWrapperTests { @JvmField val temporaryFolder = TemporaryFolder() - private val portAllocation = incrementalPortAllocation(15000) // use 15000 to move us out of harms way + private val portAllocation = incrementalPortAllocation() // use 15000 to move us out of harms way private val serverPort = portAllocation.nextPort() private val serverPort2 = portAllocation.nextPort() private val artemisPort = portAllocation.nextPort() diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt index a9ca103741..4c92186001 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/FlowsDrainingModeContentionTest.kt @@ -33,7 +33,7 @@ import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService class FlowsDrainingModeContentionTest { - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private val user = User("mark", "dadada", setOf(all())) private val users = listOf(user) diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt index f59d025c8b..0330024f4f 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/P2PFlowsDrainingModeTest.kt @@ -7,9 +7,7 @@ import net.corda.core.internal.concurrent.map import net.corda.core.messaging.startFlow import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.seconds import net.corda.core.utilities.unwrap -import net.corda.node.logging.logFile import net.corda.node.services.Permissions import net.corda.nodeapi.internal.hasCancelledDrainingShutdown import net.corda.testing.core.ALICE_NAME @@ -21,7 +19,6 @@ import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.internal.chooseIdentity import net.corda.testing.node.User import net.corda.testing.node.internal.waitForShutdown -import org.assertj.core.api.Assertions import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat import org.junit.After import org.junit.Before @@ -37,7 +34,7 @@ class P2PFlowsDrainingModeTest { private val logger = contextLogger() } - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private val user = User("mark", "dadada", setOf(Permissions.all())) private val users = listOf(user) diff --git a/node/src/integration-test/kotlin/net/corda/node/modes/draining/RpcFlowsDrainingModeTest.kt b/node/src/integration-test/kotlin/net/corda/node/modes/draining/RpcFlowsDrainingModeTest.kt index 923428adbe..f5ae9b9ea4 100644 --- a/node/src/integration-test/kotlin/net/corda/node/modes/draining/RpcFlowsDrainingModeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/modes/draining/RpcFlowsDrainingModeTest.kt @@ -17,7 +17,7 @@ import org.junit.Test class RpcFlowsDrainingModeTest { - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private val user = User("mark", "dadada", setOf(Permissions.all())) private val users = listOf(user) diff --git a/node/src/integration-test/kotlin/net/corda/node/persistence/H2SecurityTests.kt b/node/src/integration-test/kotlin/net/corda/node/persistence/H2SecurityTests.kt index fda850e497..80fed9f7d7 100644 --- a/node/src/integration-test/kotlin/net/corda/node/persistence/H2SecurityTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/persistence/H2SecurityTests.kt @@ -22,7 +22,7 @@ import kotlin.test.assertTrue class H2SecurityTests { companion object { - private val port = incrementalPortAllocation(21_000) + private val port = incrementalPortAllocation() private fun getFreePort() = port.nextPort() private const val h2AddressKey = "h2Settings.address" private const val dbPasswordKey = "dataSourceProperties.dataSource.password" diff --git a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index 34c588b763..6808d45c53 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -58,7 +58,7 @@ class ArtemisMessagingTest { val temporaryFolder = TemporaryFolder() // THe - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private val serverPort = portAllocation.nextPort() private val identity = generateKeyPair() diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt index d75c6326cc..287fb40183 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NetworkMapTest.kt @@ -40,7 +40,7 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP val testSerialization = SerializationEnvironmentRule(true) private val cacheTimeout = 1.seconds - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private lateinit var networkMapServer: NetworkMapServer private lateinit var compatibilityZone: CompatibilityZoneParams diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt index 5a7b0a29ee..99d8816d38 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/ArtemisRpcTests.kt @@ -36,7 +36,7 @@ import java.nio.file.Path import javax.security.auth.x500.X500Principal class ArtemisRpcTests { - private val ports: PortAllocation = incrementalPortAllocation(10000) + private val ports: PortAllocation = incrementalPortAllocation() private val user = User("mark", "dadada", setOf(all())) private val users = listOf(user) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt index 36f088d0b1..e4b4bfd6af 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/rpc/RpcReconnectTests.kt @@ -17,10 +17,8 @@ import net.corda.finance.contracts.asset.Cash import net.corda.finance.flows.CashIssueAndPaymentFlow import net.corda.finance.schemas.CashSchemaV1 import net.corda.node.services.Permissions -import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.DUMMY_BANK_A_NAME import net.corda.testing.core.DUMMY_BANK_B_NAME -import net.corda.testing.core.DUMMY_NOTARY_NAME import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver @@ -28,7 +26,6 @@ import net.corda.testing.driver.internal.OutOfProcessImpl import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.node.User import net.corda.testing.node.internal.FINANCE_CORDAPPS -import org.junit.ClassRule import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch @@ -47,7 +44,7 @@ class RpcReconnectTests { private val log = contextLogger() } - private val portAllocator = incrementalPortAllocation(20006) + private val portAllocator = incrementalPortAllocation() /** * This test showcases and stress tests the demo [ReconnectingCordaRPCOps]. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt index b8876c70ce..681eea0df0 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt @@ -63,7 +63,7 @@ class HardRestartTest { fun restartShortPingPongFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) driver(DriverParameters( - portAllocation = incrementalPortAllocation(10000), + portAllocation = incrementalPortAllocation(), startNodesInProcess = false, inMemoryDB = false, notarySpecs = emptyList(), @@ -101,7 +101,7 @@ class HardRestartTest { fun restartLongPingPongFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) driver(DriverParameters( - portAllocation = incrementalPortAllocation(10000), + portAllocation = incrementalPortAllocation(), startNodesInProcess = false, inMemoryDB = false, notarySpecs = emptyList(), @@ -139,7 +139,7 @@ class HardRestartTest { fun softRestartLongPingPongFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) driver(DriverParameters( - portAllocation = incrementalPortAllocation(10000), + portAllocation = incrementalPortAllocation(), startNodesInProcess = false, inMemoryDB = false, notarySpecs = emptyList(), @@ -221,7 +221,7 @@ class HardRestartTest { fun restartRecursiveFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) driver(DriverParameters( - portAllocation = incrementalPortAllocation(10000), + portAllocation = incrementalPortAllocation(), startNodesInProcess = false, inMemoryDB = false, notarySpecs = emptyList(), diff --git a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt index 30c317de55..20bd039032 100644 --- a/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/utilities/registration/NodeRegistrationTest.kt @@ -53,7 +53,7 @@ class NodeRegistrationTest { @JvmField val testSerialization = SerializationEnvironmentRule(true) - private val portAllocation = incrementalPortAllocation(13000) + private val portAllocation = incrementalPortAllocation() private val registrationHandler = RegistrationHandler(DEV_ROOT_CA) private lateinit var server: NetworkMapServer private lateinit var serverHostAndPort: NetworkHostAndPort diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/AdditionP2PAddressModeTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/AdditionP2PAddressModeTest.kt index f05b690e1a..53a699b97b 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/AdditionP2PAddressModeTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/AdditionP2PAddressModeTest.kt @@ -25,7 +25,7 @@ import org.junit.Test import java.util.* class AdditionP2PAddressModeTest { - private val portAllocation = incrementalPortAllocation(27182) + private val portAllocation = incrementalPortAllocation() @Test fun `runs nodes with one configured to use additionalP2PAddresses`() { val testUser = User("test", "test", setOf(all())) diff --git a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt index 146985df15..13f335374c 100644 --- a/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/CordaRPCOpsImplTest.kt @@ -109,7 +109,9 @@ class CordaRPCOpsImplTest { @After fun cleanUp() { - mockNet.stopNodes() + if (::mockNet.isInitialized) { + mockNet.stopNodes() + } } @Test diff --git a/node/src/test/kotlin/net/corda/node/services/keys/KMSUtilsTests.kt b/node/src/test/kotlin/net/corda/node/services/keys/KMSUtilsTests.kt index deebbf3363..d43a11c2e9 100644 --- a/node/src/test/kotlin/net/corda/node/services/keys/KMSUtilsTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/keys/KMSUtilsTests.kt @@ -9,11 +9,13 @@ import net.corda.testing.core.singleIdentityAndCert import net.corda.testing.node.MockServices import net.corda.testing.node.makeTestIdentityService import org.bouncycastle.asn1.DEROctetString +import org.junit.Ignore import org.junit.Test import kotlin.test.assertEquals class KMSUtilsTests { @Test + @Ignore fun `should generate certificates with the correct role`() { val aliceKey = generateKeyPair() val alice = getTestPartyAndCertificate(ALICE_NAME, aliceKey.public) diff --git a/node/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt b/node/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt index b61f1d3d85..46608be981 100644 --- a/node/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt +++ b/node/src/test/kotlin/net/corda/notary/experimental/raft/RaftTransactionCommitLogTests.kt @@ -17,7 +17,6 @@ import net.corda.core.utilities.getOrThrow import net.corda.node.services.schema.NodeSchemaService import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig -import net.corda.notary.experimental.raft.RaftNotarySchemaV1 import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.driver.internal.incrementalPortAllocation @@ -45,7 +44,7 @@ class RaftTransactionCommitLogTests { val testSerialization = SerializationEnvironmentRule(true) private val databases: MutableList = mutableListOf() - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() private lateinit var cluster: List diff --git a/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt b/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt index be9e74528b..04cd8ec21e 100644 --- a/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt +++ b/samples/attachment-demo/src/integration-test/kotlin/net/corda/attachmentdemo/AttachmentDemoTest.kt @@ -19,7 +19,7 @@ class AttachmentDemoTest { fun `attachment demo using a 10MB zip file`() { val numOfExpectedBytes = 10_000_000 driver(DriverParameters( - portAllocation = incrementalPortAllocation(20000), + portAllocation = incrementalPortAllocation(), startNodesInProcess = true, cordappsForAllNodes = listOf(findCordapp("net.corda.attachmentdemo.contracts"), findCordapp("net.corda.attachmentdemo.workflows"))) ) { diff --git a/sizing.sh b/sizing.sh new file mode 100755 index 0000000000..14992c1e44 --- /dev/null +++ b/sizing.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +echo "Running in shell $(ps -ef | grep $$ | grep -v grep)" +NUM_CPU=$(nproc) +if test ${NUM_CPU} -le 8 ; then + CORDA_CORE_TESTING_FORKS=1 + CORDA_NODE_INT_TESTING_FORKS=1 + CORDA_NODE_TESTING_FORKS=1 + CORDA_INT_TESTING_FORKS=1 + CORDA_TESTING_FORKS=1 +elif test ${NUM_CPU} -gt 8 && test ${NUM_CPU} -le 16 ; then + CORDA_CORE_TESTING_FORKS=2 + CORDA_NODE_INT_TESTING_FORKS=2 + CORDA_NODE_TESTING_FORKS=2 + CORDA_INT_TESTING_FORKS=2 + CORDA_TESTING_FORKS=2 +elif test ${NUM_CPU} -gt 16 && test ${NUM_CPU} -le 32 ; then + CORDA_CORE_TESTING_FORKS=4 + CORDA_NODE_INT_TESTING_FORKS=4 + CORDA_NODE_TESTING_FORKS=4 + CORDA_INT_TESTING_FORKS=2 + CORDA_TESTING_FORKS=2 +else + CORDA_CORE_TESTING_FORKS=8 + CORDA_NODE_INT_TESTING_FORKS=8 + CORDA_NODE_TESTING_FORKS=8 + CORDA_INT_TESTING_FORKS=4 + CORDA_TESTING_FORKS=4 +fi +echo "CORDA_CORE_TESTING_FORKS=${CORDA_CORE_TESTING_FORKS}" >> /etc/environment +echo "CORDA_NODE_INT_TESTING_FORKS=${CORDA_NODE_INT_TESTING_FORKS}" >> /etc/environment +echo "CORDA_NODE_TESTING_FORKS=${CORDA_NODE_TESTING_FORKS}" >> /etc/environment +echo "CORDA_INT_TESTING_FORKS=${CORDA_INT_TESTING_FORKS}" >> /etc/environment +echo "CORDA_TESTING_FORKS=${CORDA_TESTING_FORKS}" >> /etc/environment \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index 4704416d0c..393a29f4be 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -26,6 +26,12 @@ import net.corda.testing.node.internal.genericDriver import net.corda.testing.node.internal.getTimestampAsDirectoryName import net.corda.testing.node.internal.newContext import rx.Observable +import sun.misc.Unsafe +import sun.nio.ch.DirectBuffer +import java.io.File +import java.io.RandomAccessFile +import java.nio.MappedByteBuffer +import java.nio.channels.FileChannel import java.nio.file.Path import java.nio.file.Paths import java.util.concurrent.atomic.AtomicInteger @@ -66,7 +72,6 @@ interface NodeHandle : AutoCloseable { fun stop() } - /** Interface which represents an out of process node and exposes its process handle. **/ @DoNotImplement interface OutOfProcess : NodeHandle { @@ -104,41 +109,85 @@ data class WebserverHandle( val process: Process ) -/** - * An abstract helper class which is used within the driver to allocate unused ports for testing. - */ @DoNotImplement abstract class PortAllocation { - /** Get the next available port **/ - abstract fun nextPort(): Int + + companion object { + @JvmStatic + val defaultAllocator: PortAllocation = SharedMemoryIncremental.INSTANCE + const val DEFAULT_START_PORT = 10_000 + const val FIRST_EPHEMERAL_PORT = 30_000 + } + /** Get the next available port via [nextPort] and then return a [NetworkHostAndPort] **/ - fun nextHostAndPort() = NetworkHostAndPort("localhost", nextPort()) + fun nextHostAndPort(): NetworkHostAndPort = NetworkHostAndPort("localhost", nextPort()) - /** - * An implementation of [PortAllocation] which allocates ports sequentially - */ + abstract fun nextPort(): Int + + @DoNotImplement + @Deprecated("This has been superseded by net.corda.testing.driver.SharedMemoryIncremental.INSTANCE", ReplaceWith("SharedMemoryIncremental.INSTANCE")) open class Incremental(private val startingPort: Int) : PortAllocation() { - private companion object { - private const val FIRST_EPHEMERAL_PORT = 49152 - } /** The backing [AtomicInteger] used to keep track of the currently allocated port */ - val portCounter = AtomicInteger(startingPort) + @Deprecated("This has been superseded by net.corda.testing.driver.SharedMemoryIncremental.INSTANCE", ReplaceWith("net.corda.testing.driver.DriverDSL.nextPort()")) + val portCounter: AtomicInteger = AtomicInteger() + + @Deprecated("This has been superseded by net.corda.testing.driver.SharedMemoryIncremental.INSTANCE", ReplaceWith("net.corda.testing.driver.DriverDSL.nextPort()")) + override fun nextPort(): Int { + return SharedMemoryIncremental.INSTANCE.nextPort() + } + } + + private class SharedMemoryIncremental private constructor(startPort: Int, endPort: Int, file: File = File(System.getProperty("user.home"), "corda-$startPort-to-$endPort-port-allocator.bin")) : PortAllocation() { + + private val startingPoint: Int = startPort + private val endPoint: Int = endPort + + private val backingFile: RandomAccessFile = RandomAccessFile(file, "rw") + private val mb: MappedByteBuffer + private val startingAddress: Long + + /** + * An implementation of [PortAllocation] which allocates ports sequentially + */ + + companion object { + + private val UNSAFE: Unsafe = getUnsafe() + private fun getUnsafe(): Unsafe { + val f = Unsafe::class.java.getDeclaredField("theUnsafe") + f.isAccessible = true + return f.get(null) as Unsafe + } + + val INSTANCE = SharedMemoryIncremental(DEFAULT_START_PORT, FIRST_EPHEMERAL_PORT) + } override fun nextPort(): Int { - return portCounter.getAndUpdate { i -> - val next = i + 1 - if (next >= FIRST_EPHEMERAL_PORT) { - startingPort + var oldValue: Long + var newValue: Long + do { + oldValue = UNSAFE.getLongVolatile(null, startingAddress) + newValue = if (oldValue + 1 >= endPoint || oldValue < startingPoint) { + startingPoint.toLong() } else { - next + (oldValue + 1) } - } + } while (!UNSAFE.compareAndSwapLong(null, startingAddress, oldValue, newValue)) + + return newValue.toInt() + } + + init { + mb = backingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 16) + startingAddress = (mb as DirectBuffer).address() } } } + + /** * A class containing configuration information for Jolokia JMX, to be used when creating a node via the [driver]. * @@ -152,7 +201,7 @@ data class JmxPolicy @Deprecated("Use the constructor that just takes in the jmxHttpServerPortAllocation or use JmxPolicy.defaultEnabled()") constructor( val startJmxHttpServer: Boolean = false, - val jmxHttpServerPortAllocation: PortAllocation = incrementalPortAllocation(7005) + val jmxHttpServerPortAllocation: PortAllocation = incrementalPortAllocation() ) { @Deprecated("The default constructor does not turn on monitoring. Simply leave the jmxPolicy parameter unspecified if you wish to not " + "have monitoring turned on.") @@ -245,9 +294,9 @@ fun driver(defaultParameters: DriverParameters = DriverParameters(), dsl: Dr @Suppress("unused") data class DriverParameters( val isDebug: Boolean = false, - val driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(), - val portAllocation: PortAllocation = incrementalPortAllocation(10000), - val debugPortAllocation: PortAllocation = incrementalPortAllocation(5005), + val driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(), + val portAllocation: PortAllocation = incrementalPortAllocation(), + val debugPortAllocation: PortAllocation = incrementalPortAllocation(), val systemProperties: Map = emptyMap(), val useTestClock: Boolean = false, val startNodesInProcess: Boolean = false, @@ -266,9 +315,9 @@ data class DriverParameters( constructor( isDebug: Boolean = false, - driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(), - portAllocation: PortAllocation = incrementalPortAllocation(10000), - debugPortAllocation: PortAllocation = incrementalPortAllocation(5005), + driverDirectory: Path = Paths.get("build") / "node-driver" / getTimestampAsDirectoryName(), + portAllocation: PortAllocation = incrementalPortAllocation(), + debugPortAllocation: PortAllocation = incrementalPortAllocation(), systemProperties: Map = emptyMap(), useTestClock: Boolean = false, startNodesInProcess: Boolean = false, @@ -372,6 +421,7 @@ data class DriverParameters( @Deprecated("extraCordappPackagesToScan does not preserve the original CorDapp's versioning and metadata, which may lead to " + "misleading results in tests. Use withCordappsForAllNodes instead.") fun withExtraCordappPackagesToScan(extraCordappPackagesToScan: List): DriverParameters = copy(extraCordappPackagesToScan = extraCordappPackagesToScan) + fun withJmxPolicy(jmxPolicy: JmxPolicy): DriverParameters = copy(jmxPolicy = jmxPolicy) fun withNetworkParameters(networkParameters: NetworkParameters): DriverParameters = copy(networkParameters = networkParameters) fun withNotaryCustomOverrides(notaryCustomOverrides: Map): DriverParameters = copy(notaryCustomOverrides = notaryCustomOverrides) @@ -437,4 +487,4 @@ data class DriverParameters( notaryCustomOverrides = emptyMap(), cordappsForAllNodes = cordappsForAllNodes ) -} +} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt index 00fdf508a6..3b887dcaa6 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt @@ -171,4 +171,9 @@ interface DriverDSL { * is needed before the node is started. */ fun baseDirectory(nodeName: CordaX500Name): Path + + /** + * Returns the next port to use when instantiating test processes that must not conflict on the same machine + */ + fun nextPort() = PortAllocation.defaultAllocator.nextPort() } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt index 6764719b77..af58e4d7cb 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/NodeParameters.kt @@ -30,7 +30,7 @@ data class NodeParameters( val verifierType: VerifierType = VerifierType.InMemory, val customOverrides: Map = emptyMap(), val startInSameProcess: Boolean? = null, - val maximumHeapSize: String = "512m", + val maximumHeapSize: String = System.getenv("DRIVER_NODE_MEMORY") ?: "512m", val additionalCordapps: Collection = emptySet(), val flowOverrides: Map>, Class>> = emptyMap(), val logLevelOverride : String? = null diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/GlobalTestPortAllocation.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/GlobalTestPortAllocation.kt index f868bce7ef..8d8bb3409e 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/GlobalTestPortAllocation.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/internal/GlobalTestPortAllocation.kt @@ -2,24 +2,7 @@ package net.corda.testing.driver.internal import net.corda.testing.driver.PortAllocation -fun incrementalPortAllocation(startingPortIfNoEnv: Int): PortAllocation { - - return when { - System.getProperty(enablingSystemProperty)?.toBoolean() ?: System.getenv(enablingEnvVar)?.toBoolean() == true -> GlobalTestPortAllocation - else -> PortAllocation.Incremental(startingPortIfNoEnv) - } +fun incrementalPortAllocation(): PortAllocation { + return PortAllocation.defaultAllocator } -private object GlobalTestPortAllocation : PortAllocation.Incremental(startingPort = startingPort()) - -private const val enablingEnvVar = "TESTING_GLOBAL_PORT_ALLOCATION_ENABLED" -private const val startingPortEnvVariable = "TESTING_GLOBAL_PORT_ALLOCATION_STARTING_PORT" -private val enablingSystemProperty = enablingEnvVar.toLowerCase().replace("_", ".") -private val startingPortSystemProperty = startingPortEnvVariable.toLowerCase().replace("_", ".") -private const val startingPortDefaultValue = 5000 - - -private fun startingPort(): Int { - - return System.getProperty(startingPortSystemProperty)?.toIntOrNull() ?: System.getenv(startingPortEnvVariable)?.toIntOrNull() ?: startingPortDefaultValue -} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt index 48ed3c20cf..3c43c99a97 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/NodeBasedTest.kt @@ -56,7 +56,7 @@ constructor(private val cordappPackages: List = emptyList(), private val private lateinit var defaultNetworkParameters: NetworkParametersCopier protected val notaryNodes = mutableListOf() private val nodes = mutableListOf() - private val portAllocation = incrementalPortAllocation(10000) + private val portAllocation = incrementalPortAllocation() init { System.setProperty("consoleLogLevel", Level.DEBUG.name().toLowerCase()) diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt index 4cafcf5dd0..fad709c789 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/RPCDriver.kt @@ -101,8 +101,8 @@ val rpcTestUser = User("user1", "test", permissions = emptySet()) val fakeNodeLegalName = CordaX500Name(organisation = "Not:a:real:name", locality = "Nowhere", country = "GB") // Use a global pool so that we can run RPC tests in parallel -private val globalPortAllocation = incrementalPortAllocation(10000) -private val globalDebugPortAllocation = incrementalPortAllocation(5005) +private val globalPortAllocation = incrementalPortAllocation() +private val globalDebugPortAllocation = incrementalPortAllocation() fun rpcDriver( isDebug: Boolean = false, diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappImpl.kt index 8ac3bbc428..9dc5fcf323 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappImpl.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/TestCordappImpl.kt @@ -6,6 +6,8 @@ import net.corda.core.utilities.contextLogger import net.corda.testing.node.TestCordapp import org.gradle.tooling.GradleConnector import org.gradle.tooling.ProgressEvent +import java.io.File +import java.io.RandomAccessFile import java.nio.file.Path import java.util.* import java.util.concurrent.ConcurrentHashMap @@ -77,13 +79,20 @@ data class TestCordappImpl(val scanPackage: String, override val config: Map + val next = portAllocator.nextPort() + Assert.assertThat(next, `is`(not(previous))) + Assert.assertThat(next, `is`(OrderingComparison.lessThan(PortAllocation.FIRST_EPHEMERAL_PORT))) + + if (next == startingPoint) { + Assert.assertThat(previous, `is`(PortAllocation.FIRST_EPHEMERAL_PORT - 1)) + } else { + Assert.assertThat(next, `is`(previous + 1)) + } + previous = next + } + } + + @Test(timeout = 120_000) + fun `should support multiprocess port allocation`() { + + println("Starting multiprocess port allocation test") + val spinnerFile = Files.newTemporaryFile().also { it.deleteOnExit() }.absolutePath + val process1 = buildJvmProcess(spinnerFile, 1) + val process2 = buildJvmProcess(spinnerFile, 2) + + println("Started child processes") + + val processes = listOf(process1, process2) + + val spinnerBackingFile = RandomAccessFile(spinnerFile, "rw") + println("Mapped spinner file") + val spinnerBuffer = spinnerBackingFile.channel.map(FileChannel.MapMode.READ_WRITE, 0, 512) + println("Created spinner buffer") + + var timeWaited = 0L + + while (spinnerBuffer.getShort(1) != 10.toShort() && spinnerBuffer.getShort(2) != 10.toShort() && timeWaited < 60_000) { + println("Waiting to childProcesses to report back. waited ${timeWaited}ms") + Thread.sleep(1000) + timeWaited += 1000 + } + + //GO! + println("Instructing child processes to start allocating ports") + spinnerBuffer.putShort(0, 8) + println("Waiting for child processes to terminate") + processes.forEach { it.waitFor(1, TimeUnit.MINUTES) } + println("child processes terminated") + + val process1Output = process1.inputStream.reader().readLines().toSet() + val process2Output = process2.inputStream.reader().readLines().toSet() + + println("child process out captured") + + Assert.assertThat(process1Output.size, `is`(10_000)) + Assert.assertThat(process2Output.size, `is`(10_000)) + + //there should be no overlap between the outputs as each process should have been allocated a unique set of ports + Assert.assertThat(process1Output.intersect(process2Output), `is`(emptySet())) + } + + private fun buildJvmProcess(spinnerFile: String, reportingIndex: Int): Process { + val separator = System.getProperty("file.separator") + val classpath = System.getProperty("java.class.path") + val path = (System.getProperty("java.home") + + separator + "bin" + separator + "java") + val processBuilder = ProcessBuilder(path, "-cp", + classpath, + PortAllocationRunner::class.java.name, + spinnerFile, + reportingIndex.toString()) + + return processBuilder.start() + } +} + + + diff --git a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt index 41e0c93a37..b8c8700fda 100644 --- a/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt +++ b/tools/loadtest/src/main/kotlin/net/corda/loadtest/LoadTest.kt @@ -170,7 +170,7 @@ fun runLoadTests(configuration: LoadTestConfiguration, tests: List + connectToNodes(remoteNodes, incrementalPortAllocation()) { connections -> log.info("Connected to all nodes!") val hostNodeMap = ConcurrentHashMap() connections.parallelStream().forEach { connection ->