diff --git a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt
index 1d13b53c66..0fac4a11ce 100644
--- a/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt
+++ b/core-tests/src/test/kotlin/net/corda/coretests/flows/FinalityFlowTests.kt
@@ -1,30 +1,82 @@
 package net.corda.coretests.flows
 
+import co.paralleluniverse.fibers.Suspendable
 import com.natpryce.hamkrest.and
 import com.natpryce.hamkrest.assertion.assertThat
+import net.corda.core.contracts.Amount
+import net.corda.core.contracts.PartyAndReference
+import net.corda.core.contracts.StateAndContract
+import net.corda.core.contracts.StateAndRef
+import net.corda.core.contracts.TransactionVerificationException
+import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.TransactionSignature
 import net.corda.core.flows.FinalityFlow
+import net.corda.core.flows.FlowException
+import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.FlowSession
+import net.corda.core.flows.FlowTransactionMetadata
+import net.corda.core.flows.InitiatedBy
+import net.corda.core.flows.InitiatingFlow
+import net.corda.core.flows.NotaryError
+import net.corda.core.flows.NotaryException
+import net.corda.core.flows.NotarySigCheck
+import net.corda.core.flows.ReceiveFinalityFlow
+import net.corda.core.flows.ReceiveTransactionFlow
+import net.corda.core.flows.StartableByRPC
+import net.corda.core.flows.TransactionStatus
+import net.corda.core.flows.UnexpectedFlowEndException
 import net.corda.core.identity.Party
+import net.corda.core.internal.FetchDataFlow
+import net.corda.core.internal.PLATFORM_VERSION
+import net.corda.core.internal.PlatformVersionSwitches
+import net.corda.core.internal.ServiceHubCoreInternal
+import net.corda.core.node.StatesToRecord
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.transactions.TransactionBuilder
+import net.corda.core.utilities.OpaqueBytes
 import net.corda.core.utilities.getOrThrow
-import net.corda.coretests.flows.WithFinality.FinalityInvoker
-import net.corda.finance.POUNDS
-import net.corda.finance.contracts.asset.Cash
-import net.corda.finance.issuedBy
-import net.corda.testing.core.*
+import net.corda.core.utilities.unwrap
 import net.corda.coretesting.internal.matchers.flow.willReturn
 import net.corda.coretesting.internal.matchers.flow.willThrow
-import net.corda.testing.node.internal.*
+import net.corda.coretests.flows.WithFinality.FinalityInvoker
+import net.corda.finance.GBP
+import net.corda.finance.POUNDS
+import net.corda.finance.contracts.asset.Cash
+import net.corda.finance.flows.CashIssueFlow
+import net.corda.finance.flows.CashPaymentFlow
+import net.corda.finance.issuedBy
+import net.corda.testing.contracts.DummyContract
+import net.corda.testing.core.ALICE_NAME
+import net.corda.testing.core.BOB_NAME
+import net.corda.testing.core.CHARLIE_NAME
+import net.corda.testing.core.TestIdentity
+import net.corda.testing.core.singleIdentity
+import net.corda.testing.node.internal.CustomCordapp
+import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
+import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
+import net.corda.testing.node.internal.FINANCE_WORKFLOWS_CORDAPP
+import net.corda.testing.node.internal.InternalMockNetwork
+import net.corda.testing.node.internal.InternalMockNodeParameters
+import net.corda.testing.node.internal.MOCK_VERSION_INFO
+import net.corda.testing.node.internal.TestCordappInternal
+import net.corda.testing.node.internal.TestStartedNode
+import net.corda.testing.node.internal.cordappWithPackages
+import net.corda.testing.node.internal.enclosedCordapp
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.After
 import org.junit.Test
+import java.sql.SQLException
+import java.util.Random
+import kotlin.test.assertEquals
+import kotlin.test.assertNull
+import kotlin.test.fail
 
 class FinalityFlowTests : WithFinality {
     companion object {
         private val CHARLIE = TestIdentity(CHARLIE_NAME, 90).party
     }
 
-    override val mockNet = InternalMockNetwork(cordappsForAllNodes = listOf(FINANCE_CONTRACTS_CORDAPP, enclosedCordapp(),
+    override val mockNet = InternalMockNetwork(cordappsForAllNodes = setOf(FINANCE_CONTRACTS_CORDAPP, FINANCE_WORKFLOWS_CORDAPP, DUMMY_CONTRACTS_CORDAPP, enclosedCordapp(),
                                                        CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java))))
 
     private val aliceNode = makeNode(ALICE_NAME)
@@ -62,7 +114,7 @@ class FinalityFlowTests : WithFinality {
         val stx = aliceNode.issuesCashTo(oldBob)
         @Suppress("DEPRECATION")
         aliceNode.startFlowAndRunNetwork(FinalityFlow(stx)).resultFuture.getOrThrow()
-        assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
+        assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull
     }
 
     @Test(timeout=300_000)
@@ -76,12 +128,262 @@ class FinalityFlowTests : WithFinality {
                 oldRecipients = setOf(oldBob.info.singleIdentity())
         )).resultFuture
         resultFuture.getOrThrow()
-        assertThat(newCharlie.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
-        assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull()
+        assertThat(newCharlie.services.validatedTransactions.getTransaction(stx.id)).isNotNull
+        assertThat(oldBob.services.validatedTransactions.getTransaction(stx.id)).isNotNull
     }
 
