diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt index f2cf46adef..6ee3682456 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt @@ -6,12 +6,17 @@ import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose +import net.corda.core.internal.div import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.Permissions -import net.corda.testing.core.* +import net.corda.testing.common.internal.ProjectStructure +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.DUMMY_BANK_B_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver @@ -20,12 +25,12 @@ import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.toDatabaseSchemaName import net.corda.testing.node.User import org.junit.ClassRule -import org.junit.Ignore import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors import kotlin.concurrent.thread +import kotlin.test.assertEquals class HardRestartTest : IntegrationTest() { companion object { @@ -33,14 +38,20 @@ class HardRestartTest : IntegrationTest() { @JvmField val databaseSchemas = IntegrationTestSchemas(DUMMY_BANK_A_NAME.toDatabaseSchemaName(), DUMMY_BANK_B_NAME.toDatabaseSchemaName(), DUMMY_NOTARY_NAME.toDatabaseSchemaName()) + val logConfigFile = ProjectStructure.projectRootDir / "config" / "dev" / "log4j2.xml" } + @StartableByRPC @InitiatingFlow - class Ping(val pongParty: Party) : FlowLogic() { + class Ping(val pongParty: Party, val times: Int) : FlowLogic() { @Suspendable override fun call() { val pongSession = initiateFlow(pongParty) - pongSession.sendAndReceive(Unit) + pongSession.sendAndReceive(times) + for (i in 1 .. times) { + val j = pongSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } } } @@ -48,14 +59,18 @@ class HardRestartTest : IntegrationTest() { class Pong(val pingSession: FlowSession) : FlowLogic() { @Suspendable override fun call() { - pingSession.sendAndReceive(Unit) + val times = pingSession.sendAndReceive(Unit).unwrap { it } + for (i in 1 .. times) { + val j = pingSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } } } @Test - fun restartPingPongFlowRandomly() { + fun restartShortPingPongFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) - driver(DriverParameters(isDebug = true, startNodesInProcess = false)) { + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { val (a, b) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) @@ -74,12 +89,74 @@ class HardRestartTest : IntegrationTest() { startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) } CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { - val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity()).returnValue + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 1).returnValue latch.countDown() // No matter the kill returnValue.getOrThrow() } + pongRestartThread.join() + } + } + + @Test + fun restartLongPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the Pong node after a random sleep + val pongRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + (b as OutOfProcess).process.destroyForcibly() + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue + latch.countDown() + // No matter the kill + returnValue.getOrThrow() + } + + pongRestartThread.join() + } + } + + @Test + fun softRestartLongPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the Pong node after a random sleep + val pongRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue + latch.countDown() + // No matter the kill + returnValue.getOrThrow() + } pongRestartThread.join() } @@ -133,7 +210,7 @@ class HardRestartTest : IntegrationTest() { @Test fun restartRecursiveFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) - driver(DriverParameters(isDebug = true, startNodesInProcess = false)) { + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { val (a, b) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index a83dac8b67..9c554f08a0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -15,6 +15,7 @@ import co.paralleluniverse.strands.SettableFuture import com.codahale.metrics.MetricRegistry import net.corda.core.messaging.MessageRecipients import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.services.statemachine.FlowMessagingImpl @@ -189,6 +190,10 @@ class MessagingExecutor( } private fun acknowledgeJob(job: Job.Acknowledge) { + log.debug { + val id = job.message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID) + "Acking $id" + } job.message.individualAcknowledge() } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index b78d5da6a0..5f62db0917 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -149,7 +149,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, val eventQueue = getTransientField(TransientValues::eventQueue) try { eventLoop@ while (true) { - val nextEvent = eventQueue.receive() + val nextEvent = try { + eventQueue.receive() + } catch (interrupted: InterruptedException) { + log.error("Flow interrupted while waiting for events, aborting immediately") + abortFiber() + } val continuation = processEvent(transitionExecutor, nextEvent) when (continuation) { is FlowContinuation.Resume -> return continuation.result @@ -176,7 +181,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * processing finished. Purely used for internal invariant checks. */ @Suspendable - private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation { + private fun processEventImmediately( + event: Event, + isDbTransactionOpenOnEntry: Boolean, + isDbTransactionOpenOnExit: Boolean): FlowContinuation { checkDbTransaction(isDbTransactionOpenOnEntry) val transitionExecutor = getTransientField(TransientValues::transitionExecutor) val continuation = processEvent(transitionExecutor, event) @@ -256,7 +264,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, processEventImmediately( Event.EnterSubFlow(subFlow.javaClass, createSubFlowVersion( - serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)), + serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion + ) + ), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index feeb0329cc..a8a2db1854 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -332,6 +332,7 @@ class SingleThreadedStateMachineManager( mutex.locked { if (flows.containsKey(id)) return@map null } val checkpoint = deserializeCheckpoint(serializedCheckpoint) if (checkpoint == null) return@map null + logger.debug { "Restored $checkpoint" } createFlowFromCheckpoint( id = id, checkpoint = checkpoint, @@ -380,7 +381,10 @@ class SingleThreadedStateMachineManager( // Just flow initiation message null } - externalEventMutex.withLock { + mutex.locked { + if (stopping) { + return + } // Remove any sessions the old flow has. for (sessionId in getFlowSessionIds(currentState.checkpoint)) { sessionToFlow.remove(sessionId) @@ -401,12 +405,13 @@ class SingleThreadedStateMachineManager( } } - private val externalEventMutex = ReentrantLock() override fun deliverExternalEvent(event: ExternalEvent) { - externalEventMutex.withLock { - when (event) { - is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event) - is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event) + mutex.locked { + if (!stopping) { + when (event) { + is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event) + is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event) + } } } } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index c468290f06..a0d651cfaf 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -14,7 +14,7 @@ crlCheckSoftFail = true lazyBridgeStart = true dataSourceProperties = { dataSourceClassName = org.h2.jdbcx.JdbcDataSource - dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=0;AUTO_SERVER_PORT="${h2port} + dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000;AUTO_SERVER_PORT="${h2port} dataSource.user = sa dataSource.password = "" } diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index 825ddf1f44..725a14c756 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -11,6 +11,7 @@ package net.corda.traderdemo import net.corda.client.rpc.CordaRPCClient +import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.finance.DOLLARS @@ -25,6 +26,7 @@ import net.corda.testing.core.DUMMY_BANK_B_NAME import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.InProcess +import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas @@ -91,4 +93,31 @@ class TraderDemoTest : IntegrationTest() { assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS) } } + + @Test + fun `Tudor test`() { + driver(DriverParameters(isDebug = true, startNodesInProcess = false, extraCordappPackagesToScan = listOf("net.corda.finance"))) { + val demoUser = User("demo", "demo", setOf(startFlow(), all())) + val bankUser = User("user1", "test", permissions = setOf(all())) + val (nodeA, nodeB, bankNode) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), + startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser)) + ).map { (it.getOrThrow() as OutOfProcess) } + + val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy + val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy + val nodeBankRpc = let { + val client = CordaRPCClient(bankNode.rpcAddress) + client.start(bankUser.username, bankUser.password).proxy + } + + TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name) + val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue + nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts + nodeA.stop() + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString())) + stxFuture.getOrThrow() + } + } }