From 768d9309dcc71d8f720dc9f0435634199d7fd03e Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Fri, 1 Jun 2018 14:38:36 +0100 Subject: [PATCH] Various tests and fixes related to CORDA-1191 --- .../services/statemachine/HardRestartTest.kt | 240 ++++++++++++++++++ .../services/messaging/MessagingExecutor.kt | 7 + .../statemachine/FlowStateMachineImpl.kt | 16 +- .../SingleThreadedStateMachineManager.kt | 17 +- node/src/main/resources/reference.conf | 2 +- .../net/corda/traderdemo/TraderDemoTest.kt | 29 +++ 6 files changed, 301 insertions(+), 10 deletions(-) create mode 100644 node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt new file mode 100644 index 0000000000..bff99b12e2 --- /dev/null +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt @@ -0,0 +1,240 @@ +package net.corda.node.services.statemachine + +import co.paralleluniverse.fibers.Suspendable +import net.corda.client.rpc.CordaRPCClient +import net.corda.core.flows.* +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.fork +import net.corda.core.internal.concurrent.transpose +import net.corda.core.internal.div +import net.corda.core.messaging.CordaRPCOps +import net.corda.core.messaging.startFlow +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.node.services.Permissions +import net.corda.testing.common.internal.ProjectStructure +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.DUMMY_BANK_B_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.driver.DriverParameters +import net.corda.testing.driver.OutOfProcess +import net.corda.testing.driver.driver +import net.corda.testing.node.User +import org.junit.Test +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import kotlin.concurrent.thread +import kotlin.test.assertEquals + +class HardRestartTest { + @StartableByRPC + @InitiatingFlow + class Ping(val pongParty: Party, val times: Int) : FlowLogic() { + @Suspendable + override fun call() { + val pongSession = initiateFlow(pongParty) + pongSession.sendAndReceive(times) + for (i in 1 .. times) { + val j = pongSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } + } + } + + @InitiatedBy(Ping::class) + class Pong(val pingSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val times = pingSession.sendAndReceive(Unit).unwrap { it } + for (i in 1 .. times) { + val j = pingSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } + } + } + + companion object { + val logConfigFile = ProjectStructure.projectRootDir / "config" / "dev" / "log4j2.xml" + } + + @Test + fun restartShortPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the Pong node after a random sleep + val pongRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + (b as OutOfProcess).process.destroyForcibly() + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 1).returnValue + latch.countDown() + // No matter the kill + returnValue.getOrThrow() + } + + pongRestartThread.join() + } + } + + @Test + fun restartLongPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the Pong node after a random sleep + val pongRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + (b as OutOfProcess).process.destroyForcibly() + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue + latch.countDown() + // No matter the kill + returnValue.getOrThrow() + } + + pongRestartThread.join() + } + } + + @Test + fun softRestartLongPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the Pong node after a random sleep + val pongRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue + latch.countDown() + // No matter the kill + returnValue.getOrThrow() + } + + pongRestartThread.join() + } + } + + sealed class RecursiveMode { + data class Top(val otherParty: Party, val initialDepth: Int) : RecursiveMode() + data class Recursive(val otherSession: FlowSession) : RecursiveMode() + } + + @StartableByRPC + @InitiatingFlow + @InitiatedBy(RecursiveB::class) + class RecursiveA(val mode: RecursiveMode) : FlowLogic() { + constructor(otherSession: FlowSession) : this(RecursiveMode.Recursive(otherSession)) + constructor(otherParty: Party, initialDepth: Int) : this(RecursiveMode.Top(otherParty, initialDepth)) + @Suspendable + override fun call(): String { + return when (mode) { + is HardRestartTest.RecursiveMode.Top -> { + val session = initiateFlow(mode.otherParty) + session.sendAndReceive(mode.initialDepth).unwrap { it } + } + is HardRestartTest.RecursiveMode.Recursive -> { + val depth = mode.otherSession.receive().unwrap { it } + val string = if (depth > 0) { + val newSession = initiateFlow(mode.otherSession.counterparty) + newSession.sendAndReceive(depth).unwrap { it } + } else { + "-" + } + mode.otherSession.send(string) + string + } + } + } + } + + @InitiatingFlow + @InitiatedBy(RecursiveA::class) + class RecursiveB(val otherSession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val depth = otherSession.receive().unwrap { it } + val newSession = initiateFlow(otherSession.counterparty) + val string = newSession.sendAndReceive(depth - 1).unwrap { it } + otherSession.send(string + ":" + depth) + } + } + + @Test + fun restartRecursiveFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the node B after a random sleep + val bRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + (b as OutOfProcess).process.destroyForcibly() + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + val executor = Executors.newFixedThreadPool(8) + try { + val tlRpc = ThreadLocal() + (1 .. 10).map { num -> + executor.fork { + val rpc = tlRpc.get() ?: CordaRPCClient(a.rpcAddress).start(demoUser.username, demoUser.password).proxy.also { tlRpc.set(it) } + val string = rpc.startFlow(::RecursiveA, b.nodeInfo.singleIdentity(), 10).returnValue.getOrThrow() + latch.countDown() + println("$num: $string") + } + }.transpose().getOrThrow() + + bRestartThread.join() + } finally { + executor.shutdown() + } + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 931e50e980..2f46010730 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -2,6 +2,7 @@ package net.corda.node.services.messaging import net.corda.core.messaging.MessageRecipients import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.services.statemachine.FlowMessagingImpl @@ -41,6 +42,7 @@ class MessagingExecutor( val amqDelayMillis = System.getProperty("amq.delivery.delay.ms", "0").toInt() } + @Synchronized fun send(message: Message, target: MessageRecipients) { val mqAddress = resolver.resolveTargetToArtemisQueue(target) val artemisMessage = cordaToArtemisMessage(message) @@ -51,7 +53,12 @@ class MessagingExecutor( producer.send(SimpleString(mqAddress), artemisMessage) } + @Synchronized fun acknowledge(message: ClientMessage) { + log.debug { + val id = message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID) + "Acking $id" + } message.individualAcknowledge() } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 69a31bc613..cd8a6cdf20 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -139,7 +139,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, val eventQueue = getTransientField(TransientValues::eventQueue) try { eventLoop@ while (true) { - val nextEvent = eventQueue.receive() + val nextEvent = try { + eventQueue.receive() + } catch (interrupted: InterruptedException) { + log.error("Flow interrupted while waiting for events, aborting immediately") + abortFiber() + } val continuation = processEvent(transitionExecutor, nextEvent) when (continuation) { is FlowContinuation.Resume -> return continuation.result @@ -166,7 +171,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * processing finished. Purely used for internal invariant checks. */ @Suspendable - private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation { + private fun processEventImmediately( + event: Event, + isDbTransactionOpenOnEntry: Boolean, + isDbTransactionOpenOnExit: Boolean): FlowContinuation { checkDbTransaction(isDbTransactionOpenOnEntry) val transitionExecutor = getTransientField(TransientValues::transitionExecutor) val continuation = processEvent(transitionExecutor, event) @@ -246,7 +254,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, processEventImmediately( Event.EnterSubFlow(subFlow.javaClass, createSubFlowVersion( - serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)), + serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion + ) + ), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index eb6a659cdc..567a4c6edb 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -305,6 +305,7 @@ class SingleThreadedStateMachineManager( mutex.locked { if (flows.containsKey(id)) return@map null } val checkpoint = deserializeCheckpoint(serializedCheckpoint) if (checkpoint == null) return@map null + logger.debug { "Restored $checkpoint" } createFlowFromCheckpoint( id = id, checkpoint = checkpoint, @@ -353,7 +354,10 @@ class SingleThreadedStateMachineManager( // Just flow initiation message null } - externalEventMutex.withLock { + mutex.locked { + if (stopping) { + return + } // Remove any sessions the old flow has. for (sessionId in getFlowSessionIds(currentState.checkpoint)) { sessionToFlow.remove(sessionId) @@ -374,12 +378,13 @@ class SingleThreadedStateMachineManager( } } - private val externalEventMutex = ReentrantLock() override fun deliverExternalEvent(event: ExternalEvent) { - externalEventMutex.withLock { - when (event) { - is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event) - is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event) + mutex.locked { + if (!stopping) { + when (event) { + is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event) + is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event) + } } } } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index ce33a39722..5fe68279e4 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -5,7 +5,7 @@ crlCheckSoftFail = true lazyBridgeStart = true dataSourceProperties = { dataSourceClassName = org.h2.jdbcx.JdbcDataSource - dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=100;AUTO_SERVER_PORT="${h2port} + dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000;AUTO_SERVER_PORT="${h2port} dataSource.user = sa dataSource.password = "" } diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index 44afd368fb..80ed1acbf2 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -1,6 +1,7 @@ package net.corda.traderdemo import net.corda.client.rpc.CordaRPCClient +import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.finance.DOLLARS @@ -14,6 +15,7 @@ import net.corda.testing.core.DUMMY_BANK_B_NAME import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.InProcess +import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver import net.corda.testing.node.User import net.corda.testing.node.internal.poll @@ -71,4 +73,31 @@ class TraderDemoTest { assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS) } } + + @Test + fun `Tudor test`() { + driver(DriverParameters(isDebug = true, startNodesInProcess = false, extraCordappPackagesToScan = listOf("net.corda.finance"))) { + val demoUser = User("demo", "demo", setOf(startFlow(), all())) + val bankUser = User("user1", "test", permissions = setOf(all())) + val (nodeA, nodeB, bankNode) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), + startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser)) + ).map { (it.getOrThrow() as OutOfProcess) } + + val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy + val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy + val nodeBankRpc = let { + val client = CordaRPCClient(bankNode.rpcAddress) + client.start(bankUser.username, bankUser.password).proxy + } + + TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name) + val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue + nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts + nodeA.stop() + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString())) + stxFuture.getOrThrow() + } + } }