diff --git a/.ci/api-current.txt b/.ci/api-current.txt
index fac366c8fa..3e83150ab4 100644
--- a/.ci/api-current.txt
+++ b/.ci/api-current.txt
@@ -1200,7 +1200,7 @@ public static final class net.corda.core.flows.FinalityFlow$Companion extends ja
   @org.jetbrains.annotations.NotNull public net.corda.core.utilities.ProgressTracker childProgressTracker()
   public static final net.corda.core.flows.FinalityFlow$Companion$NOTARISING INSTANCE
 ##
-@net.corda.core.serialization.CordaSerializable public class net.corda.core.flows.FlowException extends net.corda.core.CordaException
+@net.corda.core.serialization.CordaSerializable public class net.corda.core.flows.FlowException extends net.corda.core.CordaException implements net.corda.core.flows.IdentifiableException
   public <init>()
   public <init>(String)
   public <init>(String, Throwable)
@@ -1589,9 +1589,10 @@ public final class net.corda.core.flows.TransactionParts extends java.lang.Objec
   public int hashCode()
   public String toString()
 ##
-@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException
-  public <init>(String)
-  public <init>(String, Throwable)
+@net.corda.core.serialization.CordaSerializable public final class net.corda.core.flows.UnexpectedFlowEndException extends net.corda.core.CordaRuntimeException implements net.corda.core.flows.IdentifiableException
+  public <init>(String, Throwable, long)
+  @org.jetbrains.annotations.NotNull public Long getErrorId()
+  public final long getOriginalErrorId()
 ##
 @net.corda.core.DoNotImplement @net.corda.core.serialization.CordaSerializable public abstract class net.corda.core.identity.AbstractParty extends java.lang.Object
   public <init>(java.security.PublicKey)
diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt
index 5050b6253a..9eeddc32d8 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt
@@ -134,6 +134,110 @@ abstract class FlowLogic<out T> {
 
     val ourIdentity: Party get() = stateMachine.ourIdentity
 
+    // Used to implement the deprecated send/receive functions using Party. When such a deprecated function is used we
+    // create a fresh session for the Party, put it here and use it in subsequent deprecated calls.
+    private val deprecatedPartySessionMap = HashMap<Party, FlowSession>()
+    private fun getDeprecatedSessionForParty(party: Party): FlowSession {
+        return deprecatedPartySessionMap.getOrPut(party) { initiateFlow(party) }
+    }
+    /**
+     * Returns a [FlowInfo] object describing the flow [otherParty] is using. With [FlowInfo.flowVersion] it
+     * provides the necessary information needed for the evolution of flows and enabling backwards compatibility.
+     *
+     * This method can be called before any send or receive has been done with [otherParty]. In such a case this will force
+     * them to start their flow.
+     */
+    @Deprecated("Use FlowSession.getCounterpartyFlowInfo()", level = DeprecationLevel.WARNING)
+    @Suspendable
+    fun getFlowInfo(otherParty: Party): FlowInfo = getDeprecatedSessionForParty(otherParty).getCounterpartyFlowInfo()
+
+    /**
+     * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
+     * is received, which must be of the given [R] type.
+     *
+     * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
+     * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
+     * corrupted data in order to exploit your code.
+     *
+     * Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
+     * use this when you expect to do a message swap than do use [send] and then [receive] in turn.
+     *
+     * @return an [UntrustworthyData] wrapper around the received object.
+     */
+    @Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
+    inline fun <reified R : Any> sendAndReceive(otherParty: Party, payload: Any): UntrustworthyData<R> {
+        return sendAndReceive(R::class.java, otherParty, payload)
+    }
+
+    /**
+     * Serializes and queues the given [payload] object for sending to the [otherParty]. Suspends until a response
+     * is received, which must be of the given [receiveType]. Remember that when receiving data from other parties the data
+     * should not be trusted until it's been thoroughly verified for consistency and that all expectations are
+     * satisfied, as a malicious peer may send you subtly corrupted data in order to exploit your code.
+     *
+     * Note that this function is not just a simple send+receive pair: it is more efficient and more correct to
+     * use this when you expect to do a message swap than do use [send] and then [receive] in turn.
+     *
+     * @return an [UntrustworthyData] wrapper around the received object.
+     */
+    @Deprecated("Use FlowSession.sendAndReceive()", level = DeprecationLevel.WARNING)
+    @Suspendable
+    open fun <R : Any> sendAndReceive(receiveType: Class<R>, otherParty: Party, payload: Any): UntrustworthyData<R> {
+        return getDeprecatedSessionForParty(otherParty).sendAndReceive(receiveType, payload)
+    }
+
+    /**
+     * Similar to [sendAndReceive] but also instructs the `payload` to be redelivered until the expected message is received.
+     *
+     * Note that this method should NOT be used for regular party-to-party communication, use [sendAndReceive] instead.
+     * It is only intended for the case where the [otherParty] is running a distributed service with an idempotent
+     * flow which only accepts a single request and sends back a single response – e.g. a notary or certain types of
+     * oracle services. If one or more nodes in the service cluster go down mid-session, the message will be redelivered
+     * to a different one, so there is no need to wait until the initial node comes back up to obtain a response.
+     */
+    @Deprecated("Use FlowSession.sendAndReceiveWithRetry()", level = DeprecationLevel.WARNING)
+    internal inline fun <reified R : Any> sendAndReceiveWithRetry(otherParty: Party, payload: Any): UntrustworthyData<R> {
+        return getDeprecatedSessionForParty(otherParty).sendAndReceiveWithRetry(payload)
+    }
+
+    /**
+     * Suspends until the specified [otherParty] sends us a message of type [R].
+     *
+     * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
+     * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
+     * corrupted data in order to exploit your code.
+     */
+    @Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
+    inline fun <reified R : Any> receive(otherParty: Party): UntrustworthyData<R> = receive(R::class.java, otherParty)
+
+    /**
+     * Suspends until the specified [otherParty] sends us a message of type [receiveType].
+     *
+     * Remember that when receiving data from other parties the data should not be trusted until it's been thoroughly
+     * verified for consistency and that all expectations are satisfied, as a malicious peer may send you subtly
+     * corrupted data in order to exploit your code.
+     *
+     * @return an [UntrustworthyData] wrapper around the received object.
+     */
+    @Deprecated("Use FlowSession.receive()", level = DeprecationLevel.WARNING)
+    @Suspendable
+    open fun <R : Any> receive(receiveType: Class<R>, otherParty: Party): UntrustworthyData<R> {
+        return getDeprecatedSessionForParty(otherParty).receive(receiveType)
+    }
+
+    /**
+     * Queues the given [payload] for sending to the [otherParty] and continues without suspending.
+     *
+     * Note that the other party may receive the message at some arbitrary later point or not at all: if [otherParty]
+     * is offline then message delivery will be retried until it comes back or until the message is older than the
+     * network's event horizon time.
+     */
+    @Deprecated("Use FlowSession.send()", level = DeprecationLevel.WARNING)
+    @Suspendable
+    open fun send(otherParty: Party, payload: Any) {
+        getDeprecatedSessionForParty(otherParty).send(payload)
+    }
+
     @Suspendable
     internal fun <R : Any> FlowSession.sendAndReceiveWithRetry(receiveType: Class<R>, payload: Any): UntrustworthyData<R> {
         val request = FlowIORequest.SendAndReceive(
diff --git a/docs/source/api-flows.rst b/docs/source/api-flows.rst
index 6e80c7ae72..8f00b6bc0c 100644
--- a/docs/source/api-flows.rst
+++ b/docs/source/api-flows.rst
@@ -416,6 +416,68 @@ Our side of the flow must mirror these calls. We could do this as follows:
         :end-before: DOCEND 08
         :dedent: 12
 
+Why sessions?
+^^^^^^^^^^^^^
+
+Before ``FlowSession`` s were introduced the send/receive API looked a bit different. They were functions on
+``FlowLogic`` and took the address ``Party`` as argument. The platform internally maintained a mapping from ``Party`` to
+session, hiding sessions from the user completely.
+
+Although this is a convenient API it introduces subtle issues where a message that was originally meant for a specific
+session may end up in another.
+
+Consider the following contrived example using the old ``Party`` based API:
+
+.. container:: codeset
+
+    .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
+        :language: kotlin
+        :start-after: DOCSTART LaunchSpaceshipFlow
+        :end-before: DOCEND LaunchSpaceshipFlow
+
+The intention of the flows is very clear: LaunchSpaceshipFlow asks the president whether a spaceship should be launched.
+It is expecting a boolean reply. The president in return first tells the secretary that they need coffee, which is also
+communicated with a boolean. Afterwards the president replies to the launcher that they don't want to launch.
+
+However the above can go horribly wrong when the ``launcher`` happens to be the same party ``getSecretary`` returns. In
+this case the boolean meant for the secretary will be received by the launcher!
+
+This indicates that ``Party`` is not a good identifier for the communication sequence, and indeed the ``Party`` based
+API may introduce ways for an attacker to fish for information and even trigger unintended control flow like in the
+above case.
+
+Hence we introduced ``FlowSession``, which identifies the communication sequence. With ``FlowSession`` s the above set
+of flows would look like this:
+
+.. container:: codeset
+
+    .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
+        :language: kotlin
+        :start-after: DOCSTART LaunchSpaceshipFlowCorrect
+        :end-before: DOCEND LaunchSpaceshipFlowCorrect
+
+Note how the president is now explicit about which session it wants to send to.
+
+Porting from the old Party-based API
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+In the old API the first ``send`` or ``receive`` to a ``Party`` was the one kicking off the counter-flow. This is now
+explicit in the ``initiateFlow`` function call. To port existing code:
+
+.. container:: codeset
+
+    .. literalinclude:: ../../docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
+        :language: kotlin
+        :start-after: DOCSTART FlowSession porting
+        :end-before: DOCEND FlowSession porting
+        :dedent: 8
+
+    .. literalinclude:: ../../docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
+        :language: java
+        :start-after: DOCSTART FlowSession porting
+        :end-before: DOCEND FlowSession porting
+        :dedent: 12
+
 Subflows
 --------
 Subflows are pieces of reusable flows that may be run by calling ``FlowLogic.subFlow``. There are two broad categories
diff --git a/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java b/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
index d17615841d..bf2c58b858 100644
--- a/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
+++ b/docs/source/example-code/src/main/java/net/corda/docs/FlowCookbookJava.java
@@ -582,6 +582,13 @@ public class FlowCookbookJava {
             SignedTransaction notarisedTx2 = subFlow(new FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()));
             // DOCEND 10
 
+            // DOCSTART FlowSession porting
+            send(regulator, new Object()); // Old API
+            // becomes
+            FlowSession session = initiateFlow(regulator);
+            session.send(new Object());
+            // DOCEND FlowSession porting
+
             return null;
         }
     }
diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
index 880570e2df..0528caeaf3 100644
--- a/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
+++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/FlowCookbook.kt
@@ -572,6 +572,13 @@ class InitiatorFlow(val arg1: Boolean, val arg2: Int, private val counterparty:
         val additionalParties: Set<Party> = setOf(regulator)
         val notarisedTx2: SignedTransaction = subFlow(FinalityFlow(fullySignedTx, additionalParties, FINALISATION.childProgressTracker()))
         // DOCEND 10
+
+        // DOCSTART FlowSession porting
+        send(regulator, Any()) // Old API
+        // becomes
+        val session = initiateFlow(regulator)
+        session.send(Any())
+        // DOCEND FlowSession porting
     }
 }
 
