diff --git a/core/src/main/kotlin/net/corda/core/flows/ResultSerializationException.kt b/core/src/main/kotlin/net/corda/core/flows/ResultSerializationException.kt
new file mode 100644
index 0000000000..34e463d1ac
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/flows/ResultSerializationException.kt
@@ -0,0 +1,11 @@
+package net.corda.core.flows
+
+import net.corda.core.CordaRuntimeException
+import net.corda.core.serialization.internal.MissingSerializerException
+
+/**
+ * Thrown whenever a flow result cannot be serialized when attempting to save it in the database
+ */
+class ResultSerializationException private constructor(message: String?) : CordaRuntimeException(message) {
+    constructor(e: MissingSerializerException): this(e.message)
+}
\ No newline at end of file
diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt
index 2f8fc40935..65775c7eb7 100644
--- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowWithClientIdTest.kt
@@ -3,15 +3,20 @@ package net.corda.node.flows
 import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.flows.FlowLogic
 import net.corda.core.flows.StartableByRPC
+import net.corda.core.internal.concurrent.OpenFuture
+import net.corda.core.internal.concurrent.openFuture
 import net.corda.core.messaging.startFlowWithClientId
+import net.corda.core.flows.ResultSerializationException
 import net.corda.core.utilities.getOrThrow
 import net.corda.core.utilities.seconds
 import net.corda.testing.driver.DriverParameters
 import net.corda.testing.driver.driver
 import org.junit.Before
 import org.junit.Test
-import java.util.*
+import rx.Observable
+import java.util.UUID
 import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
 import kotlin.test.assertNotEquals
 import kotlin.test.assertTrue
 
@@ -55,9 +60,22 @@ class FlowWithClientIdTest {
             assertEquals(flowHandle0.clientId, flowHandle1.clientId)
             assertEquals(2, counter) // this asserts that 2 different flows were spawned indeed
         }
-
     }
 
+    @Test(timeout=300_000)
+    fun `on flow unserializable result a 'CordaRuntimeException' is thrown containing in its message the unserializable type`() {
+        val clientId = UUID.randomUUID().toString()
+        driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
+            val nodeA = startNode().getOrThrow()
+
+            val e = assertFailsWith<ResultSerializationException> {
+                nodeA.rpc.startFlowWithClientId(clientId, ::UnserializableResultFlow).returnValue.getOrThrow(20.seconds)
+            }
+
+            val errorMessage = e.message
+            assertTrue(errorMessage!!.contains("Unable to create an object serializer for type class ${UnserializableResultFlow.UNSERIALIZABLE_OBJECT::class.java.name}"))
+        }
+    }
 }
 
 @StartableByRPC
@@ -75,3 +93,14 @@ internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
     }
 }
 
+@StartableByRPC
+internal class UnserializableResultFlow: FlowLogic<OpenFuture<Observable<Unit>>>() {
+    companion object {
+        val UNSERIALIZABLE_OBJECT = openFuture<Observable<Unit>>().also { it.set(Observable.empty<Unit>())}
+    }
+
+    @Suspendable
+    override fun call(): OpenFuture<Observable<Unit>> {
+        return UNSERIALIZABLE_OBJECT
+    }
+}
diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt
index 1edc2491a8..4f047b4333 100644
--- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt
+++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt
@@ -4,6 +4,7 @@ import net.corda.core.flows.StateMachineRunId
 import net.corda.core.serialization.SerializedBytes
 import net.corda.node.services.statemachine.Checkpoint
 import net.corda.node.services.statemachine.CheckpointState
+import net.corda.node.services.statemachine.FlowResultMetadata
 import net.corda.node.services.statemachine.FlowState
 import java.util.stream.Stream
 
