mirror of
https://github.com/corda/corda.git
synced 2024-12-19 04:57:58 +00:00
Merge branch 'release/os/4.6' into christians/update-feat-20200502
This commit is contained in:
commit
98f62f60f1
@ -1,6 +1,5 @@
|
||||
package net.corda.core.internal.messaging
|
||||
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.internal.AttachmentTrustInfo
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
|
||||
@ -14,11 +13,4 @@ interface InternalCordaRPCOps : CordaRPCOps {
|
||||
|
||||
/** Get all attachment trust information */
|
||||
val attachmentTrustInfos: List<AttachmentTrustInfo>
|
||||
|
||||
/**
|
||||
* Resume a paused flow.
|
||||
*
|
||||
* @return whether the flow was successfully resumed.
|
||||
*/
|
||||
fun unPauseFlow(id: StateMachineRunId): Boolean
|
||||
}
|
@ -1,113 +0,0 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||
import net.corda.core.messaging.CordaRPCOps
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.NodeParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import org.junit.Test
|
||||
import java.time.Duration
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertNotNull
|
||||
|
||||
class FlowPausingTest {
|
||||
|
||||
companion object {
|
||||
val TOTAL_MESSAGES = 100
|
||||
val SLEEP_BETWEEN_MESSAGES_MS = 10L
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `Paused flows can recieve session messages`() {
|
||||
val rpcUser = User("demo", "demo", setOf(Permissions.startFlow<HardRestartTest.Ping>(), Permissions.all()))
|
||||
driver(DriverParameters(startNodesInProcess = true, inMemoryDB = false)) {
|
||||
val alice = startNode(NodeParameters(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser))).getOrThrow()
|
||||
val bob = startNode(NodeParameters(providedName = BOB_NAME, rpcUsers = listOf(rpcUser)))
|
||||
val startedBob = bob.getOrThrow()
|
||||
val aliceFlow = alice.rpc.startFlow(::HeartbeatFlow, startedBob.nodeInfo.legalIdentities[0])
|
||||
// We wait here for the initiated flow to start running on bob
|
||||
val initiatedFlowId = startedBob.rpc.waitForFlowToStart(150)
|
||||
assertNotNull(initiatedFlowId)
|
||||
/* We shut down bob, we want this to happen before bob has finished receiving all of the heartbeats.
|
||||
This is a Race but if bob finishes too quickly then we will fail to unpause the initiated flow running on BOB latter
|
||||
and this test will fail.*/
|
||||
startedBob.stop()
|
||||
//Start bob backup in Safe mode. This means no flows will run but BOB should receive messages and queue these up.
|
||||
val restartedBob = startNode(NodeParameters(
|
||||
providedName = BOB_NAME,
|
||||
rpcUsers = listOf(rpcUser),
|
||||
customOverrides = mapOf("smmStartMode" to "Safe"))).getOrThrow()
|
||||
|
||||
//Sleep for long enough so BOB has time to receive all the messages.
|
||||
//All messages in this period should be queued up and replayed when the flow is unpaused.
|
||||
Thread.sleep(TOTAL_MESSAGES * SLEEP_BETWEEN_MESSAGES_MS)
|
||||
//ALICE should not have finished yet as the HeartbeatResponderFlow should not have sent the final message back (as it is paused).
|
||||
assertEquals(false, aliceFlow.returnValue.isDone)
|
||||
assertEquals(true, (restartedBob.rpc as InternalCordaRPCOps).unPauseFlow(initiatedFlowId!!))
|
||||
|
||||
assertEquals(true, aliceFlow.returnValue.getOrThrow())
|
||||
alice.stop()
|
||||
restartedBob.stop()
|
||||
}
|
||||
}
|
||||
|
||||
fun CordaRPCOps.waitForFlowToStart(maxTrys: Int): StateMachineRunId? {
|
||||
for (i in 1..maxTrys) {
|
||||
val snapshot = this.stateMachinesSnapshot().singleOrNull()
|
||||
if (snapshot == null) {
|
||||
Thread.sleep(SLEEP_BETWEEN_MESSAGES_MS)
|
||||
} else {
|
||||
return snapshot.id
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class HeartbeatFlow(private val otherParty: Party): FlowLogic<Boolean>() {
|
||||
var sequenceNumber = 0
|
||||
@Suspendable
|
||||
override fun call(): Boolean {
|
||||
val session = initiateFlow(otherParty)
|
||||
for (i in 1..TOTAL_MESSAGES) {
|
||||
session.send(sequenceNumber++)
|
||||
sleep(Duration.ofMillis(10))
|
||||
}
|
||||
val success = session.receive<Boolean>().unwrap{data -> data}
|
||||
return success
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(HeartbeatFlow::class)
|
||||
class HeartbeatResponderFlow(val session: FlowSession): FlowLogic<Unit>() {
|
||||
var sequenceNumber : Int = 0
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
var pass = true
|
||||
for (i in 1..TOTAL_MESSAGES) {
|
||||
val receivedSequenceNumber = session.receive<Int>().unwrap{data -> data}
|
||||
if (receivedSequenceNumber != sequenceNumber) {
|
||||
pass = false
|
||||
}
|
||||
sequenceNumber++
|
||||
}
|
||||
session.send(pass)
|
||||
}
|
||||
}
|
||||
}
|
@ -14,7 +14,6 @@ import net.corda.node.services.config.ConfigHelper
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.node.services.config.Valid
|
||||
import net.corda.node.services.config.parseAsNodeConfiguration
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.internal.config.UnknownConfigKeysPolicy
|
||||
import picocli.CommandLine.Option
|
||||
import java.nio.file.Path
|
||||
@ -201,9 +200,6 @@ open class NodeCmdLineOptions : SharedNodeCmdLineOptions() {
|
||||
devMode?.let {
|
||||
configOverrides += "devMode" to it
|
||||
}
|
||||
if (safeMode) {
|
||||
configOverrides += "smmStartMode" to StateMachineManager.StartMode.Safe.toString()
|
||||
}
|
||||
return try {
|
||||
valid(ConfigHelper.loadConfig(baseDirectory, configFile, configOverrides = ConfigFactory.parseMap(configOverrides)))
|
||||
} catch (e: ConfigException) {
|
||||
|
@ -581,7 +581,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
|
||||
tokenizableServices = null
|
||||
|
||||
verifyCheckpointsCompatible(frozenTokenizableServices)
|
||||
val smmStartedFuture = smm.start(frozenTokenizableServices, configuration.smmStartMode)
|
||||
val smmStartedFuture = smm.start(frozenTokenizableServices)
|
||||
// Shut down the SMM so no Fibers are scheduled.
|
||||
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
|
||||
val flowMonitor = FlowMonitor(
|
||||
|
@ -169,8 +169,6 @@ internal class CordaRPCOpsImpl(
|
||||
|
||||
override fun killFlow(id: StateMachineRunId): Boolean = smm.killFlow(id)
|
||||
|
||||
override fun unPauseFlow(id: StateMachineRunId): Boolean = smm.unPauseFlow(id)
|
||||
|
||||
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
|
||||
|
||||
val (allStateMachines, changes) = smm.track()
|
||||
|
@ -11,7 +11,6 @@ import net.corda.core.internal.notary.NotaryServiceFlow
|
||||
import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.node.services.config.rpc.NodeRpcOptions
|
||||
import net.corda.node.services.config.schema.v1.V1NodeConfigurationSpec
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
||||
import net.corda.nodeapi.internal.config.MutualSslConfiguration
|
||||
import net.corda.nodeapi.internal.config.User
|
||||
@ -94,8 +93,6 @@ interface NodeConfiguration : ConfigurationWithOptionsContainer {
|
||||
|
||||
val quasarExcludePackages: List<String>
|
||||
|
||||
val smmStartMode: StateMachineManager.StartMode
|
||||
|
||||
companion object {
|
||||
// default to at least 8MB and a bit extra for larger heap sizes
|
||||
val defaultTransactionCacheSize: Long = 8.MB + getAdditionalCacheMemory()
|
||||
|
@ -8,7 +8,6 @@ import net.corda.core.utilities.NetworkHostAndPort
|
||||
import net.corda.core.utilities.loggerFor
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.config.rpc.NodeRpcOptions
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
import net.corda.nodeapi.BrokerRpcSslOptions
|
||||
import net.corda.nodeapi.internal.DEV_PUB_KEY_HASHES
|
||||
import net.corda.nodeapi.internal.config.FileBasedCertificateStoreSupplier
|
||||
@ -84,8 +83,7 @@ data class NodeConfigurationImpl(
|
||||
override val blacklistedAttachmentSigningKeys: List<String> = Defaults.blacklistedAttachmentSigningKeys,
|
||||
override val configurationWithOptions: ConfigurationWithOptions,
|
||||
override val flowExternalOperationThreadPoolSize: Int = Defaults.flowExternalOperationThreadPoolSize,
|
||||
override val quasarExcludePackages: List<String> = Defaults.quasarExcludePackages,
|
||||
override val smmStartMode : StateMachineManager.StartMode = Defaults.smmStartMode
|
||||
override val quasarExcludePackages: List<String> = Defaults.quasarExcludePackages
|
||||
) : NodeConfiguration {
|
||||
internal object Defaults {
|
||||
val jmxMonitoringHttpPort: Int? = null
|
||||
@ -124,7 +122,6 @@ data class NodeConfigurationImpl(
|
||||
val blacklistedAttachmentSigningKeys: List<String> = emptyList()
|
||||
const val flowExternalOperationThreadPoolSize: Int = 1
|
||||
val quasarExcludePackages: List<String> = emptyList()
|
||||
val smmStartMode : StateMachineManager.StartMode = StateMachineManager.StartMode.ExcludingPaused
|
||||
|
||||
fun cordappsDirectories(baseDirectory: Path) = listOf(baseDirectory / CORDAPPS_DIR_NAME_DEFAULT)
|
||||
|
||||
|
@ -9,7 +9,6 @@ import net.corda.common.validation.internal.Validated.Companion.valid
|
||||
import net.corda.node.services.config.*
|
||||
import net.corda.node.services.config.NodeConfigurationImpl.Defaults
|
||||
import net.corda.node.services.config.schema.parsers.*
|
||||
import net.corda.node.services.statemachine.StateMachineManager
|
||||
|
||||
internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfiguration>("NodeConfiguration") {
|
||||
private val myLegalName by string().mapValid(::toCordaX500Name)
|
||||
@ -67,7 +66,6 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
|
||||
.withDefaultValue(Defaults.networkParameterAcceptanceSettings)
|
||||
private val flowExternalOperationThreadPoolSize by int().optional().withDefaultValue(Defaults.flowExternalOperationThreadPoolSize)
|
||||
private val quasarExcludePackages by string().list().optional().withDefaultValue(Defaults.quasarExcludePackages)
|
||||
private val smmStartMode by enum(StateMachineManager.StartMode::class).optional().withDefaultValue(Defaults.smmStartMode)
|
||||
@Suppress("unused")
|
||||
private val custom by nestedObject().optional()
|
||||
@Suppress("unused")
|
||||
@ -135,8 +133,7 @@ internal object V1NodeConfigurationSpec : Configuration.Specification<NodeConfig
|
||||
networkParameterAcceptanceSettings = config[networkParameterAcceptanceSettings],
|
||||
configurationWithOptions = ConfigurationWithOptions(configuration, Configuration.Options.defaults),
|
||||
flowExternalOperationThreadPoolSize = config[flowExternalOperationThreadPoolSize],
|
||||
quasarExcludePackages = config[quasarExcludePackages],
|
||||
smmStartMode = config[smmStartMode]
|
||||
quasarExcludePackages = config[quasarExcludePackages]
|
||||
))
|
||||
} catch (e: Exception) {
|
||||
return when (e) {
|
||||
|
@ -206,6 +206,7 @@ class DBCheckpointStorage(
|
||||
@Column(name = "started_type", nullable = false)
|
||||
var startType: StartReason,
|
||||
|
||||
@Type(type = "corda-blob")
|
||||
@Column(name = "flow_parameters", nullable = false)
|
||||
var initialParameters: ByteArray = EMPTY_BYTE_ARRAY,
|
||||
|
||||
|
@ -305,18 +305,6 @@ class SingleThreadedStateMachineManager(
|
||||
return checkpointStorage.markAllPaused()
|
||||
}
|
||||
|
||||
override fun unPauseFlow(id: StateMachineRunId): Boolean {
|
||||
mutex.locked {
|
||||
val pausedFlow = pausedFlows.remove(id) ?: return false
|
||||
val flow = flowCreator.createFlowFromNonResidentFlow(pausedFlow) ?: return false
|
||||
addAndStartFlow(flow.fiber.id, flow)
|
||||
for (event in pausedFlow.externalEvents) {
|
||||
flow.fiber.scheduleEvent(event)
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
override fun addSessionBinding(flowId: StateMachineRunId, sessionId: SessionId) {
|
||||
val previousFlowId = sessionToFlow.put(sessionId, flowId)
|
||||
if (previousFlowId != null) {
|
||||
|
@ -86,13 +86,6 @@ interface StateMachineManager {
|
||||
*/
|
||||
fun killFlow(id: StateMachineRunId): Boolean
|
||||
|
||||
/**
|
||||
* Start a paused flow.
|
||||
*
|
||||
* @return whether the flow was successfully started.
|
||||
*/
|
||||
fun unPauseFlow(id: StateMachineRunId): Boolean
|
||||
|
||||
/**
|
||||
* Deliver an external event to the state machine. Such an event might be a new P2P message, or a request to start a flow.
|
||||
* The event may be replayed if a flow fails and attempts to retry.
|
||||
|
@ -32,7 +32,6 @@ class NodeStartupCliTest {
|
||||
Assertions.assertThat(startup.verbose).isEqualTo(false)
|
||||
Assertions.assertThat(startup.loggingLevel).isEqualTo(Level.INFO)
|
||||
Assertions.assertThat(startup.cmdLineOptions.noLocalShell).isEqualTo(false)
|
||||
Assertions.assertThat(startup.cmdLineOptions.safeMode).isEqualTo(false)
|
||||
Assertions.assertThat(startup.cmdLineOptions.sshdServer).isEqualTo(false)
|
||||
Assertions.assertThat(startup.cmdLineOptions.justGenerateNodeInfo).isEqualTo(false)
|
||||
Assertions.assertThat(startup.cmdLineOptions.justGenerateRpcSslCerts).isEqualTo(false)
|
||||
|
@ -1,77 +0,0 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.nhaarman.mockito_kotlin.doReturn
|
||||
import com.nhaarman.mockito_kotlin.whenever
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.internal.FlowStateMachine
|
||||
import net.corda.node.services.config.NodeConfiguration
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import java.time.Duration
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class FlowPausingTests {
|
||||
|
||||
companion object {
|
||||
const val NUMBER_OF_FLOWS = 4
|
||||
const val SLEEP_TIME = 1000L
|
||||
}
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var aliceNode: TestStartedNode
|
||||
private lateinit var bobNode: TestStartedNode
|
||||
|
||||
@Before
|
||||
fun setUpMockNet() {
|
||||
mockNet = InternalMockNetwork()
|
||||
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME))
|
||||
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME))
|
||||
}
|
||||
|
||||
@After
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
|
||||
private fun restartNode(node: TestStartedNode, smmStartMode: StateMachineManager.StartMode) : TestStartedNode {
|
||||
val parameters = InternalMockNodeParameters(configOverrides = {
|
||||
conf: NodeConfiguration ->
|
||||
doReturn(smmStartMode).whenever(conf).smmStartMode
|
||||
})
|
||||
return mockNet.restartNode(node, parameters = parameters)
|
||||
}
|
||||
|
||||
@Test(timeout = 300_000)
|
||||
fun `All are paused when the node is restarted in safe start mode`() {
|
||||
val flows = ArrayList<FlowStateMachine<Unit>>()
|
||||
for (i in 1..NUMBER_OF_FLOWS) {
|
||||
flows += aliceNode.services.startFlow(CheckpointingFlow())
|
||||
}
|
||||
//All of the flows must not resume before the node restarts.
|
||||
val restartedAlice = restartNode(aliceNode, StateMachineManager.StartMode.Safe)
|
||||
assertEquals(0, restartedAlice.smm.snapshot().size)
|
||||
//We need to wait long enough here so any running flows would finish.
|
||||
Thread.sleep(NUMBER_OF_FLOWS * SLEEP_TIME)
|
||||
restartedAlice.database.transaction {
|
||||
for (flow in flows) {
|
||||
val checkpoint = restartedAlice.internals.checkpointStorage.getCheckpoint(flow.id)
|
||||
assertEquals(Checkpoint.FlowStatus.PAUSED, checkpoint!!.status)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal class CheckpointingFlow: FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
sleep(Duration.ofMillis(SLEEP_TIME))
|
||||
}
|
||||
}
|
||||
}
|
@ -641,7 +641,6 @@ private fun mockNodeConfiguration(certificatesDirectory: Path): NodeConfiguratio
|
||||
doReturn(NetworkParameterAcceptanceSettings()).whenever(it).networkParameterAcceptanceSettings
|
||||
doReturn(rigorousMock<ConfigurationWithOptions>()).whenever(it).configurationWithOptions
|
||||
doReturn(2).whenever(it).flowExternalOperationThreadPoolSize
|
||||
doReturn(StateMachineManager.StartMode.ExcludingPaused).whenever(it).smmStartMode
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user