diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 811d77a46e..a6ff6eb80d 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -13,7 +13,7 @@ import net.corda.core.utilities.* import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi import net.corda.testing.SerializationEnvironmentRule -import net.corda.testing.driver.poll +import net.corda.testing.internal.poll import net.corda.testing.internal.* import org.apache.activemq.artemis.api.core.SimpleString import org.junit.After @@ -231,6 +231,7 @@ class RPCStabilityTests { override val protocolVersion = 0 override fun ping() = "pong" } + val serverFollower = shutdownManager.follower() val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! serverFollower.unfollow() @@ -348,7 +349,7 @@ class RPCStabilityTests { } -fun RPCDriverExposedDSLInterface.pollUntilClientNumber(server: RpcServerHandle, expected: Int) { +fun RPCDriverDSL.pollUntilClientNumber(server: RpcServerHandle, expected: Int) { pollUntilTrue("number of RPC clients to become $expected") { val clientAddresses = server.broker.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) } clientAddresses.size == expected diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt index 546415e247..053c8b85c1 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/AbstractRPCTest.kt @@ -7,7 +7,7 @@ import net.corda.core.messaging.RPCOps import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.internal.config.User import net.corda.testing.SerializationEnvironmentRule -import net.corda.testing.internal.RPCDriverExposedDSLInterface +import net.corda.testing.internal.RPCDriverDSL import net.corda.testing.internal.rpcTestUser import net.corda.testing.internal.startInVmRpcClient import net.corda.testing.internal.startRpcClient @@ -41,7 +41,7 @@ open class AbstractRPCTest { val createSession: () -> ClientSession ) - inline fun RPCDriverExposedDSLInterface.testProxy( + inline fun RPCDriverDSL.testProxy( ops: I, rpcUser: User = rpcTestUser, clientConfiguration: RPCClientConfiguration = RPCClientConfiguration.default, @@ -55,9 +55,9 @@ open class AbstractRPCTest { } } RPCTestMode.Netty -> - startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { server -> - startRpcClient(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map { - TestProxy(it, { startArtemisSession(server.broker.hostAndPort!!, rpcUser.username, rpcUser.password) }) + startRpcServer(ops = ops, rpcUser = rpcUser, configuration = serverConfiguration).flatMap { (broker) -> + startRpcClient(broker.hostAndPort!!, rpcUser.username, rpcUser.password, clientConfiguration).map { + TestProxy(it, { startArtemisSession(broker.hostAndPort!!, rpcUser.username, rpcUser.password) }) } } }.get() diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt index a5bf6c9b7d..eee72cf1b5 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/ClientRPCInfrastructureTests.kt @@ -7,7 +7,7 @@ import net.corda.core.internal.concurrent.thenMatch import net.corda.core.messaging.RPCOps import net.corda.core.utilities.getOrThrow import net.corda.node.services.messaging.rpcContext -import net.corda.testing.internal.RPCDriverExposedDSLInterface +import net.corda.testing.internal.RPCDriverDSL import net.corda.testing.internal.rpcDriver import net.corda.testing.internal.rpcTestUser import org.assertj.core.api.Assertions.assertThat @@ -26,7 +26,7 @@ import kotlin.test.assertTrue class ClientRPCInfrastructureTests : AbstractRPCTest() { // TODO: Test that timeouts work - private fun RPCDriverExposedDSLInterface.testProxy(): TestOps { + private fun RPCDriverDSL.testProxy(): TestOps { return testProxy(TestOpsImpl()).ops } diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt index 7ba003f910..6d80a39398 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCConcurrencyTests.kt @@ -1,15 +1,15 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.RPCClientConfiguration -import net.corda.core.messaging.RPCOps -import net.corda.core.utilities.millis import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose +import net.corda.core.messaging.RPCOps import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.millis import net.corda.node.services.messaging.RPCServerConfiguration -import net.corda.testing.internal.RPCDriverExposedDSLInterface +import net.corda.testing.internal.RPCDriverDSL import net.corda.testing.internal.rpcDriver import net.corda.testing.internal.testThreadFactory import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet @@ -20,7 +20,10 @@ import org.junit.runners.Parameterized import rx.Observable import rx.subjects.UnicastSubject import java.util.* -import java.util.concurrent.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executor +import java.util.concurrent.Executors @RunWith(Parameterized::class) class RPCConcurrencyTests : AbstractRPCTest() { @@ -84,7 +87,7 @@ class RPCConcurrencyTests : AbstractRPCTest() { } } - private fun RPCDriverExposedDSLInterface.testProxy(): TestProxy { + private fun RPCDriverDSL.testProxy(): TestProxy { return testProxy( TestOpsImpl(pool), clientConfiguration = RPCClientConfiguration.default.copy( diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt index ede114cf7d..90a01ec645 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPerformanceTests.kt @@ -5,14 +5,14 @@ import net.corda.client.rpc.internal.RPCClientConfiguration import net.corda.core.messaging.RPCOps import net.corda.core.utilities.minutes import net.corda.core.utilities.seconds -import net.corda.testing.internal.performance.div import net.corda.node.services.messaging.RPCServerConfiguration -import net.corda.testing.internal.RPCDriverExposedDSLInterface -import net.corda.testing.measure +import net.corda.testing.internal.RPCDriverDSL +import net.corda.testing.internal.performance.div import net.corda.testing.internal.performance.startPublishingFixedRateInjector import net.corda.testing.internal.performance.startReporter import net.corda.testing.internal.performance.startTightLoopInjector import net.corda.testing.internal.rpcDriver +import net.corda.testing.measure import org.junit.Ignore import org.junit.Test import org.junit.runner.RunWith @@ -42,7 +42,7 @@ class RPCPerformanceTests : AbstractRPCTest() { } } - private fun RPCDriverExposedDSLInterface.testProxy( + private fun RPCDriverDSL.testProxy( clientConfiguration: RPCClientConfiguration, serverConfiguration: RPCServerConfiguration ): TestProxy { diff --git a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt index 6e7de32087..c7c1e71346 100644 --- a/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt +++ b/client/rpc/src/test/kotlin/net/corda/client/rpc/RPCPermissionsTests.kt @@ -5,7 +5,7 @@ import net.corda.core.messaging.RPCOps import net.corda.node.services.Permissions.Companion.invokeRpc import net.corda.node.services.messaging.rpcContext import net.corda.nodeapi.internal.config.User -import net.corda.testing.internal.RPCDriverExposedDSLInterface +import net.corda.testing.internal.RPCDriverDSL import net.corda.testing.internal.rpcDriver import org.junit.Test import org.junit.runner.RunWith @@ -37,7 +37,7 @@ class RPCPermissionsTests : AbstractRPCTest() { /** * Create an RPC proxy for the given user. */ - private fun RPCDriverExposedDSLInterface.testProxyFor(rpcUser: User) = testProxy(TestOpsImpl(), rpcUser).ops + private fun RPCDriverDSL.testProxyFor(rpcUser: User) = testProxy(TestOpsImpl(), rpcUser).ops private fun userOf(name: String, permissions: Set) = User(name, "password", permissions) diff --git a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt index a072d9ec34..9e51829bdc 100644 --- a/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt +++ b/core/src/test/kotlin/net/corda/core/flows/ContractUpgradeFlowTest.kt @@ -20,14 +20,17 @@ import net.corda.node.internal.SecureCordaRPCOps import net.corda.node.internal.StartedNode import net.corda.node.services.Permissions.Companion.startFlow import net.corda.nodeapi.internal.config.User -import net.corda.testing.* +import net.corda.testing.ALICE_NAME +import net.corda.testing.BOB_NAME import net.corda.testing.contracts.DummyContract import net.corda.testing.contracts.DummyContractV2 -import net.corda.testing.internal.RPCDriverExposedDSLInterface +import net.corda.testing.internal.RPCDriverDSL import net.corda.testing.internal.rpcDriver import net.corda.testing.internal.rpcTestUser import net.corda.testing.internal.startRpcClient import net.corda.testing.node.MockNetwork +import net.corda.testing.singleIdentity +import net.corda.testing.startFlow import org.junit.After import org.junit.Before import org.junit.Test @@ -120,7 +123,7 @@ class ContractUpgradeFlowTest { check(bobNode) } - private fun RPCDriverExposedDSLInterface.startProxy(node: StartedNode<*>, user: User): CordaRPCOps { + private fun RPCDriverDSL.startProxy(node: StartedNode<*>, user: User): CordaRPCOps { return startRpcClient( rpcAddress = startRpcServer( rpcUser = user, diff --git a/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt b/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt index 15abb2b06d..cc3c1e0dc8 100644 --- a/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/NodePerformanceTests.kt @@ -17,11 +17,12 @@ import net.corda.nodeapi.internal.config.User import net.corda.testing.DUMMY_NOTARY import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.driver -import net.corda.testing.node.NotarySpec +import net.corda.testing.internal.InternalDriverDSL import net.corda.testing.internal.performance.div import net.corda.testing.internal.performance.startPublishingFixedRateInjector import net.corda.testing.internal.performance.startReporter import net.corda.testing.internal.performance.startTightLoopInjector +import net.corda.testing.node.NotarySpec import org.junit.Before import org.junit.Ignore import org.junit.Test @@ -91,7 +92,7 @@ class NodePerformanceTests { driver(startNodesInProcess = true) { val a = startNode(rpcUsers = listOf(User("A", "A", setOf(startFlow())))).get() a as NodeHandle.InProcess - val metricRegistry = startReporter(shutdownManager, a.node.services.monitoringService.metrics) + val metricRegistry = startReporter((this as InternalDriverDSL).shutdownManager, a.node.services.monitoringService.metrics) a.rpcClientToNode().use("A", "A") { connection -> startPublishingFixedRateInjector(metricRegistry, 8, 5.minutes, 2000L / TimeUnit.SECONDS) { connection.proxy.startFlow(::EmptyFlow).returnValue.get() @@ -109,7 +110,7 @@ class NodePerformanceTests { extraCordappPackagesToScan = listOf("net.corda.finance") ) { val notary = defaultNotaryNode.getOrThrow() as NodeHandle.InProcess - val metricRegistry = startReporter(shutdownManager, notary.node.services.monitoringService.metrics) + val metricRegistry = startReporter((this as InternalDriverDSL).shutdownManager, notary.node.services.monitoringService.metrics) notary.rpcClientToNode().use("A", "A") { connection -> println("ISSUING") val doneFutures = (1..100).toList().parallelStream().map { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/AttachmentLoadingTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/AttachmentLoadingTests.kt index 02619d2ff7..d31b79c255 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/AttachmentLoadingTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/AttachmentLoadingTests.kt @@ -20,13 +20,14 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.getOrThrow import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.internal.cordapp.CordappProviderImpl -import net.corda.testing.* import net.corda.testing.DUMMY_BANK_A import net.corda.testing.DUMMY_NOTARY -import net.corda.testing.driver.DriverDSLExposedInterface +import net.corda.testing.driver.DriverDSL import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.driver import net.corda.testing.node.MockAttachmentStorage +import net.corda.testing.rigorousMock +import net.corda.testing.withTestSerialization import org.junit.Assert.assertEquals import org.junit.Test import java.net.URLClassLoader @@ -51,16 +52,16 @@ class AttachmentLoadingTests { Class.forName("net.corda.finance.contracts.isolated.IsolatedDummyFlow\$Initiator", true, URLClassLoader(arrayOf(isolatedJAR))) .asSubclass(FlowLogic::class.java) - private fun DriverDSLExposedInterface.createTwoNodes(): List { + private fun DriverDSL.createTwoNodes(): List { return listOf( startNode(providedName = bankAName), startNode(providedName = bankBName) ).transpose().getOrThrow() } - private fun DriverDSLExposedInterface.installIsolatedCordappTo(nodeName: CordaX500Name) { + private fun DriverDSL.installIsolatedCordappTo(nodeName: CordaX500Name) { // Copy the app jar to the first node. The second won't have it. - val path = (baseDirectory(nodeName.toString()) / "cordapps").createDirectories() / "isolated.jar" + val path = (baseDirectory(nodeName) / "cordapps").createDirectories() / "isolated.jar" logger.info("Installing isolated jar to $path") isolatedJAR.openStream().buffered().use { input -> Files.newOutputStream(path).buffered().use { output -> diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index 6d480bdf87..769015bfba 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -18,7 +18,7 @@ import net.corda.node.services.messaging.* import net.corda.node.services.transactions.RaftValidatingNotaryService import net.corda.testing.ALICE import net.corda.testing.chooseIdentity -import net.corda.testing.driver.DriverDSLExposedInterface +import net.corda.testing.driver.DriverDSL import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.driver import net.corda.testing.node.ClusterSpec @@ -116,13 +116,13 @@ class P2PMessagingTest { } } - private fun startDriverWithDistributedService(dsl: DriverDSLExposedInterface.(List>) -> Unit) { + private fun startDriverWithDistributedService(dsl: DriverDSL.(List>) -> Unit) { driver(startNodesInProcess = true, notarySpecs = listOf(NotarySpec(DISTRIBUTED_SERVICE_NAME, cluster = ClusterSpec.Raft(clusterSize = 2)))) { dsl(defaultNotaryHandle.nodeHandles.getOrThrow().map { (it as NodeHandle.InProcess).node }) } } - private fun DriverDSLExposedInterface.startAlice(): StartedNode { + private fun DriverDSL.startAlice(): StartedNode { return startNode(providedName = ALICE.name, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)) .map { (it as NodeHandle.InProcess).node } .getOrThrow() diff --git a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt index 17313ec688..e048bc791e 100644 --- a/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt +++ b/samples/attachment-demo/src/main/kotlin/net/corda/attachmentdemo/AttachmentDemo.kt @@ -25,7 +25,7 @@ import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.getOrThrow import net.corda.testing.DUMMY_BANK_B import net.corda.testing.DUMMY_NOTARY -import net.corda.testing.driver.poll +import net.corda.testing.internal.poll import java.io.InputStream import java.net.HttpURLConnection import java.net.URL diff --git a/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt b/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt index 4de01a0d78..692c252f7a 100644 --- a/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt +++ b/samples/irs-demo/src/integration-test/kotlin/net/corda/test/spring/SpringDriver.kt @@ -3,8 +3,11 @@ package net.corda.test.spring import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.concurrent.map import net.corda.core.utilities.contextLogger -import net.corda.testing.driver.* -import net.corda.testing.internal.ProcessUtilities +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.PortAllocation +import net.corda.testing.driver.WebserverHandle +import net.corda.testing.internal.* import net.corda.testing.node.NotarySpec import okhttp3.OkHttpClient import okhttp3.Request @@ -14,22 +17,6 @@ import java.nio.file.Path import java.nio.file.Paths import java.util.concurrent.TimeUnit -interface SpringDriverExposedDSLInterface : DriverDSLExposedInterface { - - /** - * Starts a Spring Boot application, passes the RPC connection data as parameters the process. - * Returns future which will complete after (and if) the server passes healthcheck. - * @param clazz Class with main method which is expected to run Spring application - * @param handle Corda Node handle this webapp is expected to connect to - * @param checkUrl URL path to use for server readiness check - uses [okhttp3.Response.isSuccessful] as qualifier - * - * TODO: Rather then expecting a given clazz to contain main method which start Spring app our own simple class can do this - */ - fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture -} - -interface SpringDriverInternalDSLInterface : DriverDSLInternalInterface, SpringDriverExposedDSLInterface - fun springDriver( defaultParameters: DriverParameters = DriverParameters(), isDebug: Boolean = defaultParameters.isDebug, @@ -42,29 +29,40 @@ fun springDriver( startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, notarySpecs: List, extraCordappPackagesToScan: List = defaultParameters.extraCordappPackagesToScan, - dsl: SpringDriverExposedDSLInterface.() -> A -) = genericDriver( - defaultParameters = defaultParameters, - isDebug = isDebug, - driverDirectory = driverDirectory, - portAllocation = portAllocation, - debugPortAllocation = debugPortAllocation, - systemProperties = systemProperties, - useTestClock = useTestClock, - initialiseSerialization = initialiseSerialization, - startNodesInProcess = startNodesInProcess, - extraCordappPackagesToScan = extraCordappPackagesToScan, - notarySpecs = notarySpecs, - driverDslWrapper = { driverDSL:DriverDSL -> SpringBootDriverDSL(driverDSL) }, - coerce = { it }, dsl = dsl -) + dsl: SpringBootDriverDSL.() -> A +): A { + return genericDriver( + defaultParameters = defaultParameters, + isDebug = isDebug, + driverDirectory = driverDirectory, + portAllocation = portAllocation, + debugPortAllocation = debugPortAllocation, + systemProperties = systemProperties, + useTestClock = useTestClock, + initialiseSerialization = initialiseSerialization, + startNodesInProcess = startNodesInProcess, + extraCordappPackagesToScan = extraCordappPackagesToScan, + notarySpecs = notarySpecs, + driverDslWrapper = { driverDSL: DriverDSLImpl -> SpringBootDriverDSL(driverDSL) }, + coerce = { it }, dsl = dsl + ) +} -data class SpringBootDriverDSL(private val driverDSL: DriverDSL) : DriverDSLInternalInterface by driverDSL, SpringDriverInternalDSLInterface { +data class SpringBootDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL { companion object { private val log = contextLogger() } - override fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture { + /** + * Starts a Spring Boot application, passes the RPC connection data as parameters the process. + * Returns future which will complete after (and if) the server passes healthcheck. + * @param clazz Class with main method which is expected to run Spring application + * @param handle Corda Node handle this webapp is expected to connect to + * @param checkUrl URL path to use for server readiness check - uses [okhttp3.Response.isSuccessful] as qualifier + * + * TODO: Rather then expecting a given clazz to contain main method which start Spring app our own simple class can do this + */ + fun startSpringBootWebapp(clazz: Class<*>, handle: NodeHandle, checkUrl: String): CordaFuture { val debugPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null val process = startApplication(handle, debugPort, clazz) driverDSL.shutdownManager.registerProcessShutdown(process) diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index a4b7285938..4283f061be 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -15,7 +15,7 @@ import net.corda.testing.DUMMY_BANK_B import net.corda.testing.chooseIdentity import net.corda.testing.driver.NodeHandle import net.corda.testing.driver.driver -import net.corda.testing.driver.poll +import net.corda.testing.internal.poll import net.corda.traderdemo.flow.BuyerFlow import net.corda.traderdemo.flow.CommercialPaperIssueFlow import net.corda.traderdemo.flow.SellerFlow diff --git a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt index e6d581f236..37bb84424e 100644 --- a/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt +++ b/testing/node-driver/src/integration-test/kotlin/net/corda/testing/driver/DriverTests.kt @@ -10,6 +10,8 @@ import net.corda.testing.DUMMY_BANK_A import net.corda.testing.DUMMY_NOTARY import net.corda.testing.DUMMY_REGULATOR import net.corda.testing.common.internal.ProjectStructure.projectRootDir +import net.corda.testing.internal.addressMustBeBound +import net.corda.testing.internal.addressMustNotBeBound import net.corda.testing.node.NotarySpec import org.assertj.core.api.Assertions.assertThat import org.junit.Test @@ -74,7 +76,7 @@ class DriverTests { } val baseDirectory = driver(notarySpecs = listOf(NotarySpec(DUMMY_NOTARY.name))) { - (this as DriverDSL).baseDirectory(DUMMY_NOTARY.name) + baseDirectory(DUMMY_NOTARY.name) } assertThat(baseDirectory / "process-id").doesNotExist() } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt index e46742f885..5e5cf90e9d 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -2,202 +2,35 @@ package net.corda.testing.driver -import com.google.common.util.concurrent.ThreadFactoryBuilder -import com.typesafe.config.Config -import com.typesafe.config.ConfigRenderOptions import net.corda.client.rpc.CordaRPCClient -import net.corda.cordform.CordformContext -import net.corda.cordform.CordformNode -import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture -import net.corda.core.concurrent.firstOf import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party -import net.corda.core.internal.* -import net.corda.core.internal.concurrent.* import net.corda.core.messaging.CordaRPCOps import net.corda.core.node.NodeInfo -import net.corda.core.node.services.NetworkMapCache -import net.corda.core.node.services.NotaryService -import net.corda.core.toFuture -import net.corda.core.utilities.* +import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.Node -import net.corda.node.internal.NodeStartup import net.corda.node.internal.StartedNode -import net.corda.node.services.Permissions.Companion.invokeRpc -import net.corda.node.services.config.* -import net.corda.node.utilities.ServiceIdentityGenerator -import net.corda.nodeapi.NodeInfoFilesCopier +import net.corda.node.services.config.NodeConfiguration +import net.corda.node.services.config.VerifierType import net.corda.nodeapi.internal.config.User -import net.corda.nodeapi.internal.config.toConfig -import net.corda.nodeapi.internal.addShutdownHook -import net.corda.testing.* +import net.corda.testing.DUMMY_NOTARY import net.corda.testing.internal.InProcessNode -import net.corda.testing.internal.ProcessUtilities -import net.corda.testing.node.ClusterSpec -import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO +import net.corda.testing.internal.DriverDSLImpl +import net.corda.testing.internal.genericDriver +import net.corda.testing.internal.getTimestampAsDirectoryName import net.corda.testing.node.NotarySpec -import okhttp3.OkHttpClient -import okhttp3.Request -import org.slf4j.Logger -import rx.Observable -import rx.observables.ConnectableObservable -import java.net.* +import java.net.InetSocketAddress +import java.net.ServerSocket import java.nio.file.Path import java.nio.file.Paths -import java.nio.file.StandardCopyOption.REPLACE_EXISTING -import java.time.Duration -import java.time.Instant -import java.time.ZoneOffset.UTC -import java.time.format.DateTimeFormatter -import java.util.* -import java.util.concurrent.Executors -import java.util.concurrent.ScheduledExecutorService -import java.util.concurrent.TimeUnit.MILLISECONDS -import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.atomic.AtomicInteger -import kotlin.concurrent.thread - -/** - * This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests. - * - * The process the driver is run in behaves as an Artemis client and starts up other processes. - * - * TODO this file is getting way too big, it should be split into several files. - */ -private val log: Logger = loggerFor() - -private val DEFAULT_POLL_INTERVAL = 500.millis - -private const val DEFAULT_WARN_COUNT = 120 - -/** - * A sub-set of permissions that grant most of the essential operations used in the unit/integration tests as well as - * in demo application like NodeExplorer. - */ -private val DRIVER_REQUIRED_PERMISSIONS = setOf( - invokeRpc(CordaRPCOps::nodeInfo), - invokeRpc(CordaRPCOps::networkMapFeed), - invokeRpc(CordaRPCOps::networkMapSnapshot), - invokeRpc(CordaRPCOps::notaryIdentities), - invokeRpc(CordaRPCOps::stateMachinesFeed), - invokeRpc(CordaRPCOps::stateMachineRecordedTransactionMappingFeed), - invokeRpc(CordaRPCOps::nodeInfoFromParty), - invokeRpc(CordaRPCOps::internalVerifiedTransactionsFeed), - invokeRpc("vaultQueryBy"), - invokeRpc("vaultTrackBy"), - invokeRpc(CordaRPCOps::registeredFlows) -) /** * Object ecapsulating a notary started automatically by the driver. */ data class NotaryHandle(val identity: Party, val validating: Boolean, val nodeHandles: CordaFuture>) -/** - * This is the interface that's exposed to DSL users. - */ -interface DriverDSLExposedInterface : CordformContext { - /** Returns a list of [NotaryHandle]s matching the list of [NotarySpec]s passed into [driver]. */ - val notaryHandles: List - - /** - * Returns the [NotaryHandle] for the single notary on the network. Throws if there are none or more than one. - * @see notaryHandles - */ - val defaultNotaryHandle: NotaryHandle get() { - return when (notaryHandles.size) { - 0 -> throw IllegalStateException("There are no notaries defined on the network") - 1 -> notaryHandles[0] - else -> throw IllegalStateException("There is more than one notary defined on the network") - } - } - - /** - * Returns the identity of the single notary on the network. Throws if there are none or more than one. - * @see defaultNotaryHandle - */ - val defaultNotaryIdentity: Party get() = defaultNotaryHandle.identity - - /** - * Returns a [CordaFuture] on the [NodeHandle] for the single-node notary on the network. Throws if there - * are no notaries or more than one, or if the notary is a distributed cluster. - * @see defaultNotaryHandle - * @see notaryHandles - */ - val defaultNotaryNode: CordaFuture get() { - return defaultNotaryHandle.nodeHandles.map { - it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node") - } - } - - /** - * Start a node. - * - * @param defaultParameters The default parameters for the node. Allows the node to be configured in builder style - * when called from Java code. - * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something - * random. Note that this must be unique as the driver uses it as a primary key! - * @param verifierType The type of transaction verifier to use. See: [VerifierType] - * @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list. - * @param startInSameProcess Determines if the node should be started inside the same process the Driver is running - * in. If null the Driver-level value will be used. - * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available. - */ - fun startNode( - defaultParameters: NodeParameters = NodeParameters(), - providedName: CordaX500Name? = defaultParameters.providedName, - rpcUsers: List = defaultParameters.rpcUsers, - verifierType: VerifierType = defaultParameters.verifierType, - customOverrides: Map = defaultParameters.customOverrides, - startInSameProcess: Boolean? = defaultParameters.startInSameProcess, - maximumHeapSize: String = defaultParameters.maximumHeapSize): CordaFuture - - /** - * Helper function for starting a [Node] with custom parameters from Java. - * - * @param parameters The default parameters for the driver. - * @return [NodeHandle] that will be available sometime in the future. - */ - fun startNode(parameters: NodeParameters): CordaFuture = startNode(defaultParameters = parameters) - - /** Call [startWebserver] with a default maximumHeapSize. */ - fun startWebserver(handle: NodeHandle): CordaFuture = startWebserver(handle, "200m") - - /** - * Starts a web server for a node - * @param handle The handle for the node that this webserver connects to via RPC. - * @param maximumHeapSize Argument for JVM -Xmx option e.g. "200m". - */ - fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture - - /** - * Polls a function until it returns a non-null value. Note that there is no timeout on the polling. - * - * @param pollName A description of what is being polled. - * @param pollInterval The interval of polling. - * @param warnCount The number of polls after the Driver gives a warning. - * @param check The function being polled. - * @return A future that completes with the non-null value [check] has returned. - */ - fun pollUntilNonNull(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> A?): CordaFuture - - /** - * Polls the given function until it returns true. - * @see pollUntilNonNull - */ - fun pollUntilTrue(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> Boolean): CordaFuture { - return pollUntilNonNull(pollName, pollInterval, warnCount) { if (check()) Unit else null } - } - - val shutdownManager: ShutdownManager -} - -interface DriverDSLInternalInterface : DriverDSLExposedInterface { - fun start() - fun shutdown() -} - sealed class NodeHandle { abstract val nodeInfo: NodeInfo /** @@ -319,9 +152,9 @@ data class NodeParameters( * @param systemProperties A Map of extra system properties which will be given to each new node. Defaults to empty. * @param useTestClock If true the test clock will be used in Node. * @param startNodesInProcess Provides the default behaviour of whether new nodes should start inside this process or - * not. Note that this may be overridden in [DriverDSLExposedInterface.startNode]. + * not. Note that this may be overridden in [DriverDSL.startNode]. * @param notarySpecs The notaries advertised for this network. These nodes will be started automatically and will be - * available from [DriverDSLExposedInterface.notaryHandles]. Defaults to a simple validating notary. + * available from [DriverDSL.notaryHandles]. Defaults to a simple validating notary. * @param dsl The dsl itself. * @return The value returned in the [dsl] closure. */ @@ -335,13 +168,13 @@ fun driver( useTestClock: Boolean = defaultParameters.useTestClock, initialiseSerialization: Boolean = defaultParameters.initialiseSerialization, startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, - waitForAllNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish, + waitForAllNodesToFinish: Boolean = defaultParameters.waitForAllNodesToFinish, notarySpecs: List = defaultParameters.notarySpecs, extraCordappPackagesToScan: List = defaultParameters.extraCordappPackagesToScan, - dsl: DriverDSLExposedInterface.() -> A + dsl: DriverDSL.() -> A ): A { return genericDriver( - driverDsl = DriverDSL( + driverDsl = DriverDSLImpl( portAllocation = portAllocation, debugPortAllocation = debugPortAllocation, systemProperties = systemProperties, @@ -368,7 +201,7 @@ fun driver( */ fun driver( parameters: DriverParameters, - dsl: DriverDSLExposedInterface.() -> A + dsl: DriverDSL.() -> A ): A { return driver(defaultParameters = parameters, dsl = dsl) } @@ -384,7 +217,7 @@ data class DriverParameters( val useTestClock: Boolean = false, val initialiseSerialization: Boolean = true, val startNodesInProcess: Boolean = false, - val waitForNodesToFinish: Boolean = false, + val waitForAllNodesToFinish: Boolean = false, val notarySpecs: List = listOf(NotarySpec(DUMMY_NOTARY.name)), val extraCordappPackagesToScan: List = emptyList() ) { @@ -396,643 +229,7 @@ data class DriverParameters( fun setUseTestClock(useTestClock: Boolean) = copy(useTestClock = useTestClock) fun setInitialiseSerialization(initialiseSerialization: Boolean) = copy(initialiseSerialization = initialiseSerialization) fun setStartNodesInProcess(startNodesInProcess: Boolean) = copy(startNodesInProcess = startNodesInProcess) - fun setTerminateNodesOnShutdown(terminateNodesOnShutdown: Boolean) = copy(waitForNodesToFinish = terminateNodesOnShutdown) + fun setWaitForAllNodesToFinish(waitForAllNodesToFinish: Boolean) = copy(waitForAllNodesToFinish = waitForAllNodesToFinish) fun setExtraCordappPackagesToScan(extraCordappPackagesToScan: List) = copy(extraCordappPackagesToScan = extraCordappPackagesToScan) fun setNotarySpecs(notarySpecs: List) = copy(notarySpecs = notarySpecs) } - -/** - * This is a helper method to allow extending of the DSL, along the lines of - * interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface - * interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface - * class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface - * - * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause. - */ -fun genericDriver( - driverDsl: D, - initialiseSerialization: Boolean = true, - coerce: (D) -> DI, - dsl: DI.() -> A -): A { - val serializationEnv = setGlobalSerialization(initialiseSerialization) - val shutdownHook = addShutdownHook(driverDsl::shutdown) - try { - driverDsl.start() - return dsl(coerce(driverDsl)) - } catch (exception: Throwable) { - log.error("Driver shutting down because of exception", exception) - throw exception - } finally { - driverDsl.shutdown() - shutdownHook.cancel() - serializationEnv.unset() - } -} - -/** - * This is a helper method to allow extending of the DSL, along the lines of - * interface SomeOtherExposedDSLInterface : DriverDSLExposedInterface - * interface SomeOtherInternalDSLInterface : DriverDSLInternalInterface, SomeOtherExposedDSLInterface - * class SomeOtherDSL(val driverDSL : DriverDSL) : DriverDSLInternalInterface by driverDSL, SomeOtherInternalDSLInterface - * - * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause. - */ -fun genericDriver( - defaultParameters: DriverParameters = DriverParameters(), - isDebug: Boolean = defaultParameters.isDebug, - driverDirectory: Path = defaultParameters.driverDirectory, - portAllocation: PortAllocation = defaultParameters.portAllocation, - debugPortAllocation: PortAllocation = defaultParameters.debugPortAllocation, - systemProperties: Map = defaultParameters.systemProperties, - useTestClock: Boolean = defaultParameters.useTestClock, - initialiseSerialization: Boolean = defaultParameters.initialiseSerialization, - waitForNodesToFinish: Boolean = defaultParameters.waitForNodesToFinish, - startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, - notarySpecs: List, - extraCordappPackagesToScan: List = defaultParameters.extraCordappPackagesToScan, - driverDslWrapper: (DriverDSL) -> D, - coerce: (D) -> DI, dsl: DI.() -> A -): A { - val serializationEnv = setGlobalSerialization(initialiseSerialization) - val driverDsl = driverDslWrapper( - DriverDSL( - portAllocation = portAllocation, - debugPortAllocation = debugPortAllocation, - systemProperties = systemProperties, - driverDirectory = driverDirectory.toAbsolutePath(), - useTestClock = useTestClock, - isDebug = isDebug, - startNodesInProcess = startNodesInProcess, - waitForNodesToFinish = waitForNodesToFinish, - extraCordappPackagesToScan = extraCordappPackagesToScan, - notarySpecs = notarySpecs - ) - ) - val shutdownHook = addShutdownHook(driverDsl::shutdown) - try { - driverDsl.start() - return dsl(coerce(driverDsl)) - } catch (exception: Throwable) { - log.error("Driver shutting down because of exception", exception) - throw exception - } finally { - driverDsl.shutdown() - shutdownHook.cancel() - serializationEnv.unset() - } -} - -fun getTimestampAsDirectoryName(): String { - return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now()) -} - -class ListenProcessDeathException(hostAndPort: NetworkHostAndPort, listenProcess: Process) : - CordaException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}") - -/** - * @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended. - */ -fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null) { - addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow() -} - -fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null): CordaFuture { - return poll(executorService, "address $hostAndPort to bind") { - if (listenProcess != null && !listenProcess.isAlive) { - throw ListenProcessDeathException(hostAndPort, listenProcess) - } - try { - Socket(hostAndPort.host, hostAndPort.port).close() - Unit - } catch (_exception: SocketException) { - null - } - } -} - -/* - * The default timeout value of 40 seconds have been chosen based on previous node shutdown time estimate. - * It's been observed that nodes can take up to 30 seconds to shut down, so just to stay on the safe side the 60 seconds - * timeout has been chosen. - */ -fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, timeout: Duration = 40.seconds) { - addressMustNotBeBoundFuture(executorService, hostAndPort).getOrThrow(timeout) -} - -fun addressMustNotBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort): CordaFuture { - return poll(executorService, "address $hostAndPort to unbind") { - try { - Socket(hostAndPort.host, hostAndPort.port).close() - null - } catch (_exception: SocketException) { - Unit - } - } -} - -fun poll( - executorService: ScheduledExecutorService, - pollName: String, - pollInterval: Duration = 500.millis, - warnCount: Int = 120, - check: () -> A? -): CordaFuture { - val resultFuture = openFuture() - val task = object : Runnable { - var counter = -1 - override fun run() { - if (resultFuture.isCancelled) return // Give up, caller can no longer get the result. - if (++counter == warnCount) { - log.warn("Been polling $pollName for ${(pollInterval * warnCount.toLong()).seconds} seconds...") - } - try { - val checkResult = check() - if (checkResult != null) { - resultFuture.set(checkResult) - } else { - executorService.schedule(this, pollInterval.toMillis(), MILLISECONDS) - } - } catch (t: Throwable) { - resultFuture.setException(t) - } - } - } - executorService.submit(task) // The check may be expensive, so always run it in the background even the first time. - return resultFuture -} - -class DriverDSL( - val portAllocation: PortAllocation, - val debugPortAllocation: PortAllocation, - val systemProperties: Map, - val driverDirectory: Path, - val useTestClock: Boolean, - val isDebug: Boolean, - val startNodesInProcess: Boolean, - val waitForNodesToFinish: Boolean, - extraCordappPackagesToScan: List, - val notarySpecs: List -) : DriverDSLInternalInterface { - private var _executorService: ScheduledExecutorService? = null - val executorService get() = _executorService!! - private var _shutdownManager: ShutdownManager? = null - override val shutdownManager get() = _shutdownManager!! - private val cordappPackages = extraCordappPackagesToScan + getCallerPackage() - // TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/ - // This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system. - // Investigate whether we can avoid that. - private val nodeInfoFilesCopier = NodeInfoFilesCopier() - // Map from a nodes legal name to an observable emitting the number of nodes in its network map. - private val countObservables = mutableMapOf>() - private lateinit var _notaries: List - override val notaryHandles: List get() = _notaries - - class State { - val processes = ArrayList() - } - - private val state = ThreadBox(State()) - - //TODO: remove this once we can bundle quasar properly. - private val quasarJarPath: String by lazy { - val cl = ClassLoader.getSystemClassLoader() - val urls = (cl as URLClassLoader).urLs - val quasarPattern = ".*quasar.*\\.jar$".toRegex() - val quasarFileUrl = urls.first { quasarPattern.matches(it.path) } - Paths.get(quasarFileUrl.toURI()).toString() - } - - override fun shutdown() { - if (waitForNodesToFinish) { - state.locked { - processes.forEach { it.waitFor() } - } - } - _shutdownManager?.shutdown() - _executorService?.shutdownNow() - } - - private fun establishRpc(config: NodeConfiguration, processDeathFuture: CordaFuture): CordaFuture { - val rpcAddress = config.rpcAddress!! - val client = CordaRPCClient(rpcAddress) - val connectionFuture = poll(executorService, "RPC connection") { - try { - client.start(config.rpcUsers[0].username, config.rpcUsers[0].password) - } catch (e: Exception) { - if (processDeathFuture.isDone) throw e - log.error("Exception $e, Retrying RPC connection at $rpcAddress") - null - } - } - return firstOf(connectionFuture, processDeathFuture) { - if (it == processDeathFuture) { - throw ListenProcessDeathException(rpcAddress, processDeathFuture.getOrThrow()) - } - val connection = connectionFuture.getOrThrow() - shutdownManager.registerShutdown(connection::close) - connection.proxy - } - } - - override fun startNode( - defaultParameters: NodeParameters, - providedName: CordaX500Name?, - rpcUsers: List, - verifierType: VerifierType, - customOverrides: Map, - startInSameProcess: Boolean?, - maximumHeapSize: String - ): CordaFuture { - val p2pAddress = portAllocation.nextHostAndPort() - val rpcAddress = portAllocation.nextHostAndPort() - val webAddress = portAllocation.nextHostAndPort() - // TODO: Derive name from the full picked name, don't just wrap the common name - val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB") - val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) } - val config = ConfigHelper.loadConfig( - baseDirectory = baseDirectory(name), - allowMissingConfig = true, - configOverrides = configOf( - "myLegalName" to name.toString(), - "p2pAddress" to p2pAddress.toString(), - "rpcAddress" to rpcAddress.toString(), - "webAddress" to webAddress.toString(), - "useTestClock" to useTestClock, - "rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() }, - "verifierType" to verifierType.name - ) + customOverrides - ) - return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize) - } - - internal fun startCordformNode(cordform: CordformNode): CordaFuture { - val name = CordaX500Name.parse(cordform.name) - // TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal - val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap() - val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort() - val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap() - val rpcUsers = cordform.rpcUsers - val config = ConfigHelper.loadConfig( - baseDirectory = baseDirectory(name), - allowMissingConfig = true, - configOverrides = cordform.config + rpcAddress + notary + mapOf( - "rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers - ) - ) - return startNodeInternal(config, webAddress, null, "200m") - } - - private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle { - val protocol = if (handle.configuration.useHTTPS) "https://" else "http://" - val url = URL("$protocol${handle.webAddress}/api/status") - val client = OkHttpClient.Builder().connectTimeout(5, SECONDS).readTimeout(60, SECONDS).build() - - while (process.isAlive) try { - val response = client.newCall(Request.Builder().url(url).build()).execute() - if (response.isSuccessful && (response.body().string() == "started")) { - return WebserverHandle(handle.webAddress, process) - } - } catch (e: ConnectException) { - log.debug("Retrying webserver info at ${handle.webAddress}") - } - - throw IllegalStateException("Webserver at ${handle.webAddress} has died") - } - - override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture { - val debugPort = if (isDebug) debugPortAllocation.nextPort() else null - val process = DriverDSL.startWebserver(handle, debugPort, maximumHeapSize) - shutdownManager.registerProcessShutdown(process) - val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process) - return webReadyFuture.map { queryWebserver(handle, process) } - } - - override fun start() { - _executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build()) - _shutdownManager = ShutdownManager(executorService) - shutdownManager.registerShutdown { nodeInfoFilesCopier.close() } - val notaryInfos = generateNotaryIdentities() - val nodeHandles = startNotaries() - _notaries = notaryInfos.zip(nodeHandles) { (identity, validating), nodes -> NotaryHandle(identity, validating, nodes) } - } - - private fun generateNotaryIdentities(): List> { - return notarySpecs.map { spec -> - val identity = if (spec.cluster == null) { - ServiceIdentityGenerator.generateToDisk( - dirs = listOf(baseDirectory(spec.name)), - serviceName = spec.name.copy(commonName = NotaryService.constructId(validating = spec.validating)) - ) - } else { - ServiceIdentityGenerator.generateToDisk( - dirs = generateNodeNames(spec).map { baseDirectory(it) }, - serviceName = spec.name - ) - } - Pair(identity, spec.validating) - } - } - - private fun generateNodeNames(spec: NotarySpec): List { - return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(commonName = null, organisation = "${spec.name.organisation}-$it") } - } - - private fun startNotaries(): List>> { - return notarySpecs.map { - when { - it.cluster == null -> startSingleNotary(it) - it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it) - else -> throw IllegalArgumentException("BFT-SMaRt not supported") - } - } - } - - // TODO This mapping is done is several places including the gradle plugin. In general we need a better way of - // generating the configs for the nodes, probably making use of Any.toConfig() - private fun NotaryConfig.toConfigMap(): Map = mapOf("notary" to toConfig().root().unwrapped()) - - private fun startSingleNotary(spec: NotarySpec): CordaFuture> { - return startNode( - providedName = spec.name, - rpcUsers = spec.rpcUsers, - verifierType = spec.verifierType, - customOverrides = NotaryConfig(spec.validating).toConfigMap() - ).map { listOf(it) } - } - - private fun startRaftNotaryCluster(spec: NotarySpec): CordaFuture> { - fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map { - val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList() - val config = NotaryConfig( - validating = spec.validating, - raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses)) - return config.toConfigMap() - } - - val nodeNames = generateNodeNames(spec) - val clusterAddress = portAllocation.nextHostAndPort() - - // Start the first node that will bootstrap the cluster - val firstNodeFuture = startNode( - providedName = nodeNames[0], - rpcUsers = spec.rpcUsers, - verifierType = spec.verifierType, - customOverrides = notaryConfig(clusterAddress) + mapOf( - "database.serverNameTablePrefix" to nodeNames[0].toString().replace(Regex("[^0-9A-Za-z]+"), "") - ) - ) - - // All other nodes will join the cluster - val restNodeFutures = nodeNames.drop(1).map { - val nodeAddress = portAllocation.nextHostAndPort() - startNode( - providedName = it, - rpcUsers = spec.rpcUsers, - verifierType = spec.verifierType, - customOverrides = notaryConfig(nodeAddress, clusterAddress) + mapOf( - "database.serverNameTablePrefix" to it.toString().replace(Regex("[^0-9A-Za-z]+"), "") - ) - ) - } - - return firstNodeFuture.flatMap { first -> - restNodeFutures.transpose().map { rest -> listOf(first) + rest } - } - } - - fun baseDirectory(nodeName: CordaX500Name): Path { - val nodeDirectoryName = nodeName.organisation.filter { !it.isWhitespace() } - return driverDirectory / nodeDirectoryName - } - - override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName)) - - /** - * @param initial number of nodes currently in the network map of a running node. - * @param networkMapCacheChangeObservable an observable returning the updates to the node network map. - * @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes - * the initial value emitted is always [initial] - */ - private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable): - ConnectableObservable { - val count = AtomicInteger(initial) - return networkMapCacheChangeObservable.map { it -> - when (it) { - is NetworkMapCache.MapChange.Added -> count.incrementAndGet() - is NetworkMapCache.MapChange.Removed -> count.decrementAndGet() - is NetworkMapCache.MapChange.Modified -> count.get() - } - }.startWith(initial).replay() - } - - /** - * @param rpc the [CordaRPCOps] of a newly started node. - * @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes - * equal to the number of running nodes. The future will yield the number of connected nodes. - */ - private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture { - val (snapshot, updates) = rpc.networkMapFeed() - val counterObservable = nodeCountObservable(snapshot.size, updates) - countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable - /* TODO: this might not always be the exact number of nodes one has to wait for, - * for example in the following sequence - * 1 start 3 nodes in order, A, B, C. - * 2 before the future returned by this function resolves, kill B - * At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes. - */ - val requiredNodes = countObservables.size - - // This is an observable which yield the minimum number of nodes in each node network map. - val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array -> - args.map { it as Int }.min() ?: 0 - } - val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture() - counterObservable.connect() - return future - } - - private fun startNodeInternal(config: Config, - webAddress: NetworkHostAndPort, - startInProcess: Boolean?, - maximumHeapSize: String): CordaFuture { - val configuration = config.parseAsNodeConfiguration() - val baseDirectory = configuration.baseDirectory.createDirectories() - nodeInfoFilesCopier.addConfig(baseDirectory) - val onNodeExit: () -> Unit = { - nodeInfoFilesCopier.removeConfig(baseDirectory) - countObservables.remove(configuration.myLegalName) - } - if (startInProcess ?: startNodesInProcess) { - val nodeAndThreadFuture = startInProcessNode(executorService, configuration, config, cordappPackages) - shutdownManager.registerShutdown( - nodeAndThreadFuture.map { (node, thread) -> - { - node.dispose() - thread.interrupt() - } - } - ) - return nodeAndThreadFuture.flatMap { (node, thread) -> - establishRpc(configuration, openFuture()).flatMap { rpc -> - allNodesConnected(rpc).map { - NodeHandle.InProcess(rpc.nodeInfo(), rpc, configuration, webAddress, node, thread, onNodeExit) - } - } - } - } else { - val debugPort = if (isDebug) debugPortAllocation.nextPort() else null - val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize) - if (waitForNodesToFinish) { - state.locked { - processes += process - } - } else { - shutdownManager.registerProcessShutdown(process) - } - val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process) - return p2pReadyFuture.flatMap { - val processDeathFuture = poll(executorService, "process death") { - if (process.isAlive) null else process - } - establishRpc(configuration, processDeathFuture).flatMap { rpc -> - // Check for all nodes to have all other nodes in background in case RPC is failing over: - val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it } - firstOf(processDeathFuture, networkMapFuture) { - if (it == processDeathFuture) { - throw ListenProcessDeathException(configuration.p2pAddress, process) - } - processDeathFuture.cancel(false) - log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress") - NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, configuration, webAddress, debugPort, process, - onNodeExit) - } - } - } - } - } - - override fun pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): CordaFuture { - val pollFuture = poll(executorService, pollName, pollInterval, warnCount, check) - shutdownManager.registerShutdown { pollFuture.cancel(true) } - return pollFuture - } - - companion object { - private val defaultRpcUserList = listOf(User("default", "default", setOf("ALL")).toConfig().root().unwrapped()) - - private val names = arrayOf( - ALICE.name, - BOB.name, - DUMMY_BANK_A.name - ) - - private fun oneOf(array: Array) = array[Random().nextInt(array.size)] - - private fun startInProcessNode( - executorService: ScheduledExecutorService, - nodeConf: NodeConfiguration, - config: Config, - cordappPackages: List - ): CordaFuture, Thread>> { - return executorService.fork { - log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}") - // Write node.conf - writeConfig(nodeConf.baseDirectory, "node.conf", config) - // TODO pass the version in? - val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start() - val nodeThread = thread(name = nodeConf.myLegalName.organisation) { - node.internals.run() - } - node to nodeThread - }.flatMap { - nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread } - } - } - - private fun startOutOfProcessNode( - nodeConf: NodeConfiguration, - config: Config, - quasarJarPath: String, - debugPort: Int?, - overriddenSystemProperties: Map, - cordappPackages: List, - maximumHeapSize: String - ): Process { - log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled")) - // Write node.conf - writeConfig(nodeConf.baseDirectory, "node.conf", config) - - val systemProperties = overriddenSystemProperties + mapOf( - "name" to nodeConf.myLegalName, - "visualvm.display.name" to "corda-${nodeConf.myLegalName}", - Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator), - "java.io.tmpdir" to System.getProperty("java.io.tmpdir"), // Inherit from parent process - "log4j2.debug" to if(debugPort != null) "true" else "false" - ) - // See experimental/quasar-hook/README.md for how to generate. - val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" + - "com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" + - "com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" + - "io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" + - "org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" + - "org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" + - "org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)" - val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } + - "-javaagent:$quasarJarPath=$excludePattern" - val loggingLevel = if (debugPort == null) "INFO" else "DEBUG" - - return ProcessUtilities.startCordaProcess( - className = "net.corda.node.Corda", // cannot directly get class for this, so just use string - arguments = listOf( - "--base-directory=${nodeConf.baseDirectory}", - "--logging-level=$loggingLevel", - "--no-local-shell" - ), - jdwpPort = debugPort, - extraJvmArguments = extraJvmArguments, - errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log", - workingDirectory = nodeConf.baseDirectory, - maximumHeapSize = maximumHeapSize - ) - } - - private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process { - val className = "net.corda.webserver.WebServer" - return ProcessUtilities.startCordaProcess( - className = className, // cannot directly get class for this, so just use string - arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()), - jdwpPort = debugPort, - extraJvmArguments = listOf( - "-Dname=node-${handle.configuration.p2pAddress}-webserver", - "-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process - ), - errorLogPath = Paths.get("error.$className.log"), - workingDirectory = null, - maximumHeapSize = maximumHeapSize - ) - } - - private fun getCallerPackage(): String { - return Exception() - .stackTrace - .first { it.fileName != "Driver.kt" } - .let { Class.forName(it.className).`package`?.name } - ?: throw IllegalStateException("Function instantiating driver must be defined in a package.") - } - - /** - * We have an alternative way of specifying classpath for spawned process: by using "-cp" option. So duplicating the setting of this - * rather long string is un-necessary and can be harmful on Windows. - */ - private fun Map.removeResolvedClasspath(): Map { - return filterNot { it.key == "java.class.path" } - } - } -} - -fun writeConfig(path: Path, filename: String, config: Config) { - val configString = config.root().render(ConfigRenderOptions.defaults()) - configString.byteInputStream().copyTo(path / filename, REPLACE_EXISTING) -} - diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt new file mode 100644 index 0000000000..42a394b51d --- /dev/null +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/driver/DriverDSL.kt @@ -0,0 +1,92 @@ +package net.corda.testing.driver + +import net.corda.core.concurrent.CordaFuture +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.map +import net.corda.node.internal.Node +import net.corda.node.services.config.VerifierType +import net.corda.nodeapi.internal.config.User +import net.corda.testing.node.NotarySpec +import java.nio.file.Path + +interface DriverDSL { + /** Returns a list of [NotaryHandle]s matching the list of [NotarySpec]s passed into [driver]. */ + val notaryHandles: List + + /** + * Returns the [NotaryHandle] for the single notary on the network. Throws if there are none or more than one. + * @see notaryHandles + */ + val defaultNotaryHandle: NotaryHandle get() { + return when (notaryHandles.size) { + 0 -> throw IllegalStateException("There are no notaries defined on the network") + 1 -> notaryHandles[0] + else -> throw IllegalStateException("There is more than one notary defined on the network") + } + } + + /** + * Returns the identity of the single notary on the network. Throws if there are none or more than one. + * @see defaultNotaryHandle + */ + val defaultNotaryIdentity: Party get() = defaultNotaryHandle.identity + + /** + * Returns a [CordaFuture] on the [NodeHandle] for the single-node notary on the network. Throws if there + * are no notaries or more than one, or if the notary is a distributed cluster. + * @see defaultNotaryHandle + * @see notaryHandles + */ + val defaultNotaryNode: CordaFuture get() { + return defaultNotaryHandle.nodeHandles.map { + it.singleOrNull() ?: throw IllegalStateException("Default notary is not a single node") + } + } + + /** + * Start a node. + * + * @param defaultParameters The default parameters for the node. Allows the node to be configured in builder style + * when called from Java code. + * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something + * random. Note that this must be unique as the driver uses it as a primary key! + * @param verifierType The type of transaction verifier to use. See: [VerifierType] + * @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list. + * @param startInSameProcess Determines if the node should be started inside the same process the Driver is running + * in. If null the Driver-level value will be used. + * @return A [CordaFuture] on the [NodeHandle] to the node. The future will complete when the node is available. + */ + fun startNode( + defaultParameters: NodeParameters = NodeParameters(), + providedName: CordaX500Name? = defaultParameters.providedName, + rpcUsers: List = defaultParameters.rpcUsers, + verifierType: VerifierType = defaultParameters.verifierType, + customOverrides: Map = defaultParameters.customOverrides, + startInSameProcess: Boolean? = defaultParameters.startInSameProcess, + maximumHeapSize: String = defaultParameters.maximumHeapSize): CordaFuture + + /** + * Helper function for starting a [Node] with custom parameters from Java. + * + * @param parameters The default parameters for the driver. + * @return [NodeHandle] that will be available sometime in the future. + */ + fun startNode(parameters: NodeParameters): CordaFuture = startNode(defaultParameters = parameters) + + /** Call [startWebserver] with a default maximumHeapSize. */ + fun startWebserver(handle: NodeHandle): CordaFuture = startWebserver(handle, "200m") + + /** + * Starts a web server for a node + * @param handle The handle for the node that this webserver connects to via RPC. + * @param maximumHeapSize Argument for JVM -Xmx option e.g. "200m". + */ + fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture + + /** + * Returns the base directory for a node with the given [CordaX500Name]. This method is useful if the base directory + * is needed before the node is started. + */ + fun baseDirectory(nodeName: CordaX500Name): Path +} diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/DriverDSLImpl.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/DriverDSLImpl.kt new file mode 100644 index 0000000000..b2fe9a1070 --- /dev/null +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/DriverDSLImpl.kt @@ -0,0 +1,683 @@ +package net.corda.testing.internal + +import com.google.common.util.concurrent.ThreadFactoryBuilder +import com.typesafe.config.Config +import com.typesafe.config.ConfigRenderOptions +import net.corda.client.rpc.CordaRPCClient +import net.corda.cordform.CordformContext +import net.corda.cordform.CordformNode +import net.corda.core.concurrent.CordaFuture +import net.corda.core.concurrent.firstOf +import net.corda.core.identity.CordaX500Name +import net.corda.core.identity.Party +import net.corda.core.internal.ThreadBox +import net.corda.core.internal.concurrent.* +import net.corda.core.internal.copyTo +import net.corda.core.internal.createDirectories +import net.corda.core.internal.div +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.node.services.NetworkMapCache +import net.corda.core.node.services.NotaryService +import net.corda.core.toFuture +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.millis +import net.corda.node.internal.Node +import net.corda.node.internal.NodeStartup +import net.corda.node.internal.StartedNode +import net.corda.node.services.Permissions +import net.corda.node.services.config.* +import net.corda.node.utilities.ServiceIdentityGenerator +import net.corda.nodeapi.NodeInfoFilesCopier +import net.corda.nodeapi.internal.addShutdownHook +import net.corda.nodeapi.internal.config.User +import net.corda.nodeapi.internal.config.toConfig +import net.corda.testing.ALICE +import net.corda.testing.BOB +import net.corda.testing.DUMMY_BANK_A +import net.corda.testing.driver.* +import net.corda.testing.node.ClusterSpec +import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO +import net.corda.testing.node.NotarySpec +import net.corda.testing.setGlobalSerialization +import okhttp3.OkHttpClient +import okhttp3.Request +import rx.Observable +import rx.observables.ConnectableObservable +import java.net.ConnectException +import java.net.URL +import java.net.URLClassLoader +import java.nio.file.Path +import java.nio.file.Paths +import java.nio.file.StandardCopyOption +import java.time.Duration +import java.time.Instant +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.util.* +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import kotlin.concurrent.thread + +class DriverDSLImpl( + val portAllocation: PortAllocation, + val debugPortAllocation: PortAllocation, + val systemProperties: Map, + val driverDirectory: Path, + val useTestClock: Boolean, + val isDebug: Boolean, + val startNodesInProcess: Boolean, + val waitForNodesToFinish: Boolean, + extraCordappPackagesToScan: List, + val notarySpecs: List +) : InternalDriverDSL { + private var _executorService: ScheduledExecutorService? = null + val executorService get() = _executorService!! + private var _shutdownManager: ShutdownManager? = null + override val shutdownManager get() = _shutdownManager!! + private val cordappPackages = extraCordappPackagesToScan + getCallerPackage() + // TODO: this object will copy NodeInfo files from started nodes to other nodes additional-node-infos/ + // This uses the FileSystem and adds a delay (~5 seconds) given by the time we wait before polling the file system. + // Investigate whether we can avoid that. + private val nodeInfoFilesCopier = NodeInfoFilesCopier() + // Map from a nodes legal name to an observable emitting the number of nodes in its network map. + private val countObservables = mutableMapOf>() + private lateinit var _notaries: List + override val notaryHandles: List get() = _notaries + + class State { + val processes = ArrayList() + } + + private val state = ThreadBox(State()) + + //TODO: remove this once we can bundle quasar properly. + private val quasarJarPath: String by lazy { + val cl = ClassLoader.getSystemClassLoader() + val urls = (cl as URLClassLoader).urLs + val quasarPattern = ".*quasar.*\\.jar$".toRegex() + val quasarFileUrl = urls.first { quasarPattern.matches(it.path) } + Paths.get(quasarFileUrl.toURI()).toString() + } + + override fun shutdown() { + if (waitForNodesToFinish) { + state.locked { + processes.forEach { it.waitFor() } + } + } + _shutdownManager?.shutdown() + _executorService?.shutdownNow() + } + + private fun establishRpc(config: NodeConfiguration, processDeathFuture: CordaFuture): CordaFuture { + val rpcAddress = config.rpcAddress!! + val client = CordaRPCClient(rpcAddress) + val connectionFuture = poll(executorService, "RPC connection") { + try { + client.start(config.rpcUsers[0].username, config.rpcUsers[0].password) + } catch (e: Exception) { + if (processDeathFuture.isDone) throw e + log.error("Exception $e, Retrying RPC connection at $rpcAddress") + null + } + } + return firstOf(connectionFuture, processDeathFuture) { + if (it == processDeathFuture) { + throw ListenProcessDeathException(rpcAddress, processDeathFuture.getOrThrow()) + } + val connection = connectionFuture.getOrThrow() + shutdownManager.registerShutdown(connection::close) + connection.proxy + } + } + + override fun startNode( + defaultParameters: NodeParameters, + providedName: CordaX500Name?, + rpcUsers: List, + verifierType: VerifierType, + customOverrides: Map, + startInSameProcess: Boolean?, + maximumHeapSize: String + ): CordaFuture { + val p2pAddress = portAllocation.nextHostAndPort() + val rpcAddress = portAllocation.nextHostAndPort() + val webAddress = portAllocation.nextHostAndPort() + // TODO: Derive name from the full picked name, don't just wrap the common name + val name = providedName ?: CordaX500Name(organisation = "${oneOf(names).organisation}-${p2pAddress.port}", locality = "London", country = "GB") + val users = rpcUsers.map { it.copy(permissions = it.permissions + DRIVER_REQUIRED_PERMISSIONS) } + val config = ConfigHelper.loadConfig( + baseDirectory = baseDirectory(name), + allowMissingConfig = true, + configOverrides = configOf( + "myLegalName" to name.toString(), + "p2pAddress" to p2pAddress.toString(), + "rpcAddress" to rpcAddress.toString(), + "webAddress" to webAddress.toString(), + "useTestClock" to useTestClock, + "rpcUsers" to if (users.isEmpty()) defaultRpcUserList else users.map { it.toConfig().root().unwrapped() }, + "verifierType" to verifierType.name + ) + customOverrides + ) + return startNodeInternal(config, webAddress, startInSameProcess, maximumHeapSize) + } + + internal fun startCordformNode(cordform: CordformNode): CordaFuture { + val name = CordaX500Name.parse(cordform.name) + // TODO We shouldn't have to allocate an RPC or web address if they're not specified. We're having to do this because of startNodeInternal + val rpcAddress = if (cordform.rpcAddress == null) mapOf("rpcAddress" to portAllocation.nextHostAndPort().toString()) else emptyMap() + val webAddress = cordform.webAddress?.let { NetworkHostAndPort.parse(it) } ?: portAllocation.nextHostAndPort() + val notary = if (cordform.notary != null) mapOf("notary" to cordform.notary) else emptyMap() + val rpcUsers = cordform.rpcUsers + val config = ConfigHelper.loadConfig( + baseDirectory = baseDirectory(name), + allowMissingConfig = true, + configOverrides = cordform.config + rpcAddress + notary + mapOf( + "rpcUsers" to if (rpcUsers.isEmpty()) defaultRpcUserList else rpcUsers + ) + ) + return startNodeInternal(config, webAddress, null, "200m") + } + + private fun queryWebserver(handle: NodeHandle, process: Process): WebserverHandle { + val protocol = if (handle.configuration.useHTTPS) "https://" else "http://" + val url = URL("$protocol${handle.webAddress}/api/status") + val client = OkHttpClient.Builder().connectTimeout(5, TimeUnit.SECONDS).readTimeout(60, TimeUnit.SECONDS).build() + + while (process.isAlive) try { + val response = client.newCall(Request.Builder().url(url).build()).execute() + if (response.isSuccessful && (response.body().string() == "started")) { + return WebserverHandle(handle.webAddress, process) + } + } catch (e: ConnectException) { + log.debug("Retrying webserver info at ${handle.webAddress}") + } + + throw IllegalStateException("Webserver at ${handle.webAddress} has died") + } + + override fun startWebserver(handle: NodeHandle, maximumHeapSize: String): CordaFuture { + val debugPort = if (isDebug) debugPortAllocation.nextPort() else null + val process = startWebserver(handle, debugPort, maximumHeapSize) + shutdownManager.registerProcessShutdown(process) + val webReadyFuture = addressMustBeBoundFuture(executorService, handle.webAddress, process) + return webReadyFuture.map { queryWebserver(handle, process) } + } + + override fun start() { + _executorService = Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build()) + _shutdownManager = ShutdownManager(executorService) + shutdownManager.registerShutdown { nodeInfoFilesCopier.close() } + val notaryInfos = generateNotaryIdentities() + val nodeHandles = startNotaries() + _notaries = notaryInfos.zip(nodeHandles) { (identity, validating), nodes -> NotaryHandle(identity, validating, nodes) } + } + + private fun generateNotaryIdentities(): List> { + return notarySpecs.map { spec -> + val identity = if (spec.cluster == null) { + ServiceIdentityGenerator.generateToDisk( + dirs = listOf(baseDirectory(spec.name)), + serviceName = spec.name.copy(commonName = NotaryService.constructId(validating = spec.validating)) + ) + } else { + ServiceIdentityGenerator.generateToDisk( + dirs = generateNodeNames(spec).map { baseDirectory(it) }, + serviceName = spec.name + ) + } + Pair(identity, spec.validating) + } + } + + private fun generateNodeNames(spec: NotarySpec): List { + return (0 until spec.cluster!!.clusterSize).map { spec.name.copy(commonName = null, organisation = "${spec.name.organisation}-$it") } + } + + private fun startNotaries(): List>> { + return notarySpecs.map { + when { + it.cluster == null -> startSingleNotary(it) + it.cluster is ClusterSpec.Raft -> startRaftNotaryCluster(it) + else -> throw IllegalArgumentException("BFT-SMaRt not supported") + } + } + } + + // TODO This mapping is done is several places including the gradle plugin. In general we need a better way of + // generating the configs for the nodes, probably making use of Any.toConfig() + private fun NotaryConfig.toConfigMap(): Map = mapOf("notary" to toConfig().root().unwrapped()) + + private fun startSingleNotary(spec: NotarySpec): CordaFuture> { + return startNode( + providedName = spec.name, + rpcUsers = spec.rpcUsers, + verifierType = spec.verifierType, + customOverrides = NotaryConfig(spec.validating).toConfigMap() + ).map { listOf(it) } + } + + private fun startRaftNotaryCluster(spec: NotarySpec): CordaFuture> { + fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map { + val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList() + val config = NotaryConfig( + validating = spec.validating, + raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses)) + return config.toConfigMap() + } + + val nodeNames = generateNodeNames(spec) + val clusterAddress = portAllocation.nextHostAndPort() + + // Start the first node that will bootstrap the cluster + val firstNodeFuture = startNode( + providedName = nodeNames[0], + rpcUsers = spec.rpcUsers, + verifierType = spec.verifierType, + customOverrides = notaryConfig(clusterAddress) + mapOf( + "database.serverNameTablePrefix" to nodeNames[0].toString().replace(Regex("[^0-9A-Za-z]+"), "") + ) + ) + + // All other nodes will join the cluster + val restNodeFutures = nodeNames.drop(1).map { + val nodeAddress = portAllocation.nextHostAndPort() + startNode( + providedName = it, + rpcUsers = spec.rpcUsers, + verifierType = spec.verifierType, + customOverrides = notaryConfig(nodeAddress, clusterAddress) + mapOf( + "database.serverNameTablePrefix" to it.toString().replace(Regex("[^0-9A-Za-z]+"), "") + ) + ) + } + + return firstNodeFuture.flatMap { first -> + restNodeFutures.transpose().map { rest -> listOf(first) + rest } + } + } + + override fun baseDirectory(nodeName: CordaX500Name): Path { + val nodeDirectoryName = nodeName.organisation.filter { !it.isWhitespace() } + return driverDirectory / nodeDirectoryName + } + + /** + * @param initial number of nodes currently in the network map of a running node. + * @param networkMapCacheChangeObservable an observable returning the updates to the node network map. + * @return a [ConnectableObservable] which emits a new [Int] every time the number of registered nodes changes + * the initial value emitted is always [initial] + */ + private fun nodeCountObservable(initial: Int, networkMapCacheChangeObservable: Observable): + ConnectableObservable { + val count = AtomicInteger(initial) + return networkMapCacheChangeObservable.map { it -> + when (it) { + is NetworkMapCache.MapChange.Added -> count.incrementAndGet() + is NetworkMapCache.MapChange.Removed -> count.decrementAndGet() + is NetworkMapCache.MapChange.Modified -> count.get() + } + }.startWith(initial).replay() + } + + /** + * @param rpc the [CordaRPCOps] of a newly started node. + * @return a [CordaFuture] which resolves when every node started by driver has in its network map a number of nodes + * equal to the number of running nodes. The future will yield the number of connected nodes. + */ + private fun allNodesConnected(rpc: CordaRPCOps): CordaFuture { + val (snapshot, updates) = rpc.networkMapFeed() + val counterObservable = nodeCountObservable(snapshot.size, updates) + countObservables[rpc.nodeInfo().legalIdentities[0].name] = counterObservable + /* TODO: this might not always be the exact number of nodes one has to wait for, + * for example in the following sequence + * 1 start 3 nodes in order, A, B, C. + * 2 before the future returned by this function resolves, kill B + * At that point this future won't ever resolve as it will wait for nodes to know 3 other nodes. + */ + val requiredNodes = countObservables.size + + // This is an observable which yield the minimum number of nodes in each node network map. + val smallestSeenNetworkMapSize = Observable.combineLatest(countObservables.values.toList()) { args: Array -> + args.map { it as Int }.min() ?: 0 + } + val future = smallestSeenNetworkMapSize.filter { it >= requiredNodes }.toFuture() + counterObservable.connect() + return future + } + + private fun startNodeInternal(config: Config, + webAddress: NetworkHostAndPort, + startInProcess: Boolean?, + maximumHeapSize: String): CordaFuture { + val configuration = config.parseAsNodeConfiguration() + val baseDirectory = configuration.baseDirectory.createDirectories() + nodeInfoFilesCopier.addConfig(baseDirectory) + val onNodeExit: () -> Unit = { + nodeInfoFilesCopier.removeConfig(baseDirectory) + countObservables.remove(configuration.myLegalName) + } + if (startInProcess ?: startNodesInProcess) { + val nodeAndThreadFuture = startInProcessNode(executorService, configuration, config, cordappPackages) + shutdownManager.registerShutdown( + nodeAndThreadFuture.map { (node, thread) -> + { + node.dispose() + thread.interrupt() + } + } + ) + return nodeAndThreadFuture.flatMap { (node, thread) -> + establishRpc(configuration, openFuture()).flatMap { rpc -> + allNodesConnected(rpc).map { + NodeHandle.InProcess(rpc.nodeInfo(), rpc, configuration, webAddress, node, thread, onNodeExit) + } + } + } + } else { + val debugPort = if (isDebug) debugPortAllocation.nextPort() else null + val process = startOutOfProcessNode(configuration, config, quasarJarPath, debugPort, systemProperties, cordappPackages, maximumHeapSize) + if (waitForNodesToFinish) { + state.locked { + processes += process + } + } else { + shutdownManager.registerProcessShutdown(process) + } + val p2pReadyFuture = addressMustBeBoundFuture(executorService, configuration.p2pAddress, process) + return p2pReadyFuture.flatMap { + val processDeathFuture = poll(executorService, "process death") { + if (process.isAlive) null else process + } + establishRpc(configuration, processDeathFuture).flatMap { rpc -> + // Check for all nodes to have all other nodes in background in case RPC is failing over: + val networkMapFuture = executorService.fork { allNodesConnected(rpc) }.flatMap { it } + firstOf(processDeathFuture, networkMapFuture) { + if (it == processDeathFuture) { + throw ListenProcessDeathException(configuration.p2pAddress, process) + } + processDeathFuture.cancel(false) + log.info("Node handle is ready. NodeInfo: ${rpc.nodeInfo()}, WebAddress: $webAddress") + NodeHandle.OutOfProcess(rpc.nodeInfo(), rpc, configuration, webAddress, debugPort, process, + onNodeExit) + } + } + } + } + } + + override fun pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): CordaFuture { + val pollFuture = poll(executorService, pollName, pollInterval, warnCount, check) + shutdownManager.registerShutdown { pollFuture.cancel(true) } + return pollFuture + } + + companion object { + internal val log = contextLogger() + + private val defaultRpcUserList = listOf(User("default", "default", setOf("ALL")).toConfig().root().unwrapped()) + + private val names = arrayOf( + ALICE.name, + BOB.name, + DUMMY_BANK_A.name + ) + + /** + * A sub-set of permissions that grant most of the essential operations used in the unit/integration tests as well as + * in demo application like NodeExplorer. + */ + private val DRIVER_REQUIRED_PERMISSIONS = setOf( + Permissions.invokeRpc(CordaRPCOps::nodeInfo), + Permissions.invokeRpc(CordaRPCOps::networkMapFeed), + Permissions.invokeRpc(CordaRPCOps::networkMapSnapshot), + Permissions.invokeRpc(CordaRPCOps::notaryIdentities), + Permissions.invokeRpc(CordaRPCOps::stateMachinesFeed), + Permissions.invokeRpc(CordaRPCOps::stateMachineRecordedTransactionMappingFeed), + Permissions.invokeRpc(CordaRPCOps::nodeInfoFromParty), + Permissions.invokeRpc(CordaRPCOps::internalVerifiedTransactionsFeed), + Permissions.invokeRpc("vaultQueryBy"), + Permissions.invokeRpc("vaultTrackBy"), + Permissions.invokeRpc(CordaRPCOps::registeredFlows) + ) + + private fun oneOf(array: Array) = array[Random().nextInt(array.size)] + + private fun startInProcessNode( + executorService: ScheduledExecutorService, + nodeConf: NodeConfiguration, + config: Config, + cordappPackages: List + ): CordaFuture, Thread>> { + return executorService.fork { + log.info("Starting in-process Node ${nodeConf.myLegalName.organisation}") + // Write node.conf + writeConfig(nodeConf.baseDirectory, "node.conf", config) + // TODO pass the version in? + val node = InProcessNode(nodeConf, MOCK_VERSION_INFO, cordappPackages).start() + val nodeThread = thread(name = nodeConf.myLegalName.organisation) { + node.internals.run() + } + node to nodeThread + }.flatMap { + nodeAndThread -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress).map { nodeAndThread } + } + } + + private fun startOutOfProcessNode( + nodeConf: NodeConfiguration, + config: Config, + quasarJarPath: String, + debugPort: Int?, + overriddenSystemProperties: Map, + cordappPackages: List, + maximumHeapSize: String + ): Process { + log.info("Starting out-of-process Node ${nodeConf.myLegalName.organisation}, debug port is " + (debugPort ?: "not enabled")) + // Write node.conf + writeConfig(nodeConf.baseDirectory, "node.conf", config) + + val systemProperties = overriddenSystemProperties + mapOf( + "name" to nodeConf.myLegalName, + "visualvm.display.name" to "corda-${nodeConf.myLegalName}", + Node.scanPackagesSystemProperty to cordappPackages.joinToString(Node.scanPackagesSeparator), + "java.io.tmpdir" to System.getProperty("java.io.tmpdir"), // Inherit from parent process + "log4j2.debug" to if(debugPort != null) "true" else "false" + ) + // See experimental/quasar-hook/README.md for how to generate. + val excludePattern = "x(antlr**;bftsmart**;ch**;co.paralleluniverse**;com.codahale**;com.esotericsoftware**;" + + "com.fasterxml**;com.google**;com.ibm**;com.intellij**;com.jcabi**;com.nhaarman**;com.opengamma**;" + + "com.typesafe**;com.zaxxer**;de.javakaffee**;groovy**;groovyjarjarantlr**;groovyjarjarasm**;io.atomix**;" + + "io.github**;io.netty**;jdk**;joptsimple**;junit**;kotlin**;net.bytebuddy**;net.i2p**;org.apache**;" + + "org.assertj**;org.bouncycastle**;org.codehaus**;org.crsh**;org.dom4j**;org.fusesource**;org.h2**;" + + "org.hamcrest**;org.hibernate**;org.jboss**;org.jcp**;org.joda**;org.junit**;org.mockito**;org.objectweb**;" + + "org.objenesis**;org.slf4j**;org.w3c**;org.xml**;org.yaml**;reflectasm**;rx**)" + val extraJvmArguments = systemProperties.removeResolvedClasspath().map { "-D${it.key}=${it.value}" } + + "-javaagent:$quasarJarPath=$excludePattern" + val loggingLevel = if (debugPort == null) "INFO" else "DEBUG" + + return ProcessUtilities.startCordaProcess( + className = "net.corda.node.Corda", // cannot directly get class for this, so just use string + arguments = listOf( + "--base-directory=${nodeConf.baseDirectory}", + "--logging-level=$loggingLevel", + "--no-local-shell" + ), + jdwpPort = debugPort, + extraJvmArguments = extraJvmArguments, + errorLogPath = nodeConf.baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME / "error.log", + workingDirectory = nodeConf.baseDirectory, + maximumHeapSize = maximumHeapSize + ) + } + + private fun startWebserver(handle: NodeHandle, debugPort: Int?, maximumHeapSize: String): Process { + val className = "net.corda.webserver.WebServer" + return ProcessUtilities.startCordaProcess( + className = className, // cannot directly get class for this, so just use string + arguments = listOf("--base-directory", handle.configuration.baseDirectory.toString()), + jdwpPort = debugPort, + extraJvmArguments = listOf( + "-Dname=node-${handle.configuration.p2pAddress}-webserver", + "-Djava.io.tmpdir=${System.getProperty("java.io.tmpdir")}" // Inherit from parent process + ), + errorLogPath = Paths.get("error.$className.log"), + workingDirectory = null, + maximumHeapSize = maximumHeapSize + ) + } + + /** + * Get the package of the caller to the driver so that it can be added to the list of packages the nodes will scan. + * This makes the driver automatically pick the CorDapp module that it's run from. + * + * This returns List rather than String? to make it easier to bolt onto extraCordappPackagesToScan. + */ + private fun getCallerPackage(): List { + val stackTrace = Throwable().stackTrace + val index = stackTrace.indexOfLast { it.className == "net.corda.testing.driver.Driver" } + // In this case we're dealing with the the RPCDriver or one of it's cousins which are internal and we don't care about them + if (index == -1) return emptyList() + val callerPackage = Class.forName(stackTrace[index + 1].className).`package` ?: + throw IllegalStateException("Function instantiating driver must be defined in a package.") + return listOf(callerPackage.name) + } + + /** + * We have an alternative way of specifying classpath for spawned process: by using "-cp" option. So duplicating the setting of this + * rather long string is un-necessary and can be harmful on Windows. + */ + private fun Map.removeResolvedClasspath(): Map { + return filterNot { it.key == "java.class.path" } + } + } +} + +interface InternalDriverDSL : DriverDSL, CordformContext { + private companion object { + private val DEFAULT_POLL_INTERVAL = 500.millis + private const val DEFAULT_WARN_COUNT = 120 + } + + val shutdownManager: ShutdownManager + + override fun baseDirectory(nodeName: String): Path = baseDirectory(CordaX500Name.parse(nodeName)) + + /** + * Polls a function until it returns a non-null value. Note that there is no timeout on the polling. + * + * @param pollName A description of what is being polled. + * @param pollInterval The interval of polling. + * @param warnCount The number of polls after the Driver gives a warning. + * @param check The function being polled. + * @return A future that completes with the non-null value [check] has returned. + */ + fun pollUntilNonNull(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> A?): CordaFuture + + /** + * Polls the given function until it returns true. + * @see pollUntilNonNull + */ + fun pollUntilTrue(pollName: String, pollInterval: Duration = DEFAULT_POLL_INTERVAL, warnCount: Int = DEFAULT_WARN_COUNT, check: () -> Boolean): CordaFuture { + return pollUntilNonNull(pollName, pollInterval, warnCount) { if (check()) Unit else null } + } + + fun start() + + fun shutdown() +} + +/** + * This is a helper method to allow extending of the DSL, along the lines of + * interface SomeOtherExposedDSLInterface : DriverDSL + * interface SomeOtherInternalDSLInterface : InternalDriverDSL, SomeOtherExposedDSLInterface + * class SomeOtherDSL(val driverDSL : DriverDSLImpl) : InternalDriverDSL by driverDSL, SomeOtherInternalDSLInterface + * + * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause. + */ +fun genericDriver( + driverDsl: D, + initialiseSerialization: Boolean = true, + coerce: (D) -> DI, + dsl: DI.() -> A +): A { + val serializationEnv = setGlobalSerialization(initialiseSerialization) + val shutdownHook = addShutdownHook(driverDsl::shutdown) + try { + driverDsl.start() + return dsl(coerce(driverDsl)) + } catch (exception: Throwable) { + DriverDSLImpl.log.error("Driver shutting down because of exception", exception) + throw exception + } finally { + driverDsl.shutdown() + shutdownHook.cancel() + serializationEnv.unset() + } +} + +/** + * This is a helper method to allow extending of the DSL, along the lines of + * interface SomeOtherExposedDSLInterface : DriverDSL + * interface SomeOtherInternalDSLInterface : InternalDriverDSL, SomeOtherExposedDSLInterface + * class SomeOtherDSL(val driverDSL : DriverDSLImpl) : InternalDriverDSL by driverDSL, SomeOtherInternalDSLInterface + * + * @param coerce We need this explicit coercion witness because we can't put an extra DI : D bound in a `where` clause. + */ +fun genericDriver( + defaultParameters: DriverParameters = DriverParameters(), + isDebug: Boolean = defaultParameters.isDebug, + driverDirectory: Path = defaultParameters.driverDirectory, + portAllocation: PortAllocation = defaultParameters.portAllocation, + debugPortAllocation: PortAllocation = defaultParameters.debugPortAllocation, + systemProperties: Map = defaultParameters.systemProperties, + useTestClock: Boolean = defaultParameters.useTestClock, + initialiseSerialization: Boolean = defaultParameters.initialiseSerialization, + waitForNodesToFinish: Boolean = defaultParameters.waitForAllNodesToFinish, + startNodesInProcess: Boolean = defaultParameters.startNodesInProcess, + notarySpecs: List, + extraCordappPackagesToScan: List = defaultParameters.extraCordappPackagesToScan, + driverDslWrapper: (DriverDSLImpl) -> D, + coerce: (D) -> DI, dsl: DI.() -> A +): A { + val serializationEnv = setGlobalSerialization(initialiseSerialization) + val driverDsl = driverDslWrapper( + DriverDSLImpl( + portAllocation = portAllocation, + debugPortAllocation = debugPortAllocation, + systemProperties = systemProperties, + driverDirectory = driverDirectory.toAbsolutePath(), + useTestClock = useTestClock, + isDebug = isDebug, + startNodesInProcess = startNodesInProcess, + waitForNodesToFinish = waitForNodesToFinish, + extraCordappPackagesToScan = extraCordappPackagesToScan, + notarySpecs = notarySpecs + ) + ) + val shutdownHook = addShutdownHook(driverDsl::shutdown) + try { + driverDsl.start() + return dsl(coerce(driverDsl)) + } catch (exception: Throwable) { + DriverDSLImpl.log.error("Driver shutting down because of exception", exception) + throw exception + } finally { + driverDsl.shutdown() + shutdownHook.cancel() + serializationEnv.unset() + } +} + +fun getTimestampAsDirectoryName(): String { + return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(ZoneOffset.UTC).format(Instant.now()) +} + +fun writeConfig(path: Path, filename: String, config: Config) { + val configString = config.root().render(ConfigRenderOptions.defaults()) + configString.byteInputStream().copyTo(path / filename, StandardCopyOption.REPLACE_EXISTING) +} diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt new file mode 100644 index 0000000000..b24db80a3f --- /dev/null +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/InternalTestUtils.kt @@ -0,0 +1,93 @@ +package net.corda.testing.internal + +import net.corda.core.CordaException +import net.corda.core.concurrent.CordaFuture +import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.times +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.millis +import net.corda.core.utilities.seconds +import org.slf4j.LoggerFactory +import java.net.Socket +import java.net.SocketException +import java.time.Duration +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit + +private val log = LoggerFactory.getLogger("net.corda.testing.internal.InternalTestUtils") + +/** + * @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended. + */ +fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null) { + addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow() +} + +fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, listenProcess: Process? = null): CordaFuture { + return poll(executorService, "address $hostAndPort to bind") { + if (listenProcess != null && !listenProcess.isAlive) { + throw ListenProcessDeathException(hostAndPort, listenProcess) + } + try { + Socket(hostAndPort.host, hostAndPort.port).close() + Unit + } catch (_exception: SocketException) { + null + } + } +} + +/* + * The default timeout value of 40 seconds have been chosen based on previous node shutdown time estimate. + * It's been observed that nodes can take up to 30 seconds to shut down, so just to stay on the safe side the 60 seconds + * timeout has been chosen. + */ +fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort, timeout: Duration = 40.seconds) { + addressMustNotBeBoundFuture(executorService, hostAndPort).getOrThrow(timeout) +} + +fun addressMustNotBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: NetworkHostAndPort): CordaFuture { + return poll(executorService, "address $hostAndPort to unbind") { + try { + Socket(hostAndPort.host, hostAndPort.port).close() + null + } catch (_exception: SocketException) { + Unit + } + } +} + +fun poll( + executorService: ScheduledExecutorService, + pollName: String, + pollInterval: Duration = 500.millis, + warnCount: Int = 120, + check: () -> A? +): CordaFuture { + val resultFuture = openFuture() + val task = object : Runnable { + var counter = -1 + override fun run() { + if (resultFuture.isCancelled) return // Give up, caller can no longer get the result. + if (++counter == warnCount) { + log.warn("Been polling $pollName for ${(pollInterval * warnCount.toLong()).seconds} seconds...") + } + try { + val checkResult = check() + if (checkResult != null) { + resultFuture.set(checkResult) + } else { + executorService.schedule(this, pollInterval.toMillis(), TimeUnit.MILLISECONDS) + } + } catch (t: Throwable) { + resultFuture.setException(t) + } + } + } + executorService.submit(task) // The check may be expensive, so always run it in the background even the first time. + return resultFuture +} + +class ListenProcessDeathException(hostAndPort: NetworkHostAndPort, listenProcess: Process) : + CordaException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}") diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt index 061974adf6..781dcd6cff 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt @@ -12,13 +12,8 @@ import net.corda.node.internal.Node import net.corda.node.internal.StartedNode import net.corda.node.internal.cordapp.CordappLoader import net.corda.node.services.config.* -import net.corda.node.services.config.ConfigHelper -import net.corda.node.services.config.configOf -import net.corda.node.services.config.parseAsNodeConfiguration -import net.corda.node.services.config.plus import net.corda.nodeapi.internal.config.User import net.corda.testing.SerializationEnvironmentRule -import net.corda.testing.driver.addressMustNotBeBoundFuture import net.corda.testing.getFreeLocalPorts import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO import org.apache.logging.log4j.Level diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt index 7c9a81326f..8efac75115 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/RPCDriver.kt @@ -52,159 +52,25 @@ import java.nio.file.Path import java.nio.file.Paths import java.util.* -interface RPCDriverExposedDSLInterface : DriverDSLExposedInterface { - /** - * Starts an In-VM RPC server. Note that only a single one may be started. - * - * @param rpcUser The single user who can access the server through RPC, and their permissions. - * @param nodeLegalName The legal name of the node to check against to authenticate a super user. - * @param configuration The RPC server configuration. - * @param ops The server-side implementation of the RPC interface. - */ - fun startInVmRpcServer( - rpcUser: User = rpcTestUser, - nodeLegalName: CordaX500Name = fakeNodeLegalName, - maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, - maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, - ops: I - ): CordaFuture - - /** - * Starts an In-VM RPC client. - * - * @param rpcOpsClass The [Class] of the RPC interface. - * @param username The username to authenticate with. - * @param password The password to authenticate with. - * @param configuration The RPC client configuration. - */ - fun startInVmRpcClient( - rpcOpsClass: Class, - username: String = rpcTestUser.username, - password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default - ): CordaFuture - - /** - * Starts an In-VM Artemis session connecting to the RPC server. - * - * @param username The username to authenticate with. - * @param password The password to authenticate with. - */ - fun startInVmArtemisSession( - username: String = rpcTestUser.username, - password: String = rpcTestUser.password - ): ClientSession - - /** - * Starts a Netty RPC server. - * - * @param serverName The name of the server, to be used for the folder created for Artemis files. - * @param rpcUser The single user who can access the server through RPC, and their permissions. - * @param nodeLegalName The legal name of the node to check against to authenticate a super user. - * @param configuration The RPC server configuration. - * @param ops The server-side implementation of the RPC interface. - */ - fun startRpcServer( - serverName: String = "driver-rpc-server-${random63BitValue()}", - rpcUser: User = rpcTestUser, - nodeLegalName: CordaX500Name = fakeNodeLegalName, - maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, - maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, - customPort: NetworkHostAndPort? = null, - ops: I - ): CordaFuture - - /** - * Starts a Netty RPC client. - * - * @param rpcOpsClass The [Class] of the RPC interface. - * @param rpcAddress The address of the RPC server to connect to. - * @param username The username to authenticate with. - * @param password The password to authenticate with. - * @param configuration The RPC client configuration. - */ - fun startRpcClient( - rpcOpsClass: Class, - rpcAddress: NetworkHostAndPort, - username: String = rpcTestUser.username, - password: String = rpcTestUser.password, - configuration: RPCClientConfiguration = RPCClientConfiguration.default - ): CordaFuture - - /** - * Starts a Netty RPC client in a new JVM process that calls random RPCs with random arguments. - * - * @param rpcOpsClass The [Class] of the RPC interface. - * @param rpcAddress The address of the RPC server to connect to. - * @param username The username to authenticate with. - * @param password The password to authenticate with. - */ - fun startRandomRpcClient( - rpcOpsClass: Class, - rpcAddress: NetworkHostAndPort, - username: String = rpcTestUser.username, - password: String = rpcTestUser.password - ): CordaFuture - - /** - * Starts a Netty Artemis session connecting to an RPC server. - * - * @param rpcAddress The address of the RPC server. - * @param username The username to authenticate with. - * @param password The password to authenticate with. - */ - fun startArtemisSession( - rpcAddress: NetworkHostAndPort, - username: String = rpcTestUser.username, - password: String = rpcTestUser.password - ): ClientSession - - fun startRpcBroker( - serverName: String = "driver-rpc-server-${random63BitValue()}", - rpcUser: User = rpcTestUser, - maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, - maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, - customPort: NetworkHostAndPort? = null - ): CordaFuture - - fun startInVmRpcBroker( - rpcUser: User = rpcTestUser, - maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, - maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE - ): CordaFuture - - fun startRpcServerWithBrokerRunning( - rpcUser: User = rpcTestUser, - nodeLegalName: CordaX500Name = fakeNodeLegalName, - configuration: RPCServerConfiguration = RPCServerConfiguration.default, - ops: I, - brokerHandle: RpcBrokerHandle - ): RpcServerHandle -} - -inline fun RPCDriverExposedDSLInterface.startInVmRpcClient( +inline fun RPCDriverDSL.startInVmRpcClient( username: String = rpcTestUser.username, password: String = rpcTestUser.password, configuration: RPCClientConfiguration = RPCClientConfiguration.default ) = startInVmRpcClient(I::class.java, username, password, configuration) -inline fun RPCDriverExposedDSLInterface.startRandomRpcClient( +inline fun RPCDriverDSL.startRandomRpcClient( hostAndPort: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password ) = startRandomRpcClient(I::class.java, hostAndPort, username, password) -inline fun RPCDriverExposedDSLInterface.startRpcClient( +inline fun RPCDriverDSL.startRpcClient( rpcAddress: NetworkHostAndPort, username: String = rpcTestUser.username, password: String = rpcTestUser.password, configuration: RPCClientConfiguration = RPCClientConfiguration.default ) = startRpcClient(I::class.java, rpcAddress, username, password, configuration) -interface RPCDriverInternalDSLInterface : DriverDSLInternalInterface, RPCDriverExposedDSLInterface - data class RpcBrokerHandle( val hostAndPort: NetworkHostAndPort?, /** null if this is an InVM broker */ @@ -235,26 +101,28 @@ fun rpcDriver( extraCordappPackagesToScan: List = emptyList(), notarySpecs: List = emptyList(), externalTrace: Trace? = null, - dsl: RPCDriverExposedDSLInterface.() -> A -) = genericDriver( - driverDsl = RPCDriverDSL( - DriverDSL( - portAllocation = portAllocation, - debugPortAllocation = debugPortAllocation, - systemProperties = systemProperties, - driverDirectory = driverDirectory.toAbsolutePath(), - useTestClock = useTestClock, - isDebug = isDebug, - startNodesInProcess = startNodesInProcess, - waitForNodesToFinish = waitForNodesToFinish, - extraCordappPackagesToScan = extraCordappPackagesToScan, - notarySpecs = notarySpecs - ), externalTrace - ), - coerce = { it }, - dsl = dsl, - initialiseSerialization = false -) + dsl: RPCDriverDSL.() -> A +): A { + return genericDriver( + driverDsl = RPCDriverDSL( + DriverDSLImpl( + portAllocation = portAllocation, + debugPortAllocation = debugPortAllocation, + systemProperties = systemProperties, + driverDirectory = driverDirectory.toAbsolutePath(), + useTestClock = useTestClock, + isDebug = isDebug, + startNodesInProcess = startNodesInProcess, + waitForNodesToFinish = waitForNodesToFinish, + extraCordappPackagesToScan = extraCordappPackagesToScan, + notarySpecs = notarySpecs + ), externalTrace + ), + coerce = { it }, + dsl = dsl, + initialiseSerialization = false + ) +} private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityManager3 { override fun validateUser(user: String?, password: String?) = isValid(user, password) @@ -276,8 +144,8 @@ private class SingleUserSecurityManager(val rpcUser: User) : ActiveMQSecurityMan } data class RPCDriverDSL( - private val driverDSL: DriverDSL, private val externalTrace: Trace? -) : DriverDSLInternalInterface by driverDSL, RPCDriverInternalDSLInterface { + private val driverDSL: DriverDSLImpl, private val externalTrace: Trace? +) : InternalDriverDSL by driverDSL { private companion object { val notificationAddress = "notifications" @@ -340,12 +208,20 @@ data class RPCDriverDSL( } } - override fun startInVmRpcServer( - rpcUser: User, - nodeLegalName: CordaX500Name, - maxFileSize: Int, - maxBufferedBytesPerClient: Long, - configuration: RPCServerConfiguration, + /** + * Starts an In-VM RPC server. Note that only a single one may be started. + * + * @param rpcUser The single user who can access the server through RPC, and their permissions. + * @param nodeLegalName The legal name of the node to check against to authenticate a super user. + * @param configuration The RPC server configuration. + * @param ops The server-side implementation of the RPC interface. + */ + fun startInVmRpcServer( + rpcUser: User = rpcTestUser, + nodeLegalName: CordaX500Name = fakeNodeLegalName, + maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, + maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, + configuration: RPCServerConfiguration = RPCServerConfiguration.default, ops: I ): CordaFuture { return startInVmRpcBroker(rpcUser, maxFileSize, maxBufferedBytesPerClient).map { broker -> @@ -353,7 +229,20 @@ data class RPCDriverDSL( } } - override fun startInVmRpcClient(rpcOpsClass: Class, username: String, password: String, configuration: RPCClientConfiguration): CordaFuture { + /** + * Starts an In-VM RPC client. + * + * @param rpcOpsClass The [Class] of the RPC interface. + * @param username The username to authenticate with. + * @param password The password to authenticate with. + * @param configuration The RPC client configuration. + */ + fun startInVmRpcClient( + rpcOpsClass: Class, + username: String = rpcTestUser.username, + password: String = rpcTestUser.password, + configuration: RPCClientConfiguration = RPCClientConfiguration.default + ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(inVmClientTransportConfiguration, configuration) val connection = client.start(rpcOpsClass, username, password, externalTrace) @@ -364,7 +253,16 @@ data class RPCDriverDSL( } } - override fun startInVmArtemisSession(username: String, password: String): ClientSession { + /** + * Starts an In-VM Artemis session connecting to the RPC server. + * + * @param username The username to authenticate with. + * @param password The password to authenticate with. + */ + fun startInVmArtemisSession( + username: String = rpcTestUser.username, + password: String = rpcTestUser.password + ): ClientSession { val locator = ActiveMQClient.createServerLocatorWithoutHA(inVmClientTransportConfiguration) val sessionFactory = locator.createSessionFactory() val session = sessionFactory.createSession(username, password, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE) @@ -376,14 +274,23 @@ data class RPCDriverDSL( return session } - override fun startRpcServer( - serverName: String, - rpcUser: User, - nodeLegalName: CordaX500Name, - maxFileSize: Int, - maxBufferedBytesPerClient: Long, - configuration: RPCServerConfiguration, - customPort: NetworkHostAndPort?, + /** + * Starts a Netty RPC server. + * + * @param serverName The name of the server, to be used for the folder created for Artemis files. + * @param rpcUser The single user who can access the server through RPC, and their permissions. + * @param nodeLegalName The legal name of the node to check against to authenticate a super user. + * @param configuration The RPC server configuration. + * @param ops The server-side implementation of the RPC interface. + */ + fun startRpcServer( + serverName: String = "driver-rpc-server-${random63BitValue()}", + rpcUser: User = rpcTestUser, + nodeLegalName: CordaX500Name = fakeNodeLegalName, + maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, + maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, + configuration: RPCServerConfiguration = RPCServerConfiguration.default, + customPort: NetworkHostAndPort? = null, ops: I ): CordaFuture { return startRpcBroker(serverName, rpcUser, maxFileSize, maxBufferedBytesPerClient, customPort).map { broker -> @@ -391,12 +298,21 @@ data class RPCDriverDSL( } } - override fun startRpcClient( + /** + * Starts a Netty RPC client. + * + * @param rpcOpsClass The [Class] of the RPC interface. + * @param rpcAddress The address of the RPC server to connect to. + * @param username The username to authenticate with. + * @param password The password to authenticate with. + * @param configuration The RPC client configuration. + */ + fun startRpcClient( rpcOpsClass: Class, rpcAddress: NetworkHostAndPort, - username: String, - password: String, - configuration: RPCClientConfiguration + username: String = rpcTestUser.username, + password: String = rpcTestUser.password, + configuration: RPCClientConfiguration = RPCClientConfiguration.default ): CordaFuture { return driverDSL.executorService.fork { val client = RPCClient(ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), rpcAddress, null), configuration) @@ -408,13 +324,37 @@ data class RPCDriverDSL( } } - override fun startRandomRpcClient(rpcOpsClass: Class, rpcAddress: NetworkHostAndPort, username: String, password: String): CordaFuture { + /** + * Starts a Netty RPC client in a new JVM process that calls random RPCs with random arguments. + * + * @param rpcOpsClass The [Class] of the RPC interface. + * @param rpcAddress The address of the RPC server to connect to. + * @param username The username to authenticate with. + * @param password The password to authenticate with. + */ + fun startRandomRpcClient( + rpcOpsClass: Class, + rpcAddress: NetworkHostAndPort, + username: String = rpcTestUser.username, + password: String = rpcTestUser.password + ): CordaFuture { val process = ProcessUtilities.startJavaProcess(listOf(rpcOpsClass.name, rpcAddress.toString(), username, password)) driverDSL.shutdownManager.registerProcessShutdown(process) return doneFuture(process) } - override fun startArtemisSession(rpcAddress: NetworkHostAndPort, username: String, password: String): ClientSession { + /** + * Starts a Netty Artemis session connecting to an RPC server. + * + * @param rpcAddress The address of the RPC server. + * @param username The username to authenticate with. + * @param password The password to authenticate with. + */ + fun startArtemisSession( + rpcAddress: NetworkHostAndPort, + username: String = rpcTestUser.username, + password: String = rpcTestUser.password + ): ClientSession { val locator = ActiveMQClient.createServerLocatorWithoutHA(createNettyClientTransportConfiguration(rpcAddress)) val sessionFactory = locator.createSessionFactory() val session = sessionFactory.createSession(username, password, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) @@ -427,12 +367,12 @@ data class RPCDriverDSL( return session } - override fun startRpcBroker( - serverName: String, - rpcUser: User, - maxFileSize: Int, - maxBufferedBytesPerClient: Long, - customPort: NetworkHostAndPort? + fun startRpcBroker( + serverName: String = "driver-rpc-server-${random63BitValue()}", + rpcUser: User = rpcTestUser, + maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, + maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE, + customPort: NetworkHostAndPort? = null ): CordaFuture { val hostAndPort = customPort ?: driverDSL.portAllocation.nextHostAndPort() addressMustNotBeBound(driverDSL.executorService, hostAndPort) @@ -452,7 +392,11 @@ data class RPCDriverDSL( } } - override fun startInVmRpcBroker(rpcUser: User, maxFileSize: Int, maxBufferedBytesPerClient: Long): CordaFuture { + fun startInVmRpcBroker( + rpcUser: User = rpcTestUser, + maxFileSize: Int = ArtemisMessagingServer.MAX_FILE_SIZE, + maxBufferedBytesPerClient: Long = 10L * ArtemisMessagingServer.MAX_FILE_SIZE + ): CordaFuture { return driverDSL.executorService.fork { val artemisConfig = createInVmRpcServerArtemisConfig(maxFileSize, maxBufferedBytesPerClient) val server = EmbeddedActiveMQ() @@ -471,10 +415,10 @@ data class RPCDriverDSL( } } - override fun startRpcServerWithBrokerRunning( - rpcUser: User, - nodeLegalName: CordaX500Name, - configuration: RPCServerConfiguration, + fun startRpcServerWithBrokerRunning( + rpcUser: User = rpcTestUser, + nodeLegalName: CordaX500Name = fakeNodeLegalName, + configuration: RPCServerConfiguration = RPCServerConfiguration.default, ops: I, brokerHandle: RpcBrokerHandle ): RpcServerHandle { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/ShutdownManager.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/ShutdownManager.kt similarity index 99% rename from testing/node-driver/src/main/kotlin/net/corda/testing/driver/ShutdownManager.kt rename to testing/node-driver/src/main/kotlin/net/corda/testing/internal/ShutdownManager.kt index 0e8752aeb6..022569b958 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/driver/ShutdownManager.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/ShutdownManager.kt @@ -1,4 +1,4 @@ -package net.corda.testing.driver +package net.corda.testing.internal import net.corda.core.concurrent.CordaFuture import net.corda.core.internal.ThreadBox diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt index 5155a31850..fb7396ec1d 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/demorun/DemoRunner.kt @@ -8,7 +8,7 @@ import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.transpose import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow -import net.corda.testing.driver.DriverDSL +import net.corda.testing.internal.DriverDSLImpl import net.corda.testing.driver.PortAllocation import net.corda.testing.driver.driver @@ -49,8 +49,8 @@ private fun CordformDefinition.runNodes(waitForAllNodesToFinish: Boolean, block: portAllocation = PortAllocation.Incremental(maxPort + 1), waitForAllNodesToFinish = waitForAllNodesToFinish ) { + this as DriverDSLImpl // access internal API setup(this) - this as DriverDSL // startCordformNode is an internal API nodes.map { val startedNode = startCordformNode(it) if (it.webAddress != null) { diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Injectors.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Injectors.kt index 83ef6fc585..b4b0f9f33d 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Injectors.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Injectors.kt @@ -3,7 +3,7 @@ package net.corda.testing.internal.performance import com.codahale.metrics.Gauge import com.codahale.metrics.MetricRegistry import com.google.common.base.Stopwatch -import net.corda.testing.driver.ShutdownManager +import net.corda.testing.internal.ShutdownManager import java.time.Duration import java.util.* import java.util.concurrent.CountDownLatch diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Reporter.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Reporter.kt index 9cac82d1af..5446165087 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Reporter.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/performance/Reporter.kt @@ -3,7 +3,7 @@ package net.corda.testing.internal.performance import com.codahale.metrics.ConsoleReporter import com.codahale.metrics.JmxReporter import com.codahale.metrics.MetricRegistry -import net.corda.testing.driver.ShutdownManager +import net.corda.testing.internal.ShutdownManager import java.util.concurrent.TimeUnit import javax.management.ObjectName import kotlin.concurrent.thread diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/X500NameUtils.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/X500NameUtils.kt deleted file mode 100644 index 1af6357b7e..0000000000 --- a/testing/test-utils/src/main/kotlin/net/corda/testing/X500NameUtils.kt +++ /dev/null @@ -1,29 +0,0 @@ -@file:JvmName("X500NameUtils") - -package net.corda.testing - -import org.bouncycastle.asn1.x500.X500Name -import org.bouncycastle.asn1.x500.X500NameBuilder -import org.bouncycastle.asn1.x500.style.BCStyle - -/** - * Generate a distinguished name from the provided X500 . - * - * @param O organisation name. - * @param L locality. - * @param C county. - * @param CN common name. - * @param OU organisation unit. - * @param ST state. - */ -@JvmOverloads -fun getX500Name(O: String, L: String, C: String, CN: String? = null, OU: String? = null, ST: String? = null): X500Name { - return X500NameBuilder(BCStyle.INSTANCE).apply { - addRDN(BCStyle.C, C) - ST?.let { addRDN(BCStyle.ST, it) } - addRDN(BCStyle.L, L) - addRDN(BCStyle.O, O) - OU?.let { addRDN(BCStyle.OU, it) } - CN?.let { addRDN(BCStyle.CN, it) } - }.build() -} \ No newline at end of file diff --git a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt index 12365b2f65..8ce77720f5 100644 --- a/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt +++ b/verifier/src/integration-test/kotlin/net/corda/verifier/VerifierDriver.kt @@ -16,14 +16,16 @@ import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.contextLogger import net.corda.node.services.config.configureDevKeyAndTrustStores -import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER import net.corda.nodeapi.ArtemisTcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.VerifierApi import net.corda.nodeapi.internal.config.NodeSSLConfiguration import net.corda.nodeapi.internal.config.SSLConfiguration -import net.corda.testing.driver.* -import net.corda.testing.internal.ProcessUtilities +import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_USER +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.PortAllocation +import net.corda.testing.driver.driver +import net.corda.testing.internal.* import net.corda.testing.node.NotarySpec import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient @@ -43,34 +45,6 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.atomic.AtomicInteger -/** - * This file defines an extension to [DriverDSL] that allows starting of verifier processes and - * lightweight verification requestors. - */ -interface VerifierExposedDSLInterface : DriverDSLExposedInterface { - /** Starts a lightweight verification requestor that implements the Node's Verifier API */ - fun startVerificationRequestor(name: CordaX500Name): CordaFuture - - /** Starts an out of process verifier connected to [address] */ - fun startVerifier(address: NetworkHostAndPort): CordaFuture - - /** - * Waits until [number] verifiers are listening for verification requests coming from the Node. Check - * [VerificationRequestorHandle.waitUntilNumberOfVerifiers] for an equivalent for requestors. - */ - fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) -} - -/** Starts a verifier connecting to the specified node */ -fun VerifierExposedDSLInterface.startVerifier(nodeHandle: NodeHandle) = - startVerifier(nodeHandle.configuration.p2pAddress) - -/** Starts a verifier connecting to the specified requestor */ -fun VerifierExposedDSLInterface.startVerifier(verificationRequestorHandle: VerificationRequestorHandle) = - startVerifier(verificationRequestorHandle.p2pAddress) - -interface VerifierInternalDSLInterface : DriverDSLInternalInterface, VerifierExposedDSLInterface - /** * Behaves the same as [driver] and adds verifier-related functionality. */ @@ -85,10 +59,10 @@ fun verifierDriver( waitForNodesToFinish: Boolean = false, extraCordappPackagesToScan: List = emptyList(), notarySpecs: List = emptyList(), - dsl: VerifierExposedDSLInterface.() -> A + dsl: VerifierDriverDSL.() -> A ) = genericDriver( driverDsl = VerifierDriverDSL( - DriverDSL( + DriverDSLImpl( portAllocation = portAllocation, debugPortAllocation = debugPortAllocation, systemProperties = systemProperties, @@ -143,10 +117,8 @@ data class VerificationRequestorHandle( } -data class VerifierDriverDSL( - val driverDSL: DriverDSL -) : DriverDSLInternalInterface by driverDSL, VerifierInternalDSLInterface { - val verifierCount = AtomicInteger(0) +data class VerifierDriverDSL(private val driverDSL: DriverDSLImpl) : InternalDriverDSL by driverDSL { + private val verifierCount = AtomicInteger(0) companion object { private val log = contextLogger() @@ -183,7 +155,8 @@ data class VerifierDriverDSL( } } - override fun startVerificationRequestor(name: CordaX500Name): CordaFuture { + /** Starts a lightweight verification requestor that implements the Node's Verifier API */ + fun startVerificationRequestor(name: CordaX500Name): CordaFuture { val hostAndPort = driverDSL.portAllocation.nextHostAndPort() return driverDSL.executorService.fork { startVerificationRequestorInternal(name, hostAndPort) @@ -255,7 +228,8 @@ data class VerifierDriverDSL( ) } - override fun startVerifier(address: NetworkHostAndPort): CordaFuture { + /** Starts an out of process verifier connected to [address] */ + fun startVerifier(address: NetworkHostAndPort): CordaFuture { log.info("Starting verifier connecting to address $address") val id = verifierCount.andIncrement val jdwpPort = if (driverDSL.isDebug) driverDSL.debugPortAllocation.nextPort() else null @@ -270,6 +244,16 @@ data class VerifierDriverDSL( return doneFuture(VerifierHandle(process)) } + /** Starts a verifier connecting to the specified node */ + fun startVerifier(nodeHandle: NodeHandle): CordaFuture { + return startVerifier(nodeHandle.configuration.p2pAddress) + } + + /** Starts a verifier connecting to the specified requestor */ + fun startVerifier(verificationRequestorHandle: VerificationRequestorHandle): CordaFuture { + return startVerifier(verificationRequestorHandle.p2pAddress) + } + private fun NodeHandle.connectToNode(closure: (ClientSession) -> A): A { val transport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), configuration.p2pAddress, configuration) val locator = ActiveMQClient.createServerLocatorWithoutHA(transport) @@ -280,7 +264,11 @@ data class VerifierDriverDSL( } } - override fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) { + /** + * Waits until [number] verifiers are listening for verification requests coming from the Node. Check + * [VerificationRequestorHandle.waitUntilNumberOfVerifiers] for an equivalent for requestors. + */ + fun NodeHandle.waitUntilNumberOfVerifiers(number: Int) { connectToNode { session -> poll(driverDSL.executorService, "$number verifiers to come online") { if (session.queueQuery(SimpleString(VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount >= number) { diff --git a/webserver/src/integration-test/kotlin/net/corda/webserver/WebserverDriverTests.kt b/webserver/src/integration-test/kotlin/net/corda/webserver/WebserverDriverTests.kt index ee85cef6fd..c23c270015 100644 --- a/webserver/src/integration-test/kotlin/net/corda/webserver/WebserverDriverTests.kt +++ b/webserver/src/integration-test/kotlin/net/corda/webserver/WebserverDriverTests.kt @@ -4,8 +4,8 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.getOrThrow import net.corda.testing.DUMMY_BANK_A import net.corda.testing.driver.WebserverHandle -import net.corda.testing.driver.addressMustBeBound -import net.corda.testing.driver.addressMustNotBeBound +import net.corda.testing.internal.addressMustBeBound +import net.corda.testing.internal.addressMustNotBeBound import net.corda.testing.driver.driver import org.junit.Test import java.util.concurrent.Executors