From e20278fbfd372cdace5c3d5100c566e3218e18e0 Mon Sep 17 00:00:00 2001 From: Shams Asari Date: Mon, 28 Jan 2019 14:08:54 +0000 Subject: [PATCH] Made FlowCheckpointVersionNodeStartupCheckTest leaner (#4640) * Removed `restart node successfully with suspended flow` as it duplicates `TraderDemoTest#Test restart node during flow works properly` * Removed the need for a notary --- .idea/compiler.xml | 22 +++ .../net/corda/core/internal/PathUtils.kt | 3 + ...owCheckpointVersionNodeStartupCheckTest.kt | 128 +++++++----------- .../net/corda/traderdemo/TraderDemoTest.kt | 51 ++++--- .../node/internal/InternalTestUtils.kt | 14 ++ 5 files changed, 112 insertions(+), 106 deletions(-) diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 804913cc66..813f60e011 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -4,6 +4,11 @@ + + + + + @@ -36,6 +41,8 @@ + + @@ -64,6 +71,8 @@ + + @@ -170,6 +179,10 @@ + + + + @@ -189,6 +202,10 @@ + + + + @@ -266,6 +283,11 @@ + + + + + diff --git a/core/src/main/kotlin/net/corda/core/internal/PathUtils.kt b/core/src/main/kotlin/net/corda/core/internal/PathUtils.kt index dfd873628f..0f7d214af4 100644 --- a/core/src/main/kotlin/net/corda/core/internal/PathUtils.kt +++ b/core/src/main/kotlin/net/corda/core/internal/PathUtils.kt @@ -63,6 +63,9 @@ fun Path.copyTo(target: Path, vararg options: CopyOption): Path = Files.copy(thi /** @see Files.move */ fun Path.moveTo(target: Path, vararg options: CopyOption): Path = Files.move(this, target, *options) +/** @see Files.move */ +fun Path.renameTo(fileName: String, vararg options: CopyOption): Path = moveTo(parent / fileName, *options) + /** See overload of [Files.copy] which takes in an [InputStream]. */ fun Path.copyTo(out: OutputStream): Long = Files.copy(this, out) diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt index 977b4c557b..f0bf751f76 100644 --- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt @@ -1,12 +1,13 @@ package net.corda.node.flows +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.* +import net.corda.core.identity.Party import net.corda.core.internal.* -import net.corda.core.internal.concurrent.transpose -import net.corda.core.messaging.startTrackedFlow +import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap import net.corda.node.internal.CheckpointIncompatibleException -import net.corda.node.internal.NodeStartup -import net.corda.testMessage.* import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.BOB_NAME import net.corda.testing.core.singleIdentity @@ -14,65 +15,30 @@ import net.corda.testing.driver.DriverDSL import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.NodeParameters import net.corda.testing.driver.driver -import net.corda.testing.node.TestCordapp -import net.corda.testing.node.internal.* -import net.test.cordapp.v1.Record -import net.test.cordapp.v1.SendMessageFlow -import org.assertj.core.api.Assertions.assertThat +import net.corda.testing.node.internal.CustomCordapp +import net.corda.testing.node.internal.ListenProcessDeathException +import net.corda.testing.node.internal.assertCheckpoints +import net.corda.testing.node.internal.enclosedCordapp import org.junit.Test import java.nio.file.Path import java.nio.file.StandardCopyOption.REPLACE_EXISTING -import java.util.* -import java.util.concurrent.TimeUnit import kotlin.test.assertEquals import kotlin.test.assertFailsWith -import kotlin.test.assertNotNull +// TraderDemoTest already has a test which checks the node can resume a flow from a checkpoint class FlowCheckpointVersionNodeStartupCheckTest { companion object { - val message = Message("Hello world!") - val defaultCordapp = cordappWithPackages( - MessageState::class.packageName, SendMessageFlow::class.packageName - ) - } - - @Test - fun `restart node successfully with suspended flow`() { - return driver(parametersForRestartingNodes()) { - createSuspendedFlowInBob(setOf(defaultCordapp)) - // Bob will resume the flow - val alice = startNode(providedName = ALICE_NAME).getOrThrow() - startNode(providedName = BOB_NAME).getOrThrow() - val page = alice.rpc.vaultTrack(MessageState::class.java) - val result = if (page.snapshot.states.isNotEmpty()) { - page.snapshot.states.first() - } else { - val r = page.updates.timeout(30, TimeUnit.SECONDS).take(1).toBlocking().single() - if (r.consumed.isNotEmpty()) r.consumed.first() else r.produced.first() - } - assertNotNull(result) - assertEquals(message, result.state.data.message) - } + val defaultCordapp = enclosedCordapp() } @Test fun `restart node with incompatible version of suspended flow due to different jar name`() { driver(parametersForRestartingNodes()) { - val uniqueName = "different-jar-name-test-${UUID.randomUUID()}" - val cordapp = defaultCordapp.copy(name = uniqueName) - - val bobBaseDir = createSuspendedFlowInBob(setOf(cordapp)) - - val cordappsDir = bobBaseDir / "cordapps" - val cordappJar = cordappsDir.list().single { it.toString().endsWith(".jar") } - // Make sure we're dealing with right jar - assertThat(cordappJar.fileName.toString()).contains(uniqueName) - // Rename the jar file. - cordappJar.moveTo(cordappsDir / "renamed-${cordappJar.fileName}") + val defaultCordappJar = createSuspendedFlowInBob() + defaultCordappJar.renameTo("renamed-${defaultCordappJar.fileName}") assertBobFailsToStartWithLogMessage( - emptyList(), - CheckpointIncompatibleException.FlowNotInstalledException(SendMessageFlow::class.java).message + CheckpointIncompatibleException.FlowNotInstalledException(ReceiverFlow::class.java).message ) } } @@ -80,54 +46,42 @@ class FlowCheckpointVersionNodeStartupCheckTest { @Test fun `restart node with incompatible version of suspended flow due to different jar hash`() { driver(parametersForRestartingNodes()) { - val uniqueWorkflowJarName = "different-jar-hash-test-${UUID.randomUUID()}" - val uniqueContractJarName = "contract-$uniqueWorkflowJarName" - val defaultWorkflowJar = cordappWithPackages(SendMessageFlow::class.packageName) - val defaultContractJar = cordappWithPackages(MessageState::class.packageName) - val contractJar = defaultContractJar.copy(name = uniqueContractJarName) - val workflowJar = defaultWorkflowJar.copy(name = uniqueWorkflowJarName) - - val bobBaseDir = createSuspendedFlowInBob(setOf(workflowJar, contractJar)) - - val cordappsDir = bobBaseDir / "cordapps" - val cordappJar = cordappsDir.list().single { - ! it.toString().contains(uniqueContractJarName) && it.toString().endsWith(".jar") - } - // Make sure we're dealing with right jar - assertThat(cordappJar.fileName.toString()).contains(uniqueWorkflowJarName) + val defaultCordappJar = createSuspendedFlowInBob() // The name is part of the MANIFEST so changing it is sufficient to change the jar hash - val modifiedCordapp = workflowJar.copy(name = "${workflowJar.name}-modified") + val modifiedCordapp = defaultCordapp.copy(name = "${defaultCordapp.name}-modified") val modifiedCordappJar = CustomCordapp.getJarFile(modifiedCordapp) - modifiedCordappJar.moveTo(cordappJar, REPLACE_EXISTING) + modifiedCordappJar.moveTo(defaultCordappJar, REPLACE_EXISTING) assertBobFailsToStartWithLogMessage( - emptyList(), // The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException "that is incompatible with the current installed version of" ) } } - private fun DriverDSL.createSuspendedFlowInBob(cordapps: Set): Path { - val (alice, bob) = listOf(ALICE_NAME, BOB_NAME) - .map { startNode(NodeParameters(providedName = it, additionalCordapps = cordapps)) } - .transpose() - .getOrThrow() - alice.stop() - val flowTracker = bob.rpc.startTrackedFlow(::SendMessageFlow, message, defaultNotaryIdentity, alice.nodeInfo.singleIdentity()).progress - // Wait until Bob progresses as far as possible because Alice node is offline - flowTracker.takeFirst { it == SendMessageFlow.Companion.FINALISING_TRANSACTION.label }.toBlocking().single() + private fun DriverDSL.createSuspendedFlowInBob(): Path { + val (alice, bob) = listOf( + startNode(providedName = ALICE_NAME), + startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp))) + ).map { it.getOrThrow() } + + alice.stop() // Stop Alice so that Bob never receives the message + + bob.rpc.startFlow(::ReceiverFlow, alice.nodeInfo.singleIdentity()) + // Wait until Bob's flow has started + bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first() bob.stop() - return bob.baseDirectory + assertCheckpoints(BOB_NAME, 1) + + return (bob.baseDirectory / "cordapps").list().single { it.toString().endsWith(".jar") } } - private fun DriverDSL.assertBobFailsToStartWithLogMessage(cordapps: Collection, logMessage: String) { + private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) { assertFailsWith(ListenProcessDeathException::class) { startNode(NodeParameters( providedName = BOB_NAME, - customOverrides = mapOf("devMode" to false), - additionalCordapps = cordapps + customOverrides = mapOf("devMode" to false) )).getOrThrow() } @@ -141,7 +95,21 @@ class FlowCheckpointVersionNodeStartupCheckTest { return DriverParameters( startNodesInProcess = false, // Start nodes in separate processes to ensure CordappLoader is not shared between restarts inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows - cordappsForAllNodes = emptyList() + cordappsForAllNodes = emptyList(), + notarySpecs = emptyList() ) } + + @InitiatingFlow + @StartableByRPC + class ReceiverFlow(private val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): String = initiateFlow(otherParty).receive().unwrap { it } + } + + @InitiatedBy(ReceiverFlow::class) + class SenderFlow(private val otherSide: FlowSession) : FlowLogic() { + @Suspendable + override fun call() = otherSide.send("Hello!") + } } diff --git a/samples/trader-demo/workflows-trader/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/workflows-trader/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index 5ebd6c6797..cb82d46472 100644 --- a/samples/trader-demo/workflows-trader/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/workflows-trader/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -1,27 +1,33 @@ package net.corda.traderdemo import net.corda.client.rpc.CordaRPCClient +import net.corda.core.identity.CordaX500Name +import net.corda.core.internal.div import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.finance.DOLLARS +import net.corda.finance.USD +import net.corda.finance.contracts.getCashBalance import net.corda.finance.flows.CashIssueFlow import net.corda.finance.flows.CashPaymentFlow import net.corda.node.services.Permissions.Companion.all import net.corda.node.services.Permissions.Companion.startFlow -import net.corda.testing.core.* -import net.corda.testing.driver.DriverParameters -import net.corda.testing.driver.InProcess -import net.corda.testing.driver.OutOfProcess -import net.corda.testing.driver.driver +import net.corda.testing.core.BOC_NAME +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.DUMMY_BANK_B_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.* import net.corda.testing.node.TestCordapp import net.corda.testing.node.User import net.corda.testing.node.internal.FINANCE_CORDAPPS +import net.corda.testing.node.internal.assertCheckpoints import net.corda.testing.node.internal.poll import net.corda.traderdemo.flow.CommercialPaperIssueFlow import net.corda.traderdemo.flow.SellerFlow import org.assertj.core.api.Assertions.assertThat import org.junit.Test +import java.sql.DriverManager import java.util.concurrent.Executors class TraderDemoTest { @@ -85,27 +91,20 @@ class TraderDemoTest { inMemoryDB = false, cordappsForAllNodes = FINANCE_CORDAPPS + TestCordapp.findCordapp("net.corda.traderdemo") )) { - val demoUser = User("demo", "demo", setOf(startFlow(), all())) - val bankUser = User("user1", "test", permissions = setOf(all())) - val (nodeA, nodeB, bankNode) = listOf( - startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), - startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), - startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser)) - ).map { (it.getOrThrow() as OutOfProcess) } - - val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy - val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy - val nodeBankRpc = let { - val client = CordaRPCClient(bankNode.rpcAddress) - client.start(bankUser.username, bankUser.password).proxy - } - - TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name) - val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue - nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts - nodeA.stop() - startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString())) - stxFuture.getOrThrow() + val (buyer, seller, bank) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME), + startNode(providedName = DUMMY_BANK_B_NAME), + startNode(providedName = BOC_NAME) + ).map { it.getOrThrow() } + TraderDemoClientApi(bank.rpc).runIssuer(amount = 100.DOLLARS, buyerName = DUMMY_BANK_A_NAME, sellerName = DUMMY_BANK_B_NAME) + val saleFuture = seller.rpc.startFlow(::SellerFlow, buyer.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue + buyer.rpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts + buyer.stop() + assertCheckpoints(DUMMY_BANK_A_NAME, 1) + val buyer2 = startNode(providedName = DUMMY_BANK_A_NAME, customOverrides = mapOf("p2pAddress" to buyer.p2pAddress.toString())).getOrThrow() + saleFuture.getOrThrow() + assertThat(buyer2.rpc.getCashBalance(USD)).isEqualTo(95.DOLLARS) + assertThat(seller.rpc.getCashBalance(USD)).isEqualTo(5.DOLLARS) } } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt index cd60d8693e..3a14edf1b3 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalTestUtils.kt @@ -6,9 +6,11 @@ import net.corda.core.CordaException import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic +import net.corda.core.identity.CordaX500Name import net.corda.core.internal.FlowStateMachine import net.corda.core.internal.VisibleForTesting import net.corda.core.internal.concurrent.openFuture +import net.corda.core.internal.div import net.corda.core.internal.times import net.corda.core.messaging.CordaRPCOps import net.corda.core.serialization.internal.SerializationEnvironment @@ -20,6 +22,7 @@ import net.corda.core.utilities.millis import net.corda.core.utilities.seconds import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.messaging.Message +import net.corda.testing.driver.DriverDSL import net.corda.testing.driver.NodeHandle import net.corda.testing.internal.chooseIdentity import net.corda.testing.internal.createTestSerializationEnv @@ -29,12 +32,14 @@ import net.corda.testing.node.TestCordapp import net.corda.testing.node.User import net.corda.testing.node.testContext import org.apache.commons.lang.ClassUtils +import org.assertj.core.api.Assertions.assertThat import org.slf4j.LoggerFactory import rx.Observable import rx.subjects.AsyncSubject import java.io.InputStream import java.net.Socket import java.net.SocketException +import java.sql.DriverManager import java.time.Duration import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit @@ -239,6 +244,15 @@ fun CordaRPCOps.waitForShutdown(): Observable { return completable } +fun DriverDSL.assertCheckpoints(name: CordaX500Name, expected: Long) { + DriverManager.getConnection("jdbc:h2:file:${baseDirectory(name) / "persistence"}", "sa", "").use { connection -> + connection.createStatement().executeQuery("select count(*) from NODE_CHECKPOINTS").use { rs -> + rs.next() + assertThat(rs.getLong(1)).isEqualTo(expected) + } + } +} + /** * Should only be used by Driver and MockNode. */