-    private fun createBob(cordapps: List<TestCordappInternal> = emptyList()): TestStartedNode {
-        return mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, additionalCordapps = cordapps))
+    @Test(timeout=300_000)
+    fun `two phase finality flow transaction`() {
+        val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
+
+        val stx = aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
+        aliceNode.startFlowAndRunNetwork(CashPaymentFlow(Amount(100, GBP), bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
+        assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
+    }
+
+    @Test(timeout=300_000)
+    fun `two phase finality flow initiator to pre-2PF peer`() {
+        val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
+
+        val stx = aliceNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
+        aliceNode.startFlowAndRunNetwork(CashPaymentFlow(Amount(100, GBP), bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
+        assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
+    }
+
+    @Test(timeout=300_000)
+    fun `pre-2PF initiator to two phase finality flow peer`() {
+        val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
+
+        val stx = bobNode.startFlow(CashIssueFlow(Amount(1000L, GBP), OpaqueBytes.of(1), notary)).resultFuture.getOrThrow().stx
+        bobNode.startFlowAndRunNetwork(CashPaymentFlow(Amount(100, GBP), aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        assertThat(aliceNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
+        assertThat(bobNode.services.validatedTransactions.getTransaction(stx.id)).isNotNull
+    }
+
+    @Test(timeout=300_000)
+    fun `two phase finality flow double spend transaction`() {
+        val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
+
+        val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
+        val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
+        val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
+
+        try {
+            aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+        }
+        catch (e: NotaryException) {
+            val stxId = (e.error as NotaryError.Conflict).txId
+            val (_, txnDsStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
+            assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusAlice)
+            val (_, txnDsStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
+            assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusBob)
+        }
+    }
+
+    @Test(timeout=300_000)
+    fun `two phase finality flow double spend transaction from pre-2PF initiator`() {
+        val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
+
+        val ref = bobNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
+        val stx = bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
+        val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
+
+        try {
+            bobNode.startFlowAndRunNetwork(SpendFlow(ref, aliceNode.info.singleIdentity())).resultFuture.getOrThrow()
+        }
+        catch (e: NotaryException) {
+            val stxId = (e.error as NotaryError.Conflict).txId
+            assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
+            assertNull(aliceNode.services.validatedTransactions.getTransactionInternal(stxId))
+        }
+    }
+
+    @Test(timeout=300_000)
+    fun `two phase finality flow double spend transaction to pre-2PF peer`() {
+        val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
+
+        val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
+        val stx = aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
+        val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(stx.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusBob)
+
+        try {
+            aliceNode.startFlowAndRunNetwork(SpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+        }
+        catch (e: NotaryException) {
+            val stxId = (e.error as NotaryError.Conflict).txId
+            val (_, txnDsStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(stxId) ?: fail()
+            assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnDsStatusAlice)
+            assertNull(bobNode.services.validatedTransactions.getTransactionInternal(stxId))
+        }
+    }
+
+    @Test(timeout=300_000)
+    fun `two phase finality flow speedy spender`() {
+        val bobNode = createBob(platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
+
+        val ref = aliceNode.startFlowAndRunNetwork(IssueFlow(notary)).resultFuture.getOrThrow()
+        val notarisedStxn1 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(ref, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        val (_, txnStatusAlice) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusAlice)
+        val (_, txnStatusBob) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
+        assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob)
+
+        // now lets attempt a new spend with the new output of the previous transaction
+        val newStateRef = notarisedStxn1.coreTransaction.outRef<DummyContract.SingleOwnerState>(1)
+        val notarisedStxn2 = aliceNode.startFlowAndRunNetwork(SpeedySpendFlow(newStateRef, bobNode.info.singleIdentity())).resultFuture.getOrThrow()
+
+        // the original transaction is now finalised at Bob (despite the original flow not completing) because Bob resolved the
+        // original transaction from Alice in the second transaction (and Alice had already notarised and finalised the original transaction)
+        val (_, txnStatusBobAgain) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn1.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusBobAgain)
+
+        val (_, txnStatusAlice2) = aliceNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusAlice2)
+        val (_, txnStatusBob2) = bobNode.services.validatedTransactions.getTransactionInternal(notarisedStxn2.id) ?: fail()
+        assertEquals(TransactionStatus.MISSING_NOTARY_SIG, txnStatusBob2)
+
+        // Validate attempt at flow finalisation by Bob has no effect on outcome.
+        val finaliseStxn1 =  bobNode.startFlowAndRunNetwork(FinaliseSpeedySpendFlow(notarisedStxn1.id, notarisedStxn1.sigs)).resultFuture.getOrThrow()
+        val (_, txnStatusBobYetAgain) = bobNode.services.validatedTransactions.getTransactionInternal(finaliseStxn1.id) ?: fail()
+        assertEquals(TransactionStatus.VERIFIED, txnStatusBobYetAgain)
+    }
+
+    @StartableByRPC
+    class IssueFlow(val notary: Party) : FlowLogic<StateAndRef<DummyContract.SingleOwnerState>>() {
+
+        @Suspendable
+        override fun call(): StateAndRef<DummyContract.SingleOwnerState> {
+            val partyAndReference = PartyAndReference(ourIdentity, OpaqueBytes.of(1))
+            val txBuilder = DummyContract.generateInitial(Random().nextInt(), notary, partyAndReference)
+            val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
+            val notarised = subFlow(FinalityFlow(signedTransaction, emptySet<FlowSession>()))
+            return notarised.coreTransaction.outRef(0)
+        }
+    }
+
+    @StartableByRPC
+    @InitiatingFlow
+    class SpendFlow(private val stateAndRef: StateAndRef<DummyContract.SingleOwnerState>, private val newOwner: Party) : FlowLogic<SignedTransaction>() {
+
+        @Suspendable
+        override fun call(): SignedTransaction {
+            val txBuilder = DummyContract.move(stateAndRef, newOwner)
+            val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
+            val sessionWithCounterParty = initiateFlow(newOwner)
+            sessionWithCounterParty.sendAndReceive<String>("initial-message")
+            return subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
+        }
+    }
+
+    @InitiatedBy(SpendFlow::class)
+    class AcceptSpendFlow(private val otherSide: FlowSession) : FlowLogic<Unit>() {
+
+        @Suspendable
+        override fun call() {
+            otherSide.receive<String>()
+            otherSide.send("initial-response")
+
+            subFlow(ReceiveFinalityFlow(otherSide))
+        }
+    }
+
+    /**
+     * This flow allows an Initiator to race ahead of a Receiver when using Two Phase Finality.
+     * The initiator transaction will be finalised, so output states can be used in a follow-up transaction.
+     * The receiver transaction will not be finalised, causing ledger inconsistency.
+     */
+    @StartableByRPC
+    @InitiatingFlow
+    class SpeedySpendFlow(private val stateAndRef: StateAndRef<DummyContract.SingleOwnerState>, private val newOwner: Party) : FlowLogic<SignedTransaction>() {
+
+        @Suspendable
+        override fun call(): SignedTransaction {
+            val newState = StateAndContract(DummyContract.SingleOwnerState(99999, ourIdentity), DummyContract.PROGRAM_ID)
+            val txBuilder = DummyContract.move(stateAndRef, newOwner).withItems(newState)
+            val signedTransaction = serviceHub.signInitialTransaction(txBuilder, ourIdentity.owningKey)
+            val sessionWithCounterParty = initiateFlow(newOwner)
+            try {
+                subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
+            }
+            catch (e: FinalisationFailedException) {
+                // expected (transaction has been notarised by Initiator)
+                return e.notarisedTxn
+            }
+            return signedTransaction
+        }
+    }
+
+    @InitiatedBy(SpeedySpendFlow::class)
+    class AcceptSpeedySpendFlow(private val otherSideSession: FlowSession) : FlowLogic<SignedTransaction>() {
+
+        @Suspendable
+        override fun call(): SignedTransaction {
+            // Mimic ReceiveFinalityFlow but fail to finalise
+            try {
+                val stx = subFlow(ReceiveTransactionFlow(otherSideSession,
+                        checkSufficientSignatures = false, statesToRecord = StatesToRecord.ONLY_RELEVANT, deferredAck = true))
+                require(NotarySigCheck.needsNotarySignature(stx))
+                logger.info("Peer recording transaction without notary signature.")
+                (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
+                        FlowTransactionMetadata(otherSideSession.counterparty.name, StatesToRecord.ONLY_RELEVANT))
+                otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
+                logger.info("Peer recorded transaction without notary signature.")
+
+                val notarySignatures = otherSideSession.receive<List<TransactionSignature>>()
+                        .unwrap { it }
+                logger.info("Peer received notarised signature.")
+                (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx + notarySignatures, notarySignatures, StatesToRecord.ONLY_RELEVANT)
+                throw FinalisationFailedException(stx + notarySignatures)
+            }
+            catch (e: SQLException) {
+                logger.error("Peer failure upon recording or finalising transaction: $e")
+                otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
+                throw UnexpectedFlowEndException("Peer failure upon recording or finalising transaction.", e.cause)
+            }
+            catch (uae: TransactionVerificationException.UntrustedAttachmentsException) {
+                logger.error("Peer failure upon receiving transaction: $uae")
+                otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (overrideAutoAck)
+                throw uae
+            }
+        }
+    }
+
+    class FinaliseSpeedySpendFlow(val id: SecureHash, val sigs: List<TransactionSignature>) : FlowLogic<SignedTransaction>() {
+
+        @Suspendable
+        override fun call(): SignedTransaction {
+            // Mimic ReceiveFinalityFlow finalisation
+            val stx = serviceHub.validatedTransactions.getTransaction(id) ?: throw FlowException("Missing transaction: $id")
+            (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx + sigs, sigs, StatesToRecord.ONLY_RELEVANT)
+            logger.info("Peer finalised transaction with notary signature.")
+
+            return stx + sigs
+        }
+    }
+
+    class FinalisationFailedException(val notarisedTxn: SignedTransaction) : FlowException("Failed to finalise transaction with notary signature.")
+
+    private fun createBob(cordapps: List<TestCordappInternal> = emptyList(), platformVersion: Int = PLATFORM_VERSION): TestStartedNode {
+        return mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, additionalCordapps = cordapps,
+            version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion)))
     }
 
     private fun TestStartedNode.issuesCashTo(recipient: TestStartedNode): SignedTransaction {
diff --git a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt
index 7d5a1505c1..c44654caad 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FinalityFlow.kt
@@ -3,9 +3,14 @@ package net.corda.core.flows
 import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.CordaInternal
 import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.TransactionSignature
 import net.corda.core.crypto.isFulfilledBy
+import net.corda.core.flows.NotarySigCheck.needsNotarySignature
 import net.corda.core.identity.Party
 import net.corda.core.identity.groupAbstractPartyByWellKnownParty
+import net.corda.core.internal.FetchDataFlow
+import net.corda.core.internal.PlatformVersionSwitches
+import net.corda.core.internal.ServiceHubCoreInternal
 import net.corda.core.internal.pushToLoggingContext
 import net.corda.core.internal.telemetry.telemetryServiceInternal
 import net.corda.core.internal.warnOnce
@@ -15,6 +20,7 @@ import net.corda.core.transactions.LedgerTransaction
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.utilities.ProgressTracker
 import net.corda.core.utilities.debug
+import net.corda.core.utilities.unwrap
 
 /**
  * Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction
@@ -133,10 +139,18 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
             override fun childProgressTracker() = NotaryFlow.Client.tracker()
         }
 
-        object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
+        @Suppress("ClassNaming")
+        object RECORD_UNNOTARISED : ProgressTracker.Step("Recording un-notarised transaction locally")
+        @Suppress("ClassNaming")
+        object BROADCASTING_PRE_NOTARISATION : ProgressTracker.Step("Broadcasting un-notarised transaction")
+        @Suppress("ClassNaming")
+        object BROADCASTING_POST_NOTARISATION : ProgressTracker.Step("Broadcasting notary signature")
+        @Suppress("ClassNaming")
+        object FINALISING_TRANSACTION : ProgressTracker.Step("Finalising transaction locally")
+        object BROADCASTING : ProgressTracker.Step("Broadcasting notarised transaction to other participants")
 
         @JvmStatic
-        fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
+        fun tracker() = ProgressTracker(RECORD_UNNOTARISED, BROADCASTING_PRE_NOTARISATION, NOTARISING, BROADCASTING_POST_NOTARISATION, FINALISING_TRANSACTION, BROADCASTING)
     }
 
     @Suspendable
@@ -155,7 +169,6 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
         // the point where subFlow is invoked, as that minimizes the checkpointing work to be done.
         //
         // Lookup the resolved transactions and use them to map each signed transaction to the list of participants.
-        // Then send to the notary if needed, record locally and distribute.
 
         transaction.pushToLoggingContext()
         logCommandData()
@@ -173,32 +186,121 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
             }
         }
 
-        val notarised = notariseAndRecord()
+        // Recoverability
+        // As of platform version 13 we introduce a 2-phase finality protocol whereby
+        // - record un-notarised transaction locally and broadcast to external participants to record
+        // - notarise transaction
+        // - broadcast notary signature to external participants (finalise remotely)
+        // - finalise locally
 
-        progressTracker.currentStep = BROADCASTING
+        val (oldPlatformSessions, newPlatformSessions) = sessions.partition {
+            serviceHub.networkMapCache.getNodeByLegalName(it.counterparty.name)?.platformVersion!! < PlatformVersionSwitches.TWO_PHASE_FINALITY
+        }
 
-        if (newApi) {
-            oldV3Broadcast(notarised, oldParticipants.toSet())
-            for (session in sessions) {
+        val useTwoPhaseFinality = serviceHub.myInfo.platformVersion >= PlatformVersionSwitches.TWO_PHASE_FINALITY
+        if (useTwoPhaseFinality && needsNotarySignature(transaction)) {
+            recordLocallyAndBroadcast(newPlatformSessions, transaction)
+        }
+
+        val stxn = notariseOrRecord()
+        val notarySignatures = stxn.sigs - transaction.sigs.toSet()
+        if (notarySignatures.isNotEmpty()) {
+            if (useTwoPhaseFinality && newPlatformSessions.isNotEmpty()) {
+                broadcastSignaturesAndFinalise(newPlatformSessions, notarySignatures)
+            }
+            else {
+                progressTracker.currentStep = FINALISING_TRANSACTION
+                serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
+                    (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord)
+                    logger.info("Finalised transaction locally.")
+                }
+            }
+        }
+
+        if (!useTwoPhaseFinality || !needsNotarySignature(transaction)) {
+            broadcastToOtherParticipants(externalTxParticipants, newPlatformSessions + oldPlatformSessions, stxn)
+        } else if (useTwoPhaseFinality && oldPlatformSessions.isNotEmpty()) {
+            broadcastToOtherParticipants(externalTxParticipants, oldPlatformSessions, stxn)
+        }
+
+        return stxn
+    }
+
+    @Suspendable
+    private fun recordLocallyAndBroadcast(sessions: Collection<FlowSession>, tx: SignedTransaction) {
+        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordLocallyAndBroadcast", flowLogic = this) {
+            recordUnnotarisedTransaction(tx)
+            logger.info("Recorded transaction without notary signature locally.")
+            progressTracker.currentStep = BROADCASTING_PRE_NOTARISATION
+            sessions.forEach { session ->
                 try {
-                    subFlow(SendTransactionFlow(session, notarised))
-                    logger.info("Party ${session.counterparty} received the transaction.")
+                    logger.debug { "Sending transaction to party $session." }
+                    subFlow(SendTransactionFlow(session, tx))
                 } catch (e: UnexpectedFlowEndException) {
                     throw UnexpectedFlowEndException(
-                            "${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
+                            "${session.counterparty} has finished prematurely and we're trying to send them a transaction without notary signature. " +
                                     "Did they forget to call ReceiveFinalityFlow? (${e.message})",
                             e.cause,
                             e.originalErrorId
                     )
                 }
             }
-        } else {
-            oldV3Broadcast(notarised, (externalTxParticipants + oldParticipants).toSet())
         }
+    }
 
-        logger.info("All parties received the transaction successfully.")
+    @Suspendable
+    private fun broadcastSignaturesAndFinalise(sessions: Collection<FlowSession>, notarySignatures: List<TransactionSignature>) {
+        progressTracker.currentStep = BROADCASTING_POST_NOTARISATION
+        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcastSignaturesAndFinalise", flowLogic = this) {
+            logger.info("Transaction notarised and broadcasting notary signature.")
+            sessions.forEach { session ->
+                try {
+                    logger.debug { "Sending notary signature to party $session." }
+                    session.send(notarySignatures)
+                    // remote will finalise txn with notary signature
+                } catch (e: UnexpectedFlowEndException) {
+                    throw UnexpectedFlowEndException(
+                            "${session.counterparty} has finished prematurely and we're trying to send them the notary signature. " +
+                                    "Did they forget to call ReceiveFinalityFlow? (${e.message})",
+                            e.cause,
+                            e.originalErrorId
+                    )
+                }
+            }
+            progressTracker.currentStep = FINALISING_TRANSACTION
+            serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
+                (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(transaction, notarySignatures, statesToRecord)
+                logger.info("Finalised transaction locally with notary signature.")
+            }
+        }
+    }
 
-        return notarised
+    @Suspendable
+    private fun broadcastToOtherParticipants(externalTxParticipants: Set<Party>, sessions: Collection<FlowSession>, tx: SignedTransaction) {
+        if (externalTxParticipants.isEmpty() && sessions.isEmpty() && oldParticipants.isEmpty()) return
+        progressTracker.currentStep = BROADCASTING
+        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#broadcastToOtherParticipants", flowLogic = this) {
+            logger.info("Broadcasting complete transaction to other participants.")
+            if (newApi) {
+                oldV3Broadcast(tx, oldParticipants.toSet())
+                for (session in sessions) {
+                    try {
+                        logger.debug { "Sending transaction to party $session." }
+                        subFlow(SendTransactionFlow(session, tx))
+                    } catch (e: UnexpectedFlowEndException) {
+                        throw UnexpectedFlowEndException(
+                                "${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
+                                        "Did they forget to call ReceiveFinalityFlow? (${e.message})",
+                                e.cause,
+                                e.originalErrorId
+                        )
+                    }
+                }
+            } else {
+                oldV3Broadcast(tx, (externalTxParticipants + oldParticipants).toSet())
+            }
+            logger.info("Broadcasted complete transaction to other participants.")
+        }
     }
 
     @Suspendable
@@ -221,22 +323,39 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
     }
 
     @Suspendable
-    private fun notariseAndRecord(): SignedTransaction {
-        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord", flowLogic = this) {
-            val notarised = if (needsNotarySignature(transaction)) {
+    private fun recordTransactionLocally(tx: SignedTransaction): SignedTransaction {
+        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactionLocally", flowLogic = this) {
+            serviceHub.recordTransactions(statesToRecord, listOf(tx))
+            logger.info("Recorded transaction locally.")
+            return tx
+        }
+    }
+
+    @Suspendable
+    private fun recordUnnotarisedTransaction(tx: SignedTransaction): SignedTransaction {
+        progressTracker.currentStep = RECORD_UNNOTARISED
+        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
+            (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(tx,
+                    FlowTransactionMetadata(
+                            serviceHub.myInfo.legalIdentities.first().name,
+                            statesToRecord,
+                            sessions.map { it.counterparty.name }.toSet()))
+            return tx
+        }
+    }
+
+    @Suspendable
+    private fun notariseOrRecord(): SignedTransaction {
+        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseOrRecord", flowLogic = this) {
+            return if (needsNotarySignature(transaction)) {
                 progressTracker.currentStep = NOTARISING
                 val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true))
                 transaction + notarySignatures
             } else {
-                logger.info("No need to notarise this transaction.")
+                logger.info("No need to notarise this transaction. Recording locally.")
+                recordTransactionLocally(transaction)
                 transaction
             }
-            serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord:recordTransactions", flowLogic = this) {
-                logger.info("Recording transaction locally.")
-                serviceHub.recordTransactions(statesToRecord, listOf(notarised))
-                logger.info("Recorded transaction locally successfully.")
-            }
-            return notarised
         }
     }
 
@@ -268,6 +387,20 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
     }
 }
 
+object NotarySigCheck {
+    fun needsNotarySignature(stx: SignedTransaction): Boolean {
+        val wtx = stx.tx
+        val needsNotarisation = wtx.inputs.isNotEmpty() || wtx.references.isNotEmpty() || wtx.timeWindow != null
+        return needsNotarisation && hasNoNotarySignature(stx)
+    }
+
+    private fun hasNoNotarySignature(stx: SignedTransaction): Boolean {
+        val notaryKey = stx.tx.notary?.owningKey
+        val signers = stx.sigs.asSequence().map { it.by }.toSet()
+        return notaryKey?.isFulfilledBy(signers) != true
+    }
+}
+
 /**
  * The receiving counterpart to [FinalityFlow].
  *
@@ -285,15 +418,34 @@ class FinalityFlow private constructor(val transaction: SignedTransaction,
 class ReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
                                                     private val expectedTxId: SecureHash? = null,
                                                     private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
+    @Suppress("ComplexMethod")
     @Suspendable
     override fun call(): SignedTransaction {
-        return subFlow(object : ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = true, statesToRecord = statesToRecord) {
-            override fun checkBeforeRecording(stx: SignedTransaction) {
-                require(expectedTxId == null || expectedTxId == stx.id) {
-                    "We expected to receive transaction with ID $expectedTxId but instead got ${stx.id}. Transaction was" +
-                            "not recorded and nor its states sent to the vault."
-                }
+        val stx = subFlow(ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = false, statesToRecord = statesToRecord, deferredAck = true))
+
+        val fromTwoPhaseFinalityNode = serviceHub.networkMapCache.getNodeByLegalName(otherSideSession.counterparty.name)?.platformVersion!! >= PlatformVersionSwitches.TWO_PHASE_FINALITY
+        if (fromTwoPhaseFinalityNode && needsNotarySignature(stx)) {
+            serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordUnnotarisedTransaction", flowLogic = this) {
+                logger.debug { "Peer recording transaction without notary signature." }
+                (serviceHub as ServiceHubCoreInternal).recordUnnotarisedTransaction(stx,
+                        FlowTransactionMetadata(otherSideSession.counterparty.name, statesToRecord))
             }
-        })
+            otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
+            logger.info("Peer recorded transaction without notary signature. Waiting to receive notary signature.")
+            val notarySignatures = otherSideSession.receive<List<TransactionSignature>>()
+                    .unwrap { it }
+            serviceHub.telemetryServiceInternal.span("${this::class.java.name}#finalizeTransactionWithExtraSignatures", flowLogic = this) {
+                logger.debug { "Peer received notarised signature." }
+                (serviceHub as ServiceHubCoreInternal).finalizeTransactionWithExtraSignatures(stx, notarySignatures, statesToRecord)
+                logger.info("Peer finalised transaction with notary signature.")
+            }
+        } else {
+            serviceHub.telemetryServiceInternal.span("${this::class.java.name}#recordTransactions", flowLogic = this) {
+                serviceHub.recordTransactions(statesToRecord, setOf(stx))
+            }
+            otherSideSession.send(FetchDataFlow.Request.End) // Finish fetching data (deferredAck)
+            logger.info("Peer successfully recorded received transaction.")
+        }
+        return stx
     }
 }
diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt b/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt
new file mode 100644
index 0000000000..49ecae6151
--- /dev/null
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowTransaction.kt
@@ -0,0 +1,37 @@
+package net.corda.core.flows
+
+import net.corda.core.identity.CordaX500Name
+import net.corda.core.node.StatesToRecord
+import net.corda.core.serialization.CordaSerializable
+import java.time.Instant
+
+/**
+ * Flow data object representing key information required for recovery.
+ */
+
+@CordaSerializable
+data class FlowTransaction(
+        val stateMachineRunId: StateMachineRunId,
+        val txId: String,
+        val status: TransactionStatus,
+        val signatures: ByteArray?,
+        val timestamp: Instant,
+        val metadata: FlowTransactionMetadata?) {
+
+    fun isInitiator(myCordaX500Name: CordaX500Name) =
+        this.metadata?.initiator == myCordaX500Name
+}
+
+@CordaSerializable
+data class FlowTransactionMetadata(
+        val initiator: CordaX500Name,
+        val statesToRecord: StatesToRecord? = StatesToRecord.ONLY_RELEVANT,
+        val peers: Set<CordaX500Name>? = null
+)
+
+@CordaSerializable
+enum class TransactionStatus {
+    UNVERIFIED,
+    VERIFIED,
+    MISSING_NOTARY_SIG;
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt
index 413f01db3f..4f5d04b6d9 100644
--- a/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/ReceiveTransactionFlow.kt
@@ -27,10 +27,20 @@ import java.security.SignatureException
  * @property otherSideSession session to the other side which is calling [SendTransactionFlow].
  * @property checkSufficientSignatures if true checks all required signatures are present. See [SignedTransaction.verify].
  * @property statesToRecord which transaction states should be recorded in the vault, if any.
+ * @property deferredAck if set then the caller of this flow is responsible for explicitly sending a FetchDataFlow.Request.End
+ *           acknowledgement to indicate transaction resolution is complete. See usage within [FinalityFlow].
+ *           Not recommended for 3rd party use.
  */
-open class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
-                                                            private val checkSufficientSignatures: Boolean = true,
-                                                            private val statesToRecord: StatesToRecord = StatesToRecord.NONE) : FlowLogic<SignedTransaction>() {
+open class ReceiveTransactionFlow constructor(private val otherSideSession: FlowSession,
+                                              private val checkSufficientSignatures: Boolean = true,
+                                              private val statesToRecord: StatesToRecord = StatesToRecord.NONE,
+                                              private val deferredAck: Boolean = false) : FlowLogic<SignedTransaction>() {
+    @JvmOverloads constructor(
+            otherSideSession: FlowSession,
+            checkSufficientSignatures: Boolean = true,
+            statesToRecord: StatesToRecord = StatesToRecord.NONE
+    ) : this(otherSideSession, checkSufficientSignatures, statesToRecord, false)
+
     @Suppress("KDocMissingDocumentation")
     @Suspendable
     @Throws(SignatureException::class,
@@ -47,7 +57,7 @@ open class ReceiveTransactionFlow @JvmOverloads constructor(private val otherSid
             it.pushToLoggingContext()
             logger.info("Received transaction acknowledgement request from party ${otherSideSession.counterparty}.")
             checkParameterHash(it.networkParametersHash)
-            subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord))
+            subFlow(ResolveTransactionsFlow(it, otherSideSession, statesToRecord, deferredAck))
             logger.info("Transaction dependencies resolution completed.")
             try {
                 it.verify(serviceHub, checkSufficientSignatures)
diff --git a/core/src/main/kotlin/net/corda/core/internal/PlatformVersionSwitches.kt b/core/src/main/kotlin/net/corda/core/internal/PlatformVersionSwitches.kt
index c6d93f272f..f40ea96c6c 100644
--- a/core/src/main/kotlin/net/corda/core/internal/PlatformVersionSwitches.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/PlatformVersionSwitches.kt
@@ -18,4 +18,5 @@ object PlatformVersionSwitches {
     const val ENABLE_P2P_COMPRESSION = 7
     const val RESTRICTED_DATABASE_OPERATIONS = 7
     const val CERTIFICATE_ROTATION = 9
+    const val TWO_PHASE_FINALITY = 13
 }
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt
index 66b525692f..cf3359f2e7 100644
--- a/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/ResolveTransactionsFlow.kt
@@ -21,7 +21,8 @@ class ResolveTransactionsFlow private constructor(
         val initialTx: SignedTransaction?,
         val txHashes: Set<SecureHash>,
         val otherSide: FlowSession,
-        val statesToRecord: StatesToRecord
+        val statesToRecord: StatesToRecord,
+        val deferredAck: Boolean = false
 ) : FlowLogic<Unit>() {
 
     constructor(txHashes: Set<SecureHash>, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
@@ -36,6 +37,9 @@ class ResolveTransactionsFlow private constructor(
     constructor(transaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE)
             : this(transaction, transaction.dependencies, otherSide, statesToRecord)
 
+    constructor(transaction: SignedTransaction, otherSide: FlowSession, statesToRecord: StatesToRecord = StatesToRecord.NONE, deferredAck: Boolean = false)
+            : this(transaction, transaction.dependencies, otherSide, statesToRecord, deferredAck)
+
     private var fetchNetParamsFromCounterpart = false
 
     @Suppress("MagicNumber")
@@ -60,8 +64,10 @@ class ResolveTransactionsFlow private constructor(
         val resolver = (serviceHub as ServiceHubCoreInternal).createTransactionsResolver(this)
         resolver.downloadDependencies(batchMode)
 
-        logger.trace { "ResolveTransactionsFlow: Sending END." }
-        otherSide.send(FetchDataFlow.Request.End) // Finish fetching data.
+        if (!deferredAck) {
+            logger.trace { "ResolveTransactionsFlow: Sending END." }
+            otherSide.send(FetchDataFlow.Request.End) // Finish fetching data.
+        }
 
         // If transaction resolution is performed for a transaction where some states are relevant, then those should be
         // recorded if this has not already occurred.
diff --git a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt
index 4b7d856699..af7ce40179 100644
--- a/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/ServiceHubCoreInternal.kt
@@ -2,10 +2,13 @@ package net.corda.core.internal
 
 import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.DeleteForDJVM
+import net.corda.core.crypto.TransactionSignature
+import net.corda.core.flows.FlowTransactionMetadata
 import net.corda.core.internal.notary.NotaryService
 import net.corda.core.node.ServiceHub
 import net.corda.core.node.StatesToRecord
 import net.corda.core.serialization.internal.AttachmentsClassLoaderCache
+import net.corda.core.transactions.SignedTransaction
 import java.util.concurrent.ExecutorService
 
 // TODO: This should really be called ServiceHubInternal but that name is already taken by net.corda.node.services.api.ServiceHubInternal.
@@ -24,6 +27,24 @@ interface ServiceHubCoreInternal : ServiceHub {
     fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver
 
     val attachmentsClassLoaderCache: AttachmentsClassLoaderCache
+
+    /**
+     * Stores [SignedTransaction] and participant signatures without the notary signature in the local transaction storage.
+     * Optionally add finality flow recovery metadata.
+     * This is expected to be run within a database transaction.
+     *
+     * @param txn The transaction to record.
+     */
+    fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?= null)
+
+    /**
+     * Stores [SignedTransaction] with extra signatures in the local transaction storage
+     *
+     * @param sigs The signatures to add to the transaction.
+     * @param txn The transactions to record.
+     * @param statesToRecord how the vault should treat the output states of the transaction.
+     */
+    fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord)
 }
 
 interface TransactionsResolver {
diff --git a/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt b/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt
index add9ecb651..7f83f94cb6 100644
--- a/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/flows/FlowReloadAfterCheckpointTest.kt
@@ -336,8 +336,8 @@ class FlowReloadAfterCheckpointTest {
                 .toSet()
                 .single()
             reloads.await(DEFAULT_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)
-            assertEquals(7, reloads.filter { it == flowStartedByAlice }.size)
-            assertEquals(6, reloads.filter { it == flowStartedByBob }.size)
+            assertEquals(8, reloads.filter { it == flowStartedByAlice }.size)
+            assertEquals(7, reloads.filter { it == flowStartedByBob }.size)
         }
     }
 
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt
index 139bb89505..5c17bb7bac 100644
--- a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/FlowHospitalTest.kt
@@ -19,7 +19,10 @@ import net.corda.core.flows.ReceiveFinalityFlow
 import net.corda.core.flows.SignTransactionFlow
 import net.corda.core.flows.StartableByRPC
 import net.corda.core.flows.UnexpectedFlowEndException
+import net.corda.core.identity.CordaX500Name
 import net.corda.core.identity.Party
+import net.corda.core.internal.PLATFORM_VERSION
+import net.corda.core.internal.PlatformVersionSwitches
 import net.corda.core.internal.concurrent.transpose
 import net.corda.core.messaging.StateMachineUpdate
 import net.corda.core.messaging.startFlow
@@ -40,14 +43,22 @@ import net.corda.testing.core.singleIdentity
 import net.corda.testing.driver.DriverParameters
 import net.corda.testing.driver.driver
 import net.corda.testing.node.User
+import net.corda.testing.node.internal.CustomCordapp
+import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
+import net.corda.testing.node.internal.InternalMockNetwork
+import net.corda.testing.node.internal.InternalMockNodeParameters
+import net.corda.testing.node.internal.MOCK_VERSION_INFO
+import net.corda.testing.node.internal.TestCordappInternal
+import net.corda.testing.node.internal.TestStartedNode
 import net.corda.testing.node.internal.enclosedCordapp
 import net.corda.testing.node.internal.findCordapp
+import net.corda.testing.node.testContext
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.Before
 import org.junit.Test
 import java.sql.SQLException
-import java.util.*
+import java.util.Random
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.TimeoutException
@@ -59,6 +70,11 @@ class FlowHospitalTest {
 
     private val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
 
+    companion object {
+        private val mockNet = InternalMockNetwork(cordappsForAllNodes = setOf(DUMMY_CONTRACTS_CORDAPP, enclosedCordapp(),
+                CustomCordapp(targetPlatformVersion = 3, classes = setOf(FinalityFlow::class.java))))
+    }
+
     @Before
     fun before() {
         SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow = false
@@ -238,6 +254,78 @@ class FlowHospitalTest {
         assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
     }
 
+    @Test(timeout=300_000)
+    fun `catching a notary error - two phase finality flow initiator to pre-2PF peer`() {
+        var dischargedCounter = 0
+        StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
+            ++dischargedCounter
+        }
+        val aliceNode = createNode(ALICE_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
+        val bobNode = createNode(BOB_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
+
+        val handle = aliceNode.services.startFlow(CreateTransactionFlow(bobNode.info.singleIdentity()), testContext())
+        mockNet.runNetwork()
+        val ref = handle.getOrThrow().resultFuture.get()
+        aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
+        aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
+        mockNet.runNetwork()
+
+        // 1 is the notary failing to notarise and propagating the error
+        // 2 is the receiving flow failing due to the unexpected session end error
+        assertEquals(2, dischargedCounter)
+        assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
+    }
+
+    @Test(timeout=300_000)
+    fun `catching a notary error - pre-2PF initiator to two phase finality flow peer`() {
+        var dischargedCounter = 0
+        StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
+            ++dischargedCounter
+        }
+        val aliceNode = createNode(ALICE_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY - 1)
+        val bobNode = createNode(BOB_NAME, platformVersion = PlatformVersionSwitches.TWO_PHASE_FINALITY)
+
+        val handle = aliceNode.services.startFlow(CreateTransactionFlow(bobNode.info.singleIdentity()), testContext())
+        mockNet.runNetwork()
+        val ref = handle.getOrThrow().resultFuture.get()
+        aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
+        aliceNode.services.startFlow(SpendStateAndCatchDoubleSpendFlow(bobNode.info.singleIdentity(), ref), testContext()).getOrThrow()
+        mockNet.runNetwork()
+
+        // 1 is the notary failing to notarise and propagating the error
+        // 2 is the receiving flow failing due to the unexpected session end error
+        assertEquals(2, dischargedCounter)
+        assertTrue(SpendStateAndCatchDoubleSpendResponderFlow.exceptionSeenInUserFlow)
+    }
+
+    private fun createNode(legalName: CordaX500Name, cordapps: List<TestCordappInternal> = emptyList(), platformVersion: Int = PLATFORM_VERSION): TestStartedNode {
+        return mockNet.createNode(InternalMockNodeParameters(legalName = legalName, additionalCordapps = cordapps,
+                version = MOCK_VERSION_INFO.copy(platformVersion = platformVersion)))
+    }
+
+    @Test(timeout = 300_000)
+    fun `old finality flow catching a notary error will cause a peer to fail with unexpected session end during ReceiveFinalityFlow that passes through user code`() {
+        var dischargedCounter = 0
+        StaffedFlowHospital.onFlowErrorPropagated.add { _, _ ->
+            ++dischargedCounter
+        }
+        val user = User("mark", "dadada", setOf(Permissions.all()))
+        driver(DriverParameters(isDebug = false, startNodesInProcess = true)) {
+
+            val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow()
+            val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow()
+            nodeAHandle.rpc.let {
+                val ref = it.startFlow(::CreateTransactionFlow, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
+                it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
+                it.startFlow(::SpendStateAndCatchDoubleSpendOldFinalityFlow, nodeBHandle.nodeInfo.singleIdentity(), ref).returnValue.getOrThrow(20.seconds)
+            }
+        }
+        // 1 is the notary failing to notarise and propagating the error
+        // 2 is the receiving flow failing due to the unexpected session end error
+        assertEquals(2, dischargedCounter)
+        assertTrue(SpendStateAndCatchDoubleSpendResponderOldFinalityFlow.exceptionSeenInUserFlow)
+    }
+
     @Test(timeout = 300_000)
     fun `unexpected session end errors outside of ReceiveFinalityFlow are not handled`() {
         var dischargedCounter = 0
@@ -483,6 +571,62 @@ class FlowHospitalTest {
         }
     }
 
+    @InitiatingFlow
+    @StartableByRPC
+    class SpendStateAndCatchDoubleSpendOldFinalityFlow(
+            private val peer: Party,
+            private val ref: StateAndRef<DummyState>,
+            private val consumePeerError: Boolean
+    ) : FlowLogic<StateAndRef<DummyState>>() {
+
+        constructor(peer: Party, ref: StateAndRef<DummyState>): this(peer, ref, false)
+
+        @Suspendable
+        override fun call(): StateAndRef<DummyState> {
+            val tx = TransactionBuilder(serviceHub.networkMapCache.notaryIdentities.first()).apply {
+                addInputState(ref)
+                addOutputState(DummyState(participants = listOf(ourIdentity)))
+                addCommand(DummyContract.Commands.Move(), listOf(ourIdentity.owningKey, peer.owningKey))
+            }
+            val stx = serviceHub.signInitialTransaction(tx)
+            val session = initiateFlow(peer)
+            session.send(consumePeerError)
+            val ftx = subFlow(CollectSignaturesFlow(stx, listOf(session)))
+            try {
+                subFlow(OldFinalityFlow(ftx, session))
+            } catch(e: NotaryException) {
+                logger.info("Caught notary exception")
+            }
+            return ftx.coreTransaction.outRef(0)
+        }
+    }
+
+    @InitiatedBy(SpendStateAndCatchDoubleSpendOldFinalityFlow::class)
+    class SpendStateAndCatchDoubleSpendResponderOldFinalityFlow(private val session: FlowSession) : FlowLogic<Unit>() {
+
+        companion object {
+            var exceptionSeenInUserFlow = false
+        }
+
+        @Suspendable
+        override fun call() {
+            val consumeError = session.receive<Boolean>().unwrap { it }
+            val stx = subFlow(object : SignTransactionFlow(session) {
+                override fun checkTransaction(stx: SignedTransaction) {
+
+                }
+            })
+            try {
+                subFlow(OldReceiveFinalityFlow(session, stx.id))
+            } catch (e: UnexpectedFlowEndException) {
+                exceptionSeenInUserFlow = true
+                if (!consumeError) {
+                    throw e
+                }
+            }
+        }
+    }
+
     @InitiatingFlow
     @StartableByRPC
     class CreateTransactionButDontFinalizeFlow(private val peer: Party, private val ref: StateAndRef<DummyState>) : FlowLogic<Unit>() {
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/statemachine/OldFinalityFlow.kt b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/OldFinalityFlow.kt
new file mode 100644
index 0000000000..e352ac02e9
--- /dev/null
+++ b/node/src/integration-test/kotlin/net/corda/node/services/statemachine/OldFinalityFlow.kt
@@ -0,0 +1,306 @@
+package net.corda.node.services.statemachine
+
+import co.paralleluniverse.fibers.Suspendable
+import net.corda.core.CordaInternal
+import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.isFulfilledBy
+import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.FlowSession
+import net.corda.core.flows.InitiatingFlow
+import net.corda.core.flows.NotaryException
+import net.corda.core.flows.NotaryFlow
+import net.corda.core.flows.ReceiveTransactionFlow
+import net.corda.core.flows.SendTransactionFlow
+import net.corda.core.flows.UnexpectedFlowEndException
+import net.corda.core.identity.Party
+import net.corda.core.identity.groupAbstractPartyByWellKnownParty
+import net.corda.core.internal.telemetry.telemetryServiceInternal
+import net.corda.core.internal.warnOnce
+import net.corda.core.node.StatesToRecord
+import net.corda.core.node.StatesToRecord.ONLY_RELEVANT
+import net.corda.core.transactions.LedgerTransaction
+import net.corda.core.transactions.SignedTransaction
+import net.corda.core.utilities.ProgressTracker
+import net.corda.core.utilities.debug
+
+/**
+ * Verifies the given transaction, then sends it to the named notary. If the notary agrees that the transaction
+ * is acceptable then it is from that point onwards committed to the ledger, and will be written through to the
+ * vault. Additionally it will be distributed to the parties reflected in the participants list of the states.
+ *
+ * By default, the initiating flow will commit states that are relevant to the initiating party as indicated by
+ * [StatesToRecord.ONLY_RELEVANT]. Relevance is determined by the union of all participants to states which have been
+ * included in the transaction. This default behaviour may be modified by passing in an alternate value for [StatesToRecord].
+ *
+ * The transaction is expected to have already been resolved: if its dependencies are not available in local
+ * storage, verification will fail. It must have signatures from all necessary parties other than the notary.
+ *
+ * A list of [FlowSession]s is required for each non-local participant of the transaction. These participants will receive
+ * the final notarised transaction by calling [ReceiveFinalityFlow] in their counterpart flows. Sessions with non-participants
+ * can also be included, but they must specify [StatesToRecord.ALL_VISIBLE] for statesToRecord if they wish to record the
+ * contract states into their vaults.
+ *
+ * The flow returns the same transaction but with the additional signatures from the notary.
+ *
+ * NOTE: This is an inlined flow but for backwards compatibility is annotated with [InitiatingFlow].
+ */
+// To maintain backwards compatibility with the old API, FinalityFlow can act both as an initiating flow and as an inlined flow.
+// This is only possible because a flow is only truly initiating when the first call to initiateFlow is made (where the
+// presence of @InitiatingFlow is checked). So the new API is inlined simply because that code path doesn't call initiateFlow.
+@InitiatingFlow
+class OldFinalityFlow private constructor(val transaction: SignedTransaction,
+                                          private val oldParticipants: Collection<Party>,
+                                          override val progressTracker: ProgressTracker,
+                                          private val sessions: Collection<FlowSession>,
+                                          private val newApi: Boolean,
+                                          private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
+
+    @CordaInternal
+    data class ExtraConstructorArgs(val oldParticipants: Collection<Party>, val sessions: Collection<FlowSession>, val newApi: Boolean, val statesToRecord: StatesToRecord)
+
+    @CordaInternal
+    fun getExtraConstructorArgs() = ExtraConstructorArgs(oldParticipants, sessions, newApi, statesToRecord)
+
+    @Deprecated(DEPRECATION_MSG)
+    constructor(transaction: SignedTransaction, extraRecipients: Set<Party>, progressTracker: ProgressTracker) : this(
+            transaction, extraRecipients, progressTracker, emptyList(), false
+    )
+    @Deprecated(DEPRECATION_MSG)
+    constructor(transaction: SignedTransaction, extraRecipients: Set<Party>) : this(transaction, extraRecipients, tracker(), emptyList(), false)
+    @Deprecated(DEPRECATION_MSG)
+    constructor(transaction: SignedTransaction) : this(transaction, emptySet(), tracker(), emptyList(), false)
+    @Deprecated(DEPRECATION_MSG)
+    constructor(transaction: SignedTransaction, progressTracker: ProgressTracker) : this(transaction, emptySet(), progressTracker, emptyList(), false)
+
+    /**
+     * Notarise the given transaction and broadcast it to the given [FlowSession]s. This list **must** at least include
+     * all the non-local participants of the transaction. Sessions to non-participants can also be provided.
+     *
+     * @param transaction What to commit.
+     */
+    constructor(transaction: SignedTransaction, firstSession: FlowSession, vararg restSessions: FlowSession) : this(
+            transaction, listOf(firstSession) + restSessions.asList()
+    )
+
+    /**
+     * Notarise the given transaction and broadcast it to all the participants.
+     *
+     * @param transaction What to commit.
+     * @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can
+     * also be provided.
+     */
+    @JvmOverloads
+    constructor(
+            transaction: SignedTransaction,
+            sessions: Collection<FlowSession>,
+            progressTracker: ProgressTracker = tracker()
+    ) : this(transaction, emptyList(), progressTracker, sessions, true)
+
+    /**
+     * Notarise the given transaction and broadcast it to all the participants.
+     *
+     * @param transaction What to commit.
+     * @param sessions A collection of [FlowSession]s for each non-local participant of the transaction. Sessions to non-participants can
+     * also be provided.
+     * @param statesToRecord Which states to commit to the vault.
+     */
+    @JvmOverloads
+    constructor(
+            transaction: SignedTransaction,
+            sessions: Collection<FlowSession>,
+            statesToRecord: StatesToRecord,
+            progressTracker: ProgressTracker = tracker()
+    ) : this(transaction, emptyList(), progressTracker, sessions, true, statesToRecord)
+
+    /**
+     * Notarise the given transaction and broadcast it to all the participants.
+     *
+     * @param transaction What to commit.
+     * @param sessions A collection of [FlowSession]s for each non-local participant.
+     * @param oldParticipants An **optional** collection of parties for participants who are still using the old API.
+     *
+     * You will only need to use this parameter if you have upgraded your CorDapp from the V3 FinalityFlow API but are required to provide
+     * backwards compatibility with participants running V3 nodes. If you're writing a new CorDapp then this does not apply and this
+     * parameter should be ignored.
+     */
+    @Deprecated(DEPRECATION_MSG)
+    constructor(
+            transaction: SignedTransaction,
+            sessions: Collection<FlowSession>,
+            oldParticipants: Collection<Party>,
+            progressTracker: ProgressTracker
+    ) : this(transaction, oldParticipants, progressTracker, sessions, true)
+
+    companion object {
+        private const val DEPRECATION_MSG = "It is unsafe to use this constructor as it requires nodes to automatically " +
+                "accept notarised transactions without first checking their relevancy. Instead, use one of the constructors " +
+                "that requires only FlowSessions."
+
+        object NOTARISING : ProgressTracker.Step("Requesting signature by notary service") {
+            override fun childProgressTracker() = NotaryFlow.Client.tracker()
+        }
+
+        object BROADCASTING : ProgressTracker.Step("Broadcasting transaction to participants")
+
+        @JvmStatic
+        fun tracker() = ProgressTracker(NOTARISING, BROADCASTING)
+    }
+
+    @Suppress("ComplexMethod")
+    @Suspendable
+    @Throws(NotaryException::class)
+    override fun call(): SignedTransaction {
+        if (!newApi) {
+            logger.warnOnce("The current usage of FinalityFlow is unsafe. Please consider upgrading your CorDapp to use " +
+                    "FinalityFlow with FlowSessions. (${serviceHub.getAppContext().cordapp.info})")
+        } else {
+            require(sessions.none { serviceHub.myInfo.isLegalIdentity(it.counterparty) }) {
+                "Do not provide flow sessions for the local node. FinalityFlow will record the notarised transaction locally."
+            }
+        }
+
+        // Note: this method is carefully broken up to minimize the amount of data reachable from the stack at
+        // the point where subFlow is invoked, as that minimizes the checkpointing work to be done.
+        //
+        // Lookup the resolved transactions and use them to map each signed transaction to the list of participants.
+        // Then send to the notary if needed, record locally and distribute.
+
+        logCommandData()
+        val ledgerTransaction = verifyTx()
+        val externalTxParticipants = extractExternalParticipants(ledgerTransaction)
+
+        if (newApi) {
+            val sessionParties = sessions.map { it.counterparty }
+            val missingRecipients = externalTxParticipants - sessionParties - oldParticipants
+            require(missingRecipients.isEmpty()) {
+                "Flow sessions were not provided for the following transaction participants: $missingRecipients"
+            }
+            sessionParties.intersect(oldParticipants).let {
+                require(it.isEmpty()) { "The following parties are specified both in flow sessions and in the oldParticipants list: $it" }
+            }
+        }
+
+        val notarised = notariseAndRecord()
+
+        progressTracker.currentStep = BROADCASTING
+
+        if (newApi) {
+            oldV3Broadcast(notarised, oldParticipants.toSet())
+            for (session in sessions) {
+                try {
+                    subFlow(SendTransactionFlow(session, notarised))
+                    logger.info("Party ${session.counterparty} received the transaction.")
+                } catch (e: UnexpectedFlowEndException) {
+                    throw UnexpectedFlowEndException(
+                            "${session.counterparty} has finished prematurely and we're trying to send them the finalised transaction. " +
+                                    "Did they forget to call ReceiveFinalityFlow? (${e.message})",
+                            e.cause,
+                            e.originalErrorId
+                    )
+                }
+            }
+        } else {
+            oldV3Broadcast(notarised, (externalTxParticipants + oldParticipants).toSet())
+        }
+
+        logger.info("All parties received the transaction successfully.")
+
+        return notarised
+    }
+
+    @Suspendable
+    private fun oldV3Broadcast(notarised: SignedTransaction, recipients: Set<Party>) {
+        for (recipient in recipients) {
+            if (!serviceHub.myInfo.isLegalIdentity(recipient)) {
+                logger.debug { "Sending transaction to party $recipient." }
+                val session = initiateFlow(recipient)
+                subFlow(SendTransactionFlow(session, notarised))
+                logger.info("Party $recipient received the transaction.")
+            }
+        }
+    }
+
+    private fun logCommandData() {
+        if (logger.isDebugEnabled) {
+            val commandDataTypes = transaction.tx.commands.asSequence().mapNotNull { it.value::class.qualifiedName }.distinct()
+            logger.debug("Started finalization, commands are ${commandDataTypes.joinToString(", ", "[", "]")}.")
+        }
+    }
+
+    @Suspendable
+    private fun notariseAndRecord(): SignedTransaction {
+        serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord", flowLogic = this) {
+            val notarised = if (needsNotarySignature(transaction)) {
+                progressTracker.currentStep = NOTARISING
+                val notarySignatures = subFlow(NotaryFlow.Client(transaction, skipVerification = true))
+                transaction + notarySignatures
+            } else {
+                logger.info("No need to notarise this transaction.")
+                transaction
+            }
+            serviceHub.telemetryServiceInternal.span("${this::class.java.name}#notariseAndRecord:recordTransactions", flowLogic = this) {
+                logger.info("Recording transaction locally.")
+                serviceHub.recordTransactions(statesToRecord, listOf(notarised))
+                logger.info("Recorded transaction locally successfully.")
+            }
+            return notarised
+        }
+    }
+
+    private fun needsNotarySignature(stx: SignedTransaction): Boolean {
+        val wtx = stx.tx
+        val needsNotarisation = wtx.inputs.isNotEmpty() || wtx.references.isNotEmpty() || wtx.timeWindow != null
+        return needsNotarisation && hasNoNotarySignature(stx)
+    }
+
+    private fun hasNoNotarySignature(stx: SignedTransaction): Boolean {
+        val notaryKey = stx.tx.notary?.owningKey
+        val signers = stx.sigs.asSequence().map { it.by }.toSet()
+        return notaryKey?.isFulfilledBy(signers) != true
+    }
+
+    private fun extractExternalParticipants(ltx: LedgerTransaction): Set<Party> {
+        val participants = ltx.outputStates.flatMap { it.participants } + ltx.inputStates.flatMap { it.participants }
+        return groupAbstractPartyByWellKnownParty(serviceHub, participants).keys - serviceHub.myInfo.legalIdentities
+    }
+
+    private fun verifyTx(): LedgerTransaction {
+        val notary = transaction.tx.notary
+        // The notary signature(s) are allowed to be missing but no others.
+        if (notary != null) transaction.verifySignaturesExcept(notary.owningKey) else transaction.verifyRequiredSignatures()
+        // TODO= [CORDA-3267] Remove duplicate signature verification
+        val ltx = transaction.toLedgerTransaction(serviceHub, false)
+        ltx.verify()
+        return ltx
+    }
+}
+
+/**
+ * The receiving counterpart to [FinalityFlow].
+ *
+ * All parties who are receiving a finalised transaction from a sender flow must subcall this flow in their own flows.
+ *
+ * It's typical to have already signed the transaction proposal in the same workflow using [SignTransactionFlow]. If so
+ * then the transaction ID can be passed in as an extra check to ensure the finalised transaction is the one that was signed
+ * before it's committed to the vault.
+ *
+ * @param otherSideSession The session which is providing the transaction to record.
+ * @param expectedTxId Expected ID of the transaction that's about to be received. This is typically retrieved from
+ * [SignTransactionFlow]. Setting it to null disables the expected transaction ID check.
+ * @param statesToRecord Which states to commit to the vault. Defaults to [StatesToRecord.ONLY_RELEVANT].
+ */
+class OldReceiveFinalityFlow @JvmOverloads constructor(private val otherSideSession: FlowSession,
+                                                       private val expectedTxId: SecureHash? = null,
+                                                       private val statesToRecord: StatesToRecord = ONLY_RELEVANT) : FlowLogic<SignedTransaction>() {
+    @Suspendable
+    override fun call(): SignedTransaction {
+        return subFlow(object : ReceiveTransactionFlow(otherSideSession, checkSufficientSignatures = true, statesToRecord = statesToRecord) {
+            override fun checkBeforeRecording(stx: SignedTransaction) {
+                require(expectedTxId == null || expectedTxId == stx.id) {
+                    "We expected to receive transaction with ID $expectedTxId but instead got ${stx.id}. Transaction was" +
+                            "not recorded and nor its states sent to the vault."
+                }
+            }
+        })
+    }
+}
diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt
index 198b9d3ab6..10ff0364d7 100644
--- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt
+++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultObserverExceptionTest.kt
@@ -39,7 +39,6 @@ import org.assertj.core.api.Assertions
 import org.junit.After
 import org.junit.Assert
 import org.junit.Test
-import java.lang.IllegalStateException
 import java.sql.SQLException
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.TimeUnit
@@ -422,9 +421,10 @@ class VaultObserverExceptionTest {
     /**
      * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
      *
-     * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
-     * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
-     * counterparty node.
+     * Two Phase Finality update:
+     * This causes the transaction to not be finalised on the local node but the notary still records the transaction as spent. The transaction
+     * does finalize at the counterparty node since the notary signatures are broadcast to peers prior to initiator node finalisation.
+     * Subscriber events will occur on the counterparty node.
      *
      * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
      *
@@ -487,29 +487,32 @@ class VaultObserverExceptionTest {
             println("First set of flows")
             val stateId = startErrorInObservableWhenConsumingState()
             assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
-            assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
+            // Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
+            assertEquals(1, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
             assertEquals(1, notary.getNotarisedTransactionIds().size)
             assertEquals(1, observationCounter)
             assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
-            assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
+            assertEquals(1, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
 
             println("Second set of flows")
             val stateId2 = startErrorInObservableWhenConsumingState()
             assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
-            assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
+            // Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
+            assertEquals(1, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
             assertEquals(2, notary.getNotarisedTransactionIds().size)
             assertEquals(2, observationCounter)
             assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
-            assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
+            assertEquals(2, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
         }
     }
 
     /**
      * An error is thrown inside of the [VaultService.rawUpdates] observable while recording a transaction inside of the initiating node.
      *
-     * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
-     * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
-     * counterparty node.
+     * Two Phase Finality update:
+     * This causes the transaction to not be finalised on the local node but the notary still records the transaction as spent. The transaction
+     * does finalize at the counterparty node since the notary signatures are broadcast to peers prior to initiator node finalisation.
+     * Subscriber events will occur on the counterparty node.
      *
      * More importantly, the observer listening to the [VaultService.rawUpdates] observable should not unsubscribe.
      *
@@ -578,19 +581,21 @@ class VaultObserverExceptionTest {
 
             val stateId = startErrorInObservableWhenConsumingState()
             assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
-            assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
+            // Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
+            assertEquals(1, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
             assertEquals(1, notary.getNotarisedTransactionIds().size)
             assertEquals(1, observationCounter)
             assertEquals(3, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
-            assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
+            assertEquals(1, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
 
             val stateId2 = startErrorInObservableWhenConsumingState()
             assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
-            assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
+            // Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
+            assertEquals(1, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
             assertEquals(2, notary.getNotarisedTransactionIds().size)
             assertEquals(2, observationCounter)
             assertEquals(6, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
-            assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
+            assertEquals(2, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
         }
     }
 
@@ -681,9 +686,10 @@ class VaultObserverExceptionTest {
     /**
      * An error is thrown inside of the [VaultService.updates] observable while recording a transaction inside of the initiating node.
      *
-     * This causes the transaction to not be saved on the local node but the notary still records the transaction as spent. The transaction
-     * also is not send to the counterparty node since it failed before reaching the send. Therefore no subscriber events occur on the
-     * counterparty node.
+     * Two Phase Finality update:
+     * This causes the transaction to not be finalised on the local node but the notary still records the transaction as spent. The transaction
+     * does finalize at the counterparty node since the notary signatures are broadcast to peers prior to initiator node finalisation.
+     * Subscriber events will occur on the counterparty node.
      *
      * More importantly, the observer listening to the [VaultService.updates] observable should not unsubscribe.
      *
@@ -743,19 +749,21 @@ class VaultObserverExceptionTest {
             val stateId = startErrorInObservableWhenConsumingState()
             assertEquals(0, aliceNode.getStatesById(stateId, Vault.StateStatus.CONSUMED).size)
             assertEquals(1, aliceNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
-            assertEquals(0, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
+            // Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
+            assertEquals(1, bobNode.getStatesById(stateId, Vault.StateStatus.UNCONSUMED).size)
             assertEquals(1, notary.getNotarisedTransactionIds().size)
             assertEquals(1, observationCounter)
             assertEquals(2, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
-            assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
+            assertEquals(1, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
 
             val stateId2 = startErrorInObservableWhenConsumingState()
             assertEquals(0, aliceNode.getStatesById(stateId2, Vault.StateStatus.CONSUMED).size)
             assertEquals(2, aliceNode.getAllStates(Vault.StateStatus.UNCONSUMED).size)
-            assertEquals(0, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
+            // Ledger is temporarily inconsistent as peer finalised transaction but initiator error'ed before finalisation.
+            assertEquals(1, bobNode.getStatesById(stateId2, Vault.StateStatus.UNCONSUMED).size)
             assertEquals(2, notary.getNotarisedTransactionIds().size)
             assertEquals(4, rawUpdatesCount[aliceNode.nodeInfo.singleIdentity()])
-            assertEquals(0, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
+            assertEquals(2, rawUpdatesCount.getOrDefault(bobNode.nodeInfo.singleIdentity(), 0))
         }
     }
 
diff --git a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt
index bc6cf3d2af..3d737ea422 100644
--- a/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt
+++ b/node/src/main/kotlin/net/corda/node/services/DbTransactionsResolver.kt
@@ -3,6 +3,7 @@ package net.corda.node.services
 import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.crypto.SecureHash
 import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.TransactionStatus
 import net.corda.core.internal.FetchTransactionsFlow
 import net.corda.core.internal.ResolveTransactionsFlow
 import net.corda.core.internal.TransactionsResolver
@@ -101,10 +102,10 @@ class DbTransactionsResolver(private val flow: ResolveTransactionsFlow) : Transa
         val transactionStorage = flow.serviceHub.validatedTransactions as WritableTransactionStorage
         for (txId in sortedDependencies) {
             // Retrieve and delete the transaction from the unverified store.
-            val (tx, isVerified) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
+            val (tx, txStatus) = checkNotNull(transactionStorage.getTransactionInternal(txId)) {
                 "Somehow the unverified transaction ($txId) that we stored previously is no longer there."
             }
-            if (!isVerified) {
+            if (txStatus != TransactionStatus.VERIFIED) {
                 tx.verify(flow.serviceHub)
                 flow.serviceHub.recordTransactions(usedStatesToRecord, listOf(tx))
             } else {
diff --git a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt
index 8139b3b0a4..a0bd6121d1 100644
--- a/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt
+++ b/node/src/main/kotlin/net/corda/node/services/api/ServiceHubInternal.kt
@@ -3,10 +3,19 @@ package net.corda.node.services.api
 import net.corda.core.concurrent.CordaFuture
 import net.corda.core.context.InvocationContext
 import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.TransactionSignature
 import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.FlowTransactionMetadata
 import net.corda.core.flows.StateMachineRunId
-import net.corda.core.internal.*
+import net.corda.core.flows.TransactionStatus
+import net.corda.core.internal.FlowStateMachineHandle
+import net.corda.core.internal.NamedCacheFactory
+import net.corda.core.internal.ResolveTransactionsFlow
+import net.corda.core.internal.ServiceHubCoreInternal
+import net.corda.core.internal.TransactionsResolver
 import net.corda.core.internal.concurrent.OpenFuture
+import net.corda.core.internal.dependencies
+import net.corda.core.internal.requireSupportedHashType
 import net.corda.core.messaging.DataFeed
 import net.corda.core.messaging.StateMachineTransactionMapping
 import net.corda.core.node.NodeInfo
@@ -15,6 +24,7 @@ import net.corda.core.node.services.NetworkMapCache
 import net.corda.core.node.services.NetworkMapCacheBase
 import net.corda.core.node.services.TransactionStorage
 import net.corda.core.transactions.SignedTransaction
+import net.corda.core.transactions.WireTransaction
 import net.corda.core.utilities.contextLogger
 import net.corda.node.internal.InitiatedFlowFactory
 import net.corda.node.internal.cordapp.CordappProviderInternal
@@ -27,7 +37,11 @@ import net.corda.node.services.statemachine.ExternalEvent
 import net.corda.node.services.statemachine.FlowStateMachineImpl
 import net.corda.nodeapi.internal.persistence.CordaPersistence
 import java.security.PublicKey
-import java.util.*
+import java.util.ArrayList
+import java.util.Collections
+import java.util.HashMap
+import java.util.HashSet
+import java.util.LinkedHashSet
 
 interface NetworkMapCacheInternal : NetworkMapCache, NetworkMapCacheBase {
     override val nodeReady: OpenFuture<Void?>
@@ -63,12 +77,15 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
             return sort.complete()
         }
 
+        @Suppress("LongParameterList")
         fun recordTransactions(statesToRecord: StatesToRecord,
                                txs: Collection<SignedTransaction>,
                                validatedTransactions: WritableTransactionStorage,
                                stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
                                vaultService: VaultServiceInternal,
-                               database: CordaPersistence) {
+                               database: CordaPersistence,
+                               updateFn: (SignedTransaction) -> Boolean = validatedTransactions::addTransaction
+        ) {
 
             database.transaction {
                 require(txs.isNotEmpty()) { "No transactions passed in for recording" }
@@ -79,9 +96,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
                 // for transactions being recorded at ONLY_RELEVANT, if this transaction has been seen before its outputs should already
                 // have been recorded at ONLY_RELEVANT, so there shouldn't be anything to re-record here.
                 val (recordedTransactions, previouslySeenTxs) = if (statesToRecord != StatesToRecord.ALL_VISIBLE) {
-                    orderedTxs.filter(validatedTransactions::addTransaction) to emptyList()
+                    orderedTxs.filter(updateFn) to emptyList()
                 } else {
-                    orderedTxs.partition(validatedTransactions::addTransaction)
+                    orderedTxs.partition(updateFn)
                 }
                 val stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id
                 if (stateMachineRunId != null) {
@@ -129,6 +146,22 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
                 vaultService.notifyAll(statesToRecord, recordedTransactions.map { it.coreTransaction }, previouslySeenTxs.map { it.coreTransaction })
             }
         }
+
+        @Suppress("LongParameterList")
+        fun finalizeTransactionWithExtraSignatures(statesToRecord: StatesToRecord,
+                                                   txn: SignedTransaction,
+                                                   sigs: Collection<TransactionSignature>,
+                                                   validatedTransactions: WritableTransactionStorage,
+                                                   stateMachineRecordedTransactionMapping: StateMachineRecordedTransactionMappingStorage,
+                                                   vaultService: VaultServiceInternal,
+                                                   database: CordaPersistence) {
+            database.transaction {
+                require(sigs.isNotEmpty()) { "No signatures passed in for recording" }
+                recordTransactions(statesToRecord, listOf(txn), validatedTransactions, stateMachineRecordedTransactionMapping, vaultService, database) {
+                    validatedTransactions.finalizeTransactionWithExtraSignatures(it, sigs)
+                }
+            }
+        }
     }
 
     override val attachments: AttachmentStorageInternal
@@ -156,7 +189,9 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
     val cacheFactory: NamedCacheFactory
 
     override fun recordTransactions(statesToRecord: StatesToRecord, txs: Iterable<SignedTransaction>) {
-        txs.forEach { requireSupportedHashType(it) }
+        txs.forEach {
+            requireSupportedHashType(it)
+        }
         recordTransactions(
                 statesToRecord,
                 txs as? Collection ?: txs.toList(), // We can't change txs to a Collection as it's now part of the public API
@@ -167,6 +202,32 @@ interface ServiceHubInternal : ServiceHubCoreInternal {
         )
     }
 
+    override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {
+        requireSupportedHashType(txn)
+        if (txn.coreTransaction is WireTransaction)
+            (txn + sigs).verifyRequiredSignatures()
+        finalizeTransactionWithExtraSignatures(
+                statesToRecord,
+                txn,
+                sigs,
+                validatedTransactions,
+                stateMachineRecordedTransactionMapping,
+                vaultService,
+                database
+        )
+    }
+
+    override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) {
+        if (txn.coreTransaction is WireTransaction) {
+            txn.notary?.let { notary ->
+                txn.verifySignaturesExcept(notary.owningKey)
+            } ?: txn.verifyRequiredSignatures()
+        }
+        database.transaction {
+            validatedTransactions.addUnnotarisedTransaction(txn, metadata)
+        }
+    }
+
     override fun createTransactionsResolver(flow: ResolveTransactionsFlow): TransactionsResolver = DbTransactionsResolver(flow)
 
     /**
@@ -253,16 +314,33 @@ interface WritableTransactionStorage : TransactionStorage {
     // TODO: Throw an exception if trying to add a transaction with fewer signatures than an existing entry.
     fun addTransaction(transaction: SignedTransaction): Boolean
 
+    /**
+     * Add an un-notarised transaction to the store with a status of *MISSING_TRANSACTION_SIG*.
+     * Optionally add finality flow recovery metadata.
+     * @param transaction The transaction to be recorded.
+     * @param metadata Finality flow recovery metadata.
+     * @return true if the transaction was recorded as a *new* transaction, false if the transaction already exists.
+     */
+    fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata? = null): Boolean
+
+    /**
+     * Update a previously un-notarised transaction including associated notary signatures.
+     * @param transaction The notarised transaction to be finalized.
+     * @param signatures The notary signatures.
+     * @return true if the transaction is recorded as a *finalized* transaction, false if the transaction already exists.
+     */
+    fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) : Boolean
+
     /**
      * Add a new *unverified* transaction to the store.
      */
     fun addUnverifiedTransaction(transaction: SignedTransaction)
 
     /**
-     * Return the transaction with the given ID from the store, and a flag of whether it's verified. Returns null if no transaction with the
-     * ID exists.
+     * Return the transaction with the given ID from the store, and its associated [TransactionStatus].
+     * Returns null if no transaction with the ID exists.
      */
-    fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>?
+    fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>?
 
     /**
      * Returns a future that completes with the transaction corresponding to [id] once it has been committed. Do not warn when run inside
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt
index 24046f2941..a9651af587 100644
--- a/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt
+++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBTransactionStorage.kt
@@ -3,17 +3,26 @@ package net.corda.node.services.persistence
 import net.corda.core.concurrent.CordaFuture
 import net.corda.core.crypto.SecureHash
 import net.corda.core.crypto.TransactionSignature
+import net.corda.core.flows.FlowTransactionMetadata
+import net.corda.core.identity.CordaX500Name
 import net.corda.core.internal.NamedCacheFactory
 import net.corda.core.internal.ThreadBox
 import net.corda.core.internal.VisibleForTesting
 import net.corda.core.internal.bufferUntilSubscribed
 import net.corda.core.internal.concurrent.doneFuture
 import net.corda.core.messaging.DataFeed
-import net.corda.core.serialization.*
+import net.corda.core.node.StatesToRecord
+import net.corda.core.serialization.SerializationContext
+import net.corda.core.serialization.SerializationDefaults
+import net.corda.core.serialization.SerializedBytes
+import net.corda.core.serialization.SingletonSerializeAsToken
+import net.corda.core.serialization.deserialize
 import net.corda.core.serialization.internal.effectiveSerializationEnv
+import net.corda.core.serialization.serialize
 import net.corda.core.toFuture
 import net.corda.core.transactions.CoreTransaction
 import net.corda.core.transactions.SignedTransaction
+import net.corda.core.transactions.WireTransaction
 import net.corda.core.utilities.contextLogger
 import net.corda.core.utilities.debug
 import net.corda.node.CordaClock
@@ -21,22 +30,35 @@ import net.corda.node.services.api.WritableTransactionStorage
 import net.corda.node.services.statemachine.FlowStateMachineImpl
 import net.corda.node.utilities.AppendOnlyPersistentMapBase
 import net.corda.node.utilities.WeightBasedAppendOnlyPersistentMap
-import net.corda.nodeapi.internal.persistence.*
+import net.corda.nodeapi.internal.persistence.CordaPersistence
+import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
+import net.corda.nodeapi.internal.persistence.bufferUntilDatabaseCommit
+import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
+import net.corda.nodeapi.internal.persistence.currentDBSession
+import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
 import net.corda.serialization.internal.CordaSerializationEncoding.SNAPPY
 import rx.Observable
 import rx.subjects.PublishSubject
 import java.time.Instant
-import java.util.*
-import javax.persistence.*
+import java.util.Collections
+import javax.persistence.AttributeConverter
+import javax.persistence.Column
+import javax.persistence.Convert
+import javax.persistence.Converter
+import javax.persistence.Entity
+import javax.persistence.Id
+import javax.persistence.Lob
+import javax.persistence.Table
 import kotlin.streams.toList
 
+@Suppress("TooManyFunctions")
 class DBTransactionStorage(private val database: CordaPersistence, cacheFactory: NamedCacheFactory,
                            private val clock: CordaClock) : WritableTransactionStorage, SingletonSerializeAsToken() {
 
     @Suppress("MagicNumber") // database column width
     @Entity
     @Table(name = "${NODE_DATABASE_PREFIX}transactions")
-    class DBTransaction(
+    data class DBTransaction(
             @Id
             @Column(name = "tx_id", length = 144, nullable = false)
             val txId: String,
@@ -53,17 +75,41 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
             val status: TransactionStatus,
 
             @Column(name = "timestamp", nullable = false)
-            val timestamp: Instant
-            )
+            val timestamp: Instant,
+
+            @Column(name = "signatures")
+            val signatures: ByteArray?,
+
+            /**
+             * Flow finality metadata used for recovery
+             * TODO: create association table solely for Flow metadata and recovery purposes.
+             * See https://r3-cev.atlassian.net/browse/ENT-9521
+             */
+
+            /** X500Name of flow initiator **/
+            @Column(name = "initiator")
+            val initiator: String? = null,
+
+            /** X500Name of flow participant parties **/
+            @Column(name = "participants")
+            @Convert(converter = StringListConverter::class)
+            val participants: List<String>? = null,
+
+            /** states to record: NONE, ALL_VISIBLE, ONLY_RELEVANT */
+            @Column(name = "states_to_record")
+            val statesToRecord: StatesToRecord? = null
+    )
 
     enum class TransactionStatus {
         UNVERIFIED,
-        VERIFIED;
+        VERIFIED,
+        MISSING_NOTARY_SIG;
 
         fun toDatabaseValue(): String {
             return when (this) {
                 UNVERIFIED -> "U"
                 VERIFIED -> "V"
+                MISSING_NOTARY_SIG -> "M"
             }
         }
 
@@ -71,11 +117,20 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
             return this == VERIFIED
         }
 
+        fun toTransactionStatus(): net.corda.core.flows.TransactionStatus {
+            return when(this) {
+                UNVERIFIED -> net.corda.core.flows.TransactionStatus.UNVERIFIED
+                VERIFIED -> net.corda.core.flows.TransactionStatus.VERIFIED
+                MISSING_NOTARY_SIG -> net.corda.core.flows.TransactionStatus.MISSING_NOTARY_SIG
+            }
+        }
+
         companion object {
             fun fromDatabaseValue(databaseValue: String): TransactionStatus {
                 return when (databaseValue) {
                     "V" -> VERIFIED
                     "U" -> UNVERIFIED
+                    "M" -> MISSING_NOTARY_SIG
                     else -> throw UnexpectedStatusValueException(databaseValue)
                 }
             }
@@ -95,6 +150,21 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
         }
     }
 
+    @Converter
+    class StringListConverter : AttributeConverter<List<String>?, String?> {
+        override fun convertToDatabaseColumn(stringList: List<String>?): String? {
+            return stringList?.let { if (it.isEmpty()) null else it.joinToString(SPLIT_CHAR) }
+        }
+
+        override fun convertToEntityAttribute(string: String?): List<String>? {
+            return string?.split(SPLIT_CHAR)
+        }
+
+        companion object {
+            private const val SPLIT_CHAR = ";"
+        }
+    }
+
     internal companion object {
         const val TRANSACTION_ALREADY_IN_PROGRESS_WARNING = "trackTransaction is called with an already existing, open DB transaction. As a result, there might be transactions missing from the returned data feed, because of race conditions."
 
@@ -107,7 +177,7 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
 
         private val logger = contextLogger()
 
-        private fun contextToUse(): SerializationContext {
+        fun contextToUse(): SerializationContext {
             return if (effectiveSerializationEnv.serializationFactory.currentContext?.useCase == SerializationContext.UseCase.Storage) {
                 effectiveSerializationEnv.serializationFactory.currentContext!!
             } else {
@@ -121,10 +191,19 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
                     cacheFactory = cacheFactory,
                     name = "DBTransactionStorage_transactions",
                     toPersistentEntityKey = SecureHash::toString,
-                    fromPersistentEntity = {
-                        SecureHash.create(it.txId) to TxCacheValue(
-                                it.transaction.deserialize(context = contextToUse()),
-                                it.status)
+                    fromPersistentEntity = { dbTxn ->
+                        SecureHash.create(dbTxn.txId) to TxCacheValue(
+                                dbTxn.transaction.deserialize(context = contextToUse()),
+                                dbTxn.status,
+                                dbTxn.signatures?.deserialize(context = contextToUse()),
+                                dbTxn.initiator?.let { initiator ->
+                                    FlowTransactionMetadata(
+                                            CordaX500Name.parse(initiator),
+                                            dbTxn.statesToRecord!!,
+                                            dbTxn.participants?.let { it.map { CordaX500Name.parse(it) }.toSet() }
+                                    )
+                                }
+                        )
                     },
                     toPersistentEntity = { key: SecureHash, value: TxCacheValue ->
                         DBTransaction(
@@ -132,7 +211,11 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
                                 stateMachineRunId = FlowStateMachineImpl.currentStateMachine()?.id?.uuid?.toString(),
                                 transaction = value.toSignedTx().serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
                                 status = value.status,
-                                timestamp = clock.instant()
+                                timestamp = clock.instant(),
+                                signatures = value.sigs.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes,
+                                statesToRecord = value.metadata?.statesToRecord,
+                                initiator = value.metadata?.initiator?.toString(),
+                                participants = value.metadata?.peers?.map { it.toString() }
                         )
                     },
                     persistentEntityClass = DBTransaction::class.java,
@@ -158,19 +241,43 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
         criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
         criteriaUpdate.where(criteriaBuilder.and(
                 criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
-                criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.UNVERIFIED)
-        ))
+                criteriaBuilder.and(updateRoot.get<TransactionStatus>(DBTransaction::status.name).`in`(setOf(TransactionStatus.UNVERIFIED, TransactionStatus.MISSING_NOTARY_SIG))
+        )))
         criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant())
         val update = session.createQuery(criteriaUpdate)
         val rowsUpdated = update.executeUpdate()
         return rowsUpdated != 0
     }
 
-    override fun addTransaction(transaction: SignedTransaction): Boolean {
+    override fun addTransaction(transaction: SignedTransaction) =
+            addTransaction(transaction) {
+                updateTransaction(transaction.id)
+            }
+
+    override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?) =
+            database.transaction {
+                txStorage.locked {
+                    val cacheValue = TxCacheValue(transaction, status = TransactionStatus.MISSING_NOTARY_SIG, metadata = metadata)
+                    val added = addWithDuplicatesAllowed(transaction.id, cacheValue)
+                    if (added) {
+                        logger.info ("Transaction ${transaction.id} recorded as un-notarised.")
+                    } else {
+                        logger.info("Transaction ${transaction.id} (un-notarised) already exists so no need to record.")
+                    }
+                    added
+                }
+            }
+
+    override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) =
+            addTransaction(transaction + signatures) {
+                finalizeTransactionWithExtraSignatures(transaction.id, signatures)
+            }
+
+    private fun addTransaction(transaction: SignedTransaction, updateFn: (SecureHash) -> Boolean): Boolean {
         return database.transaction {
             txStorage.locked {
                 val cachedValue = TxCacheValue(transaction, TransactionStatus.VERIFIED)
-                val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateTransaction(k) }
+                val addedOrUpdated = addOrUpdate(transaction.id, cachedValue) { k, _ -> updateFn(k) }
                 if (addedOrUpdated) {
                     logger.debug { "Transaction ${transaction.id} has been recorded as verified" }
                     onNewTx(transaction)
@@ -182,6 +289,40 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
         }
     }
 
+    private fun finalizeTransactionWithExtraSignatures(txId: SecureHash, signatures: Collection<TransactionSignature>): Boolean {
+        return txStorage.locked {
+            val session = currentDBSession()
+            val criteriaBuilder = session.criteriaBuilder
+            val criteriaUpdate = criteriaBuilder.createCriteriaUpdate(DBTransaction::class.java)
+            val updateRoot = criteriaUpdate.from(DBTransaction::class.java)
+            criteriaUpdate.set(updateRoot.get<ByteArray>(DBTransaction::signatures.name), signatures.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes)
+            criteriaUpdate.set(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
+            criteriaUpdate.where(criteriaBuilder.and(
+                    criteriaBuilder.equal(updateRoot.get<String>(DBTransaction::txId.name), txId.toString()),
+                    criteriaBuilder.equal(updateRoot.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.MISSING_NOTARY_SIG)
+            ))
+            criteriaUpdate.set(updateRoot.get<Instant>(DBTransaction::timestamp.name), clock.instant())
+            val update = session.createQuery(criteriaUpdate)
+            val rowsUpdated = update.executeUpdate()
+            if (rowsUpdated == 0) {
+                // indicates race condition whereby ReceiverFinality MISSING_NOTARY_SIG overwritten to UNVERIFIED by ResolveTransactionsFlow (in follow-up txn)
+                // TO-DO: ensure unverified txn signatures are validated prior to recording  (https://r3-cev.atlassian.net/browse/ENT-9566)
+                val criteriaUpdateUnverified = criteriaBuilder.createCriteriaUpdate(DBTransaction::class.java)
+                val updateRootUnverified = criteriaUpdateUnverified.from(DBTransaction::class.java)
+                criteriaUpdateUnverified.set(updateRootUnverified.get<ByteArray>(DBTransaction::signatures.name), signatures.serialize(context = contextToUse().withEncoding(SNAPPY)).bytes)
+                criteriaUpdateUnverified.set(updateRootUnverified.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.VERIFIED)
+                criteriaUpdateUnverified.where(criteriaBuilder.and(
+                        criteriaBuilder.equal(updateRootUnverified.get<String>(DBTransaction::txId.name), txId.toString()),
+                        criteriaBuilder.equal(updateRootUnverified.get<TransactionStatus>(DBTransaction::status.name), TransactionStatus.UNVERIFIED)
+                ))
+                criteriaUpdateUnverified.set(updateRootUnverified.get<Instant>(DBTransaction::timestamp.name), clock.instant())
+                val updateUnverified = session.createQuery(criteriaUpdateUnverified)
+                val rowsUpdatedUnverified = updateUnverified.executeUpdate()
+                rowsUpdatedUnverified != 0
+            } else true
+        }
+    }
+
     private fun onNewTx(transaction: SignedTransaction): Boolean {
         updatesPublisher.bufferUntilDatabaseCommit().onNext(transaction)
         return true
@@ -194,10 +335,18 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
     }
 
     override fun addUnverifiedTransaction(transaction: SignedTransaction) {
+        if (transaction.coreTransaction is WireTransaction)
+            transaction.verifyRequiredSignatures()
         database.transaction {
             txStorage.locked {
                 val cacheValue = TxCacheValue(transaction, status = TransactionStatus.UNVERIFIED)
-                val added = addWithDuplicatesAllowed(transaction.id, cacheValue)
+                val added = addWithDuplicatesAllowed(transaction.id, cacheValue) { k, v, existingEntry ->
+                    if (existingEntry.status == TransactionStatus.MISSING_NOTARY_SIG) {
+                        // TODO verify signatures on passed in transaction include notary (See https://r3-cev.atlassian.net/browse/ENT-9566))
+                        session.merge(toPersistentEntity(k, v))
+                        true
+                    } else false
+                }
                 if (added) {
                     logger.debug { "Transaction ${transaction.id} recorded as unverified." }
                 } else {
@@ -207,9 +356,9 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
         }
     }
 
-    override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? {
+    override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, net.corda.core.flows.TransactionStatus>? {
         return database.transaction {
-            txStorage.content[id]?.let { it.toSignedTx() to it.status.isVerified() }
+            txStorage.content[id]?.let { it.toSignedTx() to it.status.toTransactionStatus() }
         }
     }
 
@@ -269,16 +418,30 @@ class DBTransactionStorage(private val database: CordaPersistence, cacheFactory:
     }
 
     // Cache value type to just store the immutable bits of a signed transaction plus conversion helpers
-    private data class TxCacheValue(
+    private class TxCacheValue(
             val txBits: SerializedBytes<CoreTransaction>,
             val sigs: List<TransactionSignature>,
-            val status: TransactionStatus
+            val status: TransactionStatus,
+            // flow metadata recorded for recovery
+            val metadata: FlowTransactionMetadata? = null
     ) {
         constructor(stx: SignedTransaction, status: TransactionStatus) : this(
                 stx.txBits,
                 Collections.unmodifiableList(stx.sigs),
-                status)
-
+                status
+        )
+        constructor(stx: SignedTransaction, status: TransactionStatus, metadata: FlowTransactionMetadata?) : this(
+                stx.txBits,
+                Collections.unmodifiableList(stx.sigs),
+                status,
+                metadata
+        )
+        constructor(stx: SignedTransaction, status: TransactionStatus, sigs: List<TransactionSignature>?, metadata: FlowTransactionMetadata?) : this(
+                stx.txBits,
+                if (sigs == null) Collections.unmodifiableList(stx.sigs) else Collections.unmodifiableList(stx.sigs + sigs).distinct(),
+                status,
+                metadata
+        )
         fun toSignedTx() = SignedTransaction(txBits, sigs)
     }
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
index 2d314e9c3b..e853cd66a5 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StaffedFlowHospital.kt
@@ -469,6 +469,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
     }
 
     object FinalityDoctor : Staff {
+        @Suppress("ComplexMethod")
         override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
             return if (currentState.flowLogic is FinalityHandler) {
                 log.warn("Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
@@ -480,10 +481,18 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
                         // no need to keep around the flow, since notarisation has already failed at the counterparty.
                         Diagnosis.NOT_MY_SPECIALTY
                     }
+                    isErrorPropagatedFromCounterparty(newError) && isErrorThrownDuringReceiveFinalityFlow(newError) -> {
+                        // no need to keep around the flow, since notarisation has already failed at the counterparty.
+                        Diagnosis.NOT_MY_SPECIALTY
+                    }
                     isEndSessionErrorThrownDuringReceiveTransactionFlow(newError) -> {
                         // Typically occurs if the initiating flow catches a notary exception and ends their flow successfully.
                         Diagnosis.NOT_MY_SPECIALTY
                     }
+                    isEndSessionErrorThrownDuringReceiveFinalityFlow(newError) -> {
+                        // Typically occurs if the initiating flow catches a notary exception and ends their flow successfully.
+                        Diagnosis.NOT_MY_SPECIALTY
+                    }
                     else -> {
                         log.warn(
                             "Flow ${flowFiber.id} failed to be finalised. Manual intervention may be required before retrying " +
@@ -530,6 +539,19 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
                     && strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!!)
         }
 
+        /**
+         * This method will return true if [ReceiveFinalityFlow] is at the top of the stack during the error.
+         * This may happen in the post-notarisation logic of Two Phase Finality upon receiving a notarisation exception
+         * from the peer running [FinalityFlow].
+         */
+        private fun isErrorThrownDuringReceiveFinalityFlow(error: Throwable): Boolean {
+            val strippedStacktrace = error.stackTrace
+                    .filterNot { it?.className?.contains("counter-flow exception from peer") ?: false }
+                    .filterNot { it?.className?.startsWith("net.corda.node.services.statemachine.") ?: false }
+            return strippedStacktrace.isNotEmpty()
+                    && strippedStacktrace.first().className.startsWith(ReceiveFinalityFlow::class.qualifiedName!!)
+        }
+
         /**
          * Checks if an end session error exception was thrown and that it did so within [ReceiveTransactionFlow].
          *
@@ -542,6 +564,15 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
                     && error.message?.contains(StartedFlowTransition.UNEXPECTED_SESSION_END_MESSAGE) == true
                     && isErrorThrownDuringReceiveTransactionFlow(error)
         }
+
+        /**
+         * Checks if an end session error exception was thrown and that it did so within [ReceiveFinalityFlow].
+         */
+        private fun isEndSessionErrorThrownDuringReceiveFinalityFlow(error: Throwable): Boolean {
+            return error is UnexpectedFlowEndException
+                    && error.message?.contains(StartedFlowTransition.UNEXPECTED_SESSION_END_MESSAGE) == true
+                    && isErrorThrownDuringReceiveFinalityFlow(error)
+        }
     }
 
     /**
diff --git a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt
index 570172fa06..27d493b0a4 100644
--- a/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt
+++ b/node/src/main/kotlin/net/corda/node/utilities/AppendOnlyPersistentMap.kt
@@ -142,18 +142,22 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(
      * Associates the specified value with the specified key in this map and persists it.
      * If the map previously contained a committed mapping for the key, the old value is not replaced.  It may throw an error from the
      * underlying storage if this races with another database transaction to store a value for the same key.
+     * An optional [forceUpdate] function allows performing additional checks/updates on an existingEntry to determine whether the map
+     * should be updated.
      * @return true if added key was unique, otherwise false
      */
-    fun addWithDuplicatesAllowed(key: K, value: V, logWarning: Boolean = true): Boolean {
+    fun addWithDuplicatesAllowed(key: K, value: V, logWarning: Boolean = true,
+                                 forceUpdate: (K, V, E) -> Boolean = { _, _, _ -> false }): Boolean {
         return set(key, value, logWarning) { k, v ->
             val session = currentDBSession()
             val existingEntry = session.find(persistentEntityClass, toPersistentEntityKey(k))
             if (existingEntry == null) {
                 session.save(toPersistentEntity(k, v))
                 null
-            } else {
-                fromPersistentEntity(existingEntry).second
             }
+            else if (!forceUpdate(key, value, existingEntry)) {
+                fromPersistentEntity(existingEntry).second
+            } else null
         }
     }
 
diff --git a/node/src/main/resources/migration/node-core.changelog-master.xml b/node/src/main/resources/migration/node-core.changelog-master.xml
index fe333ea9df..a7949838ce 100644
--- a/node/src/main/resources/migration/node-core.changelog-master.xml
+++ b/node/src/main/resources/migration/node-core.changelog-master.xml
@@ -28,7 +28,8 @@
     <include file="migration/node-core.changelog-v16.xml"/>
     <include file="migration/node-core.changelog-v20.xml"/>
     <include file="migration/node-core.changelog-v22.xml"/>
-
+    <include file="migration/node-core.changelog-v23.xml"/>
+    <include file="migration/node-core.changelog-v24.xml"/>
     <!-- This must run after node-core.changelog-init.xml, to prevent database columns being created twice. -->
     <include file="migration/vault-schema.changelog-v9.xml"/>
 
diff --git a/node/src/main/resources/migration/node-core.changelog-v23.xml b/node/src/main/resources/migration/node-core.changelog-v23.xml
new file mode 100644
index 0000000000..095f5e6bee
--- /dev/null
+++ b/node/src/main/resources/migration/node-core.changelog-v23.xml
@@ -0,0 +1,12 @@
+<?xml version="1.1" encoding="UTF-8" standalone="no"?>
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
+                   logicalFilePath="migration/node-services.changelog-init.xml">
+
+    <changeSet author="R3.Corda" id="add_signatures_column">
+        <addColumn tableName="node_transactions">
+            <column name="signatures" type="VARBINARY(33554432)"/>
+        </addColumn>
+    </changeSet>
+</databaseChangeLog>
\ No newline at end of file
diff --git a/node/src/main/resources/migration/node-core.changelog-v24.xml b/node/src/main/resources/migration/node-core.changelog-v24.xml
new file mode 100644
index 0000000000..041633ddcf
--- /dev/null
+++ b/node/src/main/resources/migration/node-core.changelog-v24.xml
@@ -0,0 +1,25 @@
+<?xml version="1.1" encoding="UTF-8" standalone="no"?>
+<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
+                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
+                   logicalFilePath="migration/node-services.changelog-init.xml">
+
+    <changeSet author="R3.Corda" id="add_flow_metadata_columns">
+        <addColumn tableName="node_transactions">
+            <column name="initiator" type="NVARCHAR(128)">
+                <constraints nullable="true"/>
+            </column>
+        </addColumn>
+        <addColumn tableName="node_transactions">
+            <column name="participants" type="NVARCHAR(1280)">
+                <constraints nullable="true"/>
+            </column>
+        </addColumn>
+        <addColumn tableName="node_transactions">
+            <column name="states_to_record" type="INT">
+                <constraints nullable="true"/>
+            </column>
+        </addColumn>
+    </changeSet>
+
+</databaseChangeLog>
\ No newline at end of file
diff --git a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt
index 0c49ee44ac..cddecab98a 100644
--- a/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt
+++ b/node/src/test/kotlin/net/corda/node/messaging/TwoPartyTradeFlowTests.kt
@@ -2,9 +2,26 @@ package net.corda.node.messaging
 
 import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.concurrent.CordaFuture
-import net.corda.core.contracts.*
-import net.corda.core.crypto.*
-import net.corda.core.flows.*
+import net.corda.core.contracts.Amount
+import net.corda.core.contracts.ContractState
+import net.corda.core.contracts.InsufficientBalanceException
+import net.corda.core.contracts.Issued
+import net.corda.core.contracts.OwnableState
+import net.corda.core.contracts.PartyAndReference
+import net.corda.core.contracts.StateAndRef
+import net.corda.core.contracts.TransactionVerificationException
+import net.corda.core.crypto.Crypto
+import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.SignableData
+import net.corda.core.crypto.SignatureMetadata
+import net.corda.core.crypto.TransactionSignature
+import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.FlowSession
+import net.corda.core.flows.FlowTransactionMetadata
+import net.corda.core.flows.InitiatedBy
+import net.corda.core.flows.InitiatingFlow
+import net.corda.core.flows.StateMachineRunId
+import net.corda.core.flows.TransactionStatus
 import net.corda.core.identity.AbstractParty
 import net.corda.core.identity.AnonymousParty
 import net.corda.core.identity.CordaX500Name
@@ -38,14 +55,26 @@ import net.corda.node.services.api.WritableTransactionStorage
 import net.corda.node.services.persistence.DBTransactionStorage
 import net.corda.node.services.statemachine.Checkpoint
 import net.corda.nodeapi.internal.persistence.CordaPersistence
-import net.corda.testing.core.*
+import net.corda.testing.core.ALICE_NAME
+import net.corda.testing.core.BOB_NAME
+import net.corda.testing.core.BOC_NAME
+import net.corda.testing.core.DUMMY_NOTARY_NAME
+import net.corda.testing.core.TestIdentity
+import net.corda.testing.core.expect
+import net.corda.testing.core.expectEvents
+import net.corda.testing.core.sequence
+import net.corda.testing.core.singleIdentity
 import net.corda.testing.dsl.LedgerDSL
 import net.corda.testing.dsl.TestLedgerDSLInterpreter
 import net.corda.testing.dsl.TestTransactionDSLInterpreter
 import net.corda.testing.internal.IS_OPENJ9
 import net.corda.testing.internal.LogHelper
 import net.corda.testing.internal.vault.VaultFiller
-import net.corda.testing.node.internal.*
+import net.corda.testing.node.internal.FINANCE_CONTRACTS_CORDAPP
+import net.corda.testing.node.internal.InternalMockNetwork
+import net.corda.testing.node.internal.InternalMockNodeParameters
+import net.corda.testing.node.internal.TestStartedNode
+import net.corda.testing.node.internal.startFlow
 import net.corda.testing.node.ledger
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.After
@@ -56,7 +85,11 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import rx.Observable
 import java.io.ByteArrayOutputStream
-import java.util.*
+import java.util.ArrayList
+import java.util.Collections
+import java.util.Currency
+import java.util.Random
+import java.util.UUID
 import java.util.jar.JarOutputStream
 import java.util.zip.ZipEntry
 import kotlin.streams.toList
@@ -139,7 +172,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
 
             // TODO: Verify that the result was inserted into the transaction database.
             // assertEquals(bobResult.get(), aliceNode.storage.validatedTransactions[aliceResult.get().id])
-            assertEquals(aliceResult.getOrThrow(), bobStateMachine.getOrThrow().resultFuture.getOrThrow())
+            assertEquals(aliceResult.getOrThrow().id, (bobStateMachine.getOrThrow().resultFuture.getOrThrow() as SignedTransaction).id)
 
             aliceNode.dispose()
             bobNode.dispose()
@@ -285,7 +318,7 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
             mockNet.runNetwork()
 
             // Bob is now finished and has the same transaction as Alice.
-            assertThat(bobFuture.getOrThrow()).isEqualTo(aliceFuture.getOrThrow())
+            assertThat((bobFuture.getOrThrow() as SignedTransaction).id).isEqualTo((aliceFuture.getOrThrow().id))
 
             assertThat(bobNode.smm.findStateMachines(Buyer::class.java)).isEmpty()
             bobNode.database.transaction {
@@ -768,6 +801,21 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
             return true
         }
 
+        override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean {
+            database.transaction {
+                records.add(TxRecord.Add(transaction))
+                delegate.addUnnotarisedTransaction(transaction)
+            }
+            return true
+        }
+
+        override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>) : Boolean {
+            database.transaction {
+                delegate.finalizeTransactionWithExtraSignatures(transaction, signatures)
+            }
+            return true
+        }
+
         override fun addUnverifiedTransaction(transaction: SignedTransaction) {
             database.transaction {
                 delegate.addUnverifiedTransaction(transaction)
@@ -781,11 +829,12 @@ class TwoPartyTradeFlowTests(private val anonymous: Boolean) {
             }
         }
 
-        override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? {
+        override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? {
             return database.transaction {
                 delegate.getTransactionInternal(id)
             }
         }
+
     }
 
     interface TxRecord {
diff --git a/node/src/test/kotlin/net/corda/node/migration/VaultStateMigrationTest.kt b/node/src/test/kotlin/net/corda/node/migration/VaultStateMigrationTest.kt
index da138f9d15..9688afca81 100644
--- a/node/src/test/kotlin/net/corda/node/migration/VaultStateMigrationTest.kt
+++ b/node/src/test/kotlin/net/corda/node/migration/VaultStateMigrationTest.kt
@@ -212,7 +212,8 @@ class VaultStateMigrationTest {
                     stateMachineRunId = null,
                     transaction = tx.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes,
                     status = DBTransactionStorage.TransactionStatus.VERIFIED,
-                    timestamp = Instant.now()
+                    timestamp = Instant.now(),
+                    signatures = null
             )
             session.save(persistentTx)
         }
diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt
index 51b400c321..fd086f6ff9 100644
--- a/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBTransactionStorageTests.kt
@@ -1,21 +1,32 @@
 package net.corda.node.services.persistence
 
+import junit.framework.TestCase.assertNotNull
 import junit.framework.TestCase.assertTrue
 import net.corda.core.concurrent.CordaFuture
 import net.corda.core.contracts.StateRef
 import net.corda.core.crypto.Crypto
 import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.SignableData
 import net.corda.core.crypto.SignatureMetadata
 import net.corda.core.crypto.TransactionSignature
+import net.corda.core.crypto.sign
+import net.corda.core.flows.FlowTransactionMetadata
+import net.corda.core.node.StatesToRecord
+import net.corda.core.serialization.deserialize
 import net.corda.core.toFuture
 import net.corda.core.transactions.SignedTransaction
+import net.corda.core.transactions.WireTransaction
 import net.corda.node.CordaClock
 import net.corda.node.MutableClock
 import net.corda.node.SimpleClock
+import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.MISSING_NOTARY_SIG
+import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.UNVERIFIED
+import net.corda.node.services.persistence.DBTransactionStorage.TransactionStatus.VERIFIED
 import net.corda.node.services.transactions.PersistentUniquenessProvider
 import net.corda.nodeapi.internal.persistence.CordaPersistence
 import net.corda.nodeapi.internal.persistence.DatabaseConfig
 import net.corda.testing.core.ALICE_NAME
+import net.corda.testing.core.BOB_NAME
 import net.corda.testing.core.DUMMY_NOTARY_NAME
 import net.corda.testing.core.SerializationEnvironmentRule
 import net.corda.testing.core.TestIdentity
@@ -32,17 +43,21 @@ import org.junit.Before
 import org.junit.Rule
 import org.junit.Test
 import rx.plugins.RxJavaHooks
+import java.security.KeyPair
 import java.time.Clock
 import java.time.Instant
 import java.util.concurrent.Semaphore
 import java.util.concurrent.TimeUnit
 import kotlin.concurrent.thread
 import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertNull
 
 class DBTransactionStorageTests {
     private companion object {
-        val ALICE_PUBKEY = TestIdentity(ALICE_NAME, 70).publicKey
-        val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20).party
+        val ALICE = TestIdentity(ALICE_NAME, 70)
+        val BOB_PARTY = TestIdentity(BOB_NAME, 80).party
+        val DUMMY_NOTARY = TestIdentity(DUMMY_NOTARY_NAME, 20)
     }
 
     @Rule
@@ -90,6 +105,140 @@ class DBTransactionStorageTests {
         assertEquals(now, readTransactionTimestampFromDB(transaction.id))
     }
 
+    @Test(timeout = 300_000)
+    fun `create transaction missing notary signature and validate status in db`() {
+        val now = Instant.ofEpochSecond(333444555L)
+        val transactionClock = TransactionClock(now)
+        newTransactionStorage(clock = transactionClock)
+        val transaction = newTransaction()
+        transactionStorage.addUnnotarisedTransaction(transaction)
+        assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status)
+    }
+
+    @Test(timeout = 300_000)
+    fun `create un-notarised transaction with flow metadata and validate status in db`() {
+        val now = Instant.ofEpochSecond(333444555L)
+        val transactionClock = TransactionClock(now)
+        newTransactionStorage(clock = transactionClock)
+        val transaction = newTransaction()
+        transactionStorage.addUnnotarisedTransaction(transaction, FlowTransactionMetadata(ALICE.party.name, StatesToRecord.ALL_VISIBLE, setOf(BOB_PARTY.name)))
+        val txn = readTransactionFromDB(transaction.id)
+        assertEquals(MISSING_NOTARY_SIG, txn.status)
+        assertEquals(StatesToRecord.ALL_VISIBLE, txn.statesToRecord)
+        assertEquals(ALICE_NAME.toString(), txn.initiator)
+        assertEquals(listOf(BOB_NAME.toString()), txn.participants)
+    }
+
+    @Test(timeout = 300_000)
+    fun `finalize transaction with no prior recording of un-notarised transaction`() {
+        val now = Instant.ofEpochSecond(333444555L)
+        val transactionClock = TransactionClock(now)
+        newTransactionStorage(clock = transactionClock)
+        val transaction = newTransaction()
+        transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig(transaction.id)))
+        readTransactionFromDB(transaction.id).let {
+            assertSignatures(it.transaction, it.signatures, transaction.sigs)
+            assertEquals(VERIFIED, it.status)
+        }
+    }
+
+    @Test(timeout = 300_000)
+    fun `finalize transaction with extra signatures after recording transaction as un-notarised`() {
+        val now = Instant.ofEpochSecond(333444555L)
+        val transactionClock = TransactionClock(now)
+        newTransactionStorage(clock = transactionClock)
+        val transaction = newTransaction(notarySig = false)
+        transactionStorage.addUnnotarisedTransaction(transaction)
+        assertNull(transactionStorage.getTransaction(transaction.id))
+        assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transaction.id).status)
+        val notarySig = notarySig(transaction.id)
+        transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig))
+        readTransactionFromDB(transaction.id).let {
+            assertSignatures(it.transaction, it.signatures, transaction.sigs + notarySig)
+            assertEquals(VERIFIED, it.status)
+        }
+    }
+
+    @Test(timeout = 300_000)
+    fun `finalize unverified transaction and verify no additional signatures are added`() {
+        val now = Instant.ofEpochSecond(333444555L)
+        val transactionClock = TransactionClock(now)
+        newTransactionStorage(clock = transactionClock)
+        val transaction = newTransaction()
+        transactionStorage.addUnverifiedTransaction(transaction)
+        assertNull(transactionStorage.getTransaction(transaction.id))
+        assertEquals(UNVERIFIED, readTransactionFromDB(transaction.id).status)
+        // attempt to finalise with another notary signature
+        transactionStorage.finalizeTransactionWithExtraSignatures(transaction, listOf(notarySig(transaction.id)))
+        readTransactionFromDB(transaction.id).let {
+            assertSignatures(it.transaction, it.signatures, transaction.sigs)
+            assertEquals(VERIFIED, it.status)
+        }
+    }
+
+    @Test(timeout = 300_000)
+    fun `simulate finalize race condition where first transaction trumps follow-up transaction`() {
+        val now = Instant.ofEpochSecond(333444555L)
+        val transactionClock = TransactionClock(now)
+        newTransactionStorage(clock = transactionClock)
+        val transactionWithoutNotarySig = newTransaction(notarySig = false)
+
+        // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
+        transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySig)
+        assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySig.id).status)
+
+        // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
+        val notarySig = notarySig(transactionWithoutNotarySig.id)
+        transactionStorage.addUnverifiedTransaction(transactionWithoutNotarySig + notarySig)
+        assertEquals(UNVERIFIED, readTransactionFromDB(transactionWithoutNotarySig.id).status)
+
+        // txn finalised with notary signatures (even though in UNVERIFIED state)
+        assertTrue(transactionStorage.finalizeTransactionWithExtraSignatures(transactionWithoutNotarySig, listOf(notarySig)))
+        readTransactionFromDB(transactionWithoutNotarySig.id).let {
+            assertSignatures(it.transaction, it.signatures, transactionWithoutNotarySig.sigs + notarySig)
+            assertEquals(VERIFIED, it.status)
+        }
+
+        // attempt to record follow-up txn
+        assertFalse(transactionStorage.addTransaction(transactionWithoutNotarySig + notarySig))
+        readTransactionFromDB(transactionWithoutNotarySig.id).let {
+            assertSignatures(it.transaction, it.signatures, transactionWithoutNotarySig.sigs + notarySig)
+            assertEquals(VERIFIED, it.status)
+        }
+    }
+
+    @Test(timeout = 300_000)
+    fun `simulate finalize race condition where follow-up transaction races ahead of initial transaction`() {
+        val now = Instant.ofEpochSecond(333444555L)
+        val transactionClock = TransactionClock(now)
+        newTransactionStorage(clock = transactionClock)
+        val transactionWithoutNotarySigs = newTransaction(notarySig = false)
+
+        // txn recorded as un-notarised (simulate ReceiverFinalityFlow in initial flow)
+        transactionStorage.addUnnotarisedTransaction(transactionWithoutNotarySigs)
+        assertEquals(MISSING_NOTARY_SIG, readTransactionFromDB(transactionWithoutNotarySigs.id).status)
+
+        // txn then recorded as unverified (simulate ResolveTransactionFlow in follow-up flow)
+        val notarySig = notarySig(transactionWithoutNotarySigs.id)
+        val transactionWithNotarySigs = transactionWithoutNotarySigs + notarySig
+        transactionStorage.addUnverifiedTransaction(transactionWithNotarySigs)
+        assertEquals(UNVERIFIED, readTransactionFromDB(transactionWithoutNotarySigs.id).status)
+
+        // txn then recorded as verified (simulate ResolveTransactions recording in follow-up flow)
+        assertTrue(transactionStorage.addTransaction(transactionWithNotarySigs))
+        readTransactionFromDB(transactionWithoutNotarySigs.id).let {
+            assertSignatures(it.transaction, it.signatures, expectedSigs = transactionWithNotarySigs.sigs)
+            assertEquals(VERIFIED, it.status)
+        }
+
+        // attempt to finalise original txn
+        assertFalse(transactionStorage.finalizeTransactionWithExtraSignatures(transactionWithoutNotarySigs, listOf(notarySig)))
+        readTransactionFromDB(transactionWithoutNotarySigs.id).let {
+            assertSignatures(it.transaction, it.signatures, expectedSigs = transactionWithNotarySigs.sigs)
+            assertEquals(VERIFIED, it.status)
+        }
+    }
+
     @Test(timeout = 300_000)
     fun `create unverified then verified transaction and validate timestamps in db`() {
         val unverifiedTime = Instant.ofEpochSecond(555666777L)
@@ -175,6 +324,17 @@ class DBTransactionStorageTests {
         return fromDb[0].timestamp
     }
 
+    private fun readTransactionFromDB(id: SecureHash): DBTransactionStorage.DBTransaction {
+        val fromDb = database.transaction {
+            session.createQuery(
+                    "from ${DBTransactionStorage.DBTransaction::class.java.name} where tx_id = :transactionId",
+                    DBTransactionStorage.DBTransaction::class.java
+            ).setParameter("transactionId", id.toString()).resultList.map { it }
+        }
+        assertEquals(1, fromDb.size)
+        return fromDb[0]
+    }
+
     @Test(timeout = 300_000)
     fun `empty store`() {
         assertThat(transactionStorage.getTransaction(newTransaction().id)).isNull()
@@ -369,7 +529,7 @@ class DBTransactionStorageTests {
 
         // Assert
 
-        assertThat(result).isNotNull()
+        assertThat(result).isNotNull
         assertThat(result?.get(20, TimeUnit.SECONDS)?.id).isEqualTo(signedTransaction.id)
     }
 
@@ -399,18 +559,36 @@ class DBTransactionStorageTests {
         assertThat(transactionStorage.getTransaction(transaction.id)).isEqualTo(transaction)
     }
 
-    private fun newTransaction(): SignedTransaction {
+    private fun newTransaction(notarySig: Boolean = true): SignedTransaction {
         val wtx = createWireTransaction(
                 inputs = listOf(StateRef(SecureHash.randomSHA256(), 0)),
                 attachments = emptyList(),
                 outputs = emptyList(),
-                commands = listOf(dummyCommand()),
-                notary = DUMMY_NOTARY,
+                commands = listOf(dummyCommand(ALICE.publicKey)),
+                notary = DUMMY_NOTARY.party,
                 timeWindow = null
         )
-        return SignedTransaction(
-                wtx,
-                listOf(TransactionSignature(ByteArray(1), ALICE_PUBKEY, SignatureMetadata(1, Crypto.findSignatureScheme(ALICE_PUBKEY).schemeNumberID)))
-        )
+        return makeSigned(wtx, ALICE.keyPair, notarySig = notarySig)
+    }
+
+    private fun makeSigned(wtx: WireTransaction, vararg keys: KeyPair, notarySig: Boolean = true): SignedTransaction {
+        val keySigs = keys.map { it.sign(SignableData(wtx.id, SignatureMetadata(1, Crypto.findSignatureScheme(it.public).schemeNumberID))) }
+        val sigs = if (notarySig) {
+            keySigs + notarySig(wtx.id)
+        } else {
+            keySigs
+        }
+        return SignedTransaction(wtx, sigs)
+    }
+
+    private fun notarySig(txId: SecureHash) =
+            DUMMY_NOTARY.keyPair.sign(SignableData(txId, SignatureMetadata(1, Crypto.findSignatureScheme(DUMMY_NOTARY.publicKey).schemeNumberID)))
+
+    private fun assertSignatures(transaction: ByteArray, extraSigs: ByteArray?,
+                                 expectedSigs: List<TransactionSignature>) {
+        assertNotNull(extraSigs)
+        assertEquals(expectedSigs,
+                (transaction.deserialize<SignedTransaction>(context = DBTransactionStorage.contextToUse()).sigs +
+                extraSigs!!.deserialize<List<TransactionSignature>>()).distinct())
     }
 }
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
index 2e4ccdecc2..608d1ad7f4 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
@@ -319,7 +319,7 @@ class RetryFlowMockTest {
 
         private fun doInsert() {
             val tx = DBTransactionStorage.DBTransaction("Foo", null, Utils.EMPTY_BYTES,
-                    DBTransactionStorage.TransactionStatus.VERIFIED, Instant.now())
+                    DBTransactionStorage.TransactionStatus.VERIFIED, Instant.now(), null)
             contextTransaction.session.save(tx)
         }
     }
diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt
index 85b3aaceb4..7456a63cd5 100644
--- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt
+++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/MockServices.kt
@@ -13,27 +13,47 @@ import net.corda.core.identity.Party
 import net.corda.core.identity.PartyAndCertificate
 import net.corda.core.internal.PLATFORM_VERSION
 import net.corda.core.internal.requireSupportedHashType
+import net.corda.core.internal.telemetry.TelemetryComponent
+import net.corda.core.internal.telemetry.TelemetryServiceImpl
 import net.corda.core.messaging.DataFeed
 import net.corda.core.messaging.FlowHandle
 import net.corda.core.messaging.FlowProgressHandle
 import net.corda.core.messaging.StateMachineTransactionMapping
-import net.corda.core.node.*
-import net.corda.core.node.services.*
+import net.corda.core.node.AppServiceHub
+import net.corda.core.node.NetworkParameters
+import net.corda.core.node.NodeInfo
+import net.corda.core.node.ServiceHub
+import net.corda.core.node.ServicesForResolution
+import net.corda.core.node.StatesToRecord
+import net.corda.core.node.services.ContractUpgradeService
+import net.corda.core.node.services.CordaService
+import net.corda.core.node.services.IdentityService
+import net.corda.core.node.services.KeyManagementService
+import net.corda.core.node.services.NetworkMapCache
+import net.corda.core.node.services.NetworkParametersService
+import net.corda.core.node.services.ServiceLifecycleObserver
+import net.corda.core.node.services.TransactionStorage
+import net.corda.core.node.services.TransactionVerifierService
+import net.corda.core.node.services.VaultService
 import net.corda.core.node.services.diagnostics.DiagnosticsService
-import net.corda.core.internal.telemetry.TelemetryComponent
-import net.corda.core.internal.telemetry.TelemetryServiceImpl
 import net.corda.core.node.services.vault.CordaTransactionSupport
 import net.corda.core.serialization.SerializeAsToken
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.utilities.NetworkHostAndPort
+import net.corda.coretesting.internal.DEV_ROOT_CA
 import net.corda.node.VersionInfo
 import net.corda.node.internal.ServicesForResolutionImpl
 import net.corda.node.internal.cordapp.JarScanningCordappLoader
-import net.corda.node.services.api.*
+import net.corda.node.services.api.SchemaService
+import net.corda.node.services.api.ServiceHubInternal
+import net.corda.node.services.api.StateMachineRecordedTransactionMappingStorage
+import net.corda.node.services.api.VaultServiceInternal
+import net.corda.node.services.api.WritableTransactionStorage
 import net.corda.node.services.diagnostics.NodeDiagnosticsService
 import net.corda.node.services.identity.InMemoryIdentityService
 import net.corda.node.services.identity.PersistentIdentityService
 import net.corda.node.services.keys.BasicHSMKeyManagementService
+import net.corda.node.services.network.PersistentNetworkMapCache
 import net.corda.node.services.persistence.PublicKeyToOwningIdentityCacheImpl
 import net.corda.node.services.schema.NodeSchemaService
 import net.corda.node.services.transactions.InMemoryTransactionVerifierService
@@ -44,12 +64,16 @@ import net.corda.nodeapi.internal.persistence.DatabaseConfig
 import net.corda.nodeapi.internal.persistence.contextTransaction
 import net.corda.testing.common.internal.testNetworkParameters
 import net.corda.testing.core.TestIdentity
-import net.corda.coretesting.internal.DEV_ROOT_CA
-import net.corda.node.services.network.PersistentNetworkMapCache
 import net.corda.testing.internal.MockCordappProvider
 import net.corda.testing.internal.TestingNamedCacheFactory
 import net.corda.testing.internal.configureDatabase
-import net.corda.testing.node.internal.*
+import net.corda.testing.node.internal.DriverDSLImpl
+import net.corda.testing.node.internal.MockCryptoService
+import net.corda.testing.node.internal.MockKeyManagementService
+import net.corda.testing.node.internal.MockNetworkParametersStorage
+import net.corda.testing.node.internal.MockTransactionStorage
+import net.corda.testing.node.internal.cordappsForPackages
+import net.corda.testing.node.internal.getCallerPackage
 import net.corda.testing.services.MockAttachmentStorage
 import java.io.ByteArrayOutputStream
 import java.nio.file.Paths
@@ -57,7 +81,7 @@ import java.security.KeyPair
 import java.sql.Connection
 import java.time.Clock
 import java.time.Instant
-import java.util.*
+import java.util.Properties
 import java.util.function.Consumer
 import java.util.jar.JarFile
 import java.util.zip.ZipEntry
diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt
index 5c430d575e..e5486ffaf9 100644
--- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt
+++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/InternalMockNetwork.kt
@@ -569,12 +569,15 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
         val allActiveFlows = allNodes.flatMap { it.smm.snapshot() }
 
         return allActiveFlows.any {
-            val flowState = it.snapshot().checkpoint.flowState
-            flowState is FlowState.Started && when (flowState.flowIORequest) {
-                is FlowIORequest.ExecuteAsyncOperation -> true
-                is FlowIORequest.Sleep -> true
-                else -> false
-            }
+            val flowSnapshot = it.snapshot()
+            if (!flowSnapshot.isFlowResumed && flowSnapshot.isWaitingForFuture) {
+                val flowState = flowSnapshot.checkpoint.flowState
+                flowState is FlowState.Started && when (flowState.flowIORequest) {
+                    is FlowIORequest.ExecuteAsyncOperation -> true
+                    is FlowIORequest.Sleep -> true
+                    else -> false
+                }
+            } else false
         }
     }
 
diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt
index c1cebf95e1..c54dceba55 100644
--- a/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt
+++ b/testing/node-driver/src/main/kotlin/net/corda/testing/node/internal/MockTransactionStorage.kt
@@ -2,12 +2,15 @@ package net.corda.testing.node.internal
 
 import net.corda.core.concurrent.CordaFuture
 import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.TransactionSignature
 import net.corda.core.internal.concurrent.doneFuture
 import net.corda.core.messaging.DataFeed
 import net.corda.core.serialization.SingletonSerializeAsToken
 import net.corda.core.toFuture
 import net.corda.core.transactions.SignedTransaction
 import net.corda.node.services.api.WritableTransactionStorage
+import net.corda.core.flows.FlowTransactionMetadata
+import net.corda.core.flows.TransactionStatus
 import net.corda.testing.node.MockServices
 import rx.Observable
 import rx.subjects.PublishSubject
@@ -42,11 +45,23 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
     }
 
     override fun addTransaction(transaction: SignedTransaction): Boolean {
-        val current = txns.putIfAbsent(transaction.id, TxHolder(transaction, isVerified = true))
+        val current = txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED))
         return if (current == null) {
             notify(transaction)
         } else if (!current.isVerified) {
-            current.isVerified = true
+            notify(transaction)
+        } else {
+            false
+        }
+    }
+
+    override fun addUnnotarisedTransaction(transaction: SignedTransaction, metadata: FlowTransactionMetadata?): Boolean {
+        return txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.MISSING_NOTARY_SIG)) == null
+    }
+
+    override fun finalizeTransactionWithExtraSignatures(transaction: SignedTransaction, signatures: Collection<TransactionSignature>): Boolean {
+        val current = txns.replace(transaction.id, TxHolder(transaction, status = TransactionStatus.VERIFIED))
+        return if (current != null) {
             notify(transaction)
         } else {
             false
@@ -54,12 +69,14 @@ open class MockTransactionStorage : WritableTransactionStorage, SingletonSeriali
     }
 
     override fun addUnverifiedTransaction(transaction: SignedTransaction) {
-        txns.putIfAbsent(transaction.id, TxHolder(transaction, isVerified = false))
+        txns.putIfAbsent(transaction.id, TxHolder(transaction, status = TransactionStatus.UNVERIFIED))
     }
 
-    override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.isVerified) it.stx else null }
+    override fun getTransaction(id: SecureHash): SignedTransaction? = txns[id]?.let { if (it.status == TransactionStatus.VERIFIED) it.stx else null }
 
