Merge pull request #6665 from corda/jzd/merge-os4.6-to-os4.7-2020-08-26

NOTICK: Merge OS 4.6 into OS 4.7
Dan Newton 2020-09-01 13:23:43 +01:00 committed by GitHub
commit a64a3bd02d
37 changed files with 1266 additions and 481 deletions

View File

@ -114,7 +114,9 @@ pipeline {
"-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_USERNAME=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_PASSWORD=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
" clean preAllocateForParallelRegressionTest preAllocateForAllParallelSlowIntegrationTest pushBuildImage --stacktrace"
" clean preAllocateForAllParallelUnitTest preAllocateForAllParallelIntegrationTest " +
" preAllocateForAllParallelSlowIntegrationTest preAllocateForAllParallelSmokeTest " +
" pushBuildImage --stacktrace"
}
sh "kubectl auth can-i get pods"
}
@ -122,7 +124,7 @@ pipeline {
stage('Testing phase') {
parallel {
stage('Regression Test') {
stage('Unit Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
@ -132,7 +134,33 @@ pipeline {
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" parallelRegressionTest --stacktrace"
" allParallelUnitTest --stacktrace"
}
}
stage('Integration Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" allParallelIntegrationTest --stacktrace"
}
}
stage('Smoke Test') {
steps {
sh "./gradlew " +
"-DbuildId=\"\${BUILD_ID}\" " +
"-Dkubenetize=true " +
"-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
"-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
"-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
"-Dgit.branch=\"\${GIT_BRANCH}\" " +
"-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
" allParallelSmokeTest --stacktrace"
}
}
stage('Slow Integration Test') {

View File

@ -753,19 +753,14 @@ distributedTesting {
profile 'generalPurpose.yml'
distribution DistributeTestsBy.METHOD
}
parallelRegressionTest {
testGroups 'test', 'integrationTest', 'smokeTest'
profile 'generalPurpose.yml'
distribution DistributeTestsBy.METHOD
}
allParallelSmokeTest {
testGroups 'smokeTest'
profile 'generalPurpose.yml'
profile 'regression.yml'
distribution DistributeTestsBy.METHOD
}
allParallelSlowIntegrationTest {
testGroups 'slowIntegrationTest'
profile 'generalPurpose.yml'
profile 'regression.yml'
distribution DistributeTestsBy.METHOD
}
}

View File

@ -321,6 +321,14 @@ interface CordaRPCOps : RPCOps {
*/
fun removeClientId(clientId: String): Boolean
/**
* Returns all finished flows that were started with a client id.
*
* @return A [Map] containing the client ids of finished flows, mapped to [true] if the flow finished successfully
* or [false] if it completed exceptionally.
*/
fun finishedFlowsWithClientIds(): Map<String, Boolean>
/** Returns Node's NodeInfo, assuming this will not change while the node is running. */
fun nodeInfo(): NodeInfo
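A minimal usage sketch for the new RPC method, echoing the FlowWithClientIdTest added later in this diff. It assumes an already-connected CordaRPCOps proxy; MyFlow and reportFinishedFlows are hypothetical placeholders, not part of this change:
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.utilities.getOrThrow
import java.util.UUID
// Hypothetical no-op flow used only for illustration.
@StartableByRPC
class MyFlow : FlowLogic<Unit>() {
    @Suspendable
    override fun call() = Unit
}
fun reportFinishedFlows(rpc: CordaRPCOps) {
    val clientId = UUID.randomUUID().toString()
    // Start a flow with a client id and wait for it to finish.
    rpc.startFlowWithClientId(clientId, ::MyFlow).returnValue.getOrThrow()
    // Map of client id -> true (finished successfully) / false (completed exceptionally).
    val finished: Map<String, Boolean> = rpc.finishedFlowsWithClientIds()
    println("Flow $clientId succeeded: ${finished[clientId]}")
}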

View File

@ -73,6 +73,7 @@ class DatabaseTransaction(
firstExceptionInDatabaseTransaction = null
}
@Throws(SQLException::class)
fun commit() {
firstExceptionInDatabaseTransaction?.let {
throw DatabaseTransactionException(it)

View File

@ -92,6 +92,25 @@ open class SchemaMigration(
}
}
/**
* Returns the number of pending database migration changes.
* @param schemas The set of [MappedSchema]s to check.
* @param forceThrowOnMissingMigration whether to throw an exception if a mapped schema is missing its migration resource. Can be set
* to false when allowing Hibernate to create missing schemas in dev or tests.
*/
fun getPendingChangesCount(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) : Int {
val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
// current version of Liquibase appears to be non-threadsafe
// this is apparent when multiple in-process nodes are all running migrations simultaneously
mutex.withLock {
dataSource.connection.use { connection ->
val (_, changeToRunCount, _) = prepareRunner(connection, resourcesAndSourceInfo)
return changeToRunCount;
}
}
}
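A hedged sketch of calling the new accessor outside of node start-up. The SchemaMigration instance is assumed to be constructed the same way as in AbstractNode further below, and the import path is assumed from the node-api persistence package; logPendingMigrations is a hypothetical helper:
import net.corda.core.schemas.MappedSchema
import net.corda.nodeapi.internal.persistence.SchemaMigration
// Report how many Liquibase changes are still pending without applying any of them.
// `schemaMigration` and `schemas` are assumed to be supplied by the caller.
fun logPendingMigrations(schemaMigration: SchemaMigration, schemas: Set<MappedSchema>) {
    val pending = schemaMigration.getPendingChangesCount(schemas, forceThrowOnMissingMigration = true)
    if (pending == 0) {
        println("Database schema is up to date")
    } else {
        println("$pending database change(s) have not been applied yet")
    }
}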
/**
* Synchronises the changelog table with the schema descriptions passed in without applying any of the changes to the database.
* This can be used when migrating a CorDapp that had its schema generated by hibernate to liquibase schema migration, or when

View File

@ -10,11 +10,13 @@ import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap
import net.corda.node.internal.CheckpointIncompatibleException
import net.corda.node.services.statemachine.Checkpoint
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.core.singleIdentity
import net.corda.testing.driver.DriverDSL
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.node.internal.assertUncompletedCheckpoints
@ -32,7 +34,7 @@ class FlowCheckpointVersionNodeStartupCheckTest {
}
@Test(timeout=300_000)
fun `restart node with mismatch between suspended flow and installed CorDapps`() {
fun `restart node with mismatch between suspended flow and installed CorDapps`() {
driver(DriverParameters(
startNodesInProcess = false,
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
@ -40,7 +42,44 @@ class FlowCheckpointVersionNodeStartupCheckTest {
notarySpecs = emptyList(),
allowHibernateToManageAppSchema = false
)) {
createSuspendedFlowInBob()
val (bob, _) = createSuspendedFlowInBob()
bob.stop()
restartBobWithMismatchedCorDapp()
}
}
@Test(timeout=300_000)
fun `restart node with mismatch between suspended paused flow and installed CorDapps`() {
driver(DriverParameters(
startNodesInProcess = false,
inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
cordappsForAllNodes = emptyList(),
notarySpecs = emptyList(),
allowHibernateToManageAppSchema = false
)) {
val (bob, flowId) = createSuspendedFlowInBob()
val flow = bob.rpc.startFlow(::UpdateStatusToPaused, flowId)
flow.returnValue.getOrThrow()
bob.stop()
restartBobWithMismatchedCorDapp()
}
}
private fun DriverDSL.createSuspendedFlowInBob(): Pair<NodeHandle, StateMachineRunId> {
val (alice, bob) = listOf(
startNode(providedName = ALICE_NAME),
startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
).map { it.getOrThrow() }
alice.stop() // Stop Alice so that Bob never receives the message
val flowId = bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity()).id
// Wait until Bob's flow has started
bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
return Pair(bob, flowId)
}
fun DriverDSL.restartBobWithMismatchedCorDapp() {
val cordappsDir = baseDirectory(BOB_NAME) / "cordapps"
// Test the scenario where the CorDapp no longer exists
@ -58,21 +97,6 @@ class FlowCheckpointVersionNodeStartupCheckTest {
// The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException
"that is incompatible with the current installed version of"
)
}
}
private fun DriverDSL.createSuspendedFlowInBob() {
val (alice, bob) = listOf(
startNode(providedName = ALICE_NAME),
startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
).map { it.getOrThrow() }
alice.stop() // Stop Alice so that Bob never receives the message
bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity())
// Wait until Bob's flow has started
bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
bob.stop()
}
private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
@ -107,4 +131,13 @@ class FlowCheckpointVersionNodeStartupCheckTest {
@Suspendable
override fun call() = otherSide.send("Hello!")
}
@StartableByRPC
class UpdateStatusToPaused(private val id: StateMachineRunId): FlowLogic<Unit>() {
@Suspendable
override fun call() {
val statement = "Update node_checkpoints set status = ${Checkpoint.FlowStatus.PAUSED.ordinal} where flow_id = '${id.uuid}'"
serviceHub.jdbcSession().prepareStatement(statement).execute()
}
}
}

View File

@ -112,11 +112,10 @@ abstract class StateMachineErrorHandlingTest {
submit.addScripts(listOf(ScriptText("Test script", rules)))
}
internal fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
return nodeHandle.baseDirectory
.list()
.first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
.readAllLines()
private fun NodeHandle.getBytemanOutput(): List<String> {
return baseDirectory.list()
.filter { "net.corda.node.Corda" in it.toString() && "stdout.log" in it.toString() }
.flatMap { it.readAllLines() }
}
internal fun OutOfProcessImpl.stop(timeout: Duration): Boolean {
@ -126,6 +125,10 @@ abstract class StateMachineErrorHandlingTest {
}.also { onStopCallback() }
}
internal fun NodeHandle.assertBytemanOutput(string: String, count: Int) {
assertEquals(count, getBytemanOutput().filter { string in it }.size)
}
@Suppress("LongParameterList")
internal fun CordaRPCOps.assertHospitalCounts(
discharged: Int = 0,
@ -246,6 +249,7 @@ abstract class StateMachineErrorHandlingTest {
// Internal use for testing only!!
@StartableByRPC
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
@Suspendable
override fun call(): HospitalCounts =
HospitalCounts(
serviceHub.cordaService(HospitalCounter::class.java).dischargedCounter,

View File

@ -2,9 +2,11 @@ package net.corda.node.services.statemachine
import net.corda.core.CordaRuntimeException
import net.corda.core.messaging.startFlow
import net.corda.core.messaging.startFlowWithClientId
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.api.CheckpointStorage
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.CHARLIE_NAME
import net.corda.testing.core.singleIdentity
@ -58,6 +60,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -69,6 +79,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
30.seconds
)
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
@ -256,6 +267,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -267,6 +286,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
// The flow is not signalled as started, so calls to [getOrThrow] would hang; sleep instead
Thread.sleep(30.seconds.toMillis())
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
@ -331,6 +351,380 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
}
}
/**
* Throws an exception after the first [Action.CommitTransaction] event, before the flow has initialised (it remains in an unstarted state).
* This covers transient issues where the transaction committed the checkpoint but the node never received the response.
*
* The exception is thrown when performing [Action.SignalFlowHasStarted]; the error won't actually surface there, but throwing at this
* point makes the scenario easier to test.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
* The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
* at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
*
*/
@Test(timeout = 300_000)
fun `error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Flag when commit transaction reached
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF true
DO flag("commit")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
CLASS ${DatabaseTransaction::class.java.name}
METHOD commit
AT EXIT
IF readCounter("counter") < 3 && flagged("commit")
DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.assertBytemanOutput("External start flow event", 1)
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
*/
@Test(timeout = 300_000)
fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlowWithClientId(
"here is my client id",
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpoints(completed = 1)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when calling [FlowStateMachineImpl.processEvent].
*
* This is not an expected place for an exception to occur, but allows us to test what happens when a random exception is propagated
* up to [FlowStateMachineImpl.run] during flow initialisation.
*
* A "Transaction context is missing" exception is thrown due to where the exception is thrown (no transaction is created so this is
* thrown when leaving [FlowStateMachineImpl.processEventsUntilFlowIsResumed] due to the finally block).
*/
@Test(timeout = 300_000)
fun `with client id - unexpected error during flow initialisation throws exception to client`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD processEvent
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception
CLASS ${FlowStateMachineImpl::class.java.name}
METHOD processEvent
AT ENTRY
IF readCounter("counter") < 1
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
assertFailsWith<CordaRuntimeException> {
alice.rpc.startFlowWithClientId(
"give me all of your client ids, or else",
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(30.seconds)
}
alice.rpc.assertNumberOfCheckpoints(failed = 1)
alice.rpc.assertHospitalCounts(propagated = 1)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* The exception is thrown 4 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*/
@Test(timeout = 450_000)
fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Throw exception on executeCommitTransaction action
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
executor.execute {
alice.rpc.startFlowWithClientId(
"please sir, can i have a client id?",
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
)
}
// The flow is not signalled as started, so calls to [getOrThrow] would hang; sleep instead
Thread.sleep(30.seconds.toMillis())
alice.assertBytemanOutput("External start flow event", 4)
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
val terminated = (alice as OutOfProcessImpl).stop(60.seconds)
assertTrue(terminated, "The node must be shutdown before it can be restarted")
val (alice2, _) = createBytemanNode(ALICE_NAME)
Thread.sleep(20.seconds.toMillis())
alice2.rpc.assertNumberOfCheckpoints(completed = 1)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
* (remains in an unstarted state).
*
* An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
* exception is then thrown during the retry itself.
*
* The flow then retries the retry causing the flow to complete successfully.
*/
@Test(timeout = 300_000)
fun `with client id - error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Throw exception on executeCommitTransaction action after first suspend + commit
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF !flagged("commit_exception_flag")
DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Throw exception on retry
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlowWithClientId(
"hi, i'd like to be your client id",
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpoints(completed = 1)
alice.rpc.assertHospitalCounts(
discharged = 1,
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception after the first [Action.CommitTransaction] event, before the flow has initialised (it remains in an unstarted state).
* This covers transient issues where the transaction committed the checkpoint but the node never received the response.
*
* The exception is thrown when performing [Action.SignalFlowHasStarted]; the error won't actually surface there, but throwing at this
* point makes the scenario easier to test.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
* The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
* at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
*
*/
@Test(timeout = 300_000)
fun `with client id - error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
startDriver {
val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Flag when commit transaction reached
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF true
DO flag("commit")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
CLASS ${DatabaseTransaction::class.java.name}
METHOD commit
AT EXIT
IF readCounter("counter") < 3 && flagged("commit")
DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlowWithClientId(
"hello im a client id",
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.assertBytemanOutput("External start flow event", 1)
alice.rpc.assertNumberOfCheckpoints(completed = 1)
alice.rpc.assertHospitalCounts(discharged = 3)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
* saved its first checkpoint (remains in an unstarted state).
@ -363,6 +757,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 3
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log session init event
CLASS $stateMachineManagerClassName
METHOD onSessionInit
AT ENTRY
IF true
DO traceln("On session init event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -375,6 +777,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
)
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.assertBytemanOutput("On session init event", 4)
charlie.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(discharged = 3)
}
@ -411,6 +814,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
IF readCounter("counter") < 4
DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
ENDRULE
RULE Log session init event
CLASS $stateMachineManagerClassName
METHOD onSessionInit
AT ENTRY
IF true
DO traceln("On session init event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
@ -423,6 +834,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
Thread.sleep(30.seconds.toMillis())
alice.rpc.assertNumberOfCheckpoints(runnable = 1)
charlie.assertBytemanOutput("On session init event", 4)
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(
discharged = 3,
@ -463,7 +875,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@ -479,8 +891,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
@ -496,7 +908,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
@ -527,7 +939,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@ -543,8 +955,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
@ -562,10 +974,84 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
charlie.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
observation = 1,
dischargedRetry = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
}
}
/**
* Throws an exception after the first [Action.CommitTransaction] event, before the flow has initialised (it remains in an unstarted state).
* This covers transient issues where the transaction committed the checkpoint but the node never received the response.
*
* The exception is thrown when performing [Action.SignalFlowHasStarted]; the error won't actually surface there, but throwing at this
* point makes the scenario easier to test.
*
* The exception is thrown 3 times.
*
* This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
* succeeds and the flow finishes.
*
* Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
*
* The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
* at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
*
*/
@Test(timeout = 300_000)
fun `responding flow - error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
startDriver {
val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
val rules = """
RULE Create Counter
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
DO traceln("Counter created")
ENDRULE
RULE Flag when commit transaction reached
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF true
DO flag("commit")
ENDRULE
RULE Throw exception on executeSignalFlowHasStarted action
CLASS ${DatabaseTransaction::class.java.name}
METHOD commit
AT EXIT
IF readCounter("counter") < 3 && flagged("commit")
DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
ENDRULE
RULE Log session init event
CLASS $stateMachineManagerClassName
METHOD onSessionInit
AT ENTRY
IF true
DO traceln("On session init event")
ENDRULE
""".trimIndent()
submitBytemanRules(rules, port)
alice.rpc.startFlow(
StateMachineErrorHandlingTest::SendAMessageFlow,
charlie.nodeInfo.singleIdentity()
).returnValue.getOrThrow(
30.seconds
)
alice.rpc.assertNumberOfCheckpointsAllZero()
charlie.assertBytemanOutput("On session init event", 1)
charlie.rpc.assertNumberOfCheckpointsAllZero()
charlie.rpc.assertHospitalCounts(discharged = 3)
}
}
}

View File

@ -494,7 +494,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@ -510,8 +510,16 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
RULE Log external start flow event
CLASS $stateMachineManagerClassName
METHOD onExternalStartFlow
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
DO traceln("External start flow event")
ENDRULE
""".trimIndent()
@ -527,7 +535,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.assertNumberOfCheckpointsAllZero()
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 0
dischargedRetry = 1
)
assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
}
@ -557,7 +565,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
CLASS $actionExecutorClassName
METHOD executeCommitTransaction
AT ENTRY
IF createCounter("counter", $counter)
IF createCounter("counter", $counter) && createCounter("counter_2", $counter)
DO traceln("Counter created")
ENDRULE
@ -573,8 +581,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
INTERFACE ${CheckpointStorage::class.java.name}
METHOD getCheckpoint
AT ENTRY
IF true
DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
IF readCounter("counter_2") < 3
DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
ENDRULE
""".trimIndent()
@ -590,7 +598,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
alice.rpc.assertHospitalCounts(
discharged = 3,
observation = 1
observation = 1,
dischargedRetry = 1
)
assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
}

View File

@ -468,4 +468,4 @@ class GetCheckpointNumberOfStatusFlow(private val flowStatus: Checkpoint.FlowSta
}
}
}
}
}

View File

@ -140,6 +140,17 @@ class FlowWithClientIdTest {
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
}
}
@Test(timeout=300_000)
fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
val clientId = UUID.randomUUID().toString()
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode().getOrThrow()
nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
val finishedFlows = nodeA.rpc.finishedFlowsWithClientIds()
assertEquals(true, finishedFlows[clientId])
}
}
}
@StartableByRPC

View File

@ -465,13 +465,24 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
updateAppSchemasWithCheckpoints: Boolean
) {
check(started == null) { "Node has already been started" }
check(updateCoreSchemas || updateAppSchemas) { "Neither core nor app schema scripts were specified" }
Node.printBasicNodeInfo("Running database schema migration scripts ...")
val props = configuration.dataSourceProperties
if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
var pendingAppChanges: Int = 0
var pendingCoreChanges: Int = 0
database.startHikariPool(props, metricRegistry) { dataSource, haveCheckpoints ->
SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
.checkOrUpdate(schemaService.internalSchemas, updateCoreSchemas, haveCheckpoints, true)
.checkOrUpdate(schemaService.appSchemas, updateAppSchemas, !updateAppSchemasWithCheckpoints && haveCheckpoints, false)
val schemaMigration = SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
if(updateCoreSchemas) {
schemaMigration.runMigration(haveCheckpoints, schemaService.internalSchemas, true)
} else {
pendingCoreChanges = schemaMigration.getPendingChangesCount(schemaService.internalSchemas, true)
}
if(updateAppSchemas) {
schemaMigration.runMigration(!updateAppSchemasWithCheckpoints && haveCheckpoints, schemaService.appSchemas, false)
} else {
pendingAppChanges = schemaMigration.getPendingChangesCount(schemaService.appSchemas, false)
}
}
// Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log)
@ -491,7 +502,18 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
cordappProvider.start()
}
}
Node.printBasicNodeInfo("Database migration done.")
val updatedSchemas = listOfNotNull(
("core").takeIf { updateCoreSchemas },
("app").takeIf { updateAppSchemas }
).joinToString(separator = " and ");
val pendingChanges = listOfNotNull(
("no outstanding").takeIf { pendingAppChanges == 0 && pendingCoreChanges == 0 },
("$pendingCoreChanges outstanding core").takeIf { !updateCoreSchemas && pendingCoreChanges > 0 },
("$pendingAppChanges outstanding app").takeIf { !updateAppSchemas && pendingAppChanges > 0 }
).joinToString(prefix = "There are ", postfix = " database changes.");
Node.printBasicNodeInfo("Database migration scripts for $updatedSchemas schemas complete. $pendingChanges")
}
fun runSchemaSync() {
@ -583,6 +605,10 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
throw e
}
// Only execute futures/callbacks linked to [rootFuture] after the database transaction below is committed.
// This ensures that the node is fully ready before starting flows.
val rootFuture = openFuture<Void?>()
// Do all of this in a database transaction so anything that might need a connection has one.
val (resultingNodeInfo, readyFuture) = database.transaction(recoverableFailureTolerance = 0) {
networkParametersStorage.setCurrentParameters(signedNetParams, trustRoot)
@ -604,7 +630,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
tokenizableServices = null
verifyCheckpointsCompatible(frozenTokenizableServices)
val smmStartedFuture = smm.start(frozenTokenizableServices)
val callback = smm.start(frozenTokenizableServices)
val smmStartedFuture = rootFuture.map { callback() }
// Shut down the SMM so no Fibers are scheduled.
runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
val flowMonitor = FlowMonitor(
@ -624,6 +651,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
resultingNodeInfo to readyFuture
}
rootFuture.captureLater(services.networkMapCache.nodeReady)
readyFuture.map { ready ->
if (ready) {
// NB: Dispatch lifecycle events outside of transaction to ensure attachments and the like persisted into the DB

View File

@ -6,6 +6,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.node.ServiceHub
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.SubFlow
import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
@ -13,6 +14,8 @@ import net.corda.serialization.internal.withTokenContext
object CheckpointVerifier {
private val statusToVerify = setOf(Checkpoint.FlowStatus.RUNNABLE, Checkpoint.FlowStatus.HOSPITALIZED, Checkpoint.FlowStatus.PAUSED)
/**
* Verifies that all Checkpoints stored in the db can be safely loaded with the currently installed version.
* @throws CheckpointIncompatibleException if any offending checkpoint is found.
@ -35,7 +38,7 @@ object CheckpointVerifier {
val cordappsByHash = currentCordapps.associateBy { it.jarHash }
checkpointStorage.getCheckpointsToRun().use {
checkpointStorage.getCheckpoints(statusToVerify).use {
it.forEach { (_, serializedCheckpoint) ->
val checkpoint = try {
serializedCheckpoint.deserialize(checkpointSerializationContext)

View File

@ -170,6 +170,8 @@ internal class CordaRPCOpsImpl(
override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId)
override fun finishedFlowsWithClientIds(): Map<String, Boolean> = smm.finishedFlowsWithClientIds()
override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
val (allStateMachines, changes) = smm.track()

View File

@ -15,14 +15,20 @@ interface CheckpointStorage {
/**
* Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
*/
fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>,
serializedCheckpointState: SerializedBytes<CheckpointState>)
fun addCheckpoint(
id: StateMachineRunId, checkpoint: Checkpoint,
serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>
)
/**
* Update an existing checkpoint. Will throw if there is no checkpoint for this id.
*/
fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>)
fun updateCheckpoint(
id: StateMachineRunId, checkpoint: Checkpoint,
serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>
)
/**
* Update an existing checkpoint's status ([Checkpoint.status]).
@ -78,7 +84,7 @@ interface CheckpointStorage {
* until the underlying database connection is closed, so any processing should happen before it is closed.
* This method does not fetch [Checkpoint.Serialized.serializedFlowState] to save memory.
*/
fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
fun getPausedCheckpoints(): Stream<Triple<StateMachineRunId, Checkpoint.Serialized, Boolean>>
fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>>

View File

@ -373,7 +373,7 @@ class DBCheckpointStorage(
override fun addCheckpoint(
id: StateMachineRunId,
checkpoint: Checkpoint,
serializedFlowState: SerializedBytes<FlowState>,
serializedFlowState: SerializedBytes<FlowState>?,
serializedCheckpointState: SerializedBytes<CheckpointState>
) {
val now = clock.instant()
@ -555,15 +555,17 @@ class DBCheckpointStorage(
return currentDBSession().find(DBFlowException::class.java, id.uuid.toString())
}
override fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
override fun getPausedCheckpoints(): Stream<Triple<StateMachineRunId, Checkpoint.Serialized, Boolean>> {
val session = currentDBSession()
val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status,
checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible) from ${DBFlowCheckpoint::class.java.name}
checkpoint join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id where
checkpoint.status = ${FlowStatus.PAUSED.ordinal}""".trimIndent()
checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible, exception.id)
from ${DBFlowCheckpoint::class.java.name} checkpoint
join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id
left outer join ${DBFlowException::class.java.name} exception on checkpoint.exceptionDetails = exception.id
where checkpoint.status = ${FlowStatus.PAUSED.ordinal}""".trimIndent()
val query = session.createQuery(jpqlQuery, DBPausedFields::class.java)
return query.resultList.stream().map {
StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint()
Triple(StateMachineRunId(UUID.fromString(it.id)), it.toSerializedCheckpoint(), it.wasHospitalized)
}
}
@ -722,8 +724,10 @@ class DBCheckpointStorage(
val status: FlowStatus,
val progressStep: String?,
val ioRequestType: String?,
val compatible: Boolean
val compatible: Boolean,
exception: String?
) {
val wasHospitalized = exception != null
fun toSerializedCheckpoint(): Checkpoint.Serialized {
return Checkpoint.Serialized(
serializedCheckpointState = SerializedBytes(checkpoint),

View File

@ -145,7 +145,7 @@ sealed class Action {
/**
* Commit the current database transaction.
*/
object CommitTransaction : Action() {
data class CommitTransaction(val currentState: StateMachineState) : Action() {
override fun toString() = "CommitTransaction"
}

View File

@ -61,7 +61,7 @@ internal class ActionExecutorImpl(
is Action.RemoveFlow -> executeRemoveFlow(action)
is Action.CreateTransaction -> executeCreateTransaction()
is Action.RollbackTransaction -> executeRollbackTransaction()
is Action.CommitTransaction -> executeCommitTransaction()
is Action.CommitTransaction -> executeCommitTransaction(action)
is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
is Action.ReleaseSoftLocks -> executeReleaseSoftLocks(action)
is Action.RetryFlowFromSafePoint -> executeRetryFlowFromSafePoint(action)
@ -94,10 +94,7 @@ internal class ActionExecutorImpl(
if (action.isCheckpointUpdate) {
checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState)
} else {
if (flowState is FlowState.Finished) {
throw IllegalStateException("A new checkpoint cannot be created with a finished flow state.")
}
checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState!!, serializedCheckpointState)
checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState)
}
}
@ -222,13 +219,14 @@ internal class ActionExecutorImpl(
@Suspendable
@Throws(SQLException::class)
private fun executeCommitTransaction() {
private fun executeCommitTransaction(action: Action.CommitTransaction) {
try {
contextTransaction.commit()
} finally {
contextTransaction.close()
contextTransactionOrNull = null
}
action.currentState.run { numberOfCommits = checkpoint.checkpointState.numberOfCommits }
}
@Suppress("TooGenericExceptionCaught")

View File

@ -15,6 +15,7 @@ import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
@ -23,6 +24,8 @@ import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
import org.apache.activemq.artemis.utils.ReusableLatch
import org.apache.commons.lang3.reflect.FieldUtils
import java.lang.reflect.Field
import java.security.SecureRandom
import java.util.concurrent.Semaphore
@ -32,7 +35,9 @@ data class NonResidentFlow(
val runId: StateMachineRunId,
var checkpoint: Checkpoint,
val resultFuture: OpenFuture<Any?> = openFuture(),
val resumable: Boolean = true
val resumable: Boolean = true,
val hospitalized: Boolean = false,
val progressTracker: ProgressTracker? = null
) {
val events = mutableListOf<ExternalEvent>()
@ -41,6 +46,7 @@ data class NonResidentFlow(
}
}
@Suppress("TooManyFunctions")
class FlowCreator(
private val checkpointSerializationContext: CheckpointSerializationContext,
private val checkpointStorage: CheckpointStorage,
@ -70,7 +76,12 @@ class FlowCreator(
}
else -> nonResidentFlow.checkpoint
}
return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture)
return createFlowFromCheckpoint(
nonResidentFlow.runId,
checkpoint,
resultFuture = nonResidentFlow.resultFuture,
progressTracker = nonResidentFlow.progressTracker
)
}
@Suppress("LongParameterList")
@ -80,7 +91,8 @@ class FlowCreator(
reloadCheckpointAfterSuspendCount: Int? = null,
lock: Semaphore = Semaphore(1),
resultFuture: OpenFuture<Any?> = openFuture(),
firstRestore: Boolean = true
firstRestore: Boolean = true,
progressTracker: ProgressTracker? = null
): Flow<*>? {
val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore)
var checkpoint = oldCheckpoint
@ -91,6 +103,7 @@ class FlowCreator(
updateCompatibleInDb(runId, true)
checkpoint = checkpoint.copy(compatible = true)
}
checkpoint = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
fiber.logic.stateMachine = fiber
@ -102,8 +115,10 @@ class FlowCreator(
anyCheckpointPersisted = true,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount
?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null,
numberOfCommits = checkpoint.checkpointState.numberOfCommits,
lock = lock
)
injectOldProgressTracker(progressTracker, fiber.logic)
return Flow(fiber, resultFuture)
}
@ -148,6 +163,7 @@ class FlowCreator(
fiber = flowStateMachineImpl,
anyCheckpointPersisted = existingCheckpoint != null,
reloadCheckpointAfterSuspendCount = if (reloadCheckpointAfterSuspend) 0 else null,
numberOfCommits = existingCheckpoint?.checkpointState?.numberOfCommits ?: 0,
lock = Semaphore(1),
deduplicationHandler = deduplicationHandler,
senderUUID = senderUUID
@ -229,6 +245,7 @@ class FlowCreator(
fiber: FlowStateMachineImpl<*>,
anyCheckpointPersisted: Boolean,
reloadCheckpointAfterSuspendCount: Int?,
numberOfCommits: Int,
lock: Semaphore,
deduplicationHandler: DeduplicationHandler? = null,
senderUUID: String? = null
@ -246,7 +263,66 @@ class FlowCreator(
flowLogic = fiber.logic,
senderUUID = senderUUID,
reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount,
numberOfCommits = numberOfCommits,
lock = lock
)
}
/**
* The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
* any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
* updates are correctly sent out after the flow is retried.
*
* If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well.
*/
private fun injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
if (oldTracker != null) {
val newTracker = newFlowLogic.progressTracker
if (newTracker != null) {
attachNewChildren(oldTracker, newTracker)
replaceTracker(newFlowLogic, oldTracker)
}
}
}
private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) {
oldTracker.currentStep = newTracker.currentStep
oldTracker.steps.forEachIndexed { index, step ->
val newStep = newTracker.steps[index]
val newChildTracker = newTracker.getChildProgressTracker(newStep)
newChildTracker?.let { child ->
oldTracker.setChildProgressTracker(step, child)
}
}
resubscribeToChildren(oldTracker)
}
/**
* Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint,
* it retains the child links, but does not automatically re-subscribe to the child changes.
*/
private fun resubscribeToChildren(tracker: ProgressTracker) {
tracker.steps.forEach {
val childTracker = tracker.getChildProgressTracker(it)
if (childTracker != null) {
tracker.setChildProgressTracker(it, childTracker)
resubscribeToChildren(childTracker)
}
}
}
/** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */
private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) {
val field = getProgressTrackerField(newFlowLogic)
field?.apply {
isAccessible = true
set(newFlowLogic, oldProgressTracker)
}
}
private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? {
// The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up
// the hierarchy.
return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" }
}
}
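For context, a hypothetical flow showing the progressTracker property that injectOldProgressTracker and replaceTracker above operate on when a flow is recreated from a checkpoint; the flow and step names are illustrative only, not part of this change:
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.utilities.ProgressTracker
@StartableByRPC
class TwoStepFlow : FlowLogic<Unit>() {
    companion object {
        object PREPARING : ProgressTracker.Step("Preparing")
        object FINISHING : ProgressTracker.Step("Finishing")
        fun tracker() = ProgressTracker(PREPARING, FINISHING)
    }
    // This is the field that FlowCreator locates via FieldUtils.getAllFieldsList(...) and
    // overwrites with the pre-retry tracker so existing RPC observers keep receiving updates.
    override val progressTracker = tracker()
    @Suspendable
    override fun call() {
        progressTracker.currentStep = PREPARING
        // ... flow work would go here ...
        progressTracker.currentStep = FINISHING
    }
}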

View File

@ -237,12 +237,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
private fun Throwable.fillInLocalStackTrace(): Throwable {
fillInStackTrace()
// provide useful information that can be displayed to the user
// reflection use to access private field
// Fill in the stacktrace when the exception originates from another node
when (this) {
is UnexpectedFlowEndException -> {
DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", this).value?.let {
fillInStackTrace()
stackTrace = arrayOf(
StackTraceElement(
"Received unexpected counter-flow exception from peer ${it.name}",
@ -255,6 +254,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
is FlowException -> {
DeclaredField<Party?>(FlowException::class.java, "peer", this).value?.let {
fillInStackTrace()
stackTrace = arrayOf(
StackTraceElement(
"Received counter-flow exception from peer ${it.name}",

View File

@ -22,7 +22,6 @@ import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.doneFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.mapNotNull
import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.deserialize
@ -41,7 +40,6 @@ import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInter
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.injectOldProgressTracker
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
@ -153,7 +151,7 @@ internal class SingleThreadedStateMachineManager(
override val changes: Observable<StateMachineManager.Change> = innerState.changesPublisher
@Suppress("ComplexMethod")
override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): CordaFuture<Unit> {
override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): () -> Unit {
checkQuasarJavaAgentPresence()
val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
CheckpointSerializeAsTokenContextImpl(
@ -200,13 +198,14 @@ internal class SingleThreadedStateMachineManager(
// - Incompatible checkpoints need to be handled upon implementing CORDA-3897
for (flow in fibers.values) {
flow.fiber.clientId?.let {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(doneFuture(flow.fiber))
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(flow.fiber.id, doneFuture(flow.fiber))
}
}
for (pausedFlow in pausedFlows) {
pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let {
innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(
pausedFlow.key,
doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it)
)
}
@ -223,7 +222,7 @@ internal class SingleThreadedStateMachineManager(
} ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
}
return serviceHub.networkMapCache.nodeReady.map {
return {
logger.info("Node ready, info: ${serviceHub.myInfo}")
resumeRestoredFlows(fibers)
flowMessaging.start { _, deduplicationHandler ->
@ -312,17 +311,20 @@ internal class SingleThreadedStateMachineManager(
status
} else {
newFuture = openFuture()
FlowWithClientIdStatus.Active(newFuture!!)
FlowWithClientIdStatus.Active(flowId, newFuture!!)
}
}
}
// A flow started with this client id already exists; return the existing flow's future and don't start a new flow.
existingStatus?.let {
val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
return@startFlow uncheckedCast(existingFuture)
}
onClientIDNotFound?.invoke()
// If the flow ID is the same as the one recorded in the client ID map,
// then this start flow event has been retried, and we should not de-duplicate.
if (flowId != it.flowId) {
val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
return@startFlow uncheckedCast(existingFuture)
}
} ?: onClientIDNotFound?.invoke()
}
return try {
@ -456,12 +458,8 @@ internal class SingleThreadedStateMachineManager(
innerState.withLock { if (id in flows) return@Checkpoints }
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
if (checkpointStorage.removeFlowException(id)) {
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
} else {
logger.error("Unable to remove database exception for flow $id. Something is very wrong. The flow will not be loaded and run.")
return@Checkpoints
}
checkpointStorage.removeFlowException(id)
checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@Checkpoints
val flow = flowCreator.createFlowFromCheckpoint(id, checkpoint)
@ -472,9 +470,9 @@ internal class SingleThreadedStateMachineManager(
flows[id] = flow
}
}
checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint) ->
checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint, hospitalised) ->
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@Checkpoints
pausedFlows[id] = NonResidentFlow(id, checkpoint)
pausedFlows[id] = NonResidentFlow(id, checkpoint, hospitalized = hospitalised)
}
return Pair(flows, pausedFlows)
}
@ -495,40 +493,49 @@ internal class SingleThreadedStateMachineManager(
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val checkpoint = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return@transaction null
}
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
// Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated
val checkpointLoadingStatus = database.transaction {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) ?: return@transaction CheckpointLoadingStatus.NotFound
val checkpoint = serializedCheckpoint.let {
tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
if (checkpointStorage.removeFlowException(flowId)) {
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
} else {
logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.")
return@transaction null
}
checkpointStorage.removeFlowException(flowId)
checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
}
} ?: return@transaction null
} ?: return
} ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize
}
// Resurrect flow
flowCreator.createFlowFromCheckpoint(
flowId,
checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false
) ?: return
} else {
// Just flow initiation message
null
CheckpointLoadingStatus.Success(checkpoint)
}
val (flow, numberOfCommitsFromCheckpoint) = when {
// Resurrect flow
checkpointLoadingStatus is CheckpointLoadingStatus.Success -> {
val numberOfCommitsFromCheckpoint = checkpointLoadingStatus.checkpoint.checkpointState.numberOfCommits
val flow = flowCreator.createFlowFromCheckpoint(
flowId,
checkpointLoadingStatus.checkpoint,
currentState.reloadCheckpointAfterSuspendCount,
currentState.lock,
firstRestore = false,
progressTracker = currentState.flowLogic.progressTracker
) ?: return
flow to numberOfCommitsFromCheckpoint
}
checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return
else -> {
// Just flow initiation message
null to -1
}
}
innerState.withLock {
if (stopping) {
return
@ -538,10 +545,10 @@ internal class SingleThreadedStateMachineManager(
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState)
extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState, numberOfCommitsFromCheckpoint)
}
}
@ -567,13 +574,27 @@ internal class SingleThreadedStateMachineManager(
/**
* Extract all the incomplete deduplication handlers as well as the [ExternalEvent] and [Event.Pause] events from this flows event queue
* [oldEventQueue]. Then schedule them (in the same order) for the new flow. This means that if a retried flow has a pause event
* scheduled then the retried flow will eventually pause. The new flow will not retry again if future retry events have been scheduled.
* When this method is called this flow must have been replaced by the new flow in [StateMachineInnerState.flows]. This method differs
* from [extractAndQueueExternalEventsForPausedFlow] where (only) [externalEvents] are extracted and scheduled straight away.
* Extract all the (unpersisted) incomplete deduplication handlers [currentState.pendingDeduplicationHandlers], as well as the
* [ExternalEvent] and [Event.Pause] events from this flows event queue [oldEventQueue]. Then schedule them (in the same order) for the
* new flow. This means that if a retried flow has a pause event scheduled then the retried flow will eventually pause. The new flow
* will not retry again if future retry events have been scheduled. When this method is called this flow must have been replaced by the
* new flow in [StateMachineInnerState.flows].
*
* This method differs from [extractAndQueueExternalEventsForPausedFlow] where (only) [externalEvents] are extracted and scheduled
* straight away.
*
* @param oldEventQueue The old event queue of the flow/fiber to extract unprocessed events from
*
* @param currentState The current state of the flow, used to extract processed events (held in [StateMachineState.pendingDeduplicationHandlers])
*
* @param numberOfCommitsFromCheckpoint The number of commits recorded in the checkpoint loaded from the database, to compare against
* the number of commits the in-memory flow has currently reached
*/
private fun extractAndScheduleEventsForRetry(oldEventQueue: Channel<Event>, currentState: StateMachineState) {
private fun extractAndScheduleEventsForRetry(
oldEventQueue: Channel<Event>,
currentState: StateMachineState,
numberOfCommitsFromCheckpoint: Int
) {
val flow = innerState.withLock {
flows[currentState.flowLogic.runId]
}
@ -583,9 +604,13 @@ internal class SingleThreadedStateMachineManager(
if (event is Event.Pause || event is Event.GeneratedByExternalEvent) events.add(event)
} while (event != null)
for (externalEvent in currentState.pendingDeduplicationHandlers) {
deliverExternalEvent(externalEvent.externalCause)
// Only redeliver events if they were not persisted to the database
if (currentState.numberOfCommits >= numberOfCommitsFromCheckpoint) {
for (externalEvent in currentState.pendingDeduplicationHandlers) {
deliverExternalEvent(externalEvent.externalCause)
}
}
for (event in events) {
if (event is Event.GeneratedByExternalEvent) {
deliverExternalEvent(event.deduplicationHandler.externalCause)
@ -758,8 +783,7 @@ internal class SingleThreadedStateMachineManager(
): CordaFuture<FlowStateMachine<A>> {
onCallingStartFlowInternal?.invoke()
val existingFlow = innerState.withLock { flows[flowId] }
val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState.isAnyCheckpointPersisted) {
val existingCheckpoint = if (innerState.withLock { flows[flowId] != null }) {
// Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
@ -811,7 +835,13 @@ internal class SingleThreadedStateMachineManager(
decrementLiveFibers()
// Setting flowState = FlowState.Paused means we don't hold the frozen fiber in memory.
val checkpoint = currentState.checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED, flowState = FlowState.Paused)
val pausedFlow = NonResidentFlow(id, checkpoint, flow.resultFuture)
val pausedFlow = NonResidentFlow(
id,
checkpoint,
flow.resultFuture,
hospitalized = currentState.checkpoint.status == Checkpoint.FlowStatus.HOSPITALIZED,
progressTracker = currentState.flowLogic.progressTracker
)
val eventQueue = flow.fiber.transientValues.eventQueue
extractAndQueueExternalEventsForPausedFlow(eventQueue, currentState.pendingDeduplicationHandlers, pausedFlow)
pausedFlows.put(id, pausedFlow)
@ -1098,4 +1128,19 @@ internal class SingleThreadedStateMachineManager(
}
return false
}
override fun finishedFlowsWithClientIds(): Map<String, Boolean> {
return innerState.withLock {
clientIdsToFlowIds.asSequence()
.filter { (_, status) -> status is FlowWithClientIdStatus.Removed }
.map { (clientId, status) -> clientId to (status as FlowWithClientIdStatus.Removed).succeeded }
.toMap()
}
}
private sealed class CheckpointLoadingStatus {
class Success(val checkpoint: Checkpoint) : CheckpointLoadingStatus()
object NotFound : CheckpointLoadingStatus()
object CouldNotDeserialize : CheckpointLoadingStatus()
}
}
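// --- Illustrative sketch (editor's addition, not part of this change) ---------------------------
// With the signature change above, start() now returns a `() -> Unit` callback instead of a
// CordaFuture<Unit>; the caller decides when to run it, mirroring the `nodeReady.map { ... }`
// idiom that previously lived inside start(). The helper, its parameter types and the import
// paths below are assumptions for illustration only.
import net.corda.core.concurrent.CordaFuture
import net.corda.core.internal.concurrent.map
import net.corda.node.services.statemachine.StateMachineManager

fun startStateMachineWhenReady(
    smm: StateMachineManager,
    tokenizableServices: List<Any>,
    nodeReady: CordaFuture<Void?> // e.g. serviceHub.networkMapCache.nodeReady
) {
    // start() only wires the SMM up; the returned lambda resumes restored flows and starts messaging.
    val finishStart: () -> Unit = smm.start(tokenizableServices, StateMachineManager.StartMode.ExcludingPaused)
    nodeReady.map { finishStart() }
}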

View File

@ -42,7 +42,7 @@ interface StateMachineManager {
*
* @return a callback that completes the start-up of the SMM and should be invoked once the node is ready
*/
fun start(tokenizableServices: List<Any>, startMode: StartMode = StartMode.ExcludingPaused) : CordaFuture<Unit>
fun start(tokenizableServices: List<Any>, startMode: StartMode = StartMode.ExcludingPaused) : () -> Unit
/**
* Stops the state machine manager gracefully, waiting until all but [allowedUnsuspendedFiberCount] flows reach the
@ -120,6 +120,14 @@ interface StateMachineManager {
* @return whether the mapping was removed.
*/
fun removeClientId(clientId: String): Boolean
/**
* Returns all finished flows that were started with a client id.
*
* @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
* [false] if completed exceptionally.
*/
fun finishedFlowsWithClientIds(): Map<String, Boolean>
}
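// --- Illustrative sketch (editor's addition, not part of this change) ---------------------------
// How a caller might combine the new finishedFlowsWithClientIds() with the existing
// reattachFlowWithClientId() and removeClientId() to collect results of successfully finished
// flows and then release their client-id mappings. The helper name and the Any? result type are
// assumptions; only the three StateMachineManager calls come from this codebase.
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.StateMachineManager

fun collectSuccessfulResults(smm: StateMachineManager): Map<String, Any?> {
    // true = finished successfully, false = completed exceptionally (see the KDoc above)
    val finished: Map<String, Boolean> = smm.finishedFlowsWithClientIds()
    val results = finished
        .filterValues { succeeded -> succeeded }
        .mapValues { (clientId, _) -> smm.reattachFlowWithClientId<Any?>(clientId)?.resultFuture?.getOrThrow() }
    // Once the results have been read, the client-id mappings can be dropped.
    results.keys.forEach { clientId -> smm.removeClientId(clientId) }
    return results
}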
// These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call

View File

@ -49,6 +49,8 @@ import java.util.concurrent.Semaphore
* @param senderUUID the identifier of the sending state machine or null if this flow is resumed from a checkpoint so that it does not participate in de-duplication high-water-marking.
* @param reloadCheckpointAfterSuspendCount The number of times a flow has been reloaded (not retried). This is [null] when
* [NodeConfiguration.reloadCheckpointAfterSuspendCount] is not enabled.
* @param numberOfCommits The number of times the flow's checkpoint has been successfully committed. This field is a var so that it can be
* updated after committing a database transaction that contained a checkpoint insert/update.
* @param lock The flow's lock, used to prevent the flow from performing a transition while being interacted with from external threads, and
* vice versa.
*/
@ -67,6 +69,7 @@ data class StateMachineState(
val isKilled: Boolean,
val senderUUID: String?,
val reloadCheckpointAfterSuspendCount: Int?,
var numberOfCommits: Int,
val lock: Semaphore
) : KryoSerializable {
override fun write(kryo: Kryo?, output: Output?) {
@ -129,7 +132,9 @@ data class Checkpoint(
emptyMap(),
emptySet(),
listOf(topLevelSubFlow),
numberOfSuspends = 0
numberOfSuspends = 0,
// We set this to 1 here to avoid an extra copy and increment in UnstartedFlowTransition.createInitialCheckpoint
numberOfCommits = 1
),
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
errorState = ErrorState.Clean
@ -229,21 +234,23 @@ data class Checkpoint(
}
/**
* @param invocationContext the initiator of the flow.
* @param ourIdentity the identity the flow is run as.
* @param sessions map of source session ID to session state.
* @param sessionsToBeClosed the sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions.
* @param subFlowStack the stack of currently executing subflows.
* @param numberOfSuspends the number of flow suspends due to IO API calls.
* @param invocationContext The initiator of the flow.
* @param ourIdentity The identity the flow is run as.
* @param sessions Map of source session ID to session state.
* @param sessionsToBeClosed The sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions.
* @param subFlowStack The stack of currently executing subflows.
* @param numberOfSuspends The number of flow suspends due to IO API calls.
* @param numberOfCommits The number of times this checkpoint has been persisted.
*/
@CordaSerializable
data class CheckpointState(
val invocationContext: InvocationContext,
val ourIdentity: Party,
val sessions: SessionMap, // This must preserve the insertion order!
val sessionsToBeClosed: Set<SessionId>,
val subFlowStack: List<SubFlow>,
val numberOfSuspends: Int
val invocationContext: InvocationContext,
val ourIdentity: Party,
val sessions: SessionMap, // This must preserve the insertion order!
val sessionsToBeClosed: Set<SessionId>,
val subFlowStack: List<SubFlow>,
val numberOfSuspends: Int,
val numberOfCommits: Int
)
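// --- Illustrative sketch (editor's addition, not part of this change) ---------------------------
// numberOfCommits is incremented each time the checkpoint is persisted, and the retry path in
// SingleThreadedStateMachineManager only redelivers pending deduplication handlers when the
// in-memory flow has reached at least as many commits as the checkpoint loaded from the database
// (a missing checkpoint is represented there as -1, so redelivery always happens in that case).
// The predicate below is hypothetical and only mirrors that guard.
fun shouldRedeliverPendingDeduplicationHandlers(
    numberOfCommitsInMemory: Int,
    numberOfCommitsFromCheckpoint: Int // -1 when no checkpoint was loaded (flow initiation only)
): Boolean = numberOfCommitsInMemory >= numberOfCommitsFromCheckpoint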
/**
@ -417,9 +424,13 @@ sealed class SubFlowVersion {
data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
}
sealed class FlowWithClientIdStatus {
data class Active(val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>) : FlowWithClientIdStatus()
data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus()
sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId) {
class Active(
flowId: StateMachineRunId,
val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>
) : FlowWithClientIdStatus(flowId)
class Removed(flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus(flowId)
}
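// --- Illustrative sketch (editor's addition, not part of this change) ---------------------------
// Recording the flowId on FlowWithClientIdStatus lets startFlow distinguish a retried start event
// for the same flow (which must be allowed to proceed) from a new request that merely reuses the
// client id (which is de-duplicated against the existing future). The predicate is hypothetical.
import net.corda.core.flows.StateMachineRunId

fun shouldReturnExistingFuture(existing: FlowWithClientIdStatus, incomingFlowId: StateMachineRunId): Boolean =
    existing.flowId != incomingFlowId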
data class FlowResultMetadata(

View File

@ -49,7 +49,7 @@ class ErrorFlowTransition(
checkpointState = startingState.checkpoint.checkpointState.copy(sessions = newSessions)
)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID))
actions += Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID)
}
// If we're errored but not propagating keep processing events.
@ -59,32 +59,38 @@ class ErrorFlowTransition(
// If we haven't been removed yet remove the flow.
if (!currentState.isRemoved) {
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
val newCheckpoint = startingState.checkpoint.copy(
status = Checkpoint.FlowStatus.FAILED,
flowState = FlowState.Finished,
checkpointState = startingState.checkpoint.checkpointState.copy(
numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1
)
)
currentState = currentState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
val removeOrPersistCheckpoint = if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
Action.RemoveCheckpoint(context.id)
} else {
Action.PersistCheckpoint(context.id, newCheckpoint.copy(flowState = FlowState.Finished), isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
Action.PersistCheckpoint(
context.id,
newCheckpoint,
isCheckpointUpdate = currentState.isAnyCheckpointPersisted
)
}
actions.addAll(arrayOf(
Action.CreateTransaction,
removeOrPersistCheckpoint,
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.RemoveSessionBindings(currentState.checkpoint.checkpointState.sessions.keys)
))
actions += Action.CreateTransaction
actions += removeOrPersistCheckpoint
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(context.id.uuid)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
actions += Action.RemoveFlow(context.id, FlowRemovalReason.ErrorFinish(allErrors), currentState)
currentState = currentState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
val removalReason = FlowRemovalReason.ErrorFinish(allErrors)
actions.add(Action.RemoveFlow(context.id, removalReason, currentState))
FlowContinuation.Abort
} else {
// Otherwise keep processing events. This branch happens when there are some outstanding initiating

View File

@ -29,39 +29,31 @@ class KilledFlowTransition(
startingState.checkpoint.checkpointState.sessions,
errorMessages
)
val newCheckpoint = startingState.checkpoint.setSessions(sessions = newSessions)
currentState = currentState.copy(checkpoint = newCheckpoint)
actions.add(
Action.PropagateErrors(
errorMessages,
initiatedSessions,
startingState.senderUUID
)
)
if (!startingState.isFlowResumed) {
actions.add(Action.CreateTransaction)
}
// The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
if (startingState.isAnyCheckpointPersisted) {
actions.add(Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true))
}
actions.addAll(
arrayOf(
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(context.id.uuid),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.RemoveSessionBindings(currentState.checkpoint.checkpointState.sessions.keys)
)
)
currentState = currentState.copy(
checkpoint = startingState.checkpoint.setSessions(sessions = newSessions),
pendingDeduplicationHandlers = emptyList(),
isRemoved = true
)
actions += Action.PropagateErrors(
errorMessages,
initiatedSessions,
startingState.senderUUID
)
if (!startingState.isFlowResumed) {
actions += Action.CreateTransaction
}
// The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
if (startingState.isAnyCheckpointPersisted) {
actions += Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true)
}
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(context.id.uuid)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
actions += Action.RemoveFlow(context.id, createKilledRemovalReason(killedFlowError), currentState)
actions.add(Action.RemoveFlow(context.id, createKilledRemovalReason(killedFlowError), currentState))
FlowContinuation.Abort
}
}

View File

@ -121,7 +121,7 @@ class StartedFlowTransition(
actions = listOf(
Action.CreateTransaction,
Action.TrackTransaction(flowIORequest.hash, state),
Action.CommitTransaction
Action.CommitTransaction(state)
)
)
} else {
@ -389,22 +389,20 @@ class StartedFlowTransition(
}
private fun convertErrorMessageToException(errorMessage: ErrorSessionMessage, peer: Party): Throwable {
val exception: Throwable = if (errorMessage.flowException == null) {
UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = errorMessage.errorId)
} else {
errorMessage.flowException.originalErrorId = errorMessage.errorId
errorMessage.flowException
}
when (exception) {
// reflection used to access private field
is UnexpectedFlowEndException -> DeclaredField<Party?>(
return if (errorMessage.flowException == null) {
UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = errorMessage.errorId).apply {
DeclaredField<Party?>(
UnexpectedFlowEndException::class.java,
"peer",
exception
).value = peer
is FlowException -> DeclaredField<Party?>(FlowException::class.java, "peer", exception).value = peer
this
).value = peer
}
} else {
errorMessage.flowException.apply {
originalErrorId = errorMessage.errorId
DeclaredField<Party?>(FlowException::class.java, "peer", errorMessage.flowException).value = peer
}
}
return exception
}
private fun collectUncloseableSessions(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {

View File

@ -189,15 +189,16 @@ class TopLevelTransition(
private fun suspendTransition(event: Event.Suspend): TransitionResult {
return builder {
val newCheckpoint = currentState.checkpoint.run {
val newCheckpointState = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) {
checkpointState.copy(
invocationContext = checkpointState.invocationContext.copy(arguments = emptyList()),
numberOfSuspends = checkpointState.numberOfSuspends + 1
)
} else {
checkpointState.copy(numberOfSuspends = checkpointState.numberOfSuspends + 1)
}
val newCheckpoint = startingState.checkpoint.run {
val newCheckpointState = checkpointState.copy(
invocationContext = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) {
checkpointState.invocationContext.copy(arguments = emptyList())
} else {
checkpointState.invocationContext
},
numberOfSuspends = checkpointState.numberOfSuspends + 1,
numberOfCommits = checkpointState.numberOfCommits + 1
)
copy(
flowState = FlowState.Started(event.ioRequest, event.fiber),
checkpointState = newCheckpointState,
@ -206,29 +207,26 @@ class TopLevelTransition(
)
}
if (event.maySkipCheckpoint) {
actions.addAll(arrayOf(
Action.CommitTransaction,
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
currentState = startingState.copy(
checkpoint = newCheckpoint,
isFlowResumed = false
)
actions += Action.CommitTransaction(currentState)
actions += Action.ScheduleEvent(Event.DoRemainingWork)
} else {
actions.addAll(arrayOf(
Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
Action.ScheduleEvent(Event.DoRemainingWork)
))
currentState = currentState.copy(
currentState = startingState.copy(
checkpoint = newCheckpoint,
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isAnyCheckpointPersisted = true
)
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.ScheduleEvent(Event.DoRemainingWork)
}
FlowContinuation.ProcessEvents
}
}
@ -238,44 +236,40 @@ class TopLevelTransition(
val checkpoint = currentState.checkpoint
when (checkpoint.errorState) {
ErrorState.Clean -> {
val pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers
currentState = currentState.copy(
checkpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1
),
flowState = FlowState.Finished,
result = event.returnValue,
status = Checkpoint.FlowStatus.COMPLETED
currentState = startingState.copy(
checkpoint = checkpoint.copy(
checkpointState = checkpoint.checkpointState.copy(
numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1,
numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1
),
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isRemoved = true
flowState = FlowState.Finished,
result = event.returnValue,
status = Checkpoint.FlowStatus.COMPLETED
),
pendingDeduplicationHandlers = emptyList(),
isFlowResumed = false,
isRemoved = true
)
if (currentState.isAnyCheckpointPersisted) {
if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
actions.add(Action.RemoveCheckpoint(context.id))
} else {
actions.add(
Action.PersistCheckpoint(
context.id,
currentState.checkpoint,
isCheckpointUpdate = currentState.isAnyCheckpointPersisted
)
)
if (startingState.checkpoint.checkpointState.invocationContext.clientId == null) {
if (startingState.isAnyCheckpointPersisted) {
actions += Action.RemoveCheckpoint(context.id)
}
} else {
actions += Action.PersistCheckpoint(
context.id,
currentState.checkpoint,
isCheckpointUpdate = startingState.isAnyCheckpointPersisted
)
}
val allSourceSessionIds = currentState.checkpoint.checkpointState.sessions.keys
actions.addAll(arrayOf(
Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
Action.ReleaseSoftLocks(event.softLocksId),
Action.CommitTransaction,
Action.AcknowledgeMessages(pendingDeduplicationHandlers),
Action.RemoveSessionBindings(allSourceSessionIds),
Action.RemoveFlow(context.id, FlowRemovalReason.OrderlyFinish(event.returnValue), currentState)
))
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.ReleaseSoftLocks(event.softLocksId)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
actions += Action.RemoveFlow(context.id, FlowRemovalReason.OrderlyFinish(event.returnValue), currentState)
sendEndMessages()
// Resume to end fiber
FlowContinuation.Resume(null)
@ -358,17 +352,22 @@ class TopLevelTransition(
private fun overnightObservationTransition(): TransitionResult {
return builder {
val flowStartEvents = currentState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
val flowStartEvents = startingState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
currentState = startingState.copy(
checkpoint = startingState.checkpoint.copy(
status = Checkpoint.FlowStatus.HOSPITALIZED,
checkpointState = startingState.checkpoint.checkpointState.copy(
numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1
)
),
pendingDeduplicationHandlers = startingState.pendingDeduplicationHandlers - flowStartEvents
)
actions += Action.CreateTransaction
actions += Action.PersistDeduplicationFacts(flowStartEvents)
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
actions += Action.CommitTransaction
actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(flowStartEvents)
currentState = currentState.copy(
checkpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED),
pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers - flowStartEvents
)
FlowContinuation.ProcessEvents
}
}
@ -394,15 +393,11 @@ class TopLevelTransition(
private fun pausedFlowTransition(): TransitionResult {
return builder {
if (!startingState.isFlowResumed) {
actions.add(Action.CreateTransaction)
actions += Action.CreateTransaction
}
actions.addAll(
arrayOf(
Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.PAUSED),
Action.CommitTransaction,
Action.MoveFlowToPaused(currentState)
)
)
actions += Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.PAUSED)
actions += Action.CommitTransaction(currentState)
actions += Action.MoveFlowToPaused(currentState)
FlowContinuation.Abort
}
}

View File

@ -27,14 +27,14 @@ class UnstartedFlowTransition(
createInitialCheckpoint()
}
actions.add(Action.SignalFlowHasStarted(context.id))
actions += Action.SignalFlowHasStarted(context.id)
if (unstarted.flowStart is FlowStart.Initiated) {
initialiseInitiatedSession(unstarted.flowStart)
}
currentState = currentState.copy(isFlowResumed = true)
actions.add(Action.CreateTransaction)
actions += Action.CreateTransaction
FlowContinuation.Resume(null)
}
}
@ -73,16 +73,14 @@ class UnstartedFlowTransition(
// Create initial checkpoint and acknowledge triggering messages.
private fun TransitionBuilder.createInitialCheckpoint() {
actions.addAll(arrayOf(
Action.CreateTransaction,
Action.PersistCheckpoint(context.id, currentState.checkpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
Action.CommitTransaction,
Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers)
))
currentState = currentState.copy(
pendingDeduplicationHandlers = emptyList(),
isAnyCheckpointPersisted = true
currentState = startingState.copy(
pendingDeduplicationHandlers = emptyList(),
isAnyCheckpointPersisted = true
)
actions += Action.CreateTransaction
actions += Action.PersistCheckpoint(context.id, startingState.checkpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
actions += Action.CommitTransaction(currentState)
actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
}
}

View File

@ -1,66 +0,0 @@
package net.corda.node.utilities
import net.corda.core.flows.FlowLogic
import net.corda.core.utilities.ProgressTracker
import net.corda.node.services.statemachine.StateMachineManagerInternal
import org.apache.commons.lang3.reflect.FieldUtils
import java.lang.reflect.Field
/**
* The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
* any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
* updates are correctly sent out after the flow is retried.
*
* If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well.
*/
//TODO: instead of replacing the progress tracker after constructing the flow logic, we should inject it during fiber deserialization
internal fun StateMachineManagerInternal.injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
if (oldTracker != null) {
val newTracker = newFlowLogic.progressTracker
if (newTracker != null) {
attachNewChildren(oldTracker, newTracker)
replaceTracker(newFlowLogic, oldTracker)
}
}
}
private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) {
oldTracker.currentStep = newTracker.currentStep
oldTracker.steps.forEachIndexed { index, step ->
val newStep = newTracker.steps[index]
val newChildTracker = newTracker.getChildProgressTracker(newStep)
newChildTracker?.let { child ->
oldTracker.setChildProgressTracker(step, child)
}
}
resubscribeToChildren(oldTracker)
}
/**
* Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint,
* it retains the child links, but does not automatically re-subscribe to the child changes.
*/
private fun resubscribeToChildren(tracker: ProgressTracker) {
tracker.steps.forEach {
val childTracker = tracker.getChildProgressTracker(it)
if (childTracker != null) {
tracker.setChildProgressTracker(it, childTracker)
resubscribeToChildren(childTracker)
}
}
}
/** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */
private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) {
val field = getProgressTrackerField(newFlowLogic)
field?.apply {
isAccessible = true
set(newFlowLogic, oldProgressTracker)
}
}
private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? {
// The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up
// the hierarchy.
return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" }
}

View File

@ -47,6 +47,7 @@ import java.time.Clock
import java.util.*
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
@ -803,6 +804,40 @@ class DBCheckpointStorageTests {
assertTrue(Checkpoint.FlowStatus.FAILED in finishedStatuses)
}
@Test(timeout = 300_000)
fun `'getPausedCheckpoints' fetches paused flows with and without database exceptions`() {
val (_, checkpoint) = newCheckpoint(1)
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val checkpointState = checkpoint.serializeCheckpointState()
val hospitalizedPaused = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
val cleanPaused = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
database.transaction {
checkpointStorage.addCheckpoint(hospitalizedPaused.id, hospitalizedPaused.checkpoint, serializedFlowState, checkpointState)
checkpointStorage.addCheckpoint(cleanPaused.id, cleanPaused.checkpoint, serializedFlowState, checkpointState)
}
database.transaction {
checkpointStorage.updateCheckpoint(
hospitalizedPaused.id,
hospitalizedPaused.checkpoint.addError(IllegalStateException(), Checkpoint.FlowStatus.HOSPITALIZED),
serializedFlowState,
checkpointState
)
}
database.transaction {
checkpointStorage.updateStatus(hospitalizedPaused.id, Checkpoint.FlowStatus.PAUSED)
checkpointStorage.updateStatus(cleanPaused.id, Checkpoint.FlowStatus.PAUSED)
}
database.transaction {
val checkpoints = checkpointStorage.getPausedCheckpoints().toList()
val dbHospitalizedPaused = checkpoints.single { it.first == hospitalizedPaused.id }
assertEquals(hospitalizedPaused.id, dbHospitalizedPaused.first)
assertTrue(dbHospitalizedPaused.third)
val dbCleanPaused = checkpoints.single { it.first == cleanPaused.id }
assertEquals(cleanPaused.id, dbCleanPaused.first)
assertFalse(dbCleanPaused.third)
}
}
data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {

View File

@ -4,8 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.concurrent.Semaphore
import net.corda.core.CordaRuntimeException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.KilledFlowException
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.FlowStateMachineHandle
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds
import net.corda.node.services.persistence.DBCheckpointStorage
@ -18,18 +20,17 @@ import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.startFlow
import net.corda.testing.node.internal.startFlowWithClientId
import net.corda.core.flows.KilledFlowException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test
import rx.Observable
import java.lang.IllegalArgumentException
import java.sql.SQLTransientConnectionException
import java.util.UUID
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import kotlin.IllegalStateException
import kotlin.concurrent.thread
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
@ -457,59 +458,6 @@ class FlowClientIdTests {
Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
}
// the below test has to be made available only in ENT
// @Test(timeout=300_000)
// fun `on node restart -paused- flows with client id are hook-able`() {
// val clientId = UUID.randomUUID().toString()
// var noSecondFlowWasSpawned = 0
// var firstRun = true
// var firstFiber: Fiber<out Any?>? = null
// val flowIsRunning = Semaphore(0)
// val waitUntilFlowIsRunning = Semaphore(0)
//
// ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
// @Suspendable
// override fun call() {
// if (firstRun) {
// firstFiber = Fiber.currentFiber()
// firstRun = false
// }
//
// waitUntilFlowIsRunning.release()
// try {
// flowIsRunning.acquire() // make flow wait here to impersonate a running flow
// } catch (e: InterruptedException) {
// flowIsRunning.release()
// throw e
// }
//
// noSecondFlowWasSpawned++
// }
// }
//
// val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
// waitUntilFlowIsRunning.acquire()
// aliceNode.internals.acceptableLiveFiberCountOnStop = 1
// // Pause the flow on node restart
// val aliceNode = mockNet.restartNode(aliceNode,
// InternalMockNodeParameters(
// configOverrides = {
// doReturn(StateMachineManager.StartMode.Safe).whenever(it).smmStartMode
// }
// ))
// // Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone
// firstFiber!!.interrupt()
//
// // Re-hook a paused flow
// val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
//
// Assert.assertEquals(flowHandle0.id, flowHandle1.id)
// Assert.assertEquals(clientId, flowHandle1.clientId)
// aliceNode.smm.unPauseFlow(flowHandle1.id)
// Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
// Assert.assertEquals(1, noSecondFlowWasSpawned)
// }
@Test(timeout = 300_000)
fun `on node start -completed- flows with client id are hook-able`() {
val clientId = UUID.randomUUID().toString()
@ -556,44 +504,6 @@ class FlowClientIdTests {
assertEquals(1, counter)
}
// the below test has to be made available only in ENT
// @Test(timeout=300_000)
// fun `On 'startFlowInternal' throwing, subsequent request with same client hits the time window in which the previous request was about to remove the client id mapping`() {
// val clientId = UUID.randomUUID().toString()
// var firstRequest = true
// SingleThreadedStateMachineManager.onCallingStartFlowInternal = {
// if (firstRequest) {
// firstRequest = false
// throw IllegalStateException("Yet another one")
// }
// }
//
// val wait = Semaphore(0)
// val waitForFirstRequest = Semaphore(0)
// SingleThreadedStateMachineManager.onStartFlowInternalThrewAndAboutToRemove = {
// waitForFirstRequest.release()
// wait.acquire()
// Thread.sleep(10000)
// }
// var counter = 0
// ResultFlow.hook = { counter++ }
//
// thread {
// assertFailsWith<IllegalStateException> {
// aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
// }
// }
//
// waitForFirstRequest.acquire()
// wait.release()
// assertFailsWith<IllegalStateException> {
// // the subsequent request will not hang on a never ending future, because the previous request ,upon failing, will also complete the future exceptionally
// aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
// }
//
// assertEquals(0, counter)
// }
@Test(timeout = 300_000)
fun `if flow fails to serialize its result then the result gets converted to an exception result`() {
val clientId = UUID.randomUUID().toString()
@ -775,17 +685,59 @@ class FlowClientIdTests {
reattachedFlowHandle?.resultFuture?.getOrThrow()
}.withMessage("java.lang.IllegalStateException: Bla bla bla")
}
@Test(timeout = 300_000)
fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
val clientIds = listOf("a", "b", "c", "d", "e")
val lock = CountDownLatch(1)
ResultFlow.hook = { clientId ->
if (clientId == clientIds[3]) {
throw java.lang.IllegalStateException("This didn't go so well")
}
if (clientId == clientIds[4]) {
lock.await(30, TimeUnit.SECONDS)
}
}
val flows = listOf(
aliceNode.services.startFlowWithClientId(clientIds[0], ResultFlow(10)),
aliceNode.services.startFlowWithClientId(clientIds[1], ResultFlow(10)),
aliceNode.services.startFlowWithClientId(clientIds[2], ResultFlow(10))
)
val failedFlow = aliceNode.services.startFlowWithClientId(clientIds[3], ResultFlow(10))
val runningFlow = aliceNode.services.startFlowWithClientId(clientIds[4], ResultFlow(10))
flows.map { it.resultFuture }.transpose().getOrThrow(30.seconds)
assertFailsWith<java.lang.IllegalStateException> { failedFlow.resultFuture.getOrThrow(20.seconds) }
val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds()
lock.countDown()
assertEquals(4, finishedFlows.size)
assertEquals(3, finishedFlows.filterValues { it }.size)
assertEquals(1, finishedFlows.filterValues { !it }.size)
assertEquals(setOf("a", "b", "c", "d"), finishedFlows.map { it.key }.toSet())
assertTrue(runningFlow.clientId !in finishedFlows.keys)
assertEquals(
listOf(10, 10, 10),
finishedFlows.filterValues { it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.get() }
)
// [CordaRuntimeException] returned because [IllegalStateException] is not serializable
assertFailsWith<CordaRuntimeException> {
finishedFlows.filterValues { !it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.getOrThrow() }
}
}
}
internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
companion object {
var hook: (() -> Unit)? = null
var hook: ((String?) -> Unit)? = null
var suspendableHook: FlowLogic<Unit>? = null
}
@Suspendable
override fun call(): A {
hook?.invoke()
hook?.invoke(stateMachine.clientId)
suspendableHook?.let { subFlow(it) }
return result
}

View File

@ -147,6 +147,8 @@ class FlowFrameworkTests {
SuspendingFlow.hookBeforeCheckpoint = {}
SuspendingFlow.hookAfterCheckpoint = {}
StaffedFlowHospital.onFlowResuscitated.clear()
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
}
@Test(timeout=300_000)
@ -311,7 +313,7 @@ class FlowFrameworkTests {
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Nothing useful")
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
.withStackTraceContaining("Received counter-flow exception from peer")
.withStackTraceContaining("Received counter-flow exception from peer ${bob.name}")
bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
}
@ -634,6 +636,7 @@ class FlowFrameworkTests {
Notification.createOnNext(ReceiveFlow.START_STEP),
Notification.createOnError(receiveFlowException)
)
assertThat(receiveFlowException).hasStackTraceContaining("Received unexpected counter-flow exception from peer ${bob.name}")
assertSessionTransfers(
aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
@ -692,9 +695,6 @@ class FlowFrameworkTests {
firstExecution = false
throw HospitalizeFlowException()
} else {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
currentDBSession().clear() // clear session as Hibernate with fails with 'org.hibernate.NonUniqueObjectException' once it tries to save a DBFlowCheckpoint upon checkpoint
inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState.checkpoint.status
@ -743,9 +743,6 @@ class FlowFrameworkTests {
firstExecution = false
throw HospitalizeFlowException()
} else {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
inMemoryCheckpointStatus = flowFiber.transientState.checkpoint.status
@ -860,9 +857,6 @@ class FlowFrameworkTests {
var secondRun = false
SuspendingFlow.hookBeforeCheckpoint = {
if(secondRun) {
// the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
// and doesn't commit before flow starts running.
Thread.sleep(3000)
aliceNode.database.transaction {
checkpointStatusAfterRestart = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
dbExceptionAfterRestart = findRecordsFromDatabase()
@ -890,7 +884,6 @@ class FlowFrameworkTests {
Thread.sleep(3000) // wait until flow saves overnight observation state in database
aliceNode = mockNet.restartNode(aliceNode)
waitUntilHospitalized.acquire()
Thread.sleep(3000) // wait until flow saves overnight observation state in database
assertEquals(2, counter)
@ -1272,4 +1265,4 @@ internal class SuspendingFlow : FlowLogic<Unit>() {
stateMachine.suspend(FlowIORequest.ForceCheckpoint, maySkipCheckpoint = false) // flow checkpoints => checkpoint is in DB
stateMachine.hookAfterCheckpoint()
}
}
}

View File

@ -50,6 +50,9 @@ repositories {
}
}
evaluationDependsOn("cordapp")
evaluationDependsOn("web")
dependencies {
compile "commons-io:commons-io:$commons_io_version"
compile project(":samples:irs-demo:web")
@ -84,9 +87,6 @@ task slowIntegrationTest(type: Test, dependsOn: []) {
classpath = sourceSets.slowIntegrationTest.runtimeClasspath
}
evaluationDependsOn("cordapp")
evaluationDependsOn("web")
task systemTest(type: Test, dependsOn: ["cordapp:prepareDockerNodes", "web:generateDockerCompose"]) {
testClassesDirs = sourceSets.systemTest.output.classesDirs
classpath = sourceSets.systemTest.runtimeClasspath

View File

@ -92,6 +92,7 @@ dependencies {
jar {
from sourceSets.main.output
dependsOn clientInstall
archiveClassifier = 'thin'
}
def docker_dir = file("$project.buildDir/docker")

View File

@ -79,7 +79,7 @@ interface ObjectBuilder {
* Create an [ObjectBuilderProvider] for the given [LocalTypeInformation.Composable].
*/
fun makeProvider(typeInformation: LocalTypeInformation.Composable): ObjectBuilderProvider =
makeProvider(typeInformation.typeIdentifier, typeInformation.constructor, typeInformation.properties)
makeProvider(typeInformation.typeIdentifier, typeInformation.constructor, typeInformation.properties, false)
/**
* Create an [ObjectBuilderProvider] for the given type, constructor and set of properties.
@ -90,15 +90,17 @@ interface ObjectBuilder {
fun makeProvider(
typeIdentifier: TypeIdentifier,
constructor: LocalConstructorInformation,
properties: Map<String, LocalPropertyInformation>
properties: Map<String, LocalPropertyInformation>,
includeAllConstructorParameters: Boolean
): ObjectBuilderProvider =
if (constructor.hasParameters) makeConstructorBasedProvider(properties, typeIdentifier, constructor)
if (constructor.hasParameters) makeConstructorBasedProvider(properties, typeIdentifier, constructor, includeAllConstructorParameters)
else makeSetterBasedProvider(properties, typeIdentifier, constructor)
private fun makeConstructorBasedProvider(
properties: Map<String, LocalPropertyInformation>,
typeIdentifier: TypeIdentifier,
constructor: LocalConstructorInformation
constructor: LocalConstructorInformation,
includeAllConstructorParameters: Boolean
): ObjectBuilderProvider {
requireForSer(properties.values.all {
when (it) {
@ -119,6 +121,10 @@ interface ObjectBuilder {
"but property $name is not constructor-paired"
)
}
}.toMutableMap()
if (includeAllConstructorParameters) {
addMissingConstructorParameters(constructorIndices, constructor)
}
val propertySlots = constructorIndices.keys.mapIndexed { slot, name -> name to slot }.toMap()
@ -128,6 +134,15 @@ interface ObjectBuilder {
}
}
private fun addMissingConstructorParameters(constructorIndices: MutableMap<String, Int>, constructor: LocalConstructorInformation) {
// Add constructor parameters not in the list of properties
// so we can use them in object evolution
for ((parameterIndex, parameter) in constructor.parameters.withIndex()) {
// Only use the parameters not already matched to properties
constructorIndices.putIfAbsent(parameter.name, parameterIndex)
}
}
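// --- Illustrative sketch (editor's addition, not part of this change) ---------------------------
// putIfAbsent keeps the slots already paired to properties and only fills the gaps, so an
// evolution constructor can still receive a value for a property that was later renamed or
// removed. A minimal, self-contained demo of that behaviour (all names are made up):
fun demoAddMissingConstructorParameters() {
    val constructorIndices = mutableMapOf("data" to 1)          // properties already paired to constructor slots
    val parameterNames = listOf("cordappVersion", "data", "x")  // full constructor parameter list, in order
    for ((index, name) in parameterNames.withIndex()) {
        constructorIndices.putIfAbsent(name, index)             // only parameters not already paired are added
    }
    check(constructorIndices == mapOf("data" to 1, "cordappVersion" to 0, "x" to 2))
}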
private fun makeSetterBasedProvider(
properties: Map<String, LocalPropertyInformation>,
typeIdentifier: TypeIdentifier,
@ -254,7 +269,7 @@ class EvolutionObjectBuilder(
remoteTypeInformation: RemoteTypeInformation.Composable,
mustPreserveData: Boolean
): () -> ObjectBuilder {
val localBuilderProvider = ObjectBuilder.makeProvider(typeIdentifier, constructor, localProperties)
val localBuilderProvider = ObjectBuilder.makeProvider(typeIdentifier, constructor, localProperties, true)
val remotePropertyNames = remoteTypeInformation.properties.keys.sorted()
val reroutedIndices = remotePropertyNames.map { propertyName ->

View File

@ -0,0 +1,90 @@
package net.corda.serialization.internal.amqp
import net.corda.core.contracts.BelongsToContract
import net.corda.core.contracts.Contract
import net.corda.core.contracts.ContractState
import net.corda.core.identity.AbstractParty
import net.corda.core.serialization.DeprecatedConstructorForDeserialization
import net.corda.core.serialization.SerializedBytes
import net.corda.core.transactions.LedgerTransaction
import net.corda.serialization.internal.amqp.testutils.deserialize
import net.corda.serialization.internal.amqp.testutils.serialize
import net.corda.serialization.internal.amqp.testutils.testDefaultFactory
import net.corda.serialization.internal.amqp.testutils.writeTestResource
import org.assertj.core.api.Assertions
import org.junit.Test
class EvolutionObjectBuilderRenamedPropertyTests {
private val cordappVersionTestValue = 38854445
private val dataTestValue = "d7af8af0-c10e-45bc-a5f7-92de432be0ef"
private val xTestValue = 7568055
private val yTestValue = 4113687
class TemplateContract : Contract {
override fun verify(tx: LedgerTransaction) { }
}
/**
* Step 1
*
* This is the original class definition in object evolution.
*/
// @BelongsToContract(TemplateContract::class)
// data class TemplateState(val cordappVersion: Int, val data: String, val x : Int?, override val participants: List<AbstractParty> = listOf()) : ContractState
/**
* Step 2
*
* This is an intermediate class definition in object evolution.
* The y property has been added and a constructor copies the value of x into y. x is now set to null by the constructor.
*/
// @BelongsToContract(TemplateContract::class)
// data class TemplateState(val cordappVersion: Int, val data: String, val x : Int?, val y : String?, override val participants: List<AbstractParty> = listOf()) : ContractState {
// @DeprecatedConstructorForDeserialization(1)
// constructor(cordappVersion: Int, data : String, x : Int?, participants: List<AbstractParty>)
// : this(cordappVersion, data, null, x?.toString(), participants)
// }
/**
* Step 3
*
* This is the final class definition in object evolution.
* The x property has been removed but the constructor that copies values of x into y still exists. We expect previous versions of this
* object to pass the value of x to the constructor when deserialized.
*/
@BelongsToContract(TemplateContract::class)
data class TemplateState(val cordappVersion: Int, val data: String, val y : String?, override val participants: List<AbstractParty> = listOf()) : ContractState {
@DeprecatedConstructorForDeserialization(1)
constructor(cordappVersion: Int, data : String, x : Int?, participants: List<AbstractParty>) : this(cordappVersion, data, x?.toString(), participants)
}
@Test(timeout=300_000)
fun `Step 1 to Step 3`() {
// The next two commented lines are how the serialized data is generated. To regenerate the data, uncomment these along
// with the correct version of the class and rerun the test. This will generate a new file in the project resources.
// val step1 = TemplateState(cordappVersionTestValue, dataTestValue, xTestValue)
// saveSerializedObject(step1)
// serialization/src/test/resources/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.Step1
val bytes = this::class.java.getResource("EvolutionObjectBuilderRenamedPropertyTests.Step1").readBytes()
val serializerFactory: SerializerFactory = testDefaultFactory()
val deserializedObject = DeserializationInput(serializerFactory)
.deserialize(SerializedBytes<TemplateState>(bytes))
Assertions.assertThat(deserializedObject.cordappVersion).isEqualTo(cordappVersionTestValue)
Assertions.assertThat(deserializedObject.data).isEqualTo(dataTestValue)
// Assertions.assertThat(deserializedObject.x).isEqualTo(xTestValue)
Assertions.assertThat(deserializedObject.y).isEqualTo(xTestValue.toString())
Assertions.assertThat(deserializedObject).isInstanceOf(TemplateState::class.java)
}
/**
* Write serialized object to resources folder
*/
@Suppress("unused")
fun <T : Any> saveSerializedObject(obj : T) = writeTestResource(SerializationOutput(testDefaultFactory()).serialize(obj))
}