CORDA-3291 isKilled flag and session errors for killed flows (#6170)

* CORDA-3291 `isKilled` flag and session errors for killed flows

## Summary

Two major improvements have been worked on:

- A new flag named `isKilled` has been added to `FlowLogic` to allow
developers to break out of loops without suspension points.
- Killed flows now send session errors to their counter parties allowing
their flows to also terminate without further coordination.

Achieving these changes required a __fundamental__ change to how flows are
killed as well as how they sleep.

## `isKilled` flag

The addition of `FlowLogic.isKilled` allows flows to check if the
current flow has been killed. They can then throw an exception to lead
to the flow's termination (following the standard error pathway). They
can also perform some extra logic or not throw an exception if they
really wanted to.

No matter what, once the flag is set, the flow will terminate. Due to
timing, a killed flow might successfully process its next suspension
event, but it will then process a killed transition and terminate.

## Send session errors when killing a flow

A flow will now send session errors to all of its counter parties. They
are transferred as `UnexpectedFlowEndException`s. This allows initiated
flows to handle these errors as they see fit, although they should
probably just terminate.

## How flows are killed

### Before

Originally we were relying on Quasar to interrupt a flow's fiber, we
could then handle the resulting `InterruptedException`. The problem with
this solution is that it only worked when a flow was already suspended
or when a flow moved into suspension. Flows stuck in loops did not work.

### After

We now *do not* use Quasar to interrupt a flow's fiber. Instead, we
switch `FlowStateMachine.isKilled` to true and schedule a new event.
Any event that is processed after switching this flag will now cause a
`KilledFlowTransition`. This transition follows similar logic to how
error propagation works. Note, the extra event allows a suspended flow
to be killed without waiting for the event that it was _really_ waiting
for.

This allows a lot of the tidy up code in `StateMachineManager.killFlow`
to be removed as tidy up is executed as part of removing a flow.
Deleting a flow's checkpoint and releasing related soft locks is still
handled manually in case of infinite loops but also triggered as part
of the actions executed in a transition.

This required flow sleeping to be changed as we no longer rely on
quasar.

## How flows now sleep

The reliance on Quasar to make a flow sleep has been removed.

Instead, when a flow sleeps we create a `ScheduledFuture` that is
delayed for the requested sleep duration. When the future executes it
schedules a `WakeUpFromSleep` event that wakes up the flow... Duh.

`FlowSleepScheduler` handles the future logic. It also uses the same
scheduled thread pool that timed flows uses.

A future field was added to `StateMachineState`. This removes the 
need for concurrency control around flow sleeps as the code path does
not need to touch any concurrent data structures.

To achieve this:

- `StateMachineState.future` added as a `var`
- When the `ScheduledFuture` is created to wake up the flow the passed
in `StateMachineState` has its `future` value changed
- When resumed `future` and `isWaitingForFuture` are set to `null` and
`false` respectively
- When cancelling a sleeping flow, the `future` is cancelled and nulled
out. `isWaitingForFuture` is not changed since the flow is ending anyway
so really the value of the field is not important.
This commit is contained in:
Dan Newton 2020-04-28 15:53:44 +01:00 committed by GitHub
parent 1c3ec2eb18
commit 297e504740
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1632 additions and 218 deletions

View File

@ -0,0 +1,364 @@
package net.corda.coretests.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
import java.util.concurrent.Semaphore
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class FlowIsKilledTest {
private companion object {
const val EXCEPTION_MESSAGE = "Goodbye, cruel world!"
}
@Before
fun setup() {
Configurator.setLevel("net.corda.node.services.statemachine", Level.DEBUG)
}
@Test(timeout = 300_000)
fun `manually handle the isKilled check`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatWantsToDie)
AFlowThatWantsToDie.lockA.acquire()
rpc.killFlow(handle.id)
AFlowThatWantsToDie.lockB.release()
assertThatExceptionOfType(KilledFlowException::class.java)
.isThrownBy { handle.returnValue.getOrThrow(1.minutes) }
.withMessage(EXCEPTION_MESSAGE)
assertEquals(11, AFlowThatWantsToDie.position)
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@Test(timeout = 300_000)
fun `manually handled killed flows propagate error to counter parties`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(
::AFlowThatWantsToDieAndKillsItsFriends,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())
)
AFlowThatWantsToDieAndKillsItsFriends.lockA.acquire()
AFlowThatWantsToDieAndKillsItsFriendsResponder.locks.forEach { it.value.acquire() }
rpc.killFlow(handle.id)
AFlowThatWantsToDieAndKillsItsFriends.lockB.release()
assertThatExceptionOfType(KilledFlowException::class.java)
.isThrownBy { handle.returnValue.getOrThrow(1.minutes) }
.withMessage(EXCEPTION_MESSAGE)
AFlowThatWantsToDieAndKillsItsFriendsResponder.locks.forEach { it.value.acquire() }
assertEquals(11, AFlowThatWantsToDieAndKillsItsFriends.position)
assertTrue(AFlowThatWantsToDieAndKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatWantsToDieAndKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
}
}
}
@Test(timeout = 300_000)
fun `a manually killed initiated flow will propagate the killed error to the initiator and its counter parties`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val handle = alice.rpc.startFlow(
::AFlowThatGetsMurderedByItsFriend,
bob.nodeInfo.singleIdentity()
)
AFlowThatGetsMurderedByItsFriendResponder.lockA.acquire()
val initiatedFlowId = AFlowThatGetsMurderedByItsFriendResponder.flowId!!
bob.rpc.killFlow(initiatedFlowId)
AFlowThatGetsMurderedByItsFriendResponder.lockB.release()
assertFailsWith<UnexpectedFlowEndException> {
handle.returnValue.getOrThrow(1.minutes)
}
assertTrue(AFlowThatGetsMurderedByItsFriend.receivedKilledException)
assertEquals(11, AFlowThatGetsMurderedByItsFriendResponder.position)
val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
}
}
@Test(timeout = 300_000)
fun `manually handle killed flows using checkForIsNotKilled`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatChecksIfItWantsToDie)
AFlowThatChecksIfItWantsToDie.lockA.acquire()
rpc.killFlow(handle.id)
AFlowThatChecksIfItWantsToDie.lockB.release()
assertThatExceptionOfType(KilledFlowException::class.java)
.isThrownBy { handle.returnValue.getOrThrow(1.minutes) }
.withMessageNotContaining(EXCEPTION_MESSAGE)
assertEquals(11, AFlowThatChecksIfItWantsToDie.position)
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@Test(timeout = 300_000)
fun `manually handle killed flows using checkForIsNotKilled with lazy message`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatChecksIfItWantsToDieAndLeavesANote)
AFlowThatChecksIfItWantsToDieAndLeavesANote.lockA.acquire()
rpc.killFlow(handle.id)
AFlowThatChecksIfItWantsToDieAndLeavesANote.lockB.release()
assertThatExceptionOfType(KilledFlowException::class.java)
.isThrownBy { handle.returnValue.getOrThrow(1.minutes) }
.withMessage(EXCEPTION_MESSAGE)
assertEquals(11, AFlowThatChecksIfItWantsToDie.position)
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@StartableByRPC
class AFlowThatWantsToDie : FlowLogic<Unit>() {
companion object {
val lockA = Semaphore(0)
val lockB = Semaphore(0)
var position = 0
}
@Suspendable
override fun call() {
for (i in 0..100) {
position = i
logger.info("i = $i")
if (isKilled) {
throw KilledFlowException(runId, EXCEPTION_MESSAGE)
}
if (i == 10) {
lockA.release()
lockB.acquire()
}
}
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatWantsToDieAndKillsItsFriends(private val parties: List<Party>) : FlowLogic<Unit>() {
companion object {
val lockA = Semaphore(0)
val lockB = Semaphore(0)
var position = 0
}
@Suspendable
override fun call() {
val sessionOne = initiateFlow(parties[0])
val sessionTwo = initiateFlow(parties[1])
// trigger sessions with 2 counter parties
sessionOne.sendAndReceive<String>("what is up")
sessionOne.send("what is up 2")
sessionTwo.sendAndReceive<String>("what is up")
sessionTwo.send("what is up 2")
for (i in 0..100) {
position = i
logger.info("i = $i")
if (isKilled) {
throw KilledFlowException(runId, EXCEPTION_MESSAGE)
}
if (i == 10) {
lockA.release()
lockB.acquire()
}
}
}
}
@InitiatedBy(AFlowThatWantsToDieAndKillsItsFriends::class)
class AFlowThatWantsToDieAndKillsItsFriendsResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
val locks = mapOf(
BOB_NAME to Semaphore(0),
CHARLIE_NAME to Semaphore(0)
)
var receivedKilledExceptions = mutableMapOf(
BOB_NAME to false,
CHARLIE_NAME to false
)
}
@Suspendable
override fun call() {
session.receive<String>()
session.send("hi")
session.receive<String>()
locks[ourIdentity.name]!!.release()
try {
session.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledExceptions[ourIdentity.name] = true
locks[ourIdentity.name]!!.release()
throw e
}
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedByItsFriend(private val party: Party) : FlowLogic<Unit>() {
companion object {
var receivedKilledException = false
}
@Suspendable
override fun call() {
val sessionOne = initiateFlow(party)
// trigger sessions with 2 counter parties
sessionOne.sendAndReceive<String>("what is up")
try {
sessionOne.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledException = true
throw e
}
}
}
@InitiatedBy(AFlowThatGetsMurderedByItsFriend::class)
class AFlowThatGetsMurderedByItsFriendResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
val lockA = Semaphore(0)
val lockB = Semaphore(0)
var flowId: StateMachineRunId? = null
var position = 0
}
@Suspendable
override fun call() {
flowId = runId
session.receive<String>()
session.send("hi")
for (i in 0..100) {
position = i
if (isKilled) {
throw KilledFlowException(runId, EXCEPTION_MESSAGE)
}
if (i == 10) {
lockA.release()
lockB.acquire()
}
}
}
}
@StartableByRPC
class AFlowThatChecksIfItWantsToDie : FlowLogic<Unit>() {
companion object {
val lockA = Semaphore(0)
val lockB = Semaphore(0)
var position = 0
}
@Suspendable
override fun call() {
for (i in 0..100) {
position = i
logger.info("i = $i")
checkFlowIsNotKilled()
if (i == 10) {
lockA.release()
lockB.acquire()
}
}
}
}
@StartableByRPC
class AFlowThatChecksIfItWantsToDieAndLeavesANote : FlowLogic<Unit>() {
companion object {
val lockA = Semaphore(0)
val lockB = Semaphore(0)
var position = 0
}
@Suspendable
override fun call() {
for (i in 0..100) {
position = i
logger.info("i = $i")
checkFlowIsNotKilled { EXCEPTION_MESSAGE }
if (i == 10) {
lockA.release()
lockB.acquire()
}
}
}
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}
}

