diff --git a/.ci/dev/regression/Jenkinsfile b/.ci/dev/regression/Jenkinsfile
index fe155dd2f8..c971361c5c 100644
--- a/.ci/dev/regression/Jenkinsfile
+++ b/.ci/dev/regression/Jenkinsfile
@@ -114,7 +114,9 @@ pipeline {
                             "-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_USERNAME=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
                             "-Ddocker.container.env.parameter.CORDA_ARTIFACTORY_PASSWORD=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
                             "-Ddocker.build.tag=\"\${DOCKER_TAG_TO_USE}\"" +
-                            " clean preAllocateForParallelRegressionTest preAllocateForAllParallelSlowIntegrationTest pushBuildImage --stacktrace"
+                            " clean preAllocateForAllParallelUnitTest preAllocateForAllParallelIntegrationTest " +
+                            " preAllocateForAllParallelSlowIntegrationTest preAllocateForAllParallelSmokeTest " +
+                            " pushBuildImage --stacktrace"
                 }
                 sh "kubectl auth can-i get pods"
             }
@@ -122,7 +124,7 @@ pipeline {
 
         stage('Testing phase') {
             parallel {
-                stage('Regression Test') {
+                stage('Unit Test') {
                     steps {
                         sh "./gradlew " +
                                 "-DbuildId=\"\${BUILD_ID}\" " +
@@ -132,7 +134,33 @@ pipeline {
                                 "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
                                 "-Dgit.branch=\"\${GIT_BRANCH}\" " +
                                 "-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
-                                " parallelRegressionTest --stacktrace"
+                                " allParallelUnitTest --stacktrace"
+                    }
+                }
+                stage('Integration Test') {
+                    steps {
+                        sh "./gradlew " +
+                                "-DbuildId=\"\${BUILD_ID}\" " +
+                                "-Dkubenetize=true " +
+                                "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
+                                "-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
+                                "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
+                                "-Dgit.branch=\"\${GIT_BRANCH}\" " +
+                                "-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
+                                " allParallelIntegrationTest --stacktrace"
+                    }
+                }
+                stage('Smoke Test') {
+                    steps {
+                        sh "./gradlew " +
+                                "-DbuildId=\"\${BUILD_ID}\" " +
+                                "-Dkubenetize=true " +
+                                "-Ddocker.run.tag=\"\${DOCKER_TAG_TO_USE}\" " +
+                                "-Dartifactory.username=\"\${ARTIFACTORY_CREDENTIALS_USR}\" " +
+                                "-Dartifactory.password=\"\${ARTIFACTORY_CREDENTIALS_PSW}\" " +
+                                "-Dgit.branch=\"\${GIT_BRANCH}\" " +
+                                "-Dgit.target.branch=\"\${GIT_BRANCH}\" " +
+                                " allParallelSmokeTest --stacktrace"
                     }
                 }
                 stage('Slow Integration Test') {
diff --git a/build.gradle b/build.gradle
index 7224ccc8f9..fcacad11a4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -753,19 +753,14 @@ distributedTesting {
             profile 'generalPurpose.yml'
             distribution DistributeTestsBy.METHOD
         }
-        parallelRegressionTest {
-            testGroups 'test', 'integrationTest', 'smokeTest'
-            profile 'generalPurpose.yml'
-            distribution DistributeTestsBy.METHOD
-        }
         allParallelSmokeTest {
             testGroups 'smokeTest'
-            profile 'generalPurpose.yml'
+            profile 'regression.yml'
             distribution DistributeTestsBy.METHOD
         }
         allParallelSlowIntegrationTest {
             testGroups 'slowIntegrationTest'
-            profile 'generalPurpose.yml'
+            profile 'regression.yml'
             distribution DistributeTestsBy.METHOD
         }
     }
diff --git a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt
index 826f52f0d9..51e6939257 100644
--- a/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt
+++ b/core/src/main/kotlin/net/corda/core/messaging/CordaRPCOps.kt
@@ -321,6 +321,14 @@ interface CordaRPCOps : RPCOps {
      */
     fun removeClientId(clientId: String): Boolean
 
+    /**
+     * Returns all finished flows that were started with a client id.
+     *
+     * @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
+     * [false] if completed exceptionally.
+     */
+    fun finishedFlowsWithClientIds(): Map<String, Boolean>
+
     /** Returns Node's NodeInfo, assuming this will not change while the node is running. */
     fun nodeInfo(): NodeInfo
 
diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt
index 16c5857ae9..9b8ed573d2 100644
--- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt
+++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/DatabaseTransaction.kt
@@ -73,6 +73,7 @@ class DatabaseTransaction(
         firstExceptionInDatabaseTransaction = null
     }
 
+    @Throws(SQLException::class)
     fun commit() {
         firstExceptionInDatabaseTransaction?.let {
             throw DatabaseTransactionException(it)
diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt
index b528203c00..e7b36efa94 100644
--- a/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt
+++ b/node-api/src/main/kotlin/net/corda/nodeapi/internal/persistence/SchemaMigration.kt
@@ -92,6 +92,25 @@ open class SchemaMigration(
         }
     }
 
+    /**
+     * Returns the count of pending database migration changes
+     * @param schemas The set of MappedSchemas to check
+     * @param forceThrowOnMissingMigration throws an exception if a mapped schema is missing the migration resource. Can be set to false
+     *                                      when allowing hibernate to create missing schemas in dev or tests.
+     */
+    fun getPendingChangesCount(schemas: Set<MappedSchema>, forceThrowOnMissingMigration: Boolean) : Int {
+        val resourcesAndSourceInfo = prepareResources(schemas, forceThrowOnMissingMigration)
+
+        // current version of Liquibase appears to be non-threadsafe
+        // this is apparent when multiple in-process nodes are all running migrations simultaneously
+        mutex.withLock {
+            dataSource.connection.use { connection ->
+                val (_, changeToRunCount, _) = prepareRunner(connection, resourcesAndSourceInfo)
+                return changeToRunCount;
+            }
+        }
+    }
+
     /**
      * Synchronises the changelog table with the schema descriptions passed in without applying any of the changes to the database.
      * This can be used when migrating a CorDapp that had its schema generated by hibernate to liquibase schema migration, or when
diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt
index 0cb4c7fd77..87175bb06d 100644
--- a/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt
+++ b/node/src/integration-test-slow/kotlin/net/corda/node/flows/FlowCheckpointVersionNodeStartupCheckTest.kt
@@ -10,11 +10,13 @@ import net.corda.core.messaging.startFlow
 import net.corda.core.utilities.getOrThrow
 import net.corda.core.utilities.unwrap
 import net.corda.node.internal.CheckpointIncompatibleException
+import net.corda.node.services.statemachine.Checkpoint
 import net.corda.testing.core.ALICE_NAME
 import net.corda.testing.core.BOB_NAME
 import net.corda.testing.core.singleIdentity
 import net.corda.testing.driver.DriverDSL
 import net.corda.testing.driver.DriverParameters
+import net.corda.testing.driver.NodeHandle
 import net.corda.testing.driver.NodeParameters
 import net.corda.testing.driver.driver
 import net.corda.testing.node.internal.assertUncompletedCheckpoints
@@ -32,7 +34,7 @@ class FlowCheckpointVersionNodeStartupCheckTest {
     }
 
     @Test(timeout=300_000)
-	fun `restart node with mismatch between suspended flow and installed CorDapps`() {
+    fun `restart node with mismatch between suspended flow and installed CorDapps`() {
         driver(DriverParameters(
                 startNodesInProcess = false,
                 inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
@@ -40,7 +42,44 @@ class FlowCheckpointVersionNodeStartupCheckTest {
                 notarySpecs = emptyList(),
                 allowHibernateToManageAppSchema = false
         )) {
-            createSuspendedFlowInBob()
+            val (bob, _) = createSuspendedFlowInBob()
+            bob.stop()
+            restartBobWithMismatchedCorDapp()
+        }
+    }
+
+    @Test(timeout=300_000)
+    fun `restart node with mismatch between suspended paused flow and installed CorDapps`() {
+        driver(DriverParameters(
+                startNodesInProcess = false,
+                inMemoryDB = false, // Ensure database is persisted between node restarts so we can keep suspended flows
+                cordappsForAllNodes = emptyList(),
+                notarySpecs = emptyList(),
+                allowHibernateToManageAppSchema = false
+        )) {
+            val (bob, flowId) = createSuspendedFlowInBob()
+            val flow = bob.rpc.startFlow(::UpdateStatusToPaused, flowId)
+            flow.returnValue.getOrThrow()
+            bob.stop()
+            restartBobWithMismatchedCorDapp()
+        }
+    }
+
+    private fun DriverDSL.createSuspendedFlowInBob(): Pair<NodeHandle, StateMachineRunId> {
+        val (alice, bob) = listOf(
+                startNode(providedName = ALICE_NAME),
+                startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
+        ).map { it.getOrThrow() }
+
+        alice.stop() // Stop Alice so that Bob never receives the message
+
+        val flowId = bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity()).id
+        // Wait until Bob's flow has started
+        bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
+        return Pair(bob, flowId)
+    }
+    
+	fun DriverDSL.restartBobWithMismatchedCorDapp() {
             val cordappsDir = baseDirectory(BOB_NAME) / "cordapps"
 
             // Test the scenerio where the CorDapp no longer exists
@@ -58,21 +97,6 @@ class FlowCheckpointVersionNodeStartupCheckTest {
                     // The part of the log message generated by CheckpointIncompatibleException.FlowVersionIncompatibleException
                     "that is incompatible with the current installed version of"
             )
-        }
-    }
-
-    private fun DriverDSL.createSuspendedFlowInBob() {
-        val (alice, bob) = listOf(
-                startNode(providedName = ALICE_NAME),
-                startNode(NodeParameters(providedName = BOB_NAME, additionalCordapps = listOf(defaultCordapp)))
-        ).map { it.getOrThrow() }
-
-        alice.stop() // Stop Alice so that Bob never receives the message
-
-        bob.rpc.startFlow(FlowCheckpointVersionNodeStartupCheckTest::ReceiverFlow, alice.nodeInfo.singleIdentity())
-        // Wait until Bob's flow has started
-        bob.rpc.stateMachinesFeed().let { it.updates.map { it.id }.startWith(it.snapshot.map { it.id }) }.toBlocking().first()
-        bob.stop()
     }
 
     private fun DriverDSL.assertBobFailsToStartWithLogMessage(logMessage: String) {
@@ -107,4 +131,13 @@ class FlowCheckpointVersionNodeStartupCheckTest {
         @Suspendable
         override fun call() = otherSide.send("Hello!")
     }
+
+    @StartableByRPC
+    class UpdateStatusToPaused(private val id: StateMachineRunId): FlowLogic<Unit>() {
+        @Suspendable
+        override fun call() {
+            val statement = "Update node_checkpoints set status = ${Checkpoint.FlowStatus.PAUSED.ordinal} where flow_id = '${id.uuid}'"
+            serviceHub.jdbcSession().prepareStatement(statement).execute()
+        }
+    }
 }
diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt
index 37d6cce6ac..311e1d562e 100644
--- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt
+++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineErrorHandlingTest.kt
@@ -112,11 +112,10 @@ abstract class StateMachineErrorHandlingTest {
         submit.addScripts(listOf(ScriptText("Test script", rules)))
     }
 
-    internal fun getBytemanOutput(nodeHandle: NodeHandle): List<String> {
-        return nodeHandle.baseDirectory
-            .list()
-            .first { it.toString().contains("net.corda.node.Corda") && it.toString().contains("stdout.log") }
-            .readAllLines()
+    private fun NodeHandle.getBytemanOutput(): List<String> {
+        return baseDirectory.list()
+            .filter { "net.corda.node.Corda" in it.toString() && "stdout.log" in it.toString() }
+            .flatMap { it.readAllLines() }
     }
 
     internal fun OutOfProcessImpl.stop(timeout: Duration): Boolean {
@@ -126,6 +125,10 @@ abstract class StateMachineErrorHandlingTest {
         }.also { onStopCallback() }
     }
 
+    internal fun NodeHandle.assertBytemanOutput(string: String, count: Int) {
+        assertEquals(count, getBytemanOutput().filter { string in it }.size)
+    }
+
     @Suppress("LongParameterList")
     internal fun CordaRPCOps.assertHospitalCounts(
         discharged: Int = 0,
@@ -246,6 +249,7 @@ abstract class StateMachineErrorHandlingTest {
     // Internal use for testing only!!
     @StartableByRPC
     class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
+        @Suspendable
         override fun call(): HospitalCounts =
             HospitalCounts(
                 serviceHub.cordaService(HospitalCounter::class.java).dischargedCounter,
diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt
index 07550ef709..97ac7a6387 100644
--- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt
+++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineFlowInitErrorHandlingTest.kt
@@ -2,9 +2,11 @@ package net.corda.node.services.statemachine
 
 import net.corda.core.CordaRuntimeException
 import net.corda.core.messaging.startFlow
+import net.corda.core.messaging.startFlowWithClientId
 import net.corda.core.utilities.getOrThrow
 import net.corda.core.utilities.seconds
 import net.corda.node.services.api.CheckpointStorage
+import net.corda.nodeapi.internal.persistence.DatabaseTransaction
 import net.corda.testing.core.ALICE_NAME
 import net.corda.testing.core.CHARLIE_NAME
 import net.corda.testing.core.singleIdentity
@@ -58,6 +60,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 IF readCounter("counter") < 3
                 DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
                 ENDRULE
+                
+                RULE Log external start flow event
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
+                IF true
+                DO traceln("External start flow event")
+                ENDRULE
             """.trimIndent()
 
             submitBytemanRules(rules, port)
@@ -69,6 +79,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 30.seconds
             )
 
+            alice.assertBytemanOutput("External start flow event", 4)
             alice.rpc.assertNumberOfCheckpointsAllZero()
             alice.rpc.assertHospitalCounts(discharged = 3)
             assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
@@ -256,6 +267,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 IF readCounter("counter") < 4
                 DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
                 ENDRULE
+                
+                RULE Log external start flow event
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
+                IF true
+                DO traceln("External start flow event")
+                ENDRULE
             """.trimIndent()
 
             submitBytemanRules(rules, port)
@@ -267,6 +286,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
             // flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
             Thread.sleep(30.seconds.toMillis())
 
+            alice.assertBytemanOutput("External start flow event", 4)
             alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
             alice.rpc.assertHospitalCounts(
                 discharged = 3,
@@ -331,6 +351,380 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
         }
     }
 
+    /**
+     * Throws an exception when after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state).
+     * This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node.
+     *
+     * The exception is thrown when performing [Action.SignalFlowHasStarted], the error won't actually appear here but it makes it easier
+     * to test.
+     *
+     * The exception is thrown 3 times.
+     *
+     * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
+     * succeeds and the flow finishes.
+     *
+     * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
+     *
+     * The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
+     * at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
+     *
+     */
+    @Test(timeout = 300_000)
+    fun `error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
+        startDriver {
+            val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
+
+            val rules = """
+                RULE Create Counter
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF createCounter("counter", $counter)
+                DO traceln("Counter created")
+                ENDRULE
+
+                RULE Flag when commit transaction reached
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF true
+                DO flag("commit")
+                ENDRULE
+                
+                RULE Throw exception on executeSignalFlowHasStarted action
+                CLASS ${DatabaseTransaction::class.java.name}
+                METHOD commit
+                AT EXIT
+                IF readCounter("counter") < 3 && flagged("commit")
+                DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
+                ENDRULE
+                
+                RULE Log external start flow event
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
+                IF true
+                DO traceln("External start flow event")
+                ENDRULE
+            """.trimIndent()
+
+            submitBytemanRules(rules, port)
+
+            alice.rpc.startFlow(
+                StateMachineErrorHandlingTest::SendAMessageFlow,
+                charlie.nodeInfo.singleIdentity()
+            ).returnValue.getOrThrow(
+                30.seconds
+            )
+
+            alice.assertBytemanOutput("External start flow event", 1)
+            alice.rpc.assertNumberOfCheckpointsAllZero()
+            alice.rpc.assertHospitalCounts(discharged = 3)
+            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
+        }
+    }
+
+    /**
+     * Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
+     * (remains in an unstarted state).
+     *
+     * The exception is thrown 3 times.
+     *
+     * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
+     * succeeds and the flow finishes.
+     *
+     * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
+     *
+     */
+    @Test(timeout = 300_000)
+    fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
+        startDriver {
+            val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
+
+            val rules = """
+                RULE Create Counter
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF createCounter("counter", $counter)
+                DO traceln("Counter created")
+                ENDRULE
+
+                RULE Throw exception on executeCommitTransaction action
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF readCounter("counter") < 3
+                DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
+                ENDRULE
+                
+                RULE Log external start flow event
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
+                IF true
+                DO traceln("External start flow event")
+                ENDRULE
+            """.trimIndent()
+
+            submitBytemanRules(rules, port)
+
+            alice.rpc.startFlowWithClientId(
+                "here is my client id",
+                StateMachineErrorHandlingTest::SendAMessageFlow,
+                charlie.nodeInfo.singleIdentity()
+            ).returnValue.getOrThrow(
+                30.seconds
+            )
+
+            alice.assertBytemanOutput("External start flow event", 4)
+            alice.rpc.assertNumberOfCheckpoints(completed = 1)
+            alice.rpc.assertHospitalCounts(discharged = 3)
+            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
+        }
+    }
+
+    /**
+     * Throws an exception when calling [FlowStateMachineImpl.processEvent].
+     *
+     * This is not an expected place for an exception to occur, but allows us to test what happens when a random exception is propagated
+     * up to [FlowStateMachineImpl.run] during flow initialisation.
+     *
+     * A "Transaction context is missing" exception is thrown due to where the exception is thrown (no transaction is created so this is
+     * thrown when leaving [FlowStateMachineImpl.processEventsUntilFlowIsResumed] due to the finally block).
+     */
+    @Test(timeout = 300_000)
+    fun `with client id - unexpected error during flow initialisation throws exception to client`() {
+        startDriver {
+            val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
+            val rules = """
+                RULE Create Counter
+                CLASS ${FlowStateMachineImpl::class.java.name}
+                METHOD processEvent
+                AT ENTRY
+                IF createCounter("counter", $counter)
+                DO traceln("Counter created")
+                ENDRULE
+                
+                RULE Throw exception
+                CLASS ${FlowStateMachineImpl::class.java.name}
+                METHOD processEvent
+                AT ENTRY
+                IF readCounter("counter") < 1
+                DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.lang.RuntimeException("die dammit die")
+                ENDRULE
+            """.trimIndent()
+
+            submitBytemanRules(rules, port)
+
+            assertFailsWith<CordaRuntimeException> {
+                alice.rpc.startFlowWithClientId(
+                    "give me all of your client ids, or else",
+                    StateMachineErrorHandlingTest::SendAMessageFlow,
+                    charlie.nodeInfo.singleIdentity()
+                ).returnValue.getOrThrow(30.seconds)
+            }
+
+            alice.rpc.assertNumberOfCheckpoints(failed = 1)
+            alice.rpc.assertHospitalCounts(propagated = 1)
+            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
+        }
+    }
+
+    /**
+     * Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
+     * (remains in an unstarted state).
+     *
+     * The exception is thrown 4 times.
+     *
+     * This causes the transition to be discharged from the hospital 3 times (retries 3 times) and then be kept in for observation.
+     *
+     * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
+     */
+    @Test(timeout = 450_000)
+    fun `with client id - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
+        startDriver {
+            val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
+
+            val rules = """
+                RULE Create Counter
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF createCounter("counter", $counter)
+                DO traceln("Counter created")
+                ENDRULE
+
+                RULE Throw exception on executeCommitTransaction action
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF readCounter("counter") < 4
+                DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
+                ENDRULE
+                
+                RULE Log external start flow event
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
+                IF true
+                DO traceln("External start flow event")
+                ENDRULE
+            """.trimIndent()
+
+            submitBytemanRules(rules, port)
+
+            executor.execute {
+                alice.rpc.startFlowWithClientId(
+                    "please sir, can i have a client id?",
+                    StateMachineErrorHandlingTest::SendAMessageFlow,
+                    charlie.nodeInfo.singleIdentity()
+                )
+            }
+
+            // flow is not signaled as started calls to [getOrThrow] will hang, sleeping instead
+            Thread.sleep(30.seconds.toMillis())
+
+            alice.assertBytemanOutput("External start flow event", 4)
+            alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
+            alice.rpc.assertHospitalCounts(
+                discharged = 3,
+                observation = 1
+            )
+            assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
+            val terminated = (alice as OutOfProcessImpl).stop(60.seconds)
+            assertTrue(terminated, "The node must be shutdown before it can be restarted")
+            val (alice2, _) = createBytemanNode(ALICE_NAME)
+            Thread.sleep(20.seconds.toMillis())
+            alice2.rpc.assertNumberOfCheckpoints(completed = 1)
+        }
+    }
+
+    /**
+     * Throws an exception when performing an [Action.CommitTransaction] event before the flow has initialised and saved its first checkpoint
+     * (remains in an unstarted state).
+     *
+     * An exception is thrown when committing a database transaction during a transition to trigger the retry of the flow. Another
+     * exception is then thrown during the retry itself.
+     *
+     * The flow then retries the retry causing the flow to complete successfully.
+     */
+    @Test(timeout = 300_000)
+    fun `with client id - error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() {
+        startDriver {
+            val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
+
+            val rules = """
+                RULE Throw exception on executeCommitTransaction action after first suspend + commit
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF !flagged("commit_exception_flag")
+                DO flag("commit_exception_flag"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
+                ENDRULE
+                
+                RULE Throw exception on retry
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
+                IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")
+                DO flag("retry_exception_flag"); traceln("Throwing retry exception"); throw new java.lang.RuntimeException("Here we go again")
+                ENDRULE
+            """.trimIndent()
+
+            submitBytemanRules(rules, port)
+
+            alice.rpc.startFlowWithClientId(
+                "hi, i'd like to be your client id",
+                StateMachineErrorHandlingTest::SendAMessageFlow,
+                charlie.nodeInfo.singleIdentity()
+            ).returnValue.getOrThrow(
+                30.seconds
+            )
+
+            alice.rpc.assertNumberOfCheckpoints(completed = 1)
+            alice.rpc.assertHospitalCounts(
+                discharged = 1,
+                dischargedRetry = 1
+            )
+            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
+        }
+    }
+
+    /**
+     * Throws an exception when after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state).
+     * This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node.
+     *
+     * The exception is thrown when performing [Action.SignalFlowHasStarted], the error won't actually appear here but it makes it easier
+     * to test.
+     *
+     * The exception is thrown 3 times.
+     *
+     * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
+     * succeeds and the flow finishes.
+     *
+     * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
+     *
+     * The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
+     * at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
+     *
+     */
+    @Test(timeout = 300_000)
+    fun `with client id - error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
+        startDriver {
+            val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
+
+            val rules = """
+                RULE Create Counter
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF createCounter("counter", $counter)
+                DO traceln("Counter created")
+                ENDRULE
+
+                RULE Flag when commit transaction reached
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF true
+                DO flag("commit")
+                ENDRULE
+                
+                RULE Throw exception on executeSignalFlowHasStarted action
+                CLASS ${DatabaseTransaction::class.java.name}
+                METHOD commit
+                AT EXIT
+                IF readCounter("counter") < 3 && flagged("commit")
+                DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
+                ENDRULE
+                
+                RULE Log external start flow event
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
+                IF true
+                DO traceln("External start flow event")
+                ENDRULE
+            """.trimIndent()
+
+            submitBytemanRules(rules, port)
+
+            alice.rpc.startFlowWithClientId(
+                "hello im a client id",
+                StateMachineErrorHandlingTest::SendAMessageFlow,
+                charlie.nodeInfo.singleIdentity()
+            ).returnValue.getOrThrow(
+                30.seconds
+            )
+
+            alice.assertBytemanOutput("External start flow event", 1)
+            alice.rpc.assertNumberOfCheckpoints(completed = 1)
+            alice.rpc.assertHospitalCounts(discharged = 3)
+            assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
+        }
+    }
+
     /**
      * Throws an exception when performing an [Action.CommitTransaction] event on a responding node before the flow has initialised and
      * saved its first checkpoint (remains in an unstarted state).
@@ -363,6 +757,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 IF readCounter("counter") < 3
                 DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
                 ENDRULE
+                
+                RULE Log session init event
+                CLASS $stateMachineManagerClassName
+                METHOD onSessionInit
+                AT ENTRY
+                IF true
+                DO traceln("On session init event")
+                ENDRULE
             """.trimIndent()
 
             submitBytemanRules(rules, port)
@@ -375,6 +777,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
             )
 
             alice.rpc.assertNumberOfCheckpointsAllZero()
+            charlie.assertBytemanOutput("On session init event", 4)
             charlie.rpc.assertNumberOfCheckpointsAllZero()
             charlie.rpc.assertHospitalCounts(discharged = 3)
         }
@@ -411,6 +814,14 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 IF readCounter("counter") < 4
                 DO incrementCounter("counter"); traceln("Throwing exception"); throw new java.sql.SQLException("die dammit die", "1")
                 ENDRULE
+                
+                RULE Log session init event
+                CLASS $stateMachineManagerClassName
+                METHOD onSessionInit
+                AT ENTRY
+                IF true
+                DO traceln("On session init event")
+                ENDRULE
             """.trimIndent()
 
             submitBytemanRules(rules, port)
@@ -423,6 +834,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
             Thread.sleep(30.seconds.toMillis())
 
             alice.rpc.assertNumberOfCheckpoints(runnable = 1)
+            charlie.assertBytemanOutput("On session init event", 4)
             charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
             charlie.rpc.assertHospitalCounts(
                 discharged = 3,
@@ -463,7 +875,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 CLASS $actionExecutorClassName
                 METHOD executeCommitTransaction
                 AT ENTRY
-                IF createCounter("counter", $counter) 
+                IF createCounter("counter", $counter) && createCounter("counter_2", $counter) 
                 DO traceln("Counter created")
                 ENDRULE
 
@@ -479,8 +891,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 INTERFACE ${CheckpointStorage::class.java.name}
                 METHOD getCheckpoint
                 AT ENTRY
-                IF true
-                DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
+                IF readCounter("counter_2") < 3
+                DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
                 ENDRULE
             """.trimIndent()
 
@@ -496,7 +908,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
             alice.rpc.assertNumberOfCheckpointsAllZero()
             charlie.rpc.assertHospitalCounts(
                 discharged = 3,
-                observation = 0
+                dischargedRetry = 1
             )
             assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
             assertEquals(0, charlie.rpc.stateMachinesSnapshot().size)
@@ -527,7 +939,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 CLASS $actionExecutorClassName
                 METHOD executeCommitTransaction
                 AT ENTRY
-                IF createCounter("counter", $counter) 
+                IF createCounter("counter", $counter) && createCounter("counter_2", $counter) 
                 DO traceln("Counter created")
                 ENDRULE
 
@@ -543,8 +955,8 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
                 INTERFACE ${CheckpointStorage::class.java.name}
                 METHOD getCheckpoint
                 AT ENTRY
-                IF true
-                DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
+                IF readCounter("counter_2") < 3
+                DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
                 ENDRULE
             """.trimIndent()
 
@@ -562,10 +974,84 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
             charlie.rpc.assertNumberOfCheckpoints(hospitalized = 1)
             charlie.rpc.assertHospitalCounts(
                 discharged = 3,
-                observation = 1
+                observation = 1,
+                dischargedRetry = 1
             )
             assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
             assertEquals(1, charlie.rpc.stateMachinesSnapshot().size)
         }
     }
+
+    /**
+     * Throws an exception when after the first [Action.CommitTransaction] event before the flow has initialised (remains in an unstarted state).
+     * This is to cover transient issues, where the transaction committed the checkpoint but failed to respond to the node.
+     *
+     * The exception is thrown when performing [Action.SignalFlowHasStarted], the error won't actually appear here but it makes it easier
+     * to test.
+     *
+     * The exception is thrown 3 times.
+     *
+     * This causes the transition to be discharged from the hospital 3 times (retries 3 times). On the final retry the transition
+     * succeeds and the flow finishes.
+     *
+     * Each time the flow retries, it starts from the beginning of the flow (due to being in an unstarted state).
+     *
+     * The first retry will load the checkpoint that the flow doesn't know exists ([StateMachineState.isAnyCheckpointPersisted] is false
+     * at this point). The flag gets switched to true after this first retry and the flow has now returned to an expected state.
+     *
+     */
+    @Test(timeout = 300_000)
+    fun `responding flow - error during transition when checkpoint commits but transient db exception is thrown during flow initialisation will retry and complete successfully`() {
+        startDriver {
+            val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
+
+            val rules = """
+                RULE Create Counter
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF createCounter("counter", $counter)
+                DO traceln("Counter created")
+                ENDRULE
+
+               RULE Flag when commit transaction reached
+                CLASS $actionExecutorClassName
+                METHOD executeCommitTransaction
+                AT ENTRY
+                IF true
+                DO flag("commit")
+                ENDRULE
+                
+                RULE Throw exception on executeSignalFlowHasStarted action
+                CLASS ${DatabaseTransaction::class.java.name}
+                METHOD commit
+                AT EXIT
+                IF readCounter("counter") < 3 && flagged("commit")
+                DO incrementCounter("counter"); clear("commit"); traceln("Throwing exception"); throw new java.sql.SQLException("you thought it worked didnt you!", "1")
+                ENDRULE
+                
+                RULE Log session init event
+                CLASS $stateMachineManagerClassName
+                METHOD onSessionInit
+                AT ENTRY
+                IF true
+                DO traceln("On session init event")
+                ENDRULE
+            """.trimIndent()
+
+            submitBytemanRules(rules, port)
+
+            alice.rpc.startFlow(
+                StateMachineErrorHandlingTest::SendAMessageFlow,
+                charlie.nodeInfo.singleIdentity()
+            ).returnValue.getOrThrow(
+                30.seconds
+            )
+
+            alice.rpc.assertNumberOfCheckpointsAllZero()
+            charlie.assertBytemanOutput("On session init event", 1)
+            charlie.rpc.assertNumberOfCheckpointsAllZero()
+            charlie.rpc.assertHospitalCounts(discharged = 3)
+        }
+    }
 }
\ No newline at end of file
diff --git a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt
index f97800214f..894a66692f 100644
--- a/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt
+++ b/node/src/integration-test-slow/kotlin/net/corda/node/services/statemachine/StateMachineGeneralErrorHandlingTest.kt
@@ -494,7 +494,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
                 CLASS $actionExecutorClassName
                 METHOD executeCommitTransaction
                 AT ENTRY
-                IF createCounter("counter", $counter) 
+                IF createCounter("counter", $counter) && createCounter("counter_2", $counter) 
                 DO traceln("Counter created")
                 ENDRULE
 
@@ -510,8 +510,16 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
                 INTERFACE ${CheckpointStorage::class.java.name}
                 METHOD getCheckpoint
                 AT ENTRY
+                IF readCounter("counter_2") < 3
+                DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
+                ENDRULE
+                
+                RULE Log external start flow event
+                CLASS $stateMachineManagerClassName
+                METHOD onExternalStartFlow
+                AT ENTRY
                 IF true
-                DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
+                DO traceln("External start flow event")
                 ENDRULE
             """.trimIndent()
 
@@ -527,7 +535,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
             alice.rpc.assertNumberOfCheckpointsAllZero()
             alice.rpc.assertHospitalCounts(
                 discharged = 3,
-                observation = 0
+                dischargedRetry = 1
             )
             assertEquals(0, alice.rpc.stateMachinesSnapshot().size)
         }
@@ -557,7 +565,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
                 CLASS $actionExecutorClassName
                 METHOD executeCommitTransaction
                 AT ENTRY
-                IF createCounter("counter", $counter) 
+                IF createCounter("counter", $counter) && createCounter("counter_2", $counter) 
                 DO traceln("Counter created")
                 ENDRULE
 
@@ -573,8 +581,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
                 INTERFACE ${CheckpointStorage::class.java.name}
                 METHOD getCheckpoint
                 AT ENTRY
-                IF true
-                DO traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
+                IF readCounter("counter_2") < 3
+                DO incrementCounter("counter_2"); traceln("Throwing exception getting checkpoint"); throw new java.sql.SQLTransientConnectionException("Connection is not available")
                 ENDRULE
             """.trimIndent()
 
@@ -590,7 +598,8 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
             alice.rpc.assertNumberOfCheckpoints(hospitalized = 1)
             alice.rpc.assertHospitalCounts(
                 discharged = 3,
-                observation = 1
+                observation = 1,
+                dischargedRetry = 1
             )
             assertEquals(1, alice.rpc.stateMachinesSnapshot().size)
         }
diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt
index 499dcfd232..eff577eba5 100644
--- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowRetryTest.kt
@@ -468,4 +468,4 @@ class GetCheckpointNumberOfStatusFlow(private val flowStatus: Checkpoint.FlowSta
             }
         }
     }
-}
\ No newline at end of file
+}
diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt
index ec1bff03e5..99f5674620 100644
--- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt
@@ -140,6 +140,17 @@ class FlowWithClientIdTest {
             }.withMessage("java.lang.IllegalStateException: Bla bla bla")
         }
     }
