FlowException thrown by a flow is propagated to all counterparties

This commit is contained in:
Shams Asari
2017-01-19 12:00:14 +00:00
parent cfcfb30beb
commit 646ce8afe0
33 changed files with 634 additions and 606 deletions

View File

@ -5,14 +5,15 @@ import net.corda.core.contracts.Amount
import net.corda.core.contracts.POUNDS
import net.corda.core.contracts.issuedBy
import net.corda.core.crypto.Party
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.OpaqueBytes
import net.corda.core.toFuture
import net.corda.flows.CashCommand
import net.corda.flows.CashFlow
import net.corda.flows.CashFlowResult
import net.corda.node.driver.DriverBasedTest
import net.corda.node.driver.NodeHandle
import net.corda.node.driver.driver
@ -137,13 +138,13 @@ class DistributedServiceTests : DriverBasedTest() {
val issueHandle = aliceProxy.startFlow(
::CashFlow,
CashCommand.IssueCash(amount, OpaqueBytes.of(0), alice.nodeInfo.legalIdentity, raftNotaryIdentity))
require(issueHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
issueHandle.returnValue.toFuture().getOrThrow()
}
private fun paySelf(amount: Amount<Currency>) {
val payHandle = aliceProxy.startFlow(
::CashFlow,
CashCommand.PayCash(amount.issuedBy(alice.nodeInfo.legalIdentity.ref(0)), alice.nodeInfo.legalIdentity))
require(payHandle.returnValue.toBlocking().first() is CashFlowResult.Success)
payHandle.returnValue.toFuture().getOrThrow()
}
}

View File

@ -28,7 +28,6 @@ import net.corda.core.node.services.*
import net.corda.core.serialization.*
import net.corda.core.transactions.SignedTransaction
import net.corda.core.transactions.WireTransaction
import net.corda.flows.CashFlowResult
import net.corda.node.internal.AbstractNode
import net.corda.node.services.User
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NODE_USER
@ -190,8 +189,6 @@ private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null)
register(Cash.Clauses.ConserveAmount::class.java)
register(listOf(Unit).javaClass) // SingletonList
register(setOf(Unit).javaClass) // SingletonSet
register(CashFlowResult.Success::class.java)
register(CashFlowResult.Failed::class.java)
register(ServiceEntry::class.java)
register(NodeInfo::class.java)
register(PhysicalLocation::class.java)

View File

