Merge pull request #6451 from corda/dan/os-4.5-to-4.6-merge-2020-07-08

NOTICK OS 4.5 to 4.6 merge 2020-07-08
This commit is contained in:
Dan Newton 2020-07-09 14:02:22 +01:00 committed by GitHub
commit 52240dc3d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 2184 additions and 2992 deletions

View File

@ -8,13 +8,26 @@ killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
*/
boolean isReleaseTag = (env.TAG_NAME =~ /^release.*JDK11$/)
/*
** calculate the stage for NexusIQ evaluation
** * build for snapshots
** * stage-release: for release candidates and for health checks
** * operate: for final release
*/
def nexusIqStage = "build"
if (isReleaseTag) {
switch (env.TAG_NAME) {
case ~/.*-RC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
case ~/.*-HC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
default: nexusIqStage = "operate"
}
}
pipeline {
agent {
label 'k8s'
}
options {
timestamps()
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}
@ -27,6 +40,25 @@ pipeline {
}
stages {
stage('Sonatype Check') {
steps {
sh "./gradlew --no-daemon clean jar"
script {
sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties"
def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim()
def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim()
def artifactId = 'corda'
nexusAppId = "jenkins-${groupId}-${artifactId}-jdk11-${version}"
}
nexusPolicyEvaluation (
failBuildOnNetworkError: false,
iqApplication: manualApplication(nexusAppId),
iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']],
iqStage: nexusIqStage
)
}
}
stage('Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {

View File

@ -1,62 +0,0 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
@Library('corda-shared-build-pipeline-steps')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
options {
timestamps()
timeout(time: 3, unit: 'HOURS')
}
environment {
DOCKER_TAG_TO_USE = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
}
stages {
stage('Corda - Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
}
sh "kubectl auth can-i get pods"
}
}
stage('Corda - Run Tests') {
stage('Integration Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelIntegrationTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/integrationTest',
description: 'Integration Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
}
}
}
}
post {
always {
junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true
}
cleanup {
deleteDir() /* clean up our workspace */
}
}
}

View File

@ -8,7 +8,6 @@ pipeline {
options {
timestamps()
overrideIndexTriggers(false)
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}
triggers {

View File

@ -11,7 +11,6 @@ pipeline {
timestamps()
ansiColor('xterm')
overrideIndexTriggers(false)
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}
@ -24,6 +23,7 @@ pipeline {
// in the name
ARTIFACTORY_BUILD_NAME = "Corda / Publish / Publish Nightly to Artifactory"
.replaceAll("/", " :: ")
DOCKER_URL = "https://index.docker.io/v1/"
}
stages {
@ -58,6 +58,17 @@ pipeline {
)
}
}
stage('Publish Nightly to Docker Hub') {
steps {
withCredentials([
usernamePassword(credentialsId: 'corda-publisher-docker-hub-credentials',
usernameVariable: 'DOCKER_USERNAME',
passwordVariable: 'DOCKER_PASSWORD')]) {
sh "./gradlew pushOfficialImages"
}
}
}
}

View File

@ -11,7 +11,6 @@ pipeline {
timestamps()
ansiColor('xterm')
overrideIndexTriggers(false)
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}

View File

@ -37,13 +37,13 @@ pipeline {
agent { label 'k8s' }
options {
timestamps()
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
disableConcurrentBuilds()
timeout(time: 3, unit: 'HOURS')
}
environment {
DOCKER_TAG_TO_USE = "${env.GIT_COMMIT.subSequence(0, 8)}"
DOCKER_URL = "https://index.docker.io/v1/"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
ARTIFACTORY_CREDENTIALS = credentials('artifactory-credentials')
@ -150,6 +150,20 @@ pipeline {
)
}
}
stage('Publish Release to Docker Hub') {
when {
expression { isReleaseTag }
}
steps {
withCredentials([
usernamePassword(credentialsId: 'corda-publisher-docker-hub-credentials',
usernameVariable: 'DOCKER_USERNAME',
passwordVariable: 'DOCKER_PASSWORD')]) {
sh "./gradlew pushOfficialImages"
}
}
}
}

View File

@ -1,60 +0,0 @@
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
@Library('corda-shared-build-pipeline-steps')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
options {
timestamps()
timeout(time: 3, unit: 'HOURS')
}
environment {
DOCKER_TAG_TO_USE = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
}
stages {
stage('Corda Pull Request - Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
}
sh "kubectl auth can-i get pods"
}
}
stage('Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelUnitTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/unitTest',
description: 'Unit Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
}
}
}
post {
always {
junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true
}
cleanup {
deleteDir() /* clean up our workspace */
}
}
}

View File

@ -0,0 +1,2 @@
## corda-core-deterministic.
This artifact is a deterministic subset of the binary contents of `corda-core`.

View File

@ -207,10 +207,18 @@ artifacts {
publish file: deterministicJar, name: jarBaseName, type: 'jar', extension: 'jar', builtBy: metafix
}
tasks.named('sourceJar', Jar) {
from 'README.md'
include 'README.md'
}
tasks.named('javadocJar', Jar) {
from 'README.md'
include 'README.md'
}
publish {
dependenciesFrom configurations.deterministicArtifacts
publishSources = false
publishJavadoc = false
name jarBaseName
}

View File

