mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
CORDA-3506 - Implement session close operations (#6357)
This commit is contained in:
parent
c6cfe1ecf3
commit
7261fa690f
@ -11,7 +11,7 @@ java8MinUpdateVersion=171
|
||||
# When incrementing platformVersion make sure to update #
|
||||
# net.corda.core.internal.CordaUtilsKt.PLATFORM_VERSION as well. #
|
||||
# ***************************************************************#
|
||||
platformVersion=7
|
||||
platformVersion=8
|
||||
guavaVersion=28.0-jre
|
||||
# Quasar version to use with Java 8:
|
||||
quasarVersion=0.7.12_r3
|
||||
|
@ -167,7 +167,7 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
|
||||
|
||||
@Suspendable
|
||||
override fun testCode(): Any =
|
||||
await(ExternalAsyncOperation(serviceHub) { _, _ ->
|
||||
await(ExternalAsyncOperation(serviceHub) { serviceHub, _ ->
|
||||
serviceHub.cordaService(FutureService::class.java).createFuture()
|
||||
})
|
||||
}
|
||||
|
@ -25,6 +25,7 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.NonEmptySet
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
import net.corda.core.utilities.UntrustworthyData
|
||||
import net.corda.core.utilities.debug
|
||||
@ -378,6 +379,22 @@ abstract class FlowLogic<out T> {
|
||||
stateMachine.suspend(request, maySkipCheckpoint)
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the provided sessions and performs cleanup of any resources tied to these sessions.
|
||||
*
|
||||
* Note that sessions are closed automatically when the corresponding top-level flow terminates.
|
||||
* So, it's beneficial to eagerly close them in long-lived flows that might have many open sessions that are not needed anymore and consume resources (e.g. memory, disk etc.).
|
||||
* A closed session cannot be used anymore, e.g. to send or receive messages. So, you have to ensure you are calling this method only when the provided sessions are not going to be used anymore.
|
||||
* As a result, any operations on a closed session will fail with an [UnexpectedFlowEndException].
|
||||
* When a session is closed, the other side is informed and the session is closed there too eventually.
|
||||
* To prevent misuse of the API, if there is an attempt to close an uninitialised session the invocation will fail with an [IllegalStateException].
|
||||
*/
|
||||
@Suspendable
|
||||
fun close(sessions: NonEmptySet<FlowSession>) {
|
||||
val request = FlowIORequest.CloseSessions(sessions)
|
||||
stateMachine.suspend(request, false)
|
||||
}
|
||||
|
||||
/**
|
||||
* Invokes the given subflow. This function returns once the subflow completes successfully with the result
|
||||
* returned by that subflow's [call] method. If the subflow has a progress tracker, it is attached to the
|
||||
|
@ -191,6 +191,19 @@ abstract class FlowSession {
|
||||
*/
|
||||
@Suspendable
|
||||
abstract fun send(payload: Any)
|
||||
|
||||
/**
|
||||
* Closes this session and performs cleanup of any resources tied to this session.
|
||||
*
|
||||
* Note that sessions are closed automatically when the corresponding top-level flow terminates.
|
||||
* So, it's beneficial to eagerly close them in long-lived flows that might have many open sessions that are not needed anymore and consume resources (e.g. memory, disk etc.).
|
||||
* A closed session cannot be used anymore, e.g. to send or receive messages. So, you have to ensure you are calling this method only when the session is not going to be used anymore.
|
||||
* As a result, any operations on a closed session will fail with an [UnexpectedFlowEndException].
|
||||
* When a session is closed, the other side is informed and the session is closed there too eventually.
|
||||
* To prevent misuse of the API, if there is an attempt to close an uninitialised session the invocation will fail with an [IllegalStateException].
|
||||
*/
|
||||
@Suspendable
|
||||
abstract fun close()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -28,7 +28,7 @@ import java.util.jar.JarInputStream
|
||||
|
||||
// *Internal* Corda-specific utilities.
|
||||
|
||||
const val PLATFORM_VERSION = 7
|
||||
const val PLATFORM_VERSION = 8
|
||||
|
||||
fun ServicesForResolution.ensureMinimumPlatformVersion(requiredMinPlatformVersion: Int, feature: String) {
|
||||
checkMinimumPlatformVersion(networkParameters.minimumPlatformVersion, requiredMinPlatformVersion, feature)
|
||||
|
@ -55,6 +55,13 @@ sealed class FlowIORequest<out R : Any> {
|
||||
}}, shouldRetrySend=$shouldRetrySend)"
|
||||
}
|
||||
|
||||
/**
|
||||
* Closes the specified sessions.
|
||||
*
|
||||
* @property sessions the sessions to be closed.
|
||||
*/
|
||||
data class CloseSessions(val sessions: NonEmptySet<FlowSession>): FlowIORequest<Unit>()
|
||||
|
||||
/**
|
||||
* Wait for a transaction to be committed to the database.
|
||||
*
|
||||
|
@ -0,0 +1,273 @@
|
||||
package net.corda.node.flows
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.client.rpc.CordaRPCClient
|
||||
import net.corda.core.CordaRuntimeException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.transpose
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.node.services.Permissions
|
||||
import net.corda.node.services.statemachine.transitions.PrematureSessionCloseException
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.User
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.Test
|
||||
import java.sql.SQLTransientConnectionException
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class FlowSessionCloseTest {
|
||||
|
||||
private val user = User("user", "pwd", setOf(Permissions.all()))
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow cannot close uninitialised session`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
|
||||
val (nodeAHandle, nodeBHandle) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
|
||||
startNode(providedName = BOB_NAME, rpcUsers = listOf(user))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), true, null, false).returnValue.getOrThrow() }
|
||||
.isInstanceOf(CordaRuntimeException::class.java)
|
||||
.hasMessageContaining(PrematureSessionCloseException::class.java.name)
|
||||
.hasMessageContaining("The following session was closed before it was initialised")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow cannot access closed session`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
|
||||
val (nodeAHandle, nodeBHandle) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
|
||||
startNode(providedName = BOB_NAME, rpcUsers = listOf(user))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
InitiatorFlow.SessionAPI.values().forEach { sessionAPI ->
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
assertThatThrownBy { it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, sessionAPI, false).returnValue.getOrThrow() }
|
||||
.isInstanceOf(UnexpectedFlowEndException::class.java)
|
||||
.hasMessageContaining("Tried to access ended session")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow can close initialised session successfully`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
|
||||
val (nodeAHandle, nodeBHandle) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
|
||||
startNode(providedName = BOB_NAME, rpcUsers = listOf(user))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, false).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow can close initialised session successfully even in case of failures and replays`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
|
||||
val (nodeAHandle, nodeBHandle) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
|
||||
startNode(providedName = BOB_NAME, rpcUsers = listOf(user))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::InitiatorFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false, null, true).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow can close multiple sessions successfully`() {
|
||||
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
|
||||
val (nodeAHandle, nodeBHandle) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)),
|
||||
startNode(providedName = BOB_NAME, rpcUsers = listOf(user))
|
||||
).transpose().getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::InitiatorMultipleSessionsFlow, nodeBHandle.nodeInfo.legalIdentities.first()).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test ensures that when sessions are closed, the associated resources are eagerly cleaned up.
|
||||
* If sessions are not closed, then the node will crash with an out-of-memory error.
|
||||
* This can be confirmed by commenting out [FlowSession.close] operation in the invoked flow and re-run the test.
|
||||
*/
|
||||
@Test(timeout=300_000)
|
||||
fun `flow looping over sessions can close them to release resources and avoid out-of-memory failures, when the other side does not finish early`() {
|
||||
driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
|
||||
val (nodeAHandle, nodeBHandle) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m"),
|
||||
startNode(providedName = BOB_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m")
|
||||
).transpose().getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::InitiatorLoopingFlow, nodeBHandle.nodeInfo.legalIdentities.first(), true).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow looping over sessions will close sessions automatically, when the other side finishes early`() {
|
||||
driver(DriverParameters(startNodesInProcess = false, cordappsForAllNodes = listOf(enclosedCordapp()), notarySpecs = emptyList())) {
|
||||
val (nodeAHandle, nodeBHandle) = listOf(
|
||||
startNode(providedName = ALICE_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m"),
|
||||
startNode(providedName = BOB_NAME, rpcUsers = listOf(user), maximumHeapSize = "256m")
|
||||
).transpose().getOrThrow()
|
||||
|
||||
CordaRPCClient(nodeAHandle.rpcAddress).start(user.username, user.password).use {
|
||||
it.proxy.startFlow(::InitiatorLoopingFlow, nodeBHandle.nodeInfo.legalIdentities.first(), false).returnValue.getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class InitiatorFlow(val party: Party, private val prematureClose: Boolean = false,
|
||||
private val accessClosedSessionWithApi: SessionAPI? = null,
|
||||
private val retryClose: Boolean = false): FlowLogic<Unit>() {
|
||||
|
||||
@CordaSerializable
|
||||
enum class SessionAPI {
|
||||
SEND,
|
||||
SEND_AND_RECEIVE,
|
||||
RECEIVE,
|
||||
GET_FLOW_INFO
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val session = initiateFlow(party)
|
||||
|
||||
if (prematureClose) {
|
||||
session.close()
|
||||
}
|
||||
|
||||
session.send(retryClose)
|
||||
sleep(1.seconds)
|
||||
|
||||
if (accessClosedSessionWithApi != null) {
|
||||
when(accessClosedSessionWithApi) {
|
||||
SessionAPI.SEND -> session.send("dummy payload ")
|
||||
SessionAPI.RECEIVE -> session.receive<String>()
|
||||
SessionAPI.SEND_AND_RECEIVE -> session.sendAndReceive<String>("dummy payload")
|
||||
SessionAPI.GET_FLOW_INFO -> session.getCounterpartyFlowInfo()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(InitiatorFlow::class)
|
||||
class InitiatedFlow(private val otherSideSession: FlowSession): FlowLogic<Unit>() {
|
||||
|
||||
companion object {
|
||||
var thrown = false
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val retryClose = otherSideSession.receive<Boolean>()
|
||||
.unwrap{ it }
|
||||
|
||||
otherSideSession.close()
|
||||
|
||||
// failing with a transient exception to force a replay of the close.
|
||||
if (retryClose) {
|
||||
if (!thrown) {
|
||||
thrown = true
|
||||
throw SQLTransientConnectionException("Connection is not available")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class InitiatorLoopingFlow(val party: Party, val blockingCounterparty: Boolean = false): FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
for (i in 1..1_000) {
|
||||
val session = initiateFlow(party)
|
||||
session.sendAndReceive<String>(blockingCounterparty ).unwrap{ assertEquals("Got it", it) }
|
||||
|
||||
/**
|
||||
* If the counterparty blocks, we need to eagerly close the session and release resources to avoid running out of memory.
|
||||
* Otherwise, the session end messages from the other side will do that automatically.
|
||||
*/
|
||||
if (blockingCounterparty) {
|
||||
session.close()
|
||||
}
|
||||
|
||||
logger.info("Completed iteration $i")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(InitiatorLoopingFlow::class)
|
||||
class InitiatedLoopingFlow(private val otherSideSession: FlowSession): FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
val shouldBlock = otherSideSession.receive<Boolean>()
|
||||
.unwrap{ it }
|
||||
otherSideSession.send("Got it")
|
||||
|
||||
if (shouldBlock) {
|
||||
otherSideSession.receive<String>()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatingFlow
|
||||
@StartableByRPC
|
||||
class InitiatorMultipleSessionsFlow(val party: Party): FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
for (round in 1 .. 2) {
|
||||
val sessions = mutableListOf<FlowSession>()
|
||||
for (session_number in 1 .. 5) {
|
||||
val session = initiateFlow(party)
|
||||
sessions.add(session)
|
||||
session.sendAndReceive<String>("What's up?").unwrap{ assertEquals("All good!", it) }
|
||||
}
|
||||
close(sessions.toNonEmptySet())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(InitiatorMultipleSessionsFlow::class)
|
||||
class InitiatedMultipleSessionsFlow(private val otherSideSession: FlowSession): FlowLogic<Unit>() {
|
||||
@Suspendable
|
||||
override fun call() {
|
||||
otherSideSession.receive<String>()
|
||||
.unwrap{ assertEquals("What's up?", it) }
|
||||
otherSideSession.send("All good!")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -60,13 +60,11 @@ import net.corda.nodeapi.internal.lifecycle.NodeLifecycleObserver.Companion.repo
|
||||
import net.corda.node.internal.NodeStartup
|
||||
import net.corda.node.services.api.CheckpointStorage
|
||||
import net.corda.node.services.statemachine.Checkpoint
|
||||
import net.corda.node.services.statemachine.DataSessionMessage
|
||||
import net.corda.node.services.statemachine.ErrorState
|
||||
import net.corda.node.services.statemachine.FlowError
|
||||
import net.corda.node.services.statemachine.ExistingSessionMessagePayload
|
||||
import net.corda.node.services.statemachine.FlowSessionImpl
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.FlowStateMachineImpl
|
||||
import net.corda.node.services.statemachine.InitiatedSessionState
|
||||
import net.corda.node.services.statemachine.SessionId
|
||||
import net.corda.node.services.statemachine.SessionState
|
||||
import net.corda.node.services.statemachine.SubFlow
|
||||
@ -325,6 +323,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
val send: List<SendJson>? = null,
|
||||
val receive: NonEmptySet<FlowSession>? = null,
|
||||
val sendAndReceive: List<SendJson>? = null,
|
||||
val closeSessions: NonEmptySet<FlowSession>? = null,
|
||||
val waitForLedgerCommit: SecureHash? = null,
|
||||
val waitForStateConsumption: Set<StateRef>? = null,
|
||||
val getFlowInfo: NonEmptySet<FlowSession>? = null,
|
||||
@ -352,6 +351,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
is FlowIORequest.Send -> SuspendedOn(send = sessionToMessage.toJson())
|
||||
is FlowIORequest.Receive -> SuspendedOn(receive = sessions)
|
||||
is FlowIORequest.SendAndReceive -> SuspendedOn(sendAndReceive = sessionToMessage.toJson())
|
||||
is FlowIORequest.CloseSessions -> SuspendedOn(closeSessions = sessions)
|
||||
is FlowIORequest.WaitForLedgerCommit -> SuspendedOn(waitForLedgerCommit = hash)
|
||||
is FlowIORequest.GetFlowInfo -> SuspendedOn(getFlowInfo = sessions)
|
||||
is FlowIORequest.Sleep -> SuspendedOn(sleepTill = wakeUpAfter)
|
||||
@ -379,16 +379,14 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
|
||||
private class ActiveSession(
|
||||
val peer: Party,
|
||||
val ourSessionId: SessionId,
|
||||
val receivedMessages: List<DataSessionMessage>,
|
||||
val errors: List<FlowError>,
|
||||
val receivedMessages: List<ExistingSessionMessagePayload>,
|
||||
val peerFlowInfo: FlowInfo,
|
||||
val peerSessionId: SessionId?
|
||||
)
|
||||
|
||||
private fun SessionState.toActiveSession(sessionId: SessionId): ActiveSession? {
|
||||
return if (this is SessionState.Initiated) {
|
||||
val peerSessionId = (initiatedState as? InitiatedSessionState.Live)?.peerSinkSessionId
|
||||
ActiveSession(peerParty, sessionId, receivedMessages, errors, peerFlowInfo, peerSessionId)
|
||||
ActiveSession(peerParty, sessionId, receivedMessages, peerFlowInfo, peerSinkSessionId)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
@ -130,13 +130,9 @@ internal class ActionExecutorImpl(
|
||||
log.warn("Propagating error", exception)
|
||||
}
|
||||
for (sessionState in action.sessions) {
|
||||
// We cannot propagate if the session isn't live.
|
||||
if (sessionState.initiatedState !is InitiatedSessionState.Live) {
|
||||
continue
|
||||
}
|
||||
// Don't propagate errors to the originating session
|
||||
for (errorMessage in action.errorMessages) {
|
||||
val sinkSessionId = sessionState.initiatedState.peerSinkSessionId
|
||||
val sinkSessionId = sessionState.peerSinkSessionId
|
||||
val existingMessage = ExistingSessionMessage(sinkSessionId, errorMessage)
|
||||
val deduplicationId = DeduplicationId.createForError(errorMessage.errorId, sinkSessionId)
|
||||
flowMessaging.sendSessionMessage(sessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, action.senderUUID))
|
||||
|
@ -78,6 +78,7 @@ internal class FlowMonitor(
|
||||
is FlowIORequest.Send -> "to send a message to parties ${request.sessionToMessage.keys.partiesInvolved()}"
|
||||
is FlowIORequest.Receive -> "to receive messages from parties ${request.sessions.partiesInvolved()}"
|
||||
is FlowIORequest.SendAndReceive -> "to send and receive messages from parties ${request.sessionToMessage.keys.partiesInvolved()}"
|
||||
is FlowIORequest.CloseSessions -> "to close sessions: ${request.sessions}"
|
||||
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
|
||||
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
|
||||
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
|
||||
|
@ -81,6 +81,12 @@ class FlowSessionImpl(
|
||||
@Suspendable
|
||||
override fun send(payload: Any) = send(payload, maySkipCheckpoint = false)
|
||||
|
||||
@Suspendable
|
||||
override fun close() {
|
||||
val request = FlowIORequest.CloseSessions(NonEmptySet.of(this))
|
||||
return flowStateMachine.suspend(request, false)
|
||||
}
|
||||
|
||||
private fun enforceNotPrimitive(type: Class<*>) {
|
||||
require(!type.isPrimitive) { "Cannot receive primitive type $type" }
|
||||
}
|
||||
|
@ -106,6 +106,7 @@ data class Checkpoint(
|
||||
invocationContext,
|
||||
ourIdentity,
|
||||
emptyMap(),
|
||||
emptySet(),
|
||||
listOf(topLevelSubFlow),
|
||||
numberOfSuspends = 0
|
||||
),
|
||||
@ -132,6 +133,22 @@ data class Checkpoint(
|
||||
return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions + session))
|
||||
}
|
||||
|
||||
fun addSessionsToBeClosed(sessionIds: Set<SessionId>): Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed + sessionIds))
|
||||
}
|
||||
|
||||
fun removeSessionsToBeClosed(sessionIds: Set<SessionId>): Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(sessionsToBeClosed = checkpointState.sessionsToBeClosed - sessionIds))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the Checkpoint with the specified session removed from the session map.
|
||||
* @param sessionIds the sessions to remove.
|
||||
*/
|
||||
fun removeSessions(sessionIds: Set<SessionId>): Checkpoint {
|
||||
return copy(checkpointState = checkpointState.copy(sessions = checkpointState.sessions - sessionIds))
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a copy of the Checkpoint with a new subFlow stack.
|
||||
* @param subFlows the new List of subFlows.
|
||||
@ -193,16 +210,18 @@ data class Checkpoint(
|
||||
* @param invocationContext the initiator of the flow.
|
||||
* @param ourIdentity the identity the flow is run as.
|
||||
* @param sessions map of source session ID to session state.
|
||||
* @param sessionsToBeClosed the sessions that have pending session end messages and need to be closed. This is available to avoid scanning all the sessions.
|
||||
* @param subFlowStack the stack of currently executing subflows.
|
||||
* @param numberOfSuspends the number of flow suspends due to IO API calls.
|
||||
*/
|
||||
@CordaSerializable
|
||||
data class CheckpointState(
|
||||
val invocationContext: InvocationContext,
|
||||
val ourIdentity: Party,
|
||||
val sessions: SessionMap, // This must preserve the insertion order!
|
||||
val subFlowStack: List<SubFlow>,
|
||||
val numberOfSuspends: Int
|
||||
val invocationContext: InvocationContext,
|
||||
val ourIdentity: Party,
|
||||
val sessions: SessionMap, // This must preserve the insertion order!
|
||||
val sessionsToBeClosed: Set<SessionId>,
|
||||
val subFlowStack: List<SubFlow>,
|
||||
val numberOfSuspends: Int
|
||||
)
|
||||
|
||||
/**
|
||||
@ -236,30 +255,25 @@ sealed class SessionState {
|
||||
|
||||
/**
|
||||
* We have received a confirmation, the peer party and session id is resolved.
|
||||
* @property errors if not empty the session is in an errored state.
|
||||
* @property receivedMessages the messages that have been received and are pending processing.
|
||||
* this could be any [ExistingSessionMessagePayload] type in theory, but it in practice it can only be one of the following types now:
|
||||
* * [DataSessionMessage]
|
||||
* * [ErrorSessionMessage]
|
||||
* * [EndSessionMessage]
|
||||
* @property otherSideErrored whether the session has received an error from the other side.
|
||||
*/
|
||||
data class Initiated(
|
||||
val peerParty: Party,
|
||||
val peerFlowInfo: FlowInfo,
|
||||
val receivedMessages: List<DataSessionMessage>,
|
||||
val initiatedState: InitiatedSessionState,
|
||||
val errors: List<FlowError>,
|
||||
val receivedMessages: List<ExistingSessionMessagePayload>,
|
||||
val otherSideErrored: Boolean,
|
||||
val peerSinkSessionId: SessionId,
|
||||
override val deduplicationSeed: String
|
||||
) : SessionState()
|
||||
}
|
||||
|
||||
typealias SessionMap = Map<SessionId, SessionState>
|
||||
|
||||
/**
|
||||
* Tracks whether an initiated session state is live or has ended. This is a separate state, as we still need the rest
|
||||
* of [SessionState.Initiated], even when the session has ended, for un-drained session messages and potential future
|
||||
* [FlowInfo] requests.
|
||||
*/
|
||||
sealed class InitiatedSessionState {
|
||||
data class Live(val peerSinkSessionId: SessionId) : InitiatedSessionState()
|
||||
object Ended : InitiatedSessionState() { override fun toString() = "Ended" }
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the way the flow has started.
|
||||
*/
|
||||
|
@ -1,9 +1,8 @@
|
||||
package net.corda.node.services.statemachine.transitions
|
||||
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.DeclaredField
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.node.services.statemachine.Action
|
||||
import net.corda.node.services.statemachine.ConfirmSessionMessage
|
||||
import net.corda.node.services.statemachine.DataSessionMessage
|
||||
@ -12,7 +11,7 @@ import net.corda.node.services.statemachine.ErrorSessionMessage
|
||||
import net.corda.node.services.statemachine.Event
|
||||
import net.corda.node.services.statemachine.ExistingSessionMessage
|
||||
import net.corda.node.services.statemachine.FlowError
|
||||
import net.corda.node.services.statemachine.InitiatedSessionState
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.RejectSessionMessage
|
||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
||||
import net.corda.node.services.statemachine.SessionState
|
||||
@ -37,6 +36,11 @@ class DeliverSessionMessageTransition(
|
||||
override val startingState: StateMachineState,
|
||||
val event: Event.DeliverSessionMessage
|
||||
) : Transition {
|
||||
|
||||
private companion object {
|
||||
val log = contextLogger()
|
||||
}
|
||||
|
||||
override fun transition(): TransitionResult {
|
||||
return builder {
|
||||
// Add the DeduplicationHandler to the pending ones ASAP so in case an error happens we still know
|
||||
@ -49,7 +53,7 @@ class DeliverSessionMessageTransition(
|
||||
// Check whether we have a session corresponding to the message.
|
||||
val existingSession = startingState.checkpoint.checkpointState.sessions[event.sessionMessage.recipientSessionId]
|
||||
if (existingSession == null) {
|
||||
freshErrorTransition(CannotFindSessionException(event.sessionMessage.recipientSessionId))
|
||||
checkIfMissingSessionIsAnIssue(event.sessionMessage)
|
||||
} else {
|
||||
val payload = event.sessionMessage.payload
|
||||
// Dispatch based on what kind of message it is.
|
||||
@ -58,7 +62,7 @@ class DeliverSessionMessageTransition(
|
||||
is DataSessionMessage -> dataMessageTransition(existingSession, payload)
|
||||
is ErrorSessionMessage -> errorMessageTransition(existingSession, payload)
|
||||
is RejectSessionMessage -> rejectMessageTransition(existingSession, payload)
|
||||
is EndSessionMessage -> endMessageTransition()
|
||||
is EndSessionMessage -> endMessageTransition(payload)
|
||||
}
|
||||
}
|
||||
// Schedule a DoRemainingWork to check whether the flow needs to be woken up.
|
||||
@ -67,6 +71,14 @@ class DeliverSessionMessageTransition(
|
||||
}
|
||||
}
|
||||
|
||||
private fun TransitionBuilder.checkIfMissingSessionIsAnIssue(message: ExistingSessionMessage) {
|
||||
val payload = message.payload
|
||||
if (payload is EndSessionMessage)
|
||||
log.debug { "Received session end message for a session that has already ended: ${event.sessionMessage.recipientSessionId}"}
|
||||
else
|
||||
freshErrorTransition(CannotFindSessionException(event.sessionMessage.recipientSessionId))
|
||||
}
|
||||
|
||||
private fun TransitionBuilder.confirmMessageTransition(sessionState: SessionState, message: ConfirmSessionMessage) {
|
||||
// We received a confirmation message. The corresponding session state must be Initiating.
|
||||
when (sessionState) {
|
||||
@ -76,9 +88,9 @@ class DeliverSessionMessageTransition(
|
||||
peerParty = event.sender,
|
||||
peerFlowInfo = message.initiatedFlowInfo,
|
||||
receivedMessages = emptyList(),
|
||||
initiatedState = InitiatedSessionState.Live(message.initiatedSessionId),
|
||||
errors = emptyList(),
|
||||
deduplicationSeed = sessionState.deduplicationSeed
|
||||
peerSinkSessionId = message.initiatedSessionId,
|
||||
deduplicationSeed = sessionState.deduplicationSeed,
|
||||
otherSideErrored = false
|
||||
)
|
||||
val newCheckpoint = currentState.checkpoint.addSession(
|
||||
event.sessionMessage.recipientSessionId to initiatedSession
|
||||
@ -115,28 +127,11 @@ class DeliverSessionMessageTransition(
|
||||
}
|
||||
|
||||
private fun TransitionBuilder.errorMessageTransition(sessionState: SessionState, payload: ErrorSessionMessage) {
|
||||
val exception: Throwable = if (payload.flowException == null) {
|
||||
UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = payload.errorId)
|
||||
} else {
|
||||
payload.flowException.originalErrorId = payload.errorId
|
||||
payload.flowException
|
||||
}
|
||||
|
||||
return when (sessionState) {
|
||||
is SessionState.Initiated -> {
|
||||
when (exception) {
|
||||
// reflection used to access private field
|
||||
is UnexpectedFlowEndException -> DeclaredField<Party?>(
|
||||
UnexpectedFlowEndException::class.java,
|
||||
"peer",
|
||||
exception
|
||||
).value = sessionState.peerParty
|
||||
is FlowException -> DeclaredField<Party?>(FlowException::class.java, "peer", exception).value = sessionState.peerParty
|
||||
}
|
||||
val checkpoint = currentState.checkpoint
|
||||
val sessionId = event.sessionMessage.recipientSessionId
|
||||
val flowError = FlowError(payload.errorId, exception)
|
||||
val newSessionState = sessionState.copy(errors = sessionState.errors + flowError)
|
||||
val newSessionState = sessionState.copy(receivedMessages = sessionState.receivedMessages + payload)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = checkpoint.addSession(sessionId to newSessionState)
|
||||
)
|
||||
@ -165,23 +160,26 @@ class DeliverSessionMessageTransition(
|
||||
}
|
||||
}
|
||||
|
||||
private fun TransitionBuilder.endMessageTransition() {
|
||||
private fun TransitionBuilder.endMessageTransition(payload: EndSessionMessage) {
|
||||
|
||||
val sessionId = event.sessionMessage.recipientSessionId
|
||||
val sessions = currentState.checkpoint.checkpointState.sessions
|
||||
val sessionState = sessions[sessionId]
|
||||
if (sessionState == null) {
|
||||
return freshErrorTransition(CannotFindSessionException(sessionId))
|
||||
}
|
||||
// a check has already been performed to confirm the session exists for this message before this method is invoked.
|
||||
val sessionState = sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Initiated -> {
|
||||
val newSessionState = sessionState.copy(initiatedState = InitiatedSessionState.Ended)
|
||||
currentState = currentState.copy(
|
||||
checkpoint = currentState.checkpoint.addSession(sessionId to newSessionState)
|
||||
val flowState = currentState.checkpoint.flowState
|
||||
// flow must have already been started when session end messages are being delivered.
|
||||
if (flowState !is FlowState.Started)
|
||||
return freshErrorTransition(UnexpectedEventInState())
|
||||
|
||||
)
|
||||
val newSessionState = sessionState.copy(receivedMessages = sessionState.receivedMessages + payload)
|
||||
val newCheckpoint = currentState.checkpoint.addSession(event.sessionMessage.recipientSessionId to newSessionState)
|
||||
.addSessionsToBeClosed(setOf(event.sessionMessage.recipientSessionId))
|
||||
currentState = currentState.copy(checkpoint = newCheckpoint)
|
||||
}
|
||||
else -> {
|
||||
freshErrorTransition(UnexpectedEventInState())
|
||||
freshErrorTransition(PrematureSessionEndException(event.sessionMessage.recipientSessionId))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -117,8 +117,9 @@ class ErrorFlowTransition(
|
||||
sessionState
|
||||
}
|
||||
}
|
||||
// if we have already received error message from the other side, we don't include that session in the list to avoid propagating errors.
|
||||
val initiatedSessions = sessions.values.mapNotNull { session ->
|
||||
if (session is SessionState.Initiated && session.errors.isEmpty()) {
|
||||
if (session is SessionState.Initiated && !session.otherSideErrored) {
|
||||
session
|
||||
} else {
|
||||
null
|
||||
|
@ -105,8 +105,9 @@ class KilledFlowTransition(
|
||||
sessionState
|
||||
}
|
||||
}
|
||||
// if we have already received error message from the other side, we don't include that session in the list to avoid propagating errors.
|
||||
val initiatedSessions = sessions.values.mapNotNull { session ->
|
||||
if (session is SessionState.Initiated && session.errors.isEmpty()) {
|
||||
if (session is SessionState.Initiated && !session.otherSideErrored) {
|
||||
session
|
||||
} else {
|
||||
null
|
||||
|
@ -1,13 +1,18 @@
|
||||
package net.corda.node.services.statemachine.transitions
|
||||
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowInfo
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.DeclaredField
|
||||
import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.node.services.statemachine.*
|
||||
import java.lang.IllegalStateException
|
||||
import org.slf4j.Logger
|
||||
import kotlin.collections.LinkedHashMap
|
||||
|
||||
/**
|
||||
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
|
||||
@ -20,28 +25,62 @@ class StartedFlowTransition(
|
||||
override val startingState: StateMachineState,
|
||||
val started: FlowState.Started
|
||||
) : Transition {
|
||||
|
||||
companion object {
|
||||
private val logger: Logger = contextLogger()
|
||||
}
|
||||
|
||||
override fun transition(): TransitionResult {
|
||||
val flowIORequest = started.flowIORequest
|
||||
val checkpoint = startingState.checkpoint
|
||||
val errorsToThrow = collectRelevantErrorsToThrow(flowIORequest, checkpoint)
|
||||
val (newState, errorsToThrow) = collectRelevantErrorsToThrow(startingState, flowIORequest)
|
||||
if (errorsToThrow.isNotEmpty()) {
|
||||
return TransitionResult(
|
||||
newState = startingState.copy(isFlowResumed = true),
|
||||
newState = newState.copy(isFlowResumed = true),
|
||||
// throw the first exception. TODO should this aggregate all of them somehow?
|
||||
actions = listOf(Action.CreateTransaction),
|
||||
continuation = FlowContinuation.Throw(errorsToThrow[0])
|
||||
)
|
||||
}
|
||||
return when (flowIORequest) {
|
||||
is FlowIORequest.Send -> sendTransition(flowIORequest)
|
||||
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
|
||||
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
|
||||
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
|
||||
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
|
||||
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
|
||||
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
|
||||
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
|
||||
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
|
||||
val sessionsToBeTerminated = findSessionsToBeTerminated(startingState)
|
||||
// if there are sessions to be closed, we close them as part of this transition and normal processing will continue on the next transition.
|
||||
return if (sessionsToBeTerminated.isNotEmpty()) {
|
||||
terminateSessions(sessionsToBeTerminated)
|
||||
} else {
|
||||
when (flowIORequest) {
|
||||
is FlowIORequest.Send -> sendTransition(flowIORequest)
|
||||
is FlowIORequest.Receive -> receiveTransition(flowIORequest)
|
||||
is FlowIORequest.SendAndReceive -> sendAndReceiveTransition(flowIORequest)
|
||||
is FlowIORequest.CloseSessions -> closeSessionTransition(flowIORequest)
|
||||
is FlowIORequest.WaitForLedgerCommit -> waitForLedgerCommitTransition(flowIORequest)
|
||||
is FlowIORequest.Sleep -> sleepTransition(flowIORequest)
|
||||
is FlowIORequest.GetFlowInfo -> getFlowInfoTransition(flowIORequest)
|
||||
is FlowIORequest.WaitForSessionConfirmations -> waitForSessionConfirmationsTransition()
|
||||
is FlowIORequest.ExecuteAsyncOperation<*> -> executeAsyncOperation(flowIORequest)
|
||||
FlowIORequest.ForceCheckpoint -> executeForceCheckpoint()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun findSessionsToBeTerminated(startingState: StateMachineState): SessionMap {
|
||||
return startingState.checkpoint.checkpointState.sessionsToBeClosed.mapNotNull { sessionId ->
|
||||
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!! as SessionState.Initiated
|
||||
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is EndSessionMessage) {
|
||||
sessionId to sessionState
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}.toMap()
|
||||
}
|
||||
|
||||
private fun terminateSessions(sessionsToBeTerminated: SessionMap): TransitionResult {
|
||||
return builder {
|
||||
val sessionsToRemove = sessionsToBeTerminated.keys
|
||||
val newCheckpoint = currentState.checkpoint.removeSessions(sessionsToRemove)
|
||||
.removeSessionsToBeClosed(sessionsToRemove)
|
||||
currentState = currentState.copy(checkpoint = newCheckpoint)
|
||||
actions.add(Action.RemoveSessionBindings(sessionsToRemove))
|
||||
actions.add(Action.ScheduleEvent(Event.DoRemainingWork))
|
||||
FlowContinuation.ProcessEvents
|
||||
}
|
||||
}
|
||||
|
||||
@ -149,6 +188,34 @@ class StartedFlowTransition(
|
||||
}
|
||||
}
|
||||
|
||||
private fun closeSessionTransition(flowIORequest: FlowIORequest.CloseSessions): TransitionResult {
|
||||
return builder {
|
||||
val sessionIdsToRemove = flowIORequest.sessions.map { sessionToSessionId(it) }.toSet()
|
||||
val existingSessionsToRemove = currentState.checkpoint.checkpointState.sessions.filter { (sessionId, _) ->
|
||||
sessionIdsToRemove.contains(sessionId)
|
||||
}
|
||||
val alreadyClosedSessions = sessionIdsToRemove.filter { sessionId -> sessionId !in existingSessionsToRemove }
|
||||
if (alreadyClosedSessions.isNotEmpty()) {
|
||||
logger.warn("Attempting to close already closed sessions: $alreadyClosedSessions")
|
||||
}
|
||||
|
||||
if (existingSessionsToRemove.isNotEmpty()) {
|
||||
val sendEndMessageActions = existingSessionsToRemove.values.mapIndexed { index, state ->
|
||||
val sinkSessionId = (state as SessionState.Initiated).peerSinkSessionId
|
||||
val message = ExistingSessionMessage(sinkSessionId, EndSessionMessage)
|
||||
val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index, state)
|
||||
Action.SendExisting(state.peerParty, message, SenderDeduplicationId(deduplicationId, currentState.senderUUID))
|
||||
}
|
||||
|
||||
currentState = currentState.copy(checkpoint = currentState.checkpoint.removeSessions(existingSessionsToRemove.keys))
|
||||
actions.add(Action.RemoveSessionBindings(sessionIdsToRemove))
|
||||
actions.add(Action.SendMultiple(emptyList(), sendEndMessageActions))
|
||||
}
|
||||
|
||||
resumeFlowLogic(Unit)
|
||||
}
|
||||
}
|
||||
|
||||
private fun receiveTransition(flowIORequest: FlowIORequest.Receive): TransitionResult {
|
||||
return builder {
|
||||
val sessionIdToSession = LinkedHashMap<SessionId, FlowSessionImpl>()
|
||||
@ -199,7 +266,8 @@ class StartedFlowTransition(
|
||||
someNotFound = true
|
||||
} else {
|
||||
newSessionMessages[sessionId] = sessionState.copy(receivedMessages = messages.subList(1, messages.size).toList())
|
||||
resultMessages[sessionId] = messages[0].payload
|
||||
// at this point, we've already checked for errors and session ends, so it's guaranteed that the first message will be a data message.
|
||||
resultMessages[sessionId] = (messages[0] as DataSessionMessage).payload
|
||||
}
|
||||
}
|
||||
else -> {
|
||||
@ -257,12 +325,6 @@ class StartedFlowTransition(
|
||||
val checkpoint = startingState.checkpoint
|
||||
val newSessions = LinkedHashMap(checkpoint.checkpointState.sessions)
|
||||
var index = 0
|
||||
for ((sourceSessionId, _) in sourceSessionIdToMessage) {
|
||||
val existingSessionState = checkpoint.checkpointState.sessions[sourceSessionId] ?: return freshErrorTransition(CannotFindSessionException(sourceSessionId))
|
||||
if (existingSessionState is SessionState.Initiated && existingSessionState.initiatedState is InitiatedSessionState.Ended) {
|
||||
return freshErrorTransition(IllegalStateException("Tried to send to ended session $sourceSessionId"))
|
||||
}
|
||||
}
|
||||
|
||||
val messagesByType = sourceSessionIdToMessage.toList()
|
||||
.map { (sourceSessionId, message) -> Triple(sourceSessionId, checkpoint.checkpointState.sessions[sourceSessionId]!!, message) }
|
||||
@ -286,17 +348,13 @@ class StartedFlowTransition(
|
||||
val newBufferedMessages = initiatingSessionState.bufferedMessages + Pair(deduplicationId, sessionMessage)
|
||||
newSessions[sourceSessionId] = initiatingSessionState.copy(bufferedMessages = newBufferedMessages)
|
||||
}
|
||||
val sendExistingActions = messagesByType[SessionState.Initiated::class]?.mapNotNull {(_, sessionState, message) ->
|
||||
val sendExistingActions = messagesByType[SessionState.Initiated::class]?.map {(_, sessionState, message) ->
|
||||
val initiatedSessionState = sessionState as SessionState.Initiated
|
||||
if (initiatedSessionState.initiatedState !is InitiatedSessionState.Live)
|
||||
null
|
||||
else {
|
||||
val sessionMessage = DataSessionMessage(message)
|
||||
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatedSessionState)
|
||||
val sinkSessionId = initiatedSessionState.initiatedState.peerSinkSessionId
|
||||
val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage)
|
||||
Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))
|
||||
}
|
||||
val sessionMessage = DataSessionMessage(message)
|
||||
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatedSessionState)
|
||||
val sinkSessionId = initiatedSessionState.peerSinkSessionId
|
||||
val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage)
|
||||
Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))
|
||||
} ?: emptyList()
|
||||
|
||||
if (sendInitialActions.isNotEmpty() || sendExistingActions.isNotEmpty()) {
|
||||
@ -309,21 +367,68 @@ class StartedFlowTransition(
|
||||
return (session as FlowSessionImpl).sourceSessionId
|
||||
}
|
||||
|
||||
private fun collectErroredSessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
|
||||
return sessionIds.flatMap { sessionId ->
|
||||
val sessionState = checkpoint.checkpointState.sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Uninitiated -> emptyList()
|
||||
is SessionState.Initiating -> {
|
||||
if (sessionState.rejectionError == null) {
|
||||
emptyList()
|
||||
} else {
|
||||
listOf(sessionState.rejectionError.exception)
|
||||
private fun collectErroredSessionErrors(startingState: StateMachineState, sessionIds: Collection<SessionId>): Pair<StateMachineState, List<Throwable>> {
|
||||
var newState = startingState
|
||||
val errors = sessionIds.filter { sessionId ->
|
||||
startingState.checkpoint.checkpointState.sessions.containsKey(sessionId)
|
||||
}.flatMap { sessionId ->
|
||||
val sessionState = startingState.checkpoint.checkpointState.sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Uninitiated -> emptyList()
|
||||
is SessionState.Initiating -> {
|
||||
if (sessionState.rejectionError == null) {
|
||||
emptyList()
|
||||
} else {
|
||||
listOf(sessionState.rejectionError.exception)
|
||||
}
|
||||
}
|
||||
is SessionState.Initiated -> {
|
||||
if (sessionState.receivedMessages.isNotEmpty() && sessionState.receivedMessages.first() is ErrorSessionMessage) {
|
||||
val errorMessage = sessionState.receivedMessages.first() as ErrorSessionMessage
|
||||
val exception = convertErrorMessageToException(errorMessage, sessionState.peerParty)
|
||||
val newSessionState = sessionState.copy(receivedMessages = sessionState.receivedMessages.subList(1, sessionState.receivedMessages.size), otherSideErrored = true)
|
||||
val newCheckpoint = startingState.checkpoint.addSession(sessionId to newSessionState)
|
||||
newState = startingState.copy(checkpoint = newCheckpoint)
|
||||
listOf(exception)
|
||||
} else {
|
||||
emptyList()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
is SessionState.Initiated -> sessionState.errors.map(FlowError::exception)
|
||||
}
|
||||
return Pair(newState, errors)
|
||||
}
|
||||
|
||||
private fun convertErrorMessageToException(errorMessage: ErrorSessionMessage, peer: Party): Throwable {
|
||||
val exception: Throwable = if (errorMessage.flowException == null) {
|
||||
UnexpectedFlowEndException("Counter-flow errored", cause = null, originalErrorId = errorMessage.errorId)
|
||||
} else {
|
||||
errorMessage.flowException.originalErrorId = errorMessage.errorId
|
||||
errorMessage.flowException
|
||||
}
|
||||
when (exception) {
|
||||
// reflection used to access private field
|
||||
is UnexpectedFlowEndException -> DeclaredField<Party?>(
|
||||
UnexpectedFlowEndException::class.java,
|
||||
"peer",
|
||||
exception
|
||||
).value = peer
|
||||
is FlowException -> DeclaredField<Party?>(FlowException::class.java, "peer", exception).value = peer
|
||||
}
|
||||
return exception
|
||||
}
|
||||
|
||||
private fun collectUncloseableSessions(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
|
||||
val uninitialisedSessions = sessionIds.mapNotNull { sessionId ->
|
||||
if (!checkpoint.checkpointState.sessions.containsKey(sessionId))
|
||||
null
|
||||
else
|
||||
sessionId to checkpoint.checkpointState.sessions[sessionId]
|
||||
}
|
||||
.filter { (_, sessionState) -> sessionState !is SessionState.Initiated }
|
||||
.map { it.first }
|
||||
|
||||
return uninitialisedSessions.map { PrematureSessionCloseException(it) }
|
||||
}
|
||||
|
||||
private fun collectErroredInitiatingSessionErrors(checkpoint: Checkpoint): List<Throwable> {
|
||||
@ -333,77 +438,64 @@ class StartedFlowTransition(
|
||||
}
|
||||
|
||||
private fun collectEndedSessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
|
||||
return sessionIds.mapNotNull { sessionId ->
|
||||
val sessionState = checkpoint.checkpointState.sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Initiated -> {
|
||||
if (sessionState.initiatedState === InitiatedSessionState.Ended) {
|
||||
UnexpectedFlowEndException(
|
||||
"Tried to access ended session $sessionId",
|
||||
cause = null,
|
||||
originalErrorId = context.secureRandom.nextLong()
|
||||
)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
else -> null
|
||||
}
|
||||
return sessionIds.filter { sessionId ->
|
||||
!checkpoint.checkpointState.sessions.containsKey(sessionId)
|
||||
}.map {sessionId ->
|
||||
UnexpectedFlowEndException(
|
||||
"Tried to access ended session $sessionId",
|
||||
cause = null,
|
||||
originalErrorId = context.secureRandom.nextLong()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private fun collectEndedEmptySessionErrors(sessionIds: Collection<SessionId>, checkpoint: Checkpoint): List<Throwable> {
|
||||
return sessionIds.mapNotNull { sessionId ->
|
||||
val sessionState = checkpoint.checkpointState.sessions[sessionId]!!
|
||||
when (sessionState) {
|
||||
is SessionState.Initiated -> {
|
||||
if (sessionState.initiatedState === InitiatedSessionState.Ended &&
|
||||
sessionState.receivedMessages.isEmpty()) {
|
||||
UnexpectedFlowEndException(
|
||||
"Tried to access ended session $sessionId with empty buffer",
|
||||
cause = null,
|
||||
originalErrorId = context.secureRandom.nextLong()
|
||||
)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
else -> null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun collectRelevantErrorsToThrow(flowIORequest: FlowIORequest<*>, checkpoint: Checkpoint): List<Throwable> {
|
||||
private fun collectRelevantErrorsToThrow(startingState: StateMachineState, flowIORequest: FlowIORequest<*>): Pair<StateMachineState, List<Throwable>> {
|
||||
return when (flowIORequest) {
|
||||
is FlowIORequest.Send -> {
|
||||
val sessionIds = flowIORequest.sessionToMessage.keys.map(this::sessionToSessionId)
|
||||
collectErroredSessionErrors(sessionIds, checkpoint) + collectEndedSessionErrors(sessionIds, checkpoint)
|
||||
val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds)
|
||||
val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint)
|
||||
Pair(newState, erroredSessionErrors + endedSessionErrors)
|
||||
}
|
||||
is FlowIORequest.Receive -> {
|
||||
val sessionIds = flowIORequest.sessions.map(this::sessionToSessionId)
|
||||
collectErroredSessionErrors(sessionIds, checkpoint) + collectEndedEmptySessionErrors(sessionIds, checkpoint)
|
||||
val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds)
|
||||
val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint)
|
||||
Pair(newState, erroredSessionErrors + endedSessionErrors)
|
||||
}
|
||||
is FlowIORequest.SendAndReceive -> {
|
||||
val sessionIds = flowIORequest.sessionToMessage.keys.map(this::sessionToSessionId)
|
||||
collectErroredSessionErrors(sessionIds, checkpoint) + collectEndedSessionErrors(sessionIds, checkpoint)
|
||||
val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds)
|
||||
val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint)
|
||||
Pair(newState, erroredSessionErrors + endedSessionErrors)
|
||||
}
|
||||
is FlowIORequest.WaitForLedgerCommit -> {
|
||||
collectErroredSessionErrors(checkpoint.checkpointState.sessions.keys, checkpoint)
|
||||
return collectErroredSessionErrors(startingState, startingState.checkpoint.checkpointState.sessions.keys)
|
||||
}
|
||||
is FlowIORequest.GetFlowInfo -> {
|
||||
collectErroredSessionErrors(flowIORequest.sessions.map(this::sessionToSessionId), checkpoint)
|
||||
val sessionIds = flowIORequest.sessions.map(this::sessionToSessionId)
|
||||
val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds)
|
||||
val endedSessionErrors = collectEndedSessionErrors(sessionIds, startingState.checkpoint)
|
||||
Pair(newState, erroredSessionErrors + endedSessionErrors)
|
||||
}
|
||||
is FlowIORequest.CloseSessions -> {
|
||||
val sessionIds = flowIORequest.sessions.map(this::sessionToSessionId)
|
||||
val (newState, erroredSessionErrors) = collectErroredSessionErrors(startingState, sessionIds)
|
||||
val uncloseableSessionErrors = collectUncloseableSessions(sessionIds, startingState.checkpoint)
|
||||
Pair(newState, erroredSessionErrors + uncloseableSessionErrors)
|
||||
}
|
||||
is FlowIORequest.Sleep -> {
|
||||
emptyList()
|
||||
Pair(startingState, emptyList())
|
||||
}
|
||||
is FlowIORequest.WaitForSessionConfirmations -> {
|
||||
collectErroredInitiatingSessionErrors(checkpoint)
|
||||
val errors = collectErroredInitiatingSessionErrors(startingState.checkpoint)
|
||||
Pair(startingState, errors)
|
||||
}
|
||||
is FlowIORequest.ExecuteAsyncOperation<*> -> {
|
||||
emptyList()
|
||||
Pair(startingState, emptyList())
|
||||
}
|
||||
FlowIORequest.ForceCheckpoint -> {
|
||||
emptyList()
|
||||
Pair(startingState, emptyList())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -18,7 +18,6 @@ import net.corda.node.services.statemachine.FlowRemovalReason
|
||||
import net.corda.node.services.statemachine.FlowSessionImpl
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.InitialSessionMessage
|
||||
import net.corda.node.services.statemachine.InitiatedSessionState
|
||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
||||
import net.corda.node.services.statemachine.SessionId
|
||||
import net.corda.node.services.statemachine.SessionMessage
|
||||
@ -267,8 +266,8 @@ class TopLevelTransition(
|
||||
|
||||
private fun TransitionBuilder.sendEndMessages() {
|
||||
val sendEndMessageActions = currentState.checkpoint.checkpointState.sessions.values.mapIndexed { index, state ->
|
||||
if (state is SessionState.Initiated && state.initiatedState is InitiatedSessionState.Live) {
|
||||
val message = ExistingSessionMessage(state.initiatedState.peerSinkSessionId, EndSessionMessage)
|
||||
if (state is SessionState.Initiated) {
|
||||
val message = ExistingSessionMessage(state.peerSinkSessionId, EndSessionMessage)
|
||||
val deduplicationId = DeduplicationId.createForNormal(currentState.checkpoint, index, state)
|
||||
Action.SendExisting(state.peerParty, message, SenderDeduplicationId(deduplicationId, currentState.senderUUID))
|
||||
} else {
|
||||
|
@ -81,3 +81,5 @@ class TransitionBuilder(val context: TransitionContext, initialState: StateMachi
|
||||
|
||||
class CannotFindSessionException(sessionId: SessionId) : IllegalStateException("Couldn't find session with id $sessionId")
|
||||
class UnexpectedEventInState : IllegalStateException("Unexpected event")
|
||||
class PrematureSessionCloseException(sessionId: SessionId): IllegalStateException("The following session was closed before it was initialised: $sessionId")
|
||||
class PrematureSessionEndException(sessionId: SessionId): IllegalStateException("A premature session end message was received before the session was initialised: $sessionId")
|
@ -8,7 +8,6 @@ import net.corda.node.services.statemachine.DeduplicationId
|
||||
import net.corda.node.services.statemachine.ExistingSessionMessage
|
||||
import net.corda.node.services.statemachine.FlowStart
|
||||
import net.corda.node.services.statemachine.FlowState
|
||||
import net.corda.node.services.statemachine.InitiatedSessionState
|
||||
import net.corda.node.services.statemachine.SenderDeduplicationId
|
||||
import net.corda.node.services.statemachine.SessionState
|
||||
import net.corda.node.services.statemachine.StateMachineState
|
||||
@ -45,7 +44,7 @@ class UnstartedFlowTransition(
|
||||
val initiatingMessage = flowStart.initiatingMessage
|
||||
val initiatedState = SessionState.Initiated(
|
||||
peerParty = flowStart.peerSession.counterparty,
|
||||
initiatedState = InitiatedSessionState.Live(initiatingMessage.initiatorSessionId),
|
||||
peerSinkSessionId = initiatingMessage.initiatorSessionId,
|
||||
peerFlowInfo = FlowInfo(
|
||||
flowVersion = flowStart.senderCoreFlowVersion ?: initiatingMessage.flowVersion,
|
||||
appName = initiatingMessage.appName
|
||||
@ -55,8 +54,8 @@ class UnstartedFlowTransition(
|
||||
} else {
|
||||
listOf(DataSessionMessage(initiatingMessage.firstPayload))
|
||||
},
|
||||
errors = emptyList(),
|
||||
deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.toLong}-${initiatingMessage.initiationEntropy}"
|
||||
deduplicationSeed = "D-${initiatingMessage.initiatorSessionId.toLong}-${initiatingMessage.initiationEntropy}",
|
||||
otherSideErrored = false
|
||||
)
|
||||
val confirmationMessage = ConfirmSessionMessage(flowStart.initiatedSessionId, flowStart.initiatedFlowInfo)
|
||||
val sessionMessage = ExistingSessionMessage(initiatingMessage.initiatorSessionId, confirmationMessage)
|
||||
|
@ -201,7 +201,7 @@ class FlowFrameworkTests {
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `other side ends before doing expected send`() {
|
||||
fun `other side ends before doing expected send`() {
|
||||
bobNode.registerCordappFlowFactory(ReceiveFlow::class) { NoOpFlow() }
|
||||
val resultFuture = aliceNode.services.startFlow(ReceiveFlow(bob)).resultFuture
|
||||
mockNet.runNetwork()
|
||||
@ -868,6 +868,7 @@ class FlowFrameworkTests {
|
||||
session.send(1)
|
||||
// ... then pause this one until it's received the session-end message from the other side
|
||||
receivedOtherFlowEnd.acquire()
|
||||
|
||||
session.sendAndReceive<Int>(2)
|
||||
}
|
||||
}
|
||||
|
@ -247,7 +247,7 @@ class FlowMetadataRecordingTest {
|
||||
it.initialParameters.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)
|
||||
)
|
||||
assertThat(it.launchingCordapp).contains("custom-cordapp")
|
||||
assertEquals(7, it.platformVersion)
|
||||
assertEquals(8, it.platformVersion)
|
||||
assertEquals(nodeAHandle.nodeInfo.singleIdentity().name.toString(), it.startedBy)
|
||||
assertEquals(context!!.trace.invocationId.timestamp, it.invocationInstant)
|
||||
assertTrue(it.startInstant >= it.invocationInstant)
|
||||
|
@ -183,6 +183,11 @@ class RetryFlowMockTest {
|
||||
override fun send(payload: Any) {
|
||||
TODO("not implemented")
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
TODO("Not yet implemented")
|
||||
}
|
||||
|
||||
}), nodeA.services.newContext()).get()
|
||||
records.next()
|
||||
// Killing it should remove it.
|
||||
|
@ -529,7 +529,8 @@ open class InternalMockNetwork(cordappPackages: List<String> = emptyList(),
|
||||
}
|
||||
|
||||
private fun pumpAll(): Boolean {
|
||||
val transferredMessages = messagingNetwork.endpoints.map { it.pumpReceive(false) }
|
||||
val transferredMessages = messagingNetwork.endpoints.filter { it.active }
|
||||
.map { it.pumpReceive(false) }
|
||||
return transferredMessages.any { it != null }
|
||||
}
|
||||
|
||||
|
@ -173,6 +173,7 @@ class MockNodeMessagingService(private val configuration: NodeConfiguration,
|
||||
it.join()
|
||||
}
|
||||
running = false
|
||||
stateHelper.active = false
|
||||
network.netNodeHasShutdown(myAddress)
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user