-    override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, Boolean>? = txns[id]?.let { Pair(it.stx, it.isVerified) }
+    override fun getTransactionInternal(id: SecureHash): Pair<SignedTransaction, TransactionStatus>? = txns[id]?.let { Pair(it.stx, it.status) }
 
-    private class TxHolder(val stx: SignedTransaction, var isVerified: Boolean)
+    private class TxHolder(val stx: SignedTransaction, var status: TransactionStatus) {
+        val isVerified = status == TransactionStatus.VERIFIED
+    }
 }
\ No newline at end of file
diff --git a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt
index 9a25595d63..4ebca902f3 100644
--- a/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt
+++ b/testing/test-utils/src/main/kotlin/net/corda/testing/dsl/TestDSL.kt
@@ -6,12 +6,15 @@ import net.corda.core.contracts.*
 import net.corda.core.cordapp.CordappProvider
 import net.corda.core.crypto.NullKeys.NULL_SIGNATURE
 import net.corda.core.crypto.SecureHash
+import net.corda.core.crypto.TransactionSignature
 import net.corda.core.flows.FlowException
+import net.corda.core.flows.FlowTransactionMetadata
 import net.corda.core.identity.Party
 import net.corda.core.internal.*
 import net.corda.core.internal.notary.NotaryService
 import net.corda.core.node.ServiceHub
 import net.corda.core.node.ServicesForResolution
+import net.corda.core.node.StatesToRecord
 import net.corda.core.node.services.AttachmentId
 import net.corda.core.node.services.TransactionStorage
 import net.corda.core.serialization.internal.AttachmentsClassLoaderCache
@@ -134,6 +137,10 @@ data class TestTransactionDSLInterpreter private constructor(
         override val notaryService: NotaryService? = null
 
         override val attachmentsClassLoaderCache: AttachmentsClassLoaderCache = AttachmentsClassLoaderCacheImpl(TestingNamedCacheFactory())
+
+        override fun recordUnnotarisedTransaction(txn: SignedTransaction, metadata: FlowTransactionMetadata?) {}
+
+        override fun finalizeTransactionWithExtraSignatures(txn: SignedTransaction, sigs: Collection<TransactionSignature>, statesToRecord: StatesToRecord) {}
     }
 
     private fun copy(): TestTransactionDSLInterpreter =