mirror of
https://github.com/corda/corda.git
synced 2025-02-26 03:18:57 +00:00
Refactored as per code review comments.
This commit is contained in:
parent
4ce69e3ace
commit
0b692d7482
@ -13,8 +13,6 @@ package net.corda.node.services.statemachine
|
||||
import co.paralleluniverse.fibers.Fiber
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.codahale.metrics.*
|
||||
import net.corda.core.context.InvocationOrigin
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.thenMatch
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
@ -74,7 +72,7 @@ class ActionExecutorImpl(
|
||||
is Action.ScheduleEvent -> executeScheduleEvent(fiber, action)
|
||||
is Action.SleepUntil -> executeSleepUntil(action)
|
||||
is Action.RemoveCheckpoint -> executeRemoveCheckpoint(action)
|
||||
is Action.SendInitial -> executeSendInitial(action, fiber.mightDeadlockDrainingTarget(action.party))
|
||||
is Action.SendInitial -> executeSendInitial(action)
|
||||
is Action.SendExisting -> executeSendExisting(action)
|
||||
is Action.AddSessionBinding -> executeAddSessionBinding(action)
|
||||
is Action.RemoveSessionBindings -> executeRemoveSessionBindings(action)
|
||||
@ -86,12 +84,6 @@ class ActionExecutorImpl(
|
||||
}
|
||||
}
|
||||
|
||||
private fun FlowFiber.mightDeadlockDrainingTarget(target: Party): Boolean {
|
||||
// This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator.
|
||||
// It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case.
|
||||
return invocationContext().origin.let { it is InvocationOrigin.Peer && it.party == target.name }
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) {
|
||||
services.validatedTransactions.trackTransaction(action.hash).thenMatch(
|
||||
@ -174,8 +166,8 @@ class ActionExecutorImpl(
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
private fun executeSendInitial(action: Action.SendInitial, omitDrainingModeHeaders: Boolean) {
|
||||
flowMessaging.sendSessionMessage(action.party, action.initialise, action.deduplicationId, omitDrainingModeHeaders)
|
||||
private fun executeSendInitial(action: Action.SendInitial) {
|
||||
flowMessaging.sendSessionMessage(action.party, action.initialise, action.deduplicationId)
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
|
@ -11,7 +11,6 @@
|
||||
package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.flows.StateMachineRunId
|
||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||
|
||||
@ -26,6 +25,4 @@ interface FlowFiber {
|
||||
fun scheduleEvent(event: Event)
|
||||
|
||||
fun snapshot(): StateMachineState
|
||||
|
||||
fun invocationContext(): InvocationContext
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ package net.corda.node.services.statemachine
|
||||
|
||||
import co.paralleluniverse.fibers.Suspendable
|
||||
import com.esotericsoftware.kryo.KryoException
|
||||
import net.corda.core.context.InvocationOrigin
|
||||
import net.corda.core.flows.FlowException
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
@ -33,7 +34,7 @@ interface FlowMessaging {
|
||||
* listen on the send acknowledgement.
|
||||
*/
|
||||
@Suspendable
|
||||
fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId, omitDrainingModeHeaders: Boolean = false)
|
||||
fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId)
|
||||
|
||||
/**
|
||||
* Start the messaging using the [onMessage] message handler.
|
||||
@ -59,9 +60,9 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
}
|
||||
|
||||
@Suspendable
|
||||
override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId, omitDrainingModeHeaders: Boolean) {
|
||||
override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId) {
|
||||
log.trace { "Sending message $deduplicationId $message to party $party" }
|
||||
val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(omitDrainingModeHeaders))
|
||||
val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(party))
|
||||
val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about $party")
|
||||
val address = serviceHub.networkService.getAddressOfParty(partyInfo)
|
||||
val sequenceKey = when (message) {
|
||||
@ -71,9 +72,13 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging {
|
||||
serviceHub.networkService.send(networkMessage, address, sequenceKey = sequenceKey)
|
||||
}
|
||||
|
||||
private fun SessionMessage.additionalHeaders(omitDrainingModeHeaders: Boolean): Map<String, String> {
|
||||
private fun SessionMessage.additionalHeaders(target: Party): Map<String, String> {
|
||||
|
||||
// This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator.
|
||||
// It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case.
|
||||
val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name }
|
||||
return when {
|
||||
this !is InitialSessionMessage || omitDrainingModeHeaders -> emptyMap()
|
||||
this !is InitialSessionMessage || mightDeadlockDrainingTarget -> emptyMap()
|
||||
else -> mapOf(P2PMessagingHeaders.Type.KEY to P2PMessagingHeaders.Type.SESSION_INIT_VALUE)
|
||||
}
|
||||
}
|
||||
|
@ -64,8 +64,6 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
||||
|
||||
override val serviceHub get() = getTransientField(TransientValues::serviceHub)
|
||||
|
||||
override fun invocationContext(): InvocationContext = snapshot().flowLogic.stateMachine.context
|
||||
|
||||
data class TransientValues(
|
||||
val eventQueue: Channel<Event>,
|
||||
val resultFuture: CordaFuture<Any?>,
|
||||
|
Loading…
x
Reference in New Issue
Block a user