View File

@ -0,0 +1,146 @@
package net.corda.coretests.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator
import org.junit.Before
import org.junit.Test
import java.time.Duration
import java.time.Instant
import kotlin.test.assertTrue
class FlowSleepTest {
@Before
fun setup() {
Configurator.setLevel("net.corda.node.services.statemachine", Level.DEBUG)
}
@Test(timeout = 300_000)
fun `flow can sleep`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val (start, finish) = alice.rpc.startFlow(::SleepyFlow).returnValue.getOrThrow(1.minutes)
val difference = Duration.between(start, finish)
assertTrue(difference >= 5.seconds)
assertTrue(difference < 7.seconds)
}
}
@Test(timeout = 300_000)
fun `flow can sleep multiple times`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val (start, middle, finish) = alice.rpc.startFlow(::AnotherSleepyFlow).returnValue.getOrThrow(1.minutes)
val differenceBetweenStartAndMiddle = Duration.between(start, middle)
val differenceBetweenMiddleAndFinish = Duration.between(middle, finish)
assertTrue(differenceBetweenStartAndMiddle >= 5.seconds)
assertTrue(differenceBetweenStartAndMiddle < 7.seconds)
assertTrue(differenceBetweenMiddleAndFinish >= 10.seconds)
assertTrue(differenceBetweenMiddleAndFinish < 12.seconds)
}
}
@Test(timeout = 300_000)
fun `flow can sleep and perform other suspending functions`() {
// ensures that events received while the flow is sleeping are not processed
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val (start, finish) = alice.rpc.startFlow(
::SleepAndInteractWithPartyFlow,
bob.nodeInfo.singleIdentity()
).returnValue.getOrThrow(1.minutes)
val difference = Duration.between(start, finish)
assertTrue(difference >= 5.seconds)
assertTrue(difference < 7.seconds)
}
}
@StartableByRPC
class SleepyFlow : FlowLogic<Pair<Instant, Instant>>() {
@Suspendable
override fun call(): Pair<Instant, Instant> {
val start = Instant.now()
sleep(5.seconds)
return start to Instant.now()
}
}
@StartableByRPC
class AnotherSleepyFlow : FlowLogic<Triple<Instant, Instant, Instant>>() {
@Suspendable
override fun call(): Triple<Instant, Instant, Instant> {
val start = Instant.now()
sleep(5.seconds)
val middle = Instant.now()
sleep(10.seconds)
return Triple(start, middle, Instant.now())
}
}
@StartableByRPC
@InitiatingFlow
class SleepAndInteractWithPartyFlow(private val party: Party) : FlowLogic<Pair<Instant, Instant>>() {
@Suspendable
override fun call(): Pair<Instant, Instant> {
subFlow(PingPongFlow(party))
val start = Instant.now()
sleep(5.seconds)
val finish = Instant.now()
val session = initiateFlow(party)
session.sendAndReceive<String>("hi")
session.sendAndReceive<String>("hi")
subFlow(PingPongFlow(party))
return start to finish
}
}
@InitiatedBy(SleepAndInteractWithPartyFlow::class)
class SleepAndInteractWithPartyResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("go away")
session.receive<String>().unwrap { it }
session.send("go away")
}
}
@InitiatingFlow
class PingPongFlow(val party: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.sendAndReceive<String>("ping pong").unwrap { it }
}
}
@InitiatedBy(PingPongFlow::class)
class PingPongResponder(val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
session.send("I got you bro")
}
}
}

View File

@ -127,6 +127,32 @@ abstract class FlowLogic<out T> {
*/
val serviceHub: ServiceHub get() = stateMachine.serviceHub
/**
* Returns `true` when the current [FlowLogic] has been killed (has received a command to halt its progress and terminate).
*
* Check this property in long-running computation loops to exit a flow that has been killed:
* ```
* while (!isKilled) {
* // do some computation
* }
* ```
*
* Ideal usage would include throwing a [KilledFlowException] which will lead to the termination of the flow:
* ```
* for (item in list) {
* if (isKilled) {
* throw KilledFlowException(runId)
* }
* // do some computation
* }
* ```
*
* Note, once the [isKilled] flag is set to `true` the flow may terminate once it reaches the next API function marked with the
* @[Suspendable] annotation. Therefore, it is possible to write a flow that does not interact with the [isKilled] flag while still
* terminating correctly.
*/
val isKilled: Boolean get() = stateMachine.isKilled
/**
* Creates a communication session with [destination]. Subsequently you may send/receive using this session object. How the messaging
* is routed depends on the [Destination] type, including whether this call does any initial communication.
@ -570,6 +596,46 @@ abstract class FlowLogic<out T> {
val request = FlowIORequest.ExecuteAsyncOperation(flowAsyncOperation)
return stateMachine.suspend(request, false)
}
/**
* Helper function that throws a [KilledFlowException] if the current [FlowLogic] has been killed.
*
* Call this function in long-running computation loops to exit a flow that has been killed:
* ```
* for (item in list) {
* checkFlowIsNotKilled()
* // do some computation
* }
* ```
*
* See the [isKilled] property for more information.
*/
fun checkFlowIsNotKilled() {
if (isKilled) {
throw KilledFlowException(runId)
}
}
/**
* Helper function that throws a [KilledFlowException] if the current [FlowLogic] has been killed. The provided message is added to the
* thrown [KilledFlowException].
*
* Call this function in long-running computation loops to exit a flow that has been killed:
* ```
* for (item in list) {
* checkFlowIsNotKilled { "The flow $runId was killed while iterating through the list of items" }
* // do some computation
* }
* ```
*
* See the [isKilled] property for more information.
*/
fun checkFlowIsNotKilled(lazyMessage: () -> Any) {
if (isKilled) {
val message = lazyMessage()
throw KilledFlowException(runId, message.toString())
}
}
}
/**

View File

@ -0,0 +1,14 @@
package net.corda.core.flows
import net.corda.core.CordaRuntimeException
/**
* An exception that is thrown when a flow has been killed.
*
* This exception can be returned and thrown to RPC clients waiting for the result of a flow's future.
*
* It can also be used in conjunction with [FlowLogic.isKilled] to escape long-running computation loops when a flow has been killed.
*/
class KilledFlowException(val id: StateMachineRunId, message: String) : CordaRuntimeException(message) {
constructor(id: StateMachineRunId) : this(id, "The flow $id was killed")
}

