Merge branch 'release/os/4.5' into dan/os-4.5-to-4.6-merge-2020-07-08

LankyDan 2020-07-08 10:44:47 +01:00
commit fdae04fc28
34 changed files with 1974 additions and 2813 deletions


@ -8,13 +8,26 @@ killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
*/
boolean isReleaseTag = (env.TAG_NAME =~ /^release.*JDK11$/)
/*
** calculate the stage for NexusIQ evaluation
** * build: for snapshots
** * stage-release: for release candidates and for health checks
** * operate: for final release
*/
def nexusIqStage = "build"
if (isReleaseTag) {
switch (env.TAG_NAME) {
case ~/.*-RC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
case ~/.*-HC\d+(-.*)?/: nexusIqStage = "stage-release"; break;
default: nexusIqStage = "operate"
}
}
pipeline {
agent {
label 'k8s'
}
options {
timestamps()
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}
@ -27,6 +40,25 @@ pipeline {
}
stages {
stage('Sonatype Check') {
steps {
sh "./gradlew --no-daemon clean jar"
script {
sh "./gradlew --no-daemon properties | grep -E '^(version|group):' >version-properties"
def version = sh (returnStdout: true, script: "grep ^version: version-properties | sed -e 's/^version: //'").trim()
def groupId = sh (returnStdout: true, script: "grep ^group: version-properties | sed -e 's/^group: //'").trim()
def artifactId = 'corda'
nexusAppId = "jenkins-${groupId}-${artifactId}-jdk11-${version}"
}
nexusPolicyEvaluation (
failBuildOnNetworkError: false,
iqApplication: manualApplication(nexusAppId),
iqScanPatterns: [[scanPattern: 'node/capsule/build/libs/corda*.jar']],
iqStage: nexusIqStage
)
}
}
stage('Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {


@ -1,62 +0,0 @@
@Library('corda-shared-build-pipeline-steps')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
options {
timestamps()
timeout(time: 3, unit: 'HOURS')
}
environment {
DOCKER_TAG_TO_USE = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
}
stages {
stage('Corda - Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
}
sh "kubectl auth can-i get pods"
}
}
stage('Corda - Run Tests') {
stage('Integration Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelIntegrationTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/integrationTest',
description: 'Integration Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
}
}
}
}
post {
always {
junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true
}
cleanup {
deleteDir() /* clean up our workspace */
}
}
}


@ -8,7 +8,6 @@ pipeline {
options {
timestamps()
overrideIndexTriggers(false)
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}
triggers {


@ -11,7 +11,6 @@ pipeline {
timestamps()
ansiColor('xterm')
overrideIndexTriggers(false)
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}
@ -24,6 +23,7 @@ pipeline {
// in the name
ARTIFACTORY_BUILD_NAME = "Corda / Publish / Publish Nightly to Artifactory"
.replaceAll("/", " :: ")
DOCKER_URL = "https://index.docker.io/v1/"
}
stages {
@ -58,6 +58,17 @@ pipeline {
)
}
}
stage('Publish Nightly to Docker Hub') {
steps {
withCredentials([
usernamePassword(credentialsId: 'corda-publisher-docker-hub-credentials',
usernameVariable: 'DOCKER_USERNAME',
passwordVariable: 'DOCKER_PASSWORD')]) {
sh "./gradlew pushOfficialImages"
}
}
}
}


@ -11,7 +11,6 @@ pipeline {
timestamps()
ansiColor('xterm')
overrideIndexTriggers(false)
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
timeout(time: 3, unit: 'HOURS')
}


@ -37,13 +37,13 @@ pipeline {
agent { label 'k8s' }
options {
timestamps()
buildDiscarder(logRotator(daysToKeepStr: '7', artifactDaysToKeepStr: '7'))
disableConcurrentBuilds()
timeout(time: 3, unit: 'HOURS')
}
environment {
DOCKER_TAG_TO_USE = "${env.GIT_COMMIT.subSequence(0, 8)}"
DOCKER_URL = "https://index.docker.io/v1/"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
ARTIFACTORY_CREDENTIALS = credentials('artifactory-credentials')
@ -150,6 +150,20 @@ pipeline {
)
}
}
stage('Publish Release to Docker Hub') {
when {
expression { isReleaseTag }
}
steps {
withCredentials([
usernamePassword(credentialsId: 'corda-publisher-docker-hub-credentials',
usernameVariable: 'DOCKER_USERNAME',
passwordVariable: 'DOCKER_PASSWORD')]) {
sh "./gradlew pushOfficialImages"
}
}
}
}


@ -1,60 +0,0 @@
@Library('corda-shared-build-pipeline-steps')
import static com.r3.build.BuildControl.killAllExistingBuildsForJob
killAllExistingBuildsForJob(env.JOB_NAME, env.BUILD_NUMBER.toInteger())
pipeline {
agent { label 'k8s' }
options {
timestamps()
timeout(time: 3, unit: 'HOURS')
}
environment {
DOCKER_TAG_TO_USE = "${UUID.randomUUID().toString().toLowerCase().subSequence(0, 12)}"
EXECUTOR_NUMBER = "${env.EXECUTOR_NUMBER}"
BUILD_ID = "${env.BUILD_ID}-${env.JOB_NAME}"
}
stages {
stage('Corda Pull Request - Generate Build Image') {
steps {
withCredentials([string(credentialsId: 'container_reg_passwd', variable: 'DOCKER_PUSH_PWD')]) {
sh "./gradlew " +
"-Dkubenetize=true " +
"-Ddocker.push.password=\"\${DOCKER_PUSH_PWD}\" " +
"-Ddocker.work.dir=\"/tmp/\${EXECUTOR_NUMBER}\" " +
"-Ddocker.provided.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean pushBuildImage"
}
sh "kubectl auth can-i get pods"
}
}
stage('Unit Tests') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" allParallelUnitTest"
if (env.CHANGE_ID) {
pullRequest.createStatus(status: 'success',
context: 'continuous-integration/jenkins/pr-merge/unitTest',
description: 'Unit Tests Passed',
targetUrl: "${env.JOB_URL}/testResults")
}
}
}
}
post {
always {
junit testResults: '**/build/test-results-xml/**/*.xml', keepLongStdio: true
}
cleanup {
deleteDir() /* clean up our workspace */
}
}
}


@ -0,0 +1,2 @@
## corda-core-deterministic.
This artifact is a deterministic subset of the binary contents of `corda-core`.
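For illustration only (not part of this commit or of the README above): a consumer that wants the deterministic core types could depend on this artifact directly. The `net.corda` group and the 4.6 version below are assumptions based on Corda's usual published coordinates; adjust them to the release actually in use.

dependencies {
    // hypothetical consumer build script: pull in the deterministic subset of corda-core
    implementation "net.corda:corda-core-deterministic:4.6"
}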


@ -207,10 +207,18 @@ artifacts {
publish file: deterministicJar, name: jarBaseName, type: 'jar', extension: 'jar', builtBy: metafix
}
tasks.named('sourceJar', Jar) {
from 'README.md'
include 'README.md'
}
tasks.named('javadocJar', Jar) {
from 'README.md'
include 'README.md'
}
publish {
dependenciesFrom configurations.deterministicArtifacts
publishSources = false
publishJavadoc = false
name jarBaseName
}


@ -6,6 +6,7 @@ import org.hibernate.Session
import org.hibernate.Transaction
import rx.subjects.PublishSubject
import java.sql.Connection
import java.sql.SQLException
import java.util.UUID
import javax.persistence.EntityManager
@ -87,6 +88,7 @@ class DatabaseTransaction(
committed = true
}
@Throws(SQLException::class)
fun rollback() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.clear()
@ -97,16 +99,20 @@ class DatabaseTransaction(
clearException()
}
@Throws(SQLException::class)
fun close() {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
try {
if (sessionDelegate.isInitialized() && session.isOpen) {
session.close()
}
if (database.closeConnection) {
connection.close()
}
} finally {
clearException()
contextTransactionOrNull = outerTransaction
}
if (database.closeConnection) {
connection.close()
}
clearException()
contextTransactionOrNull = outerTransaction
if (outerTransaction == null) {
synchronized(this) {
closed = true


@ -3,6 +3,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC
@ -10,11 +11,14 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.list
import net.corda.core.internal.readAllLines
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.core.utilities.unwrap
import net.corda.node.services.Permissions
import net.corda.testing.core.DUMMY_NOTARY_NAME
@ -23,6 +27,7 @@ import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.driver.internal.OutOfProcessImpl
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
@ -30,8 +35,11 @@ import net.corda.testing.node.internal.InternalDriverDSL
import org.jboss.byteman.agent.submit.ScriptText
import org.jboss.byteman.agent.submit.Submit
import org.junit.Before
import java.time.Duration
import java.util.concurrent.TimeUnit
import kotlin.test.assertEquals
abstract class StatemachineErrorHandlingTest {
abstract class StateMachineErrorHandlingTest {
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0
@ -57,15 +65,17 @@ abstract class StatemachineErrorHandlingTest {
internal fun DriverDSL.createBytemanNode(
providedName: CordaX500Name,
additionalCordapps: Collection<TestCordapp> = emptyList()
): NodeHandle {
return (this as InternalDriverDSL).startNode(
): Pair<NodeHandle, Int> {
val port = nextPort()
val nodeHandle = (this as InternalDriverDSL).startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
),
bytemanPort = 12000
bytemanPort = port
).getOrThrow()
return nodeHandle to port
}
internal fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
@ -78,8 +88,8 @@ abstract class StatemachineErrorHandlingTest {
).getOrThrow()
}
internal fun submitBytemanRules(rules: String) {
val submit = Submit("localhost", 12000)
internal fun submitBytemanRules(rules: String, port: Int) {
val submit = Submit("localhost", port)
submit.addScripts(listOf(ScriptText("Test script", rules)))
}
@ -90,6 +100,37 @@ abstract class StatemachineErrorHandlingTest {
.readAllLines()
}
internal fun OutOfProcessImpl.stop(timeout: Duration): Boolean {
return process.run {
destroy()
waitFor(timeout.seconds, TimeUnit.SECONDS)
}.also { onStopCallback() }
}
@Suppress("LongParameterList")
internal fun CordaRPCOps.assertHospitalCounts(
discharged: Int = 0,
observation: Int = 0,
propagated: Int = 0,
dischargedRetry: Int = 0,
observationRetry: Int = 0,
propagatedRetry: Int = 0
) {
val counts = startFlow(StateMachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.getOrThrow(20.seconds)
assertEquals(discharged, counts.discharged)
assertEquals(observation, counts.observation)
assertEquals(propagated, counts.propagated)
assertEquals(dischargedRetry, counts.dischargeRetry)
assertEquals(observationRetry, counts.observationRetry)
assertEquals(propagatedRetry, counts.propagatedRetry)
}
internal fun CordaRPCOps.assertHospitalCountsAllZero() = assertHospitalCounts()
internal fun CordaRPCOps.assertNumberOfCheckpoints(number: Long) {
assertEquals(number, startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
@StartableByRPC
@InitiatingFlow
class SendAMessageFlow(private val party: Party) : FlowLogic<String>() {
@ -97,6 +138,7 @@ abstract class StatemachineErrorHandlingTest {
override fun call(): String {
val session = initiateFlow(party)
session.send("hello there")
logger.info("Finished my flow")
return "Finished executing test flow - ${this.runId}"
}
}
@ -106,6 +148,35 @@ abstract class StatemachineErrorHandlingTest {
@Suspendable
override fun call() {
session.receive<String>().unwrap { it }
logger.info("Finished my flow")
}
}
@StartableByRPC
class ThrowAnErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
private fun throwException() {
logger.info("Throwing exception in flow")
throw IllegalStateException("throwing exception in flow")
}
}
@StartableByRPC
class ThrowAHospitalizeErrorFlow : FlowLogic<String>() {
@Suspendable
override fun call(): String {
throwException()
return "cant get here"
}
private fun throwException() {
logger.info("Throwing exception in flow")
throw HospitalizeFlowException("throwing exception in flow")
}
}
@ -154,26 +225,51 @@ abstract class StatemachineErrorHandlingTest {
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
override fun call(): HospitalCounts =
HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationCounter
serviceHub.cordaService(HospitalCounter::class.java).dischargedCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationCounter,
serviceHub.cordaService(HospitalCounter::class.java).propagatedCounter,
serviceHub.cordaService(HospitalCounter::class.java).dischargeRetryCounter,
serviceHub.cordaService(HospitalCounter::class.java).observationRetryCounter,
serviceHub.cordaService(HospitalCounter::class.java).propagatedRetryCounter
)
}
@CordaSerializable
data class HospitalCounts(val discharge: Int, val observation: Int)
data class HospitalCounts(
val discharged: Int,
val observation: Int,
val propagated: Int,
val dischargeRetry: Int,
val observationRetry: Int,
val propagatedRetry: Int
)
@Suppress("UNUSED_PARAMETER")
@CordaService
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
var dischargedCounter: Int = 0
var observationCounter: Int = 0
var dischargeCounter: Int = 0
var propagatedCounter: Int = 0
var dischargeRetryCounter: Int = 0
var observationRetryCounter: Int = 0
var propagatedRetryCounter: Int = 0
init {
StaffedFlowHospital.onFlowDischarged.add { _, _ ->
++dischargeCounter
dischargedCounter++
}
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ ->
++observationCounter
observationCounter++
}
StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
propagatedCounter++
}
StaffedFlowHospital.onFlowResuscitated.add { _, _, outcome ->
when (outcome) {
StaffedFlowHospital.Outcome.DISCHARGE -> dischargeRetryCounter++
StaffedFlowHospital.Outcome.OVERNIGHT_OBSERVATION -> observationRetryCounter++
StaffedFlowHospital.Outcome.UNTREATABLE -> propagatedRetryCounter++
}
}
}
}


@ -1,6 +1,5 @@
package net.corda.node.services.statemachine
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.internal.ResolveTransactionsFlow
import net.corda.core.messaging.startFlow
@ -22,7 +21,7 @@ import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
class StateMachineFinalityErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
* Throws an exception when recording a transaction inside of [ReceiveFinalityFlow] on the responding
@ -33,10 +32,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has completed its
* send to the responding node and the responding node successfully received it.
*/
@Test(timeout=300_000)
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
@Test(timeout = 300_000)
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work
@ -67,14 +66,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -83,15 +77,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
charlie.rpc.assertHospitalCounts(observation = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
charlie.rpc.assertNumberOfCheckpoints(1)
}
}
@ -104,10 +94,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Only the responding node keeps a checkpoint. The initiating flow has completed successfully as it has completed its
* send to the responding node and the responding node successfully received it.
*/
@Test(timeout=300_000)
fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() {
@Test(timeout = 300_000)
fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work
@ -138,14 +128,9 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -154,15 +139,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
charlie.rpc.assertHospitalCounts(observation = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
charlie.rpc.assertNumberOfCheckpoints(1)
}
}
@ -170,17 +151,17 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The exception is thrown 5 times.
* The exception is thrown 3 times.
*
* The responding flow is retried 3 times and then completes successfully.
*
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation.
*/
@Test(timeout=300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() {
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """
@ -204,35 +185,14 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 5
IF flagged("finality_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -241,20 +201,11 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
defaultNotaryIdentity
).returnValue.getOrThrow(30.seconds)
val output = getBytemanOutput(charlie)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
assertEquals(0, charlieClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
charlie.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
charlie.rpc.assertNumberOfCheckpoints(0)
}
}
@ -262,7 +213,7 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* Throws an exception when executing [Action.CommitTransaction] as part of receiving a transaction to record inside of [ReceiveFinalityFlow] on the responding
* flow's node.
*
* The exception is thrown 7 times.
* The exception is thrown 4 times.
*
* The responding flow is retried 3 times and is then kept in for observation.
*
@ -272,10 +223,10 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
* The [StaffedFlowHospital.TransitionErrorGeneralPractitioner] catches these errors instead of the [StaffedFlowHospital.FinalityDoctor]. Due to this, the
* flow is retried instead of moving straight to observation.
*/
@Test(timeout=300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation if error persists`() {
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation if error persists`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val charlie = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """
@ -299,36 +250,15 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("finality_flag") && readCounter("counter") < 7
IF flagged("finality_flag") && readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val charlieClient =
CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
aliceClient.startFlow(
alice.rpc.startFlow(
::CashIssueAndPaymentFlow,
500.DOLLARS,
OpaqueBytes.of(0x01),
@ -338,20 +268,14 @@ class StatemachineFinalityErrorHandlingTest : StatemachineErrorHandlingTest() {
).returnValue.getOrThrow(30.seconds)
}
val output = getBytemanOutput(charlie)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = charlieClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(1, aliceClient.stateMachinesSnapshot().size)
assertEquals(1, charlieClient.stateMachinesSnapshot().size)
// 1 for CashIssueAndPaymentFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
// 1 for ReceiveFinalityFlow and 1 for GetNumberOfCheckpointsFlow
assertEquals(2, charlieClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(1)
charlie.rpc.assertNumberOfCheckpoints(1)
}
}
}


@ -0,0 +1,581 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaRuntimeException
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.internal.OutOfProcessImpl
import org.junit.Test
import java.sql.Connection
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
private companion object {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.processEvent].
*
* This is not an expected place for an exception to occur, but allows us to test what happens when a random exception is propagated
* up to [FlowStateMachineImpl.run] during flow initialisation.
*
* A "Transaction context is missing" exception is thrown due to where the exception is thrown (no transaction is created so this is
* thrown when leaving [FlowStateMachineImpl.processEventsUntilFlowIsResumed] due to the finally block).
*/
@Test(timeout = 300_000)
fun `unexpected error during flow initialisation throws exception to client`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD processEvent
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD processEvent
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<CordaRuntimeException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
}
alice.rpc.assertHospitalCounts(propagated = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* A [SQLException] is then thrown when trying to rollback the flow's database transaction.
*
* The [SQLException] should be suppressed and the flow should continue to retry and complete successfully.
*/
@Test(timeout = 300_000)
fun `error during initialisation when trying to rollback the flow's database transaction the flow is able to retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") == 0
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception when rolling back transaction in transition executor
INTERFACE ${Connection::class.java.name}
METHOD rollback
AT ENTRY
IF readCounter("counter") == 1
DO incrementCounter("counter"); traceln("Throwing exception in transition executor"); throw new java.sql.SQLException("could not reach db", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
alice.rpc.assertHospitalCounts(discharged = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* A [SQLException] is then thrown when trying to close the flow's database transaction.
*
* The [SQLException] should be suppressed and the flow should continue to retry and complete successfully.
*/
@Test(timeout = 300_000)
fun `error during initialisation when trying to close the flow's database transaction the flow is able to retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") == 0
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception when closing transaction in transition executor
INTERFACE ${Connection::class.java.name}
METHOD close
AT ENTRY
IF readCounter("counter") == 1
DO incrementCounter("counter"); traceln("Throwing exception in transition executor"); throw new java.sql.SQLException("could not reach db", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
alice.rpc.assertHospitalCounts(discharged = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
}
// the flow is not signaled as started, so calls to [getOrThrow] will hang; sleeping instead
Thread.sleep(30.seconds.toMillis())
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
val terminated = (alice as OutOfProcessImpl).stop(60.seconds)
assertTrue(terminated, "The node must be shutdown before it can be restarted")
val (alice2, _) = createBytemanNode(ALICE_NAME)
Thread.sleep(10.seconds.toMillis())
alice2.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the retry itself.
*
* The flow then retries the retry causing the flow to complete successfully.
*/
@Test(timeout = 300_000)
fun `error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Throw exception on retry
CLASS ${SingleThreadedStateMachineManager::class.java.name}
METHOD onExternalStartFlow
AT ENTRY
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertHospitalCounts(
discharged = 1,
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
* saved its first checkpoint (remains in an unstarted state).
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
charlie.rpc.assertHospitalCounts(discharged = 3)
alice.rpc.assertNumberOfCheckpoints(0)
charlie.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
* saved its first checkpoint (remains in an unstarted state).
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
}
// the flow is not signaled as started, so calls to [getOrThrow] will hang; sleeping instead
Thread.sleep(30.seconds.toMillis())
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(1)
charlie.rpc.assertNumberOfCheckpoints(0)
val terminated = (charlie as OutOfProcessImpl).stop(60.seconds)
assertTrue(terminated, "The node must be shutdown before it can be restarted")
val (charlie2, _) = createBytemanNode(CHARLIE_NAME)
Thread.sleep(10.seconds.toMillis())
alice.rpc.assertNumberOfCheckpoints(0)
charlie2.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state) on a responding node.
*
* The exception is thrown 3 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit its original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state) on a responding node.
*
* The exception is thrown 4 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test is to prevent a regression, where a transient database connection error can be thrown retrieving a flow's checkpoint when
* retrying the flow after it failed to commit its original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* fails and the flow is kept in for observation.
*/
@Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
}
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
}


@ -0,0 +1,661 @@
package net.corda.node.services.statemachine
import net.corda.core.CordaRuntimeException
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.statemachine.transitions.TopLevelTransition
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
private companion object {
val executor: ExecutorService = Executors.newSingleThreadExecutor()
}
/**
* Throws an exception when performing an [Action.SendInitial] action.
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and is then kept in
* the hospital for observation.
*/
@Test(timeout = 300_000)
fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
}
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(1)
}
}
/**
* Throws an exception when performing an [Action.SendInitial] event.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when executing [DeduplicationHandler.afterDatabaseTransaction] from inside an [Action.AcknowledgeMessages] action.
*
* The exception is thrown every time [DeduplicationHandler.afterDatabaseTransaction] is executed inside of
* [ActionExecutorImpl.executeAcknowledgeMessages]
*
* The exceptions should be swallowed. Therefore there should be no trips to the hospital and no retries.
* The flow should complete successfully as the error is swallowed.
*/
@Test(timeout = 300_000)
fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Set flag when inside executeAcknowledgeMessages
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeAcknowledgeMessages
AT INVOKE ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction()
IF !flagged("exception_flag")
DO flag("exception_flag"); traceln("Setting flag to true")
ENDRULE
RULE Throw exception when executing ${DeduplicationHandler::class.java.name}.afterDatabaseTransaction when inside executeAcknowledgeMessages
INTERFACE ${DeduplicationHandler::class.java.name}
METHOD afterDatabaseTransaction
AT ENTRY
IF flagged("exception_flag")
DO traceln("Throwing exception"); clear("exception_flag"); traceln("SETTING FLAG TO FALSE"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertHospitalCountsAllZero()
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event when trying to propagate an error (processing an
* [Event.StartErrorPropagation] event)
*
* The exception is thrown 3 times.
*
* This causes the flow to retry the [Event.StartErrorPropagation] event until it succeeds. In this scenario it is retried 3 times;
* on the final retry the flow successfully propagates the error and completes exceptionally.
*/
@Test(timeout = 300_000)
fun `error during error propagation the flow is able to retry and recover`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ThrowAnErrorFlow::class.java.name}
METHOD throwException
AT ENTRY
IF !flagged("my_flag")
DO traceln("SETTING FLAG TO TRUE"); flag("my_flag")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("my_flag") && readCounter("counter") < 3
DO traceln("Throwing exception"); incrementCounter("counter"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<CordaRuntimeException> {
alice.rpc.startFlow(StateMachineErrorHandlingTest::ThrowAnErrorFlow).returnValue.getOrThrow(60.seconds)
}
alice.rpc.assertHospitalCounts(
propagated = 1,
propagatedRetry = 3
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when replaying a flow that has already successfully created its initial checkpoint.
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the retry itself.
*
* The flow is discharged and replayed from the hospital. An exception is then thrown during the retry that causes the flow to be
* retried again.
*/
@Test(timeout = 300_000)
fun `error during flow retry when executing retryFlowFromSafePoint the flow is able to retry and recover`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Set flag when executing first commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Throw exception on retry
CLASS ${SingleThreadedStateMachineManager::class.java.name}
METHOD addAndStartFlow
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(40.seconds)
alice.rpc.assertHospitalCounts(
discharged = 1,
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event after the flow has suspended (has moved to a started state).
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
// seems to be restarting the flow from the beginning every time
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when executing first suspend
CLASS ${TopLevelTransition::class.java.name}
METHOD suspendTransition
AT ENTRY
IF !flagged("suspend_flag")
DO flag("suspend_flag"); traceln("Setting suspend flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Set flag when executing first commit
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event when the flow is finishing.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
// seems to be restarting the flow from the beginning every time
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); clear("remove_checkpoint_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Throws a [ConstraintViolationException] when performing an [Action.CommitTransaction] event when the flow is finishing.
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it begins from the previous checkpoint where it suspended before failing.
*/
@Test(timeout = 300_000)
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 4
DO incrementCounter("counter");
clear("remove_checkpoint_flag");
traceln("Throwing exception");
throw new org.hibernate.exception.ConstraintViolationException("This flow has a terminal condition", new java.sql.SQLException(), "made up constraint")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<TimeoutException> {
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
}
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(1)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state).
*
* The exception is thrown 3 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test guards against a regression where a transient database connection error could be thrown while retrieving a flow's
* checkpoint when retrying the flow after it failed to commit its original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, alice.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has suspended (remains in an unstarted
* state).
*
* The exception is thrown 4 times.
*
* An exception is also thrown from [CheckpointStorage.getCheckpoint].
*
* This test guards against a regression where a transient database connection error could be thrown while retrieving a flow's
* checkpoint when retrying the flow after it failed to commit its original checkpoint.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* fails and the flow is kept in for observation.
*/
@Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Throw exception on getCheckpoint
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
}
// the flow is not signaled as started, so calls to [getOrThrow] would hang; sleep instead
Thread.sleep(30.seconds.toMillis())
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, alice.rpc.startFlow(StateMachineErrorHandlingTest::GetNumberOfCheckpointsFlow).returnValue.get())
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event when the flow is finishing on a responding node.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Set flag when adding action to remove checkpoint
CLASS ${TopLevelTransition::class.java.name}
METHOD flowFinishTransition
AT ENTRY
IF !flagged("remove_checkpoint_flag")
DO flag("remove_checkpoint_flag"); traceln("Setting remove checkpoint flag to true")
ENDRULE
RULE Throw exception on executeCommitTransaction when removing checkpoint
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeCommitTransaction
AT ENTRY
IF flagged("remove_checkpoint_flag") && readCounter("counter") < 3
DO incrementCounter("counter");
clear("remove_checkpoint_flag");
traceln("Throwing exception");
throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
charlie.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
charlie.rpc.assertNumberOfCheckpoints(0)
}
}
}

View File

@ -0,0 +1,181 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
* Triggers `killFlow` while the flow is suspended causing an [InterruptedException] to be thrown and passed through the hospital.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded, as the flow is marked as `isRemoved`.
*/
@Test(timeout = 300_000)
fun `error during transition due to killing a flow will terminate the flow`() {
startDriver {
val alice = createNode(ALICE_NAME)
val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::SleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == SleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = alice.rpc.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(20.seconds) }
assertTrue(flowKilled)
alice.rpc.assertHospitalCountsAllZero()
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Triggers `killFlow` during user application code.
*
* The user application code is mimicked by a [Thread.sleep] which, importantly, is not placed inside the [Suspendable]
* call function, since placing it inside a [Suspendable] function causes Quasar to behave unexpectedly.
*
* Although the call to kill the flow is made during user application code, the flow will not be removed / stop processing
* until the next suspension point is reached within the flow.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded, as the flow is marked as `isRemoved`.
*/
@Test(timeout = 300_000)
fun `flow killed during user code execution stops and removes the flow correctly`() {
startDriver {
val alice = createNode(ALICE_NAME)
val flow = alice.rpc.startTrackedFlow(StateMachineKillFlowErrorHandlingTest::ThreadSleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == ThreadSleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = alice.rpc.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(30.seconds) }
assertTrue(flowKilled)
alice.rpc.assertHospitalCountsAllZero()
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
/**
* Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and
* all that remains is its checkpoint in the database.
*
* The flow terminates and is not retried.
*
* Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are
* from the original flow that was put in for observation.
*/
@Test(timeout = 300_000)
fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME)
val charlie = createNode(CHARLIE_NAME)
val rules = """
RULE Create Counter
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS ${ActionExecutorImpl::class.java.name}
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
val flow = alice.rpc.startFlow(StateMachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
alice.rpc.killFlow(flow.id)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
}
@StartableByRPC
class ThreadSleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep()
logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
// Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar
private fun sleep() {
Thread.sleep(20000)
}
}
}

View File

@ -1,7 +1,6 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession
import net.corda.core.flows.InitiatedBy
@ -20,13 +19,14 @@ import org.junit.Test
import kotlin.test.assertEquals
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
class StateMachineSubFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
/**
* This test checks that flow calling an initiating subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -37,11 +37,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -72,7 +72,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && flagged("suspend_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
@ -83,52 +83,20 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
IF flagged("subflow_flag") && flagged("suspend_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
@ -136,7 +104,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an initiating subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -147,11 +116,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -182,55 +151,23 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && flagged("suspend_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInitiatingSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
@ -238,7 +175,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an inline subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first send to a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -249,11 +187,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -276,55 +214,23 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}
@ -332,7 +238,8 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* This test checks that flow calling an inline subflow will recover correctly.
*
* Throws an exception when performing an [Action.CommitTransaction] event during the subflow's first receive from a counterparty.
* The exception is thrown 5 times.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
@ -343,11 +250,11 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
* if an error transition moves into another error transition. The flow still recovers from this state. 5 exceptions were thrown to verify
* that 3 retries are attempted before recovering.
*/
@Test(timeout=300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() {
@Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() {
startDriver {
val charlie = createNode(CHARLIE_NAME)
val alice = createBytemanNode(ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """
RULE Create Counter
@ -370,7 +277,7 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 5
IF flagged("subflow_flag") && flagged("commit_flag") && readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
@ -381,52 +288,20 @@ class StatemachineSubflowErrorHandlingTest : StatemachineErrorHandlingTest() {
IF flagged("subflow_flag") && !flagged("commit_flag")
DO flag("commit_flag"); traceln("Setting commit flag to true")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
submitBytemanRules(rules, port)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
aliceClient.startFlow(StatemachineSubflowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow, charlie.nodeInfo.singleIdentity()).returnValue.getOrThrow(
alice.rpc.startFlow(
StateMachineSubFlowErrorHandlingTest::SendAMessageInAnInlineSubflowFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
alice.rpc.assertNumberOfCheckpoints(0)
}
}

View File

@ -1,321 +0,0 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
import org.junit.Test
import java.time.Duration
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeoutException
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineKillFlowErrorHandlingTest : StatemachineErrorHandlingTest() {
/**
* Triggers `killFlow` while the flow is suspended causing a [InterruptedException] to be thrown and passed through the hospital.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test(timeout=300_000)
fun `error during transition due to killing a flow will terminate the flow`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::SleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == SleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = aliceClient.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(20.seconds) }
val output = getBytemanOutput(alice)
assertTrue(flowKilled)
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
/**
* Triggers `killFlow` during user application code.
*
* The user application code is mimicked by a [Thread.sleep] which is importantly not placed inside the [Suspendable]
* call function. Placing it inside a [Suspendable] function causes quasar to behave unexpectedly.
*
* Although the call to kill the flow is made during user application code. It will not be removed / stop processing
* until the next suspension point is reached within the flow.
*
* The flow terminates and is not retried.
*
* No pass through the hospital is recorded. As the flow is marked as `isRemoved`.
*/
@Test(timeout=300_000)
fun `flow killed during user code execution stops and removes the flow correctly`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val rules = """
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startTrackedFlow(StatemachineKillFlowErrorHandlingTest::ThreadSleepFlow)
var flowKilled = false
flow.progress.subscribe {
if (it == ThreadSleepFlow.STARTED.label) {
Thread.sleep(5000)
flowKilled = aliceClient.killFlow(flow.id)
}
}
assertFailsWith<KilledFlowException> { flow.returnValue.getOrThrow(30.seconds) }
val output = getBytemanOutput(alice)
assertTrue(flowKilled)
// Check the stdout for the lines generated by byteman
assertEquals(0, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(0, output.filter { it.contains("Byteman test - overnight observation") }.size)
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
println(numberOfTerminalDiagnoses)
assertEquals(0, numberOfTerminalDiagnoses)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(0, discharge)
assertEquals(0, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
/**
* Triggers `killFlow` after the flow has already been sent to observation. The flow is not running at this point and
* all that remains is its checkpoint in the database.
*
* The flow terminates and is not retried.
*
* Killing the flow does not lead to any passes through the hospital. All the recorded passes through the hospital are
* from the original flow that was put in for observation.
*/
@Test(timeout=300_000)
fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
startDriver {
val alice = createBytemanNode(ALICE_NAME)
val charlie = createNode(CHARLIE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeSendMultiple action
CLASS $actionExecutorClassName
METHOD executeSendMultiple
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
RULE Entering internal error staff member
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT ENTRY
IF true
DO traceln("Reached internal transition error staff member")
ENDRULE
RULE Increment discharge counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ DISCHARGE
IF true
DO traceln("Byteman test - discharging")
ENDRULE
RULE Increment observation counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ OVERNIGHT_OBSERVATION
IF true
DO traceln("Byteman test - overnight observation")
ENDRULE
RULE Increment terminal counter
CLASS ${StaffedFlowHospital.TransitionErrorGeneralPractitioner::class.java.name}
METHOD consult
AT READ TERMINAL
IF true
DO traceln("Byteman test - terminal")
ENDRULE
""".trimIndent()
submitBytemanRules(rules)
val aliceClient =
CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy
val flow = aliceClient.startFlow(StatemachineErrorHandlingTest::SendAMessageFlow, charlie.nodeInfo.singleIdentity())
assertFailsWith<TimeoutException> { flow.returnValue.getOrThrow(20.seconds) }
aliceClient.killFlow(flow.id)
val output = getBytemanOutput(alice)
// Check the stdout for the lines generated by byteman
assertEquals(3, output.filter { it.contains("Byteman test - discharging") }.size)
assertEquals(1, output.filter { it.contains("Byteman test - overnight observation") }.size)
val numberOfTerminalDiagnoses = output.filter { it.contains("Byteman test - terminal") }.size
assertEquals(0, numberOfTerminalDiagnoses)
val (discharge, observation) = aliceClient.startFlow(StatemachineErrorHandlingTest::GetHospitalCountersFlow).returnValue.get()
assertEquals(3, discharge)
assertEquals(1, observation)
assertEquals(0, aliceClient.stateMachinesSnapshot().size)
// 1 for GetNumberOfCheckpointsFlow
assertEquals(1, aliceClient.startFlow(StatemachineErrorHandlingTest::GetNumberOfUncompletedCheckpointsFlow).returnValue.get())
}
}
@StartableByRPC
class SleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
}
@StartableByRPC
class ThreadSleepFlow : FlowLogic<Unit>() {
object STARTED : ProgressTracker.Step("I am ready to die")
override val progressTracker = ProgressTracker(STARTED)
@Suspendable
override fun call() {
sleep(Duration.of(1, ChronoUnit.SECONDS))
progressTracker.currentStep = STARTED
logger.info("Starting ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep()
logger.info("Finished ${ThreadSleepFlow::class.qualifiedName} application sleep")
sleep(Duration.of(2, ChronoUnit.MINUTES))
}
// Sleep is moved outside of `@Suspendable` function to prevent issues with Quasar
private fun sleep() {
Thread.sleep(20000)
}
}
}

View File

@ -1,6 +1,7 @@
package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import java.sql.SQLException
/**
* An executor of a single [Action].
@ -10,5 +11,6 @@ interface ActionExecutor {
* Execute [action] by [fiber].
*/
@Suspendable
@Throws(SQLException::class)
fun executeAction(fiber: FlowFiber, action: Action)
}
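// Illustrative only (hypothetical names): @Throws(SQLException::class) adds a `throws java.sql.SQLException`
// clause to the compiled method, so Java-side callers and bytecode-level tooling see SQLException as a
// declared checked exception of executeAction implementations. A minimal sketch:
interface ExampleExecutor {
    @Throws(SQLException::class)
    fun execute(action: String)
}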

View File

@ -14,6 +14,7 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.sql.SQLException
import java.time.Duration
/**
@ -208,6 +209,7 @@ internal class ActionExecutorImpl(
}
@Suspendable
@Throws(SQLException::class)
private fun executeCreateTransaction() {
if (contextTransactionOrNull != null) {
throw IllegalStateException("Refusing to create a second transaction")
@ -224,6 +226,7 @@ internal class ActionExecutorImpl(
}
@Suspendable
@Throws(SQLException::class)
private fun executeCommitTransaction() {
try {
contextTransaction.commit()

View File

@ -41,7 +41,7 @@ sealed class Event {
* Signal that an error has happened. This may be due to an uncaught exception in the flow or some external error.
* @param exception the exception itself.
*/
data class Error(val exception: Throwable) : Event()
data class Error(val exception: Throwable, val rollback: Boolean = true) : Event()
/**
* Signal that a ledger transaction has committed. This is an event completing a [FlowIORequest.WaitForLedgerCommit]

View File

@ -284,12 +284,15 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
openThreadLocalWormhole()
setLoggingContext()
initialiseFlow()
logger.debug { "Calling flow: $logic" }
val startTime = System.nanoTime()
var initialised = false
val resultOrError = try {
initialiseFlow()
initialised = true
// This sets the Cordapp classloader on the contextClassLoader of the current thread.
// Needed because in previous versions of the finance app we used Thread.contextClassLoader to resolve services defined in cordapps.
Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader
@ -310,14 +313,14 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Event.FlowFinish(resultOrError.value, softLocksId)
}
is Try.Failure -> {
Event.Error(resultOrError.exception)
Event.Error(resultOrError.exception, initialised)
}
}
// Immediately process the last event. This is to make sure the transition can assume that it has an open
// database transaction.
val continuation = processEventImmediately(
finalEvent,
isDbTransactionOpenOnEntry = true,
isDbTransactionOpenOnEntry = initialised,
isDbTransactionOpenOnExit = false
)
if (continuation == FlowContinuation.ProcessEvents) {
@ -335,8 +338,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Suspendable
private fun initialiseFlow() {
processEventsUntilFlowIsResumed(
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true
isDbTransactionOpenOnEntry = false,
isDbTransactionOpenOnExit = true
)
}

View File

@ -387,63 +387,51 @@ internal class SingleThreadedStateMachineManager(
flowSleepScheduler.cancel(currentState)
// Get set of external events
val flowId = currentState.flowLogic.runId
try {
val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
val oldFlowLeftOver = innerState.withLock { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return
// Resurrect flow
flowCreator.createFlowFromCheckpoint(flowId, checkpoint) ?: return
} else {
// Just flow initiation message
null
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: return
// Resurrect flow
flowCreator.createFlowFromCheckpoint(flowId, checkpoint) ?: return
} else {
// Just flow initiation message
null
}
innerState.withLock {
if (stopping) {
return
}
innerState.withLock {
if (stopping) {
return
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
} catch (e: Exception) {
// Failed to retry - manually put the flow in for observation rather than
// relying on the [HospitalisingInterceptor] to do so
val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(currentState, exceptions)
throw e
}
}
@ -609,7 +597,8 @@ internal class SingleThreadedStateMachineManager(
// Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint ->
val existingCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
existingCheckpoint?.let { serializedCheckpoint ->
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId)
if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError {
@ -763,6 +752,8 @@ internal class SingleThreadedStateMachineManager(
(exception as? FlowException)?.originalErrorId = flowError.errorId
flow.resultFuture.setException(exception)
lastState.flowLogic.progressTracker?.endWithError(exception)
// Complete the started future, needed when the flow fails during flow init (before completing an [UnstartedFlowTransition])
startedFutures.remove(flow.fiber.id)?.set(Unit)
changesPublisher.onNext(StateMachineManager.Change.Removed(lastState.flowLogic, Try.Failure<Nothing>(exception)))
}

View File

@ -32,6 +32,7 @@ import java.time.Instant
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import javax.persistence.PersistenceException
import kotlin.collections.HashMap
import kotlin.concurrent.timerTask
import kotlin.math.pow
@ -52,15 +53,24 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
DatabaseEndocrinologist,
TransitionErrorGeneralPractitioner,
SedationNurse,
NotaryDoctor
NotaryDoctor,
ResuscitationSpecialist
)
private const val MAX_BACKOFF_TIME = 110.0 // Totals to 2 minutes when calculating the backoff time
@VisibleForTesting
val onFlowKeptForOvernightObservation = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowDischarged = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowErrorPropagated = mutableListOf<(id: StateMachineRunId, by: List<String>) -> Unit>()
@VisibleForTesting
val onFlowResuscitated = mutableListOf<(id: StateMachineRunId, by: List<String>, outcome: Outcome) -> Unit>()
@VisibleForTesting
val onFlowAdmitted = mutableListOf<(id: StateMachineRunId) -> Unit>()
}
@ -194,12 +204,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
/**
* Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being
* treated.
* Request treatment for the [flowFiber].
*/
fun requestTreatment(flowFiber: FlowFiber, currentState: StateMachineState, errors: List<Throwable>) {
// Only treat flows that are not already in the hospital
if (!currentState.isRemoved && flowsInHospital.putIfAbsent(flowFiber.id, flowFiber) == null) {
if (!currentState.isRemoved) {
flowsInHospital[flowFiber.id] = flowFiber
admit(flowFiber, currentState, errors)
}
}
@ -219,20 +228,30 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
Diagnosis.DISCHARGE -> {
val backOff = calculateBackOffForChronicCondition(report, medicalHistory, currentState)
log.info("Flow error discharged from hospital (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
onFlowDischarged.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.DISCHARGE, Event.RetryFlowFromSafePoint, backOff)
}
Diagnosis.OVERNIGHT_OBSERVATION -> {
log.info("Flow error kept for overnight observation by ${report.by} (error was ${report.error.message})")
// We don't schedule a next event for the flow - it will automatically retry from its checkpoint on node restart
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map{it.toString()}) }
onFlowKeptForOvernightObservation.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.OVERNIGHT_OBSERVATION, Event.OvernightObservation, 0.seconds)
}
Diagnosis.NOT_MY_SPECIALTY, Diagnosis.TERMINAL -> {
// None of the staff care for these errors, or someone decided it is a terminal condition, so we let them propagate
log.info("Flow error allowed to propagate", report.error)
onFlowErrorPropagated.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }) }
Triple(Outcome.UNTREATABLE, Event.StartErrorPropagation, 0.seconds)
}
Diagnosis.RESUSCITATE -> {
// Reschedule the event of the last outcome, since processing that outcome failed
// TODO: consider a 0.seconds backoff in dev mode / when running from the driver, or make it configurable
val backOff = calculateBackOffForResuscitation(medicalHistory, currentState)
val outcome = medicalHistory.records.last().outcome
log.info("Flow error to be resuscitated, rescheduling previous outcome - $outcome (delay ${backOff.seconds}s) by ${report.by} (error was ${report.error.message})")
onFlowResuscitated.forEach { hook -> hook.invoke(flowFiber.id, report.by.map { it.toString() }, outcome) }
Triple(outcome, outcome.event, backOff)
}
}
val numberOfSuspends = currentState.checkpoint.checkpointState.numberOfSuspends
@ -251,18 +270,29 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
}
}
private fun calculateBackOffForChronicCondition(report: ConsultationReport, medicalHistory: FlowMedicalHistory, currentState: StateMachineState): Duration {
return report.by.firstOrNull { it is Chronic }?.let { chronicStaff ->
return medicalHistory.timesDischargedForTheSameThing(chronicStaff, currentState).let {
if (it == 0) {
0.seconds
} else {
maxOf(10, (10 + (Math.random()) * (10 * 1.5.pow(it)) / 2).toInt()).seconds
}
}
private fun calculateBackOffForChronicCondition(
report: ConsultationReport,
medicalHistory: FlowMedicalHistory,
currentState: StateMachineState
): Duration {
return report.by.firstOrNull { it is Chronic }?.let { staff ->
calculateBackOff(medicalHistory.timesDischargedForTheSameThing(staff, currentState))
} ?: 0.seconds
}
private fun calculateBackOffForResuscitation(
medicalHistory: FlowMedicalHistory,
currentState: StateMachineState
): Duration = calculateBackOff(medicalHistory.timesResuscitated(currentState))
private fun calculateBackOff(timesDiagnosisGiven: Int): Duration {
return if (timesDiagnosisGiven == 0) {
0.seconds
} else {
maxOf(10, (10 + (Math.random()) * minOf(MAX_BACKOFF_TIME, (10 * 1.5.pow(timesDiagnosisGiven)) / 2)).toInt()).seconds
}
}
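
To make the capped exponential backoff above concrete, here is a self-contained sketch that evaluates the same formula with the random jitter pinned to 0.5 (the midpoint of Math.random()); the printed values are illustrative only, and MAX_BACKOFF_TIME keeps the worst case at roughly two minutes:

import kotlin.math.pow

// Standalone approximation of calculateBackOff: prints the delay in seconds for successive
// occurrences of the same diagnosis, showing how MAX_BACKOFF_TIME caps the exponential growth.
fun main() {
    val maxBackoffTime = 110.0
    val jitter = 0.5 // stand-in for Math.random()
    (1..9).forEach { timesDiagnosisGiven ->
        val delaySeconds = maxOf(
            10,
            (10 + jitter * minOf(maxBackoffTime, (10 * 1.5.pow(timesDiagnosisGiven)) / 2)).toInt()
        )
        println("diagnosis #$timesDiagnosisGiven -> ~${delaySeconds}s")
    }
}
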
private fun consultStaff(flowFiber: FlowFiber,
currentState: StateMachineState,
errors: List<Throwable>,
@ -324,6 +354,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return records.count { it.outcome == Outcome.DISCHARGE && by in it.by && it.suspendCount == lastAdmittanceSuspendCount }
}
fun timesResuscitated(currentState: StateMachineState): Int {
val lastAdmittanceSuspendCount = currentState.checkpoint.numberOfSuspends
return records.count { ResuscitationSpecialist in it.by && it.suspendCount == lastAdmittanceSuspendCount }
}
override fun toString(): String = "${this.javaClass.simpleName}(records = $records)"
}
@ -357,10 +392,16 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
}
}
enum class Outcome { DISCHARGE, OVERNIGHT_OBSERVATION, UNTREATABLE }
enum class Outcome(val event: Event?) {
DISCHARGE(Event.RetryFlowFromSafePoint),
OVERNIGHT_OBSERVATION(null),
UNTREATABLE(Event.StartErrorPropagation)
}
/** The enum values are declared in priority order, highest priority first. */
enum class Diagnosis {
/** Retry the last outcome/diagnosis **/
RESUSCITATE,
/** The flow should not see other staff members */
TERMINAL,
/** Retry from last safe point. */
@ -375,6 +416,11 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
}
/**
* The [Chronic] interface marks [Staff] whose diagnoses can be returned repeatedly if the flow keeps coming back to
* the hospital. [Chronic] diagnoses apply a backoff before scheduling a new [Event]; this prevents a flow from constantly retrying
* without giving the underlying issue a chance to resolve itself.
*/
interface Chronic
/**
@ -545,10 +591,10 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
newError.mentionsThrowable(AsyncOperationTransitionException::class.java) -> Diagnosis.NOT_MY_SPECIALTY
history.notDischargedForTheSameThingMoreThan(2, this, currentState) -> Diagnosis.DISCHARGE
else -> Diagnosis.OVERNIGHT_OBSERVATION
}
}.also { logDiagnosis(it, newError, flowFiber, history) }
} else {
Diagnosis.NOT_MY_SPECIALTY
}.also { logDiagnosis(it, newError, flowFiber, history) }
}
}
private fun logDiagnosis(diagnosis: Diagnosis, newError: Throwable, flowFiber: FlowFiber, history: FlowMedicalHistory) {
@ -599,6 +645,25 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return Diagnosis.NOT_MY_SPECIALTY
}
}
/**
* Handles errors coming from the processing of error events ([Event.StartErrorPropagation] and [Event.RetryFlowFromSafePoint]),
* returning a [Diagnosis.RESUSCITATE] diagnosis.
*/
object ResuscitationSpecialist : Staff {
override fun consult(
flowFiber: FlowFiber,
currentState: StateMachineState,
newError: Throwable,
history: FlowMedicalHistory
): Diagnosis {
return if (newError is ErrorStateTransitionException) {
Diagnosis.RESUSCITATE
} else {
Diagnosis.NOT_MY_SPECIALTY
}
}
}
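
Since the Diagnosis values are declared in priority order, with the new RESUSCITATE value first, the most severe opinion among the consulted staff can be chosen purely by enum ordering. The following is a simplified, self-contained sketch of that selection rule using a stand-in enum; the real selection lives in consultStaff, which this diff does not touch:

// Hypothetical stand-in for the real Diagnosis enum, kept in the same priority order.
enum class Diagnosis { RESUSCITATE, TERMINAL, DISCHARGE, OVERNIGHT_OBSERVATION, NOT_MY_SPECIALTY }

// Picks the highest-priority (lowest ordinal) diagnosis out of all the staff opinions.
fun overallDiagnosis(opinions: List<Diagnosis>): Diagnosis =
    opinions.minOrNull() ?: Diagnosis.NOT_MY_SPECIALTY

fun main() {
    val opinions = listOf(Diagnosis.NOT_MY_SPECIALTY, Diagnosis.DISCHARGE, Diagnosis.RESUSCITATE)
    println(overallDiagnosis(opinions)) // RESUSCITATE, because it is declared first
}
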
}
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {

View File

@ -16,3 +16,5 @@ class StateTransitionException(
}
class AsyncOperationTransitionException(exception: Exception) : CordaException(exception.message, exception)
class ErrorStateTransitionException(val exception: Exception) : CordaException(exception.message, exception)

View File

@ -9,6 +9,7 @@ import net.corda.nodeapi.internal.persistence.DatabaseTransactionException
import net.corda.nodeapi.internal.persistence.contextDatabase
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
import java.security.SecureRandom
import java.sql.SQLException
import javax.persistence.OptimisticLockException
/**
@ -19,8 +20,8 @@ import javax.persistence.OptimisticLockException
* completely aborted to avoid error loops.
*/
class TransitionExecutorImpl(
val secureRandom: SecureRandom,
val database: CordaPersistence
val secureRandom: SecureRandom,
val database: CordaPersistence
) : TransitionExecutor {
private companion object {
@ -30,36 +31,44 @@ class TransitionExecutorImpl(
@Suppress("NestedBlockDepth", "ReturnCount")
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
contextDatabase = database
for (action in transition.actions) {
try {
actionExecutor.executeAction(fiber, action)
} catch (exception: Exception) {
contextTransactionOrNull?.run {
rollback()
close()
}
rollbackTransactionOnError()
if (transition.newState.checkpoint.errorState is ErrorState.Errored) {
// If we errored while transitioning to an error state then we cannot record the additional
// error as that may result in an infinite loop, e.g. error propagation fails -> record error -> propagate fails again.
// Instead we just keep around the old error state and wait for a new schedule, perhaps
// triggered from a flow hospital
log.warn("Error while executing $action during transition to errored state, aborting transition", exception)
// CORDA-3354 - Go to the hospital with the new error that has occurred
// while already in an error state (as this error could be for a different reason)
return Pair(FlowContinuation.Abort, previousState.copy(isFlowResumed = false))
log.warn("Error while executing $action, with error event $event, updating errored state", exception)
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(
FlowError(
secureRandom.nextLong(),
ErrorStateTransitionException(exception)
)
)
)
),
isFlowResumed = false
)
return Pair(FlowContinuation.ProcessEvents, newState)
} else {
// Otherwise error the state manually keeping the old flow state and schedule a DoRemainingWork
// to trigger error propagation
if(previousState.isRemoved && exception is OptimisticLockException) {
log.debug("Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception)
if (log.isDebugEnabled && previousState.isRemoved && exception is OptimisticLockException) {
log.debug(
"Flow has been killed and the following error is likely due to the flow's checkpoint being deleted. " +
"Occurred while executing $action, with event $event", exception
)
} else {
log.info("Error while executing $action, with event $event, erroring state", exception)
}
@ -77,12 +86,12 @@ class TransitionExecutorImpl(
}
val newState = previousState.copy(
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
)
),
isFlowResumed = false
checkpoint = previousState.checkpoint.copy(
errorState = previousState.checkpoint.errorState.addErrors(
listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
)
),
isFlowResumed = false
)
fiber.scheduleEvent(Event.DoRemainingWork)
return Pair(FlowContinuation.ProcessEvents, newState)
@ -91,4 +100,25 @@ class TransitionExecutorImpl(
}
return Pair(transition.continuation, transition.newState)
}
private fun rollbackTransactionOnError() {
contextTransactionOrNull?.run {
try {
rollback()
} catch (rollbackException: SQLException) {
log.info(
"Error rolling back database transaction from a previous error, continuing error handling for the original error",
rollbackException
)
}
try {
close()
} catch (rollbackException: SQLException) {
log.info(
"Error closing database transaction from a previous error, continuing error handling for the original error",
rollbackException
)
}
}
}
}

View File

@ -17,17 +17,17 @@ import net.corda.node.services.statemachine.transitions.TransitionResult
* transition.
*/
class HospitalisingInterceptor(
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
private val flowHospital: StaffedFlowHospital,
private val delegate: TransitionExecutor
) : TransitionExecutor {
@Suspendable
override fun executeTransition(
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
fiber: FlowFiber,
previousState: StateMachineState,
event: Event,
transition: TransitionResult,
actionExecutor: ActionExecutor
): Pair<FlowContinuation, StateMachineState> {
// If the fiber's previous state was clean then remove it from the hospital
@ -38,8 +38,8 @@ class HospitalisingInterceptor(
val (continuation, nextState) = delegate.executeTransition(fiber, previousState, event, transition, actionExecutor)
if (nextState.checkpoint.errorState is ErrorState.Errored && previousState.checkpoint.errorState is ErrorState.Clean) {
val exceptionsToHandle = nextState.checkpoint.errorState.errors.map { it.exception }
if (canEnterHospital(previousState, nextState)) {
val exceptionsToHandle = (nextState.checkpoint.errorState as ErrorState.Errored).errors.map { it.exception }
flowHospital.requestTreatment(fiber, previousState, exceptionsToHandle)
}
if (nextState.isRemoved) {
@ -48,6 +48,11 @@ class HospitalisingInterceptor(
return Pair(continuation, nextState)
}
private fun canEnterHospital(previousState: StateMachineState, nextState: StateMachineState): Boolean {
return nextState.checkpoint.errorState is ErrorState.Errored
&& (previousState.checkpoint.errorState as? ErrorState.Errored)?.errors != nextState.checkpoint.errorState.errors
}
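
The canEnterHospital guard above is what allows a flow that errors again while its errors are being handled to be re-admitted (and eventually reach the ResuscitationSpecialist) without the hospital re-treating an unchanged error list. A minimal sketch of the rule using simplified stand-in types:

// Simplified stand-ins for the real state machine types, just to exercise the admission rule.
data class FlowError(val errorId: Long, val message: String)

sealed class ErrorState {
    object Clean : ErrorState()
    data class Errored(val errors: List<FlowError>) : ErrorState()
}

fun canEnterHospital(previous: ErrorState, next: ErrorState): Boolean =
    next is ErrorState.Errored && (previous as? ErrorState.Errored)?.errors != next.errors

fun main() {
    val first = ErrorState.Errored(listOf(FlowError(1, "transition failed")))
    val unchanged = ErrorState.Errored(listOf(FlowError(1, "transition failed")))
    val extra = ErrorState.Errored(first.errors + FlowError(2, "error propagation failed"))

    println(canEnterHospital(ErrorState.Clean, first)) // true  - first admission
    println(canEnterHospital(first, unchanged))        // false - same errors, already being treated
    println(canEnterHospital(first, extra))            // true  - a new error appeared while handling errors
}
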
private fun removeFlow(id: StateMachineRunId) {
flowHospital.leave(id)
flowHospital.removeMedicalHistory(id)

View File

@ -1,6 +1,8 @@
package net.corda.node.services.statemachine.transitions
import net.corda.node.services.statemachine.*
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.StateMachineState
/**
* This transition checks the current state of the flow and determines whether anything needs to be done.

View File

@ -62,7 +62,7 @@ class TopLevelTransition(
private fun errorTransition(event: Event.Error): TransitionResult {
return builder {
freshErrorTransition(event.exception)
freshErrorTransition(event.exception, event.rollback)
FlowContinuation.ProcessEvents
}
}
@ -314,9 +314,7 @@ class TopLevelTransition(
private fun retryFlowFromSafePointTransition(startingState: StateMachineState): TransitionResult {
return builder {
// Need to create a flow from the prior checkpoint or flow initiation.
actions.add(Action.CreateTransaction)
actions.add(Action.RetryFlowFromSafePoint(startingState))
actions.add(Action.CommitTransaction)
FlowContinuation.Abort
}
}

View File

@ -28,12 +28,12 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
*
* @param error the error.
*/
fun freshErrorTransition(error: Throwable) {
fun freshErrorTransition(error: Throwable, rollback: Boolean = true) {
val flowError = FlowError(
errorId = (error as? IdentifiableException)?.errorId ?: context.secureRandom.nextLong(),
exception = error
)
errorTransition(flowError)
errorTransition(flowError, rollback)
}
/**
@ -42,7 +42,7 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
*
* @param error the error.
*/
fun errorsTransition(errors: List<FlowError>) {
fun errorsTransition(errors: List<FlowError>, rollback: Boolean) {
currentState = currentState.copy(
checkpoint = currentState.checkpoint.copy(
errorState = currentState.checkpoint.errorState.addErrors(errors)
@ -50,10 +50,10 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
isFlowResumed = false
)
actions.clear()
actions.addAll(arrayOf(
Action.RollbackTransaction,
Action.ScheduleEvent(Event.DoRemainingWork)
))
if (rollback) {
actions += Action.RollbackTransaction
}
actions += Action.ScheduleEvent(Event.DoRemainingWork)
}
/**
@ -62,8 +62,8 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
*
* @param error the error.
*/
fun errorTransition(error: FlowError) {
errorsTransition(listOf(error))
fun errorTransition(error: FlowError, rollback: Boolean) {
errorsTransition(listOf(error), rollback)
}
fun resumeFlowLogic(result: Any?): FlowContinuation {

View File

@ -0,0 +1,2 @@
## corda-serialization-deterministic.
This artifact is a deterministic subset of the binary contents of `corda-serialization`.

View File

@ -193,12 +193,20 @@ artifacts {
publish file: deterministicJar, name: jarBaseName, type: 'jar', extension: 'jar', builtBy: metafix
}
tasks.named('sourceJar', Jar) {
from 'README.md'
include 'README.md'
}
tasks.named('javadocJar', Jar) {
from 'README.md'
include 'README.md'
}
publish {
dependenciesFrom(configurations.deterministicArtifacts) {
defaultScope = 'compile'
}
publishSources = false
publishJavadoc = false
name jarBaseName
}

View File

@ -0,0 +1,2 @@
## corda-node-driver.
This artifact is the node-driver used for testing Corda.

View File

@ -73,9 +73,17 @@ jar {
}
}
tasks.named('javadocJar', Jar) {
from 'README.md'
include 'README.md'
}
tasks.named('javadoc', Javadoc) {
enabled = false
}
publish {
publishSources = true
publishJavadoc = false
name jar.baseName
}