diff --git a/docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt b/docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
new file mode 100644
index 0000000000..e6826fa213
--- /dev/null
+++ b/docs/source/example-code/src/main/kotlin/net/corda/docs/LaunchSpaceshipFlow.kt
@@ -0,0 +1,99 @@
+package net.corda.docs
+
+import co.paralleluniverse.fibers.Suspendable
+import net.corda.core.flows.FlowLogic
+import net.corda.core.flows.FlowSession
+import net.corda.core.flows.InitiatedBy
+import net.corda.core.flows.InitiatingFlow
+import net.corda.core.identity.Party
+import net.corda.core.utilities.unwrap
+
+// DOCSTART LaunchSpaceshipFlow
+@InitiatingFlow
+class LaunchSpaceshipFlow : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        val shouldLaunchSpaceship = receive<Boolean>(getPresident()).unwrap { it }
+        if (shouldLaunchSpaceship) {
+            launchSpaceship()
+        }
+    }
+
+    fun launchSpaceship() {
+    }
+
+    fun getPresident(): Party {
+        TODO()
+    }
+}
+
+@InitiatedBy(LaunchSpaceshipFlow::class)
+@InitiatingFlow
+class PresidentSpaceshipFlow(val launcher: Party) : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        val needCoffee = true
+        send(getSecretary(), needCoffee)
+        val shouldLaunchSpaceship = false
+        send(launcher, shouldLaunchSpaceship)
+    }
+
+    fun getSecretary(): Party {
+        TODO()
+    }
+}
+
+@InitiatedBy(PresidentSpaceshipFlow::class)
+class SecretaryFlow(val president: Party) : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        // ignore
+    }
+}
+// DOCEND LaunchSpaceshipFlow
+
+// DOCSTART LaunchSpaceshipFlowCorrect
+@InitiatingFlow
+class LaunchSpaceshipFlowCorrect : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        val presidentSession = initiateFlow(getPresident())
+        val shouldLaunchSpaceship = presidentSession.receive<Boolean>().unwrap { it }
+        if (shouldLaunchSpaceship) {
+            launchSpaceship()
+        }
+    }
+
+    fun launchSpaceship() {
+    }
+
+    fun getPresident(): Party {
+        TODO()
+    }
+}
+
+@InitiatedBy(LaunchSpaceshipFlowCorrect::class)
+@InitiatingFlow
+class PresidentSpaceshipFlowCorrect(val launcherSession: FlowSession) : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        val needCoffee = true
+        val secretarySession = initiateFlow(getSecretary())
+        secretarySession.send(needCoffee)
+        val shouldLaunchSpaceship = false
+        launcherSession.send(shouldLaunchSpaceship)
+    }
+
+    fun getSecretary(): Party {
+        TODO()
+    }
+}
+
+@InitiatedBy(PresidentSpaceshipFlowCorrect::class)
+class SecretaryFlowCorrect(val presidentSession: FlowSession) : FlowLogic<Unit>() {
+    @Suspendable
+    override fun call() {
+        // ignore
+    }
+}
+// DOCEND LaunchSpaceshipFlowCorrect
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
index 684c74b1ab..5260942c36 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/TopLevelTransition.kt
@@ -130,30 +130,19 @@ class TopLevelTransition(
                     flowState = FlowState.Started(event.ioRequest, event.fiber),
                     numberOfSuspends = currentState.checkpoint.numberOfSuspends + 1
             )