@ -6,6 +6,7 @@ import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
import java.sql.Connection
import java.sql.SQLException
import java.util.UUID
import javax.persistence.EntityManager
@ -87,6 +88,7 @@ class DatabaseTransaction(
committed = true
}
@Throws(SQLException::class)
fun rollback() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.clear()
@ -97,16 +99,20 @@ class DatabaseTransaction(
clearException()
}
@Throws(SQLException::class)
fun close() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
try {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
}
if (database.closeConnection) {
connection.close()
}
} finally {
clearException()
contextTransactionOrNull = outerTransaction
}
if (database.closeConnection) {
connection.close()
}
clearException()
contextTransactionOrNull = outerTransaction
if (outerTransaction == null) {
synchronized(this) {
closed = true

View File

@ -0,0 +1,288 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.list
import net.corda.core.internal.readAllLines
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.OutOfProcessImpl
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.internal.InternalDriverDSL
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.Before
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
abstract class StateMachineErrorHandlingTest {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0
@Before
fun setup() {
counter = 0
}
internal fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) {
driver(
DriverParameters(
notarySpecs = listOf(notarySpec),
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
dsl()
}
}
internal fun DriverDSL.createBytemanNode(
providedName: CordaX500Name,
additionalCordapps: Collection<TestCordapp> = emptyList()
): Pair<NodeHandle, Int> {
val port = nextPort()
val nodeHandle = (this as InternalDriverDSL).startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
),
bytemanPort = port
).getOrThrow()
return nodeHandle to port
}
internal fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
return startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
)
).getOrThrow()
}
internal fun submitBytemanRules(rules: String, port: Int) {
val submit = Submit("localhost", port)
submit.addScripts(listOf(ScriptText("Test script", rules)))
}
internal fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
return nodeHandle.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
}
internal fun OutOfProcessImpl.stop(timeout: Duration): Boolean {
return process.run {
destroy()
waitFor(timeout.seconds, TimeUnit.SECONDS)
}.also { onStopCallback() }
}
@Suppress("LongParameterList")
internal fun CordaRPCOps.assertHospitalCounts(
discharged: Int = 0,
observation: Int = 0,
propagated: Int = 0,
dischargedRetry: Int = 0,
observationRetry: Int = 0,
propagatedRetry: Int = 0
) {
val counts = startFlow(StateMachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.getOrThrow(20.seconds)
assertEquals(discharged, counts.discharged)
assertEquals(observation, counts.observation)
assertEquals(propagated, counts.propagated)
assertEquals(dischargedRetry, counts.dischargeRetry)
assertEquals(observationRetry, counts.observationRetry)
assertEquals(propagatedRetry, counts.propagatedRetry)
}
internal fun CordaRPCOps.assertHospitalCountsAllZero() = assertHospitalCounts()
internal fun CordaRPCOps.assertNumberOfCheckpoints(
runnable: Int = 0,
failed: Int = 0,
completed: Int = 0,
hospitalized: Int = 0
) {
val counts = startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.getOrThrow(20.seconds)
assertEquals(runnable, counts.runnable, "There should be $runnable runnable checkpoints")
assertEquals(failed, counts.failed, "There should be $failed failed checkpoints")
assertEquals(completed, counts.completed, "There should be $completed completed checkpoints")
assertEquals(hospitalized, counts.hospitalized, "There should be $hospitalized hospitalized checkpoints")
}
internal fun CordaRPCOps.assertNumberOfCheckpointsAllZero() = assertNumberOfCheckpoints()
@StartableByRPC
@InitiatingFlow
class SendAMessageFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there")
logger.info("Finished my flow")
return "Finished executing test flow - ${this.runId}"
}
}
@InitiatedBy(SendAMessageFlow::class)
class SendAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
logger.info("Finished my flow")
}
}
@StartableByRPC
class ThrowAnErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
private fun throwException() {
logger.info("Throwing exception in flow")
throw IllegalStateException("throwing exception in flow")
}
}
@StartableByRPC
class ThrowAHospitalizeErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
private fun throwException() {
logger.info("Throwing exception in flow")
throw HospitalizeFlowException("throwing exception in flow")
}
}
@StartableByRPC
class GetNumberOfCheckpointsFlow : FlowLogic<NumberOfCheckpoints>() {
override fun call() = NumberOfCheckpoints(
runnable = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.RUNNABLE),
failed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.FAILED),
completed = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.COMPLETED),
hospitalized = getNumberOfCheckpointsWithStatus(Checkpoint.FlowStatus.HOSPITALIZED)
)
private fun getNumberOfCheckpointsWithStatus(status: Checkpoint.FlowStatus): Int {
return serviceHub.jdbcSession()
.prepareStatement("select count(*) from node_checkpoints where status = ? and flow_id != ?")
.apply {
setInt(1, status.ordinal)
setString(2, runId.uuid.toString())
}
.use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}.toInt()
}
}
@CordaSerializable
data class NumberOfCheckpoints(
val runnable: Int = 0,
val failed: Int = 0,
val completed: Int = 0,
val hospitalized: Int = 0
)
// Internal use for testing only!!
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
override fun call(): HospitalCounts =
HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargedCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationCounter,
serviceHub.cordaService(HospitalCounter::class.java).propagatedCounter,
serviceHub.cordaService(HospitalCounter::class.java).dischargeRetryCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationRetryCounter,
serviceHub.cordaService(HospitalCounter::class.java).propagatedRetryCounter
)
}
@CordaSerializable
data class HospitalCounts(
val discharged: Int,
val observation: Int,
val propagated: Int,
val dischargeRetry: Int,
val observationRetry: Int,
val propagatedRetry: Int
)
@Suppress("UNUSED_PARAMETER")
@CordaService
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
var dischargedCounter: Int = 0
var observationCounter: Int = 0
var propagatedCounter: Int = 0
var dischargeRetryCounter: Int = 0
var observationRetryCounter: Int = 0
var propagatedRetryCounter: Int = 0
init {
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
dischargedCounter++
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
observationCounter++
}
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
propagatedCounter++
}
StaffedFlowHospital.onFlowResuscitated.add { _, _, outcome ->
when (outcome) {
StaffedFlowHospital.Outcome.DISCHARGE -> dischargeRetryCounter++
StaffedFlowHospital.Outcome.OVERNIGHT_OBSERVATION -> observationRetryCounter++
StaffedFlowHospital.Outcome.UNTREATABLE -> propagatedRetryCounter++
}
}
}
}
internal val actionExecutorClassName: String by lazy {
Class.forName("net.corda.node.services.statemachine.ActionExecutorImpl").name
}
internal val stateMachineManagerClassName: String by lazy {
Class.forName("net.corda.node.services.statemachine.SingleThreadedStateMachineManager").name
}
}