+
+    @Test(timeout=300_000)
+    fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
+        val clientId = UUID.randomUUID().toString()
+        driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
+            val nodeA = startNode().getOrThrow()
+            nodeA.rpc.startFlowWithClientId(clientId, ::ResultFlow, 5).returnValue.getOrThrow(20.seconds)
+            val finishedFlows = nodeA.rpc.finishedFlowsWithClientIds()
+            assertEquals(true, finishedFlows[clientId])
+        }
+    }
 }
 
 @StartableByRPC
diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
index 9d11865935..544aaf84f6 100644
--- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
@@ -465,13 +465,24 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
             updateAppSchemasWithCheckpoints: Boolean
     ) {
         check(started == null) { "Node has already been started" }
+        check(updateCoreSchemas || updateAppSchemas) { "Neither core nor app schema scripts were specified" }
         Node.printBasicNodeInfo("Running database schema migration scripts ...")
         val props = configuration.dataSourceProperties
         if (props.isEmpty) throw DatabaseConfigurationException("There must be a database configured.")
+        var pendingAppChanges: Int = 0
+        var pendingCoreChanges: Int = 0
         database.startHikariPool(props, metricRegistry) { dataSource, haveCheckpoints ->
-            SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
-                    .checkOrUpdate(schemaService.internalSchemas, updateCoreSchemas, haveCheckpoints, true)
-                    .checkOrUpdate(schemaService.appSchemas, updateAppSchemas, !updateAppSchemasWithCheckpoints && haveCheckpoints, false)
+            val schemaMigration = SchemaMigration(dataSource, cordappLoader, configuration.baseDirectory, configuration.myLegalName)
+            if(updateCoreSchemas) {
+                schemaMigration.runMigration(haveCheckpoints, schemaService.internalSchemas, true)
+            } else {
+                pendingCoreChanges = schemaMigration.getPendingChangesCount(schemaService.internalSchemas, true)
+            }
+            if(updateAppSchemas) {
+                schemaMigration.runMigration(!updateAppSchemasWithCheckpoints && haveCheckpoints, schemaService.appSchemas, false)
+            } else {
+                pendingAppChanges = schemaMigration.getPendingChangesCount(schemaService.appSchemas, false)
+            }
         }
         // Now log the vendor string as this will also cause a connection to be tested eagerly.
         logVendorString(database, log)
@@ -491,7 +502,18 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
                 cordappProvider.start()
             }
         }
