diff --git a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt index 3087838093..143de4d5f7 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt @@ -1,5 +1,8 @@ package net.corda.node.services.config +import net.corda.node.services.statemachine.transitions.SessionDeliverPersistenceStrategy +import net.corda.node.services.statemachine.transitions.StateMachineConfiguration + data class EnterpriseConfiguration( val mutualExclusionConfiguration: MutualExclusionConfiguration, val useMultiThreadedSMM: Boolean = true, @@ -25,7 +28,8 @@ data class PerformanceTuning( val maximumMessagingBatchSize: Int, val rpcThreadPoolSize: Int, val p2pConfirmationWindowSize: Int, - val brokerConnectionTtlCheckIntervalMs: Long + val brokerConnectionTtlCheckIntervalMs: Long, + val stateMachine: StateMachineConfiguration ) { companion object { val default = PerformanceTuning( @@ -33,7 +37,8 @@ data class PerformanceTuning( maximumMessagingBatchSize = 256, rpcThreadPoolSize = 4, p2pConfirmationWindowSize = 1048576, - brokerConnectionTtlCheckIntervalMs = 20 + brokerConnectionTtlCheckIntervalMs = 20, + stateMachine = StateMachineConfiguration.default ) } } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 7329590715..15987f1349 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -6,7 +6,6 @@ import co.paralleluniverse.fibers.FiberScheduler import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.strands.Strand import co.paralleluniverse.strands.channels.Channel -import com.codahale.metrics.Counter import net.corda.core.concurrent.CordaFuture import net.corda.core.context.InvocationContext import net.corda.core.flows.* diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index c62713511b..ad14c1c36d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -268,6 +268,8 @@ class MultiThreadedStateMachineManager( } } + private val stateMachineConfiguration = serviceHub.configuration.enterpriseConfiguration.tuning.stateMachine + private fun checkQuasarJavaAgentPresence() { check(SuspendableHelper.isJavaAgentActive(), { """Missing the '-javaagent' JVM argument. Make sure you run the tests with the Quasar java agent attached to your JVM. @@ -488,12 +490,12 @@ class MultiThreadedStateMachineManager( private fun createTransientValues(id: StateMachineRunId, resultFuture: CordaFuture): FlowStateMachineImpl.TransientValues { return FlowStateMachineImpl.TransientValues( - eventQueue = Channels.newChannel(16, Channels.OverflowPolicy.BLOCK), + eventQueue = Channels.newChannel(stateMachineConfiguration.eventQueueSize, Channels.OverflowPolicy.BLOCK), resultFuture = resultFuture, database = database, transitionExecutor = transitionExecutor, actionExecutor = actionExecutor!!, - stateMachine = StateMachine(id, StateMachineConfiguration.default, secureRandom), + stateMachine = StateMachine(id, stateMachineConfiguration, secureRandom), serviceHub = serviceHub, checkpointSerializationContext = checkpointSerializationContext!! ) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 50fba1e293..e1f308f881 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -101,8 +101,6 @@ class SingleThreadedStateMachineManager( private val totalStartedFlows = metrics.counter("Flows.Started") private val totalFinishedFlows = metrics.counter("Flows.Finished") - private val totalSuccessFlows = metrics.counter("Flows.Success") - private val totalErrorFlows = metrics.counter("Flows.Error") /** * An observable that emits triples of the changing flow, the type of change, and a process-specific ID number @@ -269,6 +267,8 @@ class SingleThreadedStateMachineManager( } } + private val stateMachineConfiguration = serviceHub.configuration.enterpriseConfiguration.tuning.stateMachine + private fun checkQuasarJavaAgentPresence() { check(SuspendableHelper.isJavaAgentActive(), { """Missing the '-javaagent' JVM argument. Make sure you run the tests with the Quasar java agent attached to your JVM. @@ -489,12 +489,12 @@ class SingleThreadedStateMachineManager( private fun createTransientValues(id: StateMachineRunId, resultFuture: CordaFuture): FlowStateMachineImpl.TransientValues { return FlowStateMachineImpl.TransientValues( - eventQueue = Channels.newChannel(16, Channels.OverflowPolicy.BLOCK), + eventQueue = Channels.newChannel(stateMachineConfiguration.eventQueueSize, Channels.OverflowPolicy.BLOCK), resultFuture = resultFuture, database = database, transitionExecutor = transitionExecutor, actionExecutor = actionExecutor!!, - stateMachine = StateMachine(id, StateMachineConfiguration.default, secureRandom), + stateMachine = StateMachine(id, stateMachineConfiguration, secureRandom), serviceHub = serviceHub, checkpointSerializationContext = checkpointSerializationContext!! ) diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StateMachine.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StateMachine.kt index 8b37972423..977dd8f744 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StateMachine.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/transitions/StateMachine.kt @@ -4,17 +4,43 @@ import net.corda.core.flows.* import net.corda.node.services.statemachine.* import java.security.SecureRandom +/** + * Specifies what strategy to use to persist received messages. + * + * - [OnDeliver] means the received message should be persisted in a checkpoint as soon as possible. This means that the + * next time the flow enters the state machine a checkpoint will be created with the current state and the received + * message. Note that the deduplication ID of the received message will be committed together with the checkpoint. + * This means that for each [FlowSession.receive] *two* checkpoints will be created, one when receive() is called, + * and one when the message is received. It also means that internal session messages not exposed to the flow also + * create checkpoints. + * - [OnNextCommit] means that instead of creating an explicit checkpoint we wait for the next one that would happen + * anyway. During this time the message will not be acknowledged. + * Note that this also means that if the flow is completely idempotent then the message will never be persisted as + * no checkpoints are ever committed (unless the flow errors). In this case the message will be acknowledged at the + * very end of the flow. + * In general turning this on is safe and much more efficient than [OnDeliver]. However if the flow is hogging the + * fiber (for example doing some IO) then the acknowledgement window of the received message will be extended to + * an arbitrary length. + */ enum class SessionDeliverPersistenceStrategy { OnDeliver, OnNextCommit } +/** + * @property sessionDeliverPersistenceStrategy see [SessionDeliverPersistenceStrategy] + * @property eventQueueSize the size of a flow's event queue. If the queue gets full the thread scheduling the event + * will block. An example scenario would be if the flow is waiting for a lot of messages at once, but is slow at + * processing each. + */ data class StateMachineConfiguration( - val sessionDeliverPersistenceStrategy: SessionDeliverPersistenceStrategy + val sessionDeliverPersistenceStrategy: SessionDeliverPersistenceStrategy, + val eventQueueSize: Int ) { companion object { val default = StateMachineConfiguration( - sessionDeliverPersistenceStrategy = SessionDeliverPersistenceStrategy.OnDeliver + sessionDeliverPersistenceStrategy = SessionDeliverPersistenceStrategy.OnDeliver, + eventQueueSize = 16 ) } } diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 646599c601..6669453571 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -37,6 +37,10 @@ enterpriseConfiguration = { maximumMessagingBatchSize = 256 p2pConfirmationWindowSize = 1048576 brokerConnectionTtlCheckIntervalMs = 20 + stateMachine = { + eventQueueSize = 16 + sessionDeliverPersistenceStrategy = "OnNextCommit" + } } useMultiThreadedSMM = true }