diff --git a/.idea/compiler.xml b/.idea/compiler.xml index ae994c7ef9..44a28247dd 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -42,10 +42,13 @@ + + + diff --git a/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java b/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java index 4c0245462d..d46afb11b9 100644 --- a/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java +++ b/client/rpc/src/integration-test/java/net/corda/client/rpc/CordaRPCJavaClientTest.java @@ -1,6 +1,5 @@ package net.corda.client.rpc; -import net.corda.core.concurrent.CordaFuture; import net.corda.core.contracts.Amount; import net.corda.core.messaging.CordaRPCOps; import net.corda.core.messaging.FlowHandle; @@ -13,7 +12,7 @@ import net.corda.node.internal.Node; import net.corda.node.internal.StartedNode; import net.corda.nodeapi.User; import net.corda.testing.CoreTestUtils; -import net.corda.testing.node.NodeBasedTest; +import net.corda.testing.internal.NodeBasedTest; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -51,8 +50,7 @@ public class CordaRPCJavaClientTest extends NodeBasedTest { @Before public void setUp() throws ExecutionException, InterruptedException { - CordaFuture> nodeFuture = startNotaryNode(getALICE().getName(), singletonList(rpcUser), true); - node = nodeFuture.get(); + node = startNode(getALICE().getName(), 1, singletonList(rpcUser)); client = new CordaRPCClient(requireNonNull(node.getInternals().getConfiguration().getRpcAddress())); } diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt index 89264f2e05..14ae217a2b 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/BlacklistKotlinClosureTest.kt @@ -2,89 +2,38 @@ package net.corda.client.rpc import co.paralleluniverse.fibers.Suspendable import com.esotericsoftware.kryo.KryoException -import net.corda.core.flows.* -import net.corda.core.identity.Party +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.StartableByRPC import net.corda.core.messaging.startFlow import net.corda.core.serialization.CordaSerializable import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.loggerFor -import net.corda.core.utilities.unwrap -import net.corda.node.internal.Node -import net.corda.node.internal.StartedNode -import net.corda.nodeapi.User -import net.corda.testing.* -import net.corda.testing.node.NodeBasedTest -import org.junit.After -import org.junit.Before -import org.junit.Rule +import net.corda.testing.ALICE +import net.corda.testing.driver.driver +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Test -import org.junit.rules.ExpectedException -@CordaSerializable -data class Packet(val x: () -> Long) - -class BlacklistKotlinClosureTest : NodeBasedTest(listOf("net.corda.client.rpc")) { +class BlacklistKotlinClosureTest { companion object { - @Suppress("UNUSED") val logger = loggerFor() const val EVIL: Long = 666 } @StartableByRPC - @InitiatingFlow - class FlowC(private val remoteParty: Party, private val data: Packet) : FlowLogic() { + class FlowC(@Suppress("unused") private val data: Packet) : FlowLogic() { @Suspendable - override fun call() { - val session = initiateFlow(remoteParty) - val x = session.sendAndReceive(data).unwrap { x -> x } - logger.info("FlowC: ${x.x()}") - } + override fun call() = Unit } - @InitiatedBy(FlowC::class) - class RemoteFlowC(private val session: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - val packet = session.receive().unwrap { x -> x } - logger.info("RemoteFlowC: ${packet.x() + 1}") - session.send(Packet({ packet.x() + 1 })) - } - } - - @JvmField - @Rule - val expectedEx: ExpectedException = ExpectedException.none() - - private val rpcUser = User("user1", "test", permissions = setOf("ALL")) - private lateinit var aliceNode: StartedNode - private lateinit var bobNode: StartedNode - private lateinit var aliceClient: CordaRPCClient - private var connection: CordaRPCConnection? = null - - private fun login(username: String, password: String) { - connection = aliceClient.start(username, password) - } - - @Before - fun setUp() { - aliceNode = startNode(ALICE.name, rpcUsers = listOf(rpcUser)).getOrThrow() - bobNode = startNode(BOB.name, rpcUsers = listOf(rpcUser)).getOrThrow() - bobNode.registerInitiatedFlow(RemoteFlowC::class.java) - aliceClient = CordaRPCClient(aliceNode.internals.configuration.rpcAddress!!) - } - - @After - fun done() { - connection?.close() - bobNode.internals.stop() - aliceNode.internals.stop() - } + @CordaSerializable + data class Packet(val x: () -> Long) @Test fun `closure sent via RPC`() { - login(rpcUser.username, rpcUser.password) - val proxy = connection!!.proxy - expectedEx.expect(KryoException::class.java) - expectedEx.expectMessage("is not annotated or on the whitelist, so cannot be used in serialization") - proxy.startFlow(::FlowC, bobNode.info.chooseIdentity(), Packet{ EVIL }).returnValue.getOrThrow() + driver(startNodesInProcess = true) { + val rpc = startNode(providedName = ALICE.name).getOrThrow().rpc + val packet = Packet { EVIL } + assertThatExceptionOfType(KryoException::class.java) + .isThrownBy { rpc.startFlow(::FlowC, packet) } + .withMessageContaining("is not annotated or on the whitelist, so cannot be used in serialization") + } } } \ No newline at end of file diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt index 9479ffe36a..e5b412b596 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/CordaRPCClientTest.kt @@ -24,7 +24,7 @@ import net.corda.node.services.FlowPermissions.Companion.startFlowPermission import net.corda.nodeapi.User import net.corda.testing.ALICE import net.corda.testing.chooseIdentity -import net.corda.testing.node.NodeBasedTest +import net.corda.testing.internal.NodeBasedTest import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After @@ -49,7 +49,7 @@ class CordaRPCClientTest : NodeBasedTest(listOf("net.corda.finance.contracts", C @Before fun setUp() { - node = startNotaryNode(ALICE.name, rpcUsers = listOf(rpcUser)).getOrThrow() + node = startNode(ALICE.name, rpcUsers = listOf(rpcUser)) client = CordaRPCClient(node.internals.configuration.rpcAddress!!) } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt index 3ba78ff7bd..745b763e9e 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/RaftNotaryServiceTests.kt @@ -8,53 +8,53 @@ import net.corda.core.flows.NotaryFlow import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.internal.concurrent.map -import net.corda.core.internal.concurrent.transpose import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.node.internal.StartedNode import net.corda.node.services.transactions.RaftValidatingNotaryService -import net.corda.testing.* +import net.corda.testing.DUMMY_BANK_A +import net.corda.testing.chooseIdentity import net.corda.testing.contracts.DummyContract -import net.corda.testing.node.NodeBasedTest +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.driver +import net.corda.testing.dummyCommand import org.junit.Test import java.util.* import kotlin.test.assertEquals import kotlin.test.assertFailsWith -class RaftNotaryServiceTests : NodeBasedTest(listOf("net.corda.testing.contracts")) { +class RaftNotaryServiceTests { private val notaryName = CordaX500Name(RaftValidatingNotaryService.id, "RAFT Notary Service", "London", "GB") @Test fun `detect double spend`() { - val (bankA) = listOf( - startNode(DUMMY_BANK_A.name), - startNotaryCluster(notaryName, 3).map { it.first() } - ).transpose().getOrThrow() + driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.testing.contracts")) { + val (notaryParty) = startNotaryCluster(notaryName, 3).getOrThrow() + val bankA = startNode(providedName = DUMMY_BANK_A.name).map { (it as NodeHandle.InProcess).node }.getOrThrow() - val notaryParty = bankA.services.networkMapCache.getNotary(notaryName)!! + val inputState = issueState(bankA, notaryParty) - val inputState = issueState(bankA, notaryParty) + val firstTxBuilder = TransactionBuilder(notaryParty) + .addInputState(inputState) + .addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey)) + val firstSpendTx = bankA.services.signInitialTransaction(firstTxBuilder) - val firstTxBuilder = TransactionBuilder(notaryParty) - .addInputState(inputState) - .addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey)) - val firstSpendTx = bankA.services.signInitialTransaction(firstTxBuilder) + val firstSpend = bankA.services.startFlow(NotaryFlow.Client(firstSpendTx)) + firstSpend.resultFuture.getOrThrow() - val firstSpend = bankA.services.startFlow(NotaryFlow.Client(firstSpendTx)) - firstSpend.resultFuture.getOrThrow() + val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run { + val dummyState = DummyContract.SingleOwnerState(0, bankA.info.chooseIdentity()) + addOutputState(dummyState, DummyContract.PROGRAM_ID) + addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey)) + this + } + val secondSpendTx = bankA.services.signInitialTransaction(secondSpendBuilder) + val secondSpend = bankA.services.startFlow(NotaryFlow.Client(secondSpendTx)) - val secondSpendBuilder = TransactionBuilder(notaryParty).withItems(inputState).run { - val dummyState = DummyContract.SingleOwnerState(0, bankA.info.chooseIdentity()) - addOutputState(dummyState, DummyContract.PROGRAM_ID) - addCommand(dummyCommand(bankA.services.myInfo.chooseIdentity().owningKey)) - this + val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() } + val error = ex.error as NotaryError.Conflict + assertEquals(error.txId, secondSpendTx.id) } - val secondSpendTx = bankA.services.signInitialTransaction(secondSpendBuilder) - val secondSpend = bankA.services.startFlow(NotaryFlow.Client(secondSpendTx)) - - val ex = assertFailsWith(NotaryException::class) { secondSpend.resultFuture.getOrThrow() } - val error = ex.error as NotaryError.Conflict - assertEquals(error.txId, secondSpendTx.id) } private fun issueState(node: StartedNode<*>, notary: Party): StateAndRef<*> { diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt index d307a02705..07b04868a0 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/NodeInfoWatcherTest.kt @@ -13,14 +13,12 @@ import net.corda.testing.ALICE import net.corda.testing.ALICE_KEY import net.corda.testing.DEV_TRUST_ROOT import net.corda.testing.getTestPartyAndCertificate +import net.corda.testing.internal.NodeBasedTest import net.corda.testing.node.MockKeyManagementService -import net.corda.testing.node.NodeBasedTest import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.contentOf import org.junit.Before -import org.junit.Rule import org.junit.Test -import org.junit.rules.TemporaryFolder import rx.observers.TestSubscriber import rx.schedulers.TestScheduler import java.nio.file.Path @@ -29,42 +27,37 @@ import kotlin.test.assertEquals import kotlin.test.assertTrue class NodeInfoWatcherTest : NodeBasedTest() { - - @Rule - @JvmField - var folder = TemporaryFolder() - - lateinit var keyManagementService: KeyManagementService - lateinit var nodeInfoPath: Path - val scheduler = TestScheduler() - val testSubscriber = TestSubscriber() - - // Object under test - lateinit var nodeInfoWatcher: NodeInfoWatcher - companion object { val nodeInfo = NodeInfo(listOf(), listOf(getTestPartyAndCertificate(ALICE)), 0, 0) } + private lateinit var keyManagementService: KeyManagementService + private lateinit var nodeInfoPath: Path + private val scheduler = TestScheduler() + private val testSubscriber = TestSubscriber() + + // Object under test + private lateinit var nodeInfoWatcher: NodeInfoWatcher + @Before fun start() { val identityService = InMemoryIdentityService(trustRoot = DEV_TRUST_ROOT) keyManagementService = MockKeyManagementService(identityService, ALICE_KEY) - nodeInfoWatcher = NodeInfoWatcher(folder.root.toPath(), scheduler = scheduler) - nodeInfoPath = folder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY + nodeInfoWatcher = NodeInfoWatcher(tempFolder.root.toPath(), scheduler = scheduler) + nodeInfoPath = tempFolder.root.toPath() / CordformNode.NODE_INFO_DIRECTORY } @Test fun `save a NodeInfo`() { assertEquals(0, - folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size) - NodeInfoWatcher.saveToFile(folder.root.toPath(), nodeInfo, keyManagementService) + tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) }.size) + NodeInfoWatcher.saveToFile(tempFolder.root.toPath(), nodeInfo, keyManagementService) - val nodeInfoFiles = folder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) } + val nodeInfoFiles = tempFolder.root.list().filter { it.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX) } assertEquals(1, nodeInfoFiles.size) val fileName = nodeInfoFiles.first() assertTrue(fileName.startsWith(NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX)) - val file = (folder.root.path / fileName).toFile() + val file = (tempFolder.root.path / fileName).toFile() // Just check that something is written, another tests verifies that the written value can be read back. assertThat(contentOf(file)).isNotEmpty() } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt index 6eb921fc4d..30ac56d490 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/network/PersistentNetworkMapCacheTest.kt @@ -1,32 +1,28 @@ package net.corda.node.services.network -import co.paralleluniverse.fibers.Suspendable -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowSession -import net.corda.core.flows.InitiatedBy -import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.CordaX500Name import net.corda.core.identity.Party import net.corda.core.node.NodeInfo -import net.corda.core.utilities.* +import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.internal.Node import net.corda.node.internal.StartedNode -import net.corda.testing.* -import net.corda.testing.node.NodeBasedTest -import org.assertj.core.api.Assertions.assertThat +import net.corda.testing.ALICE +import net.corda.testing.BOB +import net.corda.testing.DUMMY_NOTARY +import net.corda.testing.chooseIdentity +import net.corda.testing.internal.NodeBasedTest import org.junit.Before import org.junit.Test import kotlin.test.assertEquals class PersistentNetworkMapCacheTest : NodeBasedTest() { private val partiesList = listOf(DUMMY_NOTARY, ALICE, BOB) - private val addressesMap: HashMap = HashMap() + private val addressesMap = HashMap() private val infos: MutableSet = HashSet() @Before fun start() { val nodes = startNodesWithPort(partiesList) - nodes.forEach { it.internals.nodeReadyFuture.get() } // Need to wait for network map registration, as these tests are ran without waiting. nodes.forEach { infos.add(it.info) addressesMap[it.info.chooseIdentity().name] = it.info.addresses[0] @@ -35,7 +31,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { } @Test - fun `get nodes by owning key and by name, no network map service`() { + fun `get nodes by owning key and by name`() { val alice = startNodesWithPort(listOf(ALICE))[0] val netCache = alice.services.networkMapCache alice.database.transaction { @@ -47,7 +43,7 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { } @Test - fun `get nodes by address no network map service`() { + fun `get nodes by address`() { val alice = startNodesWithPort(listOf(ALICE))[0] val netCache = alice.services.networkMapCache alice.database.transaction { @@ -57,92 +53,20 @@ class PersistentNetworkMapCacheTest : NodeBasedTest() { } @Test - fun `restart node with DB map cache and no network map`() { + fun `restart node with DB map cache`() { val alice = startNodesWithPort(listOf(ALICE))[0] val partyNodes = alice.services.networkMapCache.allNodes assertEquals(infos.size, partyNodes.size) assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet()) } - @Test - fun `start 2 nodes without pointing at NetworkMapService and communicate with each other`() { - val parties = partiesList.subList(1, partiesList.size) - val nodes = startNodesWithPort(parties) - nodes.forEach { - val partyNodes = it.services.networkMapCache.allNodes - assertEquals(infos.size, partyNodes.size) - assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet()) - } - checkConnectivity(nodes) - } - - @Test - fun `start 2 nodes pointing at NetworkMapService but don't start network map node`() { - val parties = partiesList.subList(1, partiesList.size) - val nodes = startNodesWithPort(parties) - nodes.forEach { - val partyNodes = it.services.networkMapCache.allNodes - assertEquals(infos.size, partyNodes.size) - assertEquals(infos.flatMap { it.legalIdentities }.toSet(), partyNodes.flatMap { it.legalIdentities }.toSet()) - } - checkConnectivity(nodes) - } - - @Test - fun `start node and network map communicate`() { - val parties = partiesList.subList(0, 2) - val nodes = startNodesWithPort(parties) - checkConnectivity(nodes) - } - - @Test - fun `start node without networkMapService and no database - success`() { - startNode(CHARLIE.name).getOrThrow(2.seconds) - } - // HELPERS // Helper function to restart nodes with the same host and port. private fun startNodesWithPort(nodesToStart: List, customRetryIntervalMs: Long? = null): List> { return nodesToStart.map { party -> val configOverrides = (addressesMap[party.name]?.let { mapOf("p2pAddress" to it.toString()) } ?: emptyMap()) + (customRetryIntervalMs?.let { mapOf("activeMQServer.bridge.retryIntervalMs" to it.toString()) } ?: emptyMap()) - startNode(party.name, - configOverrides = configOverrides, - waitForConnection = false).getOrThrow() - } - } - - // Check that nodes are functional, communicate each with each. - private fun checkConnectivity(nodes: List>) { - nodes.forEach { node1 -> - nodes.forEach { node2 -> - if (!(node1 === node2)) { // Do not check connectivity to itself - node2.internals.registerInitiatedFlow(SendBackFlow::class.java) - val resultFuture = node1.services.startFlow(SendFlow(node2.info.chooseIdentity())).resultFuture - assertThat(resultFuture.getOrThrow()).isEqualTo("Hello!") - } - } - } - } - - @InitiatingFlow - private class SendFlow(val otherParty: Party) : FlowLogic() { - @Suspendable - override fun call(): String { - logger.info("SEND FLOW to $otherParty") - logger.info("Party key ${otherParty.owningKey.toBase58String()}") - val session = initiateFlow(otherParty) - return session.sendAndReceive("Hi!").unwrap { it } - } - } - - @InitiatedBy(SendFlow::class) - private class SendBackFlow(val otherSideSession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - logger.info("SEND BACK FLOW to ${otherSideSession.counterparty}") - logger.info("Party key ${otherSideSession.counterparty.owningKey.toBase58String()}") - otherSideSession.send("Hello!") + startNode(party.name, configOverrides = configOverrides) } } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt index a32c1eae59..533a5df56a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowVersioningTest.kt @@ -5,22 +5,20 @@ import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowSession import net.corda.core.flows.InitiatingFlow import net.corda.core.identity.Party -import net.corda.core.internal.concurrent.transpose import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.testing.ALICE import net.corda.testing.BOB import net.corda.testing.chooseIdentity -import net.corda.testing.node.NodeBasedTest +import net.corda.testing.internal.NodeBasedTest import org.assertj.core.api.Assertions.assertThat import org.junit.Test class FlowVersioningTest : NodeBasedTest() { @Test fun `getFlowContext returns the platform version for core flows`() { - val (alice, bob) = listOf( - startNode(ALICE.name, platformVersion = 2), - startNode(BOB.name, platformVersion = 3)).transpose().getOrThrow() + val alice = startNode(ALICE.name, platformVersion = 2) + val bob = startNode(BOB.name, platformVersion = 3) bob.internals.installCoreFlow(PretendInitiatingCoreFlow::class, ::PretendInitiatedCoreFlow) val (alicePlatformVersionAccordingToBob, bobPlatformVersionAccordingToAlice) = alice.services.startFlow( PretendInitiatingCoreFlow(bob.info.chooseIdentity())).resultFuture.getOrThrow() diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt index d6aaeb10a7..bc1817035d 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/MQSecurityTest.kt @@ -28,8 +28,8 @@ import net.corda.testing.ALICE import net.corda.testing.BOB import net.corda.testing.chooseIdentity import net.corda.testing.configureTestSSL +import net.corda.testing.internal.NodeBasedTest import net.corda.testing.messaging.SimpleMQClient -import net.corda.testing.node.NodeBasedTest import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException import org.apache.activemq.artemis.api.core.ActiveMQSecurityException import org.apache.activemq.artemis.api.core.SimpleString @@ -52,7 +52,7 @@ abstract class MQSecurityTest : NodeBasedTest() { @Before fun start() { - alice = startNode(ALICE.name, rpcUsers = extraRPCUsers + rpcUser).getOrThrow() + alice = startNode(ALICE.name, rpcUsers = extraRPCUsers + rpcUser) attacker = createAttacker() startAttacker(attacker) } @@ -87,7 +87,7 @@ abstract class MQSecurityTest : NodeBasedTest() { @Test fun `create queue for peer which has not been communicated with`() { - val bob = startNode(BOB.name).getOrThrow() + val bob = startNode(BOB.name) assertAllQueueCreationAttacksFail("$PEERS_PREFIX${bob.info.chooseIdentity().owningKey.toBase58String()}") } @@ -219,7 +219,7 @@ abstract class MQSecurityTest : NodeBasedTest() { } private fun startBobAndCommunicateWithAlice(): Party { - val bob = startNode(BOB.name).getOrThrow() + val bob = startNode(BOB.name) bob.internals.registerInitiatedFlow(ReceiveFlow::class.java) val bobParty = bob.info.chooseIdentity() // Perform a protocol exchange to force the peer queue to be created diff --git a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt index a371ca1704..c43185bdeb 100644 --- a/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt +++ b/node/src/integration-test/kotlin/net/corda/services/messaging/P2PMessagingTest.kt @@ -3,10 +3,8 @@ package net.corda.services.messaging import net.corda.core.concurrent.CordaFuture import net.corda.core.crypto.random63BitValue import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.concurrent.transpose -import net.corda.core.internal.elapsedTime +import net.corda.core.internal.concurrent.map import net.corda.core.internal.randomOrNull -import net.corda.core.internal.times import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.serialization.CordaSerializable @@ -14,116 +12,122 @@ import net.corda.core.serialization.deserialize import net.corda.core.serialization.serialize import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds +import net.corda.node.internal.Node import net.corda.node.internal.StartedNode import net.corda.node.services.messaging.* import net.corda.node.services.transactions.RaftValidatingNotaryService -import net.corda.testing.* -import net.corda.testing.node.NodeBasedTest +import net.corda.testing.ALICE +import net.corda.testing.chooseIdentity +import net.corda.testing.driver.DriverDSLExposedInterface +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.driver import org.assertj.core.api.Assertions.assertThat -import org.junit.Ignore import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger -class P2PMessagingTest : NodeBasedTest() { +class P2PMessagingTest { private companion object { val DISTRIBUTED_SERVICE_NAME = CordaX500Name(RaftValidatingNotaryService.id, "DistributedService", "London", "GB") } - @Test - fun `network map will work after restart`() { - val identities = listOf(DUMMY_BANK_A, DUMMY_BANK_B, DUMMY_NOTARY) - fun startNodes() = identities.map { startNode(it.name) }.transpose() - - val startUpDuration = elapsedTime { startNodes().getOrThrow() } - // Start the network map a second time - this will restore message queues from the journal. - // This will hang and fail prior the fix. https://github.com/corda/corda/issues/37 - clearAllNodeInfoDb() // Clear network map data from nodes databases. - stopAllNodes() - startNodes().getOrThrow(timeout = startUpDuration * 3) - } - @Test fun `communicating with a distributed service which we're part of`() { - val distributedService = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow() - assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0]) + driver(startNodesInProcess = true) { + val distributedService = startDistributedService() + assertAllNodesAreUsed(distributedService, DISTRIBUTED_SERVICE_NAME, distributedService[0]) + } } @Test fun `distributed service requests are retried if one of the nodes in the cluster goes down without sending a response`() { - val distributedServiceNodes = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow() - val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() - val serviceAddress = alice.services.networkMapCache.run { - val notaryParty = notaryIdentities.randomOrNull()!! - alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!) + driver(startNodesInProcess = true) { + val distributedServiceNodes = startDistributedService() + val alice = startAlice() + val serviceAddress = alice.services.networkMapCache.run { + val notaryParty = notaryIdentities.randomOrNull()!! + alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!) + } + + val dummyTopic = "dummy.topic" + val responseMessage = "response" + + val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) + + // Send a single request with retry + val responseFuture = with(alice.network) { + val request = TestRequest(replyTo = myAddress) + val responseFuture = onNext(dummyTopic, request.sessionID) + val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) + send(msg, serviceAddress, retryId = request.sessionID) + responseFuture + } + crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) + // The request wasn't successful. + assertThat(responseFuture.isDone).isFalse() + crashingNodes.ignoreRequests = false + + // The retry should be successful. + val response = responseFuture.getOrThrow(10.seconds) + assertThat(response).isEqualTo(responseMessage) } - - val dummyTopic = "dummy.topic" - val responseMessage = "response" - - val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) - - // Send a single request with retry - val responseFuture = with(alice.network) { - val request = TestRequest(replyTo = myAddress) - val responseFuture = onNext(dummyTopic, request.sessionID) - val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) - send(msg, serviceAddress, retryId = request.sessionID) - responseFuture - } - crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) - // The request wasn't successful. - assertThat(responseFuture.isDone).isFalse() - crashingNodes.ignoreRequests = false - - // The retry should be successful. - val response = responseFuture.getOrThrow(10.seconds) - assertThat(response).isEqualTo(responseMessage) } @Test - @Ignore("Fails on Team City due to issues with restaring nodes.") fun `distributed service request retries are persisted across client node restarts`() { - val distributedServiceNodes = startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2).getOrThrow() - val alice = startNode(ALICE.name, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)).getOrThrow() - val serviceAddress = alice.services.networkMapCache.run { - val notaryParty = notaryIdentities.randomOrNull()!! - alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!) + driver(startNodesInProcess = true) { + val distributedServiceNodes = startDistributedService() + val alice = startAlice() + val serviceAddress = alice.services.networkMapCache.run { + val notaryParty = notaryIdentities.randomOrNull()!! + alice.network.getAddressOfParty(getPartyInfo(notaryParty)!!) + } + + val dummyTopic = "dummy.topic" + val responseMessage = "response" + + val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) + + val sessionId = random63BitValue() + + // Send a single request with retry + with(alice.network) { + val request = TestRequest(sessionId, myAddress) + val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) + send(msg, serviceAddress, retryId = request.sessionID) + } + + // Wait until the first request is received + crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) + // Stop alice's node after we ensured that the first request was delivered and ignored. + alice.dispose() + val numberOfRequestsReceived = crashingNodes.requestsReceived.get() + assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1) + + crashingNodes.ignoreRequests = false + + // Restart the node and expect a response + val aliceRestarted = startAlice() + val response = aliceRestarted.network.onNext(dummyTopic, sessionId).getOrThrow(5.seconds) + + assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived) + assertThat(response).isEqualTo(responseMessage) } + } - val dummyTopic = "dummy.topic" - val responseMessage = "response" + private fun DriverDSLExposedInterface.startDistributedService(): List> { + return startNotaryCluster(DISTRIBUTED_SERVICE_NAME, 2) + .getOrThrow() + .second + .map { (it as NodeHandle.InProcess).node } + } - val crashingNodes = simulateCrashingNodes(distributedServiceNodes, dummyTopic, responseMessage) - - val sessionId = random63BitValue() - - // Send a single request with retry - with(alice.network) { - val request = TestRequest(sessionId, myAddress) - val msg = createMessage(TopicSession(dummyTopic), data = request.serialize().bytes) - send(msg, serviceAddress, retryId = request.sessionID) - } - - // Wait until the first request is received - crashingNodes.firstRequestReceived.await(5, TimeUnit.SECONDS) - // Stop alice's node after we ensured that the first request was delivered and ignored. - alice.services.networkMapCache.clearNetworkMapCache() - - alice.dispose() - val numberOfRequestsReceived = crashingNodes.requestsReceived.get() - assertThat(numberOfRequestsReceived).isGreaterThanOrEqualTo(1) - - crashingNodes.ignoreRequests = false - - // Restart the node and expect a response - val aliceRestarted = startNode(ALICE.name, waitForConnection = true, configOverrides = mapOf("messageRedeliveryDelaySeconds" to 5)).getOrThrow() - val response = aliceRestarted.network.onNext(dummyTopic, sessionId).getOrThrow(5.seconds) - - assertThat(crashingNodes.requestsReceived.get()).isGreaterThan(numberOfRequestsReceived) - assertThat(response).isEqualTo(responseMessage) + private fun DriverDSLExposedInterface.startAlice(): StartedNode { + return startNode(providedName = ALICE.name, customOverrides = mapOf("messageRedeliveryDelaySeconds" to 1)) + .map { (it as NodeHandle.InProcess).node } + .getOrThrow() } data class CrashingNodes( @@ -203,4 +207,4 @@ class P2PMessagingTest : NodeBasedTest() { @CordaSerializable private data class TestRequest(override val sessionID: Long = random63BitValue(), override val replyTo: SingleMessageRecipient) : ServiceRequestMessage -} \ No newline at end of file +} diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index 61580c8632..8e7e639967 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -1,19 +1,17 @@ package net.corda.traderdemo import net.corda.client.rpc.CordaRPCClient -import net.corda.core.internal.packageName import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.finance.DOLLARS import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow -import net.corda.finance.schemas.CashSchemaV1 -import net.corda.finance.schemas.CommercialPaperSchemaV1 -import net.corda.node.services.FlowPermissions.Companion.startFlowPermission +import net.corda.node.services.FlowPermissions import net.corda.nodeapi.User import net.corda.testing.* +import net.corda.testing.driver.NodeHandle +import net.corda.testing.driver.driver import net.corda.testing.driver.poll -import net.corda.testing.node.NodeBasedTest import net.corda.traderdemo.flow.BuyerFlow import net.corda.traderdemo.flow.CommercialPaperIssueFlow import net.corda.traderdemo.flow.SellerFlow @@ -21,53 +19,54 @@ import org.assertj.core.api.Assertions.assertThat import org.junit.Test import java.util.concurrent.Executors -class TraderDemoTest : NodeBasedTest(listOf( - "net.corda.finance.contracts.asset", "net.corda.finance.contracts", - CashSchemaV1::class.packageName, CommercialPaperSchemaV1::class.packageName)) { +class TraderDemoTest { @Test fun `runs trader demo`() { - val demoUser = User("demo", "demo", setOf(startFlowPermission())) + val demoUser = User("demo", "demo", setOf(FlowPermissions.startFlowPermission())) val bankUser = User("user1", "test", permissions = setOf( - startFlowPermission(), - startFlowPermission(), - startFlowPermission())) - val notaryFuture = startNotaryNode(DUMMY_NOTARY.name, validating = false) - val nodeAFuture = startNode(DUMMY_BANK_A.name, rpcUsers = listOf(demoUser)) - val nodeBFuture = startNode(DUMMY_BANK_B.name, rpcUsers = listOf(demoUser)) - val bankNodeFuture = startNode(BOC.name, rpcUsers = listOf(bankUser)) - val (nodeA, nodeB, bankNode) = listOf(nodeAFuture, nodeBFuture, bankNodeFuture, notaryFuture).map { it.getOrThrow() } + FlowPermissions.startFlowPermission(), + FlowPermissions.startFlowPermission(), + FlowPermissions.startFlowPermission())) + driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance")) { + val (nodeA, nodeB, bankNode) = listOf( + startNode(providedName = DUMMY_BANK_A.name, rpcUsers = listOf(demoUser)), + startNode(providedName = DUMMY_BANK_B.name, rpcUsers = listOf(demoUser)), + startNode(providedName = BOC.name, rpcUsers = listOf(bankUser)), + startNotaryNode(DUMMY_NOTARY.name, validating = false)) + .map { (it.getOrThrow() as NodeHandle.InProcess).node } - nodeA.internals.registerInitiatedFlow(BuyerFlow::class.java) - val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map { - val client = CordaRPCClient(it.internals.configuration.rpcAddress!!) - client.start(demoUser.username, demoUser.password).proxy + nodeA.internals.registerInitiatedFlow(BuyerFlow::class.java) + val (nodeARpc, nodeBRpc) = listOf(nodeA, nodeB).map { + val client = CordaRPCClient(it.internals.configuration.rpcAddress!!) + client.start(demoUser.username, demoUser.password).proxy + } + val nodeBankRpc = let { + val client = CordaRPCClient(bankNode.internals.configuration.rpcAddress!!) + client.start(bankUser.username, bankUser.password).proxy + } + + val clientA = TraderDemoClientApi(nodeARpc) + val clientB = TraderDemoClientApi(nodeBRpc) + val clientBank = TraderDemoClientApi(nodeBankRpc) + + val originalACash = clientA.cashCount // A has random number of issued amount + val expectedBCash = clientB.cashCount + 1 + val expectedPaper = listOf(clientA.commercialPaperCount + 1, clientB.commercialPaperCount) + + clientBank.runIssuer(amount = 100.DOLLARS, buyerName = nodeA.info.chooseIdentity().name, sellerName = nodeB.info.chooseIdentity().name) + clientB.runSeller(buyerName = nodeA.info.chooseIdentity().name, amount = 5.DOLLARS) + + assertThat(clientA.cashCount).isGreaterThan(originalACash) + assertThat(clientB.cashCount).isEqualTo(expectedBCash) + // Wait until A receives the commercial paper + val executor = Executors.newScheduledThreadPool(1) + poll(executor, "A to be notified of the commercial paper", pollInterval = 100.millis) { + val actualPaper = listOf(clientA.commercialPaperCount, clientB.commercialPaperCount) + if (actualPaper == expectedPaper) Unit else null + }.getOrThrow() + executor.shutdown() + assertThat(clientA.dollarCashBalance).isEqualTo(95.DOLLARS) + assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS) } - val nodeBankRpc = let { - val client = CordaRPCClient(bankNode.internals.configuration.rpcAddress!!) - client.start(bankUser.username, bankUser.password).proxy - } - - val clientA = TraderDemoClientApi(nodeARpc) - val clientB = TraderDemoClientApi(nodeBRpc) - val clientBank = TraderDemoClientApi(nodeBankRpc) - - val originalACash = clientA.cashCount // A has random number of issued amount - val expectedBCash = clientB.cashCount + 1 - val expectedPaper = listOf(clientA.commercialPaperCount + 1, clientB.commercialPaperCount) - - clientBank.runIssuer(amount = 100.DOLLARS, buyerName = nodeA.info.chooseIdentity().name, sellerName = nodeB.info.chooseIdentity().name) - clientB.runSeller(buyerName = nodeA.info.chooseIdentity().name, amount = 5.DOLLARS) - - assertThat(clientA.cashCount).isGreaterThan(originalACash) - assertThat(clientB.cashCount).isEqualTo(expectedBCash) - // Wait until A receives the commercial paper - val executor = Executors.newScheduledThreadPool(1) - poll(executor, "A to be notified of the commercial paper", pollInterval = 100.millis) { - val actualPaper = listOf(clientA.commercialPaperCount, clientB.commercialPaperCount) - if (actualPaper == expectedPaper) Unit else null - }.getOrThrow() - executor.shutdown() - assertThat(clientA.dollarCashBalance).isEqualTo(95.DOLLARS) - assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS) } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt new file mode 100644 index 0000000000..a3449d9417 --- /dev/null +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/internal/NodeBasedTest.kt @@ -0,0 +1,115 @@ +package net.corda.testing.internal + +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.concurrent.fork +import net.corda.core.internal.concurrent.transpose +import net.corda.core.internal.createDirectories +import net.corda.core.internal.div +import net.corda.core.node.NodeInfo +import net.corda.core.utilities.getOrThrow +import net.corda.node.internal.Node +import net.corda.node.internal.StartedNode +import net.corda.node.internal.cordapp.CordappLoader +import net.corda.node.services.config.ConfigHelper +import net.corda.node.services.config.configOf +import net.corda.node.services.config.parseAsNodeConfiguration +import net.corda.node.services.config.plus +import net.corda.nodeapi.User +import net.corda.testing.SerializationEnvironmentRule +import net.corda.testing.driver.addressMustNotBeBoundFuture +import net.corda.testing.getFreeLocalPorts +import net.corda.testing.node.MockServices +import org.apache.logging.log4j.Level +import org.junit.After +import org.junit.Rule +import org.junit.rules.TemporaryFolder +import java.nio.file.Path +import java.util.concurrent.Executors +import kotlin.concurrent.thread + +// TODO Some of the logic here duplicates what's in the driver +abstract class NodeBasedTest(private val cordappPackages: List = emptyList()) { + companion object { + private val WHITESPACE = "\\s++".toRegex() + } + + @Rule + @JvmField + val testSerialization = SerializationEnvironmentRule() + @Rule + @JvmField + val tempFolder = TemporaryFolder() + + private val nodes = mutableListOf>() + private val nodeInfos = mutableListOf() + + init { + System.setProperty("consoleLogLevel", Level.DEBUG.name().toLowerCase()) + } + + /** + * Stops the network map node and all the nodes started by [startNode]. This is called automatically after each test + * but can also be called manually within a test. + */ + @After + fun stopAllNodes() { + val shutdownExecutor = Executors.newScheduledThreadPool(nodes.size) + nodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow() + // Wait until ports are released + val portNotBoundChecks = nodes.flatMap { + listOf( + it.internals.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }, + it.internals.configuration.rpcAddress?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) } + ) + }.filterNotNull() + nodes.clear() + portNotBoundChecks.transpose().getOrThrow() + } + + @JvmOverloads + fun startNode(legalName: CordaX500Name, + platformVersion: Int = 1, + rpcUsers: List = emptyList(), + configOverrides: Map = emptyMap()): StartedNode { + val baseDirectory = baseDirectory(legalName).createDirectories() + val localPort = getFreeLocalPorts("localhost", 2) + val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString() + val config = ConfigHelper.loadConfig( + baseDirectory = baseDirectory, + allowMissingConfig = true, + configOverrides = configOf( + "myLegalName" to legalName.toString(), + "p2pAddress" to p2pAddress, + "rpcAddress" to localPort[1].toString(), + "rpcUsers" to rpcUsers.map { it.toMap() } + ) + configOverrides + ) + + val parsedConfig = config.parseAsNodeConfiguration() + val node = Node( + parsedConfig, + MockServices.MOCK_VERSION_INFO.copy(platformVersion = platformVersion), + initialiseSerialization = false, + cordappLoader = CordappLoader.createDefaultWithTestPackages(parsedConfig, cordappPackages)).start() + nodes += node + ensureAllNetworkMapCachesHaveAllNodeInfos() + thread(name = legalName.organisation) { + node.internals.run() + } + + return node + } + + protected fun baseDirectory(legalName: CordaX500Name): Path { + return tempFolder.root.toPath() / legalName.organisation.replace(WHITESPACE, "") + } + + private fun ensureAllNetworkMapCachesHaveAllNodeInfos() { + val runningNodes = nodes.filter { it.internals.started != null } + val runningNodesInfo = runningNodes.map { it.info } + for (node in runningNodes) + for (nodeInfo in runningNodesInfo) { + node.services.networkMapCache.addNode(nodeInfo) + } + } +} diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt deleted file mode 100644 index 8fddf5cb3c..0000000000 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/NodeBasedTest.kt +++ /dev/null @@ -1,181 +0,0 @@ -package net.corda.testing.node - -import net.corda.core.concurrent.CordaFuture -import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.concurrent.* -import net.corda.core.internal.createDirectories -import net.corda.core.internal.div -import net.corda.core.node.NodeInfo -import net.corda.core.utilities.NetworkHostAndPort -import net.corda.core.utilities.getOrThrow -import net.corda.node.internal.Node -import net.corda.node.internal.StartedNode -import net.corda.node.internal.cordapp.CordappLoader -import net.corda.node.services.config.* -import net.corda.node.utilities.ServiceIdentityGenerator -import net.corda.nodeapi.User -import net.corda.nodeapi.config.toConfig -import net.corda.testing.driver.addressMustNotBeBoundFuture -import net.corda.testing.getFreeLocalPorts -import net.corda.testing.node.MockServices.Companion.MOCK_VERSION_INFO -import net.corda.testing.SerializationEnvironmentRule -import org.apache.logging.log4j.Level -import org.junit.After -import org.junit.Rule -import org.junit.rules.TemporaryFolder -import java.util.concurrent.Executors -import kotlin.concurrent.thread - -/** - * Extend this class if you need to run nodes in a test. You could use the driver DSL but it's extremely slow for testing - * purposes. Use the driver if you need to run the nodes in separate processes otherwise this class will suffice. - */ -// TODO Some of the logic here duplicates what's in the driver -abstract class NodeBasedTest(private val cordappPackages: List = emptyList()) { - companion object { - private val WHITESPACE = "\\s++".toRegex() - } - - @Rule - @JvmField - val testSerialization = SerializationEnvironmentRule() - @Rule - @JvmField - val tempFolder = TemporaryFolder() - - private val nodes = mutableListOf>() - private val nodeInfos = mutableListOf() - - init { - System.setProperty("consoleLogLevel", Level.DEBUG.name().toLowerCase()) - } - - /** - * Stops the network map node and all the nodes started by [startNode]. This is called automatically after each test - * but can also be called manually within a test. - */ - @After - fun stopAllNodes() { - val shutdownExecutor = Executors.newScheduledThreadPool(nodes.size) - nodes.map { shutdownExecutor.fork(it::dispose) }.transpose().getOrThrow() - // Wait until ports are released - val portNotBoundChecks = nodes.flatMap { - listOf( - it.internals.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }, - it.internals.configuration.rpcAddress?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) } - ) - }.filterNotNull() - nodes.clear() - portNotBoundChecks.transpose().getOrThrow() - } - - /** - * Clear network map data from nodes' databases. - */ - fun clearAllNodeInfoDb() { - nodes.forEach { it.services.networkMapCache.clearNetworkMapCache() } - } - - @JvmOverloads - fun startNode(legalName: CordaX500Name, - platformVersion: Int = 1, - rpcUsers: List = emptyList(), - configOverrides: Map = emptyMap(), - waitForConnection: Boolean = true): CordaFuture> { - val node = startNodeInternal( - legalName, - platformVersion, - rpcUsers, - configOverrides) - return if (waitForConnection) node.internals.nodeReadyFuture.map { node } else doneFuture(node) - } - - // TODO This method has been added temporarily, to be deleted once the set of notaries is defined at the network level. - fun startNotaryNode(name: CordaX500Name, - rpcUsers: List = emptyList(), - validating: Boolean = true): CordaFuture> { - return startNode(name, rpcUsers = rpcUsers, configOverrides = mapOf("notary" to mapOf("validating" to validating))) - } - - fun startNotaryCluster(notaryName: CordaX500Name, clusterSize: Int): CordaFuture>> { - fun notaryConfig(nodeAddress: NetworkHostAndPort, clusterAddress: NetworkHostAndPort? = null): Map { - val clusterAddresses = if (clusterAddress != null) listOf(clusterAddress) else emptyList() - val config = NotaryConfig(validating = true, raft = RaftConfig(nodeAddress = nodeAddress, clusterAddresses = clusterAddresses)) - return mapOf("notary" to config.toConfig().root().unwrapped()) - } - - ServiceIdentityGenerator.generateToDisk( - (0 until clusterSize).map { baseDirectory(notaryName.copy(organisation = "${notaryName.organisation}-$it")) }, - notaryName) - - val nodeAddresses = getFreeLocalPorts("localhost", clusterSize) - - val masterNodeFuture = startNode( - CordaX500Name(organisation = "${notaryName.organisation}-0", locality = notaryName.locality, country = notaryName.country), - configOverrides = notaryConfig(nodeAddresses[0]) + mapOf( - "database" to mapOf( - "serverNameTablePrefix" to if (clusterSize > 1) "${notaryName.organisation}0".replace(Regex("[^0-9A-Za-z]+"), "") else "" - ) - ) - ) - - val remainingNodesFutures = (1 until clusterSize).map { - startNode( - CordaX500Name(organisation = "${notaryName.organisation}-$it", locality = notaryName.locality, country = notaryName.country), - configOverrides = notaryConfig(nodeAddresses[it], nodeAddresses[0]) + mapOf( - "database" to mapOf( - "serverNameTablePrefix" to "${notaryName.organisation}$it".replace(Regex("[^0-9A-Za-z]+"), "") - ) - ) - ) - } - - return remainingNodesFutures.transpose().flatMap { remainingNodes -> - masterNodeFuture.map { masterNode -> listOf(masterNode) + remainingNodes } - } - } - - protected fun baseDirectory(legalName: CordaX500Name) = tempFolder.root.toPath() / legalName.organisation.replace(WHITESPACE, "") - - private fun ensureAllNetworkMapCachesHaveAllNodeInfos() { - val runningNodes = nodes.filter { it.internals.started != null } - val runningNodesInfo = runningNodes.map { it.info } - for (node in runningNodes) - for (nodeInfo in runningNodesInfo) { - node.services.networkMapCache.addNode(nodeInfo) - } - } - - private fun startNodeInternal(legalName: CordaX500Name, - platformVersion: Int, - rpcUsers: List, - configOverrides: Map): StartedNode { - val baseDirectory = baseDirectory(legalName).createDirectories() - val localPort = getFreeLocalPorts("localhost", 2) - val p2pAddress = configOverrides["p2pAddress"] ?: localPort[0].toString() - val config = ConfigHelper.loadConfig( - baseDirectory = baseDirectory, - allowMissingConfig = true, - configOverrides = configOf( - "myLegalName" to legalName.toString(), - "p2pAddress" to p2pAddress, - "rpcAddress" to localPort[1].toString(), - "rpcUsers" to rpcUsers.map { it.toMap() } - ) + configOverrides - ) - - val parsedConfig = config.parseAsNodeConfiguration() - val node = Node( - parsedConfig, - MOCK_VERSION_INFO.copy(platformVersion = platformVersion), - initialiseSerialization = false, - cordappLoader = CordappLoader.createDefaultWithTestPackages(parsedConfig, cordappPackages)).start() - nodes += node - ensureAllNetworkMapCachesHaveAllNodeInfos() - thread(name = legalName.organisation) { - node.internals.run() - } - - return node - } -}