-        Node.printBasicNodeInfo("Database migration done.")
+        val updatedSchemas = listOfNotNull(
+                ("core").takeIf { updateCoreSchemas },
+                ("app").takeIf { updateAppSchemas }
+        ).joinToString(separator = " and ");
+
+        val pendingChanges = listOfNotNull(
+                ("no outstanding").takeIf { pendingAppChanges == 0 && pendingCoreChanges == 0 },
+                ("$pendingCoreChanges outstanding core").takeIf { !updateCoreSchemas && pendingCoreChanges > 0 },
+                ("$pendingAppChanges outstanding app").takeIf { !updateAppSchemas && pendingAppChanges > 0 }
+        ).joinToString(prefix = "There are ", postfix = " database changes.");
+
+        Node.printBasicNodeInfo("Database migration scripts for $updatedSchemas schemas complete. $pendingChanges")
     }
 
     fun runSchemaSync() {
@@ -583,6 +605,10 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
             throw e
         }
 
+        // Only execute futures/callbacks linked to [rootFuture] after the database transaction below is committed.
+        // This ensures that the node is fully ready before starting flows.
+        val rootFuture = openFuture<Void?>()
+
         // Do all of this in a database transaction so anything that might need a connection has one.
         val (resultingNodeInfo, readyFuture) = database.transaction(recoverableFailureTolerance = 0) {
             networkParametersStorage.setCurrentParameters(signedNetParams, trustRoot)
@@ -604,7 +630,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
             tokenizableServices = null
 
             verifyCheckpointsCompatible(frozenTokenizableServices)
-            val smmStartedFuture = smm.start(frozenTokenizableServices)
+            val callback = smm.start(frozenTokenizableServices)
+            val smmStartedFuture = rootFuture.map { callback() }
             // Shut down the SMM so no Fibers are scheduled.
             runOnStop += { smm.stop(acceptableLiveFiberCountOnStop()) }
             val flowMonitor = FlowMonitor(
@@ -624,6 +651,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
             resultingNodeInfo to readyFuture
         }
 
+        rootFuture.captureLater(services.networkMapCache.nodeReady)
+
         readyFuture.map { ready ->
             if (ready) {
                 // NB: Dispatch lifecycle events outside of transaction to ensure attachments and the like persisted into the DB
diff --git a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt
index 5042e2e9ff..052de0ab3c 100644
--- a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt
@@ -6,6 +6,7 @@ import net.corda.core.flows.FlowLogic
 import net.corda.core.node.ServiceHub
 import net.corda.core.serialization.internal.CheckpointSerializationDefaults
 import net.corda.node.services.api.CheckpointStorage
+import net.corda.node.services.statemachine.Checkpoint
 import net.corda.node.services.statemachine.SubFlow
 import net.corda.node.services.statemachine.SubFlowVersion
 import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
@@ -13,6 +14,8 @@ import net.corda.serialization.internal.withTokenContext
 
 object CheckpointVerifier {
 
+    private val statusToVerify = setOf(Checkpoint.FlowStatus.RUNNABLE, Checkpoint.FlowStatus.HOSPITALIZED, Checkpoint.FlowStatus.PAUSED)
+
     /**
      * Verifies that all Checkpoints stored in the db can be safely loaded with the currently installed version.
      * @throws CheckpointIncompatibleException if any offending checkpoint is found.
@@ -35,7 +38,7 @@ object CheckpointVerifier {
 
         val cordappsByHash = currentCordapps.associateBy { it.jarHash }
 
-        checkpointStorage.getCheckpointsToRun().use {
+        checkpointStorage.getCheckpoints(statusToVerify).use {
             it.forEach { (_, serializedCheckpoint) ->
                 val checkpoint = try {
                     serializedCheckpoint.deserialize(checkpointSerializationContext)
diff --git a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt
index ab311ecaed..b9beecb325 100644
--- a/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/CordaRPCOpsImpl.kt
@@ -170,6 +170,8 @@ internal class CordaRPCOpsImpl(
 
     override fun removeClientId(clientId: String): Boolean = smm.removeClientId(clientId)
 
+    override fun finishedFlowsWithClientIds(): Map<String, Boolean> = smm.finishedFlowsWithClientIds()
+
     override fun stateMachinesFeed(): DataFeed<List<StateMachineInfo>, StateMachineUpdate> {
 
         val (allStateMachines, changes) = smm.track()
diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt
index 49ea860589..6e750619f4 100644
--- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt
+++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt
@@ -15,14 +15,20 @@ interface CheckpointStorage {
     /**
      * Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
      */
-    fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>,
-                      serializedCheckpointState: SerializedBytes<CheckpointState>)
+    fun addCheckpoint(
+        id: StateMachineRunId, checkpoint: Checkpoint,
+        serializedFlowState: SerializedBytes<FlowState>?,
+        serializedCheckpointState: SerializedBytes<CheckpointState>
+    )
 
     /**
      * Update an existing checkpoint. Will throw if there is not checkpoint for this id.
      */
-    fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>?,
-                         serializedCheckpointState: SerializedBytes<CheckpointState>)
+    fun updateCheckpoint(
+        id: StateMachineRunId, checkpoint: Checkpoint,
+        serializedFlowState: SerializedBytes<FlowState>?,
+        serializedCheckpointState: SerializedBytes<CheckpointState>
+    )
 
     /**
      * Update an existing checkpoints status ([Checkpoint.status]).
@@ -78,7 +84,7 @@ interface CheckpointStorage {
      * until the underlying database connection is closed, so any processing should happen before it is closed.
      * This method does not fetch [Checkpoint.Serialized.serializedFlowState] to save memory.
      */
-    fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
+    fun getPausedCheckpoints(): Stream<Triple<StateMachineRunId, Checkpoint.Serialized, Boolean>>
 
     fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>>
 
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
index cf2976f662..a088e3bf6e 100644
--- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
+++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
@@ -373,7 +373,7 @@ class DBCheckpointStorage(
     override fun addCheckpoint(
         id: StateMachineRunId,
         checkpoint: Checkpoint,
-        serializedFlowState: SerializedBytes<FlowState>,
+        serializedFlowState: SerializedBytes<FlowState>?,
         serializedCheckpointState: SerializedBytes<CheckpointState>
     ) {
         val now = clock.instant()
@@ -555,15 +555,17 @@ class DBCheckpointStorage(
         return currentDBSession().find(DBFlowException::class.java, id.uuid.toString())
     }
 
-    override fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
+    override fun getPausedCheckpoints(): Stream<Triple<StateMachineRunId, Checkpoint.Serialized, Boolean>> {
         val session = currentDBSession()
         val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status,
-                checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible) from ${DBFlowCheckpoint::class.java.name}
-                checkpoint join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id where
-                checkpoint.status = ${FlowStatus.PAUSED.ordinal}""".trimIndent()
+                checkpoint.progressStep, checkpoint.ioRequestType, checkpoint.compatible, exception.id) 
+                from ${DBFlowCheckpoint::class.java.name} checkpoint 
+                join ${DBFlowCheckpointBlob::class.java.name} blob on checkpoint.blob = blob.id
+                left outer join ${DBFlowException::class.java.name} exception on checkpoint.exceptionDetails = exception.id
+                where checkpoint.status = ${FlowStatus.PAUSED.ordinal}""".trimIndent()
         val query = session.createQuery(jpqlQuery, DBPausedFields::class.java)
         return query.resultList.stream().map {
-            StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint()
+            Triple(StateMachineRunId(UUID.fromString(it.id)), it.toSerializedCheckpoint(), it.wasHospitalized)
         }
     }
 
@@ -722,8 +724,10 @@ class DBCheckpointStorage(
         val status: FlowStatus,
         val progressStep: String?,
         val ioRequestType: String?,
-        val compatible: Boolean
+        val compatible: Boolean,
+        exception: String?
     ) {
+        val wasHospitalized = exception != null
         fun toSerializedCheckpoint(): Checkpoint.Serialized {
             return Checkpoint.Serialized(
                 serializedCheckpointState = SerializedBytes(checkpoint),
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt
index d5faad801e..615afb8df5 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt
@@ -145,7 +145,7 @@ sealed class Action {
     /**
      * Commit the current database transaction.
      */
-    object CommitTransaction : Action() {
+    data class CommitTransaction(val currentState: StateMachineState) : Action() {
         override fun toString() = "CommitTransaction"
     }
 
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt
index b1162a390b..929d3d594a 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt
@@ -61,7 +61,7 @@ internal class ActionExecutorImpl(
             is Action.RemoveFlow -> executeRemoveFlow(action)
             is Action.CreateTransaction -> executeCreateTransaction()
             is Action.RollbackTransaction -> executeRollbackTransaction()
-            is Action.CommitTransaction -> executeCommitTransaction()
+            is Action.CommitTransaction -> executeCommitTransaction(action)
             is Action.ExecuteAsyncOperation -> executeAsyncOperation(fiber, action)
             is Action.ReleaseSoftLocks -> executeReleaseSoftLocks(action)
             is Action.RetryFlowFromSafePoint -> executeRetryFlowFromSafePoint(action)
@@ -94,10 +94,7 @@ internal class ActionExecutorImpl(
         if (action.isCheckpointUpdate) {
             checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState)
         } else {
-            if (flowState is FlowState.Finished) {
-                throw IllegalStateException("A new checkpoint cannot be created with a finished flow state.")
-            }
-            checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState!!, serializedCheckpointState)
+            checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState, serializedCheckpointState)
         }
     }
 
@@ -222,13 +219,14 @@ internal class ActionExecutorImpl(
 
     @Suspendable
     @Throws(SQLException::class)
-    private fun executeCommitTransaction() {
+    private fun executeCommitTransaction(action: Action.CommitTransaction) {
         try {
             contextTransaction.commit()
         } finally {
             contextTransaction.close()
             contextTransactionOrNull = null
         }
+        action.currentState.run { numberOfCommits = checkpoint.checkpointState.numberOfCommits }
     }
 
     @Suppress("TooGenericExceptionCaught")
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt
index aa340c582a..9aab0183b7 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowCreator.kt
@@ -15,6 +15,7 @@ import net.corda.core.serialization.SerializedBytes
 import net.corda.core.serialization.internal.CheckpointSerializationContext
 import net.corda.core.serialization.internal.checkpointDeserialize
 import net.corda.core.serialization.internal.checkpointSerialize
+import net.corda.core.utilities.ProgressTracker
 import net.corda.core.utilities.contextLogger
 import net.corda.node.services.api.CheckpointStorage
 import net.corda.node.services.api.ServiceHubInternal
@@ -23,6 +24,8 @@ import net.corda.node.services.statemachine.transitions.StateMachine
 import net.corda.node.utilities.isEnabledTimedFlow
 import net.corda.nodeapi.internal.persistence.CordaPersistence
 import org.apache.activemq.artemis.utils.ReusableLatch
+import org.apache.commons.lang3.reflect.FieldUtils
+import java.lang.reflect.Field
 import java.security.SecureRandom
 import java.util.concurrent.Semaphore
 
@@ -32,7 +35,9 @@ data class NonResidentFlow(
     val runId: StateMachineRunId,
     var checkpoint: Checkpoint,
     val resultFuture: OpenFuture<Any?> = openFuture(),
-    val resumable: Boolean = true
+    val resumable: Boolean = true,
+    val hospitalized: Boolean = false,
+    val progressTracker: ProgressTracker? = null
 ) {
     val events = mutableListOf<ExternalEvent>()
 
@@ -41,6 +46,7 @@ data class NonResidentFlow(
     }
 }
 
+@Suppress("TooManyFunctions")
 class FlowCreator(
     private val checkpointSerializationContext: CheckpointSerializationContext,
     private val checkpointStorage: CheckpointStorage,
@@ -70,7 +76,12 @@ class FlowCreator(
             }
             else -> nonResidentFlow.checkpoint
         }
-        return createFlowFromCheckpoint(nonResidentFlow.runId, checkpoint, resultFuture = nonResidentFlow.resultFuture)
+        return createFlowFromCheckpoint(
+            nonResidentFlow.runId,
+            checkpoint,
+            resultFuture = nonResidentFlow.resultFuture,
+            progressTracker = nonResidentFlow.progressTracker
+        )
     }
 
     @Suppress("LongParameterList")
@@ -80,7 +91,8 @@ class FlowCreator(
         reloadCheckpointAfterSuspendCount: Int? = null,
         lock: Semaphore = Semaphore(1),
         resultFuture: OpenFuture<Any?> = openFuture(),
-        firstRestore: Boolean = true
+        firstRestore: Boolean = true,
+        progressTracker: ProgressTracker? = null
     ): Flow<*>? {
         val fiber = oldCheckpoint.getFiberFromCheckpoint(runId, firstRestore)
         var checkpoint = oldCheckpoint
@@ -91,6 +103,7 @@ class FlowCreator(
             updateCompatibleInDb(runId, true)
             checkpoint = checkpoint.copy(compatible = true)
         }
+
         checkpoint = checkpoint.copy(status = Checkpoint.FlowStatus.RUNNABLE)
 
         fiber.logic.stateMachine = fiber
@@ -102,8 +115,10 @@ class FlowCreator(
             anyCheckpointPersisted = true,
             reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount
                 ?: if (reloadCheckpointAfterSuspend) checkpoint.checkpointState.numberOfSuspends else null,
+            numberOfCommits = checkpoint.checkpointState.numberOfCommits,
             lock = lock
         )
+        injectOldProgressTracker(progressTracker, fiber.logic)
         return Flow(fiber, resultFuture)
     }
 
@@ -148,6 +163,7 @@ class FlowCreator(
             fiber = flowStateMachineImpl,
             anyCheckpointPersisted = existingCheckpoint != null,
             reloadCheckpointAfterSuspendCount = if (reloadCheckpointAfterSuspend) 0 else null,
+            numberOfCommits = existingCheckpoint?.checkpointState?.numberOfCommits ?: 0,
             lock = Semaphore(1),
             deduplicationHandler = deduplicationHandler,
             senderUUID = senderUUID
@@ -229,6 +245,7 @@ class FlowCreator(
         fiber: FlowStateMachineImpl<*>,
         anyCheckpointPersisted: Boolean,
         reloadCheckpointAfterSuspendCount: Int?,
+        numberOfCommits: Int,
         lock: Semaphore,
         deduplicationHandler: DeduplicationHandler? = null,
         senderUUID: String? = null
@@ -246,7 +263,66 @@ class FlowCreator(
             flowLogic = fiber.logic,
             senderUUID = senderUUID,
             reloadCheckpointAfterSuspendCount = reloadCheckpointAfterSuspendCount,
+            numberOfCommits = numberOfCommits,
             lock = lock
         )
     }
+
+    /**
+     * The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
+     * any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
+     * updates are correctly sent out after the flow is retried.
+     *
+     * If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well.
+     */
+    private fun injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
+        if (oldTracker != null) {
+            val newTracker = newFlowLogic.progressTracker
+            if (newTracker != null) {
+                attachNewChildren(oldTracker, newTracker)
+                replaceTracker(newFlowLogic, oldTracker)
+            }
+        }
+    }
+
+    private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) {
+        oldTracker.currentStep = newTracker.currentStep
+        oldTracker.steps.forEachIndexed { index, step ->
+            val newStep = newTracker.steps[index]
+            val newChildTracker = newTracker.getChildProgressTracker(newStep)
+            newChildTracker?.let { child ->
+                oldTracker.setChildProgressTracker(step, child)
+            }
+        }
+        resubscribeToChildren(oldTracker)
+    }
+
+    /**
+     * Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint,
+     * it retains the child links, but does not automatically re-subscribe to the child changes.
+     */
+    private fun resubscribeToChildren(tracker: ProgressTracker) {
+        tracker.steps.forEach {
+            val childTracker = tracker.getChildProgressTracker(it)
+            if (childTracker != null) {
+                tracker.setChildProgressTracker(it, childTracker)
+                resubscribeToChildren(childTracker)
+            }
+        }
+    }
+
+    /** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */
+    private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) {
+        val field = getProgressTrackerField(newFlowLogic)
+        field?.apply {
+            isAccessible = true
+            set(newFlowLogic, oldProgressTracker)
+        }
+    }
+
+    private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? {
+        // The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up
+        // the hierarchy.
+        return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" }
+    }
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
index 7d2ac7f62a..267ff59aa6 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
@@ -237,12 +237,11 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
     }
 
     private fun Throwable.fillInLocalStackTrace(): Throwable {
-        fillInStackTrace()
-        // provide useful information that can be displayed to the user
-        // reflection use to access private field
+        // Fill in the stacktrace when the exception originates from another node
         when (this) {
             is UnexpectedFlowEndException -> {
                 DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", this).value?.let {
+                    fillInStackTrace()
                     stackTrace = arrayOf(
                         StackTraceElement(
                             "Received unexpected counter-flow exception from peer ${it.name}",
@@ -255,6 +254,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
             }
             is FlowException -> {
                 DeclaredField<Party?>(FlowException::class.java, "peer", this).value?.let {
+                    fillInStackTrace()
                     stackTrace = arrayOf(
                         StackTraceElement(
                             "Received counter-flow exception from peer ${it.name}",
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
index 27f9d36bf5..1802f6c89c 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
@@ -22,7 +22,6 @@ import net.corda.core.internal.concurrent.OpenFuture
 import net.corda.core.internal.concurrent.doneFuture
 import net.corda.core.internal.concurrent.map
 import net.corda.core.internal.concurrent.openFuture
-import net.corda.core.internal.mapNotNull
 import net.corda.core.internal.uncheckedCast
 import net.corda.core.messaging.DataFeed
 import net.corda.core.serialization.deserialize
@@ -41,7 +40,6 @@ import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInter
 import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
 import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
 import net.corda.node.utilities.AffinityExecutor
-import net.corda.node.utilities.injectOldProgressTracker
 import net.corda.node.utilities.isEnabledTimedFlow
 import net.corda.nodeapi.internal.persistence.CordaPersistence
 import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
@@ -153,7 +151,7 @@ internal class SingleThreadedStateMachineManager(
     override val changes: Observable<StateMachineManager.Change> = innerState.changesPublisher
 
     @Suppress("ComplexMethod")
-    override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): CordaFuture<Unit> {
+    override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): () -> Unit {
         checkQuasarJavaAgentPresence()
         val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
                 CheckpointSerializeAsTokenContextImpl(
@@ -200,13 +198,14 @@ internal class SingleThreadedStateMachineManager(
         // - Incompatible checkpoints need to be handled upon implementing CORDA-3897
         for (flow in fibers.values) {
             flow.fiber.clientId?.let {
-                innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(doneFuture(flow.fiber))
+                innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(flow.fiber.id, doneFuture(flow.fiber))
             }
         }
 
         for (pausedFlow in pausedFlows) {
             pausedFlow.value.checkpoint.checkpointState.invocationContext.clientId?.let {
                 innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Active(
+                    pausedFlow.key,
                     doneClientIdFuture(pausedFlow.key, pausedFlow.value.resultFuture, it)
                 )
             }
@@ -223,7 +222,7 @@ internal class SingleThreadedStateMachineManager(
             } ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
         }
 
-        return serviceHub.networkMapCache.nodeReady.map {
+        return {
             logger.info("Node ready, info: ${serviceHub.myInfo}")
             resumeRestoredFlows(fibers)
             flowMessaging.start { _, deduplicationHandler ->
@@ -312,17 +311,20 @@ internal class SingleThreadedStateMachineManager(
                         status
                     } else {
                         newFuture = openFuture()
-                        FlowWithClientIdStatus.Active(newFuture!!)
+                        FlowWithClientIdStatus.Active(flowId, newFuture!!)
                     }
                 }
             }
 
             // Flow -started with client id- already exists, return the existing's flow future and don't start a new flow.
             existingStatus?.let {
-                val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
-                return@startFlow uncheckedCast(existingFuture)
-            }
-            onClientIDNotFound?.invoke()
+                // If the flow ID is the same as the one recorded in the client ID map,
+                // then this start flow event has been retried, and we should not de-duplicate.
+                if (flowId != it.flowId) {
+                    val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
+                    return@startFlow uncheckedCast(existingFuture)
+                }
+            } ?: onClientIDNotFound?.invoke()
         }
 
         return try {
@@ -456,12 +458,8 @@ internal class SingleThreadedStateMachineManager(
             innerState.withLock { if (id in flows) return@Checkpoints }
             val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id)?.also {
                 if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
-                    if (checkpointStorage.removeFlowException(id)) {
-                        checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
-                    } else {
-                        logger.error("Unable to remove database exception for flow $id. Something is very wrong. The flow will not be loaded and run.")
-                        return@Checkpoints
-                    }
+                    checkpointStorage.removeFlowException(id)
+                    checkpointStorage.updateStatus(id, Checkpoint.FlowStatus.RUNNABLE)
                 }
             } ?: return@Checkpoints
             val flow = flowCreator.createFlowFromCheckpoint(id, checkpoint)
@@ -472,9 +470,9 @@ internal class SingleThreadedStateMachineManager(
                 flows[id] = flow
             }
         }
-        checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint) ->
+        checkpointStorage.getPausedCheckpoints().forEach Checkpoints@{ (id, serializedCheckpoint, hospitalised) ->
             val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return@Checkpoints
-            pausedFlows[id] = NonResidentFlow(id, checkpoint)
+            pausedFlows[id] = NonResidentFlow(id, checkpoint, hospitalized = hospitalised)
         }
         return Pair(flows, pausedFlows)
     }
@@ -495,40 +493,49 @@ internal class SingleThreadedStateMachineManager(
             logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
             return
         }
-        val flow = if (currentState.isAnyCheckpointPersisted) {
-            // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
-            // we mirror exactly what happens when restarting the node.
-            val checkpoint = database.transaction {
-                val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
-                if (serializedCheckpoint == null) {
-                    logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
-                    return@transaction null
-                }
+        // We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
+        // we mirror exactly what happens when restarting the node.
+        // Ignore [isAnyCheckpointPersisted] as the checkpoint could be committed but the flag remains un-updated
+        val checkpointLoadingStatus = database.transaction {
+            val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) ?: return@transaction CheckpointLoadingStatus.NotFound
 
+            val checkpoint = serializedCheckpoint.let {
                 tryDeserializeCheckpoint(serializedCheckpoint, flowId)?.also {
                     if (it.status == Checkpoint.FlowStatus.HOSPITALIZED) {
-                        if (checkpointStorage.removeFlowException(flowId)) {
-                            checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
-                        } else {
-                            logger.error("Unable to remove database exception for flow $flowId. Something is very wrong. The flow will not be loaded and run.")
-                            return@transaction null
-                        }
+                        checkpointStorage.removeFlowException(flowId)
+                        checkpointStorage.updateStatus(flowId, Checkpoint.FlowStatus.RUNNABLE)
                     }
-                } ?: return@transaction null
-            } ?: return
+                } ?: return@transaction CheckpointLoadingStatus.CouldNotDeserialize
+            }
 
-            // Resurrect flow
-            flowCreator.createFlowFromCheckpoint(
-                flowId,
-                checkpoint,
-                currentState.reloadCheckpointAfterSuspendCount,
-                currentState.lock,
-                firstRestore = false
-            ) ?: return
-        } else {
-            // Just flow initiation message
-            null
+            CheckpointLoadingStatus.Success(checkpoint)
         }
+
+        val (flow, numberOfCommitsFromCheckpoint) = when {
+            // Resurrect flow
+            checkpointLoadingStatus is CheckpointLoadingStatus.Success -> {
+                val numberOfCommitsFromCheckpoint = checkpointLoadingStatus.checkpoint.checkpointState.numberOfCommits
+                val flow = flowCreator.createFlowFromCheckpoint(
+                    flowId,
+                    checkpointLoadingStatus.checkpoint,
+                    currentState.reloadCheckpointAfterSuspendCount,
+                    currentState.lock,
+                    firstRestore = false,
+                    progressTracker = currentState.flowLogic.progressTracker
+                ) ?: return
+                flow to numberOfCommitsFromCheckpoint
+            }
+            checkpointLoadingStatus is CheckpointLoadingStatus.NotFound && currentState.isAnyCheckpointPersisted -> {
+                logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
+                return
+            }
+            checkpointLoadingStatus is CheckpointLoadingStatus.CouldNotDeserialize -> return
+            else -> {
+                // Just flow initiation message
+                null to -1
+            }
+        }
+
         innerState.withLock {
             if (stopping) {
                 return
@@ -538,10 +545,10 @@ internal class SingleThreadedStateMachineManager(
                 sessionToFlow.remove(sessionId)
             }
             if (flow != null) {
-                injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
                 addAndStartFlow(flowId, flow)
             }
-            extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState)
+
+            extractAndScheduleEventsForRetry(oldFlowLeftOver, currentState, numberOfCommitsFromCheckpoint)
         }
     }
 
@@ -567,13 +574,27 @@ internal class SingleThreadedStateMachineManager(
 
 
     /**
-     * Extract all the incomplete deduplication handlers as well as the [ExternalEvent] and [Event.Pause] events from this flows event queue
-     * [oldEventQueue]. Then schedule them (in the same order) for the new flow. This means that if a retried flow has a pause event
-     * scheduled then the retried flow will eventually pause. The new flow will not retry again if future retry events have been scheduled.
-     * When this method is called this flow must have been replaced by the new flow in [StateMachineInnerState.flows]. This method differs
-     * from [extractAndQueueExternalEventsForPausedFlow] where (only) [externalEvents] are extracted and scheduled straight away.
+     * Extract all the (unpersisted) incomplete deduplication handlers [currentState.pendingDeduplicationHandlers], as well as the
+     * [ExternalEvent] and [Event.Pause] events from this flows event queue [oldEventQueue]. Then schedule them (in the same order) for the
+     * new flow. This means that if a retried flow has a pause event scheduled then the retried flow will eventually pause. The new flow
+     * will not retry again if future retry events have been scheduled. When this method is called this flow must have been replaced by the
+     * new flow in [StateMachineInnerState.flows].
+     *
+     * This method differs from [extractAndQueueExternalEventsForPausedFlow] where (only) [externalEvents] are extracted and scheduled
+     * straight away.
+     *
+     * @param oldEventQueue The old event queue of the flow/fiber to unprocessed extract events from
+     *
+     * @param currentState The current state of the flow, used to extract processed events (held in [StateMachineState.pendingDeduplicationHandlers])
+     *
+     * @param numberOfCommitsFromCheckpoint The number of commits that the checkpoint loaded from the database has, to compare to the
+     * commits the flow has currently reached
      */
-    private fun extractAndScheduleEventsForRetry(oldEventQueue: Channel<Event>, currentState: StateMachineState) {
+    private fun extractAndScheduleEventsForRetry(
+        oldEventQueue: Channel<Event>,
+        currentState: StateMachineState,
+        numberOfCommitsFromCheckpoint: Int
+    ) {
         val flow = innerState.withLock {
             flows[currentState.flowLogic.runId]
         }
@@ -583,9 +604,13 @@ internal class SingleThreadedStateMachineManager(
             if (event is Event.Pause || event is Event.GeneratedByExternalEvent) events.add(event)
         } while (event != null)
 
-        for (externalEvent in currentState.pendingDeduplicationHandlers) {
-            deliverExternalEvent(externalEvent.externalCause)
+        // Only redeliver events if they were not persisted to the database
+        if (currentState.numberOfCommits >= numberOfCommitsFromCheckpoint) {
+            for (externalEvent in currentState.pendingDeduplicationHandlers) {
+                deliverExternalEvent(externalEvent.externalCause)
+            }
         }
+
         for (event in events) {
             if (event is Event.GeneratedByExternalEvent) {
                 deliverExternalEvent(event.deduplicationHandler.externalCause)
@@ -758,8 +783,7 @@ internal class SingleThreadedStateMachineManager(
     ): CordaFuture<FlowStateMachine<A>> {
         onCallingStartFlowInternal?.invoke()
 
-        val existingFlow = innerState.withLock { flows[flowId] }
-        val existingCheckpoint = if (existingFlow != null && existingFlow.fiber.transientState.isAnyCheckpointPersisted) {
+        val existingCheckpoint = if (innerState.withLock { flows[flowId] != null }) {
             // Load the flow's checkpoint
             // The checkpoint will be missing if the flow failed before persisting the original checkpoint
             // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
@@ -811,7 +835,13 @@ internal class SingleThreadedStateMachineManager(
                 decrementLiveFibers()
                 //Setting flowState = FlowState.Paused means we don't hold the frozen fiber in memory.
                 val checkpoint = currentState.checkpoint.copy(status = Checkpoint.FlowStatus.PAUSED, flowState = FlowState.Paused)
-                val pausedFlow = NonResidentFlow(id, checkpoint, flow.resultFuture)
+                val pausedFlow = NonResidentFlow(
+                    id,
+                    checkpoint,
+                    flow.resultFuture,
+                    hospitalized = currentState.checkpoint.status == Checkpoint.FlowStatus.HOSPITALIZED,
+                    progressTracker = currentState.flowLogic.progressTracker
+                )
                 val eventQueue = flow.fiber.transientValues.eventQueue
                 extractAndQueueExternalEventsForPausedFlow(eventQueue, currentState.pendingDeduplicationHandlers, pausedFlow)
                 pausedFlows.put(id, pausedFlow)
@@ -1098,4 +1128,19 @@ internal class SingleThreadedStateMachineManager(
         }
         return false
     }
+
+    override fun finishedFlowsWithClientIds(): Map<String, Boolean> {
+        return innerState.withLock {
+            clientIdsToFlowIds.asSequence()
+                .filter { (_, status) -> status is FlowWithClientIdStatus.Removed }
+                .map { (clientId, status) -> clientId to (status as FlowWithClientIdStatus.Removed).succeeded }
+                .toMap()
+        }
+    }
+
+    private sealed class CheckpointLoadingStatus {
+        class Success(val checkpoint: Checkpoint) : CheckpointLoadingStatus()
+        object NotFound : CheckpointLoadingStatus()
+        object CouldNotDeserialize : CheckpointLoadingStatus()
+    }
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
index 832af2cc99..05887df101 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt
@@ -42,7 +42,7 @@ interface StateMachineManager {
      *
      * @return `Future` which completes when SMM is fully started
      */
-    fun start(tokenizableServices: List<Any>, startMode: StartMode = StartMode.ExcludingPaused) : CordaFuture<Unit>
+    fun start(tokenizableServices: List<Any>, startMode: StartMode = StartMode.ExcludingPaused) : () -> Unit
 
     /**
      * Stops the state machine manager gracefully, waiting until all but [allowedUnsuspendedFiberCount] flows reach the
@@ -120,6 +120,14 @@ interface StateMachineManager {
      * @return whether the mapping was removed.
      */
     fun removeClientId(clientId: String): Boolean
+
+    /**
+     * Returns all finished flows that were started with a client id.
+     *
+     * @return A [Map] containing client ids for finished flows, mapped to [true] if finished successfully,
+     * [false] if completed exceptionally.
+     */
+    fun finishedFlowsWithClientIds(): Map<String, Boolean>
 }
 
 // These must be idempotent! A later failure in the state transition may error the flow state, and a replay may call
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt
index c53cf81dc0..5293616cef 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt
@@ -49,6 +49,8 @@ import java.util.concurrent.Semaphore
  * @param senderUUID the identifier of the sending state machine or null if this flow is resumed from a checkpoint so that it does not participate in de-duplication high-water-marking.
  * @param reloadCheckpointAfterSuspendCount The number of times a flow has been reloaded (not retried). This is [null] when
  * [NodeConfiguration.reloadCheckpointAfterSuspendCount] is not enabled.
+ * @param numberOfCommits The number of times the flow's checkpoint has been successfully committed. This field is a var so that it can be
+ * updated after committing a database transaction that contained a checkpoint insert/update.
  * @param lock The flow's lock, used to prevent the flow performing a transition while being interacted with from external threads, and
  * vise-versa.
  */
@@ -67,6 +69,7 @@ data class StateMachineState(
     val isKilled: Boolean,
     val senderUUID: String?,
     val reloadCheckpointAfterSuspendCount: Int?,
+    var numberOfCommits: Int,
     val lock: Semaphore
 ) : KryoSerializable {
     override fun write(kryo: Kryo?, output: Output?) {
@@ -129,7 +132,9 @@ data class Checkpoint(
                         emptyMap(),
                         emptySet(),
                         listOf(topLevelSubFlow),
-                        numberOfSuspends = 0
+                        numberOfSuspends = 0,
+                        // We set this to 1 here to avoid an extra copy and increment in UnstartedFlowTransition.createInitialCheckpoint
+                        numberOfCommits = 1
                     ),
                     flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
                     errorState = ErrorState.Clean
@@ -229,21 +234,23 @@ data class Checkpoint(
 }
 
 /**
- * @param invocationContext the initiator of the flow.
- * @param ourIdentity the identity the flow is run as.
- * @param sessions map of source session ID to session state.
- * @param sessionsToBeClosed the sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions.
- * @param subFlowStack the stack of currently executing subflows.
- * @param numberOfSuspends the number of flow suspends due to IO API calls.
+ * @param invocationContext The initiator of the flow.
+ * @param ourIdentity The identity the flow is run as.
+ * @param sessions Map of source session ID to session state.
+ * @param sessionsToBeClosed The sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions.
+ * @param subFlowStack The stack of currently executing subflows.
+ * @param numberOfSuspends The number of flow suspends due to IO API calls.
+ * @param numberOfCommits The number of times this checkpoint has been persisted.
  */
 @CordaSerializable
 data class CheckpointState(
-        val invocationContext: InvocationContext,
-        val ourIdentity: Party,
-        val sessions: SessionMap, // This must preserve the insertion order!
-        val sessionsToBeClosed: Set<SessionId>,
-        val subFlowStack: List<SubFlow>,
-        val numberOfSuspends: Int
+    val invocationContext: InvocationContext,
+    val ourIdentity: Party,
+    val sessions: SessionMap, // This must preserve the insertion order!
+    val sessionsToBeClosed: Set<SessionId>,
+    val subFlowStack: List<SubFlow>,
+    val numberOfSuspends: Int,
+    val numberOfCommits: Int
 )
 
 /**
@@ -417,9 +424,13 @@ sealed class SubFlowVersion {
     data class CorDappFlow(override val platformVersion: Int, val corDappName: String, val corDappHash: SecureHash) : SubFlowVersion()
 }
 
-sealed class FlowWithClientIdStatus {
-    data class Active(val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>) : FlowWithClientIdStatus()
-    data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus()
+sealed class FlowWithClientIdStatus(val flowId: StateMachineRunId) {
+    class Active(
+        flowId: StateMachineRunId,
+        val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>
+    ) : FlowWithClientIdStatus(flowId)
+
+    class Removed(flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus(flowId)
 }
 
 data class FlowResultMetadata(
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt
index 4f4e6cd51e..39044de821 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/ErrorFlowTransition.kt
@@ -49,7 +49,7 @@ class ErrorFlowTransition(
                         checkpointState = startingState.checkpoint.checkpointState.copy(sessions = newSessions)
                 )
                 currentState = currentState.copy(checkpoint = newCheckpoint)
-                actions.add(Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID))
+                actions += Action.PropagateErrors(errorMessages, initiatedSessions, startingState.senderUUID)
             }
 
             // If we're errored but not propagating keep processing events.
@@ -59,32 +59,38 @@ class ErrorFlowTransition(
 
             // If we haven't been removed yet remove the flow.
             if (!currentState.isRemoved) {
-                val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.FAILED)
+                val newCheckpoint = startingState.checkpoint.copy(
+                    status = Checkpoint.FlowStatus.FAILED,
+                    flowState = FlowState.Finished,
+                    checkpointState = startingState.checkpoint.checkpointState.copy(
+                        numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1
+                    )
+                )
+                currentState = currentState.copy(
+                    checkpoint = newCheckpoint,
+                    pendingDeduplicationHandlers = emptyList(),
+                    isRemoved = true
+                )
 
                 val removeOrPersistCheckpoint = if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
                     Action.RemoveCheckpoint(context.id)
                 } else {
-                    Action.PersistCheckpoint(context.id, newCheckpoint.copy(flowState = FlowState.Finished), isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
+                    Action.PersistCheckpoint(
+                        context.id,
+                        newCheckpoint,
+                        isCheckpointUpdate = currentState.isAnyCheckpointPersisted
+                    )
                 }
 
-                actions.addAll(arrayOf(
-                        Action.CreateTransaction,
-                        removeOrPersistCheckpoint,
-                        Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
-                        Action.ReleaseSoftLocks(context.id.uuid),
-                        Action.CommitTransaction,
-                        Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
-                        Action.RemoveSessionBindings(currentState.checkpoint.checkpointState.sessions.keys)
-                ))
+                actions += Action.CreateTransaction
+                actions += removeOrPersistCheckpoint
+                actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
+                actions += Action.ReleaseSoftLocks(context.id.uuid)
+                actions += Action.CommitTransaction(currentState)
+                actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
+                actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
+                actions += Action.RemoveFlow(context.id, FlowRemovalReason.ErrorFinish(allErrors), currentState)
 
-                currentState = currentState.copy(
-                        checkpoint = newCheckpoint,
-                        pendingDeduplicationHandlers = emptyList(),
-                        isRemoved = true
-                )
-
-                val removalReason = FlowRemovalReason.ErrorFinish(allErrors)
-                actions.add(Action.RemoveFlow(context.id, removalReason, currentState))
                 FlowContinuation.Abort
             } else {
                 // Otherwise keep processing events. This branch happens when there are some outstanding initiating
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt
index bc059668d3..f80733f9c5 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/KilledFlowTransition.kt
@@ -29,39 +29,31 @@ class KilledFlowTransition(
                 startingState.checkpoint.checkpointState.sessions,
                 errorMessages
             )
-            val newCheckpoint = startingState.checkpoint.setSessions(sessions = newSessions)
-            currentState = currentState.copy(checkpoint = newCheckpoint)
-            actions.add(
-                Action.PropagateErrors(
-                    errorMessages,
-                    initiatedSessions,
-                    startingState.senderUUID
-                )
-            )
-
-            if (!startingState.isFlowResumed) {
-                actions.add(Action.CreateTransaction)
-            }
-            // The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
-            if (startingState.isAnyCheckpointPersisted) {
-                actions.add(Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true))
-            }
-            actions.addAll(
-                arrayOf(
-                    Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
-                    Action.ReleaseSoftLocks(context.id.uuid),
-                    Action.CommitTransaction,
-                    Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
-                    Action.RemoveSessionBindings(currentState.checkpoint.checkpointState.sessions.keys)
-                )
-            )
-
             currentState = currentState.copy(
+                checkpoint = startingState.checkpoint.setSessions(sessions = newSessions),
                 pendingDeduplicationHandlers = emptyList(),
                 isRemoved = true
             )
+            actions += Action.PropagateErrors(
+                errorMessages,
+                initiatedSessions,
+                startingState.senderUUID
+            )
+
+            if (!startingState.isFlowResumed) {
+                actions += Action.CreateTransaction
+            }
+            // The checkpoint and soft locks are also removed directly in [StateMachineManager.killFlow]
+            if (startingState.isAnyCheckpointPersisted) {
+                actions += Action.RemoveCheckpoint(context.id, mayHavePersistentResults = true)
+            }
+            actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
+            actions += Action.ReleaseSoftLocks(context.id.uuid)
+            actions += Action.CommitTransaction(currentState)
+            actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
+            actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
+            actions += Action.RemoveFlow(context.id, createKilledRemovalReason(killedFlowError), currentState)
 
-            actions.add(Action.RemoveFlow(context.id, createKilledRemovalReason(killedFlowError), currentState))
             FlowContinuation.Abort
         }
     }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt
index 5ac73c799e..85af30f898 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt
@@ -121,7 +121,7 @@ class StartedFlowTransition(
                 actions = listOf(
                     Action.CreateTransaction,
                     Action.TrackTransaction(flowIORequest.hash, state),
-                    Action.CommitTransaction
+                    Action.CommitTransaction(state)
                 )
             )
         } else {
@@ -389,22 +389,20 @@ class StartedFlowTransition(
     }
 
     private fun convertErrorMessageToException(errorMessage: ErrorSessionMessage, peer: Party): Throwable {
-        val exception: Throwable = if (errorMessage.flowException == null) {
-            UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = errorMessage.errorId)
-        } else {
-            errorMessage.flowException.originalErrorId = errorMessage.errorId
-            errorMessage.flowException
-        }
-        when (exception) {
-            // reflection used to access private field
-            is UnexpectedFlowEndException -> DeclaredField<Party?>(
+        return if (errorMessage.flowException == null) {
+            UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = errorMessage.errorId).apply {
+                DeclaredField<Party?>(
                     UnexpectedFlowEndException::class.java,
                     "peer",
-                    exception
-            ).value = peer
-            is FlowException -> DeclaredField<Party?>(FlowException::class.java, "peer", exception).value = peer
+                    this
+                ).value = peer
+            }
+        } else {
+            errorMessage.flowException.apply {
+                originalErrorId = errorMessage.errorId
+                DeclaredField<Party?>(FlowException::class.java, "peer", errorMessage.flowException).value = peer
+            }
         }
-        return exception
     }
 
     private fun collectUncloseableSessions(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
index a7e408503f..9eba35f0db 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
@@ -189,15 +189,16 @@ class TopLevelTransition(
 
     private fun suspendTransition(event: Event.Suspend): TransitionResult {
         return builder {
-            val newCheckpoint = currentState.checkpoint.run {
-                val newCheckpointState = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) {
-                    checkpointState.copy(
-                        invocationContext = checkpointState.invocationContext.copy(arguments = emptyList()),
-                        numberOfSuspends = checkpointState.numberOfSuspends + 1
-                    )
-                } else {
-                    checkpointState.copy(numberOfSuspends = checkpointState.numberOfSuspends + 1)
-                }
+            val newCheckpoint = startingState.checkpoint.run {
+                val newCheckpointState = checkpointState.copy(
+                   invocationContext = if (checkpointState.invocationContext.arguments!!.isNotEmpty()) {
+                       checkpointState.invocationContext.copy(arguments = emptyList())
+                   } else {
+                       checkpointState.invocationContext
+                   },
+                   numberOfSuspends = checkpointState.numberOfSuspends + 1,
+                   numberOfCommits = checkpointState.numberOfCommits + 1
+                )
                 copy(
                     flowState = FlowState.Started(event.ioRequest, event.fiber),
                     checkpointState = newCheckpointState,
@@ -206,29 +207,26 @@ class TopLevelTransition(
                 )
             }
             if (event.maySkipCheckpoint) {
-                actions.addAll(arrayOf(
-                        Action.CommitTransaction,
-                        Action.ScheduleEvent(Event.DoRemainingWork)
-                ))
-                currentState = currentState.copy(
+                currentState = startingState.copy(
                     checkpoint = newCheckpoint,
                     isFlowResumed = false
                 )
+                actions += Action.CommitTransaction(currentState)
+                actions += Action.ScheduleEvent(Event.DoRemainingWork)
             } else {
-                actions.addAll(arrayOf(
-                        Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
-                        Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
-                        Action.CommitTransaction,
-                        Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
-                        Action.ScheduleEvent(Event.DoRemainingWork)
-                ))
-                currentState = currentState.copy(
+                currentState = startingState.copy(
                     checkpoint = newCheckpoint,
                     pendingDeduplicationHandlers = emptyList(),
                     isFlowResumed = false,
                     isAnyCheckpointPersisted = true
                 )
+                actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
+                actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
+                actions += Action.CommitTransaction(currentState)
+                actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
+                actions += Action.ScheduleEvent(Event.DoRemainingWork)
             }
+
             FlowContinuation.ProcessEvents
         }
     }
@@ -238,44 +236,40 @@ class TopLevelTransition(
             val checkpoint = currentState.checkpoint
             when (checkpoint.errorState) {
                 ErrorState.Clean -> {
-                    val pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers
-                    currentState = currentState.copy(
-                            checkpoint = checkpoint.copy(
-                                checkpointState = checkpoint.checkpointState.copy(
-                                        numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1
-                                ),
-                                flowState = FlowState.Finished,
-                                result = event.returnValue,
-                                status = Checkpoint.FlowStatus.COMPLETED
+                    currentState = startingState.copy(
+                        checkpoint = checkpoint.copy(
+                            checkpointState = checkpoint.checkpointState.copy(
+                                numberOfSuspends = checkpoint.checkpointState.numberOfSuspends + 1,
+                                numberOfCommits = checkpoint.checkpointState.numberOfCommits + 1
                             ),
-                            pendingDeduplicationHandlers = emptyList(),
-                            isFlowResumed = false,
-                            isRemoved = true
+                            flowState = FlowState.Finished,
+                            result = event.returnValue,
+                            status = Checkpoint.FlowStatus.COMPLETED
+                        ),
+                        pendingDeduplicationHandlers = emptyList(),
+                        isFlowResumed = false,
+                        isRemoved = true
                     )
 
-                    if (currentState.isAnyCheckpointPersisted) {
-                        if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
-                            actions.add(Action.RemoveCheckpoint(context.id))
-                        } else {
-                            actions.add(
-                                Action.PersistCheckpoint(
-                                    context.id,
-                                    currentState.checkpoint,
-                                    isCheckpointUpdate = currentState.isAnyCheckpointPersisted
-                                )
-                            )
+                    if (startingState.checkpoint.checkpointState.invocationContext.clientId == null) {
+                        if (startingState.isAnyCheckpointPersisted) {
+                            actions += Action.RemoveCheckpoint(context.id)
                         }
+                    } else {
+                        actions += Action.PersistCheckpoint(
+                            context.id,
+                            currentState.checkpoint,
+                            isCheckpointUpdate = startingState.isAnyCheckpointPersisted
+                        )
                     }
 
-                    val allSourceSessionIds = currentState.checkpoint.checkpointState.sessions.keys
-                    actions.addAll(arrayOf(
-                        Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
-                            Action.ReleaseSoftLocks(event.softLocksId),
-                            Action.CommitTransaction,
-                            Action.AcknowledgeMessages(pendingDeduplicationHandlers),
-                            Action.RemoveSessionBindings(allSourceSessionIds),
-                            Action.RemoveFlow(context.id, FlowRemovalReason.OrderlyFinish(event.returnValue), currentState)
-                    ))
+                    actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
+                    actions += Action.ReleaseSoftLocks(event.softLocksId)
+                    actions += Action.CommitTransaction(currentState)
+                    actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
+                    actions += Action.RemoveSessionBindings(startingState.checkpoint.checkpointState.sessions.keys)
+                    actions += Action.RemoveFlow(context.id, FlowRemovalReason.OrderlyFinish(event.returnValue), currentState)
+
                     sendEndMessages()
                     // Resume to end fiber
                     FlowContinuation.Resume(null)
@@ -358,17 +352,22 @@ class TopLevelTransition(
 
     private fun overnightObservationTransition(): TransitionResult {
         return builder {
-            val flowStartEvents = currentState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
+            val flowStartEvents = startingState.pendingDeduplicationHandlers.filter(::isFlowStartEvent)
             val newCheckpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED)
+            currentState = startingState.copy(
+                checkpoint = startingState.checkpoint.copy(
+                    status = Checkpoint.FlowStatus.HOSPITALIZED,
+                    checkpointState = startingState.checkpoint.checkpointState.copy(
+                        numberOfCommits = startingState.checkpoint.checkpointState.numberOfCommits + 1
+                    )
+                ),
+                pendingDeduplicationHandlers = startingState.pendingDeduplicationHandlers - flowStartEvents
+            )
             actions += Action.CreateTransaction
             actions += Action.PersistDeduplicationFacts(flowStartEvents)
-            actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted)
-            actions += Action.CommitTransaction
+            actions += Action.PersistCheckpoint(context.id, newCheckpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
+            actions += Action.CommitTransaction(currentState)
             actions += Action.AcknowledgeMessages(flowStartEvents)
-            currentState = currentState.copy(
-                checkpoint = startingState.checkpoint.copy(status = Checkpoint.FlowStatus.HOSPITALIZED),
-                pendingDeduplicationHandlers = currentState.pendingDeduplicationHandlers - flowStartEvents
-            )
             FlowContinuation.ProcessEvents
         }
     }
@@ -394,15 +393,11 @@ class TopLevelTransition(
     private fun pausedFlowTransition(): TransitionResult {
         return builder {
             if (!startingState.isFlowResumed) {
-                actions.add(Action.CreateTransaction)
+                actions += Action.CreateTransaction
             }
-            actions.addAll(
-                arrayOf(
-                    Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.PAUSED),
-                    Action.CommitTransaction,
-                    Action.MoveFlowToPaused(currentState)
-                )
-            )
+            actions += Action.UpdateFlowStatus(context.id, Checkpoint.FlowStatus.PAUSED)
+            actions += Action.CommitTransaction(currentState)
+            actions += Action.MoveFlowToPaused(currentState)
             FlowContinuation.Abort
         }
     }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt
index 7361943cde..b250b42232 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/UnstartedFlowTransition.kt
@@ -27,14 +27,14 @@ class UnstartedFlowTransition(
                 createInitialCheckpoint()
             }
 
-            actions.add(Action.SignalFlowHasStarted(context.id))
+            actions += Action.SignalFlowHasStarted(context.id)
 
             if (unstarted.flowStart is FlowStart.Initiated) {
                 initialiseInitiatedSession(unstarted.flowStart)
             }
 
             currentState = currentState.copy(isFlowResumed = true)
-            actions.add(Action.CreateTransaction)
+            actions += Action.CreateTransaction
             FlowContinuation.Resume(null)
         }
     }
@@ -73,16 +73,14 @@ class UnstartedFlowTransition(
 
     // Create initial checkpoint and acknowledge triggering messages.
     private fun TransitionBuilder.createInitialCheckpoint() {
-        actions.addAll(arrayOf(
-                Action.CreateTransaction,
-                Action.PersistCheckpoint(context.id, currentState.checkpoint, isCheckpointUpdate = currentState.isAnyCheckpointPersisted),
-                Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
-                Action.CommitTransaction,
-                Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers)
-        ))
-        currentState = currentState.copy(
-                pendingDeduplicationHandlers = emptyList(),
-                isAnyCheckpointPersisted = true
+        currentState = startingState.copy(
+            pendingDeduplicationHandlers = emptyList(),
+            isAnyCheckpointPersisted = true
         )
+        actions += Action.CreateTransaction
+        actions += Action.PersistCheckpoint(context.id, startingState.checkpoint, isCheckpointUpdate = startingState.isAnyCheckpointPersisted)
+        actions += Action.PersistDeduplicationFacts(startingState.pendingDeduplicationHandlers)
+        actions += Action.CommitTransaction(currentState)
+        actions += Action.AcknowledgeMessages(startingState.pendingDeduplicationHandlers)
     }
 }
diff --git a/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt b/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt
deleted file mode 100644
index 99aeebc6d8..0000000000
--- a/node/src/main/kotlin/net/corda/node/utilities/StateMachineManagerUtils.kt
+++ /dev/null
@@ -1,66 +0,0 @@
-package net.corda.node.utilities
-
-import net.corda.core.flows.FlowLogic
-import net.corda.core.utilities.ProgressTracker
-import net.corda.node.services.statemachine.StateMachineManagerInternal
-import org.apache.commons.lang3.reflect.FieldUtils
-import java.lang.reflect.Field
-
-/**
- * The flow de-serialized from the checkpoint will contain a new instance of the progress tracker, which means that
- * any existing flow observers would be lost. We need to replace it with the old progress tracker to ensure progress
- * updates are correctly sent out after the flow is retried.
- *
- * If the new tracker contains any child trackers from sub-flows, we need to attach those to the old tracker as well.
- */
-//TODO: instead of replacing the progress tracker after constructing the flow logic, we should inject it during fiber deserialization
-internal fun StateMachineManagerInternal.injectOldProgressTracker(oldTracker: ProgressTracker?, newFlowLogic: FlowLogic<*>) {
-    if (oldTracker != null) {
-        val newTracker = newFlowLogic.progressTracker
-        if (newTracker != null) {
-            attachNewChildren(oldTracker, newTracker)
-            replaceTracker(newFlowLogic, oldTracker)
-        }
-    }
-}
-
-private fun attachNewChildren(oldTracker: ProgressTracker, newTracker: ProgressTracker) {
-    oldTracker.currentStep = newTracker.currentStep
-    oldTracker.steps.forEachIndexed { index, step ->
-        val newStep = newTracker.steps[index]
-        val newChildTracker = newTracker.getChildProgressTracker(newStep)
-        newChildTracker?.let { child ->
-            oldTracker.setChildProgressTracker(step, child)
-        }
-    }
-    resubscribeToChildren(oldTracker)
-}
-
-/**
- * Re-subscribes to child tracker observables. When a nested progress tracker is deserialized from a checkpoint,
- * it retains the child links, but does not automatically re-subscribe to the child changes.
- */
-private fun resubscribeToChildren(tracker: ProgressTracker) {
-    tracker.steps.forEach {
-        val childTracker = tracker.getChildProgressTracker(it)
-        if (childTracker != null) {
-            tracker.setChildProgressTracker(it, childTracker)
-            resubscribeToChildren(childTracker)
-        }
-    }
-}
-
-/** Replaces the deserialized [ProgressTracker] in the [newFlowLogic] with the old one to retain old subscribers. */
-private fun replaceTracker(newFlowLogic: FlowLogic<*>, oldProgressTracker: ProgressTracker?) {
-    val field = getProgressTrackerField(newFlowLogic)
-    field?.apply {
-        isAccessible = true
-        set(newFlowLogic, oldProgressTracker)
-    }
-}
-
-private fun getProgressTrackerField(newFlowLogic: FlowLogic<*>): Field? {
-    // The progress tracker field may have been overridden in an abstract superclass, so we have to traverse up
-    // the hierarchy.
-    return FieldUtils.getAllFieldsList(newFlowLogic::class.java).find { it.name == "progressTracker" }
-}
diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt
index 96f5535a1c..3141d97972 100644
--- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt
@@ -47,6 +47,7 @@ import java.time.Clock
 import java.util.*
 import kotlin.streams.toList
 import kotlin.test.assertEquals
+import kotlin.test.assertFalse
 import kotlin.test.assertTrue
 
 internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
@@ -803,6 +804,40 @@ class DBCheckpointStorageTests {
         assertTrue(Checkpoint.FlowStatus.FAILED in finishedStatuses)
     }
 
+    @Test(timeout = 300_000)
+    fun `'getPausedCheckpoints' fetches paused flows with and without database exceptions`() {
+        val (_, checkpoint) = newCheckpoint(1)
+        val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
+        val checkpointState = checkpoint.serializeCheckpointState()
+        val hospitalizedPaused = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
+        val cleanPaused = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
+        database.transaction {
+            checkpointStorage.addCheckpoint(hospitalizedPaused.id, hospitalizedPaused.checkpoint, serializedFlowState, checkpointState)
+            checkpointStorage.addCheckpoint(cleanPaused.id, cleanPaused.checkpoint, serializedFlowState, checkpointState)
+        }
+        database.transaction {
+            checkpointStorage.updateCheckpoint(
+                    hospitalizedPaused.id,
+                    hospitalizedPaused.checkpoint.addError(IllegalStateException(), Checkpoint.FlowStatus.HOSPITALIZED),
+                    serializedFlowState,
+                    checkpointState
+            )
+        }
+        database.transaction {
+            checkpointStorage.updateStatus(hospitalizedPaused.id, Checkpoint.FlowStatus.PAUSED)
+            checkpointStorage.updateStatus(cleanPaused.id, Checkpoint.FlowStatus.PAUSED)
+        }
+        database.transaction {
+            val checkpoints = checkpointStorage.getPausedCheckpoints().toList()
+            val dbHospitalizedPaused = checkpoints.single { it.first == hospitalizedPaused.id }
+            assertEquals(hospitalizedPaused.id, dbHospitalizedPaused.first)
+            assertTrue(dbHospitalizedPaused.third)
+            val dbCleanPaused = checkpoints.single { it.first == cleanPaused.id }
+            assertEquals(cleanPaused.id, dbCleanPaused.first)
+            assertFalse(dbCleanPaused.third)
+        }
+    }
+
     data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
 
     private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt
index c945b55aa4..0665fea236 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt
@@ -4,8 +4,10 @@ import co.paralleluniverse.fibers.Suspendable
 import co.paralleluniverse.strands.concurrent.Semaphore
 import net.corda.core.CordaRuntimeException
 import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.KilledFlowException
 import net.corda.core.internal.FlowIORequest
 import net.corda.core.internal.FlowStateMachineHandle
+import net.corda.core.internal.concurrent.transpose
 import net.corda.core.utilities.getOrThrow
 import net.corda.core.utilities.seconds
 import net.corda.node.services.persistence.DBCheckpointStorage
@@ -18,18 +20,17 @@ import net.corda.testing.node.internal.InternalMockNodeParameters
 import net.corda.testing.node.internal.TestStartedNode
 import net.corda.testing.node.internal.startFlow
 import net.corda.testing.node.internal.startFlowWithClientId
-import net.corda.core.flows.KilledFlowException
 import org.assertj.core.api.Assertions.assertThatExceptionOfType
 import org.junit.After
 import org.junit.Assert
 import org.junit.Before
 import org.junit.Test
 import rx.Observable
-import java.lang.IllegalArgumentException
 import java.sql.SQLTransientConnectionException
-import java.util.UUID
+import java.util.*
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
-import kotlin.IllegalStateException
 import kotlin.concurrent.thread
 import kotlin.test.assertEquals
 import kotlin.test.assertFailsWith
@@ -457,59 +458,6 @@ class FlowClientIdTests {
         Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
     }
 
-    // the below test has to be made available only in ENT
-//    @Test(timeout=300_000)
-//    fun `on node restart -paused- flows with client id are hook-able`() {
-//        val clientId = UUID.randomUUID().toString()
-//        var noSecondFlowWasSpawned = 0
-//        var firstRun = true
-//        var firstFiber: Fiber<out Any?>? = null
-//        val flowIsRunning = Semaphore(0)
-//        val waitUntilFlowIsRunning = Semaphore(0)
-//
-//        ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
-//            @Suspendable
-//            override fun call() {
-//                if (firstRun) {
-//                    firstFiber = Fiber.currentFiber()
-//                    firstRun = false
-//                }
-//
-//                waitUntilFlowIsRunning.release()
-//                try {
-//                    flowIsRunning.acquire() // make flow wait here to impersonate a running flow
-//                } catch (e: InterruptedException) {
-//                    flowIsRunning.release()
-//                    throw e
-//                }
-//
-//                noSecondFlowWasSpawned++
-//            }
-//        }
-//
-//        val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
-//        waitUntilFlowIsRunning.acquire()
-//        aliceNode.internals.acceptableLiveFiberCountOnStop = 1
-//        // Pause the flow on node restart
-//        val aliceNode = mockNet.restartNode(aliceNode,
-//            InternalMockNodeParameters(
-//                configOverrides = {
-//                    doReturn(StateMachineManager.StartMode.Safe).whenever(it).smmStartMode
-//                }
-//            ))
-//        // Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone
-//        firstFiber!!.interrupt()
-//
-//        // Re-hook a paused flow
-//        val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
-//
-//        Assert.assertEquals(flowHandle0.id, flowHandle1.id)
-//        Assert.assertEquals(clientId, flowHandle1.clientId)
-//        aliceNode.smm.unPauseFlow(flowHandle1.id)
-//        Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
-//        Assert.assertEquals(1, noSecondFlowWasSpawned)
-//    }
-
     @Test(timeout = 300_000)
     fun `on node start -completed- flows with client id are hook-able`() {
         val clientId = UUID.randomUUID().toString()
@@ -556,44 +504,6 @@ class FlowClientIdTests {
         assertEquals(1, counter)
     }
 
-    // the below test has to be made available only in ENT
-//    @Test(timeout=300_000)
-//    fun `On 'startFlowInternal' throwing, subsequent request with same client hits the time window in which the previous request was about to remove the client id mapping`() {
-//        val clientId = UUID.randomUUID().toString()
-//        var firstRequest = true
-//        SingleThreadedStateMachineManager.onCallingStartFlowInternal = {
-//            if (firstRequest) {
-//                firstRequest = false
-//                throw IllegalStateException("Yet another one")
-//            }
-//        }
-//
-//        val wait = Semaphore(0)
-//        val waitForFirstRequest = Semaphore(0)
-//        SingleThreadedStateMachineManager.onStartFlowInternalThrewAndAboutToRemove = {
-//            waitForFirstRequest.release()
-//            wait.acquire()
-//            Thread.sleep(10000)
-//        }
-//        var counter = 0
-//        ResultFlow.hook = { counter++ }
-//
-//        thread {
-//            assertFailsWith<IllegalStateException> {
-//                aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
-//            }
-//        }
-//
-//        waitForFirstRequest.acquire()
-//        wait.release()
-//        assertFailsWith<IllegalStateException> {
-//            // the subsequent request will not hang on a never ending future, because the previous request ,upon failing, will also complete the future exceptionally
-//            aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
-//        }
-//
-//        assertEquals(0, counter)
-//    }
-
     @Test(timeout = 300_000)
     fun `if flow fails to serialize its result then the result gets converted to an exception result`() {
         val clientId = UUID.randomUUID().toString()
@@ -775,17 +685,59 @@ class FlowClientIdTests {
             reattachedFlowHandle?.resultFuture?.getOrThrow()
         }.withMessage("java.lang.IllegalStateException: Bla bla bla")
     }
+
+    @Test(timeout = 300_000)
+    fun `finishedFlowsWithClientIds returns completed flows with client ids`() {
+        val clientIds = listOf("a", "b", "c", "d", "e")
+        val lock = CountDownLatch(1)
+        ResultFlow.hook = { clientId ->
+            if (clientId == clientIds[3]) {
+                throw java.lang.IllegalStateException("This didn't go so well")
+            }
+            if (clientId == clientIds[4]) {
+                lock.await(30, TimeUnit.SECONDS)
+            }
+        }
+        val flows = listOf(
+            aliceNode.services.startFlowWithClientId(clientIds[0], ResultFlow(10)),
+            aliceNode.services.startFlowWithClientId(clientIds[1], ResultFlow(10)),
+            aliceNode.services.startFlowWithClientId(clientIds[2], ResultFlow(10))
+        )
+        val failedFlow = aliceNode.services.startFlowWithClientId(clientIds[3], ResultFlow(10))
+        val runningFlow = aliceNode.services.startFlowWithClientId(clientIds[4], ResultFlow(10))
+        flows.map { it.resultFuture }.transpose().getOrThrow(30.seconds)
+        assertFailsWith<java.lang.IllegalStateException> { failedFlow.resultFuture.getOrThrow(20.seconds) }
+
+        val finishedFlows = aliceNode.smm.finishedFlowsWithClientIds()
+
+        lock.countDown()
+
+        assertEquals(4, finishedFlows.size)
+        assertEquals(3, finishedFlows.filterValues { it }.size)
+        assertEquals(1, finishedFlows.filterValues { !it }.size)
+        assertEquals(setOf("a", "b", "c", "d"), finishedFlows.map { it.key }.toSet())
+        assertTrue(runningFlow.clientId !in finishedFlows.keys)
+
+        assertEquals(
+            listOf(10, 10, 10),
+            finishedFlows.filterValues { it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.get() }
+        )
+        // [CordaRunTimeException] returned because [IllegalStateException] is not serializable
+        assertFailsWith<CordaRuntimeException> {
+            finishedFlows.filterValues { !it }.map { aliceNode.smm.reattachFlowWithClientId<Int>(it.key)?.resultFuture?.getOrThrow() }
+        }
+    }
 }
 
 internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
     companion object {
-        var hook: (() -> Unit)? = null
+        var hook: ((String?) -> Unit)? = null
         var suspendableHook: FlowLogic<Unit>? = null
     }
 
     @Suspendable
     override fun call(): A {
-        hook?.invoke()
+        hook?.invoke(stateMachine.clientId)
         suspendableHook?.let { subFlow(it) }
         return result
     }
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
index d9dc853115..6a50dcdb42 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
@@ -147,6 +147,8 @@ class FlowFrameworkTests {
 
         SuspendingFlow.hookBeforeCheckpoint = {}
         SuspendingFlow.hookAfterCheckpoint = {}
+        StaffedFlowHospital.onFlowResuscitated.clear()
+        StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
     }
 
     @Test(timeout=300_000)
@@ -311,7 +313,7 @@ class FlowFrameworkTests {
             .isThrownBy { receivingFiber.resultFuture.getOrThrow() }
             .withMessage("Nothing useful")
             .withStackTraceContaining(ReceiveFlow::class.java.name)  // Make sure the stack trace is that of the receiving flow
-            .withStackTraceContaining("Received counter-flow exception from peer")
+            .withStackTraceContaining("Received counter-flow exception from peer ${bob.name}")
         bobNode.database.transaction {
             assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
         }
@@ -634,6 +636,7 @@ class FlowFrameworkTests {
             Notification.createOnNext(ReceiveFlow.START_STEP),
             Notification.createOnError(receiveFlowException)
         )
+        assertThat(receiveFlowException).hasStackTraceContaining("Received unexpected counter-flow exception from peer ${bob.name}")
 
         assertSessionTransfers(
             aliceNode sent sessionInit(ReceiveFlow::class) to bobNode,
@@ -692,9 +695,6 @@ class FlowFrameworkTests {
                 firstExecution = false
                 throw HospitalizeFlowException()
             } else {
-                // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
-                // and doesn't commit before flow starts running.
-                Thread.sleep(3000)
                 dbCheckpointStatusBeforeSuspension = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
                 currentDBSession().clear() // clear session as Hibernate with fails with 'org.hibernate.NonUniqueObjectException' once it tries to save a DBFlowCheckpoint upon checkpoint
                 inMemoryCheckpointStatusBeforeSuspension = flowFiber.transientState.checkpoint.status
@@ -743,9 +743,6 @@ class FlowFrameworkTests {
                 firstExecution = false
                 throw HospitalizeFlowException()
             } else {
-                // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
-                // and doesn't commit before flow starts running.
-                Thread.sleep(3000)
                 dbCheckpointStatus = aliceNode.internals.checkpointStorage.getCheckpoints().toList().single().second.status
                 inMemoryCheckpointStatus = flowFiber.transientState.checkpoint.status
 
@@ -860,9 +857,6 @@ class FlowFrameworkTests {
         var secondRun = false
         SuspendingFlow.hookBeforeCheckpoint = {
             if(secondRun) {
-                // the below sleep should be removed once we fix : The thread's transaction executing StateMachineManager.start takes long
-                // and doesn't commit before flow starts running.
-                Thread.sleep(3000)
                 aliceNode.database.transaction {
                     checkpointStatusAfterRestart = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
                     dbExceptionAfterRestart = findRecordsFromDatabase()
@@ -890,7 +884,6 @@ class FlowFrameworkTests {
         Thread.sleep(3000) // wait until flow saves overnight observation state in database
         aliceNode = mockNet.restartNode(aliceNode)
 
-
         waitUntilHospitalized.acquire()
         Thread.sleep(3000) // wait until flow saves overnight observation state in database
         assertEquals(2, counter)
@@ -1272,4 +1265,4 @@ internal class SuspendingFlow : FlowLogic<Unit>() {
         stateMachine.suspend(FlowIORequest.ForceCheckpoint, maySkipCheckpoint = false) // flow checkpoints => checkpoint is in DB
         stateMachine.hookAfterCheckpoint()
     }
-}
\ No newline at end of file
+}
diff --git a/samples/irs-demo/build.gradle b/samples/irs-demo/build.gradle
index 7628bf32bc..85a55fd6a3 100644
--- a/samples/irs-demo/build.gradle
+++ b/samples/irs-demo/build.gradle
@@ -50,6 +50,9 @@ repositories {
     }
 }
 
+evaluationDependsOn("cordapp")
+evaluationDependsOn("web")
+
 dependencies {
     compile "commons-io:commons-io:$commons_io_version"
     compile project(":samples:irs-demo:web")
@@ -84,9 +87,6 @@ task slowIntegrationTest(type: Test, dependsOn: []) {
     classpath = sourceSets.slowIntegrationTest.runtimeClasspath
 }
 
-evaluationDependsOn("cordapp")
-evaluationDependsOn("web")
-
 task systemTest(type: Test, dependsOn: ["cordapp:prepareDockerNodes", "web:generateDockerCompose"]) {
     testClassesDirs = sourceSets.systemTest.output.classesDirs
     classpath = sourceSets.systemTest.runtimeClasspath
diff --git a/samples/irs-demo/web/build.gradle b/samples/irs-demo/web/build.gradle
index c7f130691c..8064af4347 100644
--- a/samples/irs-demo/web/build.gradle
+++ b/samples/irs-demo/web/build.gradle
@@ -92,6 +92,7 @@ dependencies {
 jar {
     from sourceSets.main.output
     dependsOn clientInstall
+    archiveClassifier = 'thin'
 }
 
 def docker_dir = file("$project.buildDir/docker")
diff --git a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/ObjectBuilder.kt b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/ObjectBuilder.kt
index 75f1242a61..ffc2fae5b5 100644
--- a/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/ObjectBuilder.kt
+++ b/serialization/src/main/kotlin/net/corda/serialization/internal/amqp/ObjectBuilder.kt
@@ -79,7 +79,7 @@ interface ObjectBuilder {
          * Create an [ObjectBuilderProvider] for the given [LocalTypeInformation.Composable].
          */
         fun makeProvider(typeInformation: LocalTypeInformation.Composable): ObjectBuilderProvider =
-                makeProvider(typeInformation.typeIdentifier, typeInformation.constructor, typeInformation.properties)
+                makeProvider(typeInformation.typeIdentifier, typeInformation.constructor, typeInformation.properties, false)
 
         /**
          * Create an [ObjectBuilderProvider] for the given type, constructor and set of properties.
@@ -90,15 +90,17 @@ interface ObjectBuilder {
         fun makeProvider(
                 typeIdentifier: TypeIdentifier,
                 constructor: LocalConstructorInformation,
-                properties: Map<String, LocalPropertyInformation>
+                properties: Map<String, LocalPropertyInformation>,
+                includeAllConstructorParameters: Boolean
         ): ObjectBuilderProvider =
-                if (constructor.hasParameters) makeConstructorBasedProvider(properties, typeIdentifier, constructor)
+                if (constructor.hasParameters) makeConstructorBasedProvider(properties, typeIdentifier, constructor, includeAllConstructorParameters)
                 else makeSetterBasedProvider(properties, typeIdentifier, constructor)
 
         private fun makeConstructorBasedProvider(
                 properties: Map<String, LocalPropertyInformation>,
                 typeIdentifier: TypeIdentifier,
-                constructor: LocalConstructorInformation
+                constructor: LocalConstructorInformation,
+                includeAllConstructorParameters: Boolean
         ): ObjectBuilderProvider {
             requireForSer(properties.values.all {
                 when (it) {
@@ -119,6 +121,10 @@ interface ObjectBuilder {
                                     "but property $name is not constructor-paired"
                     )
                 }
+            }.toMutableMap()
+
+            if (includeAllConstructorParameters) {
+                addMissingConstructorParameters(constructorIndices, constructor)
             }
 
             val propertySlots = constructorIndices.keys.mapIndexed { slot, name -> name to slot }.toMap()
@@ -128,6 +134,15 @@ interface ObjectBuilder {
             }
         }
 
+        private fun addMissingConstructorParameters(constructorIndices: MutableMap<String, Int>, constructor: LocalConstructorInformation) {
+            // Add constructor parameters not in the list of properties
+            // so we can use them in object evolution
+            for ((parameterIndex, parameter) in constructor.parameters.withIndex()) {
+                // Only use the parameters not already matched to properties
+                constructorIndices.putIfAbsent(parameter.name, parameterIndex)
+            }
+        }
+
         private fun makeSetterBasedProvider(
                 properties: Map<String, LocalPropertyInformation>,
                 typeIdentifier: TypeIdentifier,
@@ -254,7 +269,7 @@ class EvolutionObjectBuilder(
                 remoteTypeInformation: RemoteTypeInformation.Composable,
                 mustPreserveData: Boolean
         ): () -> ObjectBuilder {
-            val localBuilderProvider = ObjectBuilder.makeProvider(typeIdentifier, constructor, localProperties)
+            val localBuilderProvider = ObjectBuilder.makeProvider(typeIdentifier, constructor, localProperties, true)
 
             val remotePropertyNames = remoteTypeInformation.properties.keys.sorted()
             val reroutedIndices = remotePropertyNames.map { propertyName ->
diff --git a/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.kt b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.kt
new file mode 100644
index 0000000000..0e7f795839
--- /dev/null
+++ b/serialization/src/test/kotlin/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.kt
@@ -0,0 +1,90 @@
+package net.corda.serialization.internal.amqp
+
+import net.corda.core.contracts.BelongsToContract
+import net.corda.core.contracts.Contract
+import net.corda.core.contracts.ContractState
+import net.corda.core.identity.AbstractParty
+import net.corda.core.serialization.DeprecatedConstructorForDeserialization
+import net.corda.core.serialization.SerializedBytes
+import net.corda.core.transactions.LedgerTransaction
+import net.corda.serialization.internal.amqp.testutils.deserialize
+import net.corda.serialization.internal.amqp.testutils.serialize
+import net.corda.serialization.internal.amqp.testutils.testDefaultFactory
+import net.corda.serialization.internal.amqp.testutils.writeTestResource
+import org.assertj.core.api.Assertions
+import org.junit.Test
+
+class EvolutionObjectBuilderRenamedPropertyTests
+{
+    private val cordappVersionTestValue = 38854445
+    private val dataTestValue = "d7af8af0-c10e-45bc-a5f7-92de432be0ef"
+    private val xTestValue = 7568055
+    private val yTestValue = 4113687
+
+    class TemplateContract : Contract {
+        override fun verify(tx: LedgerTransaction) { }
+    }
+
+    /**
+     * Step 1
+     *
+     * This is the original class definition in object evolution.
+     */
+//    @BelongsToContract(TemplateContract::class)
+//    data class TemplateState(val cordappVersion: Int, val data: String, val x : Int?, override val participants: List<AbstractParty> = listOf()) : ContractState
+
+    /**
+     * Step 2
+     *
+     * This is an intermediate class definition in object evolution.
+     * The y property has been added and a constructor copies the value of x into y. x is now set to null by the constructor.
+     */
+//    @BelongsToContract(TemplateContract::class)
+//    data class TemplateState(val cordappVersion: Int, val data: String, val x : Int?, val y : String?, override val participants: List<AbstractParty> = listOf()) : ContractState {
+//        @DeprecatedConstructorForDeserialization(1)
+//        constructor(cordappVersion: Int, data : String, x : Int?, participants: List<AbstractParty>)
+//                : this(cordappVersion, data, null, x?.toString(), participants)
+//    }
+
+    /**
+     * Step 3
+     *
+     * This is the final class definition in object evolution.
+     * The x property has been removed but the constructor that copies values of x into y still exists. We expect previous versions of this
+     * object to pass the value of x to the constructor when deserialized.
+     */
+    @BelongsToContract(TemplateContract::class)
+    data class TemplateState(val cordappVersion: Int, val data: String, val y : String?, override val participants: List<AbstractParty> = listOf()) : ContractState {
+        @DeprecatedConstructorForDeserialization(1)
+        constructor(cordappVersion: Int, data : String, x : Int?, participants: List<AbstractParty>) : this(cordappVersion, data, x?.toString(), participants)
+    }
+
+    @Test(timeout=300_000)
+    fun `Step 1 to Step 3`() {
+
+        // The next two commented lines are how the serialized data is generated. To regenerate the data, uncomment these along
+        // with the correct version of the class and rerun the test. This will generate a new file in the project resources.
+
+//        val step1 = TemplateState(cordappVersionTestValue, dataTestValue, xTestValue)
+//        saveSerializedObject(step1)
+
+        // serialization/src/test/resources/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.Step1
+        val bytes = this::class.java.getResource("EvolutionObjectBuilderRenamedPropertyTests.Step1").readBytes()
+
+        val serializerFactory: SerializerFactory = testDefaultFactory()
+        val deserializedObject = DeserializationInput(serializerFactory)
+                .deserialize(SerializedBytes<TemplateState>(bytes))
+
+        Assertions.assertThat(deserializedObject.cordappVersion).isEqualTo(cordappVersionTestValue)
+        Assertions.assertThat(deserializedObject.data).isEqualTo(dataTestValue)
+//        Assertions.assertThat(deserializedObject.x).isEqualTo(xTestValue)
+        Assertions.assertThat(deserializedObject.y).isEqualTo(xTestValue.toString())
+        Assertions.assertThat(deserializedObject).isInstanceOf(TemplateState::class.java)
+    }
+
+    /**
+     * Write serialized object to resources folder
+     */
+    @Suppress("unused")
+    fun <T : Any> saveSerializedObject(obj : T) = writeTestResource(SerializationOutput(testDefaultFactory()).serialize(obj))
+}
\ No newline at end of file
diff --git a/serialization/src/test/resources/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.Step1 b/serialization/src/test/resources/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.Step1
new file mode 100644
index 0000000000..c2a94d7f11
Binary files /dev/null and b/serialization/src/test/resources/net/corda/serialization/internal/amqp/EvolutionObjectBuilderRenamedPropertyTests.Step1 differ