CORDA-2572: Add peer information to stacktrace of received FlowException (#4998)

When a `UnexpectedFlowEndException` or a `FlowException` is received the
peer that the exception was thrown from will be added to the stacktrace.
This is due to it being easier to see and a field that developers
are much less likely to override.

A nullable field `peer` has been added to `FlowException` and
`UnexpectedFlowEndException`. This is read later on (when peer info
is not available) to append the peer info to the stacktrace.
This commit is contained in:
Dan Newton 2019-04-24 11:45:10 +01:00 committed by Shams Asari
parent 864a355e63
commit 627096b217
4 changed files with 83 additions and 4 deletions

View File

@ -2,6 +2,7 @@ package net.corda.core.flows
import net.corda.core.CordaException import net.corda.core.CordaException
import net.corda.core.CordaRuntimeException import net.corda.core.CordaRuntimeException
import net.corda.core.identity.Party
// DOCSTART 1 // DOCSTART 1
/** /**
@ -28,6 +29,9 @@ open class FlowException(message: String?, cause: Throwable?, var originalErrorI
constructor(cause: Throwable?) : this(cause?.toString(), cause) constructor(cause: Throwable?) : this(cause?.toString(), cause)
constructor() : this(null, null) constructor() : this(null, null)
// private field with obscure name to ensure it is not overridden
private var peer: Party? = null
override fun getErrorId(): Long? = originalErrorId override fun getErrorId(): Long? = originalErrorId
} }
// DOCEND 1 // DOCEND 1
@ -42,5 +46,8 @@ class UnexpectedFlowEndException(message: String, cause: Throwable?, val origina
constructor(message: String, cause: Throwable?) : this(message, cause, null) constructor(message: String, cause: Throwable?) : this(message, cause, null)
constructor(message: String) : this(message, null) constructor(message: String) : this(message, null)
// private field with obscure name to ensure it is not overridden
private var peer: Party? = null
override fun getErrorId(): Long? = originalErrorId override fun getErrorId(): Long? = originalErrorId
} }

View File

@ -159,10 +159,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
is FlowContinuation.Resume -> { is FlowContinuation.Resume -> {
return continuation.result return continuation.result
} }
is FlowContinuation.Throw -> { is FlowContinuation.Throw -> throw continuation.throwable.fillInLocalStackTrace()
continuation.throwable.fillInStackTrace()
throw continuation.throwable
}
FlowContinuation.ProcessEvents -> continue@eventLoop FlowContinuation.ProcessEvents -> continue@eventLoop
FlowContinuation.Abort -> abortFiber() FlowContinuation.Abort -> abortFiber()
} }
@ -173,6 +170,39 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
} }
} }
private fun Throwable.fillInLocalStackTrace(): Throwable {
fillInStackTrace()
// provide useful information that can be displayed to the user
// reflection use to access private field
when (this) {
is UnexpectedFlowEndException -> {
DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", this).value?.let {
stackTrace = arrayOf(
StackTraceElement(
"Received unexpected counter-flow exception from peer ${it.name}",
"",
"",
-1
)
) + stackTrace
}
}
is FlowException -> {
DeclaredField<Party?>(FlowException::class.java, "peer", this).value?.let {
stackTrace = arrayOf(
StackTraceElement(
"Received counter-flow exception from peer ${it.name}",
"",
"",
-1
)
) + stackTrace
}
}
}
return this
}
/** /**
* Immediately processes the passed in event. Always called with an open database transaction. * Immediately processes the passed in event. Always called with an open database transaction.
* *

View File

@ -1,6 +1,10 @@
package net.corda.node.services.statemachine.transitions package net.corda.node.services.statemachine.transitions
import net.corda.core.flows.FlowException
import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.declaredField
import net.corda.node.services.statemachine.Action import net.corda.node.services.statemachine.Action
import net.corda.node.services.statemachine.ConfirmSessionMessage import net.corda.node.services.statemachine.ConfirmSessionMessage
import net.corda.node.services.statemachine.DataSessionMessage import net.corda.node.services.statemachine.DataSessionMessage
@ -120,6 +124,15 @@ class DeliverSessionMessageTransition(
return when (sessionState) { return when (sessionState) {
is SessionState.Initiated -> { is SessionState.Initiated -> {
when (exception) {
// reflection used to access private field
is UnexpectedFlowEndException -> DeclaredField<Party?>(
UnexpectedFlowEndException::class.java,
"peer",
exception
).value = sessionState.peerParty
is FlowException -> DeclaredField<Party?>(FlowException::class.java, "peer", exception).value = sessionState.peerParty
}
val checkpoint = currentState.checkpoint val checkpoint = currentState.checkpoint
val sessionId = event.sessionMessage.recipientSessionId val sessionId = event.sessionMessage.recipientSessionId
val flowError = FlowError(payload.errorId, exception) val flowError = FlowError(payload.errorId, exception)

View File

@ -11,6 +11,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.DeclaredField
import net.corda.core.internal.concurrent.flatMap import net.corda.core.internal.concurrent.flatMap
import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.MessageRecipients
import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.PartyInfo
@ -38,6 +39,7 @@ import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.assertj.core.api.Condition
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
@ -45,6 +47,7 @@ import rx.Notification
import rx.Observable import rx.Observable
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
import java.util.function.Predicate
import kotlin.reflect.KClass import kotlin.reflect.KClass
import kotlin.test.assertFailsWith import kotlin.test.assertFailsWith
@ -186,6 +189,7 @@ class FlowFrameworkTests {
.isThrownBy { receivingFiber.resultFuture.getOrThrow() } .isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Nothing useful") .withMessage("Nothing useful")
.withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow .withStackTraceContaining(ReceiveFlow::class.java.name) // Make sure the stack trace is that of the receiving flow
.withStackTraceContaining("Received counter-flow exception from peer")
bobNode.database.transaction { bobNode.database.transaction {
assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty() assertThat(bobNode.internals.checkpointStorage.checkpoints()).isEmpty()
} }
@ -208,6 +212,29 @@ class FlowFrameworkTests {
assertThat((lastMessage.payload as ErrorSessionMessage).flowException!!.stackTrace).isEmpty() assertThat((lastMessage.payload as ErrorSessionMessage).flowException!!.stackTrace).isEmpty()
} }
@Test
fun `sub-class of FlowException can have a peer field without causing serialisation problems`() {
val exception = MyPeerFlowException("Nothing useful", alice)
bobNode.registerCordappFlowFactory(ReceiveFlow::class) {
ExceptionFlow { exception }
}
val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob)) as FlowStateMachineImpl
mockNet.runNetwork()
assertThatExceptionOfType(MyPeerFlowException::class.java)
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.has(Condition(Predicate<MyPeerFlowException> { it.peer == alice }, "subclassed peer field has original value"))
.has(Condition(Predicate<MyPeerFlowException> {
DeclaredField<Party?>(
FlowException::class.java,
"peer",
it
).value == bob
}, "FlowException's private peer field has value set"))
}
private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic<Unit>() { private class ConditionalExceptionFlow(val otherPartySession: FlowSession, val sendPayload: Any) : FlowLogic<Unit>() {
@Suspendable @Suspendable
override fun call() { override fun call() {
@ -732,6 +759,8 @@ internal class MyFlowException(override val message: String) : FlowException() {
override fun hashCode(): Int = message.hashCode() override fun hashCode(): Int = message.hashCode()
} }
internal class MyPeerFlowException(override val message: String, val peer: Party) : FlowException()
@InitiatingFlow @InitiatingFlow
internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() { internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic<Any>() {
constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession) constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession)