diff --git a/docs/source/corda-configuration-file.rst b/docs/source/corda-configuration-file.rst index 8af79ccfa1..2f1b0ddbac 100644 --- a/docs/source/corda-configuration-file.rst +++ b/docs/source/corda-configuration-file.rst @@ -114,12 +114,14 @@ absolute path to the node's base directory. here must be externally accessible when running nodes across a cluster of machines. If the provided host is unreachable, the node will try to auto-discover its public one. -:p2pMessagingRetry: Only used for notarisation requests. When the response doesn't arrive in time, the message is - resent to a different notary-replica round-robin in case of clustered notaries. +:flowTimeout: When a flow implementing the ``TimedFlow`` interface does not complete in time, it is restarted from the + initial checkpoint. Currently only used for notarisation requests: if a notary replica dies while processing a notarisation request, + the client flow eventually times out and gets restarted. On restart the request is resent to a different notary replica + in a round-robin fashion (assuming the notary is clustered). - :messageRedeliveryDelay: The initial retry delay, e.g. `30 seconds`. - :maxRetryCount: How many retries to attempt. - :backoffBase: The base of the exponential backoff, `t_{wait} = messageRedeliveryDelay * backoffBase^{retryCount}`. + :timeout: The initial flow timeout period, e.g. `30 seconds`. + :maxRestartCount: Maximum number of times the flow will restart before resulting in an error. + :backoffBase: The base of the exponential backoff, `t_{wait} = timeout * backoffBase^{retryCount}`. :rpcAddress: The address of the RPC system on which RPC requests can be made to the node. If not provided then the node will run without RPC. This is now deprecated in favour of the ``rpcSettings`` block. diff --git a/node/src/integration-test/kotlin/net/corda/node/services/MultiThreadedTimedFlowTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/MultiThreadedTimedFlowTests.kt index b15fe3eb19..442f9c0e8a 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/MultiThreadedTimedFlowTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/MultiThreadedTimedFlowTests.kt @@ -61,9 +61,9 @@ class TimedFlowMultiThreadedSMMTests : IntegrationTest() { driver(DriverParameters(isDebug = true, startNodesInProcess = true, portAllocation = RandomFree)) { - val configOverrides = mapOf("p2pMessagingRetry" to mapOf( - "messageRedeliveryDelay" to Duration.ofSeconds(1), - "maxRetryCount" to 2, + val configOverrides = mapOf("flowTimeout" to mapOf( + "timeout" to Duration.ofSeconds(1), + "maxRestartCount" to 2, "backoffBase" to 1.0 )) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt index f0ffec54ec..27e18d45d2 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/TimedFlowTests.kt @@ -31,10 +31,9 @@ import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.seconds import net.corda.node.internal.StartedNode +import net.corda.node.services.config.FlowTimeoutConfiguration import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.NotaryConfig -import net.corda.node.services.config.P2PMessagingRetryConfiguration -import net.corda.node.services.vault.VaultQueryIntegrationTests import net.corda.nodeapi.internal.DevIdentityGenerator import net.corda.nodeapi.internal.network.NetworkParametersCopier import net.corda.testing.common.internal.testNetworkParameters @@ -43,13 +42,14 @@ import net.corda.testing.core.dummyCommand import net.corda.testing.core.singleIdentity import net.corda.testing.internal.GlobalDatabaseRule import net.corda.testing.internal.LogHelper -import net.corda.testing.internal.toDatabaseSchemaName import net.corda.testing.node.InMemoryMessagingNetwork import net.corda.testing.node.MockNetworkParameters import net.corda.testing.node.internal.InternalMockNetwork import net.corda.testing.node.internal.InternalMockNodeParameters import net.corda.testing.node.internal.startFlow -import org.junit.* +import org.junit.Before +import org.junit.ClassRule +import org.junit.Test import org.junit.rules.ExternalResource import org.junit.rules.RuleChain import org.slf4j.MDC @@ -85,8 +85,8 @@ class TimedFlowTestRule(val clusterSize: Int) : ExternalResource() { InternalMockNodeParameters( legalName = CordaX500Name("Alice", "AliceCorp", "GB"), configOverrides = { conf: NodeConfiguration -> - val retryConfig = P2PMessagingRetryConfiguration(10.seconds, 3, 1.0) - doReturn(retryConfig).whenever(conf).p2pMessagingRetry + val flowTimeoutConfig = FlowTimeoutConfiguration(10.seconds, 3, 1.0) + doReturn(flowTimeoutConfig).whenever(conf).flowTimeout } ) ) diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt index f2cf46adef..6ee3682456 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/HardRestartTest.kt @@ -6,12 +6,17 @@ import net.corda.core.flows.* import net.corda.core.identity.Party import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose +import net.corda.core.internal.div import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.unwrap import net.corda.node.services.Permissions -import net.corda.testing.core.* +import net.corda.testing.common.internal.ProjectStructure +import net.corda.testing.core.DUMMY_BANK_A_NAME +import net.corda.testing.core.DUMMY_BANK_B_NAME +import net.corda.testing.core.DUMMY_NOTARY_NAME +import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver @@ -20,12 +25,12 @@ import net.corda.testing.internal.IntegrationTestSchemas import net.corda.testing.internal.toDatabaseSchemaName import net.corda.testing.node.User import org.junit.ClassRule -import org.junit.Ignore import org.junit.Test import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.Executors import kotlin.concurrent.thread +import kotlin.test.assertEquals class HardRestartTest : IntegrationTest() { companion object { @@ -33,14 +38,20 @@ class HardRestartTest : IntegrationTest() { @JvmField val databaseSchemas = IntegrationTestSchemas(DUMMY_BANK_A_NAME.toDatabaseSchemaName(), DUMMY_BANK_B_NAME.toDatabaseSchemaName(), DUMMY_NOTARY_NAME.toDatabaseSchemaName()) + val logConfigFile = ProjectStructure.projectRootDir / "config" / "dev" / "log4j2.xml" } + @StartableByRPC @InitiatingFlow - class Ping(val pongParty: Party) : FlowLogic() { + class Ping(val pongParty: Party, val times: Int) : FlowLogic() { @Suspendable override fun call() { val pongSession = initiateFlow(pongParty) - pongSession.sendAndReceive(Unit) + pongSession.sendAndReceive(times) + for (i in 1 .. times) { + val j = pongSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } } } @@ -48,14 +59,18 @@ class HardRestartTest : IntegrationTest() { class Pong(val pingSession: FlowSession) : FlowLogic() { @Suspendable override fun call() { - pingSession.sendAndReceive(Unit) + val times = pingSession.sendAndReceive(Unit).unwrap { it } + for (i in 1 .. times) { + val j = pingSession.sendAndReceive(i).unwrap { it } + assertEquals(i, j) + } } } @Test - fun restartPingPongFlowRandomly() { + fun restartShortPingPongFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) - driver(DriverParameters(isDebug = true, startNodesInProcess = false)) { + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { val (a, b) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) @@ -74,12 +89,74 @@ class HardRestartTest : IntegrationTest() { startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) } CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { - val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity()).returnValue + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 1).returnValue latch.countDown() // No matter the kill returnValue.getOrThrow() } + pongRestartThread.join() + } + } + + @Test + fun restartLongPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the Pong node after a random sleep + val pongRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + (b as OutOfProcess).process.destroyForcibly() + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue + latch.countDown() + // No matter the kill + returnValue.getOrThrow() + } + + pongRestartThread.join() + } + } + + @Test + fun softRestartLongPingPongFlowRandomly() { + val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { + val (a, b) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + ).transpose().getOrThrow() + + val latch = CountDownLatch(1) + + // We kill -9 and restart the Pong node after a random sleep + val pongRestartThread = thread { + latch.await() + val ms = Random().nextInt(1000) + println("Sleeping $ms ms before kill") + Thread.sleep(ms.toLong()) + b.stop() + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) + } + CordaRPCClient(a.rpcAddress).use(demoUser.username, demoUser.password) { + val returnValue = it.proxy.startFlow(::Ping, b.nodeInfo.singleIdentity(), 100).returnValue + latch.countDown() + // No matter the kill + returnValue.getOrThrow() + } pongRestartThread.join() } @@ -133,7 +210,7 @@ class HardRestartTest : IntegrationTest() { @Test fun restartRecursiveFlowRandomly() { val demoUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) - driver(DriverParameters(isDebug = true, startNodesInProcess = false)) { + driver(DriverParameters(isDebug = true, startNodesInProcess = false, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString()))) { val (a, b) = listOf( startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:30000")), startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to "localhost:40000")) diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 87b350994f..b6f232a34b 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -50,7 +50,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val networkServices: NetworkServicesConfig? val certificateChainCheckPolicies: List val verifierType: VerifierType - val p2pMessagingRetry: P2PMessagingRetryConfiguration + val flowTimeout: FlowTimeoutConfiguration val notary: NotaryConfig? val additionalNodeInfoPollingFrequencyMsec: Long val p2pAddress: NetworkHostAndPort @@ -193,12 +193,11 @@ data class NetworkServicesConfig( /** * Currently only used for notarisation requests. * - * When the response doesn't arrive in time, the message is resent to a different notary-replica round-robin - * in case of clustered notaries. + * Specifies the configuration for timing out and restarting a [TimedFlow]. */ -data class P2PMessagingRetryConfiguration( - val messageRedeliveryDelay: Duration, - val maxRetryCount: Int, +data class FlowTimeoutConfiguration( + val timeout: Duration, + val maxRestartCount: Int, val backoffBase: Double ) @@ -221,7 +220,7 @@ data class NodeConfigurationImpl( override val rpcUsers: List, override val security: SecurityConfiguration? = null, override val verifierType: VerifierType, - override val p2pMessagingRetry: P2PMessagingRetryConfiguration, + override val flowTimeout: FlowTimeoutConfiguration, override val p2pAddress: NetworkHostAndPort, private val rpcAddress: NetworkHostAndPort? = null, private val rpcSettings: NodeRpcSettings, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index a83dac8b67..9c554f08a0 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -15,6 +15,7 @@ import co.paralleluniverse.strands.SettableFuture import com.codahale.metrics.MetricRegistry import net.corda.core.messaging.MessageRecipients import net.corda.core.utilities.contextLogger +import net.corda.core.utilities.debug import net.corda.core.utilities.trace import net.corda.node.VersionInfo import net.corda.node.services.statemachine.FlowMessagingImpl @@ -189,6 +190,10 @@ class MessagingExecutor( } private fun acknowledgeJob(job: Job.Acknowledge) { + log.debug { + val id = job.message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID) + "Acking $id" + } job.message.individualAcknowledge() } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index b78d5da6a0..5f62db0917 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -149,7 +149,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, val eventQueue = getTransientField(TransientValues::eventQueue) try { eventLoop@ while (true) { - val nextEvent = eventQueue.receive() + val nextEvent = try { + eventQueue.receive() + } catch (interrupted: InterruptedException) { + log.error("Flow interrupted while waiting for events, aborting immediately") + abortFiber() + } val continuation = processEvent(transitionExecutor, nextEvent) when (continuation) { is FlowContinuation.Resume -> return continuation.result @@ -176,7 +181,10 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, * processing finished. Purely used for internal invariant checks. */ @Suspendable - private fun processEventImmediately(event: Event, isDbTransactionOpenOnEntry: Boolean, isDbTransactionOpenOnExit: Boolean): FlowContinuation { + private fun processEventImmediately( + event: Event, + isDbTransactionOpenOnEntry: Boolean, + isDbTransactionOpenOnExit: Boolean): FlowContinuation { checkDbTransaction(isDbTransactionOpenOnEntry) val transitionExecutor = getTransientField(TransientValues::transitionExecutor) val continuation = processEvent(transitionExecutor, event) @@ -256,7 +264,9 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, processEventImmediately( Event.EnterSubFlow(subFlow.javaClass, createSubFlowVersion( - serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion)), + serviceHub.cordappProvider.getCordappForFlow(subFlow), serviceHub.myInfo.platformVersion + ) + ), isDbTransactionOpenOnEntry = true, isDbTransactionOpenOnExit = true ) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 835238333a..ac1f6ffd9a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -602,10 +602,10 @@ class MultiThreadedStateMachineManager( /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { - return with(serviceHub.configuration.p2pMessagingRetry) { - val timeoutDelaySeconds = messageRedeliveryDelay.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() + return with(serviceHub.configuration.flowTimeout) { + val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() timeoutScheduler.schedule({ - val event = Event.Error(FlowTimeoutException(maxRetryCount)) + val event = Event.Error(FlowTimeoutException(maxRestartCount)) flow.fiber.scheduleEvent(event) }, timeoutDelaySeconds, TimeUnit.SECONDS) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index feeb0329cc..e5695fea15 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -332,6 +332,7 @@ class SingleThreadedStateMachineManager( mutex.locked { if (flows.containsKey(id)) return@map null } val checkpoint = deserializeCheckpoint(serializedCheckpoint) if (checkpoint == null) return@map null + logger.debug { "Restored $checkpoint" } createFlowFromCheckpoint( id = id, checkpoint = checkpoint, @@ -380,7 +381,10 @@ class SingleThreadedStateMachineManager( // Just flow initiation message null } - externalEventMutex.withLock { + mutex.locked { + if (stopping) { + return + } // Remove any sessions the old flow has. for (sessionId in getFlowSessionIds(currentState.checkpoint)) { sessionToFlow.remove(sessionId) @@ -401,12 +405,13 @@ class SingleThreadedStateMachineManager( } } - private val externalEventMutex = ReentrantLock() override fun deliverExternalEvent(event: ExternalEvent) { - externalEventMutex.withLock { - when (event) { - is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event) - is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event) + mutex.locked { + if (!stopping) { + when (event) { + is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event) + is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event) + } } } } @@ -614,10 +619,10 @@ class SingleThreadedStateMachineManager( /** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */ private fun scheduleTimeoutException(flow: Flow, retryCount: Int): ScheduledFuture<*> { - return with(serviceHub.configuration.p2pMessagingRetry) { - val timeoutDelaySeconds = messageRedeliveryDelay.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() + return with(serviceHub.configuration.flowTimeout) { + val timeoutDelaySeconds = timeout.seconds * Math.pow(backoffBase, retryCount.toDouble()).toLong() timeoutScheduler.schedule({ - val event = Event.Error(FlowTimeoutException(maxRetryCount)) + val event = Event.Error(FlowTimeoutException(maxRestartCount)) flow.fiber.scheduleEvent(event) }, timeoutDelaySeconds, TimeUnit.SECONDS) } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index c468290f06..f5c94d72e1 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -14,7 +14,7 @@ crlCheckSoftFail = true lazyBridgeStart = true dataSourceProperties = { dataSourceClassName = org.h2.jdbcx.JdbcDataSource - dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;LOCK_TIMEOUT=10000;WRITE_DELAY=0;AUTO_SERVER_PORT="${h2port} + dataSource.url = "jdbc:h2:file:"${baseDirectory}"/persistence;DB_CLOSE_ON_EXIT=FALSE;WRITE_DELAY=0;LOCK_TIMEOUT=10000;AUTO_SERVER_PORT="${h2port} dataSource.user = sa dataSource.password = "" } @@ -44,8 +44,8 @@ rpcSettings = { useSsl = false standAloneBroker = false } -p2pMessagingRetry { - messageRedeliveryDelay = 30 seconds - maxRetryCount = 3 +flowTimeout { + timeout = 30 seconds + maxRestartCount = 3 backoffBase = 2.0 } diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index 117ca66129..1b58ffcc38 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -280,7 +280,7 @@ class NodeConfigurationImplTest { verifierType = VerifierType.InMemory, p2pAddress = NetworkHostAndPort("localhost", 0), messagingServerAddress = null, - p2pMessagingRetry = P2PMessagingRetryConfiguration(5.seconds, 3, 1.0), + flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0), notary = null, devMode = true, noLocalShell = false, diff --git a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt index f4d7eb3133..d187aa42a7 100644 --- a/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/messaging/ArtemisMessagingTest.kt @@ -83,8 +83,7 @@ class ArtemisMessagingTest { doReturn(null).whenever(it).jmxMonitoringHttpPort doReturn(emptyList()).whenever(it).certificateChainCheckPolicies doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration - doReturn(P2PMessagingRetryConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).p2pMessagingRetry - + doReturn(FlowTimeoutConfiguration(5.seconds, 3, backoffBase = 1.0)).whenever(it).flowTimeout } LogHelper.setLevel(PersistentUniquenessProvider::class) database = configureDatabase(makeTestDataSourceProperties(), DatabaseConfig(runMigration = true), { null }, { null }) diff --git a/node/src/test/resources/working-config.conf b/node/src/test/resources/working-config.conf index f2c1333136..47c0f9e731 100644 --- a/node/src/test/resources/working-config.conf +++ b/node/src/test/resources/working-config.conf @@ -24,9 +24,9 @@ rpcSettings = { useSsl = false standAloneBroker = false } -p2pMessagingRetry { - messageRedeliveryDelay = 30 seconds - maxRetryCount = 3 +flowTimeout { + timeout = 30 seconds + maxRestartCount = 3 backoffBase = 2.0 } enterpriseConfiguration = { diff --git a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt index 825ddf1f44..725a14c756 100644 --- a/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt +++ b/samples/trader-demo/src/integration-test/kotlin/net/corda/traderdemo/TraderDemoTest.kt @@ -11,6 +11,7 @@ package net.corda.traderdemo import net.corda.client.rpc.CordaRPCClient +import net.corda.core.messaging.startFlow import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.millis import net.corda.finance.DOLLARS @@ -25,6 +26,7 @@ import net.corda.testing.core.DUMMY_BANK_B_NAME import net.corda.testing.core.singleIdentity import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.InProcess +import net.corda.testing.driver.OutOfProcess import net.corda.testing.driver.driver import net.corda.testing.internal.IntegrationTest import net.corda.testing.internal.IntegrationTestSchemas @@ -91,4 +93,31 @@ class TraderDemoTest : IntegrationTest() { assertThat(clientB.dollarCashBalance).isEqualTo(5.DOLLARS) } } + + @Test + fun `Tudor test`() { + driver(DriverParameters(isDebug = true, startNodesInProcess = false, extraCordappPackagesToScan = listOf("net.corda.finance"))) { + val demoUser = User("demo", "demo", setOf(startFlow(), all())) + val bankUser = User("user1", "test", permissions = setOf(all())) + val (nodeA, nodeB, bankNode) = listOf( + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser)), + startNode(providedName = DUMMY_BANK_B_NAME, rpcUsers = listOf(demoUser)), + startNode(providedName = BOC_NAME, rpcUsers = listOf(bankUser)) + ).map { (it.getOrThrow() as OutOfProcess) } + + val nodeBRpc = CordaRPCClient(nodeB.rpcAddress).start(demoUser.username, demoUser.password).proxy + val nodeARpc = CordaRPCClient(nodeA.rpcAddress).start(demoUser.username, demoUser.password).proxy + val nodeBankRpc = let { + val client = CordaRPCClient(bankNode.rpcAddress) + client.start(bankUser.username, bankUser.password).proxy + } + + TraderDemoClientApi(nodeBankRpc).runIssuer(amount = 100.DOLLARS, buyerName = nodeA.nodeInfo.singleIdentity().name, sellerName = nodeB.nodeInfo.singleIdentity().name) + val stxFuture = nodeBRpc.startFlow(::SellerFlow, nodeA.nodeInfo.singleIdentity(), 5.DOLLARS).returnValue + nodeARpc.stateMachinesFeed().updates.toBlocking().first() // wait until initiated flow starts + nodeA.stop() + startNode(providedName = DUMMY_BANK_A_NAME, rpcUsers = listOf(demoUser), customOverrides = mapOf("p2pAddress" to nodeA.p2pAddress.toString())) + stxFuture.getOrThrow() + } + } } diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 8503cf0c77..005962fa2c 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -493,7 +493,7 @@ private fun mockNodeConfiguration(): NodeConfiguration { doReturn(null).whenever(it).networkServices doReturn(VerifierType.InMemory).whenever(it).verifierType // Set to be long enough so retries don't trigger unless we override it - doReturn(P2PMessagingRetryConfiguration(1.hours, 3, backoffBase = 2.0)).whenever(it).p2pMessagingRetry + doReturn(FlowTimeoutConfiguration(1.hours, 3, backoffBase = 1.0)).whenever(it).flowTimeout doReturn(5.seconds.toMillis()).whenever(it).additionalNodeInfoPollingFrequencyMsec doReturn(null).whenever(it).devModeOptions doReturn(EnterpriseConfiguration(