diff --git a/core/src/main/kotlin/net/corda/core/internal/messaging/InternalCordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/internal/messaging/InternalCordaRPCOps.kt index e3ab065422..8f92d54c32 100644 --- a/core/src/main/kotlin/net/corda/core/internal/messaging/InternalCordaRPCOps.kt +++ b/core/src/main/kotlin/net/corda/core/internal/messaging/InternalCordaRPCOps.kt @@ -1,6 +1,5 @@ package net.corda.core.internal.messaging -import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.AttachmentTrustInfo import net.corda.core.messaging.CordaRPCOps @@ -14,11 +13,4 @@ interface InternalCordaRPCOps : CordaRPCOps { /** Get all attachment trust information */ val attachmentTrustInfos: List - - /** - * Resume a paused flow. - * - * @return whether the flow was successfully resumed. - */ - fun unPauseFlow(id: StateMachineRunId): Boolean } \ No newline at end of file diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowPausingTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowPausingTest.kt deleted file mode 100644 index c2961a8045..0000000000 --- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowPausingTest.kt +++ /dev/null @@ -1,113 +0,0 @@ -package net.corda.node.services.statemachine - -import co.paralleluniverse.fibers.Suspendable -import net.corda.core.flows.FlowLogic -import net.corda.core.flows.FlowSession -import net.corda.core.flows.InitiatedBy -import net.corda.core.flows.InitiatingFlow -import net.corda.core.flows.StartableByRPC -import net.corda.core.flows.StateMachineRunId -import net.corda.core.identity.Party -import net.corda.core.internal.messaging.InternalCordaRPCOps -import net.corda.core.messaging.CordaRPCOps -import net.corda.core.messaging.startFlow -import net.corda.core.utilities.getOrThrow -import net.corda.core.utilities.unwrap -import net.corda.node.services.Permissions -import net.corda.testing.core.ALICE_NAME -import net.corda.testing.core.BOB_NAME -import net.corda.testing.driver.DriverParameters -import net.corda.testing.driver.NodeParameters -import net.corda.testing.driver.driver -import net.corda.testing.node.User -import org.junit.Test -import java.time.Duration -import kotlin.test.assertEquals -import kotlin.test.assertNotNull - -class FlowPausingTest { - - companion object { - val TOTAL_MESSAGES = 100 - val SLEEP_BETWEEN_MESSAGES_MS = 10L - } - - @Test(timeout = 300_000) - fun `Paused flows can recieve session messages`() { - val rpcUser = User("demo", "demo", setOf(Permissions.startFlow(), Permissions.all())) - driver(DriverParameters(startNodesInProcess = true, inMemoryDB = false)) { - val alice = startNode(NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser))).getOrThrow() - val bob = startNode(NodeParameters(providedName = BOB_NAME, rpcUsers = listOf(rpcUser))) - val startedBob = bob.getOrThrow() - val aliceFlow = alice.rpc.startFlow(::HeartbeatFlow, startedBob.nodeInfo.legalIdentities[0]) - // We wait here for the initiated flow to start running on bob - val initiatedFlowId = startedBob.rpc.waitForFlowToStart(150) - assertNotNull(initiatedFlowId) - /* We shut down bob, we want this to happen before bob has finished receiving all of the heartbeats. - This is a Race but if bob finishes too quickly then we will fail to unpause the initiated flow running on BOB latter - and this test will fail.*/ - startedBob.stop() - //Start bob backup in Safe mode. This means no flows will run but BOB should receive messages and queue these up. - val restartedBob = startNode(NodeParameters( - providedName = BOB_NAME, - rpcUsers = listOf(rpcUser), - customOverrides = mapOf("smmStartMode" to "Safe"))).getOrThrow() - - //Sleep for long enough so BOB has time to receive all the messages. - //All messages in this period should be queued up and replayed when the flow is unpaused. - Thread.sleep(TOTAL_MESSAGES * SLEEP_BETWEEN_MESSAGES_MS) - //ALICE should not have finished yet as the HeartbeatResponderFlow should not have sent the final message back (as it is paused). - assertEquals(false, aliceFlow.returnValue.isDone) - assertEquals(true, (restartedBob.rpc as InternalCordaRPCOps).unPauseFlow(initiatedFlowId!!)) - - assertEquals(true, aliceFlow.returnValue.getOrThrow()) - alice.stop() - restartedBob.stop() - } - } - - fun CordaRPCOps.waitForFlowToStart(maxTrys: Int): StateMachineRunId? { - for (i in 1..maxTrys) { - val snapshot = this.stateMachinesSnapshot().singleOrNull() - if (snapshot == null) { - Thread.sleep(SLEEP_BETWEEN_MESSAGES_MS) - } else { - return snapshot.id - } - } - return null - } - - @StartableByRPC - @InitiatingFlow - class HeartbeatFlow(private val otherParty: Party): FlowLogic() { - var sequenceNumber = 0 - @Suspendable - override fun call(): Boolean { - val session = initiateFlow(otherParty) - for (i in 1..TOTAL_MESSAGES) { - session.send(sequenceNumber++) - sleep(Duration.ofMillis(10)) - } - val success = session.receive().unwrap{data -> data} - return success - } - } - - @InitiatedBy(HeartbeatFlow::class) - class HeartbeatResponderFlow(val session: FlowSession): FlowLogic() { - var sequenceNumber : Int = 0 - @Suspendable - override fun call() { - var pass = true - for (i in 1..TOTAL_MESSAGES) { - val receivedSequenceNumber = session.receive().unwrap{data -> data} - if (receivedSequenceNumber != sequenceNumber) { - pass = false - } - sequenceNumber++ - } - session.send(pass) - } - } -} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/NodeCmdLineOptions.kt b/node/src/main/kotlin/net/corda/node/NodeCmdLineOptions.kt index 88f2b924f8..e8f2feeedc 100644 --- a/node/src/main/kotlin/net/corda/node/NodeCmdLineOptions.kt +++ b/node/src/main/kotlin/net/corda/node/NodeCmdLineOptions.kt @@ -14,7 +14,6 @@ import net.corda.node.services.config.ConfigHelper import net.corda.node.services.config.NodeConfiguration import net.corda.node.services.config.Valid import net.corda.node.services.config.parseAsNodeConfiguration -import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy import picocli.CommandLine.Option import java.nio.file.Path @@ -201,9 +200,6 @@ open class NodeCmdLineOptions : SharedNodeCmdLineOptions() { devMode?.let { configOverrides += "devMode" to it } - if (safeMode) { - configOverrides += "smmStartMode" to StateMachineManager.StartMode.Safe.toString() - } return try { valid(ConfigHelper.loadConfig(baseDirectory, configFile, configOverrides = ConfigFactory.parseMap(configOverrides))) } catch (e: ConfigException) { diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index a517d4f3d0..64be5c3691 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -581,7 +581,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, tokenizableServices = null verifyCheckpointsCompatible(frozenTokenizableServices) - val smmStartedFuture = smm.start(frozenTokenizableServices, configuration.smmStartMode) + val smmStartedFuture = smm.start(frozenTokenizableServices) // Shut down the SMM so no Fibers are scheduled. runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) } val flowMonitor = FlowMonitor( diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt index 3ee3126c29..571c97b82c 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt @@ -169,8 +169,6 @@ internal class CordaRPCOpsImpl( override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id) - override fun unPauseFlow(id: StateMachineRunId): Boolean = smm.unPauseFlow(id) - override fun stateMachinesFeed(): DataFeed, StateMachineUpdate> { val (allStateMachines, changes) = smm.track() diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index 39d2c04a93..f2dc3f16cb 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -11,7 +11,6 @@ import net.corda.core.internal.notary.NotaryServiceFlow import net.corda.core.utilities.NetworkHostAndPort import net.corda.node.services.config.rpc.NodeRpcOptions import net.corda.node.services.config.schema.v1.V1NodeConfigurationSpec -import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier import net.corda.nodeapi.internal.config.MutualSslConfiguration import net.corda.nodeapi.internal.config.User @@ -94,8 +93,6 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer { val quasarExcludePackages: List - val smmStartMode: StateMachineManager.StartMode - companion object { // default to at least 8MB and a bit extra for larger heap sizes val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory() diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt index e6893094fd..782c1a40dc 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfigurationImpl.kt @@ -8,7 +8,6 @@ import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor import net.corda.core.utilities.seconds import net.corda.node.services.config.rpc.NodeRpcOptions -import net.corda.node.services.statemachine.StateMachineManager import net.corda.nodeapi.BrokerRpcSslOptions import net.corda.nodeapi.internal.DEV_PUB_KEY_HASHES import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier @@ -84,8 +83,7 @@ data class NodeConfigurationImpl( override val blacklistedAttachmentSigningKeys: List = Defaults.blacklistedAttachmentSigningKeys, override val configurationWithOptions: ConfigurationWithOptions, override val flowExternalOperationThreadPoolSize: Int = Defaults.flowExternalOperationThreadPoolSize, - override val quasarExcludePackages: List = Defaults.quasarExcludePackages, - override val smmStartMode : StateMachineManager.StartMode = Defaults.smmStartMode + override val quasarExcludePackages: List = Defaults.quasarExcludePackages ) : NodeConfiguration { internal object Defaults { val jmxMonitoringHttpPort: Int? = null @@ -124,7 +122,6 @@ data class NodeConfigurationImpl( val blacklistedAttachmentSigningKeys: List = emptyList() const val flowExternalOperationThreadPoolSize: Int = 1 val quasarExcludePackages: List = emptyList() - val smmStartMode : StateMachineManager.StartMode = StateMachineManager.StartMode.ExcludingPaused fun cordappsDirectories(baseDirectory: Path) = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT) diff --git a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt index a5bde3e836..b4c5477e14 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/schema/v1/V1NodeConfigurationSpec.kt @@ -9,7 +9,6 @@ import net.corda.common.validation.internal.Validated.Companion.valid import net.corda.node.services.config.* import net.corda.node.services.config.NodeConfigurationImpl.Defaults import net.corda.node.services.config.schema.parsers.* -import net.corda.node.services.statemachine.StateMachineManager internal object V1NodeConfigurationSpec : Configuration.Specification("NodeConfiguration") { private val myLegalName by string().mapValid(::toCordaX500Name) @@ -67,7 +66,6 @@ internal object V1NodeConfigurationSpec : Configuration.Specification - doReturn(smmStartMode).whenever(conf).smmStartMode - }) - return mockNet.restartNode(node, parameters = parameters) - } - - @Test(timeout = 300_000) - fun `All are paused when the node is restarted in safe start mode`() { - val flows = ArrayList>() - for (i in 1..NUMBER_OF_FLOWS) { - flows += aliceNode.services.startFlow(CheckpointingFlow()) - } - //All of the flows must not resume before the node restarts. - val restartedAlice = restartNode(aliceNode, StateMachineManager.StartMode.Safe) - assertEquals(0, restartedAlice.smm.snapshot().size) - //We need to wait long enough here so any running flows would finish. - Thread.sleep(NUMBER_OF_FLOWS * SLEEP_TIME) - restartedAlice.database.transaction { - for (flow in flows) { - val checkpoint = restartedAlice.internals.checkpointStorage.getCheckpoint(flow.id) - assertEquals(Checkpoint.FlowStatus.PAUSED, checkpoint!!.status) - } - } - } - - internal class CheckpointingFlow: FlowLogic() { - @Suspendable - override fun call() { - sleep(Duration.ofMillis(SLEEP_TIME)) - } - } -} diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt index 5ab5bfca80..5c3c3df0ce 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt @@ -641,7 +641,6 @@ private fun mockNodeConfiguration(certificatesDirectory: Path): NodeConfiguratio doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings doReturn(rigorousMock()).whenever(it).configurationWithOptions doReturn(2).whenever(it).flowExternalOperationThreadPoolSize - doReturn(StateMachineManager.StartMode.ExcludingPaused).whenever(it).smmStartMode } }