View File

@ -1,6 +1,5 @@
package net.corda.node.services.statemachine
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.messaging.startFlow
@ -22,7 +21,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
class StateMachineFinalityErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
* Throws an exception when recoding a transaction inside of [ReceiveFinalityFlow] on the responding
@ -33,10 +32,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its
* send to the responding node and the responding node successfully received it.
*/
@Test(timeout=300_000)
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
@Test(timeout = 300_000)
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work
@ -67,14 +66,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -83,15 +77,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(observation = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
}
}
@ -104,10 +94,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has complete its
* send to the responding node and the responding node successfully received it.
*/
@Test(timeout=300_000)
fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() {
@Test(timeout = 300_000)
fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work
@ -138,14 +128,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -154,15 +139,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(observation = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
}
}
@ -170,17 +151,17 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The exception is thrown 5 times.
* The exception is thrown 3 times.
*
* The responding flow is retried 3 times and then completes successfully.
*
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation.
*/
@Test(timeout=300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() {
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """
@ -204,35 +185,14 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 5
IF flagged("finality_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -241,20 +201,14 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val output = getBytemanOutput(charlie)
// This sleep is a bit suspect...
Thread.sleep(1000)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
}
}
@ -262,7 +216,7 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The exception is thrown 7 times.
* The exception is thrown 4 times.
*
* The responding flow is retried 3 times and is then kept in for observation.
*
@ -272,10 +226,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation.
*/
@Test(timeout=300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation is error persists`() {
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation is error persists`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """
@ -299,36 +253,15 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 7
IF flagged("finality_flag") && readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -338,20 +271,14 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
).returnValue.getOrThrow(30.seconds)
}
val output = getBytemanOutput(charlie)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for CashIssueAndPaymentFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpoints(runnable = 1)
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
}
}
}

View File