-            if (event.maySkipCheckpoint) {
-                actions.addAll(arrayOf(
-                        Action.CommitTransaction,
-                        Action.ScheduleEvent(Event.DoRemainingWork)
-                ))
-                currentState = currentState.copy(
-                        checkpoint = newCheckpoint,
-                        isFlowResumed = false
-                )
-            } else {
-                actions.addAll(arrayOf(
-                        Action.PersistCheckpoint(context.id, newCheckpoint),
-                        Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
-                        Action.CommitTransaction,
-                        Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
-                        Action.ScheduleEvent(Event.DoRemainingWork)
-                ))
-                currentState = currentState.copy(
-                        checkpoint = newCheckpoint,
-                        pendingDeduplicationHandlers = emptyList(),
-                        isFlowResumed = false,
-                        isAnyCheckpointPersisted = true
-                )
-            }
+            actions.addAll(arrayOf(
+                    Action.PersistCheckpoint(context.id, newCheckpoint),
+                    Action.PersistDeduplicationFacts(currentState.pendingDeduplicationHandlers),
+                    Action.CommitTransaction,
+                    Action.AcknowledgeMessages(currentState.pendingDeduplicationHandlers),
+                    Action.ScheduleEvent(Event.DoRemainingWork)
+            ))
+            currentState = currentState.copy(
+                    checkpoint = newCheckpoint,
+                    pendingDeduplicationHandlers = emptyList(),
+                    isFlowResumed = false,
+                    isAnyCheckpointPersisted = true
+            )
             FlowContinuation.ProcessEvents
         }
     }