diff --git a/build.gradle b/build.gradle
index 640f4b052e..003ec7003f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -469,4 +469,4 @@ wrapper {
 buildScan {
     termsOfServiceUrl = 'https://gradle.com/terms-of-service'
     termsOfServiceAgree = 'yes'
-}
+}
\ No newline at end of file
diff --git a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt
index aab20f5e78..14351e0a7d 100644
--- a/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/CollectSignaturesFlow.kt
@@ -3,8 +3,10 @@ package net.corda.core.flows
 import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.crypto.TransactionSignature
 import net.corda.core.crypto.isFulfilledBy
+import net.corda.core.crypto.toStringShort
 import net.corda.core.identity.Party
 import net.corda.core.identity.groupPublicKeysByWellKnownParty
+import net.corda.core.internal.toMultiMap
 import net.corda.core.node.ServiceHub
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.transactions.WireTransaction
@@ -65,7 +67,8 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
                                                       val sessionsToCollectFrom: Collection<FlowSession>,
                                                       val myOptionalKeys: Iterable<PublicKey>?,
                                                       override val progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : FlowLogic<SignedTransaction>() {
-    @JvmOverloads constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection<FlowSession>, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
+    @JvmOverloads
+    constructor(partiallySignedTx: SignedTransaction, sessionsToCollectFrom: Collection<FlowSession>, progressTracker: ProgressTracker = CollectSignaturesFlow.tracker()) : this(partiallySignedTx, sessionsToCollectFrom, null, progressTracker)
 
     companion object {
         object COLLECTING : ProgressTracker.Step("Collecting signatures from counterparties.")
@@ -105,14 +108,55 @@ class CollectSignaturesFlow @JvmOverloads constructor(val partiallySignedTx: Sig
         // If the unsigned counterparties list is empty then we don't need to collect any more signatures here.
         if (unsigned.isEmpty()) return partiallySignedTx
 
-        val partyToKeysMap = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
-        // Check that we have a session for all parties.  No more, no less.
-        require(sessionsToCollectFrom.map { it.counterparty }.toSet() == partyToKeysMap.keys) {
+        val setOfAllSessionKeys = sessionsToCollectFrom.groupBy { it.sessionOwningKey }.map {
+            require(it.value.size == 1) { "There are multiple sessions initiated for party key ${it.key.toStringShort()}" }
+            it.key to it.value.first()
+        }.toMap()
+
+        val partyToKeysItSignsFor = groupPublicKeysByWellKnownParty(serviceHub, unsigned)
+        val keyToSigningParty = partyToKeysItSignsFor.flatMap { (wellKnown, allKeysItSignsFor) ->
+            allKeysItSignsFor.map { it to wellKnown }
+        }.toMap()
+
+        val unrelatedSessions = sessionsToCollectFrom.filterNot {
+            if (it.sessionOwningKey == it.counterparty.owningKey) {
+                //this session was initiated by a wellKnownParty
+                //the session must have a corresponding unsigned
+                it.counterparty in partyToKeysItSignsFor
+            } else {
+                //this session was not initiated by a wellKnownParty
+                //so must directly exist in the unsigned
+                unsigned.contains(it.sessionOwningKey)
+            }
+        }
+
+        val keyToSessionMap = unsigned.map {
+            if (it in setOfAllSessionKeys) {
+                // the unsigned key exists directly as a sessionKey, so use that session
+                it to setOfAllSessionKeys[it]!!
+            } else {
+                //it might be delegated to a wellKnownParty
+                val wellKnownParty: Party? = keyToSigningParty[it]
+                if (wellKnownParty != null) {
+                    //there is a wellKnownParty for this key, check if it has a session, and if so - use that session
+                    val sessionForWellKnownParty = setOfAllSessionKeys[wellKnownParty.owningKey]
+                            ?: throw IllegalStateException("No session available to request signature for key: ${it.toStringShort()}")
+                    it to sessionForWellKnownParty
+                } else {
+                    throw IllegalStateException("Could not find a session or wellKnown party for key ${it.toStringShort()}")
+                }
+            }
+        }
+
+        //now invert the map to find the keys per session
+        val sessionToKeysMap = keyToSessionMap.map { it.second to it.first }.toMultiMap()
+
+        require(unrelatedSessions.isEmpty()) {
             "The Initiator of CollectSignaturesFlow must pass in exactly the sessions required to sign the transaction."
         }
         // Collect signatures from all counterparties and append them to the partially signed transaction.
-        val counterpartySignatures = sessionsToCollectFrom.flatMap { session ->
-            subFlow(CollectSignatureFlow(partiallySignedTx, session, partyToKeysMap[session.counterparty]!!))
+        val counterpartySignatures = sessionToKeysMap.flatMap { (session, keys)->
+            subFlow(CollectSignatureFlow(partiallySignedTx, session, keys))
         }
         val stx = partiallySignedTx + counterpartySignatures
 
@@ -196,7 +240,7 @@ class CollectSignatureFlow(val partiallySignedTx: SignedTransaction, val session
  * @param otherSideSession The session which is providing you a transaction to sign.
  */
 abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSession: FlowSession,
-                                   override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic<SignedTransaction>() {
+                                                             override val progressTracker: ProgressTracker = SignTransactionFlow.tracker()) : FlowLogic<SignedTransaction>() {
 
     companion object {
         object RECEIVING : ProgressTracker.Step("Receiving transaction proposal for signing.")
@@ -247,12 +291,6 @@ abstract class SignTransactionFlow @JvmOverloads constructor(val otherSideSessio
 
     @Suspendable
     private fun checkSignatures(stx: SignedTransaction) {
-        // We set `ignoreUnrecognisedParties` to `true` in `groupPublicKeysByWellKnownParty`. This is because we don't 
-        // need to recognise all keys, but just the initiator's.
-        val signingWellKnownIdentities = groupPublicKeysByWellKnownParty(serviceHub, stx.sigs.map(TransactionSignature::by), true)
-        require(otherSideSession.counterparty in signingWellKnownIdentities) {
-            "The Initiator of CollectSignaturesFlow must have signed the transaction. Found $signingWellKnownIdentities, expected $otherSideSession"
-        }
         val signed = stx.sigs.map { it.by }
         val allSigners = stx.tx.requiredSigningKeys
         val notSigned = allSigners - signed
diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt
index bca4a37b5c..481d875384 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt
@@ -6,6 +6,7 @@ import net.corda.core.CordaInternal
 import net.corda.core.DeleteForDJVM
 import net.corda.core.contracts.StateRef
 import net.corda.core.crypto.SecureHash
+import net.corda.core.identity.AbstractParty
 import net.corda.core.identity.Party
 import net.corda.core.identity.PartyAndCertificate
 import net.corda.core.internal.*
@@ -115,6 +116,19 @@ abstract class FlowLogic<out T> {
      */
     val serviceHub: ServiceHub get() = stateMachine.serviceHub
 
+    /**
+     * Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
+     * that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
+     */
+    @Suspendable
+    fun initiateFlow(requested: AbstractParty): FlowSession {
+        val wellKnown = serviceHub.identityService.wellKnownPartyFromAnonymous(requested)
+        if (wellKnown == null) {
+            throw IllegalStateException("could not initiate flow with party $requested as they are not in the node identity service")
+        }
+        return stateMachine.initiateFlow(wellKnown = wellKnown, requested = requested)
+    }
+
     /**
      * Creates a communication session with [party]. Subsequently you may send/receive using this session object. Note
      * that this function does not communicate in itself, the counter-flow will be kicked off by the first send/receive.
@@ -252,7 +266,6 @@ abstract class FlowLogic<out T> {
         return sendAndReceiveWithRetry(R::class.java, payload)
     }
 
-
     /** Suspends until a message has been received for each session in the specified [sessions].
      *
      * Consider [receiveAll(receiveType: Class<R>, sessions: List<FlowSession>): List<UntrustworthyData<R>>] when the same type is expected from all sessions.
diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt
index b1782f5424..5f45e985c0 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowSession.kt
@@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
 import net.corda.core.DoNotImplement
 import net.corda.core.identity.Party
 import net.corda.core.utilities.UntrustworthyData
+import java.security.PublicKey
 
 /**
  *
@@ -44,6 +45,13 @@ import net.corda.core.utilities.UntrustworthyData
  */
 @DoNotImplement
 abstract class FlowSession {
+
+    /**
+     * The current [sessionOwningKey] in the context of this session. It is not guaranteed to be [counterparty.owningKey] as the session
+     * may have been initiated with an [net.corda.core.identity.AnonymousParty] useful for grouping sessions vs signing requests
+     */
+    abstract val sessionOwningKey: PublicKey
+
     /**
      * The [Party] on the other side of this session. In the case of a session created by [FlowLogic.initiateFlow]
      *   [counterparty] is the same Party as the one passed to that function.
diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt
index d2c8222b0d..1a7d71263d 100644
--- a/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt
+++ b/core/src/main/kotlin/net/corda/core/internal/FlowStateMachine.kt
@@ -9,6 +9,7 @@ import net.corda.core.flows.FlowLogic
 import net.corda.core.flows.FlowSession
 import net.corda.core.flows.FlowStackSnapshot
 import net.corda.core.flows.StateMachineRunId
+import net.corda.core.identity.AbstractParty
 import net.corda.core.identity.Party
 import net.corda.core.node.ServiceHub
 import org.slf4j.Logger
@@ -21,7 +22,10 @@ interface FlowStateMachine<FLOWRETURN> {
     fun <SUSPENDRETURN : Any> suspend(ioRequest: FlowIORequest<SUSPENDRETURN>, maySkipCheckpoint: Boolean): SUSPENDRETURN
 
     @Suspendable
-    fun initiateFlow(party: Party): FlowSession
+    fun initiateFlow(party: Party): FlowSession = initiateFlow(wellKnown = party, requested = null)
+
+    @Suspendable
+    fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession
 
     fun checkFlowPermission(permissionName: String, extraAuditData: Map<String, String>)
 
diff --git a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt
index b8f2bc7757..57aebf2417 100644
--- a/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt
+++ b/core/src/main/kotlin/net/corda/core/node/services/IdentityService.kt
@@ -5,6 +5,8 @@ import net.corda.core.DoNotImplement
 import net.corda.core.contracts.PartyAndReference
 import net.corda.core.crypto.toStringShort
 import net.corda.core.identity.*
+import net.corda.core.internal.hash
+import net.corda.core.utilities.contextLogger
 import java.security.InvalidAlgorithmParameterException
 import java.security.PublicKey
 import java.security.cert.*
@@ -24,6 +26,10 @@ interface IdentityService {
     val trustAnchor: TrustAnchor
     val caCertStore: CertStore
 
+    companion object {
+        val log = contextLogger()
+    }
+
     /**
      * Verify and then store an identity.
      *
@@ -94,6 +100,7 @@ interface IdentityService {
         // The original version of this would return the party as-is if it was a Party (rather than AnonymousParty),
         // however that means that we don't verify that we know who owns the key. As such as now enforce turning the key
         // into a party, and from there figure out the well known party.
+        log.debug("Attempting to find wellKnownParty for: ${party.owningKey.hash}")
         val candidate = partyFromKey(party.owningKey)
         // TODO: This should be done via the network map cache, which is the authoritative source of well known identities
         return if (candidate != null) {
diff --git a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt
index 5e18e0af6f..b79f9fab14 100644
--- a/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt
+++ b/core/src/test/kotlin/net/corda/core/flows/CollectSignaturesFlowTests.kt
@@ -6,13 +6,11 @@ import net.corda.core.contracts.Command
 import net.corda.core.contracts.StateAndContract
 import net.corda.core.contracts.requireThat
 import net.corda.core.flows.mixins.WithContracts
-import net.corda.core.identity.CordaX500Name
-import net.corda.core.identity.Party
-import net.corda.core.identity.excludeHostNode
-import net.corda.core.identity.groupAbstractPartyByWellKnownParty
+import net.corda.core.identity.*
 import net.corda.core.node.services.IdentityService
 import net.corda.core.transactions.SignedTransaction
 import net.corda.core.transactions.TransactionBuilder
+import net.corda.node.services.api.IdentityServiceInternal
 import net.corda.testing.contracts.DummyContract
 import net.corda.testing.core.*
 import net.corda.testing.internal.matchers.flow.willReturn
@@ -23,8 +21,11 @@ import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
 import net.corda.testing.node.internal.InternalMockNetwork
 import net.corda.testing.node.internal.TestStartedNode
 import net.corda.testing.node.internal.enclosedCordapp
+import org.hamcrest.CoreMatchers.`is`
 import org.junit.AfterClass
+import org.junit.Assert
 import org.junit.Test
+import java.security.PublicKey
 
 class CollectSignaturesFlowTests : WithContracts {
     companion object {
@@ -55,11 +56,59 @@ class CollectSignaturesFlowTests : WithContracts {
         aliceNode.verifyAndRegister(bConfidentialIdentity)
 
         assert.that(
-            aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie),
+                aliceNode.startTestFlow(alice, bConfidentialIdentity.party, charlie),
                 willReturn(requiredSignatures(3))
         )
     }
 
+    @Test
+    fun `successfully collects signatures when sessions are initiated with AnonymousParty`() {
+        val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice)
+        val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob)
+        val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob)
+        val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie)
+
+        bobNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java)
+        charlieNode.registerInitiatedFlow(AnonymousSessionTestFlowResponder::class.java)
+
+        val owners = listOf(aConfidentialIdentity1, bConfidentialIdentity1, bConfidentialIdentity2, cConfidentialIdentity1)
+
+        val future = aliceNode.startFlow(AnonymousSessionTestFlow(owners)).resultFuture
+        mockNet.runNetwork()
+        val stx = future.get()
+        val missingSigners = stx.getMissingSigners()
+        Assert.assertThat(missingSigners, `is`(emptySet()))
+    }
+
+    @Test
+    fun `successfully collects signatures when sessions are initiated with both AnonymousParty and WellKnownParty`() {
+        val aConfidentialIdentity1 = aliceNode.createConfidentialIdentity(alice)
+        val bConfidentialIdentity1 = bobNode.createConfidentialIdentity(bob)
+        val bConfidentialIdentity2 = bobNode.createConfidentialIdentity(bob)
+        val cConfidentialIdentity1 = charlieNode.createConfidentialIdentity(charlie)
+        val cConfidentialIdentity2 = charlieNode.createConfidentialIdentity(charlie)
+
+        bobNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java)
+        charlieNode.registerInitiatedFlow(MixAndMatchAnonymousSessionTestFlowResponder::class.java)
+
+        val owners = listOf(
+                aConfidentialIdentity1,
+                bConfidentialIdentity1,
+                bConfidentialIdentity2,
+                cConfidentialIdentity1,
+                cConfidentialIdentity2
+        )
+
+        val keysToLookup = listOf(bConfidentialIdentity1.owningKey, bConfidentialIdentity2.owningKey, cConfidentialIdentity1.owningKey)
+        val keysToKeepAnonymous = listOf(cConfidentialIdentity2.owningKey)
+
+        val future = aliceNode.startFlow(MixAndMatchAnonymousSessionTestFlow(owners, keysToLookup.toSet(), keysToKeepAnonymous.toSet())).resultFuture
+        mockNet.runNetwork()
+        val stx = future.get()
+        val missingSigners = stx.getMissingSigners()
+        Assert.assertThat(missingSigners, `is`(emptySet()))
+    }
+
     @Test
     fun `no need to collect any signatures`() {
         val ptx = aliceNode.signDummyContract(alice.ref(1))
@@ -97,10 +146,10 @@ class CollectSignaturesFlowTests : WithContracts {
     //region Operators
     private fun TestStartedNode.startTestFlow(vararg party: Party) =
             startFlowAndRunNetwork(
-                TestFlow.Initiator(DummyContract.MultiOwnerState(
-                    MAGIC_NUMBER,
-                    listOf(*party)),
-                    mockNet.defaultNotaryIdentity))
+                    TestFlow.Initiator(DummyContract.MultiOwnerState(
+                            MAGIC_NUMBER,
+                            listOf(*party)),
+                            mockNet.defaultNotaryIdentity))
 
     //region Test Flow
     // With this flow, the initiator starts the "CollectTransactionFlow". It is then the responders responsibility to
@@ -144,3 +193,82 @@ class CollectSignaturesFlowTests : WithContracts {
     }
     //region
 }
+
+@InitiatingFlow
+class AnonymousSessionTestFlow(val cis: List<PartyAndCertificate>) : FlowLogic<SignedTransaction>() {
+    @Suspendable
+    override fun call(): SignedTransaction {
+
+        for (ci in cis) {
+            if (ci.name != ourIdentity.name) {
+                (serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci)
+            }
+        }
+        val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) })
+        val create = net.corda.testing.contracts.DummyContract.Commands.Create()
+        val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first())
+                .addOutputState(state)
+                .addCommand(create, cis.map { it.owningKey })
+
+
+        val ourKey = cis.single { it.name == ourIdentity.name }.owningKey
+        val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey)
+        val sessionsToCollectFrom = cis.filter { it.name != ourIdentity.name }.map { initiateFlow(AnonymousParty(it.owningKey)) }
+        return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey)))
+    }
+}
+
+@InitiatedBy(AnonymousSessionTestFlow::class)
+class AnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        val signFlow = object : SignTransactionFlow(otherSideSession) {
+            @Suspendable
+            override fun checkTransaction(stx: SignedTransaction) = requireThat {
+            }
+        }
+        val stxId = subFlow(signFlow).id
+    }
+}
+
+@InitiatingFlow
+class MixAndMatchAnonymousSessionTestFlow(val cis: List<PartyAndCertificate>,
+                                          val keysToLookUp: Set<PublicKey>,
+                                          val keysToKeepAnonymous: Set<PublicKey>) : FlowLogic<SignedTransaction>() {
+    @Suspendable
+    override fun call(): SignedTransaction {
+
+        for (ci  in cis) {
+            if (ci.name != ourIdentity.name) {
+                (serviceHub.identityService as IdentityServiceInternal).verifyAndRegisterIdentity(ci)
+            }
+        }
+        val state = DummyContract.MultiOwnerState(owners = cis.map { AnonymousParty(it.owningKey) })
+        val create = net.corda.testing.contracts.DummyContract.Commands.Create()
+        val txBuilder = TransactionBuilder(notary = serviceHub.networkMapCache.notaryIdentities.first())
+                .addOutputState(state)
+                .addCommand(create, cis.map { it.owningKey })
+
+
+        val ourKey = cis.single { it.name == ourIdentity.name }.owningKey
+        val signedByUsTx = serviceHub.signInitialTransaction(txBuilder, ourKey)
+
+        val resolvedParties = keysToLookUp.map { serviceHub.identityService.wellKnownPartyFromAnonymous(AnonymousParty(it))!! }.toSet()
+        val anonymousParties = keysToKeepAnonymous.map { AnonymousParty(it) }
+        val sessionsToCollectFrom = (resolvedParties + anonymousParties).map { initiateFlow(it) }
+        return subFlow(CollectSignaturesFlow(signedByUsTx, sessionsToCollectFrom, myOptionalKeys = listOf(ourKey)))
+    }
+}
+
+@InitiatedBy(MixAndMatchAnonymousSessionTestFlow::class)
+class MixAndMatchAnonymousSessionTestFlowResponder(private val otherSideSession: FlowSession) : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        val signFlow = object : SignTransactionFlow(otherSideSession) {
+            @Suspendable
+            override fun checkTransaction(stx: SignedTransaction) = requireThat {
+            }
+        }
+        val stxId = subFlow(signFlow).id
+    }
+}
diff --git a/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt b/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt
index 070e5b0ed1..94a2199611 100644
--- a/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt
+++ b/node/src/main/kotlin/net/corda/node/services/api/IdentityServiceInternal.kt
@@ -1,6 +1,7 @@
 package net.corda.node.services.api
 
 import net.corda.core.identity.CordaX500Name
+import net.corda.core.identity.Party
 import net.corda.core.identity.PartyAndCertificate
 import net.corda.core.internal.CertRole
 import net.corda.core.node.services.IdentityService
@@ -8,6 +9,7 @@ import net.corda.core.utilities.contextLogger
 import net.corda.nodeapi.internal.crypto.X509Utilities
 import net.corda.nodeapi.internal.crypto.x509Certificates
 import java.security.InvalidAlgorithmParameterException
+import java.security.PublicKey
 import java.security.cert.CertPathValidatorException
 import java.security.cert.CertificateExpiredException
 import java.security.cert.CertificateNotYetValidException
diff --git a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt
index 3b46ddaca1..d57a37425d 100644
--- a/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/identity/InMemoryIdentityService.kt
@@ -22,6 +22,8 @@ import javax.annotation.concurrent.ThreadSafe
 @ThreadSafe
 class InMemoryIdentityService(identities: List<PartyAndCertificate> = emptyList(),
                               override val trustRoot: X509Certificate) : SingletonSerializeAsToken(), IdentityServiceInternal {
+
+
     companion object {
         private val log = contextLogger()
     }
diff --git a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt
index 2fe6d1deb8..be70ca57c7 100644
--- a/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/identity/PersistentIdentityService.kt
@@ -9,6 +9,7 @@ import net.corda.core.serialization.SingletonSerializeAsToken
 import net.corda.core.utilities.MAX_HASH_HEX_SIZE
 import net.corda.core.utilities.contextLogger
 import net.corda.core.utilities.debug
+import net.corda.core.utilities.toBase58String
 import net.corda.node.services.api.IdentityServiceInternal
 import net.corda.node.utilities.AppendOnlyPersistentMap
 import net.corda.nodeapi.internal.crypto.X509CertificateFactory
@@ -170,7 +171,9 @@ class PersistentIdentityService(cacheFactory: NamedCacheFactory) : SingletonSeri
     }
 
     // We give the caller a copy of the data set to avoid any locking problems
-    override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction { keyToParties.allPersisted().map { it.second }.asIterable() }
+    override fun getAllIdentities(): Iterable<PartyAndCertificate> = database.transaction {
+        keyToParties.allPersisted().map { it.second }.asIterable()
+    }
 
     override fun wellKnownPartyFromX500Name(name: CordaX500Name): Party? = certificateFromCordaX500Name(name)?.party
 
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt
index e08d2d5884..25cb7b79fa 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Event.kt
@@ -1,6 +1,7 @@
 package net.corda.node.services.statemachine
 
 import net.corda.core.flows.FlowLogic
+import net.corda.core.identity.AbstractParty
 import net.corda.core.identity.Party
 import net.corda.core.internal.FlowIORequest
 import net.corda.core.serialization.SerializedBytes
@@ -71,7 +72,7 @@ sealed class Event {
      * communication takes place at this time, only on the first send/receive operation on the session.
      * @param party the [Party] to create a session with.
      */
-    data class InitiateFlow(val party: Party) : Event()
+    data class InitiateFlow(val wellKnownParty: Party, val requestedParty: AbstractParty?) : Event()
 
     /**
      * Signal the entering into a subflow.
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt
index 364546ce9d..08ff17f3ea 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowSessionImpl.kt
@@ -7,15 +7,17 @@ import net.corda.core.flows.FlowSession
 import net.corda.core.identity.Party
 import net.corda.core.internal.FlowIORequest
 import net.corda.core.internal.FlowStateMachine
+import net.corda.core.internal.checkPayloadIs
 import net.corda.core.serialization.SerializationDefaults
 import net.corda.core.serialization.SerializedBytes
 import net.corda.core.serialization.serialize
 import net.corda.core.utilities.NonEmptySet
 import net.corda.core.utilities.UntrustworthyData
-import net.corda.core.internal.checkPayloadIs
+import java.security.PublicKey
 
 class FlowSessionImpl(
         override val counterparty: Party,
+        override val sessionOwningKey: PublicKey,
         val sourceSessionId: SessionId
 ) : FlowSession() {
 
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
index 36a4e1ecb0..0c7e3b56ff 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
@@ -10,6 +10,7 @@ import net.corda.core.concurrent.CordaFuture
 import net.corda.core.context.InvocationContext
 import net.corda.core.cordapp.Cordapp
 import net.corda.core.flows.*
+import net.corda.core.identity.AbstractParty
 import net.corda.core.identity.Party
 import net.corda.core.internal.*
 import net.corda.core.serialization.internal.CheckpointSerializationContext
@@ -230,7 +231,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
                 "Transaction context is missing. This might happen if a suspendable method is not annotated with @Suspendable annotation."
             }
         } else {
-            require(contextTransactionOrNull == null){"Transaction is marked as not present, but is not null"}
+            require(contextTransactionOrNull == null) { "Transaction is marked as not present, but is not null" }
         }
     }
 
@@ -354,9 +355,9 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
     }
 
     @Suspendable
-    override fun initiateFlow(party: Party): FlowSession {
+    override fun initiateFlow(wellKnown: Party, requested: AbstractParty?): FlowSession {
         val resume = processEventImmediately(
-                Event.InitiateFlow(party),
+                Event.InitiateFlow(wellKnownParty = wellKnown, requestedParty = requested),
                 isDbTransactionOpenOnEntry = true,
                 isDbTransactionOpenOnExit = true
         ) as FlowContinuation.Resume
@@ -440,7 +441,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
                     isDbTransactionOpenOnEntry = true,
                     isDbTransactionOpenOnExit = false
             )
-            require(continuation == FlowContinuation.ProcessEvents){"Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation "}
+            require(continuation == FlowContinuation.ProcessEvents) { "Expected a continuation of type ${FlowContinuation.ProcessEvents}, found $continuation " }
             unpark(SERIALIZER_BLOCKER)
         }
         return uncheckedCast(processEventsUntilFlowIsResumed(
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
index 76994a826e..34883e0e72 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt
@@ -37,11 +37,7 @@ import net.corda.node.services.api.ServiceHubInternal
 import net.corda.node.services.config.shouldCheckCheckpoints
 import net.corda.node.services.messaging.DeduplicationHandler
 import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
-import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
-import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
-import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
-import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
-import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
+import net.corda.node.services.statemachine.interceptors.*
 import net.corda.node.services.statemachine.transitions.StateMachine
 import net.corda.node.utilities.AffinityExecutor
 import net.corda.node.utilities.errorAndTerminate
@@ -57,12 +53,13 @@ import rx.subjects.PublishSubject
 import java.lang.Integer.min
 import java.security.SecureRandom
 import java.util.HashSet
-import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Executors
-import java.util.concurrent.ScheduledFuture
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.*
 import javax.annotation.concurrent.ThreadSafe
+import kotlin.collections.ArrayList
+import kotlin.collections.HashMap
+import kotlin.collections.component1
+import kotlin.collections.component2
+import kotlin.collections.set
 import kotlin.streams.toList
 
 /**
@@ -469,7 +466,7 @@ class SingleThreadedStateMachineManager(
         try {
             val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage)
             val initiatedSessionId = SessionId.createRandom(secureRandom)
-            val senderSession = FlowSessionImpl(sender, initiatedSessionId)
+            val senderSession = FlowSessionImpl(sender, sender.owningKey, initiatedSessionId)
             val flowLogic = initiatedFlowFactory.createFlow(senderSession)
             val initiatedFlowInfo = when (initiatedFlowFactory) {
                 is InitiatedFlowFactory.Core -> FlowInfo(serviceHub.myInfo.platformVersion, "corda")
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
index d9e7b7e173..0b0c1aa6c4 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
@@ -235,8 +235,8 @@ class TopLevelTransition(
                 return@builder FlowContinuation.ProcessEvents
             }
             val sourceSessionId = SessionId.createRandom(context.secureRandom)
-            val sessionImpl = FlowSessionImpl(event.party, sourceSessionId)
-            val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.party, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
+            val sessionImpl = FlowSessionImpl(event.wellKnownParty, event.requestedParty?.owningKey?: event.wellKnownParty.owningKey, sourceSessionId)
+            val newSessions = checkpoint.sessions + (sourceSessionId to SessionState.Uninitiated(event.wellKnownParty, initiatingSubFlow, sourceSessionId, context.secureRandom.nextLong()))
             currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
             actions.add(Action.AddSessionBinding(context.id, sourceSessionId))
             FlowContinuation.Resume(sessionImpl)
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
index 776c8eb9f6..4b8c261e9d 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/RetryFlowMockTest.kt
@@ -142,6 +142,7 @@ class RetryFlowMockTest {
         val alice = TestIdentity(CordaX500Name.parse("L=London,O=Alice Ltd,OU=Trade,C=GB")).party
         val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator()
         val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
+            override val sessionOwningKey = alice.owningKey
             override val counterparty = alice
 
             override fun getCounterpartyFlowInfo(maySkipCheckpoint: Boolean): FlowInfo {