@ -0,0 +1,581 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaRuntimeException
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.internal.OutOfProcessImpl
import org.junit.Test
import java.sql.Connection
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
private companion object {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.processEvent].
*
* This is not an expected place for an exception to occur, but allows us to test what happens when a random exception is propagated
* up to [FlowStateMachineImpl.run] during flow initialisation.
*
* A "Transaction context is missing" exception is thrown due to where the exception is thrown (no transaction is created so this is
* thrown when leaving [FlowStateMachineImpl.processEventsUntilFlowIsResumed] due to the finally block).
*/
@Test(timeout = 300_000)
fun `unexpected error during flow initialisation throws exception to client`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD processEvent
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD processEvent
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<CordaRuntimeException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
}
alice.rpc.assertNumberOfCheckpoints(failed = 1)
alice.rpc.assertHospitalCounts(propagated = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* A [SQLException] is then thrown when trying to rollback the flow's database transaction.
*
* The [SQLException] should be suppressed and the flow should continue to retry and complete successfully.
*/
@Test(timeout = 300_000)
fun `error during initialisation when trying to rollback the flow's database transaction the flow is able to retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") == 0
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception when rolling back transaction in transition executor
INTERFACE ${Connection::class.java.name}
METHOD rollback
AT ENTRY
IF readCounter("counter") == 1
DO incrementCounter("counter"); traceln("Throwing exception in transition executor"); throw new java.sql.SQLException("could not reach db", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* A [SQLException] is then thrown when trying to close the flow's database transaction.
*
* The [SQLException] should be suppressed and the flow should continue to retry and complete successfully.
*/
@Test(timeout = 300_000)
fun `error during initialisation when trying to close the flow's database transaction the flow is able to retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") == 0
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception when rolling back transaction in transition executor
INTERFACE ${Connection::class.java.name}
METHOD close
AT ENTRY
IF readCounter("counter") == 1
DO incrementCounter("counter"); traceln("Throwing exception in transition executor"); throw new java.sql.SQLException("could not reach db", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
}
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
Thread.sleep(30.seconds.toMillis())
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
val terminated = (alice as OutOfProcessImpl).stop(60.seconds)
assertTrue(terminated, "The node must be shutdown before it can be restarted")
val (alice2, _) = createBytemanNode(ALICE_NAME)
Thread.sleep(20.seconds.toMillis())
alice2.rpc.assertNumberOfCheckpointsAllZero()
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the retry itself.
*
* The flow then retries the retry causing the flow to complete successfully.
*/
@Test(timeout = 300_000)
fun `error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Throw exception on retry
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(
discharged = 1,
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
* saved its first checkpoint (remains in an unstarted state).
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(discharged = 3)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
* saved its first checkpoint (remains in an unstarted state).
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
}
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
Thread.sleep(30.seconds.toMillis())
alice.rpc.assertNumberOfCheckpoints(runnable = 1)
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
val terminated = (charlie as OutOfProcessImpl).stop(60.seconds)
assertTrue(terminated, "The node must be shutdown before it can be restarted")
val (charlie2, _) = createBytemanNode(CHARLIE_NAME)
Thread.sleep(10.seconds.toMillis())
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie2.rpc.assertNumberOfCheckpointsAllZero()
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state) on a responding node.
*
* The exception is thrown 3 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state) on a responding node.
*
* The exception is thrown 4 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* fails and is kept for in for observation.
*/
@Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
}
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
}
}
}

View File

@ -0,0 +1,661 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaRuntimeException
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.transitions.TopLevelTransition
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
private companion object {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
}
/**
* Throws an exception when performing an [Action.SendInitial] action.
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and is then kept in
* the hospital for observation.
*/
@Test(timeout = 300_000)
fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
}
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.SendInitial] event.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when executing [DeduplicationHandler.afterDatabaseTransaction] from inside an [Action.AcknowledgeMessages] action.
*
* The exception is thrown every time [DeduplicationHandler.afterDatabaseTransaction] is executed inside of
* [ActionExecutorImpl.executeAcknowledgeMessages]
*
* The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries.
* The flow should complete successfully as the error is swallowed.
*/
@Test(timeout = 300_000)
fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Set flag when inside executeAcknowledgeMessages
CLASS $actionExecutorClassName
METHOD executeAcknowledgeMessages
AT INVOKE ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction()
IF !flagged("exception_flag")
DO flag("exception_flag"); traceln("Setting flag to true")
ENDRULE
RULE Throw exception when executing ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction when inside executeAcknowledgeMessages
INTERFACE ${DeduplicationHandler::class.java.name}
METHOD afterDatabaseTransaction
AT ENTRY
IF flagged("exception_flag")
DO traceln("Throwing exception"); clear("exception_flag"); traceln("SETTING FLAG TO FALSE"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCountsAllZero()
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event when trying to propagate an error (processing an
* [Event.StartErrorPropagation] event)
*
* The exception is thrown 3 times.
*
* This causes the flow to retry the [Event.StartErrorPropagation] event until it succeeds. This this scenario it is retried 3 times,
* on the final retry the flow successfully propagates the error and completes exceptionally.
*/
@Test(timeout = 300_000)
fun `error during error propagation the flow is able to retry and recover`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ThrowAnErrorFlow::class.java.name}
METHOD throwException
AT ENTRY
IF !flagged("my_flag")
DO traceln("SETTING FLAG TO TRUE"); flag("my_flag")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("my_flag") && readCounter("counter") < 3
DO traceln("Throwing exception"); incrementCounter("counter"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<CordaRuntimeException> {
alice.rpc.startFlow(StateMachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(60.seconds)
}
alice.rpc.assertNumberOfCheckpoints(failed = 1)
alice.rpc.assertHospitalCounts(
propagated = 1,
propagatedRetry = 3
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when replaying a flow that has already successfully created its initial checkpoint.
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the retry itself.
*
* The flow is discharged and replayed from the hospital. An exception is then thrown during the retry that causes the flow to be
* retried again.
*/
@Test(timeout = 300_000)
fun `error during flow retry when executing retryFlowFromSafePoint the flow is able to retry and recover`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Set flag when executing first commit
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Throw exception on retry
CLASS $stateMachineManagerClassName
METHOD addAndStartFlow
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(40.seconds)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(
discharged = 1,
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event after the flow has suspended (has moved to a started state).
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
// seems to be restarting the flow from the beginning every time
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Set flag when executing first commit
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event when the flow is finishing.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
// seems to be restarting the flow from the beginning every time
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); clear("remove_checkpoint_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing.
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 4
DO incrementCounter("counter");
clear("remove_checkpoint_flag");
traceln("Throwing exception");
throw new org.hibernate.exception.ConstraintViolationException("This flow has a terminal condition", new java.sql.SQLException(), "made up constraint")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
}
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state).
*
* The exception is thrown 3 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state).
*
* The exception is thrown 4 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit it's original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* fails and is kept for in for observation.
*/
@Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
}
// flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
Thread.sleep(30.seconds.toMillis())
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event when the flow is finishing on a responding node.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
DO incrementCounter("counter");
clear("remove_checkpoint_flag");
traceln("Throwing exception");
throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
}
}
}

View File

@ -0,0 +1,181 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
* Triggers `killFlow` while the flow is suspended causing a [InterruptedException] to be thrown and passed through the hospital.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test(timeout = 300_000)
fun `error during transition due to killing a flow will terminate the flow`() {
startDriver {
val alice = createNode(ALICE_NAME)
val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::SleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == SleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = alice.rpc.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(20.seconds) }
assertTrue(flowKilled)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCountsAllZero()
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Triggers `killFlow` during user application code.
*
* The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable]
* call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly.
*
* Although the call to kill the flow is made during user application code. It will not be removed / stop processing
* until the next suspension point is reached within the flow.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test(timeout = 300_000)
fun `flow killed during user code execution stops and removes the flow correctly`() {
startDriver {
val alice = createNode(ALICE_NAME)
val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::ThreadSleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == ThreadSleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = alice.rpc.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(30.seconds) }
assertTrue(flowKilled)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCountsAllZero()
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and
* all that remains is its checkpoint in the database.
*
* The flow terminates and is not retried.
*
* Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are
* from the original flow that was put in for observation.
*/
@Test(timeout = 300_000)
fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val charlie = createNode(CHARLIE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val flow = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
alice.rpc.killFlow(flow.id)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
}
@StartableByRPC
class ThreadSleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep()
logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
// Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar
private fun sleep() {
Thread.sleep(20000)
}
}
}

View File

@ -1,7 +1,6 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
@ -20,13 +19,14 @@ import org.junit.Test
import kotlin.test.assertEquals
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
class StateMachineSubFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
* This test checks that flow calling an initiating subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -37,11 +37,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -72,7 +72,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
@ -83,52 +83,20 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
IF flagged("subflow_flag") && flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
@ -136,7 +104,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an initiating subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -147,11 +116,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -182,55 +151,23 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
@ -238,7 +175,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an inline subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -249,11 +187,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -276,55 +214,23 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
@ -332,7 +238,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an inline subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -343,11 +250,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -370,7 +277,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
@ -381,52 +288,20 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
IF flagged("subflow_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}

View File

@ -1,188 +0,0 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.list
import net.corda.core.internal.readAllLines
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
import net.corda.testing.node.internal.InternalDriverDSL
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.Before
abstract class StatemachineErrorHandlingTest {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0
@Before
fun setup() {
counter = 0
}
internal fun startDriver(notarySpec: NotarySpec = NotarySpec(DUMMY_NOTARY_NAME), dsl: DriverDSL.() -> Unit) {
driver(
DriverParameters(
notarySpecs = listOf(notarySpec),
startNodesInProcess = false,
inMemoryDB = false,
systemProperties = mapOf("co.paralleluniverse.fibers.verifyInstrumentation" to "true")
)
) {
dsl()
}
}
internal fun DriverDSL.createBytemanNode(
providedName: CordaX500Name,
additionalCordapps: Collection<TestCordapp> = emptyList()
): NodeHandle {
return (this as InternalDriverDSL).startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
),
bytemanPort = 12000
).getOrThrow()
}
internal fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
return startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
)
).getOrThrow()
}
internal fun submitBytemanRules(rules: String) {
val submit = Submit("localhost", 12000)
submit.addScripts(listOf(ScriptText("Test script", rules)))
}
internal fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
return nodeHandle.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
}
@StartableByRPC
@InitiatingFlow
class SendAMessageFlow(private val party: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there")
return "Finished executing test flow - ${this.runId}"
}
}
@InitiatedBy(SendAMessageFlow::class)
class SendAMessageResponder(private val session: FlowSession) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
}
}
@StartableByRPC
class ThrowAnErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
private fun throwException() {
logger.info("Throwing exception in flow")
throw IllegalStateException("throwing exception in flow")
}
}
@StartableByRPC
class GetNumberOfUncompletedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
val sqlStatement = "select count(*) from node_checkpoints where status not in (${Checkpoint.FlowStatus.COMPLETED.ordinal})"
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}
@StartableByRPC
class GetNumberOfHospitalizedCheckpointsFlow : FlowLogic<Long>() {
override fun call(): Long {
val sqlStatement = "select count(*) from node_checkpoints where status in (${Checkpoint.FlowStatus.HOSPITALIZED.ordinal})"
return serviceHub.jdbcSession().prepareStatement(sqlStatement).use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
}
}
// Internal use for testing only!!
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
override fun call(): HospitalCounts =
HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationCounter
)
}
@CordaSerializable
data class HospitalCounts(val discharge: Int, val observation: Int)
@Suppress("UNUSED_PARAMETER")
@CordaService
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
var observationCounter: Int = 0
var dischargeCounter: Int = 0
init {
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++dischargeCounter
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
}
}
}
internal val actionExecutorClassName: String by lazy {
Class.forName("net.corda.node.services.statemachine.ActionExecutorImpl").name
}
internal val stateMachineManagerClassName: String by lazy {
Class.forName("net.corda.node.services.statemachine.SingleThreadedStateMachineManager").name
}
}

View File

@ -1,321 +0,0 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
/**
* Triggers `killFlow` while the flow is suspended causing a [InterruptedException] to be thrown and passed through the hospital.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test(timeout=300_000)
fun `error during transition due to killing a flow will terminate the flow`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::SleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == SleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = aliceClient.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(20.seconds) }
val output = getBytemanOutput(alice)
assertTrue(flowKilled)
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
/**
* Triggers `killFlow` during user application code.
*
* The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable]
* call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly.
*
* Although the call to kill the flow is made during user application code. It will not be removed / stop processing
* until the next suspension point is reached within the flow.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test(timeout=300_000)
fun `flow killed during user code execution stops and removes the flow correctly`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::ThreadSleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == ThreadSleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = aliceClient.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(30.seconds) }
val output = getBytemanOutput(alice)
assertTrue(flowKilled)
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
println(numberOfTerminalDiagnoses)
assertEquals(0, numberOfTerminalDiagnoses)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
/**
* Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and
* all that remains is its checkpoint in the database.
*
* The flow terminates and is not retried.
*
* Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are
* from the original flow that was put in for observation.
*/
@Test(timeout=300_000)
fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val charlie = createNode(CHARLIE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
aliceClient.killFlow(flow.id)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
assertEquals(0, numberOfTerminalDiagnoses)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
}
@StartableByRPC
class ThreadSleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep()
logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
// Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar
private fun sleep() {
Thread.sleep(20000)
}
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import java.sql.SQLException
/**
* An executor of a single [Action].
@ -10,5 +11,6 @@ interface ActionExecutor {
* Execute [action] by [fiber].
*/
@Suspendable
@Throws(SQLException::class)
fun executeAction(fiber: FlowFiber, action: Action)
}

View File

@ -14,6 +14,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.sql.SQLException
import java.time.Duration
/**
@ -208,6 +209,7 @@ internal class ActionExecutorImpl(
}
@Suspendable
@Throws(SQLException::class)
private fun executeCreateTransaction() {
if (contextTransactionOrNull != null) {
throw IllegalStateException("Refusing to create a second transaction")
@ -224,6 +226,7 @@ internal class ActionExecutorImpl(
}
@Suspendable
@Throws(SQLException::class)
private fun executeCommitTransaction() {
try {
contextTransaction.commit()

View File

@ -41,7 +41,7 @@ sealed class Event {
* Signal that an error has happened. This may be due to an uncaught exception in the flow or some external error.
* @param exception the exception itself.
*/
data class Error(val exception: Throwable) : Event()
data class Error(val exception: Throwable, val rollback: Boolean = true) : Event()
/**
* Signal that a ledger transaction has committed. This is an event completing a [FlowIORequest.WaitForLedgerCommit]

View File

@ -284,12 +284,15 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
openThreadLocalWormhole()
setLoggingContext()
initialiseFlow()
logger.debug { "Calling flow: $logic" }
val startTime = System.nanoTime()
var initialised = false
val resultOrError = try {
initialiseFlow()
initialised = true
// This sets the Cordapp classloader on the contextClassLoader of the current thread.
// Needed because in previous versions of the finance app we used Thread.contextClassLoader to resolve services defined in cordapps.
Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader
@ -310,14 +313,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Event.FlowFinish(resultOrError.value, softLocksId)
}
is Try.Failure -> {
Event.Error(resultOrError.exception)
Event.Error(resultOrError.exception, initialised)
}
}
// Immediately process the last event. This is to make sure the transition can assume that it has an open
// database transaction.
val continuation = processEventImmediately(
finalEvent,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnEntry = initialised,
isDbTransactionOpenOnExit = false
)
if (continuation == FlowContinuation.ProcessEvents) {
@ -335,8 +338,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
private fun initialiseFlow() {
processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true
)
}

View File

@ -387,63 +387,51 @@ internal class SingleThreadedStateMachineManager(
flowSleepScheduler.cancel(currentState)
// Get set of external events
val flowId = currentState.flowLogic.runId
try {
val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return
// Resurrect flow
flowCreator.createFlowFromCheckpoint(flowId, checkpoint) ?: return
} else {
// Just flow initiation message
null
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return
// Resurrect flow
flowCreator.createFlowFromCheckpoint(flowId, checkpoint) ?: return
} else {
// Just flow initiation message
null
}
innerState.withLock {
if (stopping) {
return
}
innerState.withLock {
if (stopping) {
return
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
} catch (e: Exception) {
// Failed to retry - manually put the flow in for observation rather than
// relying on the [HospitalisingInterceptor] to do so
val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(currentState, exceptions)
throw e
}
}
@ -609,7 +597,8 @@ internal class SingleThreadedStateMachineManager(
// Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint ->
val existingCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
existingCheckpoint?.let { serializedCheckpoint ->
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId)
if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError {
@ -763,6 +752,8 @@ internal class SingleThreadedStateMachineManager(
(exception as? FlowException)?.originalErrorId = flowError.errorId
flow.resultFuture.setException(exception)
lastState.flowLogic.progressTracker?.endWithError(exception)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure<Nothing>(exception)))
}

View File

@ -32,12 +32,14 @@ import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException
import kotlin.collections.HashMap
import kotlin.concurrent.timerTask
import kotlin.math.pow
/**
* This hospital consults "staff" to see if they can automatically diagnose and treat flows.
*/
@Suppress("TooManyFunctions")
class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
private val clock: Clock,
private val ourSenderUUID: String) : Closeable {
@ -52,15 +54,24 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
DatabaseEndocrinologist,
TransitionErrorGeneralPractitioner,
SedationNurse,
NotaryDoctor
NotaryDoctor,
ResuscitationSpecialist
)
private const val MAX_BACKOFF_TIME = 110.0 // Totals to 2 minutes when calculating the backoff time
@VisibleForTesting
val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowErrorPropagated = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowResuscitated = mutableListOf<(id: StateMachineRunId, by: List<String>, outcome: Outcome) -> Unit>()
@VisibleForTesting
val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>()
}
@ -194,12 +205,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
/**
* Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being
* treated.
* Request treatment for the [flowFiber].
*/
fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
// Only treat flows that are not already in the hospital
if (!currentState.isRemoved && flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) {
if (!currentState.isRemoved) {
flowsInHospital[flowFiber.id] = flowFiber
admit(flowFiber, currentState, errors)
}
}
@ -219,20 +229,30 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
Diagnosis.DISCHARGE -> {
val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState)
log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff)
}
Diagnosis.OVERNIGHT_OBSERVATION -> {
log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.OVERNIGHT_OBSERVATION, Event.OvernightObservation, 0.seconds)
}
Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> {
// None of the staff care for these errors, or someone decided it is a terminal condition, so we let them propagate
log.info("Flow error allowed to propagate", report.error)
onFlowErrorPropagated.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds)
}
Diagnosis.RESUSCITATE -> {
// reschedule the last outcome as it failed to process it
// do a 0.seconds backoff in dev mode? / when coming from the driver? make it configurable?
val backOff = calculateBackOffForResuscitation(medicalHistory, currentState)
val outcome = medicalHistory.records.last().outcome
log.info("Flow error to be resuscitated, rescheduling previous outcome - $outcome (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowResuscitated.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }, outcome) }
Triple(outcome, outcome.event, backOff)
}
}
val numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends
@ -251,18 +271,29 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
}
}
private fun calculateBackOffForChronicCondition(report: ConsultationReport, medicalHistory: FlowMedicalHistory, currentState: StateMachineState): Duration {
return report.by.firstOrNull { it is Chronic }?.let { chronicStaff ->
return medicalHistory.timesDischargedForTheSameThing(chronicStaff, currentState).let {
if (it == 0) {
0.seconds
} else {
maxOf(10, (10 + (Math.random()) * (10 * 1.5.pow(it)) / 2).toInt()).seconds
}
}
private fun calculateBackOffForChronicCondition(
report: ConsultationReport,
medicalHistory: FlowMedicalHistory,
currentState: StateMachineState
): Duration {
return report.by.firstOrNull { it is Chronic }?.let { staff ->
calculateBackOff(medicalHistory.timesDischargedForTheSameThing(staff, currentState))
} ?: 0.seconds
}
private fun calculateBackOffForResuscitation(
medicalHistory: FlowMedicalHistory,
currentState: StateMachineState
): Duration = calculateBackOff(medicalHistory.timesResuscitated(currentState))
private fun calculateBackOff(timesDiagnosisGiven: Int): Duration {
return if (timesDiagnosisGiven == 0) {
0.seconds
} else {
maxOf(10, (10 + (Math.random()) * minOf(MAX_BACKOFF_TIME, (10 * 1.5.pow(timesDiagnosisGiven)) / 2)).toInt()).seconds
}
}
private fun consultStaff(flowFiber: FlowFiber,
currentState: StateMachineState,
errors: List<Throwable>,
@ -324,6 +355,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount }
}
fun timesResuscitated(currentState: StateMachineState): Int {
val lastAdmittanceSuspendCount = currentState.checkpoint.checkpointState.numberOfSuspends
return records.count { ResuscitationSpecialist in it.by && it.suspendCount == lastAdmittanceSuspendCount }
}
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
}
@ -357,10 +393,16 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
}
}
enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE }
enum class Outcome(val event: Event) {
DISCHARGE(Event.RetryFlowFromSafePoint),
OVERNIGHT_OBSERVATION(Event.OvernightObservation),
UNTREATABLE(Event.StartErrorPropagation)
}
/** The order of the enum values are in priority order. */
enum class Diagnosis {
/** Retry the last outcome/diagnosis **/
RESUSCITATE,
/** The flow should not see other staff members */
TERMINAL,
/** Retry from last safe point. */
@ -375,6 +417,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
}
/**
* The [Chronic] interface relates to [Staff] that return diagnoses that can be constantly be diagnosed if the flow keeps returning to
* the hospital. [Chronic] diagnoses apply a backoff before scheduling a new [Event], this prevents a flow from constantly retrying
* without a chance for the underlying issue to resolve itself.
*/
interface Chronic
/**
@ -545,10 +592,10 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY
history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE
else -> Diagnosis.OVERNIGHT_OBSERVATION
}
}.also { logDiagnosis(it, newError, flowFiber, history) }
} else {
Diagnosis.NOT_MY_SPECIALTY
}.also { logDiagnosis(it, newError, flowFiber, history) }
}
}
private fun logDiagnosis(diagnosis: Diagnosis, newError: Throwable, flowFiber: FlowFiber, history: FlowMedicalHistory) {
@ -599,6 +646,25 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return Diagnosis.NOT_MY_SPECIALTY
}
}
/**
* Handles errors coming from the processing of errors events ([Event.StartErrorPropagation] and [Event.RetryFlowFromSafePoint]),
* returning a [Diagnosis.RESUSCITATE] diagnosis
*/
object ResuscitationSpecialist : Staff {
override fun consult(
flowFiber: FlowFiber,
currentState: StateMachineState,
newError: Throwable,
history: FlowMedicalHistory
): Diagnosis {
return if (newError is ErrorStateTransitionException) {
Diagnosis.RESUSCITATE
} else {
Diagnosis.NOT_MY_SPECIALTY
}
}
}
}
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {

View File

@ -16,3 +16,5 @@ class StateTransitionException(
}
class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception)
class ErrorStateTransitionException(val exception: Exception) : CordaException(exception.message, exception)

View File

@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseTransactionException
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.security.SecureRandom
import java.sql.SQLException
import javax.persistence.OptimisticLockException
/**
@ -19,8 +20,8 @@ import javax.persistence.OptimisticLockException
* completely aborted to avoid error loops.
*/
class TransitionExecutorImpl(
val secureRandom: SecureRandom,
val database: CordaPersistence
val secureRandom: SecureRandom,
val database: CordaPersistence
) : TransitionExecutor {
private companion object {
@ -30,36 +31,44 @@ class TransitionExecutorImpl(
@Suppress("NestedBlockDepth", "ReturnCount")
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
contextDatabase = database
for (action in transition.actions) {
try {
actionExecutor.executeAction(fiber, action)
} catch (exception: Exception) {
contextTransactionOrNull?.run {
rollback()
close()
}
rollbackTransactionOnError()
if (transition.newState.checkpoint.errorState is ErrorState.Errored) {
// If we errored while transitioning to an error state then we cannot record the additional
// error as that may result in an infinite loop, e.g. error propagation fails -> record error -> propagate fails again.
// Instead we just keep around the old error state and wait for a new schedule, perhaps
// triggered from a flow hospital
log.warn("Error while executing $action during transition to errored state, aborting transition", exception)
// CORDA-3354 - Go to the hospital with the new error that has occurred
// while already in a error state (as this error could be for a different reason)
return Pair(FlowContinuation.Abort, previousState.copy(isFlowResumed = false))
log.warn("Error while executing $action, with error event $event, updating errored state", exception)
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(
FlowError(
secureRandom.nextLong(),
ErrorStateTransitionException(exception)
)
)
)
),
isFlowResumed = false
)
return Pair(FlowContinuation.ProcessEvents, newState)
} else {
// Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork
// to trigger error propagation
if(previousState.isRemoved && exception is OptimisticLockException) {
log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception)
if (log.isDebugEnabled && previousState.isRemoved && exception is OptimisticLockException) {
log.debug(
"Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception
)
} else {
log.info("Error while executing $action, with event $event, erroring state", exception)
}
@ -77,12 +86,12 @@ class TransitionExecutorImpl(
}
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
)
),
isFlowResumed = false
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
)
),
isFlowResumed = false
)
fiber.scheduleEvent(Event.DoRemainingWork)
return Pair(FlowContinuation.ProcessEvents, newState)
@ -91,4 +100,25 @@ class TransitionExecutorImpl(
}
return Pair(transition.continuation, transition.newState)
}
private fun rollbackTransactionOnError() {
contextTransactionOrNull?.run {
try {
rollback()
} catch (rollbackException: SQLException) {
log.info(
"Error rolling back database transaction from a previous error, continuing error handling for the original error",
rollbackException
)
}
try {
close()
} catch (rollbackException: SQLException) {
log.info(
"Error closing database transaction from a previous error, continuing error handling for the original error",
rollbackException
)
}
}
}
}

View File

@ -17,17 +17,17 @@ import net.corda.node.services.statemachine.transitions.TransitionResult
* transition.
*/
class HospitalisingInterceptor(
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
) : TransitionExecutor {
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
// If the fiber's previous state was clean then remove it from the hospital
@ -38,8 +38,8 @@ class HospitalisingInterceptor(
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
if (canEnterHospital(previousState, nextState)) {
val exceptionsToHandle = (nextState.checkpoint.errorState as ErrorState.Errored).errors.map { it.exception }
flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle)
}
if (nextState.isRemoved) {
@ -48,6 +48,11 @@ class HospitalisingInterceptor(
return Pair(continuation, nextState)
}
private fun canEnterHospital(previousState: StateMachineState, nextState: StateMachineState): Boolean {
return nextState.checkpoint.errorState is ErrorState.Errored
&& (previousState.checkpoint.errorState as? ErrorState.Errored)?.errors != nextState.checkpoint.errorState.errors
}
private fun removeFlow(id: StateMachineRunId) {
flowHospital.leave(id)
flowHospital.removeMedicalHistory(id)

View File

@ -1,6 +1,8 @@
package net.corda.node.services.statemachine.transitions
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.StateMachineState
/**
* This transition checks the current state of the flow and determines whether anything needs to be done.

View File

@ -3,7 +3,9 @@ package net.corda.node.services.statemachine.transitions
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.InitiatingFlow
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.DeduplicationId
@ -11,12 +13,15 @@ import net.corda.node.services.statemachine.EndSessionMessage
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.Event
import net.corda.node.services.statemachine.ExistingSessionMessage
import net.corda.node.services.statemachine.ExternalEvent
import net.corda.node.services.statemachine.FlowRemovalReason
import net.corda.node.services.statemachine.FlowSessionImpl
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.InitialSessionMessage
import net.corda.node.services.statemachine.InitiatedSessionState
import net.corda.node.services.statemachine.SenderDeduplicationId
import net.corda.node.services.statemachine.SessionId
import net.corda.node.services.statemachine.SessionMessage
import net.corda.node.services.statemachine.SessionState
import net.corda.node.services.statemachine.StateMachineState
import net.corda.node.services.statemachine.SubFlow
@ -62,7 +67,7 @@ class TopLevelTransition(
private fun errorTransition(event: Event.Error): TransitionResult {
return builder {
freshErrorTransition(event.exception)
freshErrorTransition(event.exception, event.rollback)
FlowContinuation.ProcessEvents
}
}
@ -314,24 +319,40 @@ class TopLevelTransition(
private fun retryFlowFromSafePointTransition(startingState: StateMachineState): TransitionResult {
return builder {
// Need to create a flow from the prior checkpoint or flow initiation.
actions.add(Action.CreateTransaction)
actions.add(Action.RetryFlowFromSafePoint(startingState))
actions.add(Action.CommitTransaction)
FlowContinuation.Abort
}
}
private fun overnightObservationTransition(): TransitionResult {
return builder {
val flowStartEvents = currentState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
actions.add(Action.CreateTransaction)
actions.add(Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted))
actions.add(Action.CommitTransaction)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions += Action.CreateTransaction
actions += Action.PersistDeduplicationFacts(flowStartEvents)
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
actions += Action.CommitTransaction
actions += Action.AcknowledgeMessages(flowStartEvents)
currentState = currentState.copy(
checkpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED),
pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers - flowStartEvents
)
FlowContinuation.ProcessEvents
}
}
private fun isFlowStartEvent(handler: DeduplicationHandler): Boolean {
return handler.externalCause.run { isSessionInit() || isFlowStart() }
}
private fun ExternalEvent.isSessionInit(): Boolean {
return this is ExternalEvent.ExternalMessageEvent && this.receivedMessage.data.deserialize<SessionMessage>() is InitialSessionMessage
}
private fun ExternalEvent.isFlowStart(): Boolean {
return this is ExternalEvent.ExternalStartFlowEvent<*>
}
private fun wakeUpFromSleepTransition(): TransitionResult {
return builder {
resumeFlowLogic(Unit)

View File

@ -28,12 +28,12 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
*
* @param error the error.
*/
fun freshErrorTransition(error: Throwable) {
fun freshErrorTransition(error: Throwable, rollback: Boolean = true) {
val flowError = FlowError(
errorId = (error as? IdentifiableException)?.errorId ?: context.secureRandom.nextLong(),
exception = error
)
errorTransition(flowError)
errorTransition(flowError, rollback)
}
/**
@ -42,7 +42,7 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
*
* @param error the error.
*/
fun errorsTransition(errors: List<FlowError>) {
fun errorsTransition(errors: List<FlowError>, rollback: Boolean) {
currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy(
errorState = currentState.checkpoint.errorState.addErrors(errors)
@ -50,10 +50,10 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
isFlowResumed = false
)
actions.clear()
actions.addAll(arrayOf(
Action.RollbackTransaction,
Action.ScheduleEvent(Event.DoRemainingWork)
))
if(rollback) {
actions += Action.RollbackTransaction
}
actions += Action.ScheduleEvent(Event.DoRemainingWork)
}
/**
@ -62,8 +62,8 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
*
* @param error the error.
*/
fun errorTransition(error: FlowError) {
errorsTransition(listOf(error))
fun errorTransition(error: FlowError, rollback: Boolean) {
errorsTransition(listOf(error), rollback)
}
fun resumeFlowLogic(result: Any?): FlowContinuation {

View File

@ -0,0 +1,2 @@
## corda-serialization-deterministic.
This artifact is a deterministic subset of the binary contents of `corda-serialization`.

View File

@ -193,12 +193,20 @@ artifacts {
publish file: deterministicJar, name: jarBaseName, type: 'jar', extension: 'jar', builtBy: metafix
}
tasks.named('sourceJar', Jar) {
from 'README.md'
include 'README.md'
}
tasks.named('javadocJar', Jar) {
from 'README.md'
include 'README.md'
}
publish {
dependenciesFrom(configurations.deterministicArtifacts) {
defaultScope = 'compile'
}
publishSources = false
publishJavadoc = false
name jarBaseName
}

View File

@ -0,0 +1,2 @@
## corda-node-driver.
This artifact is the node-driver used for testing Corda.

View File

@ -73,9 +73,17 @@ jar {
}
}
tasks.named('javadocJar', Jar) {
from 'README.md'
include 'README.md'
}
tasks.named('javadoc', Javadoc) {
enabled = false
}
publish {
publishSources = true
publishJavadoc = false
name jar.baseName
}