@@ -66,5 +67,13 @@ interface CheckpointStorage {
      */
     fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
 
+    fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>>
+
+    /**
+     * Load a flow result from the store. If [throwIfMissing] is true then it throws an [IllegalStateException]
+     * if the flow result is missing in the database.
+     */
+    fun getFlowResult(id: StateMachineRunId, throwIfMissing: Boolean = false): Any?
+
     fun updateStatus(runId: StateMachineRunId, flowStatus: Checkpoint.FlowStatus)
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
index a5be0edef5..f6ad8fd14e 100644
--- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
+++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt
@@ -6,8 +6,11 @@ import net.corda.core.flows.StateMachineRunId
 import net.corda.core.internal.PLATFORM_VERSION
 import net.corda.core.internal.VisibleForTesting
 import net.corda.core.internal.uncheckedCast
+import net.corda.core.flows.ResultSerializationException
 import net.corda.core.serialization.SerializationDefaults
 import net.corda.core.serialization.SerializedBytes
+import net.corda.core.serialization.deserialize
+import net.corda.core.serialization.internal.MissingSerializerException
 import net.corda.core.serialization.serialize
 import net.corda.core.utilities.contextLogger
 import net.corda.node.services.api.CheckpointStorage
@@ -15,6 +18,7 @@ import net.corda.node.services.statemachine.Checkpoint
 import net.corda.node.services.statemachine.Checkpoint.FlowStatus
 import net.corda.node.services.statemachine.CheckpointState
 import net.corda.node.services.statemachine.ErrorState
+import net.corda.node.services.statemachine.FlowResultMetadata
 import net.corda.node.services.statemachine.FlowState
 import net.corda.node.services.statemachine.SubFlowVersion
 import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
@@ -59,6 +63,24 @@ class DBCheckpointStorage(
 
         private val RUNNABLE_CHECKPOINTS = setOf(FlowStatus.RUNNABLE, FlowStatus.HOSPITALIZED)
 
+        // This is a dummy [DBFlowMetadata] object which help us whenever we want to persist a [DBFlowCheckpoint], but not persist its [DBFlowMetadata].
+        // [DBFlowCheckpoint] needs to always reference a [DBFlowMetadata] ([DBFlowCheckpoint.flowMetadata] is not nullable).
+        // However, since we do not -hibernate- cascade, it does not get persisted into the database.
+        private val dummyDBFlowMetadata: DBFlowMetadata = DBFlowMetadata(
+                flowId = "dummyFlowId",
+                invocationId = "dummyInvocationId",
+                flowName = "dummyFlowName",
+                userSuppliedIdentifier = "dummyUserSuppliedIdentifier",
+                startType = StartReason.INITIATED,
+                initialParameters = ByteArray(0),
+                launchingCordapp = "dummyLaunchingCordapp",
+                platformVersion = -1,
+                startedBy = "dummyStartedBy",
+                invocationInstant = Instant.now(),
+                startInstant = Instant.now(),
+                finishInstant = null
+        )
+
         /**
          * This needs to run before Hibernate is initialised.
          *
@@ -185,28 +207,31 @@ class DBCheckpointStorage(
         var flow_id: String,
 
         @Type(type = "corda-blob")
-        @Column(name = "result_value", nullable = false)
-        var value: ByteArray = EMPTY_BYTE_ARRAY,
+        @Column(name = "result_value", nullable = true)
+        var value: ByteArray? = null,
 
         @Column(name = "timestamp")
         val persistedInstant: Instant
     ) {
+        @Suppress("ComplexMethod")
         override fun equals(other: Any?): Boolean {
             if (this === other) return true
             if (javaClass != other?.javaClass) return false
-
             other as DBFlowResult
-
             if (flow_id != other.flow_id) return false
-            if (!value.contentEquals(other.value)) return false
+            val value = value
+            val otherValue = other.value
+            if (value != null) {
+                if (otherValue == null) return false
+                if (!value.contentEquals(otherValue)) return false
+            } else if (otherValue != null) return false
             if (persistedInstant != other.persistedInstant) return false
-
             return true
         }
 
         override fun hashCode(): Int {
             var result = flow_id.hashCode()
-            result = 31 * result + value.contentHashCode()
+            result = 31 * result + (value?.contentHashCode() ?: 0)
             result = 31 * result + persistedInstant.hashCode()
             return result
         }
@@ -299,7 +324,7 @@ class DBCheckpointStorage(
         @Column(name = "invocation_time", nullable = false)
         var invocationInstant: Instant,
 
-        @Column(name = "start_time", nullable = true)
+        @Column(name = "start_time", nullable = false)
         var startInstant: Instant,
 
         @Column(name = "finish_time", nullable = true)
@@ -363,7 +388,7 @@ class DBCheckpointStorage(
             now
         )
 
-        val metadata = createDBFlowMetadata(flowId, checkpoint)
+        val metadata = createDBFlowMetadata(flowId, checkpoint, now)
 
         // Most fields are null as they cannot have been set when creating the initial checkpoint
         val dbFlowCheckpoint = DBFlowCheckpoint(
@@ -384,8 +409,11 @@ class DBCheckpointStorage(
         currentDBSession().save(metadata)
     }
 
+    @Suppress("ComplexMethod")
     override fun updateCheckpoint(
-        id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>?,
+        id: StateMachineRunId,
+        checkpoint: Checkpoint,
+        serializedFlowState: SerializedBytes<FlowState>?,
         serializedCheckpointState: SerializedBytes<CheckpointState>
     ) {
         val now = clock.instant()
@@ -404,18 +432,25 @@ class DBCheckpointStorage(
             )
         }
 
-        //This code needs to be added back in when we want to persist the result. For now this requires the result to be @CordaSerializable.
-        //val result = updateDBFlowResult(entity, checkpoint, now)
+        val dbFlowResult = if (checkpoint.status == FlowStatus.COMPLETED) {
+            try {
+                createDBFlowResult(flowId, checkpoint.result, now)
+            } catch (e: MissingSerializerException) {
+                throw ResultSerializationException(e)
+            }
+        } else {
+            null
+        }
+
         val exceptionDetails = updateDBFlowException(flowId, checkpoint, now)
 
-        val metadata = createDBFlowMetadata(flowId, checkpoint)
-
+        // Updates to children entities ([DBFlowCheckpointBlob], [DBFlowResult], [DBFlowException], [DBFlowMetadata]) are not cascaded to children tables.
         val dbFlowCheckpoint = DBFlowCheckpoint(
             flowId = flowId,
             blob = blob,
-            result = null,
+            result = dbFlowResult,
             exceptionDetails = exceptionDetails,
-            flowMetadata = metadata,
+            flowMetadata = dummyDBFlowMetadata, // [DBFlowMetadata] will only update its 'finish_time' when a checkpoint finishes
             status = checkpoint.status,
             compatible = checkpoint.compatible,
             progressStep = checkpoint.progressStep?.take(MAX_PROGRESS_STEP_LENGTH),
@@ -425,9 +460,9 @@ class DBCheckpointStorage(
 
         currentDBSession().update(dbFlowCheckpoint)
         blob?.let { currentDBSession().update(it) }
+        dbFlowResult?.let { currentDBSession().save(it) }
         if (checkpoint.isFinished()) {
-            metadata.finishInstant = now
-            currentDBSession().update(metadata)
+            setDBFlowMetadataFinishTime(flowId, now)
         }
     }
 
@@ -446,11 +481,11 @@ class DBCheckpointStorage(
         var deletedRows = 0
         val flowId = id.uuid.toString()
         deletedRows += deleteRow(DBFlowMetadata::class.java, DBFlowMetadata::flowId.name, flowId)
+        deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, flowId)
         deletedRows += deleteRow(DBFlowCheckpointBlob::class.java, DBFlowCheckpointBlob::flowId.name, flowId)
         deletedRows += deleteRow(DBFlowCheckpoint::class.java, DBFlowCheckpoint::flowId.name, flowId)
-//        resultId?.let { deletedRows += deleteRow(DBFlowResult::class.java, DBFlowResult::flow_id.name, it.toString()) }
 //        exceptionId?.let { deletedRows += deleteRow(DBFlowException::class.java, DBFlowException::flow_id.name, it.toString()) }
-        return deletedRows == 3
+        return deletedRows >= 2
     }
 
     private fun <T> deleteRow(clazz: Class<T>, pk: String, value: String): Int {
@@ -488,6 +523,10 @@ class DBCheckpointStorage(
         return currentDBSession().find(DBFlowCheckpoint::class.java, id.uuid.toString())
     }
 
+    private fun getDBFlowResult(id: StateMachineRunId): DBFlowResult? {
+        return currentDBSession().find(DBFlowResult::class.java, id.uuid.toString())
+    }
+
     override fun getPausedCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
         val session = currentDBSession()
         val jpqlQuery = """select new ${DBPausedFields::class.java.name}(checkpoint.id, blob.checkpoint, checkpoint.status,
@@ -500,12 +539,34 @@ class DBCheckpointStorage(
         }
     }
 
+    // This method needs modification once CORDA-3681 is implemented to include FAILED flows as well
+    override fun getFinishedFlowsResultsMetadata(): Stream<Pair<StateMachineRunId, FlowResultMetadata>> {
+        val session = currentDBSession()
+        val jpqlQuery = """select new ${DBFlowResultMetadataFields::class.java.name}(checkpoint.id, checkpoint.status, metadata.userSuppliedIdentifier) 
+                from ${DBFlowCheckpoint::class.java.name} checkpoint 
+                join ${DBFlowMetadata::class.java.name} metadata on metadata.id = checkpoint.flowMetadata  
+                where checkpoint.status = ${FlowStatus.COMPLETED.ordinal}""".trimIndent()
+        val query = session.createQuery(jpqlQuery, DBFlowResultMetadataFields::class.java)
+        return query.resultList.stream().map {
+            StateMachineRunId(UUID.fromString(it.id)) to FlowResultMetadata(it.status, it.clientId)
+        }
+    }
+
+    override fun getFlowResult(id: StateMachineRunId, throwIfMissing: Boolean): Any? {
+        val dbFlowResult = getDBFlowResult(id)
+        if (throwIfMissing && dbFlowResult == null) {
+            throw IllegalStateException("Flow's $id result was not found in the database. Something is very wrong.")
+        }
+        val serializedFlowResult = dbFlowResult?.value?.let { SerializedBytes<Any>(it) }
+        return serializedFlowResult?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
+    }
+
     override fun updateStatus(runId: StateMachineRunId, flowStatus: FlowStatus) {
         val update = "Update ${NODE_DATABASE_PREFIX}checkpoints set status = ${flowStatus.ordinal} where flow_id = '${runId.uuid}'"
         currentDBSession().createNativeQuery(update).executeUpdate()
     }
 
-    private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint): DBFlowMetadata {
+    private fun createDBFlowMetadata(flowId: String, checkpoint: Checkpoint, now: Instant): DBFlowMetadata {
         val context = checkpoint.checkpointState.invocationContext
         val flowInfo = checkpoint.checkpointState.subFlowStack.first()
         return DBFlowMetadata(
@@ -521,7 +582,7 @@ class DBCheckpointStorage(
             platformVersion = PLATFORM_VERSION,
             startedBy = context.principal().name,
             invocationInstant = context.trace.invocationId.timestamp,
-            startInstant = clock.instant(),
+            startInstant = now,
             finishInstant = null
         )
     }
@@ -541,35 +602,10 @@ class DBCheckpointStorage(
         )
     }
 
-    /**
-     * Creates, updates or deletes the result related to the current flow/checkpoint.
-     *
-     * This is needed because updates are not cascading via Hibernate, therefore operations must be handled manually.
-     *
-     * A [DBFlowResult] is created if [DBFlowCheckpoint.result] does not exist and the [Checkpoint] has a result..
-     * The existing [DBFlowResult] is updated if [DBFlowCheckpoint.result] exists and the [Checkpoint] has a result.
-     * The existing [DBFlowResult] is deleted if [DBFlowCheckpoint.result] exists and the [Checkpoint] has no result.
-     * Nothing happens if both [DBFlowCheckpoint] and [Checkpoint] do not have a result.
-     */
-    private fun updateDBFlowResult(flowId: String, entity: DBFlowCheckpoint, checkpoint: Checkpoint, now: Instant): DBFlowResult? {
-        val result = checkpoint.result?.let { createDBFlowResult(flowId, it, now) }
-        if (entity.result != null) {
-            if (result != null) {
-                result.flow_id = entity.result!!.flow_id
-                currentDBSession().update(result)
-            } else {
-                currentDBSession().delete(entity.result)
-            }
-        } else if (result != null) {
-            currentDBSession().save(result)
-        }
-        return result
-    }
-
-    private fun createDBFlowResult(flowId: String, result: Any, now: Instant): DBFlowResult {
+    private fun createDBFlowResult(flowId: String, result: Any?, now: Instant): DBFlowResult {
         return DBFlowResult(
             flow_id = flowId,
-            value = result.storageSerialize().bytes,
+            value = result?.storageSerialize()?.bytes,
             persistedInstant = now
         )
     }
@@ -618,6 +654,14 @@ class DBCheckpointStorage(
         }
     }
 
+    private fun setDBFlowMetadataFinishTime(flowId: String, now: Instant) {
+        val session = currentDBSession()
+        val sqlQuery = "Update ${NODE_DATABASE_PREFIX}flow_metadata set finish_time = '$now' " +
+                "where flow_id = '$flowId'"
+        val query = session.createNativeQuery(sqlQuery)
+        query.executeUpdate()
+    }
+
     private fun InvocationContext.getStartedType(): StartReason {
         return when (origin) {
             is InvocationOrigin.RPC, is InvocationOrigin.Shell -> StartReason.RPC
@@ -648,7 +692,7 @@ class DBCheckpointStorage(
             // Always load as a [Clean] checkpoint to represent that the checkpoint is the last _good_ checkpoint
             errorState = ErrorState.Clean,
             // A checkpoint with a result should not normally be loaded (it should be [null] most of the time)
-            result = result?.let { SerializedBytes<Any>(it.value) },
+            result = result?.let { dbFlowResult -> dbFlowResult.value?.let { SerializedBytes<Any>(it) } },
             status = status,
             progressStep = progressStep,
             flowIoRequest = ioRequestType,
@@ -679,6 +723,12 @@ class DBCheckpointStorage(
         }
     }
 
+    private class DBFlowResultMetadataFields(
+        val id: String,
+        val status: FlowStatus,
+        val clientId: String?
+    )
+
     private fun <T : Any> T.storageSerialize(): SerializedBytes<T> {
         return serialize(context = SerializationDefaults.STORAGE_CONTEXT)
     }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
index 4bbd292c77..a1c5c9f164 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
@@ -5,6 +5,7 @@ import co.paralleluniverse.fibers.FiberExecutorScheduler
 import co.paralleluniverse.fibers.instrument.JavaAgent
 import com.codahale.metrics.Gauge
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import net.corda.core.CordaRuntimeException
 import net.corda.core.concurrent.CordaFuture
 import net.corda.core.context.InvocationContext
 import net.corda.core.flows.FlowException
@@ -20,7 +21,6 @@ import net.corda.core.internal.castIfPossible
 import net.corda.core.internal.concurrent.OpenFuture
 import net.corda.core.internal.concurrent.doneFuture
 import net.corda.core.internal.concurrent.map
-import net.corda.core.internal.concurrent.mapError
 import net.corda.core.internal.concurrent.openFuture
 import net.corda.core.internal.mapNotNull
 import net.corda.core.internal.uncheckedCast
@@ -127,6 +127,7 @@ internal class SingleThreadedStateMachineManager(
      */
     override val changes: Observable<StateMachineManager.Change> = innerState.changesPublisher
 
+    @Suppress("ComplexMethod")
     override fun start(tokenizableServices: List<Any>, startMode: StateMachineManager.StartMode): CordaFuture<Unit> {
         checkQuasarJavaAgentPresence()
         val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT.withTokenContext(
@@ -174,8 +175,6 @@ internal class SingleThreadedStateMachineManager(
         }
 
         // at the moment we have RUNNABLE, HOSPITALIZED and PAUSED flows
-        // - RESULTED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3692
-        // - FAILED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
         // - Incompatible checkpoints need to be handled upon implementing CORDA-3897
         for (flow in fibers) {
             flow.fiber.clientId?.let {
@@ -191,6 +190,17 @@ internal class SingleThreadedStateMachineManager(
             }
         }
 
+        val finishedFlowsResults = checkpointStorage.getFinishedFlowsResultsMetadata().toList()
+        for ((id, finishedFlowResult) in finishedFlowsResults) {
+            finishedFlowResult.clientId?.let {
+                if (finishedFlowResult.status == Checkpoint.FlowStatus.COMPLETED) {
+                    innerState.clientIdsToFlowIds[it] = FlowWithClientIdStatus.Removed(id, true)
+                } else {
+                    // - FAILED flows need to be fetched upon implementing https://r3-cev.atlassian.net/browse/CORDA-3681
+                }
+            } ?: logger.error("Found finished flow $id without a client id. Something is very wrong and this flow will be ignored.")
+        }
+
         return serviceHub.networkMapCache.nodeReady.map {
             logger.info("Node ready, info: ${serviceHub.myInfo}")
             resumeRestoredFlows(fibers)
@@ -276,24 +286,24 @@ internal class SingleThreadedStateMachineManager(
 
         val clientId = context.clientId
         if (clientId != null) {
-            var existingFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>? = null
+            var existingStatus: FlowWithClientIdStatus? = null
             innerState.withLock {
-                clientIdsToFlowIds.compute(clientId) { _, existingStatus ->
-                    if (existingStatus != null) {
-                        existingFuture = when (existingStatus) {
-                            is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture
-                            // This below dummy future ('doneFuture(5)') will be populated from DB upon implementing CORDA-3692 and CORDA-3681 - for now just return a dummy future
-                            is FlowWithClientIdStatus.Removed -> doneClientIdFuture(existingStatus.flowId, doneFuture(@Suppress("MagicNumber")5), clientId)
-                        }
-                        existingStatus
+                clientIdsToFlowIds.compute(clientId) { _, status ->
+                    if (status != null) {
+                        existingStatus = status
+                        status
                     } else {
                         newFuture = openFuture()
                         FlowWithClientIdStatus.Active(newFuture!!)
                     }
                 }
             }
-            if (existingFuture != null) return uncheckedCast(existingFuture)
 
+            // Flow -started with client id- already exists, return the existing's flow future and don't start a new flow.
+            existingStatus?.let {
+                val existingFuture = activeOrRemovedClientIdFuture(it, clientId)
+                return@startFlow uncheckedCast(existingFuture)
+            }
             onClientIDNotFound?.invoke()
         }
 
@@ -674,17 +684,9 @@ internal class SingleThreadedStateMachineManager(
             // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
             val existingCheckpoint = database.transaction { checkpointStorage.getCheckpoint(flowId) }
             existingCheckpoint?.let { serializedCheckpoint ->
-                val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId)
-                if (checkpoint == null) {
-                    return openFuture<FlowStateMachine<A>>().mapError {
-                        IllegalStateException(
-                            "Unable to deserialize database checkpoint for flow $flowId. " +
-                                    "Something is very wrong. The flow will not retry."
-                        )
-                    }
-                } else {
-                    checkpoint
-                }
+                tryDeserializeCheckpoint(serializedCheckpoint, flowId) ?: throw IllegalStateException(
+                    "Unable to deserialize database checkpoint for flow $flowId. Something is very wrong. The flow will not retry."
+                )
             }
         } else {
             // This is a brand new flow
@@ -878,6 +880,24 @@ internal class SingleThreadedStateMachineManager(
         }
     }
 
+    private fun activeOrRemovedClientIdFuture(existingStatus: FlowWithClientIdStatus, clientId: String) = when (existingStatus) {
+        is FlowWithClientIdStatus.Active -> existingStatus.flowStateMachineFuture
+        is FlowWithClientIdStatus.Removed -> {
+            val flowId = existingStatus.flowId
+
+            val resultFuture = if (existingStatus.succeeded) {
+                val flowResult = database.transaction { checkpointStorage.getFlowResult(existingStatus.flowId, throwIfMissing = true) }
+                doneFuture(flowResult)
+            } else {
+                // this block will be implemented upon implementing CORDA-3681 - for now just return a dummy exception
+                val flowException = CordaRuntimeException("dummy")
+                openFuture<Any?>().apply { setException(flowException) }
+            }
+
+            doneClientIdFuture(flowId, resultFuture, clientId)
+        }
+    }
+
     /**
      * The flow out of which a [doneFuture] will be produced should be a started flow,
      * i.e. it should not exist in [mutex.content.startedFutures].
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt
index 83e5bd44c0..9065823ffa 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt
@@ -388,3 +388,8 @@ sealed class FlowWithClientIdStatus {
     data class Active(val flowStateMachineFuture: CordaFuture<out FlowStateMachineHandle<out Any?>>) : FlowWithClientIdStatus()
     data class Removed(val flowId: StateMachineRunId, val succeeded: Boolean) : FlowWithClientIdStatus()
 }
+
+data class FlowResultMetadata(
+    val status: Checkpoint.FlowStatus,
+    val clientId: String?
+)
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt
index 8b22573421..a2fa8b5bb3 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/TransitionExecutorImpl.kt
@@ -1,6 +1,7 @@
 package net.corda.node.services.statemachine
 
 import co.paralleluniverse.fibers.Suspendable
+import net.corda.core.flows.ResultSerializationException
 import net.corda.core.utilities.contextLogger
 import net.corda.node.services.statemachine.transitions.FlowContinuation
 import net.corda.node.services.statemachine.transitions.TransitionResult
@@ -73,22 +74,12 @@ class TransitionExecutorImpl(
                         log.info("Error while executing $action, with event $event, erroring state", exception)
                     }
 
-                    // distinguish between a DatabaseTransactionException and an actual StateTransitionException
-                    val stateTransitionOrDatabaseTransactionException =
-                        if (exception is DatabaseTransactionException) {
-                            // if the exception is a DatabaseTransactionException then it is not really a StateTransitionException
-                            // it is actually an exception that previously broke a DatabaseTransaction and was suppressed by user code
-                            // it was rethrown on [DatabaseTransaction.commit]. Unwrap the original exception and pass it to flow hospital
-                            exception.cause
-                        } else {
-                            // Wrap the exception with [StateTransitionException] for handling by the flow hospital
-                            StateTransitionException(action, event, exception)
-                        }
+                    val flowError = createError(exception, action, event)
 
                     val newState = previousState.copy(
                         checkpoint = previousState.checkpoint.copy(
                             errorState = previousState.checkpoint.errorState.addErrors(
-                                listOf(FlowError(secureRandom.nextLong(), stateTransitionOrDatabaseTransactionException))
+                                listOf(flowError)
                             )
                         ),
                         isFlowResumed = false
@@ -121,4 +112,23 @@ class TransitionExecutorImpl(
             }
         }
     }
+
+    private fun createError(e: Exception, action: Action, event: Event): FlowError {
+        // distinguish between a DatabaseTransactionException and an actual StateTransitionException
+        val stateTransitionOrOtherException: Throwable =
+            if (e is DatabaseTransactionException) {
+                // if the exception is a DatabaseTransactionException then it is not really a StateTransitionException
+                // it is actually an exception that previously broke a DatabaseTransaction and was suppressed by user code
+                // it was rethrown on [DatabaseTransaction.commit]. Unwrap the original exception and pass it to flow hospital
+                e.cause
+            } else if (e is ResultSerializationException) {
+                // We must not wrap a [ResultSerializationException] with a [StateTransitionException],
+                // because we will propagate the exception to rpc clients and [StateTransitionException] cannot be propagated to rpc clients.
+                e
+            } else {
+                // Wrap the exception with [StateTransitionException] for handling by the flow hospital
+                StateTransitionException(action, event, e)
+            }
+        return FlowError(secureRandom.nextLong(), stateTransitionOrOtherException)
+    }
 }
\ No newline at end of file
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
index 9d1c2c0b59..af95d0ca1e 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
@@ -240,10 +240,22 @@ class TopLevelTransition(
                             isFlowResumed = false,
                             isRemoved = true
                     )
-                    val allSourceSessionIds = checkpoint.checkpointState.sessions.keys
+
                     if (currentState.isAnyCheckpointPersisted) {
-                        actions.add(Action.RemoveCheckpoint(context.id))
+                        if (currentState.checkpoint.checkpointState.invocationContext.clientId == null) {
+                            actions.add(Action.RemoveCheckpoint(context.id))
+                        } else {
+                            actions.add(
+                                Action.PersistCheckpoint(
+                                    context.id,
+                                    currentState.checkpoint,
+                                    isCheckpointUpdate = currentState.isAnyCheckpointPersisted
+                                )
+                            )
+                        }
                     }
+
+                    val allSourceSessionIds = currentState.checkpoint.checkpointState.sessions.keys
                     actions.addAll(arrayOf(
                         Action.PersistDeduplicationFacts(pendingDeduplicationHandlers),
                             Action.ReleaseSoftLocks(event.softLocksId),
diff --git a/node/src/main/resources/migration/node-core.changelog-v19-keys.xml b/node/src/main/resources/migration/node-core.changelog-v19-keys.xml
index 26359ecd2f..3e2f404ad8 100644
--- a/node/src/main/resources/migration/node-core.changelog-v19-keys.xml
+++ b/node/src/main/resources/migration/node-core.changelog-v19-keys.xml
@@ -12,12 +12,15 @@
         <addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints"/>
     </changeSet>
 
-    <!-- TODO: add indexes for the rest of the tables as well (Results + Exceptions)   -->
+    <!-- TODO: add indexes for Exceptions table as well -->
     <!-- TODO: the following only add indexes so maybe also align name of file?   -->
     <changeSet author="R3.Corda" id="add_new_checkpoint_schema_indexes">
         <createIndex indexName="node_checkpoint_blobs_idx" tableName="node_checkpoint_blobs" clustered="false" unique="true">
             <column name="flow_id"/>
         </createIndex>
+        <createIndex indexName="node_flow_results_idx" tableName="node_flow_results" clustered="false" unique="true">
+            <column name="flow_id"/>
+        </createIndex>
         <createIndex indexName="node_flow_metadata_idx" tableName="node_flow_metadata" clustered="false" unique="true">
             <column name="flow_id"/>
         </createIndex>
diff --git a/node/src/main/resources/migration/node-core.changelog-v19-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v19-postgres.xml
index 6aedc510b4..d7098a5076 100644
--- a/node/src/main/resources/migration/node-core.changelog-v19-postgres.xml
+++ b/node/src/main/resources/migration/node-core.changelog-v19-postgres.xml
@@ -49,14 +49,13 @@
         </createTable>
     </changeSet>
 
-
     <changeSet author="R3.Corda" id="add_new_flow_result_table-postgres" dbms="postgresql">
         <createTable tableName="node_flow_results">
             <column name="flow_id" type="NVARCHAR(64)">
                 <constraints nullable="false"/>
             </column>
             <column name="result_value" type="varbinary(33554432)">
-                <constraints nullable="false"/>
+                <constraints nullable="true"/>
             </column>
             <column name="timestamp" type="java.sql.Types.TIMESTAMP">
                 <constraints nullable="false"/>
diff --git a/node/src/main/resources/migration/node-core.changelog-v19.xml b/node/src/main/resources/migration/node-core.changelog-v19.xml
index 6b8c1e9b24..03165ce7d6 100644
--- a/node/src/main/resources/migration/node-core.changelog-v19.xml
+++ b/node/src/main/resources/migration/node-core.changelog-v19.xml
@@ -49,14 +49,13 @@
         </createTable>
     </changeSet>
 
-
     <changeSet author="R3.Corda" id="add_new_flow_result_table" dbms="!postgresql">
         <createTable tableName="node_flow_results">
             <column name="flow_id" type="NVARCHAR(64)">
                 <constraints nullable="false"/>
             </column>
             <column name="result_value" type="blob">
-                <constraints nullable="false"/>
+                <constraints nullable="true"/>
             </column>
             <column name="timestamp" type="java.sql.Types.TIMESTAMP">
                 <constraints nullable="false"/>
diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt
index 3c526e4a9d..a9a75c52e5 100644
--- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt
@@ -276,14 +276,13 @@ class DBCheckpointStorageTests {
         database.transaction {
             checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
         }
-        val updatedCheckpoint = checkpoint.copy(result = "The result")
+        val updatedCheckpoint = checkpoint.copy(result = "The result", status = Checkpoint.FlowStatus.COMPLETED)
         val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
         database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState()) }
 
         database.transaction {
             assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
-            // The result not stored yet
-            assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
+            assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
             assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowMetadata>().size)
             assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpointBlob>().size)
             assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().size)
@@ -457,7 +456,6 @@ class DBCheckpointStorageTests {
     }
 
     @Test(timeout = 300_000)
-    @Ignore
     fun `update checkpoint with result information creates new result database record`() {
         val result = "This is the result"
         val (id, checkpoint) = newCheckpoint()
@@ -466,7 +464,7 @@ class DBCheckpointStorageTests {
         database.transaction {
             checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
         }
-        val updatedCheckpoint = checkpoint.copy(result = result)
+        val updatedCheckpoint = checkpoint.copy(result = result, status = Checkpoint.FlowStatus.COMPLETED)
         val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
         database.transaction {
             checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState())
@@ -481,63 +479,6 @@ class DBCheckpointStorageTests {
         }
     }
 
-    @Test(timeout = 300_000)
-    @Ignore
-    fun `update checkpoint with result information updates existing result database record`() {
-        val result = "This is the result"
-        val somehowThereIsANewResult = "Another result (which should not be possible!)"
-        val (id, checkpoint) = newCheckpoint()
-        val serializedFlowState =
-            checkpoint.serializeFlowState()
-        database.transaction {
-            checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
-        }
-        val updatedCheckpoint = checkpoint.copy(result = result)
-        val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
-        database.transaction {
-            checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState())
-        }
-        val updatedCheckpoint2 = checkpoint.copy(result = somehowThereIsANewResult)
-        val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
-        database.transaction {
-            checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2, updatedCheckpoint2.serializeCheckpointState())
-        }
-        database.transaction {
-            assertEquals(
-                somehowThereIsANewResult,
-                checkpointStorage.getCheckpoint(id)!!.deserialize().result
-            )
-            assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
-            assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
-        }
-    }
-
-    @Test(timeout = 300_000)
-    fun `removing result information from checkpoint deletes existing result database record`() {
-        val result = "This is the result"
-        val (id, checkpoint) = newCheckpoint()
-        val serializedFlowState =
-            checkpoint.serializeFlowState()
-        database.transaction {
-            checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState, checkpoint.serializeCheckpointState())
-        }
-        val updatedCheckpoint = checkpoint.copy(result = result)
-        val updatedSerializedFlowState = updatedCheckpoint.serializeFlowState()
-        database.transaction {
-            checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState, updatedCheckpoint.serializeCheckpointState())
-        }
-        val updatedCheckpoint2 = checkpoint.copy(result = null)
-        val updatedSerializedFlowState2 = updatedCheckpoint2.serializeFlowState()
-        database.transaction {
-            checkpointStorage.updateCheckpoint(id, updatedCheckpoint2, updatedSerializedFlowState2, updatedCheckpoint2.serializeCheckpointState())
-        }
-        database.transaction {
-            assertNull(checkpointStorage.getCheckpoint(id)!!.deserialize().result)
-            assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
-            assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
-        }
-    }
-
     @Ignore
     @Test(timeout = 300_000)
     fun `update checkpoint with error information creates a new error database record`() {
@@ -890,6 +831,41 @@ class DBCheckpointStorageTests {
         }
     }
 
+    // This test needs modification once CORDA-3681 is implemented to include FAILED flows as well
+    @Test(timeout = 300_000)
+    fun `'getFinishedFlowsResultsMetadata' fetches flows results metadata for finished flows only`() {
+        val (_, checkpoint) = newCheckpoint(1)
+        val runnable = changeStatus(checkpoint, Checkpoint.FlowStatus.RUNNABLE)
+        val hospitalized = changeStatus(checkpoint, Checkpoint.FlowStatus.HOSPITALIZED)
+        val completed = changeStatus(checkpoint, Checkpoint.FlowStatus.COMPLETED)
+        val failed = changeStatus(checkpoint, Checkpoint.FlowStatus.FAILED)
+        val killed = changeStatus(checkpoint, Checkpoint.FlowStatus.KILLED)
+        val paused = changeStatus(checkpoint, Checkpoint.FlowStatus.PAUSED)
+
+        database.transaction {
+            val serializedFlowState =
+                checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
+
+            checkpointStorage.addCheckpoint(runnable.id, runnable.checkpoint, serializedFlowState, runnable.checkpoint.serializeCheckpointState())
+            checkpointStorage.addCheckpoint(hospitalized.id, hospitalized.checkpoint, serializedFlowState, hospitalized.checkpoint.serializeCheckpointState())
+            checkpointStorage.addCheckpoint(completed.id, completed.checkpoint, serializedFlowState, completed.checkpoint.serializeCheckpointState())
+            checkpointStorage.addCheckpoint(failed.id, failed.checkpoint, serializedFlowState, failed.checkpoint.serializeCheckpointState())
+            checkpointStorage.addCheckpoint(killed.id, killed.checkpoint, serializedFlowState, killed.checkpoint.serializeCheckpointState())
+            checkpointStorage.addCheckpoint(paused.id, paused.checkpoint, serializedFlowState, paused.checkpoint.serializeCheckpointState())
+        }
+
+        val checkpointsInDb = database.transaction {
+            checkpointStorage.getCheckpoints().toList().size
+        }
+
+        val resultsMetadata = database.transaction {
+            checkpointStorage.getFinishedFlowsResultsMetadata()
+        }.toList()
+
+        assertEquals(6, checkpointsInDb)
+        assertEquals(Checkpoint.FlowStatus.COMPLETED, resultsMetadata.single().second.status)
+    }
+
     data class IdAndCheckpoint(val id: StateMachineRunId, val checkpoint: Checkpoint)
 
     private fun changeStatus(oldCheckpoint: Checkpoint, status: Checkpoint.FlowStatus): IdAndCheckpoint {
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt
index b600525b6c..e869a82018 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowClientIdTests.kt
@@ -1,11 +1,13 @@
 package net.corda.node.services.statemachine
 
-import co.paralleluniverse.fibers.Fiber
 import co.paralleluniverse.fibers.Suspendable
 import co.paralleluniverse.strands.concurrent.Semaphore
+import net.corda.core.CordaRuntimeException
 import net.corda.core.flows.FlowLogic
+import net.corda.core.internal.FlowIORequest
 import net.corda.core.utilities.getOrThrow
 import net.corda.core.utilities.seconds
+import net.corda.node.services.persistence.DBCheckpointStorage
 import net.corda.testing.core.ALICE_NAME
 import net.corda.testing.node.InMemoryMessagingNetwork
 import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
@@ -19,6 +21,7 @@ import org.junit.Assert
 import org.junit.Before
 import org.junit.Ignore
 import org.junit.Test
+import rx.Observable
 import java.lang.IllegalStateException
 import java.sql.SQLTransientConnectionException
 import java.util.UUID
@@ -26,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import kotlin.concurrent.thread
 import kotlin.test.assertEquals
 import kotlin.test.assertFailsWith
+import kotlin.test.assertNull
 import kotlin.test.assertTrue
 
 class FlowClientIdTests {
@@ -49,6 +53,7 @@ class FlowClientIdTests {
         mockNet.stopNodes()
         ResultFlow.hook = null
         ResultFlow.suspendableHook = null
+        UnSerializableResultFlow.firstRun = true
         SingleThreadedStateMachineManager.beforeClientIDCheck = null
         SingleThreadedStateMachineManager.onClientIDNotFound = null
         SingleThreadedStateMachineManager.onCallingStartFlowInternal = null
@@ -65,6 +70,16 @@ class FlowClientIdTests {
         Assert.assertEquals(1, counter)
     }
 
+    @Test(timeout=300_000)
+    fun `flow's result gets persisted if the flow is started with a client id`() {
+        val clientId = UUID.randomUUID().toString()
+        aliceNode.services.startFlowWithClientId(clientId, ResultFlow(10)).resultFuture.getOrThrow()
+
+        aliceNode.database.transaction {
+            assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
+        }
+    }
+
     @Test(timeout=300_000)
     fun `flow's result is retrievable after flow's lifetime, when flow is started with a client id - different parameters are ignored`() {
         val clientId = UUID.randomUUID().toString()
@@ -83,6 +98,41 @@ class FlowClientIdTests {
         Assert.assertEquals(result0, result1)
     }
 
+    @Test(timeout=300_000)
+    fun `if flow's result is not found in the database an IllegalStateException is thrown`() {
+        val clientId = UUID.randomUUID().toString()
+        val handle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
+        val flowId0 = handle0.id
+        handle0.resultFuture.getOrThrow()
+
+        // manually remove the checkpoint (including DBFlowResult) from the database
+        aliceNode.database.transaction {
+            aliceNode.internals.checkpointStorage.removeCheckpoint(flowId0)
+        }
+
+        assertFailsWith<IllegalStateException> {
+            aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
+        }
+    }
+
+    @Test(timeout=300_000)
+    fun `flow returning null gets retrieved after flow's lifetime when started with client id`() {
+        val clientId = UUID.randomUUID().toString()
+        aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)).resultFuture.getOrThrow()
+
+        val flowResult = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(null)).resultFuture.getOrThrow()
+        assertNull(flowResult)
+    }
+
+    @Test(timeout=300_000)
+    fun `flow returning Unit gets retrieved after flow's lifetime when started with client id`() {
+        val clientId = UUID.randomUUID().toString()
+        aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow()
+
+        val flowResult = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(Unit)).resultFuture.getOrThrow()
+        assertEquals(Unit, flowResult)
+    }
+
     @Test(timeout=300_000)
     fun `flow's result is available if reconnect after flow had retried from previous checkpoint, when flow is started with a client id`() {
         var firstRun = true
@@ -240,42 +290,31 @@ class FlowClientIdTests {
         Assert.assertEquals(10, resultsCounter.get())
     }
 
-
     @Test(timeout=300_000)
     fun `on node start -running- flows with client id are hook-able`() {
         val clientId = UUID.randomUUID().toString()
-        var noSecondFlowWasSpawned = 0
         var firstRun = true
-        var firstFiber: Fiber<out Any?>? = null
         val flowIsRunning = Semaphore(0)
         val waitUntilFlowIsRunning = Semaphore(0)
 
         ResultFlow.suspendableHook = object : FlowLogic<Unit>() {
             @Suspendable
             override fun call() {
-                if (firstRun) {
-                    firstFiber = Fiber.currentFiber()
-                    firstRun = false
-                }
-
                 waitUntilFlowIsRunning.release()
-                try {
-                    flowIsRunning.acquire() // make flow wait here to impersonate a running flow
-                } catch (e: InterruptedException) {
-                    flowIsRunning.release()
-                    throw e
+
+                if (firstRun) {
+                    firstRun = false
+                    // high sleeping time doesn't matter because the fiber will get an [Event.SoftShutdown] on node restart, which will wake up the fiber
+                    sleep(100.seconds, maySkipCheckpoint = true)
                 }
 
-                noSecondFlowWasSpawned++
+                flowIsRunning.acquire() // make flow wait here to impersonate a running flow
             }
         }
 
         val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
         waitUntilFlowIsRunning.acquire()
-        aliceNode.internals.acceptableLiveFiberCountOnStop = 1
         val aliceNode = mockNet.restartNode(aliceNode)
-        // Blow up the first fiber running our flow as it is leaked here, on normal node shutdown that fiber should be gone
-        firstFiber!!.interrupt()
 
         waitUntilFlowIsRunning.acquire()
         // Re-hook a running flow
@@ -285,7 +324,6 @@ class FlowClientIdTests {
         Assert.assertEquals(flowHandle0.id, flowHandle1.id)
         Assert.assertEquals(clientId, flowHandle1.clientId)
         Assert.assertEquals(5, flowHandle1.resultFuture.getOrThrow(20.seconds))
-        Assert.assertEquals(1, noSecondFlowWasSpawned)
     }
 
 //    @Test(timeout=300_000)
@@ -340,6 +378,28 @@ class FlowClientIdTests {
 //        Assert.assertEquals(1, noSecondFlowWasSpawned)
 //    }
 
+    @Test(timeout=300_000)
+    fun `on node start -completed- flows with client id are hook-able`() {
+        val clientId = UUID.randomUUID().toString()
+        var counter = 0
+        ResultFlow.hook = {
+            counter++
+        }
+
+        val flowHandle0 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
+        flowHandle0.resultFuture.getOrThrow()
+        val aliceNode = mockNet.restartNode(aliceNode)
+
+        // Re-hook a completed flow
+        val flowHandle1 = aliceNode.services.startFlowWithClientId(clientId, ResultFlow(5))
+        val result1 = flowHandle1.resultFuture.getOrThrow(20.seconds)
+
+        Assert.assertEquals(1, counter) // assert flow has run only once
+        Assert.assertEquals(flowHandle0.id, flowHandle1.id)
+        Assert.assertEquals(clientId, flowHandle1.clientId)
+        Assert.assertEquals(5, result1)
+    }
+
     @Test(timeout=300_000)
     fun `On 'startFlowInternal' throwing, subsequent request with same client id does not get de-duplicated and starts a new flow`() {
         val clientId = UUID.randomUUID().toString()
@@ -400,6 +460,43 @@ class FlowClientIdTests {
 //
 //        assertEquals(0, counter)
 //    }
+
+    // This test needs modification once CORDA-3681 is implemented to check that 'node_flow_exceptions' gets a row
+    @Test(timeout=300_000)
+    fun `if flow fails to serialize its result then the result gets converted to an exception result`() {
+        val clientId = UUID.randomUUID().toString()
+        assertFailsWith<CordaRuntimeException> {
+            aliceNode.services.startFlowWithClientId(clientId, ResultFlow<Observable<Unit>>(Observable.empty())).resultFuture.getOrThrow()
+        }
+
+        // flow has failed to serialize its result => table 'node_flow_results' should be empty, 'node_flow_exceptions' should get one row instead
+        aliceNode.services.database.transaction {
+            val checkpointStatus = findRecordsFromDatabase<DBCheckpointStorage.DBFlowCheckpoint>().single().status
+            assertEquals(Checkpoint.FlowStatus.FAILED, checkpointStatus)
+            assertEquals(0, findRecordsFromDatabase<DBCheckpointStorage.DBFlowResult>().size)
+            // uncomment the below line once CORDA-3681 is implemented
+            //assertEquals(1, findRecordsFromDatabase<DBCheckpointStorage.DBFlowException>().size)
+        }
+
+        assertFailsWith<CordaRuntimeException> {
+            aliceNode.services.startFlowWithClientId(clientId, ResultFlow<Observable<Unit>>(Observable.empty())).resultFuture.getOrThrow()
+        }
+    }
+
+    /**
+     * The below test does not follow a valid path. Normally it should error and propagate.
+     * However, we want to assert that a flow that fails to serialize its result its retriable.
+     */
+    @Test(timeout=300_000)
+    fun `flow failing to serialize its result gets retried and succeeds if returning a different result`() {
+        val clientId = UUID.randomUUID().toString()
+        // before the hospital schedules a [Event.Error] we manually schedule a [Event.RetryFlowFromSafePoint]
+        StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
+            FlowStateMachineImpl.currentStateMachine()!!.scheduleEvent(Event.RetryFlowFromSafePoint)
+        }
+        val result = aliceNode.services.startFlowWithClientId(clientId, UnSerializableResultFlow()).resultFuture.getOrThrow()
+        assertEquals(5, result)
+    }
 }
 
 internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
@@ -414,4 +511,21 @@ internal class ResultFlow<A>(private val result: A): FlowLogic<A>() {
         suspendableHook?.let { subFlow(it) }
         return result
     }
+}
+
+internal class UnSerializableResultFlow: FlowLogic<Any>() {
+    companion object {
+        var firstRun = true
+    }
+
+    @Suspendable
+    override fun call(): Any {
+        stateMachine.suspend(FlowIORequest.ForceCheckpoint, false)
+        return if (firstRun) {
+            firstRun = false
+            Observable.empty<Any>()
+        } else {
+            5 // serializable result
+        }
+    }
 }
\ No newline at end of file
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
index feafb34279..5fac649b18 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
@@ -62,6 +62,7 @@ import net.corda.testing.node.internal.InternalMockNodeParameters
 import net.corda.testing.node.internal.TestStartedNode
 import net.corda.testing.node.internal.getMessage
 import net.corda.testing.node.internal.startFlow
+import net.corda.testing.node.internal.startFlowWithClientId
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
@@ -81,7 +82,7 @@ import java.sql.SQLTransientConnectionException
 import java.time.Clock
 import java.time.Duration
 import java.time.Instant
-import java.util.ArrayList
+import java.util.UUID
 import java.util.concurrent.TimeoutException
 import java.util.function.Predicate
 import kotlin.reflect.KClass
@@ -372,12 +373,11 @@ class FlowFrameworkTests {
         }
     }
 
-    // Ignoring test since completed flows are not currently keeping their checkpoints in the database
-    @Ignore
     @Test(timeout = 300_000)
     fun `Flow metadata finish time is set in database when the flow finishes`() {
         val terminationSignal = Semaphore(0)
-        val flow = aliceNode.services.startFlow(NoOpFlow(terminateUponSignal = terminationSignal))
+        val clientId = UUID.randomUUID().toString()
+        val flow = aliceNode.services.startFlowWithClientId(clientId, NoOpFlow(terminateUponSignal = terminationSignal))
         mockNet.waitQuiescent()
         aliceNode.database.transaction {
             val metadata = session.find(DBCheckpointStorage.DBFlowMetadata::class.java, flow.id.uuid.toString())
@@ -832,12 +832,6 @@ class FlowFrameworkTests {
         assertEquals(null, persistedException)
     }
 
-    private inline fun <reified T> DatabaseTransaction.findRecordsFromDatabase(): List<T> {
-        val criteria = session.criteriaBuilder.createQuery(T::class.java)
-        criteria.select(criteria.from(T::class.java))
-        return session.createQuery(criteria).resultList
-    }
-
     //region Helpers
 
     private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
@@ -1022,6 +1016,12 @@ internal fun TestStartedNode.sendSessionMessage(message: SessionMessage, destina
     }
 }
 
+inline fun <reified T> DatabaseTransaction.findRecordsFromDatabase(): List<T> {
+    val criteria = session.criteriaBuilder.createQuery(T::class.java)
+    criteria.select(criteria.from(T::class.java))
+    return session.createQuery(criteria).resultList
+}
+
 internal fun errorMessage(errorResponse: FlowException? = null) =
     ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0))