From 0b692d7482901ade06ceba967793fcafd264138e Mon Sep 17 00:00:00 2001 From: sollecitom Date: Fri, 23 Mar 2018 13:27:28 +0000 Subject: [PATCH] Refactored as per code review comments. --- .../services/statemachine/ActionExecutorImpl.kt | 14 +++----------- .../corda/node/services/statemachine/FlowFiber.kt | 3 --- .../node/services/statemachine/FlowMessaging.kt | 15 ++++++++++----- .../services/statemachine/FlowStateMachineImpl.kt | 2 -- 4 files changed, 13 insertions(+), 21 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index daede37ee4..a22a03bd1d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -13,8 +13,6 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Fiber import co.paralleluniverse.fibers.Suspendable import com.codahale.metrics.* -import net.corda.core.context.InvocationOrigin -import net.corda.core.identity.Party import net.corda.core.internal.concurrent.thenMatch import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializedBytes @@ -74,7 +72,7 @@ class ActionExecutorImpl( is Action.ScheduleEvent -> executeScheduleEvent(fiber, action) is Action.SleepUntil -> executeSleepUntil(action) is Action.RemoveCheckpoint -> executeRemoveCheckpoint(action) - is Action.SendInitial -> executeSendInitial(action, fiber.mightDeadlockDrainingTarget(action.party)) + is Action.SendInitial -> executeSendInitial(action) is Action.SendExisting -> executeSendExisting(action) is Action.AddSessionBinding -> executeAddSessionBinding(action) is Action.RemoveSessionBindings -> executeRemoveSessionBindings(action) @@ -86,12 +84,6 @@ class ActionExecutorImpl( } } - private fun FlowFiber.mightDeadlockDrainingTarget(target: Party): Boolean { - // This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator. - // It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case. - return invocationContext().origin.let { it is InvocationOrigin.Peer && it.party == target.name } - } - @Suspendable private fun executeTrackTransaction(fiber: FlowFiber, action: Action.TrackTransaction) { services.validatedTransactions.trackTransaction(action.hash).thenMatch( @@ -174,8 +166,8 @@ class ActionExecutorImpl( } @Suspendable - private fun executeSendInitial(action: Action.SendInitial, omitDrainingModeHeaders: Boolean) { - flowMessaging.sendSessionMessage(action.party, action.initialise, action.deduplicationId, omitDrainingModeHeaders) + private fun executeSendInitial(action: Action.SendInitial) { + flowMessaging.sendSessionMessage(action.party, action.initialise, action.deduplicationId) } @Suspendable diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowFiber.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowFiber.kt index 596a8ae653..dbf4b6b8fe 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowFiber.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowFiber.kt @@ -11,7 +11,6 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable -import net.corda.core.context.InvocationContext import net.corda.core.flows.StateMachineRunId import net.corda.node.services.statemachine.transitions.StateMachine @@ -26,6 +25,4 @@ interface FlowFiber { fun scheduleEvent(event: Event) fun snapshot(): StateMachineState - - fun invocationContext(): InvocationContext } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt index 4c5c221333..54a4003036 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMessaging.kt @@ -12,6 +12,7 @@ package net.corda.node.services.statemachine import co.paralleluniverse.fibers.Suspendable import com.esotericsoftware.kryo.KryoException +import net.corda.core.context.InvocationOrigin import net.corda.core.flows.FlowException import net.corda.core.identity.Party import net.corda.core.serialization.SerializedBytes @@ -33,7 +34,7 @@ interface FlowMessaging { * listen on the send acknowledgement. */ @Suspendable - fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId, omitDrainingModeHeaders: Boolean = false) + fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId) /** * Start the messaging using the [onMessage] message handler. @@ -59,9 +60,9 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { } @Suspendable - override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId, omitDrainingModeHeaders: Boolean) { + override fun sendSessionMessage(party: Party, message: SessionMessage, deduplicationId: DeduplicationId) { log.trace { "Sending message $deduplicationId $message to party $party" } - val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(omitDrainingModeHeaders)) + val networkMessage = serviceHub.networkService.createMessage(sessionTopic, serializeSessionMessage(message).bytes, deduplicationId, message.additionalHeaders(party)) val partyInfo = serviceHub.networkMapCache.getPartyInfo(party) ?: throw IllegalArgumentException("Don't know about $party") val address = serviceHub.networkService.getAddressOfParty(partyInfo) val sequenceKey = when (message) { @@ -71,9 +72,13 @@ class FlowMessagingImpl(val serviceHub: ServiceHubInternal): FlowMessaging { serviceHub.networkService.send(networkMessage, address, sequenceKey = sequenceKey) } - private fun SessionMessage.additionalHeaders(omitDrainingModeHeaders: Boolean): Map { + private fun SessionMessage.additionalHeaders(target: Party): Map { + + // This prevents a "deadlock" in case an initiated flow tries to start a session against a draining node that is also the initiator. + // It does not help in case more than 2 nodes are involved in a circle, so the kill switch via RPC should be used in that case. + val mightDeadlockDrainingTarget = FlowStateMachineImpl.currentStateMachine()?.context?.origin.let { it is InvocationOrigin.Peer && it.party == target.name } return when { - this !is InitialSessionMessage || omitDrainingModeHeaders -> emptyMap() + this !is InitialSessionMessage || mightDeadlockDrainingTarget -> emptyMap() else -> mapOf(P2PMessagingHeaders.Type.KEY to P2PMessagingHeaders.Type.SESSION_INIT_VALUE) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 26c26b5c91..afd2718d2f 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -64,8 +64,6 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, override val serviceHub get() = getTransientField(TransientValues::serviceHub) - override fun invocationContext(): InvocationContext = snapshot().flowLogic.stateMachine.context - data class TransientValues( val eventQueue: Channel, val resultFuture: CordaFuture,