mirror of
https://github.com/corda/corda.git
synced 2024-12-18 20:47:57 +00:00
Merge pull request #7860 from corda/merge-release/os/4.11-release/os/4.12-2024-10-29-432
ENT-11275: Merging forward updates from release/os/4.11 to release/os/4.12 - 2024-10-29
This commit is contained in:
commit
a67e6cdb1e
@ -310,6 +310,7 @@ abstract class FlowLogic<out T> {
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
open fun receiveAllMap(sessions: Map<FlowSession, Class<out Any>>, maySkipCheckpoint: Boolean = false): Map<FlowSession, UntrustworthyData<Any>> {
|
open fun receiveAllMap(sessions: Map<FlowSession, Class<out Any>>, maySkipCheckpoint: Boolean = false): Map<FlowSession, UntrustworthyData<Any>> {
|
||||||
|
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receiveAllMap") {
|
||||||
enforceNoPrimitiveInReceive(sessions.values)
|
enforceNoPrimitiveInReceive(sessions.values)
|
||||||
val replies = stateMachine.suspend(
|
val replies = stateMachine.suspend(
|
||||||
ioRequest = FlowIORequest.Receive(sessions.keys.toNonEmptySet()),
|
ioRequest = FlowIORequest.Receive(sessions.keys.toNonEmptySet()),
|
||||||
@ -317,6 +318,7 @@ abstract class FlowLogic<out T> {
|
|||||||
)
|
)
|
||||||
return replies.mapValues { (session, payload) -> payload.checkPayloadIs(sessions[session]!!) }
|
return replies.mapValues { (session, payload) -> payload.checkPayloadIs(sessions[session]!!) }
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Suspends until a message has been received for each session in the specified [sessions].
|
* Suspends until a message has been received for each session in the specified [sessions].
|
||||||
@ -332,10 +334,12 @@ abstract class FlowLogic<out T> {
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
open fun <R : Any> receiveAll(receiveType: Class<R>, sessions: List<FlowSession>, maySkipCheckpoint: Boolean = false): List<UntrustworthyData<R>> {
|
open fun <R : Any> receiveAll(receiveType: Class<R>, sessions: List<FlowSession>, maySkipCheckpoint: Boolean = false): List<UntrustworthyData<R>> {
|
||||||
|
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#receiveAll") {
|
||||||
enforceNoPrimitiveInReceive(listOf(receiveType))
|
enforceNoPrimitiveInReceive(listOf(receiveType))
|
||||||
enforceNoDuplicates(sessions)
|
enforceNoDuplicates(sessions)
|
||||||
return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions)))
|
return castMapValuesToKnownType(receiveAllMap(associateSessionsToReceiveType(receiveType, sessions)))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queues the given [payload] for sending to the provided [sessions] and continues without suspending.
|
* Queues the given [payload] for sending to the provided [sessions] and continues without suspending.
|
||||||
@ -351,9 +355,11 @@ abstract class FlowLogic<out T> {
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
fun sendAll(payload: Any, sessions: Set<FlowSession>, maySkipCheckpoint: Boolean = false) {
|
fun sendAll(payload: Any, sessions: Set<FlowSession>, maySkipCheckpoint: Boolean = false) {
|
||||||
|
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAll") {
|
||||||
val sessionToPayload = sessions.map { it to payload }.toMap()
|
val sessionToPayload = sessions.map { it to payload }.toMap()
|
||||||
return sendAllMap(sessionToPayload, maySkipCheckpoint)
|
return sendAllMap(sessionToPayload, maySkipCheckpoint)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Queues the given payloads for sending to the provided sessions and continues without suspending.
|
* Queues the given payloads for sending to the provided sessions and continues without suspending.
|
||||||
@ -368,11 +374,13 @@ abstract class FlowLogic<out T> {
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
@JvmOverloads
|
@JvmOverloads
|
||||||
fun sendAllMap(payloadsPerSession: Map<FlowSession, Any>, maySkipCheckpoint: Boolean = false) {
|
fun sendAllMap(payloadsPerSession: Map<FlowSession, Any>, maySkipCheckpoint: Boolean = false) {
|
||||||
|
serviceHub.telemetryServiceInternal.span("${this::class.java.name}#sendAllMap") {
|
||||||
val request = FlowIORequest.Send(
|
val request = FlowIORequest.Send(
|
||||||
sessionToMessage = stateMachine.serialize(payloadsPerSession)
|
sessionToMessage = stateMachine.serialize(payloadsPerSession)
|
||||||
)
|
)
|
||||||
stateMachine.suspend(request, maySkipCheckpoint)
|
stateMachine.suspend(request, maySkipCheckpoint)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Closes the provided sessions and performs cleanup of any resources tied to these sessions.
|
* Closes the provided sessions and performs cleanup of any resources tied to these sessions.
|
||||||
|
Loading…
Reference in New Issue
Block a user