mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
[CORDA-3628] - Implement sendAll API (#5990)
* [CORDA-3628] - Implement sendAll API * detekt * Some minor refactorings and docs * Eliminate warnings * Address Rick's comments * Switch sendAll to use a set
This commit is contained in:
parent
8ea6564cb9
commit
dcf659e643
@ -25,6 +25,7 @@ import net.corda.core.node.NodeInfo
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SerializationDefaults
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.transactions.SignedTransaction
|
||||
import net.corda.core.utilities.ProgressTracker
|
||||
@ -317,6 +318,53 @@ abstract class FlowLogic<out T> {
|
||||
return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues the given [payload] for sending to the provided [sessions] and continues without suspending.
|
||||
*
|
||||
* Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided [sessions]
|
||||
* is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the
|
||||
* network's event horizon time.
|
||||
*
|
||||
* @param payload the payload to send.
|
||||
* @param sessions the sessions to send the provided payload to.
|
||||
* @param maySkipCheckpoint whether checkpointing should be skipped.
|
||||
*/
|
||||
@Suspendable
|
||||
@JvmOverloads
|
||||
fun sendAll(payload: Any, sessions: Set<FlowSession>, maySkipCheckpoint: Boolean = false) {
|
||||
val sessionToPayload = sessions.map { it to payload }.toMap()
|
||||
return sendAll(sessionToPayload, maySkipCheckpoint)
|
||||
}
|
||||
|
||||
/**
|
||||
* Queues the given payloads for sending to the provided sessions and continues without suspending.
|
||||
*
|
||||
* Note that the other parties may receive the message at some arbitrary later point or not at all: if one of the provided [sessions]
|
||||
* is offline then message delivery will be retried until the corresponding node comes back or until the message is older than the
|
||||
* network's event horizon time.
|
||||
*
|
||||
* @param payloadsPerSession a mapping that contains the payload to be sent to each session.
|
||||
* @param maySkipCheckpoint whether checkpointing should be skipped.
|
||||
*/
|
||||
@Suspendable
|
||||
@JvmOverloads
|
||||
fun sendAll(payloadsPerSession: Map<FlowSession, Any>, maySkipCheckpoint: Boolean = false) {
|
||||
val request = FlowIORequest.Send(
|
||||
sessionToMessage = serializePayloads(payloadsPerSession)
|
||||
)
|
||||
stateMachine.suspend(request, maySkipCheckpoint)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun serializePayloads(payloadsPerSession: Map<FlowSession, Any>): Map<FlowSession, SerializedBytes<Any>> {
|
||||
val cachedSerializedPayloads = mutableMapOf<Any, SerializedBytes<Any>>()
|
||||
|
||||
return payloadsPerSession.mapValues { (_, payload) ->
|
||||
cachedSerializedPayloads[payload] ?: payload.serialize(context = SerializationDefaults.P2P_CONTEXT).also { cachedSerializedPayloads[payload] = it }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Invokes the given subflow. This function returns once the subflow completes successfully with the result
|
||||
* returned by that subflow's [call] method. If the subflow has a progress tracker, it is attached to the
|
||||
|
@ -264,14 +264,18 @@ In order to create a communication session between your initiator flow and the r
|
||||
* ``sendAndReceive(receiveType: Class<R>, payload: Any): R``
|
||||
* Sends the ``payload`` object and receives an object of type ``receiveType`` back
|
||||
|
||||
In addition ``FlowLogic`` provides functions that batch receives:
|
||||
In addition ``FlowLogic`` provides functions that can receive messages from multiple sessions and send messages to multiple sessions:
|
||||
|
||||
* ``receiveAllMap(sessions: Map<FlowSession, Class<out Any>>): Map<FlowSession, UntrustworthyData<Any>>``
|
||||
Receives from all ``FlowSession`` objects specified in the passed in map. The received types may differ.
|
||||
* Receives from all ``FlowSession`` objects specified in the passed in map. The received types may differ.
|
||||
* ``receiveAll(receiveType: Class<R>, sessions: List<FlowSession>): List<UntrustworthyData<R>>``
|
||||
Receives from all ``FlowSession`` objects specified in the passed in list. The received types must be the same.
|
||||
* Receives from all ``FlowSession`` objects specified in the passed in list. The received types must be the same.
|
||||
* ``sendAll(payload: Any, sessions: Set<FlowSession>)``
|
||||
* Sends the ``payload`` object to all the provided ``FlowSession``\s.
|
||||
* ``sendAll(payloadsPerSession: Map<FlowSession, Any>)``
|
||||
* Sends a potentially different payload to each ``FlowSession``, as specified by the provided ``payloadsPerSession``.
|
||||
|
||||
The batched functions are implemented more efficiently by the flow framework.
|
||||
.. note:: It's more efficient to call ``sendAndReceive`` instead of calling ``send`` and then ``receive``. It's also more efficient to call ``sendAll``/``receiveAll`` instead of multiple ``send``/``receive`` respectively.
|
||||
|
||||
InitiateFlow
|
||||
~~~~~~~~~~~~
|
||||
|
@ -460,6 +460,7 @@ Please note that suspendable flow operations such as:
|
||||
* ``FlowSession.send``
|
||||
* ``FlowSession.receive``
|
||||
* ``FlowLogic.receiveAll``
|
||||
* ``FlowLogic.sendAll``
|
||||
* ``FlowLogic.sleep``
|
||||
* ``FlowLogic.subFlow``
|
||||
|
||||
|
@ -53,6 +53,11 @@ class MessagingExecutor(
|
||||
producer.send(SimpleString(mqAddress), artemisMessage)
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun send(messages: Map<MessageRecipients, Message>) {
|
||||
messages.forEach { recipients, message -> send(message, recipients) }
|
||||
}
|
||||
|
||||
@Synchronized
|
||||
fun acknowledge(message: ClientMessage) {
|
||||
log.debug {
|
||||
|
@ -37,6 +37,17 @@ sealed class Action {
|
||||
val deduplicationId: SenderDeduplicationId
|
||||
) : Action()
|
||||
|
||||
/**
|
||||
* Send session messages to multiple destinations.
|
||||
*
|
||||
* @property sendInitial session messages to send in order to establish a session.
|
||||
* @property sendExisting session messages to send to existing sessions.
|
||||
*/
|
||||
data class SendMultiple(
|
||||
val sendInitial: List<SendInitial>,
|
||||
val sendExisting: List<SendExisting>
|
||||
): Action()
|
||||
|
||||
/**
|
||||
* Persist the specified [checkpoint].
|
||||
*/
|
||||
|
@ -68,6 +68,7 @@ class ActionExecutorImpl(
|
||||
is Action.RemoveCheckpoint -> executeRemoveCheckpoint(action)
|
||||
is Action.SendInitial -> executeSendInitial(action)
|
||||
is Action.SendExisting -> executeSendExisting(action)
|
||||
is Action.SendMultiple -> executeSendMultiple(action)
|
||||
is Action.AddSessionBinding -> executeAddSessionBinding(action)
|
||||
is Action.RemoveSessionBindings -> executeRemoveSessionBindings(action)
|
||||
is Action.SignalFlowHasStarted -> executeSignalFlowHasStarted(action)
|
||||
@ -191,6 +192,13 @@ class ActionExecutorImpl(
|
||||
flowMessaging.sendSessionMessage(action.peerParty, action.message, action.deduplicationId)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun executeSendMultiple(action: Action.SendMultiple) {
|
||||
val messages = action.sendInitial.map { Message(it.destination, it.initialise, it.deduplicationId) } +
|
||||
action.sendExisting.map { Message(it.peerParty, it.message, it.deduplicationId) }
|
||||
flowMessaging.sendSessionMessages(messages)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun executeAddSessionBinding(action: Action.AddSessionBinding) {
|
||||
stateMachineManager.addSessionBinding(action.flowId, action.sessionId)
|
||||
|
@ -13,6 +13,7 @@ import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.trace
|
||||
import net.corda.node.services.api.ServiceHubInternal
|
||||
import net.corda.node.services.messaging.DeduplicationHandler
|
||||
import net.corda.node.services.messaging.MessagingService
|
||||
import net.corda.node.services.messaging.ReceivedMessage
|
||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||
import java.io.NotSerializableException
|
||||
@ -27,12 +28,17 @@ interface FlowMessaging {
|
||||
@Suspendable
|
||||
fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId)
|
||||
|
||||
@Suspendable
|
||||
fun sendSessionMessages(messageData: List<Message>)
|
||||
|
||||
/**
|
||||
* Start the messaging using the [onMessage] message handler.
|
||||
*/
|
||||
fun start(onMessage: (ReceivedMessage, deduplicationHandler: DeduplicationHandler) -> Unit)
|
||||
}
|
||||
|
||||
data class Message(val destination: Destination, val sessionMessage: SessionMessage, val dedupId: SenderDeduplicationId)
|
||||
|
||||
/**
|
||||
* Implementation of [FlowMessaging] using a [ServiceHubInternal] to do the messaging and routing.
|
||||
*/
|
||||
@ -51,6 +57,17 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
|
||||
@Suspendable
|
||||
override fun sendSessionMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId) {
|
||||
val addressedMessage = createMessage(destination, message, deduplicationId)
|
||||
serviceHub.networkService.send(addressedMessage.message, addressedMessage.target, addressedMessage.sequenceKey)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun sendSessionMessages(messageData: List<Message>) {
|
||||
val addressedMessages = messageData.map { createMessage(it.destination, it.sessionMessage, it.dedupId) }
|
||||
serviceHub.networkService.send(addressedMessages)
|
||||
}
|
||||
|
||||
private fun createMessage(destination: Destination, message: SessionMessage, deduplicationId: SenderDeduplicationId): MessagingService.AddressedMessage {
|
||||
val party = if (destination is Party) {
|
||||
log.trace { "Sending message $deduplicationId $message to $destination" }
|
||||
destination
|
||||
@ -69,7 +86,7 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
is InitialSessionMessage -> message.initiatorSessionId
|
||||
is ExistingSessionMessage -> message.recipientSessionId
|
||||
}
|
||||
serviceHub.networkService.send(networkMessage, address, sequenceKey = sequenceKey)
|
||||
return MessagingService.AddressedMessage(networkMessage, address, sequenceKey)
|
||||
}
|
||||
|
||||
private fun SessionMessage.additionalHeaders(target: Party): Map<String, String> {
|
||||
|
@ -7,6 +7,7 @@ import net.corda.core.internal.FlowIORequest
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.toNonEmptySet
|
||||
import net.corda.node.services.statemachine.*
|
||||
import java.lang.IllegalStateException
|
||||
|
||||
/**
|
||||
* This transition describes what should happen with a specific [FlowIORequest]. Note that at this time the request
|
||||
@ -246,46 +247,49 @@ class StartedFlowTransition(
|
||||
val checkpoint = startingState.checkpoint
|
||||
val newSessions = LinkedHashMap(checkpoint.sessions)
|
||||
var index = 0
|
||||
for ((sourceSessionId, message) in sourceSessionIdToMessage) {
|
||||
val existingSessionState = checkpoint.sessions[sourceSessionId]
|
||||
if (existingSessionState == null) {
|
||||
return freshErrorTransition(CannotFindSessionException(sourceSessionId))
|
||||
} else {
|
||||
val sessionMessage = DataSessionMessage(message)
|
||||
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, existingSessionState)
|
||||
when (existingSessionState) {
|
||||
is SessionState.Uninitiated -> {
|
||||
val initialMessage = createInitialSessionMessage(existingSessionState.initiatingSubFlow, sourceSessionId, existingSessionState.additionalEntropy, message)
|
||||
actions.add(Action.SendInitial(existingSessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
|
||||
newSessions[sourceSessionId] = SessionState.Initiating(
|
||||
bufferedMessages = emptyList(),
|
||||
rejectionError = null,
|
||||
deduplicationSeed = existingSessionState.deduplicationSeed
|
||||
)
|
||||
Unit
|
||||
}
|
||||
is SessionState.Initiating -> {
|
||||
// We're initiating this session, buffer the message
|
||||
val newBufferedMessages = existingSessionState.bufferedMessages + Pair(deduplicationId, sessionMessage)
|
||||
newSessions[sourceSessionId] = existingSessionState.copy(bufferedMessages = newBufferedMessages)
|
||||
}
|
||||
is SessionState.Initiated -> {
|
||||
when (existingSessionState.initiatedState) {
|
||||
is InitiatedSessionState.Live -> {
|
||||
val sinkSessionId = existingSessionState.initiatedState.peerSinkSessionId
|
||||
val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage)
|
||||
actions.add(Action.SendExisting(existingSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID)))
|
||||
Unit
|
||||
}
|
||||
InitiatedSessionState.Ended -> {
|
||||
return freshErrorTransition(IllegalStateException("Tried to send to ended session $sourceSessionId"))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for ((sourceSessionId, _) in sourceSessionIdToMessage) {
|
||||
val existingSessionState = checkpoint.sessions[sourceSessionId] ?: return freshErrorTransition(CannotFindSessionException(sourceSessionId))
|
||||
if (existingSessionState is SessionState.Initiated && existingSessionState.initiatedState is InitiatedSessionState.Ended) {
|
||||
return freshErrorTransition(IllegalStateException("Tried to send to ended session $sourceSessionId"))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
val messagesByType = sourceSessionIdToMessage.toList()
|
||||
.map { (sourceSessionId, message) -> Triple(sourceSessionId, checkpoint.sessions[sourceSessionId]!!, message) }
|
||||
.groupBy { it.second::class }
|
||||
|
||||
val sendInitialActions = messagesByType[SessionState.Uninitiated::class]?.map { (sourceSessionId, sessionState, message) ->
|
||||
val uninitiatedSessionState = sessionState as SessionState.Uninitiated
|
||||
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, sessionState)
|
||||
val initialMessage = createInitialSessionMessage(uninitiatedSessionState.initiatingSubFlow, sourceSessionId, uninitiatedSessionState.additionalEntropy, message)
|
||||
newSessions[sourceSessionId] = SessionState.Initiating(
|
||||
bufferedMessages = emptyList(),
|
||||
rejectionError = null,
|
||||
deduplicationSeed = uninitiatedSessionState.deduplicationSeed
|
||||
)
|
||||
Action.SendInitial(uninitiatedSessionState.destination, initialMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))
|
||||
} ?: emptyList()
|
||||
messagesByType[SessionState.Initiating::class]?.forEach { (sourceSessionId, sessionState, message) ->
|
||||
val initiatingSessionState = sessionState as SessionState.Initiating
|
||||
val sessionMessage = DataSessionMessage(message)
|
||||
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatingSessionState)
|
||||
val newBufferedMessages = initiatingSessionState.bufferedMessages + Pair(deduplicationId, sessionMessage)
|
||||
newSessions[sourceSessionId] = initiatingSessionState.copy(bufferedMessages = newBufferedMessages)
|
||||
}
|
||||
val sendExistingActions = messagesByType[SessionState.Initiated::class]?.mapNotNull {(_, sessionState, message) ->
|
||||
val initiatedSessionState = sessionState as SessionState.Initiated
|
||||
if (initiatedSessionState.initiatedState !is InitiatedSessionState.Live)
|
||||
null
|
||||
else {
|
||||
val sessionMessage = DataSessionMessage(message)
|
||||
val deduplicationId = DeduplicationId.createForNormal(checkpoint, index++, initiatedSessionState)
|
||||
val sinkSessionId = initiatedSessionState.initiatedState.peerSinkSessionId
|
||||
val existingMessage = ExistingSessionMessage(sinkSessionId, sessionMessage)
|
||||
Action.SendExisting(initiatedSessionState.peerParty, existingMessage, SenderDeduplicationId(deduplicationId, startingState.senderUUID))
|
||||
}
|
||||
} ?: emptyList()
|
||||
|
||||
actions.add(Action.SendMultiple(sendInitialActions, sendExistingActions))
|
||||
currentState = currentState.copy(checkpoint = checkpoint.copy(sessions = newSessions))
|
||||
}
|
||||
|
||||
|
@ -536,6 +536,16 @@ class FlowFrameworkTests {
|
||||
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `initiating flow with anonymous party at the same node`() {
|
||||
val anonymousBob = bobNode.services.keyManagementService.freshKeyAndCert(bobNode.info.legalIdentitiesAndCerts.single(), false)
|
||||
val bobResponderFlow = bobNode.registerCordappFlowFactory(SendAndReceiveFlow::class) { SingleInlinedSubFlow(it) }
|
||||
val result = bobNode.services.startFlow(SendAndReceiveFlow(anonymousBob.party.anonymise(), "Hello")).resultFuture
|
||||
mockNet.runNetwork()
|
||||
bobResponderFlow.getOrThrow()
|
||||
assertThat(result.getOrThrow()).isEqualTo("HelloHello")
|
||||
}
|
||||
|
||||
//region Helpers
|
||||
|
||||
private val normalEnd = ExistingSessionMessage(SessionId(0), EndSessionMessage) // NormalSessionEnd(0)
|
||||
@ -888,4 +898,4 @@ internal class ExceptionFlow<E : Exception>(val exception: () -> E) : FlowLogic<
|
||||
exceptionThrown = exception()
|
||||
throw exceptionThrown
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,261 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.flows.Destination
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.flows.FlowLogic
|
||||
import net.corda.core.flows.FlowSession
|
||||
import net.corda.core.flows.InitiatedBy
|
||||
import net.corda.core.flows.InitiatingFlow
|
||||
import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.flows.UnexpectedFlowEndException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.identity.PartyAndCertificate
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.unwrap
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.node.internal.InternalMockNetwork
|
||||
import net.corda.testing.node.internal.InternalMockNodeParameters
|
||||
import net.corda.testing.node.internal.TestStartedNode
|
||||
import net.corda.testing.node.internal.enclosedCordapp
|
||||
import net.corda.testing.node.internal.startFlow
|
||||
import org.assertj.core.api.Assertions.assertThatThrownBy
|
||||
import org.junit.AfterClass
|
||||
import org.junit.BeforeClass
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class FlowParallelMessagingTests {
|
||||
|
||||
companion object {
|
||||
|
||||
private lateinit var mockNet: InternalMockNetwork
|
||||
private lateinit var senderNode: TestStartedNode
|
||||
private lateinit var recipientNode1: TestStartedNode
|
||||
private lateinit var recipientNode2: TestStartedNode
|
||||
private lateinit var notaryIdentity: Party
|
||||
private lateinit var senderParty: Party
|
||||
private lateinit var recipientParty1: Party
|
||||
private lateinit var recipientParty2: Party
|
||||
|
||||
@BeforeClass
|
||||
@JvmStatic
|
||||
fun setup() {
|
||||
mockNet = InternalMockNetwork(
|
||||
cordappsForAllNodes = listOf(enclosedCordapp())
|
||||
)
|
||||
|
||||
senderNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME.copy(organisation = "SenderNode")))
|
||||
recipientNode1 = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME.copy(organisation = "RecipientNode1")))
|
||||
recipientNode2 = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME.copy(organisation = "RecipientNode2")))
|
||||
|
||||
notaryIdentity = mockNet.defaultNotaryIdentity
|
||||
senderParty = senderNode.info.singleIdentity()
|
||||
recipientParty1 = recipientNode1.info.singleIdentity()
|
||||
recipientParty2 = recipientNode2.info.singleIdentity()
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
@JvmStatic
|
||||
fun cleanUp() {
|
||||
mockNet.stopNodes()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `messages can be exchanged in parallel using sendAll & receiveAll between multiple parties successfully`() {
|
||||
val messages = mapOf(
|
||||
recipientParty1 to MessageType.REPLY,
|
||||
recipientParty2 to MessageType.REPLY
|
||||
)
|
||||
val flow = senderNode.services.startFlow(SenderFlow(messages))
|
||||
|
||||
mockNet.runNetwork()
|
||||
val result = flow.resultFuture.getOrThrow()
|
||||
|
||||
assertEquals("ok", result)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `flow exceptions from counterparties during receiveAll are handled properly`() {
|
||||
val messages = mapOf(
|
||||
recipientParty1 to MessageType.REPLY,
|
||||
recipientParty2 to MessageType.GRACEFUL_FAILURE
|
||||
)
|
||||
val flow = senderNode.services.startFlow(SenderFlow(messages))
|
||||
|
||||
mockNet.runNetwork()
|
||||
assertThatThrownBy{ flow.resultFuture.getOrThrow() }
|
||||
.isInstanceOf(FlowException::class.java)
|
||||
.hasMessage("graceful failure")
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `runtime exceptions from counterparties during receiveAll are handled properly`() {
|
||||
val messages = mapOf(
|
||||
recipientParty1 to MessageType.REPLY,
|
||||
recipientParty2 to MessageType.CRASH
|
||||
)
|
||||
val flow = senderNode.services.startFlow(SenderFlow(messages))
|
||||
|
||||
mockNet.runNetwork()
|
||||
assertThatThrownBy{ flow.resultFuture.getOrThrow() }
|
||||
.isInstanceOf(UnexpectedFlowEndException::class.java)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `initial session messages and existing session messages can be sent together using sendAll`() {
|
||||
val flow = senderNode.services.startFlow(StagedSenderFlow(listOf(recipientParty1, recipientParty2)))
|
||||
|
||||
mockNet.runNetwork()
|
||||
val result = flow.resultFuture.getOrThrow()
|
||||
|
||||
assertEquals("ok", result)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `messages can be exchanged successfully even between anonymous parties`() {
|
||||
val senderAnonymousParty = senderNode.createConfidentialIdentity(senderParty)
|
||||
val firstRecipientAnonymousParty = recipientNode1.createConfidentialIdentity(recipientParty1)
|
||||
senderNode.verifyAndRegister(firstRecipientAnonymousParty)
|
||||
val secondRecipientAnonymousParty = recipientNode2.createConfidentialIdentity(recipientParty2)
|
||||
senderNode.verifyAndRegister(secondRecipientAnonymousParty)
|
||||
|
||||
val messages = mapOf(
|
||||
senderAnonymousParty.party.anonymise() to MessageType.REPLY,
|
||||
firstRecipientAnonymousParty.party.anonymise() to MessageType.REPLY,
|
||||
secondRecipientAnonymousParty.party.anonymise() to MessageType.REPLY
|
||||
)
|
||||
|
||||
val flow = senderNode.services.startFlow(SenderFlow(messages))
|
||||
|
||||
mockNet.runNetwork()
|
||||
val result = flow.resultFuture.getOrThrow()
|
||||
|
||||
assertEquals("ok", result)
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `a flow cannot invoke receiveAll with duplicate sessions`() {
|
||||
val flow = senderNode.services.startFlow(InvalidReceiveFlow(listOf(recipientParty1), String::class.java))
|
||||
|
||||
mockNet.runNetwork()
|
||||
|
||||
assertThatThrownBy{ flow.resultFuture.getOrThrow() }
|
||||
.isInstanceOf(java.lang.IllegalArgumentException::class.java)
|
||||
.hasMessage("A flow session can only appear once as argument.")
|
||||
}
|
||||
|
||||
fun TestStartedNode.createConfidentialIdentity(party: Party) =
|
||||
services.keyManagementService.freshKeyAndCert(services.myInfo.legalIdentitiesAndCerts.single { it.name == party.name }, false)
|
||||
|
||||
fun TestStartedNode.verifyAndRegister(identity: PartyAndCertificate) =
|
||||
services.identityService.verifyAndRegisterIdentity(identity)
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class SenderFlow(private val parties: Map<out Destination, MessageType>): FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
val messagesPerSession = parties.toList().map { (party, messageType) ->
|
||||
val session = initiateFlow(party)
|
||||
Pair(session, messageType)
|
||||
}.toMap()
|
||||
|
||||
sendAll(messagesPerSession)
|
||||
val messages = receiveAll(String::class.java, messagesPerSession.keys.toList())
|
||||
|
||||
messages.map { it.unwrap { payload -> assertEquals("pong", payload) } }
|
||||
|
||||
return "ok"
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("TooGenericExceptionThrown")
|
||||
@InitiatedBy(SenderFlow::class)
|
||||
class RecipientFlow(private val otherPartySession: FlowSession): FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
val msg = otherPartySession.receive<MessageType>().unwrap { it }
|
||||
when (msg) {
|
||||
MessageType.REPLY -> otherPartySession.send("pong")
|
||||
MessageType.GRACEFUL_FAILURE -> throw FlowException("graceful failure")
|
||||
MessageType.CRASH -> throw RuntimeException("crash")
|
||||
}
|
||||
|
||||
return "ok"
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class StagedSenderFlow(private val parties: List<Destination>): FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
if (parties.size < 2) {
|
||||
throw IllegalArgumentException("at least two parties required for staged execution")
|
||||
}
|
||||
|
||||
val sessions = parties.map { initiateFlow(it) }.toSet()
|
||||
|
||||
sessions.first().send(StagedMessageType.INITIAL_RECIPIENT)
|
||||
sessions.first().receive<String>().unwrap{ payload -> assertEquals("pong", payload) }
|
||||
|
||||
sendAll(StagedMessageType.REGULAR_RECIPIENT, sessions)
|
||||
val messages = receiveAll(String::class.java, sessions.toList())
|
||||
|
||||
messages.map { it.unwrap { payload -> assertEquals("pong", payload) } }
|
||||
|
||||
return "ok"
|
||||
}
|
||||
}
|
||||
|
||||
@InitiatedBy(StagedSenderFlow::class)
|
||||
class StagedRecipientFlow(private val otherPartySession: FlowSession): FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
val msg = otherPartySession.receive<StagedMessageType>().unwrap { it }
|
||||
when (msg) {
|
||||
StagedMessageType.INITIAL_RECIPIENT -> {
|
||||
otherPartySession.send("pong")
|
||||
otherPartySession.receive<StagedMessageType>().unwrap { payload -> assertEquals(StagedMessageType.REGULAR_RECIPIENT, payload) }
|
||||
otherPartySession.send("pong")
|
||||
}
|
||||
StagedMessageType.REGULAR_RECIPIENT -> otherPartySession.send("pong")
|
||||
}
|
||||
|
||||
return "ok"
|
||||
}
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
class InvalidReceiveFlow<R: Any>(private val parties: List<Party>, private val payloadType: Class<R>): FlowLogic<String>() {
|
||||
@Suspendable
|
||||
override fun call(): String {
|
||||
val sessions = parties.flatMap { party ->
|
||||
val session = initiateFlow(party)
|
||||
listOf(session, session)
|
||||
}
|
||||
receiveAll(payloadType, sessions)
|
||||
return "ok"
|
||||
}
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
enum class MessageType {
|
||||
REPLY,
|
||||
GRACEFUL_FAILURE,
|
||||
CRASH
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
enum class StagedMessageType {
|
||||
INITIAL_RECIPIENT,
|
||||
REGULAR_RECIPIENT
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue
Block a user