Merge pull request #6851 from corda/dan/ENT-6142-port-to-os

ENT-6142 port to OS
Dan Newton 2021-01-29 13:29:25 +00:00 committed by GitHub
commit d214f5ecbf
16 changed files with 687 additions and 85 deletions

View File

@@ -153,13 +153,15 @@ abstract class StateMachineErrorHandlingTest {
runnable: Int = 0,
failed: Int = 0,
completed: Int = 0,
hospitalized: Int = 0,
killed: Int = 0
) {
val counts = startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(runnable, counts.runnable, "There should be $runnable runnable checkpoints")
assertEquals(failed, counts.failed, "There should be $failed failed checkpoints")
assertEquals(completed, counts.completed, "There should be $completed completed checkpoints")
assertEquals(hospitalized, counts.hospitalized, "There should be $hospitalized hospitalized checkpoints")
assertEquals(killed, counts.killed, "There should be $killed killed checkpoints")
}

internal fun CordaRPCOps.assertNumberOfCheckpointsAllZero() = assertNumberOfCheckpoints()
@@ -189,6 +191,7 @@ abstract class StateMachineErrorHandlingTest {
class ThrowAnErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
sleep(1.seconds)
throwException()
return "cant get here"
}
@@ -219,7 +222,8 @@ abstract class StateMachineErrorHandlingTest {
runnable = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.RUNNABLE),
failed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.FAILED),
completed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.COMPLETED),
hospitalized = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.HOSPITALIZED),
killed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.KILLED)
)

private fun getNumberOfCheckpointsWithStatus(status: Checkpoint.FlowStatus): Int {
@@ -243,7 +247,8 @@ abstract class StateMachineErrorHandlingTest {
val runnable: Int = 0,
val failed: Int = 0,
val completed: Int = 0,
val hospitalized: Int = 0,
val killed: Int = 0
)

// Internal use for testing only!!
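A rough usage sketch of the new killed counter (illustrative only, not part of the commit; `alice` and `handle` stand in for the driver node handle and flow handle used in the tests below):

// Illustrative sketch: after killing a flow that was started with a client id, its checkpoint is kept
// with the KILLED status, so the new parameter can be asserted on directly.
val killed = alice.rpc.killFlow(handle.id)
assertTrue(killed)
alice.rpc.assertNumberOfCheckpoints(killed = 1)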

View File

@@ -1056,4 +1056,158 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
charlie.rpc.assertHospitalCounts(discharged = 3)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.openThreadLocalWormhole] to cause an unexpected error during flow initialisation.
*
* The hospital has the flow's medical history updated with the new failure added to it. As the failure occurred before the original
* checkpoint was persisted, there is no checkpoint to update in the database.
*/
@Test(timeout = 300_000)
fun `unexpected error during flow initialisation that gets caught by default exception handler puts flow into in-memory overnight observation`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD openThreadLocalWormhole
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(
::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
)
}
Thread.sleep(10.seconds.toMillis())
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
// The flow failed during flow initialisation before committing the original checkpoint
// therefore there is no checkpoint to update the status of
alice.rpc.assertNumberOfCheckpoints(hospitalized = 0)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised.
*
* The hospital has the flow's medical history updated with the new failure added to it. The status of the checkpoint is also set to
* [Checkpoint.FlowStatus.HOSPITALIZED] to reflect this information in the database.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow initialisation that gets caught by default exception handler puts flow into overnight observation and reflected in database`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(::ThrowAnErrorFlow).returnValue.getOrThrow(30.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised. When updating the status of the flow to [Checkpoint.FlowStatus.HOSPITALIZED] an error occurs.
*
* The update is rescheduled and tried again. This is done separately from the fiber.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow initialisation that gets caught by default exception handler retries the status update if it fails`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception when updating status
INTERFACE ${CheckpointStorage::class.java.name}
METHOD updateStatus
AT ENTRY
IF readCounter("counter") < 2
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("should be a sql exception")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(::ThrowAnErrorFlow).returnValue.getOrThrow(50.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.recordDuration] to cause an unexpected error after a flow has returned its
* result to the client.
*
* As the flow has already returned its result to the client, its status has already been updated correctly by the time the unexpected
* error occurs. There is no need to change the status as the flow has already finished.
*/
@Test(timeout = 300_000)
fun `unexpected error after flow has returned result to client that gets caught by default exception handler does nothing except log`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD recordDuration
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints()
}
}
}
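Each of these tests injects its fault through a Byteman rule submitted with `submitBytemanRules`. The rules all follow the same shape; the snippet below is an illustrative sketch only (the target class and method are placeholders, not part of the commit):

// Illustrative only: CLASS (or INTERFACE) and METHOD choose the instrumentation target, AT ENTRY/AT EXIT
// the injection point, IF guards the rule (the counter limits how often the fault fires), and DO runs the
// trace and throws the simulated failure.
val ruleSketch = """
    RULE <unique rule name>
    CLASS com.example.SomeTargetClass
    METHOD someMethod
    AT ENTRY
    IF readCounter("counter") < 1
    DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("boom")
    ENDRULE
""".trimIndent()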

View File

@@ -697,6 +697,58 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* On shutdown this flow will still terminate correctly and not prevent the node from shutting down.
*/
@Suppress("TooGenericExceptionCaught")
@Test(timeout = 300_000)
fun `a dead flow can be shutdown`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Log that state machine has ended
CLASS $stateMachineManagerClassName
METHOD stop
AT EXIT
IF true
DO traceln("State machine shutdown")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(::ThrowAnErrorFlow).returnValue.getOrThrow(50.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
try {
// This actually shuts down the node
alice.rpc.shutdown()
} catch(e: Exception) {
// Exception gets thrown due to shutdown
}
Thread.sleep(30.seconds.toMillis())
alice.assertBytemanOutput("State machine shutdown", 1)
}
}
@StartableByRPC
class SleepCatchAndRethrowFlow : FlowLogic<String>() {
@Suspendable

View File

@@ -5,6 +5,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
@@ -140,6 +141,168 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* The flow is then manually killed which triggers the flow to go through the normal kill flow process.
*/
@Test(timeout = 300_000)
fun `a dead flow can be killed`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val handle = alice.rpc.startFlow(::ThrowAnErrorFlow)
val id = handle.id
assertFailsWith<TimeoutException> {
handle.returnValue.getOrThrow(20.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
val killed = alice.rpc.killFlow(id)
assertTrue(killed)
Thread.sleep(20.seconds.toMillis())
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpointsAllZero()
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* The flow is then manually killed which triggers the flow to go through the normal kill flow process.
*
* Since the flow was started with a client id, a record of the [KilledFlowException] should exist in the database.
*/
@Test(timeout = 300_000)
fun `a dead flow that was started with a client id can be killed`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val handle = alice.rpc.startFlowWithClientId("my id", ::ThrowAnErrorFlow)
val id = handle.id
assertFailsWith<TimeoutException> {
handle.returnValue.getOrThrow(20.seconds)
}
val (discharge, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
val killed = alice.rpc.killFlow(id)
assertTrue(killed)
Thread.sleep(20.seconds.toMillis())
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(killed = 1)
// Exception thrown by flow
assertFailsWith<KilledFlowException> {
alice.rpc.reattachFlowWithClientId<String>("my id")?.returnValue?.getOrThrow(20.seconds)
}
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.logFlowError] to cause an unexpected error after the flow has properly
* initialised, placing the flow into a dead state.
*
* The flow is then manually killed; the kill processing fails as well (the second Byteman rule throws during the next transition), so the flow forcibly kills itself.
*/
@Test(timeout = 300_000)
fun `a dead flow that is killed and fails again will forcibly kill itself`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD logFlowError
AT ENTRY
IF readCounter("counter") == 0
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception 2
CLASS ${TransitionExecutorImpl::class.java.name}
METHOD executeTransition
AT ENTRY
IF readCounter("counter") == 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die again")
ENDRULE
RULE Log that removeFlow is called
CLASS $stateMachineManagerClassName
METHOD removeFlow
AT EXIT
IF true
DO traceln("removeFlow called")
ENDRULE
RULE Log that killFlowForcibly is called
CLASS $stateMachineManagerClassName
METHOD killFlowForcibly
AT EXIT
IF true
DO traceln("killFlowForcibly called")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val handle = alice.rpc.startFlow(::ThrowAnErrorFlow)
val id = handle.id
assertFailsWith<TimeoutException> {
handle.returnValue.getOrThrow(20.seconds)
}
assertTrue(alice.rpc.killFlow(id))
Thread.sleep(20.seconds.toMillis())
alice.assertBytemanOutput("removeFlow called", 1)
alice.assertBytemanOutput("killFlowForcibly called", 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpointsAllZero()
}
}
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {

View File

@@ -92,6 +92,7 @@ class FlowCreator(
lock: Semaphore = Semaphore(1),
resultFuture: OpenFuture<Any?> = openFuture(),
firstRestore: Boolean = true,
isKilled: Boolean = false,
progressTracker: ProgressTracker? = null
): Flow<*>? {
val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore)

@@ -116,7 +117,8 @@ class FlowCreator(
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount
?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null,
numberOfCommits = checkpoint.checkpointState.numberOfCommits,
lock = lock,
isKilled = isKilled
)
injectOldProgressTracker(progressTracker, fiber.logic)
return Flow(fiber, resultFuture)

@@ -248,7 +250,8 @@ class FlowCreator(
numberOfCommits: Int,
lock: Semaphore,
deduplicationHandler: DeduplicationHandler? = null,
senderUUID: String? = null,
isKilled: Boolean = false
): StateMachineState {
return StateMachineState(
checkpoint = checkpoint,

@@ -259,7 +262,8 @@ class FlowCreator(
isAnyCheckpointPersisted = anyCheckpointPersisted,
isStartIdempotent = false,
isRemoved = false,
isKilled = isKilled,
isDead = false,
flowLogic = fiber.logic,
senderUUID = senderUUID,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount,

View File

@@ -10,7 +10,9 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

internal class FlowDefaultUncaughtExceptionHandler(
private val smm: StateMachineManagerInternal,
private val innerState: StateMachineInnerState,
private val flowHospital: StaffedFlowHospital,
private val checkpointStorage: CheckpointStorage,
private val database: CordaPersistence,

@@ -31,7 +33,19 @@ class FlowDefaultUncaughtExceptionHandler(
)
} else {
fiber.logger.warn("Caught exception from flow $id", throwable)
if (fiber.isKilled) {
// If the flow was already killed and it has reached this exception handler then the flow must be killed forcibly to
// ensure it terminates. This could lead to sessions related to the flow not terminating as errors might not have been
// propagated to them.
smm.killFlowForcibly(id)
} else {
innerState.withLock {
setFlowToHospitalized(fiber, throwable)
// This flow has died and cannot continue to run as normal. Mark it as dead so that it can be handled directly by
// retry, kill and shutdown operations.
fiber.transientState = fiber.transientState.copy(isDead = true)
}
}
}
}

@@ -52,9 +66,13 @@ class FlowDefaultUncaughtExceptionHandler(
@Suppress("TooGenericExceptionCaught")
private fun setFlowToHospitalizedRescheduleOnFailure(id: StateMachineRunId) {
try {
innerState.withLock {
if (flows[id]?.fiber?.transientState?.isDead == true) {
log.debug { "Updating the status of flow $id to hospitalized after uncaught exception" }
database.transaction { checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.HOSPITALIZED) }
log.debug { "Updated the status of flow $id to hospitalized after uncaught exception" }
}
}
} catch (e: Exception) {
log.info("Failed to update the status of flow $id to hospitalized after uncaught exception, rescheduling", e)
scheduledExecutor.schedule({ setFlowToHospitalizedRescheduleOnFailure(id) }, RESCHEDULE_DELAY, TimeUnit.SECONDS)

View File

@@ -33,6 +33,7 @@ import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.core.utilities.minutes
import net.corda.node.internal.InitiatedFlowFactory
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
@@ -242,6 +243,8 @@ internal class SingleThreadedStateMachineManager(
private fun setFlowDefaultUncaughtExceptionHandler() {
Fiber.setDefaultUncaughtExceptionHandler(
FlowDefaultUncaughtExceptionHandler(
this,
innerState,
flowHospital,
checkpointStorage,
database,

@@ -272,17 +275,40 @@ internal class SingleThreadedStateMachineManager(
if (stopping) throw IllegalStateException("Already stopping!")
stopping = true
for ((_, flow) in flows) {
if (!flow.fiber.transientState.isDead) {
flow.fiber.scheduleEvent(Event.SoftShutdown)
}
}
}
// Account for any expected Fibers in a test scenario.
liveFibers.countDown(allowedUnsuspendedFiberCount)
awaitShutdownOfFlows()
flowHospital.close()
scheduledFutureExecutor.shutdown()
scheduler.shutdown()
}
private fun awaitShutdownOfFlows() {
val shutdownLogger = StateMachineShutdownLogger(innerState)
var shutdown: Boolean
do {
// Manually shutdown dead flows as they can no longer process scheduled events.
// This needs to be repeated in this loop to prevent flows that die after shutdown is triggered from being forgotten.
// The mutex is not enough protection to stop race-conditions here, the removal of dead flows has to be repeated.
innerState.withMutex {
for ((id, flow) in flows) {
if (flow.fiber.transientState.isDead) {
removeFlow(id, FlowRemovalReason.SoftShutdown, flow.fiber.transientState)
}
}
}
shutdown = liveFibers.await(1.minutes.toMillis())
if (!shutdown) {
shutdownLogger.log()
}
} while (!shutdown)
}
/**
* Atomic get snapshot + subscribe. This is needed so we don't miss updates between subscriptions to [changes] and
* calls to [allStateMachines]

@@ -365,31 +391,29 @@ internal class SingleThreadedStateMachineManager(
override fun killFlow(id: StateMachineRunId): Boolean {
val flow = innerState.withLock { flows[id] }
val killFlowResult = flow?.let {
if (flow.fiber.transientState.isDead) {
// We cannot rely on fiber event processing in dead flows.
killInMemoryDeadFlow(it)
} else {
// Healthy flows need an event in case they are suspended.
killInMemoryFlow(it)
}
} ?: killOutOfMemoryFlow(id)
return killFlowResult || flowHospital.dropSessionInit(id)
}

private fun killInMemoryFlow(flow: Flow<*>): Boolean {
val id = flow.fiber.id
return flow.withFlowLock(VALID_KILL_FLOW_STATUSES) {
if (!transientState.isKilled) {
transientState = transientState.copy(isKilled = true)
logger.info("Killing flow $id known to this node.")
updateCheckpointWhenKillingFlow(
id = id,
clientId = transientState.checkpoint.checkpointState.invocationContext.clientId,
isAnyCheckpointPersisted = transientState.isAnyCheckpointPersisted
)
unfinishedFibers.countDown()
scheduleEvent(Event.DoRemainingWork)
@@ -401,6 +425,67 @@ internal class SingleThreadedStateMachineManager(
}
}
private fun killInMemoryDeadFlow(flow: Flow<*>): Boolean {
val id = flow.fiber.id
return flow.withFlowLock(VALID_KILL_FLOW_STATUSES) {
if (!transientState.isKilled) {
transientState = transientState.copy(isKilled = true)
logger.info("Killing dead flow $id known to this node.")
val (flowForRetry, _) = createNewFlowForRetry(transientState) ?: return false
updateCheckpointWhenKillingFlow(
id = id,
clientId = transientState.checkpoint.checkpointState.invocationContext.clientId,
isAnyCheckpointPersisted = transientState.isAnyCheckpointPersisted
)
unfinishedFibers.countDown()
innerState.withLock {
if (stopping) {
return true
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(transientState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flowForRetry != null) {
addAndStartFlow(id, flowForRetry)
}
}
true
} else {
logger.info("A repeated request to kill flow $id has been made, ignoring...")
false
}
}
}
private fun updateCheckpointWhenKillingFlow(
id: StateMachineRunId,
clientId: String?,
isAnyCheckpointPersisted: Boolean,
exception: KilledFlowException = KilledFlowException(id)
) {
// The checkpoint and soft locks are handled here as well as in a flow's transition. This means that we do not need to rely
// on the processing of the next event after setting the killed flag. This is to ensure a flow can be updated/removed from
// the database, even if it is stuck in an infinite loop or cannot be run (checkpoint cannot be deserialized from database).
if (isAnyCheckpointPersisted) {
database.transaction {
if (clientId != null) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.KILLED)
checkpointStorage.removeFlowException(id)
checkpointStorage.addFlowException(id, exception)
} else {
checkpointStorage.removeCheckpoint(id, mayHavePersistentResults = true)
}
serviceHub.vaultService.softLockRelease(id.uuid)
}
}
}
private fun killOutOfMemoryFlow(id: StateMachineRunId): Boolean {
return database.transaction {
val checkpoint = checkpointStorage.getCheckpoint(id)

@@ -423,6 +508,25 @@ internal class SingleThreadedStateMachineManager(
}
}
override fun killFlowForcibly(flowId: StateMachineRunId): Boolean {
val flow = innerState.withLock { flows[flowId] }
flow?.withFlowLock(VALID_KILL_FLOW_STATUSES) {
logger.info("Forcibly killing flow $flowId, errors will not be propagated to the flow's sessions")
updateCheckpointWhenKillingFlow(
id = flowId,
clientId = transientState.checkpoint.checkpointState.invocationContext.clientId,
isAnyCheckpointPersisted = transientState.isAnyCheckpointPersisted
)
removeFlow(
flowId,
FlowRemovalReason.ErrorFinish(listOf(FlowError(secureRandom.nextLong(), KilledFlowException(flowId)))),
transientState
)
return true
}
return false
}
private fun markAllFlowsAsPaused() {
return checkpointStorage.markAllPaused()
}

@@ -540,48 +644,8 @@ internal class SingleThreadedStateMachineManager(
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}

val (flow, numberOfCommitsFromCheckpoint) = createNewFlowForRetry(currentState) ?: return

innerState.withLock {
if (stopping) {
@@ -599,6 +663,53 @@ internal class SingleThreadedStateMachineManager(
}
}
private fun createNewFlowForRetry(currentState: StateMachineState): Pair<Flow<*>?, Int>? {
val id = currentState.flowLogic.runId
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
// Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated
val checkpointLoadingStatus = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(id) ?: return@transaction CheckpointLoadingStatus.NotFound
val checkpoint = serializedCheckpoint.let {
tryDeserializeCheckpoint(serializedCheckpoint, id)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
checkpointStorage.removeFlowException(id)
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}
CheckpointLoadingStatus.Success(checkpoint)
}
return when {
// Resurrect flow
checkpointLoadingStatus is CheckpointLoadingStatus.Success -> {
val numberOfCommitsFromCheckpoint = checkpointLoadingStatus.checkpoint.checkpointState.numberOfCommits
val flow = flowCreator.createFlowFromCheckpoint(
id,
checkpointLoadingStatus.checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false,
isKilled = currentState.isKilled,
progressTracker = currentState.flowLogic.progressTracker
) ?: return null
flow to numberOfCommitsFromCheckpoint
}
checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
logger.error("Unable to find database checkpoint for flow $id. Something is very wrong. The flow will not retry.")
null
}
checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return null
else -> {
// Just flow initiation message
null to -1
}
}
}
/**
* Extract all the [ExternalEvent] from this flow's event queue and queue them (in the correct order) in the PausedFlow.
* This differs from [extractAndScheduleEventsForRetry] which also extracts (and schedules) [Event.Pause]. This means that if there are

View File

@@ -142,6 +142,7 @@ internal interface StateMachineManagerInternal {
fun retryFlowFromSafePoint(currentState: StateMachineState)
fun scheduleFlowTimeout(flowId: StateMachineRunId)
fun cancelFlowTimeout(flowId: StateMachineRunId)
fun killFlowForcibly(flowId: StateMachineRunId): Boolean
}

/**

View File

@@ -0,0 +1,54 @@
package net.corda.node.services.statemachine
import net.corda.core.utilities.contextLogger
internal class StateMachineShutdownLogger(private val innerState: StateMachineInnerState) {
private companion object {
val log = contextLogger()
}
fun log() {
innerState.withLock {
val message = StringBuilder("Shutdown of the state machine is blocked.\n")
val deadFlowMessage = StringBuilder()
if (flows.isNotEmpty()) {
message.append("The following live flows have not shutdown:\n")
for ((id, flow) in flows) {
val state = flow.fiber.transientState
val line = " - $id with properties " +
"[Status: ${state.checkpoint.status}, " +
"IO request: ${state.checkpoint.flowIoRequest ?: "Unstarted"}, " +
"Suspended: ${!state.isFlowResumed}, " +
"Last checkpoint timestamp: ${state.checkpoint.timestamp}, " +
"Killed: ${state.isKilled}]\n"
if (!state.isDead) {
message.append(line)
} else {
deadFlowMessage.append(line)
}
}
}
if (pausedFlows.isNotEmpty()) {
message.append("The following paused flows have not shutdown:\n")
for ((id, flow) in pausedFlows) {
message.append(
" - $id with properties " +
"[Status: ${flow.checkpoint.status}, " +
"IO request: ${flow.checkpoint.flowIoRequest ?: "Unstarted"}, " +
"Last checkpoint timestamp: ${flow.checkpoint.timestamp}, " +
"Resumable: ${flow.resumable}, " +
"Hospitalized: ${flow.hospitalized}]\n"
)
}
}
if (deadFlowMessage.isNotEmpty()) {
deadFlowMessage.insert(0, "The following dead (crashed) flows have not shutdown:\n")
message.append(deadFlowMessage)
}
message.append("Manual intervention maybe be required for state machine shutdown due to these flows.\n")
message.append("Continuing state machine shutdown loop...")
log.info(message.toString())
}
}
}

View File

@@ -47,6 +47,8 @@ import java.util.concurrent.Semaphore
* work.
* @param isKilled true if the flow has been marked as killed. This is used to cause a flow to move to a killed flow transition no matter
* what event it is set to process next.
* @param isDead true if the flow has been marked as dead. This happens when a flow experiences an unexpected error and escapes its event loop
* which prevents it from processing events.
* @param senderUUID the identifier of the sending state machine or null if this flow is resumed from a checkpoint so that it does not participate in de-duplication high-water-marking.
* @param reloadCheckpointAfterSuspendCount The number of times a flow has been reloaded (not retried). This is [null] when
* [NodeConfiguration.reloadCheckpointAfterSuspendCount] is not enabled.

@@ -68,6 +70,7 @@ data class StateMachineState(
val isStartIdempotent: Boolean,
val isRemoved: Boolean,
val isKilled: Boolean,
val isDead: Boolean,
val senderUUID: String?,
val reloadCheckpointAfterSuspendCount: Int?,
var numberOfCommits: Int,

@@ -278,7 +281,7 @@ sealed class SessionState {
* @property rejectionError if non-null the initiation failed.
*/
data class Initiating(
val bufferedMessages: ArrayList<Pair<DeduplicationId, ExistingSessionMessagePayload>>,
val rejectionError: FlowError?,
override val deduplicationSeed: String
) : SessionState()

@@ -295,7 +298,7 @@ sealed class SessionState {
data class Initiated(
val peerParty: Party,
val peerFlowInfo: FlowInfo,
val receivedMessages: ArrayList<ExistingSessionMessagePayload>,
val otherSideErrored: Boolean,
val peerSinkSessionId: SessionId,
override val deduplicationSeed: String

View File

@@ -87,7 +87,7 @@ class DeliverSessionMessageTransition(
val initiatedSession = SessionState.Initiated(
peerParty = event.sender,
peerFlowInfo = message.initiatedFlowInfo,
receivedMessages = arrayListOf(),
peerSinkSessionId = message.initiatedSessionId,
deduplicationSeed = sessionState.deduplicationSeed,
otherSideErrored = false

View File

@@ -121,9 +121,9 @@ class ErrorFlowTransition(
if (sessionState is SessionState.Initiating && sessionState.rejectionError == null) {
// *prepend* the error messages in order to error the other sessions ASAP. The other messages will
// be delivered all the same, they just won't trigger flow resumption because of dirtiness.
val errorMessagesWithDeduplication: ArrayList<Pair<DeduplicationId, ExistingSessionMessagePayload>> = errorMessages.map {
DeduplicationId.createForError(it.errorId, sourceSessionId) to it
}.toArrayList()
sessionState.copy(bufferedMessages = errorMessagesWithDeduplication + sessionState.bufferedMessages)
} else {
sessionState

View File

@@ -7,12 +7,14 @@ import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ErrorSessionMessage
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.ExistingSessionMessagePayload
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowRemovalReason
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
import java.util.ArrayList

class KilledFlowTransition(
override val context: TransitionContext,

@@ -101,9 +103,9 @@ class KilledFlowTransition(
if (sessionState is SessionState.Initiating && sessionState.rejectionError == null) {
// *prepend* the error messages in order to error the other sessions ASAP. The other messages will
// be delivered all the same, they just won't trigger flow resumption because of dirtiness.
val errorMessagesWithDeduplication: ArrayList<Pair<DeduplicationId, ExistingSessionMessagePayload>> = errorMessages.map {
DeduplicationId.createForError(it.errorId, sourceSessionId) to it
}.toArrayList()
sessionState.copy(bufferedMessages = errorMessagesWithDeduplication + sessionState.bufferedMessages)
} else {
sessionState

View File

@@ -250,7 +250,7 @@ class StartedFlowTransition(
if (messages.isEmpty()) {
someNotFound = true
} else {
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toArrayList())
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
resultMessages[sessionId] = if (messages[0] is EndSessionMessage) {
throw UnexpectedFlowEndException("Received session end message instead of a data session message. Mismatched send and receive?")

@@ -285,7 +285,7 @@ class StartedFlowTransition(
}
val initialMessage = createInitialSessionMessage(sessionState.initiatingSubFlow, sourceSessionId, sessionState.additionalEntropy, null)
val newSessionState = SessionState.Initiating(
bufferedMessages = arrayListOf(),
rejectionError = null,
deduplicationSeed = sessionState.deduplicationSeed
)

@@ -324,7 +324,7 @@ class StartedFlowTransition(
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, sessionState)
val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message)
newSessions[sourceSessionId] = SessionState.Initiating(
bufferedMessages = arrayListOf(),
rejectionError = null,
deduplicationSeed = uninitiatedSessionState.deduplicationSeed
)

@@ -375,7 +375,10 @@ class StartedFlowTransition(
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is ErrorSessionMessage) {
val errorMessage = sessionState.receivedMessages.first() as ErrorSessionMessage
val exception = convertErrorMessageToException(errorMessage, sessionState.peerParty)
val newSessionState = sessionState.copy(
receivedMessages = sessionState.receivedMessages.subList(1, sessionState.receivedMessages.size).toArrayList(),
otherSideErrored = true
)
val newCheckpoint = startingState.checkpoint.addSession(sessionId to newSessionState)
newState = startingState.copy(checkpoint = newCheckpoint)
listOf(exception)

View File

@@ -24,6 +24,37 @@ interface Transition {
val continuation = build(builder)
return TransitionResult(builder.currentState, builder.actions, continuation)
}
/**
* Add [element] to the [ArrayList] and return the list.
*
* Copy of [List.plus] that returns an [ArrayList] instead.
*/
operator fun <T> ArrayList<T>.plus(element: T) : ArrayList<T> {
val result = ArrayList<T>(size + 1)
result.addAll(this)
result.add(element)
return result
}
/**
* Add [elements] to the [ArrayList] and return the list.
*
* Copy of [List.plus] that returns an [ArrayList] instead.
*/
operator fun <T> ArrayList<T>.plus(elements: Collection<T>) : ArrayList<T> {
val result = ArrayList<T>(this.size + elements.size)
result.addAll(this)
result.addAll(elements)
return result
}
/**
* Convert the [List] into an [ArrayList].
*/
fun <T> List<T>.toArrayList() : ArrayList<T> {
return ArrayList(this)
}
}

class TransitionContext(
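A brief sketch of how these helpers are used by the transition classes elsewhere in this commit (illustrative only; it assumes the code sits inside a Transition implementation such as ErrorFlowTransition, where the operator overloads above are in scope):

// With bufferedMessages now typed as ArrayList, the ArrayList-returning plus above is resolved when
// prepending error messages, so the copied session state keeps the concrete type without conversion.
val combined: ArrayList<Pair<DeduplicationId, ExistingSessionMessagePayload>> =
errorMessagesWithDeduplication + sessionState.bufferedMessages
val newSessionState = sessionState.copy(bufferedMessages = combined)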

View File

@@ -6,6 +6,7 @@ import net.corda.node.services.statemachine.ConfirmSessionMessage
import net.corda.node.services.statemachine.DataSessionMessage
import net.corda.node.services.statemachine.DeduplicationId
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.ExistingSessionMessagePayload
import net.corda.node.services.statemachine.FlowStart
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SenderDeduplicationId

@@ -50,9 +51,9 @@ class UnstartedFlowTransition(
appName = initiatingMessage.appName
),
receivedMessages = if (initiatingMessage.firstPayload == null) {
arrayListOf()
} else {
arrayListOf<ExistingSessionMessagePayload>(DataSessionMessage(initiatingMessage.firstPayload))
},
deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.toLong}-${initiatingMessage.initiationEntropy}",
otherSideErrored = false