diff --git a/.idea/compiler.xml b/.idea/compiler.xml index 9208ede561..53159d93be 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -49,6 +49,10 @@ + + + + @@ -318,6 +322,8 @@ + + diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index aa619cb5bc..8c7eefdf33 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -120,14 +120,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration, cacheFactoryPrototype: BindableNamedCacheFactory, protected val versionInfo: VersionInfo, protected val flowManager: FlowManager, - protected val serverThread: AffinityExecutor.ServiceAffinityExecutor, - protected val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() { + val serverThread: AffinityExecutor.ServiceAffinityExecutor, + val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() { protected abstract val log: Logger @Suppress("LeakingThis") private var tokenizableServices: MutableList? = mutableListOf(platformClock, this) - protected val metricRegistry = MetricRegistry() + val metricRegistry = MetricRegistry() protected val cacheFactory = cacheFactoryPrototype.bindWithConfig(configuration).bindWithMetrics(metricRegistry).tokenize() val monitoringService = MonitoringService(metricRegistry).tokenize() @@ -144,7 +144,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } } - protected val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo) + val cordappLoader: CordappLoader = makeCordappLoader(configuration, versionInfo) val schemaService = NodeSchemaService(cordappLoader.cordappSchemas).tokenize() val identityService = PersistentIdentityService(cacheFactory).tokenize() val database: CordaPersistence = createCordaPersistence( diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 95ec986c19..d941b83c23 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -33,7 +33,6 @@ import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler -import net.corda.node.services.messaging.ReceivedMessage import net.corda.node.services.statemachine.interceptors.* import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor @@ -97,8 +96,6 @@ class MultiThreadedStateMachineManager( val timedFlows = ConcurrentHashMap() } - override val flowHospital: StaffedFlowHospital = StaffedFlowHospital() - private val concurrentBox = ConcurrentBox(InnerState()) private val scheduler = FiberExecutorScheduler("Flow fiber scheduler", executor) @@ -111,17 +108,18 @@ class MultiThreadedStateMachineManager( private val sessionToFlow = ConcurrentHashMap() private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null - private val transitionExecutor = makeTransitionExecutor() private val ourSenderUUID get() = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: CheckpointSerializationContext? = null private var tokenizableServices: List? = null private var actionExecutor: ActionExecutor? = null + override val flowHospital: StaffedFlowHospital = StaffedFlowHospital(flowMessaging, ourSenderUUID) + private val transitionExecutor = makeTransitionExecutor() + override val allStateMachines: List> get() = concurrentBox.content.flows.values.map { it.fiber.logic } - private val totalStartedFlows = metrics.counter("Flows.Started") private val totalFinishedFlows = metrics.counter("Flows.Finished") private val totalSuccessFlows = metrics.counter("Flows.Success") @@ -214,7 +212,7 @@ class MultiThreadedStateMachineManager( invocationContext = context, flowLogic = flowLogic, flowStart = FlowStart.Explicit, - ourIdentity = ourIdentity ?: getOurFirstIdentity(), + ourIdentity = ourIdentity ?: ourFirstIdentity, deduplicationHandler = deduplicationHandler, isStartIdempotent = false ) @@ -408,21 +406,19 @@ class MultiThreadedStateMachineManager( } private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) { - val message: ReceivedMessage = event.receivedMessage - val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler - val peer = message.peer + val peer = event.receivedMessage.peer val sessionMessage = try { - message.data.deserialize() + event.receivedMessage.data.deserialize() } catch (ex: Exception) { logger.error("Received corrupt SessionMessage data from $peer") - deduplicationHandler.afterDatabaseTransaction() + event.deduplicationHandler.afterDatabaseTransaction() return } val sender = serviceHub.networkMapCache.getPeerByLegalName(peer) if (sender != null) { when (sessionMessage) { - is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, deduplicationHandler, sender) - is InitialSessionMessage -> onSessionInit(sessionMessage, message.platformVersion, deduplicationHandler, sender) + is ExistingSessionMessage -> onExistingSessionMessage(sessionMessage, event.deduplicationHandler, sender) + is InitialSessionMessage -> onSessionInit(sessionMessage, sender, event) } } else { logger.error("Unknown peer $peer in $sessionMessage") @@ -453,13 +449,8 @@ class MultiThreadedStateMachineManager( } } - private fun onSessionInit(sessionMessage: InitialSessionMessage, senderPlatformVersion: Int, deduplicationHandler: DeduplicationHandler, sender: Party) { - fun createErrorMessage(initiatorSessionId: SessionId, message: String): ExistingSessionMessage { - val errorId = secureRandom.nextLong() - val payload = RejectSessionMessage(message, errorId) - return ExistingSessionMessage(initiatorSessionId, payload) - } - val replyError = try { + private fun onSessionInit(sessionMessage: InitialSessionMessage, sender: Party, event: ExternalEvent.ExternalMessageEvent) { + try { val initiatedFlowFactory = getInitiatedFlowFactory(sessionMessage) val initiatedSessionId = SessionId.createRandom(secureRandom) val senderSession = FlowSessionImpl(sender, initiatedSessionId) @@ -469,40 +460,34 @@ class MultiThreadedStateMachineManager( is InitiatedFlowFactory.CorDapp -> FlowInfo(initiatedFlowFactory.flowVersion, initiatedFlowFactory.appName) } val senderCoreFlowVersion = when (initiatedFlowFactory) { - is InitiatedFlowFactory.Core -> senderPlatformVersion + is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion is InitiatedFlowFactory.CorDapp -> null } - startInitiatedFlow(flowLogic, deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) - null - } catch (exception: Exception) { - logger.warn("Exception while creating initiated flow", exception) - createErrorMessage( - sessionMessage.initiatorSessionId, - (exception as? SessionRejectException)?.message ?: "Unable to establish session" - ) - } - - if (replyError != null) { - flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) - deduplicationHandler.afterDatabaseTransaction() + startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo) + } catch (t: Throwable) { + logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " + + "flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t) + flowHospital.sessionInitErrored(sessionMessage, sender, event, t) } } // TODO this is a temporary hack until we figure out multiple identities - private fun getOurFirstIdentity(): Party { - return serviceHub.myInfo.legalIdentities[0] - } + private val ourFirstIdentity: Party get() = serviceHub.myInfo.legalIdentities[0] private fun getInitiatedFlowFactory(message: InitialSessionMessage): InitiatedFlowFactory<*> { - val initiatingFlowClass = try { - Class.forName(message.initiatorFlowClassName, true, classloader).asSubclass(FlowLogic::class.java) + val initiatorClass = try { + Class.forName(message.initiatorFlowClassName, true, classloader) } catch (e: ClassNotFoundException) { - throw SessionRejectException("Don't know ${message.initiatorFlowClassName}") - } catch (e: ClassCastException) { - throw SessionRejectException("${message.initiatorFlowClassName} is not a flow") + throw SessionRejectException.UnknownClass(message.initiatorFlowClassName) } - return serviceHub.getFlowFactory(initiatingFlowClass) ?: - throw SessionRejectException("$initiatingFlowClass is not registered") + + val initiatorFlowClass = try { + initiatorClass.asSubclass(FlowLogic::class.java) + } catch (e: ClassCastException) { + throw SessionRejectException.NotAFlow(initiatorClass) + } + + return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass) } private fun startInitiatedFlow( @@ -515,7 +500,7 @@ class MultiThreadedStateMachineManager( initiatedFlowInfo: FlowInfo ) { val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo) - val ourIdentity = getOurFirstIdentity() + val ourIdentity = ourFirstIdentity startFlowInternal( InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity, initiatingMessageDeduplicationHandler, diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt new file mode 100644 index 0000000000..8a9156af4a --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt @@ -0,0 +1,166 @@ +package net.corda.node.services.statemachine + +import net.corda.core.crypto.random63BitValue +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.registerCordappFlowFactory +import net.corda.core.identity.Party +import net.corda.core.utilities.getOrThrow +import net.corda.node.services.persistence.checkpoints +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.internal.LogHelper +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.internal.* +import org.assertj.core.api.Assertions.assertThat +import org.junit.After +import org.junit.Before +import org.junit.Ignore +import org.junit.Test +import rx.Observable +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class FlowFrameworkPersistenceTests { + companion object { + init { + LogHelper.setLevel("+net.corda.flow") + } + } + + private lateinit var mockNet: InternalMockNetwork + private val receivedSessionMessages = ArrayList() + private lateinit var aliceNode: TestStartedNode + private lateinit var bobNode: TestStartedNode + private lateinit var notaryIdentity: Party + private lateinit var alice: Party + private lateinit var bob: Party + private lateinit var aliceFlowManager: MockNodeFlowManager + private lateinit var bobFlowManager: MockNodeFlowManager + + @Before + fun start() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() + ) + aliceFlowManager = MockNodeFlowManager() + bobFlowManager = MockNodeFlowManager() + + aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager)) + bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager)) + + receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } + + // Extract identities + alice = aliceNode.info.singleIdentity() + bob = bobNode.info.singleIdentity() + notaryIdentity = mockNet.defaultNotaryIdentity + } + + @After + fun cleanUp() { + mockNet.stopNodes() + receivedSessionMessages.clear() + } + + @Test + fun `newly added flow is preserved on restart`() { + aliceNode.services.startFlow(NoOpFlow(nonTerminating = true)) + aliceNode.internals.acceptableLiveFiberCountOnStop = 1 + val restoredFlow = aliceNode.restartAndGetRestoredFlow() + assertThat(restoredFlow.flowStarted).isTrue() + } + + @Test + fun `flow restarted just after receiving payload`() { + bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) + .nonTerminating() } + aliceNode.services.startFlow(SendFlow("Hello", bob)) + + // We push through just enough messages to get only the payload sent + bobNode.pumpReceive() + bobNode.internals.disableDBCloseOnStop() + bobNode.internals.acceptableLiveFiberCountOnStop = 1 + bobNode.dispose() + mockNet.runNetwork() + val restoredFlow = bobNode.restartAndGetRestoredFlow() + assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") + } + + @Test + fun `flow loaded from checkpoint will respond to messages from before start`() { + aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) } + bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow + val restoredFlow = bobNode.restartAndGetRestoredFlow() + assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") + } + + @Ignore("Some changes in startup order make this test's assumptions fail.") + @Test + fun `flow with send will resend on interrupted restart`() { + val payload = random63BitValue() + val payload2 = random63BitValue() + + var sentCount = 0 + mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ } + val charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) + val secondFlow = charlieNode.registerCordappFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) } + mockNet.runNetwork() + val charlie = charlieNode.info.singleIdentity() + + // Kick off first send and receive + bobNode.services.startFlow(PingPongFlow(charlie, payload)) + bobNode.database.transaction { + assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) + } + // Make sure the add() has finished initial processing. + bobNode.internals.disableDBCloseOnStop() + // Restart node and thus reload the checkpoint and resend the message with same UUID + bobNode.dispose() + bobNode.database.transaction { + assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) // confirm checkpoint + bobNode.services.networkMapCache.clearNetworkMapCache() + } + val node2b = mockNet.createNode(InternalMockNodeParameters(bobNode.internals.id)) + bobNode.internals.manuallyCloseDB() + val (firstAgain, fut1) = node2b.getSingleFlow() + // Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync. + mockNet.runNetwork() + fut1.getOrThrow() + + val receivedCount = receivedSessionMessages.count { it.isPayloadTransfer } + // Check flows completed cleanly and didn't get out of phase + assertEquals(4, receivedCount, "Flow should have exchanged 4 unique messages")// Two messages each way + // can't give a precise value as every addMessageHandler re-runs the undelivered messages + assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") + node2b.database.transaction { + assertEquals(0, node2b.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") + } + charlieNode.database.transaction { + assertEquals(0, charlieNode.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") + } + assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3") + assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3") + assertEquals(payload, secondFlow.getOrThrow().receivedPayload, "Received payload does not match the (restarted) first value on Node 2") + assertEquals(payload + 1, secondFlow.getOrThrow().receivedPayload2, "Received payload does not match the expected second value on Node 2") + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////// + //region Helpers + + private inline fun > TestStartedNode.restartAndGetRestoredFlow(): P { + val newNode = mockNet.restartNode(this) + newNode.internals.acceptableLiveFiberCountOnStop = 1 + mockNet.runNetwork() + return newNode.getSingleFlow

