diff --git a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt index 8f97963deb..6a45d1b0b6 100644 --- a/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt +++ b/core/src/main/kotlin/net/corda/core/flows/FlowLogic.kt @@ -25,6 +25,7 @@ import net.corda.core.node.NodeInfo import net.corda.core.node.ServiceHub import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.serialize import net.corda.core.transactions.SignedTransaction import net.corda.core.utilities.ProgressTracker @@ -317,6 +318,53 @@ abstract class FlowLogic { return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions))) } + /** + * Queues the given [payload] for sending to the provided [sessions] and continues without suspending. + * + * Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided [sessions] + * is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the + * network's event horizon time. + * + * @param payload the payload to send. + * @param sessions the sessions to send the provided payload to. + * @param maySkipCheckpoint whether checkpointing should be skipped. + */ + @Suspendable + @JvmOverloads + fun sendAll(payload: Any, sessions: Set, maySkipCheckpoint: Boolean = false) { + val sessionToPayload = sessions.map { it to payload }.toMap() + return sendAll(sessionToPayload, maySkipCheckpoint) + } + + /** + * Queues the given payloads for sending to the provided sessions and continues without suspending. + * + * Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided [sessions] + * is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the + * network's event horizon time. + * + * @param payloadsPerSession a mapping that contains the payload to be sent to each session. + * @param maySkipCheckpoint whether checkpointing should be skipped. + */ + @Suspendable + @JvmOverloads + fun sendAll(payloadsPerSession: Map, maySkipCheckpoint: Boolean = false) { + val request = FlowIORequest.Send( + sessionToMessage = serializePayloads(payloadsPerSession) + ) + stateMachine.suspend(request, maySkipCheckpoint) + } + + @Suspendable + private fun serializePayloads(payloadsPerSession: Map): Map> { + val cachedSerializedPayloads = mutableMapOf>() + + return payloadsPerSession.mapValues { (_, payload) -> + cachedSerializedPayloads[payload] ?: payload.serialize(context = SerializationDefaults.P2P_CONTEXT).also { cachedSerializedPayloads[payload] = it } + } + } + + /** * Invokes the given subflow. This function returns once the subflow completes successfully with the result * returned by that subflow's [call] method. If the subflow has a progress tracker, it is attached to the diff --git a/docs/source/api-flows.rst b/docs/source/api-flows.rst index 27de7b5c28..257690b290 100644 --- a/docs/source/api-flows.rst +++ b/docs/source/api-flows.rst @@ -264,14 +264,18 @@ In order to create a communication session between your initiator flow and the r * ``sendAndReceive(receiveType: Class, payload: Any): R`` * Sends the ``payload`` object and receives an object of type ``receiveType`` back -In addition ``FlowLogic`` provides functions that batch receives: +In addition ``FlowLogic`` provides functions that can receive messages from multiple sessions and send messages to multiple sessions: * ``receiveAllMap(sessions: Map>): Map>`` - Receives from all ``FlowSession`` objects specified in the passed in map. The received types may differ. + * Receives from all ``FlowSession`` objects specified in the passed in map. The received types may differ. * ``receiveAll(receiveType: Class, sessions: List): List>`` - Receives from all ``FlowSession`` objects specified in the passed in list. The received types must be the same. + * Receives from all ``FlowSession`` objects specified in the passed in list. The received types must be the same. +* ``sendAll(payload: Any, sessions: Set)`` + * Sends the ``payload`` object to all the provided ``FlowSession``\s. +* ``sendAll(payloadsPerSession: Map)`` + * Sends a potentially different payload to each ``FlowSession``, as specified by the provided ``payloadsPerSession``. -The batched functions are implemented more efficiently by the flow framework. +.. note:: It's more efficient to call ``sendAndReceive`` instead of calling ``send`` and then ``receive``. It's also more efficient to call ``sendAll``/``receiveAll`` instead of multiple ``send``/``receive`` respectively. InitiateFlow ~~~~~~~~~~~~ diff --git a/docs/source/api-persistence.rst b/docs/source/api-persistence.rst index e9442dff03..ab2a745671 100644 --- a/docs/source/api-persistence.rst +++ b/docs/source/api-persistence.rst @@ -460,6 +460,7 @@ Please note that suspendable flow operations such as: * ``FlowSession.send`` * ``FlowSession.receive`` * ``FlowLogic.receiveAll`` +* ``FlowLogic.sendAll`` * ``FlowLogic.sleep`` * ``FlowLogic.subFlow`` diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt index 2f46010730..96a75c6e59 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/MessagingExecutor.kt @@ -53,6 +53,11 @@ class MessagingExecutor( producer.send(SimpleString(mqAddress), artemisMessage) } + @Synchronized + fun send(messages: Map) { + messages.forEach { recipients, message -> send(message, recipients) } + } + @Synchronized fun acknowledge(message: ClientMessage) { log.debug { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt index 925fdb27e1..14f8229fec 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/Action.kt @@ -37,6 +37,17 @@ sealed class Action { val deduplicationId: SenderDeduplicationId ) : Action() + /** + * Send session messages to multiple destinations. + * + * @property sendInitial session messages to send in order to establish a session. + * @property sendExisting session messages to send to existing sessions. + */ + data class SendMultiple( + val sendInitial: List, + val sendExisting: List + ): Action() + /** * Persist the specified [checkpoint]. */ diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 3ffdd4b709..9af0caab1f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -68,6 +68,7 @@ class ActionExecutorImpl( is Action.RemoveCheckpoint -> executeRemoveCheckpoint(action) is Action.SendInitial -> executeSendInitial(action) is Action.SendExisting -> executeSendExisting(action) + is Action.SendMultiple -> executeSendMultiple(action) is Action.AddSessionBinding -> executeAddSessionBinding(action) is Action.RemoveSessionBindings -> executeRemoveSessionBindings(action) is Action.SignalFlowHasStarted -> executeSignalFlowHasStarted(action) @@ -191,6 +192,13 @@ class ActionExecutorImpl( flowMessaging.sendSessionMessage(action.peerParty, action.message, action.deduplicationId) } + @Suspendable + private fun executeSendMultiple(action: Action.SendMultiple) { + val messages = action.sendInitial.map { Message(it.destination, it.initialise, it.deduplicationId) } + + action.sendExisting.map { Message(it.peerParty, it.message, it.deduplicationId) } + flowMessaging.sendSessionMessages(messages) + } + @Suspendable private fun executeAddSessionBinding(action: Action.AddSessionBinding) { stateMachineManager.addSessionBinding(action.flowId, action.sessionId) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt index 8963c7616f..da371d6d25 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt @@ -13,6 +13,7 @@ import net.corda.core.utilities.contextLogger import net.corda.core.utilities.trace import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.messaging.DeduplicationHandler +import net.corda.node.services.messaging.MessagingService import net.corda.node.services.messaging.ReceivedMessage import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders import java.io.NotSerializableException @@ -27,12 +28,17 @@ interface FlowMessaging { @Suspendable fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId) + @Suspendable + fun sendSessionMessages(messageData: List) + /** * Start the messaging using the [onMessage] message handler. */ fun start(onMessage: (ReceivedMessage, deduplicationHandler: DeduplicationHandler) -> Unit) } +data class Message(val destination: Destination, val sessionMessage: SessionMessage, val dedupId: SenderDeduplicationId) + /** * Implementation of [FlowMessaging] using a [ServiceHubInternal] to do the messaging and routing. */ @@ -51,6 +57,17 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { @Suspendable override fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId) { + val addressedMessage = createMessage(destination, message, deduplicationId) + serviceHub.networkService.send(addressedMessage.message, addressedMessage.target, addressedMessage.sequenceKey) + } + + @Suspendable + override fun sendSessionMessages(messageData: List) { + val addressedMessages = messageData.map { createMessage(it.destination, it.sessionMessage, it.dedupId) } + serviceHub.networkService.send(addressedMessages) + } + + private fun createMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId): MessagingService.AddressedMessage { val party = if (destination is Party) { log.trace { "Sending message $deduplicationId $message to $destination" } destination @@ -69,7 +86,7 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { is InitialSessionMessage -> message.initiatorSessionId is ExistingSessionMessage -> message.recipientSessionId } - serviceHub.networkService.send(networkMessage, address, sequenceKey = sequenceKey) + return MessagingService.AddressedMessage(networkMessage, address, sequenceKey) } private fun SessionMessage.additionalHeaders(target: Party): Map { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt index 3269a87a2f..6f9956b692 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StartedFlowTransition.kt @@ -7,6 +7,7 @@ import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes import net.corda.core.utilities.toNonEmptySet import net.corda.node.services.statemachine.* +import java.lang.IllegalStateException /** * This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request @@ -246,46 +247,49 @@ class StartedFlowTransition( val checkpoint = startingState.checkpoint val newSessions = LinkedHashMap(checkpoint.sessions) var index = 0 - for ((sourceSessionId, message) in sourceSessionIdToMessage) { - val existingSessionState = checkpoint.sessions[sourceSessionId] - if (existingSessionState == null) { - return freshErrorTransition(CannotFindSessionException(sourceSessionId)) - } else { - val sessionMessage = DataSessionMessage(message) - val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, existingSessionState) - when (existingSessionState) { - is SessionState.Uninitiated -> { - val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, existingSessionState.additionalEntropy, message) - actions.add(Action.SendInitial(existingSessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) - newSessions[sourceSessionId] = SessionState.Initiating( - bufferedMessages = emptyList(), - rejectionError = null, - deduplicationSeed = existingSessionState.deduplicationSeed - ) - Unit - } - is SessionState.Initiating -> { - // We're initiating this session, buffer the message - val newBufferedMessages = existingSessionState.bufferedMessages + Pair(deduplicationId, sessionMessage) - newSessions[sourceSessionId] = existingSessionState.copy(bufferedMessages = newBufferedMessages) - } - is SessionState.Initiated -> { - when (existingSessionState.initiatedState) { - is InitiatedSessionState.Live -> { - val sinkSessionId = existingSessionState.initiatedState.peerSinkSessionId - val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage) - actions.add(Action.SendExisting(existingSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))) - Unit - } - InitiatedSessionState.Ended -> { - return freshErrorTransition(IllegalStateException("Tried to send to ended session $sourceSessionId")) - } - } - } - } + for ((sourceSessionId, _) in sourceSessionIdToMessage) { + val existingSessionState = checkpoint.sessions[sourceSessionId] ?: return freshErrorTransition(CannotFindSessionException(sourceSessionId)) + if (existingSessionState is SessionState.Initiated && existingSessionState.initiatedState is InitiatedSessionState.Ended) { + return freshErrorTransition(IllegalStateException("Tried to send to ended session $sourceSessionId")) } - } + + val messagesByType = sourceSessionIdToMessage.toList() + .map { (sourceSessionId, message) -> Triple(sourceSessionId, checkpoint.sessions[sourceSessionId]!!, message) } + .groupBy { it.second::class } + + val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.map { (sourceSessionId, sessionState, message) -> + val uninitiatedSessionState = sessionState as SessionState.Uninitiated + val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, sessionState) + val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message) + newSessions[sourceSessionId] = SessionState.Initiating( + bufferedMessages = emptyList(), + rejectionError = null, + deduplicationSeed = uninitiatedSessionState.deduplicationSeed + ) + Action.SendInitial(uninitiatedSessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)) + } ?: emptyList() + messagesByType[SessionState.Initiating::class]?.forEach { (sourceSessionId, sessionState, message) -> + val initiatingSessionState = sessionState as SessionState.Initiating + val sessionMessage = DataSessionMessage(message) + val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatingSessionState) + val newBufferedMessages = initiatingSessionState.bufferedMessages + Pair(deduplicationId, sessionMessage) + newSessions[sourceSessionId] = initiatingSessionState.copy(bufferedMessages = newBufferedMessages) + } + val sendExistingActions = messagesByType[SessionState.Initiated::class]?.mapNotNull {(_, sessionState, message) -> + val initiatedSessionState = sessionState as SessionState.Initiated + if (initiatedSessionState.initiatedState !is InitiatedSessionState.Live) + null + else { + val sessionMessage = DataSessionMessage(message) + val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatedSessionState) + val sinkSessionId = initiatedSessionState.initiatedState.peerSinkSessionId + val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage) + Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)) + } + } ?: emptyList() + + actions.add(Action.SendMultiple(sendInitialActions, sendExistingActions)) currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions)) } diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt index a4f77c5390..ce92f2954b 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkTests.kt @@ -536,6 +536,16 @@ class FlowFrameworkTests { assertThat(result.getOrThrow()).isEqualTo("HelloHello") } + @Test(timeout=300_000) + fun `initiating flow with anonymous party at the same node`() { + val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false) + val bobResponderFlow = bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) } + val result = bobNode.services.startFlow(SendAndReceiveFlow(anonymousBob.party.anonymise(), "Hello")).resultFuture + mockNet.runNetwork() + bobResponderFlow.getOrThrow() + assertThat(result.getOrThrow()).isEqualTo("HelloHello") + } + //region Helpers private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0) @@ -888,4 +898,4 @@ internal class ExceptionFlow(val exception: () -> E) : FlowLogic< exceptionThrown = exception() throw exceptionThrown } -} +} \ No newline at end of file diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowParallelMessagingTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowParallelMessagingTests.kt new file mode 100644 index 0000000000..53f7d588ea --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowParallelMessagingTests.kt @@ -0,0 +1,261 @@ +package net.corda.node.services.statemachine + +import co.paralleluniverse.fibers.Suspendable +import net.corda.core.flows.Destination +import net.corda.core.flows.FlowException +import net.corda.core.flows.FlowLogic +import net.corda.core.flows.FlowSession +import net.corda.core.flows.InitiatedBy +import net.corda.core.flows.InitiatingFlow +import net.corda.core.flows.StartableByRPC +import net.corda.core.flows.UnexpectedFlowEndException +import net.corda.core.identity.Party +import net.corda.core.identity.PartyAndCertificate +import net.corda.core.serialization.CordaSerializable +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.unwrap +import net.corda.testing.core.ALICE_NAME +import net.corda.testing.core.singleIdentity +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.TestStartedNode +import net.corda.testing.node.internal.enclosedCordapp +import net.corda.testing.node.internal.startFlow +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.AfterClass +import org.junit.BeforeClass +import org.junit.Test +import kotlin.test.assertEquals + +class FlowParallelMessagingTests { + + companion object { + + private lateinit var mockNet: InternalMockNetwork + private lateinit var senderNode: TestStartedNode + private lateinit var recipientNode1: TestStartedNode + private lateinit var recipientNode2: TestStartedNode + private lateinit var notaryIdentity: Party + private lateinit var senderParty: Party + private lateinit var recipientParty1: Party + private lateinit var recipientParty2: Party + + @BeforeClass + @JvmStatic + fun setup() { + mockNet = InternalMockNetwork( + cordappsForAllNodes = listOf(enclosedCordapp()) + ) + + senderNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME.copy(organisation = "SenderNode"))) + recipientNode1 = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME.copy(organisation = "RecipientNode1"))) + recipientNode2 = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME.copy(organisation = "RecipientNode2"))) + + notaryIdentity = mockNet.defaultNotaryIdentity + senderParty = senderNode.info.singleIdentity() + recipientParty1 = recipientNode1.info.singleIdentity() + recipientParty2 = recipientNode2.info.singleIdentity() + } + + @AfterClass + @JvmStatic + fun cleanUp() { + mockNet.stopNodes() + } + } + + + @Test(timeout=300_000) + fun `messages can be exchanged in parallel using sendAll & receiveAll between multiple parties successfully`() { + val messages = mapOf( + recipientParty1 to MessageType.REPLY, + recipientParty2 to MessageType.REPLY + ) + val flow = senderNode.services.startFlow(SenderFlow(messages)) + + mockNet.runNetwork() + val result = flow.resultFuture.getOrThrow() + + assertEquals("ok", result) + } + + @Test(timeout=300_000) + fun `flow exceptions from counterparties during receiveAll are handled properly`() { + val messages = mapOf( + recipientParty1 to MessageType.REPLY, + recipientParty2 to MessageType.GRACEFUL_FAILURE + ) + val flow = senderNode.services.startFlow(SenderFlow(messages)) + + mockNet.runNetwork() + assertThatThrownBy{ flow.resultFuture.getOrThrow() } + .isInstanceOf(FlowException::class.java) + .hasMessage("graceful failure") + } + + @Test(timeout=300_000) + fun `runtime exceptions from counterparties during receiveAll are handled properly`() { + val messages = mapOf( + recipientParty1 to MessageType.REPLY, + recipientParty2 to MessageType.CRASH + ) + val flow = senderNode.services.startFlow(SenderFlow(messages)) + + mockNet.runNetwork() + assertThatThrownBy{ flow.resultFuture.getOrThrow() } + .isInstanceOf(UnexpectedFlowEndException::class.java) + } + + @Test(timeout=300_000) + fun `initial session messages and existing session messages can be sent together using sendAll`() { + val flow = senderNode.services.startFlow(StagedSenderFlow(listOf(recipientParty1, recipientParty2))) + + mockNet.runNetwork() + val result = flow.resultFuture.getOrThrow() + + assertEquals("ok", result) + } + + @Test(timeout=300_000) + fun `messages can be exchanged successfully even between anonymous parties`() { + val senderAnonymousParty = senderNode.createConfidentialIdentity(senderParty) + val firstRecipientAnonymousParty = recipientNode1.createConfidentialIdentity(recipientParty1) + senderNode.verifyAndRegister(firstRecipientAnonymousParty) + val secondRecipientAnonymousParty = recipientNode2.createConfidentialIdentity(recipientParty2) + senderNode.verifyAndRegister(secondRecipientAnonymousParty) + + val messages = mapOf( + senderAnonymousParty.party.anonymise() to MessageType.REPLY, + firstRecipientAnonymousParty.party.anonymise() to MessageType.REPLY, + secondRecipientAnonymousParty.party.anonymise() to MessageType.REPLY + ) + + val flow = senderNode.services.startFlow(SenderFlow(messages)) + + mockNet.runNetwork() + val result = flow.resultFuture.getOrThrow() + + assertEquals("ok", result) + } + + @Test(timeout=300_000) + fun `a flow cannot invoke receiveAll with duplicate sessions`() { + val flow = senderNode.services.startFlow(InvalidReceiveFlow(listOf(recipientParty1), String::class.java)) + + mockNet.runNetwork() + + assertThatThrownBy{ flow.resultFuture.getOrThrow() } + .isInstanceOf(java.lang.IllegalArgumentException::class.java) + .hasMessage("A flow session can only appear once as argument.") + } + + fun TestStartedNode.createConfidentialIdentity(party: Party) = + services.keyManagementService.freshKeyAndCert(services.myInfo.legalIdentitiesAndCerts.single { it.name == party.name }, false) + + fun TestStartedNode.verifyAndRegister(identity: PartyAndCertificate) = + services.identityService.verifyAndRegisterIdentity(identity) + + @StartableByRPC + @InitiatingFlow + class SenderFlow(private val parties: Map): FlowLogic() { + @Suspendable + override fun call(): String { + val messagesPerSession = parties.toList().map { (party, messageType) -> + val session = initiateFlow(party) + Pair(session, messageType) + }.toMap() + + sendAll(messagesPerSession) + val messages = receiveAll(String::class.java, messagesPerSession.keys.toList()) + + messages.map { it.unwrap { payload -> assertEquals("pong", payload) } } + + return "ok" + } + } + + @Suppress("TooGenericExceptionThrown") + @InitiatedBy(SenderFlow::class) + class RecipientFlow(private val otherPartySession: FlowSession): FlowLogic() { + @Suspendable + override fun call(): String { + val msg = otherPartySession.receive().unwrap { it } + when (msg) { + MessageType.REPLY -> otherPartySession.send("pong") + MessageType.GRACEFUL_FAILURE -> throw FlowException("graceful failure") + MessageType.CRASH -> throw RuntimeException("crash") + } + + return "ok" + } + } + + @StartableByRPC + @InitiatingFlow + class StagedSenderFlow(private val parties: List): FlowLogic() { + @Suspendable + override fun call(): String { + if (parties.size < 2) { + throw IllegalArgumentException("at least two parties required for staged execution") + } + + val sessions = parties.map { initiateFlow(it) }.toSet() + + sessions.first().send(StagedMessageType.INITIAL_RECIPIENT) + sessions.first().receive().unwrap{ payload -> assertEquals("pong", payload) } + + sendAll(StagedMessageType.REGULAR_RECIPIENT, sessions) + val messages = receiveAll(String::class.java, sessions.toList()) + + messages.map { it.unwrap { payload -> assertEquals("pong", payload) } } + + return "ok" + } + } + + @InitiatedBy(StagedSenderFlow::class) + class StagedRecipientFlow(private val otherPartySession: FlowSession): FlowLogic() { + @Suspendable + override fun call(): String { + val msg = otherPartySession.receive().unwrap { it } + when (msg) { + StagedMessageType.INITIAL_RECIPIENT -> { + otherPartySession.send("pong") + otherPartySession.receive().unwrap { payload -> assertEquals(StagedMessageType.REGULAR_RECIPIENT, payload) } + otherPartySession.send("pong") + } + StagedMessageType.REGULAR_RECIPIENT -> otherPartySession.send("pong") + } + + return "ok" + } + } + + @StartableByRPC + @InitiatingFlow + class InvalidReceiveFlow(private val parties: List, private val payloadType: Class): FlowLogic() { + @Suspendable + override fun call(): String { + val sessions = parties.flatMap { party -> + val session = initiateFlow(party) + listOf(session, session) + } + receiveAll(payloadType, sessions) + return "ok" + } + } + + @CordaSerializable + enum class MessageType { + REPLY, + GRACEFUL_FAILURE, + CRASH + } + + @CordaSerializable + enum class StagedMessageType { + INITIAL_RECIPIENT, + REGULAR_RECIPIENT + } + +} \ No newline at end of file