View File

@ -47,4 +47,5 @@ interface FlowStateMachine<FLOWRETURN> {
val ourIdentity: Party
val ourSenderUUID: String?
val creationTime: Long
val isKilled: Boolean
}

View File

@ -3,6 +3,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
@ -84,7 +85,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
}
}
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(20.seconds) }
val output = getBytemanOutput(alice)
@ -170,7 +171,7 @@ class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
}
}
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(30.seconds) }
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(30.seconds) }
val output = getBytemanOutput(alice)

View File

@ -0,0 +1,600 @@
package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowExternalOperation
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.asset.Cash
import net.corda.finance.flows.CashIssueFlow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.OutOfProcess
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.FINANCE_CORDAPPS
import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator
import org.assertj.core.api.Assertions
import org.junit.Before
import org.junit.Test
import java.time.Duration
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import kotlin.system.measureTimeMillis
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class KillFlowTest {
@Before
fun setup() {
Configurator.setLevel("net.corda.node.services.statemachine", Level.DEBUG)
}
@Test(timeout = 300_000)
fun `a killed flow will end when it reaches the next suspension point`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatGetsMurderedWhenItTriesToSuspend)
AFlowThatGetsMurderedWhenItTriesToSuspend.lockA.acquire()
rpc.killFlow(handle.id)
AFlowThatGetsMurderedWhenItTriesToSuspend.lockB.release()
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@Test(timeout = 300_000)
fun `a killed flow will propagate the killed error to counter parties when it reaches the next suspension point`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(
::AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())
)
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends.lockA.acquire()
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { it.value.acquire() }
rpc.killFlow(handle.id)
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends.lockB.release()
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.locks.forEach { it.value.acquire() }
assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
}
}
}
@Test(timeout = 300_000)
fun `killing a flow that is sleeping ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatGetsMurdered)
Thread.sleep(5000)
val time = measureTimeMillis {
rpc.killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@Test(timeout = 300_000)
fun `killing a flow suspended in send + receive + sendAndReceive ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = false)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val bobParty = bob.nodeInfo.singleIdentity()
bob.stop()
val terminated = (bob as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
if (terminated) {
alice.rpc.run {
killFlowAndAssert(::AFlowThatGetsMurderedTryingToSendAMessage, bobParty)
killFlowAndAssert(::AFlowThatGetsMurderedTryingToReceiveAMessage, bobParty)
killFlowAndAssert(::AFlowThatGetsMurderedTryingToSendAndReceiveAMessage, bobParty)
}
} else {
throw IllegalStateException("The node should have terminated!")
}
}
}
private inline fun <reified T : FlowLogic<Unit>> CordaRPCOps.killFlowAndAssert(flow: (Party) -> T, party: Party) {
val handle = startFlow(flow, party)
Thread.sleep(5000)
val time = measureTimeMillis {
killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
@Test(timeout = 300_000)
fun `killing a flow suspended in waitForLedgerCommit ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatGetsMurderedTryingToWaitForATransaction)
Thread.sleep(5000)
val time = measureTimeMillis {
rpc.killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@Test(timeout = 300_000)
fun `killing a flow suspended in await ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(::AFlowThatGetsMurderedTryingToAwaitAFuture)
Thread.sleep(5000)
val time = measureTimeMillis {
rpc.killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(1.minutes)
}
}
assertTrue(time < 1.minutes.toMillis(), "It should at a minimum, take less than a minute to kill this flow")
assertTrue(time < 5.seconds.toMillis(), "Really, it should take less than a few seconds to kill a flow")
val checkpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, checkpoints)
}
}
}
@Test(timeout = 300_000)
fun `a killed flow will propagate the killed error to counter parties if it was suspended`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow()
alice.rpc.let { rpc ->
val handle = rpc.startFlow(
::AFlowThatGetsMurderedAndSomehowKillsItsFriends,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())
)
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach {
it.value.acquire()
}
rpc.killFlow(handle.id)
assertFailsWith<KilledFlowException> {
handle.returnValue.getOrThrow(20.seconds)
}
AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.locks.forEach {
it.value.acquire()
}
assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
val aliceCheckpoints = rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
}
}
}
@Test(timeout = 300_000)
fun `a killed initiated flow will propagate the killed error to the initiator and its counter parties`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
val bob = startNode(providedName = BOB_NAME).getOrThrow()
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow()
val handle = alice.rpc.startFlow(
::AFlowThatGetsMurderedByItsFriend,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())
)
AFlowThatGetsMurderedByItsFriendResponder.locks.forEach { it.value.acquire() }
val initiatedFlowId = AFlowThatGetsMurderedByItsFriendResponder.flowIds[BOB_NAME]!!
bob.rpc.killFlow(initiatedFlowId)
assertFailsWith<UnexpectedFlowEndException> {
handle.returnValue.getOrThrow(1.minutes)
}
AFlowThatGetsMurderedByItsFriendResponder.locks[CHARLIE_NAME]!!.acquire()
assertTrue(AFlowThatGetsMurderedByItsFriend.receivedKilledException)
assertFalse(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[BOB_NAME]!!)
assertTrue(AFlowThatGetsMurderedByItsFriendResponder.receivedKilledExceptions[CHARLIE_NAME]!!)
val aliceCheckpoints = alice.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, aliceCheckpoints)
val bobCheckpoints = bob.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, bobCheckpoints)
val charlieCheckpoints = charlie.rpc.startFlow(::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(1, charlieCheckpoints)
}
}
@Test(timeout = 300_000)
fun `killing a flow releases soft lock`() {
driver(DriverParameters(startNodesInProcess = true)) {
val alice = startNode(
providedName = ALICE_NAME,
defaultParameters = NodeParameters(additionalCordapps = FINANCE_CORDAPPS)
).getOrThrow()
alice.rpc.let { rpc ->
val issuerRef = OpaqueBytes("BankOfMars".toByteArray())
val cash = rpc.startFlow(
::CashIssueFlow,
10.DOLLARS,
issuerRef,
defaultNotaryIdentity
).returnValue.getOrThrow().stx.tx.outRefsOfType<Cash.State>().single()
val flow = rpc.startFlow(::SoftLock, cash.ref, Duration.ofMinutes(5))
var locked = false
while (!locked) {
try {
rpc.startFlow(::SoftLock, cash.ref, Duration.ofSeconds(1)).returnValue.getOrThrow()
} catch (e: StatesNotAvailableException) {
locked = true
}
}
val killed = rpc.killFlow(flow.id)
assertTrue(killed)
Assertions.assertThatCode {
rpc.startFlow(
::SoftLock,
cash.ref,
Duration.ofSeconds(1)
).returnValue.getOrThrow()
}.doesNotThrowAnyException()
}
}
}
@StartableByRPC
class AFlowThatGetsMurderedWhenItTriesToSuspend : FlowLogic<Unit>() {
companion object {
val lockA = Semaphore(0)
val lockB = Semaphore(0)
}
@Suspendable
override fun call() {
lockA.release()
lockB.acquire()
sleep(1.seconds)
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends(private val parties: List<Party>) : FlowLogic<Unit>() {
companion object {
val lockA = Semaphore(0)
val lockB = Semaphore(0)
}
@Suspendable
override fun call() {
val sessionOne = initiateFlow(parties[0])
val sessionTwo = initiateFlow(parties[1])
// trigger sessions with 2 counter parties
sessionOne.sendAndReceive<String>("what is up")
sessionOne.send("what is up 2")
sessionTwo.sendAndReceive<String>("what is up")
sessionTwo.send("what is up 2")
lockA.release()
lockB.acquire()
sleep(1.seconds)
}
}
@InitiatedBy(AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends::class)
class AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriendsResponder(private val session: FlowSession) :
FlowLogic<Unit>() {
companion object {
val locks = mapOf(
BOB_NAME to Semaphore(0),
CHARLIE_NAME to Semaphore(0)
)
var receivedKilledExceptions = mutableMapOf(
BOB_NAME to false,
CHARLIE_NAME to false
)
}
@Suspendable
override fun call() {
session.receive<String>()
session.send("hi")
session.receive<String>()
locks[ourIdentity.name]!!.release()
try {
session.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledExceptions[ourIdentity.name] = true
locks[ourIdentity.name]!!.release()
throw e
}
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurdered : FlowLogic<Unit>() {
@Suspendable
override fun call() {
sleep(1.minutes)
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedTryingToSendAMessage(private val party: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.send("hi")
}
}
@InitiatedBy(AFlowThatGetsMurderedTryingToSendAMessage::class)
class AFlowThatGetsMurderedTryingToSendAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.send("haha")
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedTryingToReceiveAMessage(private val party: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.receive<String>()
}
}
@InitiatedBy(AFlowThatGetsMurderedTryingToReceiveAMessage::class)
class AFlowThatGetsMurderedTryingToReceiveAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>()
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedTryingToSendAndReceiveAMessage(private val party: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val session = initiateFlow(party)
session.sendAndReceive<String>("hi")
}
}
@InitiatedBy(AFlowThatGetsMurderedTryingToSendAndReceiveAMessage::class)
class AFlowThatGetsMurderedTryingToSendAndReceiveAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>()
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedTryingToWaitForATransaction : FlowLogic<Unit>() {
@Suspendable
override fun call() {
waitForLedgerCommit(SecureHash.randomSHA256())
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedTryingToAwaitAFuture : FlowLogic<Unit>() {
@Suspendable
override fun call() {
await(MyFuture())
}
class MyFuture : FlowExternalOperation<Unit> {
override fun execute(deduplicationId: String) {
Thread.sleep(3.minutes.toMillis())
}
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedAndSomehowKillsItsFriends(private val parties: List<Party>) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val sessionOne = initiateFlow(parties[0])
val sessionTwo = initiateFlow(parties[1])
// trigger sessions with 2 counter parties
sessionOne.sendAndReceive<String>("what is up")
sessionOne.send("what is up 2")
// why is this second send needed to cause the kill command to propagate to the other side
// will be exactly the same for normal error propagation
sessionTwo.sendAndReceive<String>("what is up")
sessionTwo.send("what is up 2")
sleep(3.minutes)
}
}
@InitiatedBy(AFlowThatGetsMurderedAndSomehowKillsItsFriends::class)
class AFlowThatGetsMurderedAndSomehowKillsItsFriendsResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
val locks = mapOf(
BOB_NAME to Semaphore(0),
CHARLIE_NAME to Semaphore(0)
)
var receivedKilledExceptions = mutableMapOf(
BOB_NAME to false,
CHARLIE_NAME to false
)
}
@Suspendable
override fun call() {
session.receive<String>()
session.send("hi")
session.receive<String>()
locks[ourIdentity.name]!!.release()
try {
session.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledExceptions[ourIdentity.name] = true
locks[ourIdentity.name]!!.release()
throw e
}
}
}
@StartableByRPC
@InitiatingFlow
class AFlowThatGetsMurderedByItsFriend(private val parties: List<Party>) : FlowLogic<Unit>() {
companion object {
var receivedKilledException = false
}
@Suspendable
override fun call() {
val sessionOne = initiateFlow(parties[0])
val sessionTwo = initiateFlow(parties[1])
// trigger sessions with 2 counter parties
sessionOne.sendAndReceive<String>("what is up")
sessionOne.send("what is up 2")
// why is this second send needed to cause the kill command to propagate to the other side
// will be exactly the same for normal error propagation
sessionTwo.sendAndReceive<String>("what is up")
sessionTwo.send("what is up 2")
try {
sessionOne.receive<String>()
} catch (e: UnexpectedFlowEndException) {
logger.info("Received exception in initiating flow")
receivedKilledException = true
throw e
}
}
}
@InitiatedBy(AFlowThatGetsMurderedByItsFriend::class)
class AFlowThatGetsMurderedByItsFriendResponder(private val session: FlowSession) : FlowLogic<Unit>() {
companion object {
val locks = mapOf(
BOB_NAME to Semaphore(0),
CHARLIE_NAME to Semaphore(0)
)
var receivedKilledExceptions = mutableMapOf(
BOB_NAME to false,
CHARLIE_NAME to false
)
var flowIds = mutableMapOf<CordaX500Name, StateMachineRunId>()
}
@Suspendable
override fun call() {
flowIds[ourIdentity.name] = runId
session.receive<String>()
session.send("hi")
session.receive<String>()
locks[ourIdentity.name]!!.release()
try {
session.receive<String>()
} catch (e: UnexpectedFlowEndException) {
receivedKilledExceptions[ourIdentity.name] = true
locks[ourIdentity.name]!!.release()
throw e
}
}
}
@StartableByRPC
class SoftLock(private val stateRef: StateRef, private val duration: Duration) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("Soft locking state with hash $stateRef...")
serviceHub.vaultService.softLockReserve(runId.uuid, NonEmptySet.of(stateRef))
sleep(duration)
}
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
return serviceHub.jdbcSession().prepareStatement("select count(*) from node_checkpoints").use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}
}