@ -2,16 +2,17 @@ package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.crypto.Party
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.node.CordaPluginRegistry
import net.corda.core.node.PluginServiceHub
import net.corda.core.node.recordTransactions
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.core.transactions.SignedTransaction
import net.corda.flows.*
import net.corda.core.node.CordaPluginRegistry
import java.io.InputStream
import javax.annotation.concurrent.ThreadSafe
import java.util.function.Function
import javax.annotation.concurrent.ThreadSafe
object DataVending {
@ -33,55 +34,40 @@ object DataVending {
*/
@ThreadSafe
class Service(services: PluginServiceHub) : SingletonSerializeAsToken() {
companion object {
val logger = loggerFor<DataVending.Service>()
}
init {
services.registerFlowInitiator(FetchTransactionsFlow::class, ::FetchTransactionsHandler)
services.registerFlowInitiator(FetchAttachmentsFlow::class, ::FetchAttachmentsHandler)
services.registerFlowInitiator(BroadcastTransactionFlow::class, ::NotifyTransactionHandler)
}
private class FetchTransactionsHandler(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
require(it.hashes.isNotEmpty())
it
}
val txs = request.hashes.map {
val tx = serviceHub.storageService.validatedTransactions.getTransaction(it)
if (tx == null)
logger.info("Got request for unknown tx $it")
tx
}
send(otherParty, txs)
private class FetchTransactionsHandler(otherParty: Party) : FetchDataHandler<SignedTransaction>(otherParty) {
override fun getData(id: SecureHash): SignedTransaction? {
return serviceHub.storageService.validatedTransactions.getTransaction(id)
}
}
// TODO: Use Artemis message streaming support here, called "large messages". This avoids the need to buffer.
private class FetchAttachmentsHandler(val otherParty: Party) : FlowLogic<Unit>() {
private class FetchAttachmentsHandler(otherParty: Party) : FetchDataHandler<ByteArray>(otherParty) {
override fun getData(id: SecureHash): ByteArray? {
return serviceHub.storageService.attachments.openAttachment(id)?.open()?.readBytes()
}
}
private abstract class FetchDataHandler<out T>(val otherParty: Party) : FlowLogic<Unit>() {
@Suspendable
@Throws(FetchDataFlow.HashNotFound::class)
override fun call() {
val request = receive<FetchDataFlow.Request>(otherParty).unwrap {
require(it.hashes.isNotEmpty())
if (it.hashes.isEmpty()) throw FlowException("Empty hash list")
it
}
val attachments = request.hashes.map {
val jar: InputStream? = serviceHub.storageService.attachments.openAttachment(it)?.open()
if (jar == null) {
logger.info("Got request for unknown attachment $it")
null
} else {
jar.readBytes()
}
val response = request.hashes.map {
getData(it) ?: throw FetchDataFlow.HashNotFound(it)
}
send(otherParty, attachments)
send(otherParty, response)
}
protected abstract fun getData(id: SecureHash): T?
}

View File

@ -30,7 +30,7 @@ import java.util.concurrent.ExecutionException
class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
val logic: FlowLogic<R>,
scheduler: FiberScheduler) : Fiber<R>("flow", scheduler), FlowStateMachine<R> {
scheduler: FiberScheduler) : Fiber<Unit>("flow", scheduler), FlowStateMachine<R> {
companion object {
// Used to work around a small limitation in Quasar.
private val QUASAR_UNBLOCKER = run {
@ -49,7 +49,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
@Transient override lateinit var serviceHub: ServiceHubInternal
@Transient internal lateinit var database: Database
@Transient internal lateinit var actionOnSuspend: (FlowIORequest) -> Unit
@Transient internal lateinit var actionOnEnd: () -> Unit
@Transient internal lateinit var actionOnEnd: (FlowException?) -> Unit
@Transient internal var fromCheckpoint: Boolean = false
@Transient private var txTrampoline: Transaction? = null
@ -80,29 +80,41 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
override fun run(): R {
override fun run() {
createTransaction()
val result = try {
logic.call()
} catch (e: FlowException) {
if (e.stackTrace[0].className == javaClass.name) {
// FlowException was propagated to us as it's stack trace points to this internal class (see suspendAndExpectReceive).
// If we've got to here then the flow doesn't want to handle it and so we end, but we don't propagate
// the exception further as it's not relevant to anyone else.
actionOnEnd(null)
} else {
// FLowException came from this flow
actionOnEnd(e)
}
_resultFuture?.setException(e)
return
} catch (t: Throwable) {
actionOnEnd()
actionOnEnd(null)
_resultFuture?.setException(t)
throw ExecutionException(t)
}
// Wait for sessions with unconfirmed session state.
openSessions.values.filter { it.state is FlowSessionState.Initiating }.forEach {
it.waitForConfirmation()
}
// Only sessions which have a single send and nothing else will block here
openSessions.values
.filter { it.state is FlowSessionState.Initiating }
.forEach { it.waitForConfirmation() }
// This is to prevent actionOnEnd being called twice if it throws an exception
actionOnEnd()
actionOnEnd(null)
_resultFuture?.set(result)
return result
}
private fun createTransaction() {
// Make sure we have a database transaction
createDatabaseTransaction(database)
logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}." }
logger.trace { "Starting database transaction ${TransactionManager.currentOrNull()} on ${Strand.currentStrand()}" }
}
internal fun commitTransaction() {
@ -221,6 +233,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
}
@Suspendable
@Suppress("UNCHECKED_CAST", "PLATFORM_CLASS_MAPPED_TO_KOTLIN")
private fun <M : ExistingSessionMessage> suspendAndExpectReceive(receiveRequest: ReceiveRequest<M>): ReceivedSessionMessage<M> {
val session = receiveRequest.session
fun getReceivedMessage(): ReceivedSessionMessage<ExistingSessionMessage>? = session.receivedMessages.poll()
@ -237,19 +250,23 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
suspend(receiveRequest)
getReceivedMessage() ?:
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but instead " +
"got nothing: $receiveRequest")
"got nothing for $receiveRequest")
}
if (receivedMessage.message is SessionEnd) {
openSessions.values.remove(session)
throw FlowException("Party ${session.state.sendToParty} has ended their flow but we were expecting to " +
"receive ${receiveRequest.receiveType.simpleName} from them")
} else if (receiveRequest.receiveType.isInstance(receivedMessage.message)) {
@Suppress("UNCHECKED_CAST")
if (receiveRequest.receiveType.isInstance(receivedMessage.message)) {
return receivedMessage as ReceivedSessionMessage<M>
} else if (receivedMessage.message is SessionEnd) {
openSessions.values.remove(session)
if (receivedMessage.message.errorResponse != null) {
(receivedMessage.message.errorResponse as java.lang.Throwable).fillInStackTrace()
throw receivedMessage.message.errorResponse
} else {
throw FlowSessionException("${session.state.sendToParty} has ended their flow but we were expecting " +
"to receive ${receiveRequest.receiveType.simpleName} from them")
}
} else {
throw IllegalStateException("Was expecting a ${receiveRequest.receiveType.simpleName} but instead got " +
"${receivedMessage.message}: $receiveRequest")
"${receivedMessage.message} for $receiveRequest")
}
}

View File

@ -29,7 +29,7 @@ data class SessionData(override val recipientSessionId: Long, val payload: Any)
}
}
data class SessionEnd(override val recipientSessionId: Long) : ExistingSessionMessage
data class SessionEnd(override val recipientSessionId: Long, val errorResponse: FlowException?) : ExistingSessionMessage
data class ReceivedSessionMessage<out M : ExistingSessionMessage>(val sender: Party, val message: M)
@ -37,7 +37,9 @@ fun <T> ReceivedSessionMessage<SessionData>.checkPayloadIs(type: Class<T>): Untr
if (type.isInstance(message.payload)) {
return UntrustworthyData(type.cast(message.payload))
} else {
throw FlowException("We were expecting a ${type.name} from $sender but we instead got a " +
throw FlowSessionException("We were expecting a ${type.name} from $sender but we instead got a " +
"${message.payload.javaClass.name} (${message.payload})")
}
}
}
class FlowSessionException(message: String) : RuntimeException(message)

View File

@ -12,6 +12,7 @@ import net.corda.core.ThreadBox
import net.corda.core.bufferUntilSubscribed
import net.corda.core.crypto.Party
import net.corda.core.crypto.commonName
import net.corda.core.flows.FlowException
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowStateMachine
import net.corda.core.flows.StateMachineRunId
@ -194,7 +195,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
checkpointStorage.forEach {
// If a flow is added before start() then don't attempt to restore it
if (!stateMachines.containsValue(it)) {
val fiber = deserializeFiber(it.serializedFiber)
val fiber = deserializeFiber(it)
initFiber(fiber)
stateMachines[fiber] = it
}
@ -256,7 +257,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
if (peerParty != null) {
if (message is SessionConfirm) {
logger.debug { "Received session confirmation but associated fiber has already terminated, so sending session end" }
sendSessionMessage(peerParty, SessionEnd(message.initiatedSessionId))
sendSessionMessage(peerParty, SessionEnd(message.initiatedSessionId, null))
} else {
logger.trace { "Ignoring session end message for already closed session: $message" }
}
@ -269,30 +270,44 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
private fun onSessionInit(sessionInit: SessionInit, sender: Party) {
logger.trace { "Received $sessionInit $sender" }
val otherPartySessionId = sessionInit.initiatorSessionId
try {
val markerClass = Class.forName(sessionInit.flowName)
val flowFactory = serviceHub.getFlowFactory(markerClass)
if (flowFactory != null) {
val flow = flowFactory(sender)
val fiber = createFiber(flow)
val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(sender, otherPartySessionId))
if (sessionInit.firstPayload != null) {
session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload))
}
openSessions[session.ourSessionId] = session
fiber.openSessions[Pair(flow, sender)] = session
updateCheckpoint(fiber)
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), fiber)
fiber.logger.debug { "Initiated from $sessionInit on $session" }
startFiber(fiber)
} else {
logger.warn("Unknown flow marker class in $sessionInit")
sendSessionMessage(sender, SessionReject(otherPartySessionId, "Don't know ${markerClass.name}"))
}
fun sendSessionReject(message: String) = sendSessionMessage(sender, SessionReject(otherPartySessionId, message))
val markerClass = try {
Class.forName(sessionInit.flowName)
} catch (e: Exception) {
logger.warn("Received invalid $sessionInit", e)
sendSessionMessage(sender, SessionReject(otherPartySessionId, "Unable to establish session"))
sendSessionReject("Don't know ${sessionInit.flowName}")
return
}
val flowFactory = serviceHub.getFlowFactory(markerClass)
if (flowFactory == null) {
logger.warn("Unknown flow marker class in $sessionInit")
sendSessionReject("Don't know ${markerClass.name}")
return
}
val session = try {
val flow = flowFactory(sender)
val fiber = createFiber(flow)
val session = FlowSession(flow, random63BitValue(), FlowSessionState.Initiated(sender, otherPartySessionId))
if (sessionInit.firstPayload != null) {
session.receivedMessages += ReceivedSessionMessage(sender, SessionData(session.ourSessionId, sessionInit.firstPayload))
}
openSessions[session.ourSessionId] = session
fiber.openSessions[Pair(flow, sender)] = session
updateCheckpoint(fiber)
session
} catch (e: Exception) {
logger.warn("Couldn't start session for $sessionInit", e)
sendSessionReject("Unable to establish session")
return
}
sendSessionMessage(sender, SessionConfirm(otherPartySessionId, session.ourSessionId), session.fiber)
session.fiber.logger.debug { "Initiated from $sessionInit on $session" }
startFiber(session.fiber)
}
private fun serializeFiber(fiber: FlowStateMachineImpl<*>): SerializedBytes<FlowStateMachineImpl<*>> {
@ -302,11 +317,11 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
return fiber.serialize(kryo)
}
private fun deserializeFiber(serialisedFiber: SerializedBytes<FlowStateMachineImpl<*>>): FlowStateMachineImpl<*> {
private fun deserializeFiber(checkpoint: Checkpoint): FlowStateMachineImpl<*> {
val kryo = quasarKryo()
// put the map of token -> tokenized into the kryo context
SerializeAsTokenSerializer.setContext(kryo, serializationContext)
return serialisedFiber.deserialize(kryo).apply { fromCheckpoint = true }
return checkpoint.serializedFiber.deserialize(kryo).apply { fromCheckpoint = true }
}
private fun quasarKryo(): Kryo {
@ -330,14 +345,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
processIORequest(ioRequest)
decrementLiveFibers()
}
fiber.actionOnEnd = {
fiber.actionOnEnd = { errorResponse: FlowException? ->
try {
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
mutex.locked {
stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) }
notifyChangeObservers(fiber, AddOrRemove.REMOVE)
}
endAllFiberSessions(fiber)
endAllFiberSessions(fiber, errorResponse)
} finally {
fiber.commitTransaction()
decrementLiveFibers()
@ -352,14 +367,12 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>) {
private fun endAllFiberSessions(fiber: FlowStateMachineImpl<*>, errorResponse: FlowException?) {
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
(errorResponse as java.lang.Throwable?)?.stackTrace = emptyArray()
openSessions.values.removeIf { session ->
if (session.fiber == fiber) {
val initiatedState = session.state as? FlowSessionState.Initiated
if (initiatedState != null) {
sendSessionMessage(initiatedState.peerParty, SessionEnd(initiatedState.peerSessionId), fiber)
recentlyClosedSessions[session.ourSessionId] = initiatedState.peerParty
}
session.endSession(errorResponse)
true
} else {
false
@ -367,6 +380,17 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
}
}
private fun FlowSession.endSession(errorResponse: FlowException?) {
val initiatedState = state as? Initiated
if (initiatedState != null) {
sendSessionMessage(
initiatedState.peerParty,
SessionEnd(initiatedState.peerSessionId, errorResponse),
fiber)
recentlyClosedSessions[ourSessionId] = initiatedState.peerParty
}
}
private fun startFiber(fiber: FlowStateMachineImpl<*>) {
try {
resumeFiber(fiber)

View File

@ -11,18 +11,16 @@ import net.corda.core.utilities.DUMMY_NOTARY
import net.corda.core.utilities.DUMMY_NOTARY_KEY
import net.corda.flows.NotaryChangeFlow.Instigator
import net.corda.flows.StateReplacementException
import net.corda.flows.StateReplacementRefused
import net.corda.node.internal.AbstractNode
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.testing.node.MockNetwork
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
import java.time.Instant
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
class NotaryChangeTests {
@ -83,8 +81,9 @@ class NotaryChangeTests {
net.runNetwork()
val ex = assertFailsWith(StateReplacementException::class) { future.resultFuture.getOrThrow() }
assertThat(ex.error).isInstanceOf(StateReplacementRefused::class.java)
assertThatExceptionOfType(StateReplacementException::class.java).isThrownBy {
future.resultFuture.getOrThrow()
}
}
@Test

View File

@ -34,7 +34,8 @@ import net.corda.testing.node.MockNetwork
import net.corda.testing.node.MockNetwork.MockNode
import net.corda.testing.sequence
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
@ -249,14 +250,14 @@ class StateMachineManagerTests {
assertSessionTransfers(node2,
node1 sent sessionInit(SendFlow::class, payload) to node2,
node2 sent sessionConfirm to node1,
node1 sent sessionEnd to node2
node1 sent sessionEnd() to node2
//There's no session end from the other flows as they're manually suspended
)
assertSessionTransfers(node3,
node1 sent sessionInit(SendFlow::class, payload) to node3,
node3 sent sessionConfirm to node1,
node1 sent sessionEnd to node3
node1 sent sessionEnd() to node3
//There's no session end from the other flows as they're manually suspended
)
@ -283,14 +284,14 @@ class StateMachineManagerTests {
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1,
node2 sent sessionData(node2Payload) to node1,
node2 sent sessionEnd to node1
node2 sent sessionEnd() to node1
)
assertSessionTransfers(node3,
node1 sent sessionInit(ReceiveFlow::class) to node3,
node3 sent sessionConfirm to node1,
node3 sent sessionData(node3Payload) to node1,
node3 sent sessionEnd to node1
node3 sent sessionEnd() to node1
)
}
@ -306,7 +307,7 @@ class StateMachineManagerTests {
node2 sent sessionData(20L) to node1,
node1 sent sessionData(11L) to node2,
node2 sent sessionData(21L) to node1,
node1 sent sessionEnd to node2
node1 sent sessionEnd() to node2
)
}
@ -368,18 +369,104 @@ class StateMachineManagerTests {
}
@Test
fun `exception thrown on other side`() {
val erroringFiber = node2.initiateSingleShotFlow(ReceiveFlow::class) { ExceptionFlow }.map { it.stateMachine as FlowStateMachineImpl }
fun `FlowException thrown on other side`() {
val erroringFlowFuture = node2.initiateSingleShotFlow(ReceiveFlow::class) {
ExceptionFlow { MyFlowException("Nothing useful") }
}
val receivingFiber = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity)) as FlowStateMachineImpl
net.runNetwork()
assertThatThrownBy { receivingFiber.resultFuture.getOrThrow() }.isInstanceOf(FlowException::class.java)
assertThatExceptionOfType(MyFlowException::class.java)
.isThrownBy { receivingFiber.resultFuture.getOrThrow() }
.withMessage("Nothing useful")
.withStackTraceContaining("ReceiveFlow") // Make sure the stack trace is that of the receiving flow
databaseTransaction(node2.database) {
assertThat(node2.checkpointStorage.checkpoints()).isEmpty()
}
val errorFlow = erroringFlowFuture.getOrThrow()
assertThat(receivingFiber.isTerminated).isTrue()
assertThat(erroringFiber.getOrThrow().isTerminated).isTrue()
assertThat((errorFlow.stateMachine as FlowStateMachineImpl).isTerminated).isTrue()
assertSessionTransfers(
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1,
node2 sent sessionEnd to node1
node2 sent sessionEnd(errorFlow.exceptionThrown) to node1
)
// Make sure the original stack trace isn't sent down the wire
assertThat((sessionTransfers.last().message as SessionEnd).errorResponse!!.stackTrace).isEmpty()
}
private class SendAndReceiveFlow(val otherParty: Party, val payload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
sendAndReceive<Any>(otherParty, payload)
}
}
@Test
fun `FlowException thrown and there is a 3rd party flow`() {
val node3 = net.createNode(node1.info.address)
net.runNetwork()
// Node 2 will send its payload and then block waiting for the receive from node 1. Meanwhile node 1 will move
// onto node 3 which will throw the exception
val node2Fiber = node2
.initiateSingleShotFlow(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") }
.map { it.stateMachine }
node3.initiateSingleShotFlow(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } }
val node1Fiber = node1.services.startFlow(ReceiveFlow(node2.info.legalIdentity, node3.info.legalIdentity)) as FlowStateMachineImpl
net.runNetwork()
// Node 1 will terminate with the error it received from node 3 but it won't propagate that to node 2 (as it's
// not relevant to it) but it will end its session with it
assertThatExceptionOfType(MyFlowException::class.java).isThrownBy {
node1Fiber.resultFuture.getOrThrow()
}
val node2ResultFuture = node2Fiber.getOrThrow().resultFuture
assertThatExceptionOfType(FlowSessionException::class.java).isThrownBy {
node2ResultFuture.getOrThrow()
}
assertSessionTransfers(node2,
node1 sent sessionInit(ReceiveFlow::class) to node2,
node2 sent sessionConfirm to node1,
node2 sent sessionData("Hello") to node1,
node1 sent sessionEnd() to node2 // Unexpected session-end
)
}
private class ConditionalExceptionFlow(val otherParty: Party, val sendPayload: Any) : FlowLogic<Unit>() {
@Suspendable
override fun call() {
val throwException = receive<Boolean>(otherParty).unwrap { it }
if (throwException) {
throw MyFlowException("Throwing exception as requested")
}
send(otherParty, sendPayload)
}
}
@Test
fun `retry subFlow due to receiving FlowException`() {
class AskForExceptionFlow(val otherParty: Party, val throwException: Boolean) : FlowLogic<String>() {
@Suspendable
override fun call(): String = sendAndReceive<String>(otherParty, throwException).unwrap { it }
}
class RetryOnExceptionFlow(val otherParty: Party) : FlowLogic<String>() {
@Suspendable
override fun call(): String {
return try {
subFlow(AskForExceptionFlow(otherParty, throwException = true))
} catch (e: MyFlowException) {
subFlow(AskForExceptionFlow(otherParty, throwException = false))
}
}
}
node2.services.registerFlowInitiator(AskForExceptionFlow::class) { ConditionalExceptionFlow(it, "Hello") }
val resultFuture = node1.services.startFlow(RetryOnExceptionFlow(node2.info.legalIdentity)).resultFuture
net.runNetwork()
assertThat(resultFuture.getOrThrow()).isEqualTo("Hello")
}
private inline fun <reified P : FlowLogic<*>> MockNode.restartAndGetRestoredFlow(
@ -403,15 +490,16 @@ class StateMachineManagerTests {
private fun sessionData(payload: Any) = SessionData(0, payload)
private val sessionEnd = SessionEnd(0)
private fun sessionEnd(error: FlowException? = null) = SessionEnd(0, error)
private fun assertSessionTransfers(vararg expected: SessionTransfer) {
assertThat(sessionTransfers).containsExactly(*expected)
}
private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer) {
private fun assertSessionTransfers(node: MockNode, vararg expected: SessionTransfer): List<SessionTransfer> {
val actualForNode = sessionTransfers.filter { it.from == node.id || it.to == node.net.myAddress }
assertThat(actualForNode).containsExactly(*expected)
return actualForNode
}
private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) {
@ -440,7 +528,6 @@ class StateMachineManagerTests {
private infix fun MockNode.sent(message: SessionMessage): Pair<Int, SessionMessage> = Pair(id, message)
private infix fun Pair<Int, SessionMessage>.to(node: MockNode): SessionTransfer = SessionTransfer(first, second, node.net.myAddress)
private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic<Unit>() {
@Transient var flowStarted = false
@ -498,7 +585,16 @@ class StateMachineManagerTests {
}
}
private object ExceptionFlow : FlowLogic<Nothing>() {
override fun call(): Nothing = throw Exception()
private class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<Nothing>() {
lateinit var exceptionThrown: E
override fun call(): Nothing {
exceptionThrown = exception()
throw exceptionThrown
}
}
private class MyFlowException(message: String) : FlowException(message) {
override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message
override fun hashCode(): Int = message?.hashCode() ?: 31
}
}