().first + } + + private fun receivedSessionMessagesObservable(): Observable { + return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() + } + + //endregion Helpers +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index 53ec7d9f2a..23b919eb66 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -40,16 +40,13 @@ import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType import org.junit.After import org.junit.Before -import org.junit.Ignore import org.junit.Test import rx.Notification import rx.Observable import java.time.Instant import java.util.* import kotlin.reflect.KClass -import kotlin.test.assertEquals import kotlin.test.assertFailsWith -import kotlin.test.assertTrue class FlowFrameworkTests { companion object { @@ -449,320 +446,142 @@ class FlowFrameworkTests { private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) - private fun TestStartedNode.sendSessionMessage(message: SessionMessage, destination: Party) { - services.networkService.apply { - val address = getAddressOfParty(PartyInfo.SingleNode(destination, emptyList())) - send(createMessage(FlowMessagingImpl.sessionTopic, message.serialize().bytes), address) - } - } - private fun assertSessionTransfers(vararg expected: SessionTransfer) { assertThat(receivedSessionMessages).containsExactly(*expected) } - //endregion Helpers -} + private val FlowLogic<*>.progressSteps: CordaFuture>> + get() { + return progressTracker!!.changes + .ofType(Change.Position::class.java) + .map { it.newStep } + .materialize() + .toList() + .toFuture() + } -class FlowFrameworkTripartyTests { + @InitiatingFlow + private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party, + @Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic() { + @Suspendable + override fun call() { + // Kick off the flow on the other side ... + val session = initiateFlow(otherParty) + session.send(1) + // ... then pause this one until it's received the session-end message from the other side + receivedOtherFlowEnd.acquire() + session.sendAndReceive(2) + } + } - companion object { + // we need brand new class for a flow to fail, so here it is + @InitiatingFlow + private open class NeverRegisteredFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic() { init { - LogHelper.setLevel("+net.corda.flow") + require(otherParties.isNotEmpty()) } - private lateinit var mockNet: InternalMockNetwork - private lateinit var aliceNode: TestStartedNode - private lateinit var bobNode: TestStartedNode - private lateinit var charlieNode: TestStartedNode - private lateinit var alice: Party - private lateinit var bob: Party - private lateinit var charlie: Party - private lateinit var notaryIdentity: Party - private val receivedSessionMessages = ArrayList() - } - - @Before - fun setUpGlobalMockNet() { - mockNet = InternalMockNetwork( - cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), - servicePeerAllocationStrategy = RoundRobin() - ) - - aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) - bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) - charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) - - - // Extract identities - alice = aliceNode.info.singleIdentity() - bob = bobNode.info.singleIdentity() - charlie = charlieNode.info.singleIdentity() - notaryIdentity = mockNet.defaultNotaryIdentity - - receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } - } - - @After - fun cleanUp() { - mockNet.stopNodes() - receivedSessionMessages.clear() - } - - private fun receivedSessionMessagesObservable(): Observable { - return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() - } - - @Test - fun `sending to multiple parties`() { - bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() } - charlieNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() } - val payload = "Hello World" - aliceNode.services.startFlow(SendFlow(payload, bob, charlie)) - mockNet.runNetwork() - bobNode.internals.acceptableLiveFiberCountOnStop = 1 - charlieNode.internals.acceptableLiveFiberCountOnStop = 1 - val bobFlow = bobNode.getSingleFlow().first - val charlieFlow = charlieNode.getSingleFlow().first - assertThat(bobFlow.receivedPayloads[0]).isEqualTo(payload) - assertThat(charlieFlow.receivedPayloads[0]).isEqualTo(payload) - - assertSessionTransfers(bobNode, - aliceNode sent sessionInit(SendFlow::class, payload = payload) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - aliceNode sent normalEnd to bobNode - //There's no session end from the other flows as they're manually suspended - ) - - assertSessionTransfers(charlieNode, - aliceNode sent sessionInit(SendFlow::class, payload = payload) to charlieNode, - charlieNode sent sessionConfirm() to aliceNode, - aliceNode sent normalEnd to charlieNode - //There's no session end from the other flows as they're manually suspended - ) - } - - @Test - fun `receiving from multiple parties`() { - val bobPayload = "Test 1" - val charliePayload = "Test 2" - bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(bobPayload, it) } - charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(charliePayload, it) } - val multiReceiveFlow = ReceiveFlow(bob, charlie).nonTerminating() - aliceNode.services.startFlow(multiReceiveFlow) - aliceNode.internals.acceptableLiveFiberCountOnStop = 1 - mockNet.runNetwork() - assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(bobPayload) - assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(charliePayload) - - assertSessionTransfers(bobNode, - aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - bobNode sent sessionData(bobPayload) to aliceNode, - bobNode sent normalEnd to aliceNode - ) - - assertSessionTransfers(charlieNode, - aliceNode sent sessionInit(ReceiveFlow::class) to charlieNode, - charlieNode sent sessionConfirm() to aliceNode, - charlieNode sent sessionData(charliePayload) to aliceNode, - charlieNode sent normalEnd to aliceNode - ) - } - - @Test - fun `FlowException only propagated to parent`() { - charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } } - bobNode.registerCordappFlowFactory(ReceiveFlow::class) { ReceiveFlow(charlie) } - val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob)) - mockNet.runNetwork() - assertThatExceptionOfType(UnexpectedFlowEndException::class.java) - .isThrownBy { receivingFiber.resultFuture.getOrThrow() } - } - - @Test - fun `FlowException thrown and there is a 3rd unrelated party flow`() { - // Bob will send its payload and then block waiting for the receive from Alice. Meanwhile Alice will move - // onto Charlie which will throw the exception - val node2Fiber = bobNode - .registerCordappFlowFactory(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") } - .map { it.stateMachine } - charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } } - - val aliceFiber = aliceNode.services.startFlow(ReceiveFlow(bob, charlie)) as FlowStateMachineImpl - mockNet.runNetwork() - - // Alice will terminate with the error it received from Charlie but it won't propagate that to Bob (as it's - // not relevant to it) but it will end its session with it - assertThatExceptionOfType(MyFlowException::class.java).isThrownBy { - aliceFiber.resultFuture.getOrThrow() - } - val bobResultFuture = node2Fiber.getOrThrow().resultFuture - assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { - bobResultFuture.getOrThrow() - } - - assertSessionTransfers(bobNode, - aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, - bobNode sent sessionConfirm() to aliceNode, - bobNode sent sessionData("Hello") to aliceNode, - aliceNode sent errorMessage() to bobNode - ) - } - - private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) - - private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List { - val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } - assertThat(actualForNode).containsExactly(*expected) - return actualForNode - } - -} - -class FlowFrameworkPersistenceTests { - companion object { - init { - LogHelper.setLevel("+net.corda.flow") + @Suspendable + override fun call(): FlowInfo { + val flowInfos = otherParties.map { + val session = initiateFlow(it) + session.send(payload) + session.getCounterpartyFlowInfo() + }.toList() + return flowInfos.first() } } - private lateinit var mockNet: InternalMockNetwork - private val receivedSessionMessages = ArrayList() - private lateinit var aliceNode: TestStartedNode - private lateinit var bobNode: TestStartedNode - private lateinit var notaryIdentity: Party - private lateinit var alice: Party - private lateinit var bob: Party - private lateinit var aliceFlowManager: MockNodeFlowManager - private lateinit var bobFlowManager: MockNodeFlowManager - - @Before - fun start() { - mockNet = InternalMockNetwork( - cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), - servicePeerAllocationStrategy = RoundRobin() - ) - aliceFlowManager = MockNodeFlowManager() - bobFlowManager = MockNodeFlowManager() - - aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager)) - bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager)) - - receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } - - // Extract identities - alice = aliceNode.info.singleIdentity() - bob = bobNode.info.singleIdentity() - notaryIdentity = mockNet.defaultNotaryIdentity - } - - @After - fun cleanUp() { - mockNet.stopNodes() - receivedSessionMessages.clear() - } - - @Test - fun `newly added flow is preserved on restart`() { - aliceNode.services.startFlow(NoOpFlow(nonTerminating = true)) - aliceNode.internals.acceptableLiveFiberCountOnStop = 1 - val restoredFlow = aliceNode.restartAndGetRestoredFlow() - assertThat(restoredFlow.flowStarted).isTrue() - } - - @Test - fun `flow restarted just after receiving payload`() { - bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it).nonTerminating() } - aliceNode.services.startFlow(SendFlow("Hello", bob)) - - // We push through just enough messages to get only the payload sent - bobNode.pumpReceive() - bobNode.internals.disableDBCloseOnStop() - bobNode.internals.acceptableLiveFiberCountOnStop = 1 - bobNode.dispose() - mockNet.runNetwork() - val restoredFlow = bobNode.restartAndGetRestoredFlow() - assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") - } - - @Test - fun `flow loaded from checkpoint will respond to messages from before start`() { - aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) } - bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow - val restoredFlow = bobNode.restartAndGetRestoredFlow() - assertThat(restoredFlow.receivedPayloads[0]).isEqualTo("Hello") - } - - @Ignore("Some changes in startup order make this test's assumptions fail.") - @Test - fun `flow with send will resend on interrupted restart`() { - val payload = random63BitValue() - val payload2 = random63BitValue() - - var sentCount = 0 - mockNet.messagingNetwork.sentMessages.toSessionTransfers().filter { it.isPayloadTransfer }.forEach { sentCount++ } - val charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) - val secondFlow = charlieNode.registerCordappFlowFactory(PingPongFlow::class) { PingPongFlow(it, payload2) } - mockNet.runNetwork() - val charlie = charlieNode.info.singleIdentity() - - // Kick off first send and receive - bobNode.services.startFlow(PingPongFlow(charlie, payload)) - bobNode.database.transaction { - assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) + private object WaitingFlows { + @InitiatingFlow + class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val otherPartySession = initiateFlow(otherParty) + otherPartySession.send(stx) + return waitForLedgerCommit(stx.id) + } } - // Make sure the add() has finished initial processing. - bobNode.internals.disableDBCloseOnStop() - // Restart node and thus reload the checkpoint and resend the message with same UUID - bobNode.dispose() - bobNode.database.transaction { - assertEquals(1, bobNode.internals.checkpointStorage.checkpoints().size) // confirm checkpoint - bobNode.services.networkMapCache.clearNetworkMapCache() - } - val node2b = mockNet.createNode(InternalMockNodeParameters(bobNode.internals.id)) - bobNode.internals.manuallyCloseDB() - val (firstAgain, fut1) = node2b.getSingleFlow() - // Run the network which will also fire up the second flow. First message should get deduped. So message data stays in sync. - mockNet.runNetwork() - fut1.getOrThrow() - val receivedCount = receivedSessionMessages.count { it.isPayloadTransfer } - // Check flows completed cleanly and didn't get out of phase - assertEquals(4, receivedCount, "Flow should have exchanged 4 unique messages")// Two messages each way - // can't give a precise value as every addMessageHandler re-runs the undelivered messages - assertTrue(sentCount > receivedCount, "Node restart should have retransmitted messages") - node2b.database.transaction { - assertEquals(0, node2b.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") + class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic() { + @Suspendable + override fun call(): SignedTransaction { + val stx = otherPartySession.receive().unwrap { it } + if (throwException != null) throw throwException.invoke() + return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty))) + } } - charlieNode.database.transaction { - assertEquals(0, charlieNode.internals.checkpointStorage.checkpoints().size, "Checkpoints left after restored flow should have ended") - } - assertEquals(payload2, firstAgain.receivedPayload, "Received payload does not match the first value on Node 3") - assertEquals(payload2 + 1, firstAgain.receivedPayload2, "Received payload does not match the expected second value on Node 3") - assertEquals(payload, secondFlow.getOrThrow().receivedPayload, "Received payload does not match the (restarted) first value on Node 2") - assertEquals(payload + 1, secondFlow.getOrThrow().receivedPayload2, "Received payload does not match the expected second value on Node 2") } - //////////////////////////////////////////////////////////////////////////////////////////////////////////// - //region Helpers - - private inline fun > TestStartedNode.restartAndGetRestoredFlow(): P { - val newNode = mockNet.restartNode(this) - newNode.internals.acceptableLiveFiberCountOnStop = 1 - mockNet.runNetwork() - return newNode.getSingleFlow

