diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowException.kt b/core/src/main/kotlin/net/corda/core/flows/FlowException.kt
index 80a648227a..6395961e06 100644
--- a/core/src/main/kotlin/net/corda/core/flows/FlowException.kt
+++ b/core/src/main/kotlin/net/corda/core/flows/FlowException.kt
@@ -2,6 +2,7 @@ package net.corda.core.flows
 
 import net.corda.core.CordaException
 import net.corda.core.CordaRuntimeException
+import net.corda.core.identity.Party
 
 // DOCSTART 1
 /**
@@ -28,6 +29,9 @@ open class FlowException(message: String?, cause: Throwable?, var originalErrorI
     constructor(cause: Throwable?) : this(cause?.toString(), cause)
     constructor() : this(null, null)
 
+    // private field with obscure name to ensure it is not overridden
+    private var peer: Party? = null
+
     override fun getErrorId(): Long? = originalErrorId
 }
 // DOCEND 1
@@ -42,5 +46,8 @@ class UnexpectedFlowEndException(message: String, cause: Throwable?, val origina
     constructor(message: String, cause: Throwable?) : this(message, cause, null)
     constructor(message: String) : this(message, null)
 
+    // private field with obscure name to ensure it is not overridden
+    private var peer: Party? = null
+
     override fun getErrorId(): Long? = originalErrorId
 }
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
index 9353f8fb7f..36a4e1ecb0 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt
@@ -159,10 +159,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
                     is FlowContinuation.Resume -> {
                         return continuation.result
                     }
-                    is FlowContinuation.Throw -> {
-                        continuation.throwable.fillInStackTrace()
-                        throw continuation.throwable
-                    }
+                    is FlowContinuation.Throw -> throw continuation.throwable.fillInLocalStackTrace()
                     FlowContinuation.ProcessEvents -> continue@eventLoop
                     FlowContinuation.Abort -> abortFiber()
                 }
@@ -173,6 +170,39 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
         }
     }
 
+    private fun Throwable.fillInLocalStackTrace(): Throwable {
+        fillInStackTrace()
+        // provide useful information that can be displayed to the user
+        // reflection use to access private field
+        when (this) {
+            is UnexpectedFlowEndException -> {
+                DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", this).value?.let {
+                    stackTrace = arrayOf(
+                        StackTraceElement(
+                            "Received unexpected counter-flow exception from peer ${it.name}",
+                            "",
+                            "",
+                            -1
+                        )
+                    ) + stackTrace
+                }
+            }
+            is FlowException -> {
+                DeclaredField<Party?>(FlowException::class.java, "peer", this).value?.let {
+                    stackTrace = arrayOf(
+                        StackTraceElement(
+                            "Received counter-flow exception from peer ${it.name}",
+                            "",
+                            "",
+                            -1
+                        )
+                    ) + stackTrace
+                }
+            }
+        }
+        return this
+    }
+
     /**
      * Immediately processes the passed in event. Always called with an open database transaction.
      *
diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt
index a16c212d8e..127f23f1e2 100644
--- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt
+++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/DeliverSessionMessageTransition.kt
@@ -1,6 +1,10 @@
 package net.corda.node.services.statemachine.transitions
 
+import net.corda.core.flows.FlowException
 import net.corda.core.flows.UnexpectedFlowEndException
+import net.corda.core.identity.Party
+import net.corda.core.internal.DeclaredField
+import net.corda.core.internal.declaredField
 import net.corda.node.services.statemachine.Action
 import net.corda.node.services.statemachine.ConfirmSessionMessage
 import net.corda.node.services.statemachine.DataSessionMessage
@@ -120,6 +124,15 @@ class DeliverSessionMessageTransition(
 
         return when (sessionState) {
             is SessionState.Initiated -> {
+                when (exception) {
+                    // reflection used to access private field
+                    is UnexpectedFlowEndException -> DeclaredField<Party?>(
+                        UnexpectedFlowEndException::class.java,
+                        "peer",
+                        exception
+                    ).value = sessionState.peerParty
+                    is FlowException -> DeclaredField<Party?>(FlowException::class.java, "peer", exception).value = sessionState.peerParty
+                }
                 val checkpoint = currentState.checkpoint
                 val sessionId = event.sessionMessage.recipientSessionId
                 val flowError = FlowError(payload.errorId, exception)
diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
index b31c446986..ccfd98962e 100644
--- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
+++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt
@@ -11,6 +11,7 @@ import net.corda.core.crypto.SecureHash
 import net.corda.core.crypto.random63BitValue
 import net.corda.core.flows.*
 import net.corda.core.identity.Party
+import net.corda.core.internal.DeclaredField
 import net.corda.core.internal.concurrent.flatMap
 import net.corda.core.messaging.MessageRecipients
 import net.corda.core.node.services.PartyInfo
@@ -38,6 +39,7 @@ import net.corda.testing.node.internal.*
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
+import org.assertj.core.api.Condition
 import org.junit.After
 import org.junit.Before
 import org.junit.Test
@@ -45,6 +47,7 @@ import rx.Notification
 import rx.Observable
 import java.time.Instant
 import java.util.*
+import java.util.function.Predicate
 import kotlin.reflect.KClass
 import kotlin.test.assertFailsWith
 
@@ -186,6 +189,7 @@ class FlowFrameworkTests {
                 .isThrownBy { receivingFiber.resultFuture.getOrThrow() }
                 .withMessage("Nothing useful")
                 .withStackTraceContaining(ReceiveFlow::class.java.name)  // Make sure the stack trace is that of the receiving flow
+                .withStackTraceContaining("Received counter-flow exception from peer")
         bobNode.database.transaction {
             assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
         }
@@ -208,6 +212,29 @@ class FlowFrameworkTests {
         assertThat((lastMessage.payload as ErrorSessionMessage).flowException!!.stackTrace).isEmpty()
     }
 
+    @Test
+    fun `sub-class of FlowException can have a peer field without causing serialisation problems`() {
+        val exception = MyPeerFlowException("Nothing useful", alice)
+        bobNode.registerCordappFlowFactory(ReceiveFlow::class) {
+            ExceptionFlow { exception }
+        }
+
+        val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob)) as FlowStateMachineImpl
+
+        mockNet.runNetwork()
+
+        assertThatExceptionOfType(MyPeerFlowException::class.java)
+            .isThrownBy { receivingFiber.resultFuture.getOrThrow() }
+            .has(Condition(Predicate<MyPeerFlowException> { it.peer == alice }, "subclassed peer field has original value"))
+            .has(Condition(Predicate<MyPeerFlowException> {
+                DeclaredField<Party?>(
+                    FlowException::class.java,
+                    "peer",
+                    it
+                ).value == bob
+            }, "FlowException's private peer field has value set"))
+    }
+
     private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic<Unit>() {
         @Suspendable
         override fun call() {
@@ -732,6 +759,8 @@ internal class MyFlowException(override val message: String) : FlowException() {
     override fun hashCode(): Int = message.hashCode()
 }
 
+internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException()
+
 @InitiatingFlow
 internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
     constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)