View File

@ -114,7 +114,7 @@ sealed class Action {
/**
* Sleep until [time].
*/
data class SleepUntil(val time: Instant) : Action()
data class SleepUntil(val currentState: StateMachineState, val time: Instant) : Action()
/**
* Create a new database transaction.

View File

@ -1,6 +1,5 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.Gauge
import com.codahale.metrics.Histogram
@ -64,7 +63,7 @@ class ActionExecutorImpl(
is Action.AcknowledgeMessages -> executeAcknowledgeMessages(action)
is Action.PropagateErrors -> executePropagateErrors(action)
is Action.ScheduleEvent -> executeScheduleEvent(fiber, action)
is Action.SleepUntil -> executeSleepUntil(action)
is Action.SleepUntil -> executeSleepUntil(fiber, action)
is Action.RemoveCheckpoint -> executeRemoveCheckpoint(action)
is Action.SendInitial -> executeSendInitial(action)
is Action.SendExisting -> executeSendExisting(action)
@ -170,11 +169,12 @@ class ActionExecutorImpl(
}
@Suspendable
private fun executeSleepUntil(action: Action.SleepUntil) {
// TODO introduce explicit sleep state + wakeup event instead of relying on Fiber.sleep. This is so shutdown
// conditions may "interrupt" the sleep instead of waiting until wakeup.
val duration = Duration.between(services.clock.instant(), action.time)
Fiber.sleep(duration.toNanos(), TimeUnit.NANOSECONDS)
private fun executeSleepUntil(fiber: FlowFiber, action: Action.SleepUntil) {
stateMachineManager.scheduleFlowSleep(
fiber,
action.currentState,
Duration.between(services.clock.instant(), action.time)
)
}
@Suspendable

View File

@ -150,6 +150,13 @@ sealed class Event {
override fun toString() = "RetryFlowFromSafePoint"
}
/**
* Wake a flow up from its sleep.
*/
object WakeUpFromSleep : Event() {
override fun toString() = "WakeUpSleepyFlow"
}
/**
* Indicates that an event was generated by an external event and that external event needs to be replayed if we retry the flow,
* even if it has not yet been processed and placed on the pending de-duplication handlers list.

View File

@ -9,6 +9,7 @@ import net.corda.node.services.statemachine.transitions.StateMachine
*/
interface FlowFiber {
val id: StateMachineRunId
val instanceId: StateMachineInstanceId
val stateMachine: StateMachine
@Suspendable

View File

@ -0,0 +1,78 @@
package net.corda.node.services.statemachine
import net.corda.core.internal.FlowIORequest
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import java.time.Duration
import java.util.concurrent.Future
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
internal class FlowSleepScheduler(private val smm: StateMachineManagerInternal, private val scheduledExecutor: ScheduledExecutorService) {
private companion object {
val log = contextLogger()
}
/**
* Put a flow to sleep for a specified duration.
*
* @param fiber The [FlowFiber] that will be woken up after sleeping
* @param currentState The current [StateMachineState]
* @param duration How long to sleep for
*/
fun sleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) {
// No other future should be running, cancel it if there is
currentState.future?.run {
log.debug { "Cancelling the existing future for flow ${fiber.id}" }
cancelIfRunning()
}
currentState.future = setAlarmClock(fiber, duration)
}
/**
* Schedule a wake up event.
*
* @param fiber The [FlowFiber] to schedule a wake up event for
*/
fun scheduleWakeUp(fiber: FlowFiber) {
fiber.scheduleEvent(Event.WakeUpFromSleep)
}
/**
* Cancel a sleeping flow's future. Note, this does not cause the flow to wake up.
*
* @param currentState The current [StateMachineState]
*/
fun cancel(currentState: StateMachineState) {
(currentState.checkpoint.flowState as? FlowState.Started)?.let { flowState ->
if (currentState.isWaitingForFuture && flowState.flowIORequest is FlowIORequest.Sleep) {
(currentState.future as? ScheduledFuture)?.run {
log.debug { "Cancelling the sleep scheduled future for flow ${currentState.flowLogic.runId}" }
cancelIfRunning()
currentState.future = null
}
}
}
}
private fun Future<*>.cancelIfRunning() {
if (!isDone) cancel(true)
}
private fun setAlarmClock(fiber: FlowFiber, duration: Duration): ScheduledFuture<Unit> {
val instance = fiber.instanceId
log.debug { "Putting flow to sleep for $duration" }
return scheduledExecutor.schedule<Unit>(
{
log.debug { "Scheduling flow wake up event for flow ${instance.runId}" }
// This passes back into the SMM to check that the fiber that went to sleep is the same fiber that is now being scheduled
// with the wake up event
smm.scheduleFlowWakeUp(instance)
},
duration.toMillis(), TimeUnit.MILLISECONDS
)
}
}

View File

@ -9,10 +9,27 @@ import co.paralleluniverse.strands.channels.Channel
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.cordapp.Cordapp
import net.corda.core.flows.*
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.FlowStackSnapshot
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.AnonymousParty
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.isIdempotentFlow
import net.corda.core.internal.isRegularFile
import net.corda.core.internal.location
import net.corda.core.internal.toPath
import net.corda.core.internal.uncheckedCast
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
@ -113,6 +130,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val resultFuture: CordaFuture<R> get() = uncheckedCast(getTransientField(TransientValues::resultFuture))
override val context: InvocationContext get() = transientState!!.value.checkpoint.invocationContext
override val ourIdentity: Party get() = transientState!!.value.checkpoint.ourIdentity
override val isKilled: Boolean get() = transientState!!.value.isKilled
internal var hasSoftLockedStates: Boolean = false
set(value) {
if (value) field = value else throw IllegalArgumentException("Can only set to true")
@ -131,6 +150,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val actionExecutor = getTransientField(TransientValues::actionExecutor)
val transition = stateMachine.transition(event, oldState)
val (continuation, newState) = transitionExecutor.executeTransition(this, oldState, event, transition, actionExecutor)
// Ensure that the next state that is being written to the transient state maintains the [isKilled] flag
// This condition can be met if a flow is killed during [TransitionExecutor.executeTransition]
if (oldState.isKilled && !newState.isKilled) {
newState.isKilled = true
}
transientState = TransientReference(newState)
setLoggingContext()
return continuation
@ -157,6 +181,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
eventQueue.receive()
} catch (interrupted: InterruptedException) {
log.error("Flow interrupted while waiting for events, aborting immediately")
(transientValues?.value?.resultFuture as? OpenFuture<*>)?.setException(KilledFlowException(id))
abortFiber()
}
val continuation = processEvent(transitionExecutor, nextEvent)
@ -469,8 +494,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnExit = false
)
require(continuation == FlowContinuation.ProcessEvents) { "Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation " }
unpark(SERIALIZER_BLOCKER)
// If the flow has been aborted then do not resume the fiber
if (continuation != FlowContinuation.Abort) {
require(continuation == FlowContinuation.ProcessEvents) {
"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation"
}
unpark(SERIALIZER_BLOCKER)
}
}
return uncheckedCast(processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false,
@ -502,6 +533,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
override val stateMachine get() = getTransientField(TransientValues::stateMachine)
override val instanceId: StateMachineInstanceId get() = StateMachineInstanceId(id, super.getId())
/**
* Records the duration of this flow from call() to completion or failure.
* Note that the duration will include the time the flow spent being parked, and not just the total

View File

@ -6,6 +6,7 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.fibers.instrument.JavaAgent
import co.paralleluniverse.strands.channels.Channels
import com.codahale.metrics.Gauge
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowException
@ -13,11 +14,15 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.mapError
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.mapNotNull
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
@ -35,7 +40,11 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.errorAndTerminate
@ -50,11 +59,14 @@ import rx.Observable
import rx.subjects.PublishSubject
import java.lang.Integer.min
import java.security.SecureRandom
import java.util.*
import java.util.concurrent.*
import java.time.Duration
import java.util.HashSet
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import kotlin.collections.component1
import kotlin.collections.component2
import kotlin.collections.set
@ -81,10 +93,10 @@ class SingleThreadedStateMachineManager(
private class Flow(val fiber: FlowStateMachineImpl<*>, val resultFuture: OpenFuture<Any?>)
private data class ScheduledTimeout(
/** Will fire a [FlowTimeoutException] indicating to the flow hospital to restart the flow. */
val scheduledFuture: ScheduledFuture<*>,
/** Specifies the number of times this flow has been retried. */
val retryCount: Int = 0
/** Will fire a [FlowTimeoutException] indicating to the flow hospital to restart the flow. */
val scheduledFuture: ScheduledFuture<*>,
/** Specifies the number of times this flow has been retried. */
val retryCount: Int = 0
)
// A list of all the state machines being managed by this class. We expose snapshots of it via the stateMachines
@ -101,13 +113,16 @@ class SingleThreadedStateMachineManager(
private val mutex = ThreadBox(InnerState())
private val scheduler = FiberExecutorScheduler("Same thread scheduler", executor)
private val timeoutScheduler = Executors.newScheduledThreadPool(1)
private val scheduledFutureExecutor = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder().setNameFormat("flow-scheduled-future-thread").setDaemon(true).build()
)
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
private val liveFibers = ReusableLatch()
// Monitoring support.
private val metrics = serviceHub.monitoringService.metrics
private val sessionToFlow = ConcurrentHashMap<SessionId, StateMachineRunId>()
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
private val flowSleepScheduler = FlowSleepScheduler(this, scheduledFutureExecutor)
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
@ -197,7 +212,7 @@ class SingleThreadedStateMachineManager(
check(!foundUnrestorableFibers) { "Unrestorable checkpoints were created, please check the logs for details." }
}
flowHospital.close()
timeoutScheduler.shutdown()
scheduledFutureExecutor.shutdown()
scheduler.shutdown()
}
@ -233,34 +248,34 @@ class SingleThreadedStateMachineManager(
override fun killFlow(id: StateMachineRunId): Boolean {
val killFlowResult = mutex.locked {
cancelTimeoutIfScheduled(id)
val flow = flows.remove(id)
val flow = flows[id]
if (flow != null) {
flow.fiber.transientState?.let {
flow.fiber.transientState = TransientReference(it.value.copy(isRemoved = true))
}
logger.info("Killing flow $id known to this node.")
decrementLiveFibers()
totalFinishedFlows.inc()
try {
flow.fiber.interrupt()
// The checkpoint and soft locks are removed here instead of relying on the processing of the next event after setting
// the killed flag. This is to ensure a flow can be removed from the database, even if it is stuck in a infinite loop.
database.transaction {
checkpointStorage.removeCheckpoint(id)
serviceHub.vaultService.softLockRelease(id.uuid)
}
// the same code is NOT done in remove flow when an error occurs
// what is the point of this latch?
unfinishedFibers.countDown()
val state = flow.fiber.transientState
return@locked if (state != null) {
state.value.isKilled = true
flow.fiber.scheduleEvent(Event.DoRemainingWork)
true
} finally {
database.transaction {
checkpointStorage.removeCheckpoint(id)
serviceHub.vaultService.softLockRelease(id.uuid)
}
transitionExecutor.forceRemoveFlow(id)
unfinishedFibers.countDown()
} else {
logger.info("Flow $id has not been initialised correctly and cannot be killed")
false
}
} else {
// It may be that the id refers to a checkpoint that couldn't be deserialised into a flow, so we delete it if it exists.
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
database.transaction { checkpointStorage.removeCheckpoint(id) }
}
}
return if(killFlowResult) {
return if (killFlowResult) {
true
} else {
flowHospital.dropSessionInit(id)
@ -297,6 +312,7 @@ class SingleThreadedStateMachineManager(
override fun removeFlow(flowId: StateMachineRunId, removalReason: FlowRemovalReason, lastState: StateMachineState) {
mutex.locked {
cancelTimeoutIfScheduled(flowId)
cancelFlowSleep(lastState)
val flow = flows.remove(flowId)
if (flow != null) {
decrementLiveFibers()
@ -360,6 +376,7 @@ class SingleThreadedStateMachineManager(
@Suppress("TooGenericExceptionCaught", "ComplexMethod", "MaxLineLength") // this is fully intentional here, see comment in the catch clause
override fun retryFlowFromSafePoint(currentState: StateMachineState) {
cancelFlowSleep(currentState)
// Get set of external events
val flowId = currentState.flowLogic.runId
try {
@ -624,15 +641,17 @@ class SingleThreadedStateMachineManager(
val startedFuture = openFuture<Unit>()
val initialState = StateMachineState(
checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isWaitingForFuture = false,
isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = flowLogic,
senderUUID = ourSenderUUID
checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isWaitingForFuture = false,
future = null,
isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
isKilled = false,
flowLogic = flowLogic,
senderUUID = ourSenderUUID
)
flowStateMachineImpl.transientState = TransientReference(initialState)
mutex.locked {
@ -651,6 +670,25 @@ class SingleThreadedStateMachineManager(
mutex.locked { cancelTimeoutIfScheduled(flowId) }
}
override fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration) {
flowSleepScheduler.sleep(fiber, currentState, duration)
}
override fun scheduleFlowWakeUp(instanceId: StateMachineInstanceId) {
mutex.locked {
flows[instanceId.runId]?.let { flow ->
// Only schedule a wake up event if the fiber the flow is executing on has not changed
if (flow.fiber.instanceId == instanceId) {
flowSleepScheduler.scheduleWakeUp(flow.fiber)
}
}
}
}
private fun cancelFlowSleep(currentState: StateMachineState) {
flowSleepScheduler.cancel(currentState)
}
/**
* Schedules the flow [flowId] to be retried if it does not finish within the timeout period
* specified in the config.
@ -703,7 +741,7 @@ class SingleThreadedStateMachineManager(
/** Schedules a [FlowTimeoutException] to be fired in order to restart the flow. */
private fun scheduleTimeoutException(flow: Flow, delay: Long): ScheduledFuture<*> {
return with(serviceHub.configuration.flowTimeout) {
timeoutScheduler.schedule({
scheduledFutureExecutor.schedule({
val event = Event.Error(FlowTimeoutException())
flow.fiber.scheduleEvent(event)
}, delay, TimeUnit.SECONDS)
@ -780,15 +818,17 @@ class SingleThreadedStateMachineManager(
is FlowState.Unstarted -> {
val logic = tryCheckpointDeserialize(flowState.frozenFlowLogic, id) ?: return null
val state = StateMachineState(
checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = logic,
senderUUID = null
checkpoint = checkpoint,
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isWaitingForFuture = false,
future = null,
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
isKilled = false,
flowLogic = logic,
senderUUID = null
)
val fiber = FlowStateMachineImpl(id, logic, scheduler)
fiber.transientValues = TransientReference(createTransientValues(id, resultFuture))
@ -799,20 +839,22 @@ class SingleThreadedStateMachineManager(
is FlowState.Started -> {
val fiber = tryCheckpointDeserialize(flowState.frozenFiber, id) ?: return null
val state = StateMachineState(
// Do a trivial checkpoint copy below, to update the Checkpoint#timestamp value.
// The Checkpoint#timestamp is being used by FlowMonitor as the starting time point of a potential suspension.
// We need to refresh the Checkpoint#timestamp here, in case of an e.g. node start up after a long period.
// If not then, there is a time window (until the next checkpoint update) in which the FlowMonitor
// could log this flow as a waiting flow, from the last checkpoint update i.e. before the node's start up.
checkpoint = checkpoint.copy(),
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isWaitingForFuture = false,
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = fiber.logic,
senderUUID = null
// Do a trivial checkpoint copy below, to update the Checkpoint#timestamp value.
// The Checkpoint#timestamp is being used by FlowMonitor as the starting time point of a potential suspension.
// We need to refresh the Checkpoint#timestamp here, in case of an e.g. node start up after a long period.
// If not then, there is a time window (until the next checkpoint update) in which the FlowMonitor
// could log this flow as a waiting flow, from the last checkpoint update i.e. before the node's start up.
checkpoint = checkpoint.copy(),
pendingDeduplicationHandlers = initialDeduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isWaitingForFuture = false,
future = null,
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
isKilled = false,
flowLogic = fiber.logic,
senderUUID = null
)
fiber.transientValues = TransientReference(createTransientValues(id, resultFuture))
fiber.transientState = TransientReference(state)

View File

@ -0,0 +1,5 @@
package net.corda.node.services.statemachine
import net.corda.core.flows.StateMachineRunId
data class StateMachineInstanceId(val runId: StateMachineRunId, val fiberId: Long)

View File

@ -10,6 +10,7 @@ import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
import rx.Observable
import java.time.Duration
/**
* A StateMachineManager is responsible for coordination and persistence of multiple [FlowStateMachine] objects.
@ -103,6 +104,8 @@ interface StateMachineManagerInternal {
fun retryFlowFromSafePoint(currentState: StateMachineState)
fun scheduleFlowTimeout(flowId: StateMachineRunId)
fun cancelFlowTimeout(flowId: StateMachineRunId)
fun scheduleFlowSleep(fiber: FlowFiber, currentState: StateMachineState, duration: Duration)
fun scheduleFlowWakeUp(instanceId: StateMachineInstanceId)
}
/**

View File

@ -11,6 +11,7 @@ import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
import java.time.Instant
import java.util.concurrent.Future
/**
* The state of the state machine, capturing the state of a flow. It consists of two parts, an *immutable* part that is
@ -22,13 +23,16 @@ import java.time.Instant
* @param isFlowResumed true if the control is returned (or being returned) to "user-space" flow code. This is used
* to make [Event.DoRemainingWork] idempotent.
* @param isWaitingForFuture true if the flow is waiting for the completion of a future triggered by one of the statemachine's actions
* [FlowIORequest.WaitForLedgerCommit]. This used is to make tracking idempotent.
* @param future If the flow is relying on a [Future] completing, then this field will be set otherwise it remains null
* @param isAnyCheckpointPersisted true if at least a single checkpoint has been persisted. This is used to determine
* whether we should DELETE the checkpoint at the end of the flow.
* @param isStartIdempotent true if the start of the flow is idempotent, making the skipping of the initial checkpoint
* possible.
* @param isRemoved true if the flow has been removed from the state machine manager. This is used to avoid any further
* work.
* @param isKilled true if the flow has been marked as killed. This is used to cause a flow to move to a killed flow transition no matter
* what event it is set to process next. [isKilled] is a `var` and set as [Volatile] to prevent concurrency errors that can occur if a flow
* is killed during the middle of a state transition.
* @param senderUUID the identifier of the sending state machine or null if this flow is resumed from a checkpoint so that it does not participate in de-duplication high-water-marking.
*/
// TODO perhaps add a read-only environment to the state machine for things that don't change over time?
@ -39,9 +43,12 @@ data class StateMachineState(
val pendingDeduplicationHandlers: List<DeduplicationHandler>,
val isFlowResumed: Boolean,
val isWaitingForFuture: Boolean,
var future: Future<*>?,
val isAnyCheckpointPersisted: Boolean,
val isStartIdempotent: Boolean,
val isRemoved: Boolean,
@Volatile
var isKilled: Boolean,
val senderUUID: String?
)

View File

@ -1,7 +1,6 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
@ -18,13 +17,6 @@ interface TransitionExecutor {
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState>
/**
* Called if the normal exit path where the new state is marked as removed via [StateMachineState.isRemoved] is not called.
* Currently this only happens via [StateMachineManager.killFlow]. This allows instances of this interface to clean up
* any state they are holding for a flow to prevent a memory leak.
*/
fun forceRemoveFlow(id: StateMachineRunId)
}
/**

View File

@ -1,7 +1,6 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
@ -23,7 +22,6 @@ class TransitionExecutorImpl(
val secureRandom: SecureRandom,
val database: CordaPersistence
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {}
private companion object {
val log = contextLogger()

View File

@ -60,9 +60,4 @@ class DumpHistoryOnErrorInterceptor(val delegate: TransitionExecutor) : Transiti
return Pair(continuation, nextState)
}
override fun forceRemoveFlow(id: StateMachineRunId) {
records.remove(id)
delegate.forceRemoveFlow(id)
}
}

View File

@ -1,12 +1,17 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowFiber
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.TransitionExecutor
import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
import java.util.concurrent.LinkedBlockingQueue
@ -19,9 +24,6 @@ class FiberDeserializationCheckingInterceptor(
val fiberDeserializationChecker: FiberDeserializationChecker,
val delegate: TransitionExecutor
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
@Suspendable
override fun executeTransition(

View File

@ -20,15 +20,6 @@ class HospitalisingInterceptor(
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
removeFlow(id)
delegate.forceRemoveFlow(id)
}
private fun removeFlow(id: StateMachineRunId) {
flowHospital.leave(id)
flowHospital.removeMedicalHistory(id)
}
@Suspendable
override fun executeTransition(
@ -56,4 +47,9 @@ class HospitalisingInterceptor(
}
return Pair(continuation, nextState)
}
private fun removeFlow(id: StateMachineRunId) {
flowHospital.leave(id)
flowHospital.removeMedicalHistory(id)
}
}

View File

@ -2,7 +2,6 @@ package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import com.codahale.metrics.MetricRegistry
import net.corda.core.flows.StateMachineRunId
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
@ -13,12 +12,15 @@ import net.corda.node.services.statemachine.transitions.FlowContinuation
import net.corda.node.services.statemachine.transitions.TransitionResult
class MetricInterceptor(val metrics: MetricRegistry, val delegate: TransitionExecutor) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
@Suspendable
override fun executeTransition(fiber: FlowFiber, previousState: StateMachineState, event: Event, transition: TransitionResult, actionExecutor: ActionExecutor): Pair<FlowContinuation, StateMachineState> {
override fun executeTransition(
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
val metricActionInterceptor = MetricActionInterceptor(metrics, actionExecutor)
return delegate.executeTransition(fiber, previousState, event, transition, metricActionInterceptor)
}

View File

@ -1,7 +1,6 @@
package net.corda.node.services.statemachine.interceptors
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StateMachineRunId
import net.corda.core.utilities.contextLogger
import net.corda.node.services.statemachine.ActionExecutor
import net.corda.node.services.statemachine.Event
@ -16,9 +15,6 @@ import java.time.Instant
* This interceptor simply prints all state machine transitions. Useful for debugging.
*/
class PrintingInterceptor(val delegate: TransitionExecutor) : TransitionExecutor {
override fun forceRemoveFlow(id: StateMachineRunId) {
delegate.forceRemoveFlow(id)
}
companion object {
val log = contextLogger()

View File

@ -0,0 +1,120 @@
package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.FlowException
import net.corda.core.flows.KilledFlowException
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ErrorSessionMessage
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowRemovalReason
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
class KilledFlowTransition(
override val context: TransitionContext,
override val startingState: StateMachineState,
val event: Event
) : Transition {
override fun transition(): TransitionResult {
return builder {
val killedFlowError = createKilledFlowError()
val killedFlowErrorMessage = createErrorMessageFromError(killedFlowError)
val errorMessages = listOf(killedFlowErrorMessage)
val (initiatedSessions, newSessions) = bufferErrorMessagesInInitiatingSessions(startingState.checkpoint.sessions, errorMessages)
val newCheckpoint = startingState.checkpoint.copy(
sessions = newSessions
)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(
Action.PropagateErrors(
errorMessages,
initiatedSessions,
startingState.senderUUID
)
)
if (!startingState.isFlowResumed) {
actions.add(Action.CreateTransaction)
}
// The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
if(startingState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id))
}
actions.addAll(
arrayOf(
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.RemoveSessionBindings(currentState.checkpoint.sessions.keys)
)
)
currentState = currentState.copy(
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
actions.add(Action.RemoveFlow(context.id, createKilledRemovalReason(killedFlowError), currentState))
FlowContinuation.Abort
}
}
private fun createKilledFlowError(): FlowError {
val exception = when (event) {
is Event.Error -> event.exception
else -> KilledFlowException(context.id)
}
return FlowError(context.secureRandom.nextLong(), exception)
}
// Purposely left the same as [bufferErrorMessagesInInitiatingSessions] in [ErrorFlowTransition] so that it can be refactored
private fun createErrorMessageFromError(error: FlowError): ErrorSessionMessage {
val exception = error.exception
// If the exception doesn't contain an originalErrorId that means it's a fresh FlowException that should
// propagate to the neighbouring flows. If it has the ID filled in that means it's a rethrown FlowException and
// shouldn't be propagated.
return if (exception is FlowException && exception.originalErrorId == null) {
ErrorSessionMessage(flowException = exception, errorId = error.errorId)
} else {
ErrorSessionMessage(flowException = null, errorId = error.errorId)
}
}
// Purposely left the same as [bufferErrorMessagesInInitiatingSessions] in [ErrorFlowTransition] so that it can be refactored
// Buffer error messages in Initiating sessions, return the initialised ones.
private fun bufferErrorMessagesInInitiatingSessions(
sessions: Map<SessionId, SessionState>,
errorMessages: List<ErrorSessionMessage>
): Pair<List<SessionState.Initiated>, Map<SessionId, SessionState>> {
val newSessions = sessions.mapValues { (sourceSessionId, sessionState) ->
if (sessionState is SessionState.Initiating && sessionState.rejectionError == null) {
// *prepend* the error messages in order to error the other sessions ASAP. The other messages will
// be delivered all the same, they just won't trigger flow resumption because of dirtiness.
val errorMessagesWithDeduplication = errorMessages.map {
DeduplicationId.createForError(it.errorId, sourceSessionId) to it
}
sessionState.copy(bufferedMessages = errorMessagesWithDeduplication + sessionState.bufferedMessages)
} else {
sessionState
}
}
val initiatedSessions = sessions.values.mapNotNull { session ->
if (session is SessionState.Initiated && session.errors.isEmpty()) {
session
} else {
null
}
}
return Pair(initiatedSessions, newSessions)
}
private fun createKilledRemovalReason(error: FlowError): FlowRemovalReason.ErrorFinish {
return FlowRemovalReason.ErrorFinish(listOf(error))
}
}

View File

@ -88,9 +88,16 @@ class StartedFlowTransition(
}
private fun sleepTransition(flowIORequest: FlowIORequest.Sleep): TransitionResult {
return builder {
actions.add(Action.SleepUntil(flowIORequest.wakeUpAfter))
resumeFlowLogic(Unit)
// This ensures that the [Sleep] request is not executed multiple times if extra
// [DoRemainingWork] events are pushed onto the fiber's event queue before the flow has really woken up
return if (!startingState.isWaitingForFuture) {
builder {
currentState = currentState.copy(isWaitingForFuture = true)
actions.add(Action.SleepUntil(currentState, flowIORequest.wakeUpAfter))
FlowContinuation.ProcessEvents
}
} else {
TransitionResult(startingState)
}
}

View File

@ -32,7 +32,14 @@ class TopLevelTransition(
override val startingState: StateMachineState,
val event: Event
) : Transition {
@Suppress("ComplexMethod")
override fun transition(): TransitionResult {
if (startingState.isKilled) {
return KilledFlowTransition(context, startingState, event).transition()
}
return when (event) {
is Event.DoRemainingWork -> DoRemainingWorkTransition(context, startingState).transition()
is Event.DeliverSessionMessage -> DeliverSessionMessageTransition(context, startingState, event).transition()
@ -48,6 +55,7 @@ class TopLevelTransition(
is Event.AsyncOperationCompletion -> asyncOperationCompletionTransition(event)
is Event.AsyncOperationThrows -> asyncOperationThrowsTransition(event)
is Event.RetryFlowFromSafePoint -> retryFlowFromSafePointTransition(startingState)
is Event.WakeUpFromSleep -> wakeUpFromSleepTransition()
}
}
@ -298,4 +306,10 @@ class TopLevelTransition(
FlowContinuation.Abort
}
}
private fun wakeUpFromSleepTransition(): TransitionResult {
return builder {
resumeFlowLogic(Unit)
}
}
}

View File

@ -68,13 +68,13 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
fun resumeFlowLogic(result: Any?): FlowContinuation {
actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false, future = null)
return FlowContinuation.Resume(result)
}
fun resumeFlowLogic(result: Throwable): FlowContinuation {
actions.add(Action.CreateTransaction)
currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false)
currentState = currentState.copy(isFlowResumed = true, isWaitingForFuture = false, future = null)
return FlowContinuation.Throw(result)
}
}

View File

@ -1,6 +1,5 @@
package net.corda.node
import co.paralleluniverse.fibers.Fiber
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.PermissionException
import net.corda.core.context.AuthServiceId
@ -8,7 +7,6 @@ import net.corda.core.context.InvocationContext
import net.corda.core.contracts.Amount
import net.corda.core.contracts.ContractState
import net.corda.core.contracts.Issued
import net.corda.core.contracts.StateRef
import net.corda.core.crypto.isFulfilledBy
import net.corda.core.crypto.keys
import net.corda.core.flows.FlowLogic
@ -17,18 +15,19 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.RPC_UPLOADER
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.*
import net.corda.core.node.services.StatesNotAvailableException
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.vaultQueryBy
import net.corda.core.messaging.vaultTrackBy
import net.corda.core.node.services.Vault
import net.corda.core.node.services.queryBy
import net.corda.core.node.services.vault.AttachmentQueryCriteria
import net.corda.core.node.services.vault.ColumnPredicate
import net.corda.core.node.services.vault.EqualityComparisonOperator
import net.corda.core.transactions.SignedTransaction
import net.corda.core.utilities.NonEmptySet
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.finance.DOLLARS
import net.corda.finance.GBP
import net.corda.finance.contracts.asset.Cash
@ -36,7 +35,6 @@ import net.corda.finance.flows.CashIssueFlow
import net.corda.finance.flows.CashPaymentFlow
import net.corda.node.internal.security.AuthorizingSubject
import net.corda.node.internal.security.RPCSecurityManagerImpl
import net.corda.node.services.Permissions.Companion.all
import net.corda.node.services.Permissions.Companion.invokeRpc
import net.corda.node.services.Permissions.Companion.startFlow
import net.corda.node.services.rpc.CURRENT_RPC_CONTEXT
@ -54,14 +52,15 @@ import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.testActor
import org.apache.commons.io.IOUtils
import org.assertj.core.api.Assertions.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Assert.assertArrayEquals
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.io.ByteArrayOutputStream
import java.time.Duration
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNull
@ -383,72 +382,6 @@ class CordaRPCOpsImplTest {
}
}
@Test(timeout=300_000)
fun `kill a stuck flow through RPC`() {
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
withPermissions(
startFlow<NewJoinerFlow>(),
invokeRpc(CordaRPCOps::killFlow),
invokeRpc(CordaRPCOps::stateMachinesFeed),
invokeRpc(CordaRPCOps::stateMachinesSnapshot)
) {
val flow = rpc.startFlow(::NewJoinerFlow)
val killed = rpc.killFlow(flow.id)
assertThat(killed).isTrue()
assertThat(rpc.stateMachinesSnapshot().map { info -> info.id }).doesNotContain(flow.id)
}
}
@Test(timeout=300_000)
fun `kill a waiting flow through RPC`() {
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
withPermissions(
startFlow<HopefulFlow>(),
invokeRpc(CordaRPCOps::killFlow),
invokeRpc(CordaRPCOps::stateMachinesFeed),
invokeRpc(CordaRPCOps::stateMachinesSnapshot)
) {
val flow = rpc.startFlow(::HopefulFlow, alice)
val killed = rpc.killFlow(flow.id)
assertThat(killed).isTrue()
assertThat(rpc.stateMachinesSnapshot().map { info -> info.id }).doesNotContain(flow.id)
}
}
@Test(timeout=300_000)
fun `killing a flow releases soft lock`() {
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
withPermissions(all()) {
val issuerRef = OpaqueBytes("BankOfMars".toByteArray())
val cash = rpc.startFlow(::CashIssueFlow, 10.DOLLARS, issuerRef, notary).returnValue.getOrThrow().stx.tx.outRefsOfType<Cash.State>().single()
val flow = rpc.startFlow(::SoftLock, cash.ref, Duration.ofMinutes(5))
var locked = false
while (!locked) {
try {
rpc.startFlow(::SoftLock, cash.ref, Duration.ofSeconds(1)).returnValue.getOrThrow()
} catch (e: StatesNotAvailableException) {
locked = true
}
}
val killed = rpc.killFlow(flow.id)
assertThat(killed).isTrue()
assertThatCode { rpc.startFlow(::SoftLock, cash.ref, Duration.ofSeconds(1)).returnValue.getOrThrow() }.doesNotThrowAnyException()
}
}
@StartableByRPC
class SoftLock(private val stateRef: StateRef, private val duration: Duration) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
logger.info("Soft locking state with hash $stateRef...")
serviceHub.vaultService.softLockReserve(runId.uuid, NonEmptySet.of(stateRef))
sleep(duration)
}
}
@Test(timeout=300_000)
fun `kill a nonexistent flow through RPC`() {
CURRENT_RPC_CONTEXT.set(RpcAuthContext(InvocationContext.rpc(testActor()), buildSubject("TEST_USER", emptySet())))
@ -479,25 +412,6 @@ class CordaRPCOpsImplTest {
}
}
@StartableByRPC
class NewJoinerFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
logger.info("When can I join you say? Almost there buddy...")
Fiber.currentFiber().join()
return "You'll never get me!"
}
}
@StartableByRPC
class HopefulFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
logger.info("Waiting for a miracle...")
return initiateFlow(party).receive<String>().unwrap { it }
}
}
class NonRPCFlow : FlowLogic<Unit>() {
@Suspendable
override fun call() = Unit

View File

@ -8,12 +8,15 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.KilledFlowException
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients
import net.corda.core.utilities.UntrustworthyData
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message
@ -35,9 +38,10 @@ import org.junit.Test
import java.sql.SQLException
import java.time.Duration
import java.time.Instant
import java.util.*
import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertNotNull
import kotlin.test.assertNull
@ -183,6 +187,12 @@ class RetryFlowMockTest {
records.next()
// Killing it should remove it.
nodeA.smm.killFlow(flow.id)
assertFailsWith<KilledFlowException> {
flow.resultFuture.getOrThrow(20.seconds)
}
// Sleep added because the flow leaves the hospital after the future has returned
// This means that the removal code has not run by the time the snapshot is taken
Thread.sleep(2000)
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
}