().first + private class LazyServiceHubAccessFlow : FlowLogic() { + val lazyTime: Instant by lazy { serviceHub.clock.instant() } + @Suspendable + override fun call() = Unit } - private fun receivedSessionMessagesObservable(): Observable { - return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() + private interface CustomInterface + + private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) + + @InitiatingFlow + private class IncorrectCustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) + + @InitiatingFlow + private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic>>() { + @Suspendable + override fun call(): List> { + val otherPartySession = initiateFlow(otherParty) + otherPartySession.send(stx) + // hold onto reference here to force checkpoint of vaultService and thus + // prove it is registered as a tokenizableService in the node + val vaultQuerySvc = serviceHub.vaultService + waitForLedgerCommit(stx.id) + return vaultQuerySvc.queryBy().states + } + } + + @InitiatingFlow(version = 2) + private class UpgradedFlow(val otherParty: Party, val otherPartySession: FlowSession? = null) : FlowLogic>() { + constructor(otherPartySession: FlowSession) : this(otherPartySession.counterparty, otherPartySession) + + @Suspendable + override fun call(): Pair { + val otherPartySession = this.otherPartySession ?: initiateFlow(otherParty) + val received = otherPartySession.receive().unwrap { it } + val otherFlowVersion = otherPartySession.getCounterpartyFlowInfo().flowVersion + return Pair(received, otherFlowVersion) + } + } + + private class SingleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + val payload = otherPartySession.receive().unwrap { it } + subFlow(InlinedSendFlow(payload + payload, otherPartySession)) + } + } + + private class DoubleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() { + subFlow(SingleInlinedSubFlow(otherPartySession)) + } + } + + private data class NonSerialisableData(val a: Int) + private class NonSerialisableFlowException(@Suppress("unused") val data: NonSerialisableData) : FlowException() + + private class InlinedSendFlow(val payload: String, val otherPartySession: FlowSession) : FlowLogic() { + @Suspendable + override fun call() = otherPartySession.send(payload) } //endregion Helpers } -private fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, ""))) +internal fun sessionConfirm(flowVersion: Int = 1) = ExistingSessionMessage(SessionId(0), ConfirmSessionMessage(SessionId(0), FlowInfo(flowVersion, ""))) -private inline fun > TestStartedNode.getSingleFlow(): Pair> { +internal inline fun > TestStartedNode.getSingleFlow(): Pair> { return smm.findStateMachines(P::class.java).single() } @@ -786,7 +605,7 @@ private fun sanitise(message: SessionMessage) = when (message) { } } -private fun Observable.toSessionTransfers(): Observable { +internal fun Observable.toSessionTransfers(): Observable { return filter { it.getMessage().topic == FlowMessagingImpl.sessionTopic }.map { val from = it.sender.id val message = it.messageData.deserialize() @@ -794,12 +613,19 @@ private fun Observable.toSessionTransfers(): Observable = Pair(internals.id, message) -private infix fun Pair.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress) +internal fun errorMessage(errorResponse: FlowException? = null) = ExistingSessionMessage(SessionId(0), ErrorSessionMessage(errorResponse, 0)) -private data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) { +internal infix fun TestStartedNode.sent(message: SessionMessage): Pair = Pair(internals.id, message) +internal infix fun Pair.to(node: TestStartedNode): SessionTransfer = SessionTransfer(first, second, node.network.myAddress) + +internal data class SessionTransfer(val from: Int, val message: SessionMessage, val to: MessageRecipients) { val isPayloadTransfer: Boolean get() = message is ExistingSessionMessage && message.payload is DataSessionMessage || @@ -808,40 +634,14 @@ private data class SessionTransfer(val from: Int, val message: SessionMessage, v override fun toString(): String = "$from sent $message to $to" } - -private fun sessionInit(clientFlowClass: KClass>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage { +internal fun sessionInit(clientFlowClass: KClass>, flowVersion: Int = 1, payload: Any? = null): InitialSessionMessage { return InitialSessionMessage(SessionId(0), 0, clientFlowClass.java.name, flowVersion, "", payload?.serialize()) } -private fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize())) - - -private val FlowLogic<*>.progressSteps: CordaFuture>> - get() { - return progressTracker!!.changes - .ofType(Change.Position::class.java) - .map { it.newStep } - .materialize() - .toList() - .toFuture() - } +internal fun sessionData(payload: Any) = ExistingSessionMessage(SessionId(0), DataSessionMessage(payload.serialize())) @InitiatingFlow -private class WaitForOtherSideEndBeforeSendAndReceive(val otherParty: Party, - @Transient val receivedOtherFlowEnd: Semaphore) : FlowLogic() { - @Suspendable - override fun call() { - // Kick off the flow on the other side ... - val session = initiateFlow(otherParty) - session.send(1) - // ... then pause this one until it's received the session-end message from the other side - receivedOtherFlowEnd.acquire() - session.sendAndReceive(2) - } -} - -@InitiatingFlow -private open class SendFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic() { +internal open class SendFlow(private val payload: Any, private vararg val otherParties: Party) : FlowLogic() { init { require(otherParties.isNotEmpty()) } @@ -857,46 +657,7 @@ private open class SendFlow(val payload: Any, vararg val otherParties: Party) : } } -// we need brand new class for a flow to fail, so here it is -@InitiatingFlow -private open class NeverRegisteredFlow(val payload: Any, vararg val otherParties: Party) : FlowLogic() { - init { - require(otherParties.isNotEmpty()) - } - - @Suspendable - override fun call(): FlowInfo { - val flowInfos = otherParties.map { - val session = initiateFlow(it) - session.send(payload) - session.getCounterpartyFlowInfo() - }.toList() - return flowInfos.first() - } -} - -private object WaitingFlows { - @InitiatingFlow - class Waiter(val stx: SignedTransaction, val otherParty: Party) : FlowLogic() { - @Suspendable - override fun call(): SignedTransaction { - val otherPartySession = initiateFlow(otherParty) - otherPartySession.send(stx) - return waitForLedgerCommit(stx.id) - } - } - - class Committer(val otherPartySession: FlowSession, val throwException: (() -> Exception)? = null) : FlowLogic() { - @Suspendable - override fun call(): SignedTransaction { - val stx = otherPartySession.receive().unwrap { it } - if (throwException != null) throw throwException.invoke() - return subFlow(FinalityFlow(stx, setOf(otherPartySession.counterparty))) - } - } -} - -private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() { +internal class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() { @Transient var flowStarted = false @@ -909,7 +670,7 @@ private class NoOpFlow(val nonTerminating: Boolean = false) : FlowLogic() } } -private class InitiatedReceiveFlow(val otherPartySession: FlowSession) : FlowLogic() { +internal class InitiatedReceiveFlow(private val otherPartySession: FlowSession) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") object RECEIVED_STEP : ProgressTracker.Step("Received") @@ -934,26 +695,13 @@ private class InitiatedReceiveFlow(val otherPartySession: FlowSession) : FlowLog } } -private class LazyServiceHubAccessFlow : FlowLogic() { - val lazyTime: Instant by lazy { serviceHub.clock.instant() } - @Suspendable - override fun call() = Unit -} - -private open class InitiatedSendFlow(val payload: Any, val otherPartySession: FlowSession) : FlowLogic() { +internal open class InitiatedSendFlow(private val payload: Any, private val otherPartySession: FlowSession) : FlowLogic() { @Suspendable override fun call() = otherPartySession.send(payload) } -private interface CustomInterface - -private class CustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) - @InitiatingFlow -private class IncorrectCustomSendFlow(payload: String, otherParty: Party) : CustomInterface, SendFlow(payload, otherParty) - -@InitiatingFlow -private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic() { +internal class ReceiveFlow(private vararg val otherParties: Party) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") object RECEIVED_STEP : ProgressTracker.Step("Received") @@ -982,72 +730,23 @@ private class ReceiveFlow(vararg val otherParties: Party) : FlowLogic() { } } -private class MyFlowException(override val message: String) : FlowException() { +internal class MyFlowException(override val message: String) : FlowException() { override fun equals(other: Any?): Boolean = other is MyFlowException && other.message == this.message override fun hashCode(): Int = message.hashCode() } @InitiatingFlow -private class VaultQueryFlow(val stx: SignedTransaction, val otherParty: Party) : FlowLogic>>() { - @Suspendable - override fun call(): List> { - val otherPartySession = initiateFlow(otherParty) - otherPartySession.send(stx) - // hold onto reference here to force checkpoint of vaultService and thus - // prove it is registered as a tokenizableService in the node - val vaultQuerySvc = serviceHub.vaultService - waitForLedgerCommit(stx.id) - return vaultQuerySvc.queryBy().states - } -} - -@InitiatingFlow(version = 2) -private class UpgradedFlow(val otherParty: Party, val otherPartySession: FlowSession? = null) : FlowLogic>() { - constructor(otherPartySession: FlowSession) : this(otherPartySession.counterparty, otherPartySession) - - @Suspendable - override fun call(): Pair { - val otherPartySession = this.otherPartySession ?: initiateFlow(otherParty) - val received = otherPartySession.receive().unwrap { it } - val otherFlowVersion = otherPartySession.getCounterpartyFlowInfo().flowVersion - return Pair(received, otherFlowVersion) - } -} - -private class SingleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - val payload = otherPartySession.receive().unwrap { it } - subFlow(InlinedSendFlow(payload + payload, otherPartySession)) - } -} - -private class DoubleInlinedSubFlow(val otherPartySession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() { - subFlow(SingleInlinedSubFlow(otherPartySession)) - } -} - -private data class NonSerialisableData(val a: Int) -private class NonSerialisableFlowException(@Suppress("unused") val data: NonSerialisableData) : FlowException() - -@InitiatingFlow -private class SendAndReceiveFlow(val otherParty: Party, val payload: Any, val otherPartySession: FlowSession? = null) : FlowLogic() { +internal class SendAndReceiveFlow(private val otherParty: Party, private val payload: Any, private val otherPartySession: FlowSession? = null) : FlowLogic() { constructor(otherPartySession: FlowSession, payload: Any) : this(otherPartySession.counterparty, payload, otherPartySession) @Suspendable - override fun call(): Any = (otherPartySession - ?: initiateFlow(otherParty)).sendAndReceive(payload).unwrap { it } -} - -private class InlinedSendFlow(val payload: String, val otherPartySession: FlowSession) : FlowLogic() { - @Suspendable - override fun call() = otherPartySession.send(payload) + override fun call(): Any { + return (otherPartySession ?: initiateFlow(otherParty)).sendAndReceive(payload).unwrap { it } + } } @InitiatingFlow -private class PingPongFlow(val otherParty: Party, val payload: Long, val otherPartySession: FlowSession? = null) : FlowLogic() { +internal class PingPongFlow(private val otherParty: Party, private val payload: Long, private val otherPartySession: FlowSession? = null) : FlowLogic() { constructor(otherPartySession: FlowSession, payload: Long) : this(otherPartySession.counterparty, payload, otherPartySession) @Transient @@ -1063,7 +762,7 @@ private class PingPongFlow(val otherParty: Party, val payload: Long, val otherPa } } -private class ExceptionFlow(val exception: () -> E) : FlowLogic() { +internal class ExceptionFlow(val exception: () -> E) : FlowLogic() { object START_STEP : ProgressTracker.Step("Starting") override val progressTracker: ProgressTracker = ProgressTracker(START_STEP) @@ -1075,4 +774,4 @@ private class ExceptionFlow(val exception: () -> E) : FlowLogic() + + @Before + fun setUpMockNet() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = cordappsForPackages("net.corda.testing.contracts"), + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() + ) + + aliceNode = createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) + bobNode = createNode(InternalMockNodeParameters(legalName = BOB_NAME)) + + // Extract identities + alice = aliceNode.info.singleIdentity() + bob = bobNode.info.singleIdentity() + notaryIdentity = mockNet.defaultNotaryIdentity + + receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } + } + + private fun receivedSessionMessagesObservable(): Observable { + return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() + } + + @After + fun cleanUp() { + mockNet.stopNodes() + receivedSessionMessages.clear() + } + + private fun createNode(parameters: InternalMockNodeParameters): TestStartedNode { + return mockNet.createNode(parameters) { + object : InternalMockNetwork.MockNode(it) { + override fun makeStateMachineManager(): StateMachineManager { + val executor = MultiThreadedStateMachineExecutor(metricRegistry, 1) + return MultiThreadedStateMachineManager( + services, + checkpointStorage, + executor, + database, + newSecureRandom(), + busyNodeLatch, + cordappLoader.appClassLoader + ) + } + } + } + } + + @Test + fun `session init with unknown class is sent to the flow hospital, from where it's dropped`() { + aliceNode.sendSessionMessage(InitialSessionMessage(SessionId(random63BitValue()), 0, "not.a.real.Class", 1, "", null), bob) + mockNet.runNetwork() + assertThat(receivedSessionMessages).hasSize(1) // Only the session-init is expected as the session-reject is blocked by the flow hospital + val medicalRecords = bobNode.smm.flowHospital.track().apply { updates.notUsed() }.snapshot + assertThat(medicalRecords).hasSize(1) + val sessionInitRecord = medicalRecords[0] as StaffedFlowHospital.MedicalRecord.SessionInit + assertThat(sessionInitRecord.initiatorFlowClassName).isEqualTo("not.a.real.Class") + bobNode.smm.flowHospital.dropSessionInit(sessionInitRecord.id) // Drop the message which is processed as an error back to sender + mockNet.runNetwork() + assertThat(receivedSessionMessages).hasSize(2) // Now the session-reject is expected + val lastMessage = receivedSessionMessages.last().message as ExistingSessionMessage + assertThat((lastMessage.payload as RejectSessionMessage).message).isEqualTo("Don't know not.a.real.Class") + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTripartyTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTripartyTests.kt new file mode 100644 index 0000000000..eb7be9531f --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTripartyTests.kt @@ -0,0 +1,178 @@ +package net.corda.node.services.statemachine + +import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.flows.registerCordappFlowFactory +import net.corda.core.identity.Party +import net.corda.core.internal.concurrent.map +import net.corda.core.utilities.getOrThrow +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.BOB_NAME +import net.corda.testing.core.CHARLIE_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.internal.LogHelper +import net.corda.testing.node.InMemoryMessagingNetwork +import net.corda.testing.node.internal.* +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.AssertionsForClassTypes +import org.junit.After +import org.junit.Before +import org.junit.Test +import rx.Observable +import java.util.* + +class FlowFrameworkTripartyTests { + companion object { + init { + LogHelper.setLevel("+net.corda.flow") + } + + private lateinit var mockNet: InternalMockNetwork + private lateinit var aliceNode: TestStartedNode + private lateinit var bobNode: TestStartedNode + private lateinit var charlieNode: TestStartedNode + private lateinit var alice: Party + private lateinit var bob: Party + private lateinit var charlie: Party + private lateinit var notaryIdentity: Party + private val receivedSessionMessages = ArrayList() + } + + @Before + fun setUpGlobalMockNet() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = cordappsForPackages("net.corda.finance.contracts", "net.corda.testing.contracts"), + servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() + ) + + aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME)) + bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME)) + charlieNode = mockNet.createNode(InternalMockNodeParameters(legalName = CHARLIE_NAME)) + + + // Extract identities + alice = aliceNode.info.singleIdentity() + bob = bobNode.info.singleIdentity() + charlie = charlieNode.info.singleIdentity() + notaryIdentity = mockNet.defaultNotaryIdentity + + receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } + } + + @After + fun cleanUp() { + mockNet.stopNodes() + receivedSessionMessages.clear() + } + + private fun receivedSessionMessagesObservable(): Observable { + return mockNet.messagingNetwork.receivedMessages.toSessionTransfers() + } + + @Test + fun `sending to multiple parties`() { + bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) + .nonTerminating() } + charlieNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) + .nonTerminating() } + val payload = "Hello World" + aliceNode.services.startFlow(SendFlow(payload, bob, charlie)) + mockNet.runNetwork() + bobNode.internals.acceptableLiveFiberCountOnStop = 1 + charlieNode.internals.acceptableLiveFiberCountOnStop = 1 + val bobFlow = bobNode.getSingleFlow().first + val charlieFlow = charlieNode.getSingleFlow().first + assertThat(bobFlow.receivedPayloads[0]).isEqualTo(payload) + assertThat(charlieFlow.receivedPayloads[0]).isEqualTo(payload) + + assertSessionTransfers(bobNode, + aliceNode sent sessionInit(SendFlow::class, payload = payload) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + aliceNode sent normalEnd to bobNode + //There's no session end from the other flows as they're manually suspended + ) + + assertSessionTransfers(charlieNode, + aliceNode sent sessionInit(SendFlow::class, payload = payload) to charlieNode, + charlieNode sent sessionConfirm() to aliceNode, + aliceNode sent normalEnd to charlieNode + //There's no session end from the other flows as they're manually suspended + ) + } + + @Test + fun `receiving from multiple parties`() { + val bobPayload = "Test 1" + val charliePayload = "Test 2" + bobNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(bobPayload, it) } + charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow(charliePayload, it) } + val multiReceiveFlow = ReceiveFlow(bob, charlie).nonTerminating() + aliceNode.services.startFlow(multiReceiveFlow) + aliceNode.internals.acceptableLiveFiberCountOnStop = 1 + mockNet.runNetwork() + assertThat(multiReceiveFlow.receivedPayloads[0]).isEqualTo(bobPayload) + assertThat(multiReceiveFlow.receivedPayloads[1]).isEqualTo(charliePayload) + + assertSessionTransfers(bobNode, + aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + bobNode sent sessionData(bobPayload) to aliceNode, + bobNode sent normalEnd to aliceNode + ) + + assertSessionTransfers(charlieNode, + aliceNode sent sessionInit(ReceiveFlow::class) to charlieNode, + charlieNode sent sessionConfirm() to aliceNode, + charlieNode sent sessionData(charliePayload) to aliceNode, + charlieNode sent normalEnd to aliceNode + ) + } + + @Test + fun `FlowException only propagated to parent`() { + charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Chain") } } + bobNode.registerCordappFlowFactory(ReceiveFlow::class) { ReceiveFlow(charlie) } + val receivingFiber = aliceNode.services.startFlow(ReceiveFlow(bob)) + mockNet.runNetwork() + AssertionsForClassTypes.assertThatExceptionOfType(UnexpectedFlowEndException::class.java) + .isThrownBy { receivingFiber.resultFuture.getOrThrow() } + } + + @Test + fun `FlowException thrown and there is a 3rd unrelated party flow`() { + // Bob will send its payload and then block waiting for the receive from Alice. Meanwhile Alice will move + // onto Charlie which will throw the exception + val node2Fiber = bobNode + .registerCordappFlowFactory(ReceiveFlow::class) { SendAndReceiveFlow(it, "Hello") } + .map { it.stateMachine } + charlieNode.registerCordappFlowFactory(ReceiveFlow::class) { ExceptionFlow { MyFlowException("Nothing useful") } } + + val aliceFiber = aliceNode.services.startFlow(ReceiveFlow(bob, charlie)) as FlowStateMachineImpl + mockNet.runNetwork() + + // Alice will terminate with the error it received from Charlie but it won't propagate that to Bob (as it's + // not relevant to it) but it will end its session with it + AssertionsForClassTypes.assertThatExceptionOfType(MyFlowException::class.java) + .isThrownBy { + aliceFiber.resultFuture.getOrThrow() + } + val bobResultFuture = node2Fiber.getOrThrow().resultFuture + AssertionsForClassTypes.assertThatExceptionOfType(UnexpectedFlowEndException::class.java).isThrownBy { + bobResultFuture.getOrThrow() + } + + assertSessionTransfers(bobNode, + aliceNode sent sessionInit(ReceiveFlow::class) to bobNode, + bobNode sent sessionConfirm() to aliceNode, + bobNode sent sessionData("Hello") to aliceNode, + aliceNode sent errorMessage() to bobNode + ) + } + + private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) + + private fun assertSessionTransfers(node: TestStartedNode, vararg expected: SessionTransfer): List { + val actualForNode = receivedSessionMessages.filter { it.from == node.internals.id || it.to == node.network.myAddress } + assertThat(actualForNode).containsExactly(*expected) + return actualForNode + } +}