mirror of
https://github.com/corda/corda.git
synced 2025-06-05 17:01:45 +00:00
NOTICK Restrict to shell commands (#6303)
Remove shell command for pausing flows
This commit is contained in:
parent
9c4a76d367
commit
0554c98d18
@ -1,6 +1,5 @@
|
|||||||
package net.corda.core.internal.messaging
|
package net.corda.core.internal.messaging
|
||||||
|
|
||||||
import net.corda.core.flows.StateMachineRunId
|
|
||||||
import net.corda.core.internal.AttachmentTrustInfo
|
import net.corda.core.internal.AttachmentTrustInfo
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
import net.corda.core.messaging.CordaRPCOps
|
||||||
|
|
||||||
@ -14,11 +13,4 @@ interface InternalCordaRPCOps : CordaRPCOps {
|
|||||||
|
|
||||||
/** Get all attachment trust information */
|
/** Get all attachment trust information */
|
||||||
val attachmentTrustInfos: List<AttachmentTrustInfo>
|
val attachmentTrustInfos: List<AttachmentTrustInfo>
|
||||||
|
|
||||||
/**
|
|
||||||
* Resume a paused flow.
|
|
||||||
*
|
|
||||||
* @return whether the flow was successfully resumed.
|
|
||||||
*/
|
|
||||||
fun unPauseFlow(id: StateMachineRunId): Boolean
|
|
||||||
}
|
}
|
@ -1,113 +0,0 @@
|
|||||||
package net.corda.node.services.statemachine
|
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
|
||||||
import net.corda.core.flows.FlowLogic
|
|
||||||
import net.corda.core.flows.FlowSession
|
|
||||||
import net.corda.core.flows.InitiatedBy
|
|
||||||
import net.corda.core.flows.InitiatingFlow
|
|
||||||
import net.corda.core.flows.StartableByRPC
|
|
||||||
import net.corda.core.flows.StateMachineRunId
|
|
||||||
import net.corda.core.identity.Party
|
|
||||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
|
||||||
import net.corda.core.messaging.startFlow
|
|
||||||
import net.corda.core.utilities.getOrThrow
|
|
||||||
import net.corda.core.utilities.unwrap
|
|
||||||
import net.corda.node.services.Permissions
|
|
||||||
import net.corda.testing.core.ALICE_NAME
|
|
||||||
import net.corda.testing.core.BOB_NAME
|
|
||||||
import net.corda.testing.driver.DriverParameters
|
|
||||||
import net.corda.testing.driver.NodeParameters
|
|
||||||
import net.corda.testing.driver.driver
|
|
||||||
import net.corda.testing.node.User
|
|
||||||
import org.junit.Test
|
|
||||||
import java.time.Duration
|
|
||||||
import kotlin.test.assertEquals
|
|
||||||
import kotlin.test.assertNotNull
|
|
||||||
|
|
||||||
class FlowPausingTest {
|
|
||||||
|
|
||||||
companion object {
|
|
||||||
val TOTAL_MESSAGES = 100
|
|
||||||
val SLEEP_BETWEEN_MESSAGES_MS = 10L
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 300_000)
|
|
||||||
fun `Paused flows can recieve session messages`() {
|
|
||||||
val rpcUser = User("demo", "demo", setOf(Permissions.startFlow<HardRestartTest.Ping>(), Permissions.all()))
|
|
||||||
driver(DriverParameters(startNodesInProcess = true, inMemoryDB = false)) {
|
|
||||||
val alice = startNode(NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser))).getOrThrow()
|
|
||||||
val bob = startNode(NodeParameters(providedName = BOB_NAME, rpcUsers = listOf(rpcUser)))
|
|
||||||
val startedBob = bob.getOrThrow()
|
|
||||||
val aliceFlow = alice.rpc.startFlow(::HeartbeatFlow, startedBob.nodeInfo.legalIdentities[0])
|
|
||||||
// We wait here for the initiated flow to start running on bob
|
|
||||||
val initiatedFlowId = startedBob.rpc.waitForFlowToStart(150)
|
|
||||||
assertNotNull(initiatedFlowId)
|
|
||||||
/* We shut down bob, we want this to happen before bob has finished receiving all of the heartbeats.
|
|
||||||
This is a Race but if bob finishes too quickly then we will fail to unpause the initiated flow running on BOB latter
|
|
||||||
and this test will fail.*/
|
|
||||||
startedBob.stop()
|
|
||||||
//Start bob backup in Safe mode. This means no flows will run but BOB should receive messages and queue these up.
|
|
||||||
val restartedBob = startNode(NodeParameters(
|
|
||||||
providedName = BOB_NAME,
|
|
||||||
rpcUsers = listOf(rpcUser),
|
|
||||||
customOverrides = mapOf("smmStartMode" to "Safe"))).getOrThrow()
|
|
||||||
|
|
||||||
//Sleep for long enough so BOB has time to receive all the messages.
|
|
||||||
//All messages in this period should be queued up and replayed when the flow is unpaused.
|
|
||||||
Thread.sleep(TOTAL_MESSAGES * SLEEP_BETWEEN_MESSAGES_MS)
|
|
||||||
//ALICE should not have finished yet as the HeartbeatResponderFlow should not have sent the final message back (as it is paused).
|
|
||||||
assertEquals(false, aliceFlow.returnValue.isDone)
|
|
||||||
assertEquals(true, (restartedBob.rpc as InternalCordaRPCOps).unPauseFlow(initiatedFlowId!!))
|
|
||||||
|
|
||||||
assertEquals(true, aliceFlow.returnValue.getOrThrow())
|
|
||||||
alice.stop()
|
|
||||||
restartedBob.stop()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fun CordaRPCOps.waitForFlowToStart(maxTrys: Int): StateMachineRunId? {
|
|
||||||
for (i in 1..maxTrys) {
|
|
||||||
val snapshot = this.stateMachinesSnapshot().singleOrNull()
|
|
||||||
if (snapshot == null) {
|
|
||||||
Thread.sleep(SLEEP_BETWEEN_MESSAGES_MS)
|
|
||||||
} else {
|
|
||||||
return snapshot.id
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
|
|
||||||
@StartableByRPC
|
|
||||||
@InitiatingFlow
|
|
||||||
class HeartbeatFlow(private val otherParty: Party): FlowLogic<Boolean>() {
|
|
||||||
var sequenceNumber = 0
|
|
||||||
@Suspendable
|
|
||||||
override fun call(): Boolean {
|
|
||||||
val session = initiateFlow(otherParty)
|
|
||||||
for (i in 1..TOTAL_MESSAGES) {
|
|
||||||
session.send(sequenceNumber++)
|
|
||||||
sleep(Duration.ofMillis(10))
|
|
||||||
}
|
|
||||||
val success = session.receive<Boolean>().unwrap{data -> data}
|
|
||||||
return success
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@InitiatedBy(HeartbeatFlow::class)
|
|
||||||
class HeartbeatResponderFlow(val session: FlowSession): FlowLogic<Unit>() {
|
|
||||||
var sequenceNumber : Int = 0
|
|
||||||
@Suspendable
|
|
||||||
override fun call() {
|
|
||||||
var pass = true
|
|
||||||
for (i in 1..TOTAL_MESSAGES) {
|
|
||||||
val receivedSequenceNumber = session.receive<Int>().unwrap{data -> data}
|
|
||||||
if (receivedSequenceNumber != sequenceNumber) {
|
|
||||||
pass = false
|
|
||||||
}
|
|
||||||
sequenceNumber++
|
|
||||||
}
|
|
||||||
session.send(pass)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -14,7 +14,6 @@ import net.corda.node.services.config.ConfigHelper
|
|||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.Valid
|
import net.corda.node.services.config.Valid
|
||||||
import net.corda.node.services.config.parseAsNodeConfiguration
|
import net.corda.node.services.config.parseAsNodeConfiguration
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
|
||||||
import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
|
import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
|
||||||
import picocli.CommandLine.Option
|
import picocli.CommandLine.Option
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -49,12 +48,6 @@ open class SharedNodeCmdLineOptions {
|
|||||||
)
|
)
|
||||||
var devMode: Boolean? = null
|
var devMode: Boolean? = null
|
||||||
|
|
||||||
@Option(
|
|
||||||
names = ["--pause-all-flows"],
|
|
||||||
description = ["Do not run any flows on startup. Sets all flows to paused, which can be unpaused via RPC."]
|
|
||||||
)
|
|
||||||
var safeMode: Boolean = false
|
|
||||||
|
|
||||||
open fun parseConfiguration(configuration: Config): Valid<NodeConfiguration> {
|
open fun parseConfiguration(configuration: Config): Valid<NodeConfiguration> {
|
||||||
val option = Configuration.Options(strict = unknownConfigKeysPolicy == UnknownConfigKeysPolicy.FAIL)
|
val option = Configuration.Options(strict = unknownConfigKeysPolicy == UnknownConfigKeysPolicy.FAIL)
|
||||||
return configuration.parseAsNodeConfiguration(option)
|
return configuration.parseAsNodeConfiguration(option)
|
||||||
@ -193,9 +186,6 @@ open class NodeCmdLineOptions : SharedNodeCmdLineOptions() {
|
|||||||
devMode?.let {
|
devMode?.let {
|
||||||
configOverrides += "devMode" to it
|
configOverrides += "devMode" to it
|
||||||
}
|
}
|
||||||
if (safeMode) {
|
|
||||||
configOverrides += "smmStartMode" to StateMachineManager.StartMode.Safe.toString()
|
|
||||||
}
|
|
||||||
return try {
|
return try {
|
||||||
valid(ConfigHelper.loadConfig(baseDirectory, configFile, configOverrides = ConfigFactory.parseMap(configOverrides)))
|
valid(ConfigHelper.loadConfig(baseDirectory, configFile, configOverrides = ConfigFactory.parseMap(configOverrides)))
|
||||||
} catch (e: ConfigException) {
|
} catch (e: ConfigException) {
|
||||||
|
@ -541,7 +541,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
|||||||
tokenizableServices = null
|
tokenizableServices = null
|
||||||
|
|
||||||
verifyCheckpointsCompatible(frozenTokenizableServices)
|
verifyCheckpointsCompatible(frozenTokenizableServices)
|
||||||
val smmStartedFuture = smm.start(frozenTokenizableServices, configuration.smmStartMode)
|
val smmStartedFuture = smm.start(frozenTokenizableServices)
|
||||||
// Shut down the SMM so no Fibers are scheduled.
|
// Shut down the SMM so no Fibers are scheduled.
|
||||||
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
|
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
|
||||||
val flowMonitor = FlowMonitor(
|
val flowMonitor = FlowMonitor(
|
||||||
|
@ -169,8 +169,6 @@ internal class CordaRPCOpsImpl(
|
|||||||
|
|
||||||
override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id)
|
override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id)
|
||||||
|
|
||||||
override fun unPauseFlow(id: StateMachineRunId): Boolean = smm.unPauseFlow(id)
|
|
||||||
|
|
||||||
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
|
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
|
||||||
|
|
||||||
val (allStateMachines, changes) = smm.track()
|
val (allStateMachines, changes) = smm.track()
|
||||||
|
@ -11,7 +11,6 @@ import net.corda.core.internal.notary.NotaryServiceFlow
|
|||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.services.config.rpc.NodeRpcOptions
|
import net.corda.node.services.config.rpc.NodeRpcOptions
|
||||||
import net.corda.node.services.config.schema.v1.V1NodeConfigurationSpec
|
import net.corda.node.services.config.schema.v1.V1NodeConfigurationSpec
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
|
||||||
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
||||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||||
import net.corda.nodeapi.internal.config.User
|
import net.corda.nodeapi.internal.config.User
|
||||||
@ -94,8 +93,6 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer {
|
|||||||
|
|
||||||
val quasarExcludePackages: List<String>
|
val quasarExcludePackages: List<String>
|
||||||
|
|
||||||
val smmStartMode: StateMachineManager.StartMode
|
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
// default to at least 8MB and a bit extra for larger heap sizes
|
// default to at least 8MB and a bit extra for larger heap sizes
|
||||||
val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()
|
val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()
|
||||||
|
@ -8,7 +8,6 @@ import net.corda.core.utilities.NetworkHostAndPort
|
|||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.node.services.config.rpc.NodeRpcOptions
|
import net.corda.node.services.config.rpc.NodeRpcOptions
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
|
||||||
import net.corda.nodeapi.BrokerRpcSslOptions
|
import net.corda.nodeapi.BrokerRpcSslOptions
|
||||||
import net.corda.nodeapi.internal.DEV_PUB_KEY_HASHES
|
import net.corda.nodeapi.internal.DEV_PUB_KEY_HASHES
|
||||||
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
||||||
@ -85,8 +84,7 @@ data class NodeConfigurationImpl(
|
|||||||
override val blacklistedAttachmentSigningKeys: List<String> = Defaults.blacklistedAttachmentSigningKeys,
|
override val blacklistedAttachmentSigningKeys: List<String> = Defaults.blacklistedAttachmentSigningKeys,
|
||||||
override val configurationWithOptions: ConfigurationWithOptions,
|
override val configurationWithOptions: ConfigurationWithOptions,
|
||||||
override val flowExternalOperationThreadPoolSize: Int = Defaults.flowExternalOperationThreadPoolSize,
|
override val flowExternalOperationThreadPoolSize: Int = Defaults.flowExternalOperationThreadPoolSize,
|
||||||
override val quasarExcludePackages: List<String> = Defaults.quasarExcludePackages,
|
override val quasarExcludePackages: List<String> = Defaults.quasarExcludePackages
|
||||||
override val smmStartMode : StateMachineManager.StartMode = Defaults.smmStartMode
|
|
||||||
) : NodeConfiguration {
|
) : NodeConfiguration {
|
||||||
internal object Defaults {
|
internal object Defaults {
|
||||||
val jmxMonitoringHttpPort: Int? = null
|
val jmxMonitoringHttpPort: Int? = null
|
||||||
@ -125,7 +123,6 @@ data class NodeConfigurationImpl(
|
|||||||
val blacklistedAttachmentSigningKeys: List<String> = emptyList()
|
val blacklistedAttachmentSigningKeys: List<String> = emptyList()
|
||||||
const val flowExternalOperationThreadPoolSize: Int = 1
|
const val flowExternalOperationThreadPoolSize: Int = 1
|
||||||
val quasarExcludePackages: List<String> = emptyList()
|
val quasarExcludePackages: List<String> = emptyList()
|
||||||
val smmStartMode : StateMachineManager.StartMode = StateMachineManager.StartMode.ExcludingPaused
|
|
||||||
|
|
||||||
fun cordappsDirectories(baseDirectory: Path) = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)
|
fun cordappsDirectories(baseDirectory: Path) = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)
|
||||||
|
|
||||||
|
@ -9,7 +9,6 @@ import net.corda.common.validation.internal.Validated.Companion.valid
|
|||||||
import net.corda.node.services.config.*
|
import net.corda.node.services.config.*
|
||||||
import net.corda.node.services.config.NodeConfigurationImpl.Defaults
|
import net.corda.node.services.config.NodeConfigurationImpl.Defaults
|
||||||
import net.corda.node.services.config.schema.parsers.*
|
import net.corda.node.services.config.schema.parsers.*
|
||||||
import net.corda.node.services.statemachine.StateMachineManager
|
|
||||||
|
|
||||||
internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfiguration>("NodeConfiguration") {
|
internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfiguration>("NodeConfiguration") {
|
||||||
private val myLegalName by string().mapValid(::toCordaX500Name)
|
private val myLegalName by string().mapValid(::toCordaX500Name)
|
||||||
@ -67,7 +66,6 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
|
|||||||
.withDefaultValue(Defaults.networkParameterAcceptanceSettings)
|
.withDefaultValue(Defaults.networkParameterAcceptanceSettings)
|
||||||
private val flowExternalOperationThreadPoolSize by int().optional().withDefaultValue(Defaults.flowExternalOperationThreadPoolSize)
|
private val flowExternalOperationThreadPoolSize by int().optional().withDefaultValue(Defaults.flowExternalOperationThreadPoolSize)
|
||||||
private val quasarExcludePackages by string().list().optional().withDefaultValue(Defaults.quasarExcludePackages)
|
private val quasarExcludePackages by string().list().optional().withDefaultValue(Defaults.quasarExcludePackages)
|
||||||
private val smmStartMode by enum(StateMachineManager.StartMode::class).optional().withDefaultValue(Defaults.smmStartMode)
|
|
||||||
@Suppress("unused")
|
@Suppress("unused")
|
||||||
private val custom by nestedObject().optional()
|
private val custom by nestedObject().optional()
|
||||||
@Suppress("unused")
|
@Suppress("unused")
|
||||||
@ -135,8 +133,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
|
|||||||
networkParameterAcceptanceSettings = config[networkParameterAcceptanceSettings],
|
networkParameterAcceptanceSettings = config[networkParameterAcceptanceSettings],
|
||||||
configurationWithOptions = ConfigurationWithOptions(configuration, Configuration.Options.defaults),
|
configurationWithOptions = ConfigurationWithOptions(configuration, Configuration.Options.defaults),
|
||||||
flowExternalOperationThreadPoolSize = config[flowExternalOperationThreadPoolSize],
|
flowExternalOperationThreadPoolSize = config[flowExternalOperationThreadPoolSize],
|
||||||
quasarExcludePackages = config[quasarExcludePackages],
|
quasarExcludePackages = config[quasarExcludePackages]
|
||||||
smmStartMode = config[smmStartMode]
|
|
||||||
))
|
))
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
return when (e) {
|
return when (e) {
|
||||||
|
@ -305,18 +305,6 @@ class SingleThreadedStateMachineManager(
|
|||||||
return checkpointStorage.markAllPaused()
|
return checkpointStorage.markAllPaused()
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun unPauseFlow(id: StateMachineRunId): Boolean {
|
|
||||||
mutex.locked {
|
|
||||||
val pausedFlow = pausedFlows.remove(id) ?: return false
|
|
||||||
val flow = flowCreator.createFlowFromNonResidentFlow(pausedFlow) ?: return false
|
|
||||||
addAndStartFlow(flow.fiber.id, flow)
|
|
||||||
for (event in pausedFlow.externalEvents) {
|
|
||||||
flow.fiber.scheduleEvent(event)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun addSessionBinding(flowId: StateMachineRunId, sessionId: SessionId) {
|
override fun addSessionBinding(flowId: StateMachineRunId, sessionId: SessionId) {
|
||||||
val previousFlowId = sessionToFlow.put(sessionId, flowId)
|
val previousFlowId = sessionToFlow.put(sessionId, flowId)
|
||||||
if (previousFlowId != null) {
|
if (previousFlowId != null) {
|
||||||
|
@ -86,13 +86,6 @@ interface StateMachineManager {
|
|||||||
*/
|
*/
|
||||||
fun killFlow(id: StateMachineRunId): Boolean
|
fun killFlow(id: StateMachineRunId): Boolean
|
||||||
|
|
||||||
/**
|
|
||||||
* Start a paused flow.
|
|
||||||
*
|
|
||||||
* @return whether the flow was successfully started.
|
|
||||||
*/
|
|
||||||
fun unPauseFlow(id: StateMachineRunId): Boolean
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deliver an external event to the state machine. Such an event might be a new P2P message, or a request to start a flow.
|
* Deliver an external event to the state machine. Such an event might be a new P2P message, or a request to start a flow.
|
||||||
* The event may be replayed if a flow fails and attempts to retry.
|
* The event may be replayed if a flow fails and attempts to retry.
|
||||||
|
@ -32,7 +32,6 @@ class NodeStartupCliTest {
|
|||||||
Assertions.assertThat(startup.verbose).isEqualTo(false)
|
Assertions.assertThat(startup.verbose).isEqualTo(false)
|
||||||
Assertions.assertThat(startup.loggingLevel).isEqualTo(Level.INFO)
|
Assertions.assertThat(startup.loggingLevel).isEqualTo(Level.INFO)
|
||||||
Assertions.assertThat(startup.cmdLineOptions.noLocalShell).isEqualTo(false)
|
Assertions.assertThat(startup.cmdLineOptions.noLocalShell).isEqualTo(false)
|
||||||
Assertions.assertThat(startup.cmdLineOptions.safeMode).isEqualTo(false)
|
|
||||||
Assertions.assertThat(startup.cmdLineOptions.sshdServer).isEqualTo(false)
|
Assertions.assertThat(startup.cmdLineOptions.sshdServer).isEqualTo(false)
|
||||||
Assertions.assertThat(startup.cmdLineOptions.justGenerateNodeInfo).isEqualTo(false)
|
Assertions.assertThat(startup.cmdLineOptions.justGenerateNodeInfo).isEqualTo(false)
|
||||||
Assertions.assertThat(startup.cmdLineOptions.justGenerateRpcSslCerts).isEqualTo(false)
|
Assertions.assertThat(startup.cmdLineOptions.justGenerateRpcSslCerts).isEqualTo(false)
|
||||||
|
@ -1,77 +0,0 @@
|
|||||||
package net.corda.node.services.statemachine
|
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
|
||||||
import com.nhaarman.mockito_kotlin.doReturn
|
|
||||||
import com.nhaarman.mockito_kotlin.whenever
|
|
||||||
import net.corda.core.flows.FlowLogic
|
|
||||||
import net.corda.core.internal.FlowStateMachine
|
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
|
||||||
import net.corda.testing.core.ALICE_NAME
|
|
||||||
import net.corda.testing.core.BOB_NAME
|
|
||||||
import net.corda.testing.node.internal.InternalMockNetwork
|
|
||||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
|
||||||
import net.corda.testing.node.internal.TestStartedNode
|
|
||||||
import net.corda.testing.node.internal.startFlow
|
|
||||||
import org.junit.After
|
|
||||||
import org.junit.Before
|
|
||||||
import org.junit.Test
|
|
||||||
import java.time.Duration
|
|
||||||
import kotlin.test.assertEquals
|
|
||||||
|
|
||||||
class FlowPausingTests {
|
|
||||||
|
|
||||||
companion object {
|
|
||||||
const val NUMBER_OF_FLOWS = 4
|
|
||||||
const val SLEEP_TIME = 1000L
|
|
||||||
}
|
|
||||||
|
|
||||||
private lateinit var mockNet: InternalMockNetwork
|
|
||||||
private lateinit var aliceNode: TestStartedNode
|
|
||||||
private lateinit var bobNode: TestStartedNode
|
|
||||||
|
|
||||||
@Before
|
|
||||||
fun setUpMockNet() {
|
|
||||||
mockNet = InternalMockNetwork()
|
|
||||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
|
||||||
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
fun cleanUp() {
|
|
||||||
mockNet.stopNodes()
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun restartNode(node: TestStartedNode, smmStartMode: StateMachineManager.StartMode) : TestStartedNode {
|
|
||||||
val parameters = InternalMockNodeParameters(configOverrides = {
|
|
||||||
conf: NodeConfiguration ->
|
|
||||||
doReturn(smmStartMode).whenever(conf).smmStartMode
|
|
||||||
})
|
|
||||||
return mockNet.restartNode(node, parameters = parameters)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 300_000)
|
|
||||||
fun `All are paused when the node is restarted in safe start mode`() {
|
|
||||||
val flows = ArrayList<FlowStateMachine<Unit>>()
|
|
||||||
for (i in 1..NUMBER_OF_FLOWS) {
|
|
||||||
flows += aliceNode.services.startFlow(CheckpointingFlow())
|
|
||||||
}
|
|
||||||
//All of the flows must not resume before the node restarts.
|
|
||||||
val restartedAlice = restartNode(aliceNode, StateMachineManager.StartMode.Safe)
|
|
||||||
assertEquals(0, restartedAlice.smm.snapshot().size)
|
|
||||||
//We need to wait long enough here so any running flows would finish.
|
|
||||||
Thread.sleep(NUMBER_OF_FLOWS * SLEEP_TIME)
|
|
||||||
restartedAlice.database.transaction {
|
|
||||||
for (flow in flows) {
|
|
||||||
val checkpoint = restartedAlice.internals.checkpointStorage.getCheckpoint(flow.id)
|
|
||||||
assertEquals(Checkpoint.FlowStatus.PAUSED, checkpoint!!.status)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
internal class CheckpointingFlow: FlowLogic<Unit>() {
|
|
||||||
@Suspendable
|
|
||||||
override fun call() {
|
|
||||||
sleep(Duration.ofMillis(SLEEP_TIME))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -638,7 +638,6 @@ private fun mockNodeConfiguration(certificatesDirectory: Path): NodeConfiguratio
|
|||||||
doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings
|
doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings
|
||||||
doReturn(rigorousMock<ConfigurationWithOptions>()).whenever(it).configurationWithOptions
|
doReturn(rigorousMock<ConfigurationWithOptions>()).whenever(it).configurationWithOptions
|
||||||
doReturn(2).whenever(it).flowExternalOperationThreadPoolSize
|
doReturn(2).whenever(it).flowExternalOperationThreadPoolSize
|
||||||
doReturn(StateMachineManager.StartMode.ExcludingPaused).whenever(it).smmStartMode
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user