Merge pull request #446 from corda/aslemmer-expose-deliver-persistence-strategy

Expose state machine options, default OnNextCommit persistence
This commit is contained in:
Andras Slemmer 2018-02-14 17:31:50 +00:00 committed by GitHub
commit 945095480d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 47 additions and 11 deletions

View File

@ -1,5 +1,8 @@
package net.corda.node.services.config
import net.corda.node.services.statemachine.transitions.SessionDeliverPersistenceStrategy
import net.corda.node.services.statemachine.transitions.StateMachineConfiguration
data class EnterpriseConfiguration(
val mutualExclusionConfiguration: MutualExclusionConfiguration,
val useMultiThreadedSMM: Boolean = true,
@ -25,7 +28,8 @@ data class PerformanceTuning(
val maximumMessagingBatchSize: Int,
val rpcThreadPoolSize: Int,
val p2pConfirmationWindowSize: Int,
val brokerConnectionTtlCheckIntervalMs: Long
val brokerConnectionTtlCheckIntervalMs: Long,
val stateMachine: StateMachineConfiguration
) {
companion object {
val default = PerformanceTuning(
@ -33,7 +37,8 @@ data class PerformanceTuning(
maximumMessagingBatchSize = 256,
rpcThreadPoolSize = 4,
p2pConfirmationWindowSize = 1048576,
brokerConnectionTtlCheckIntervalMs = 20
brokerConnectionTtlCheckIntervalMs = 20,
stateMachine = StateMachineConfiguration.default
)
}
}

View File

@ -6,7 +6,6 @@ import co.paralleluniverse.fibers.FiberScheduler
import co.paralleluniverse.fibers.Suspendable
import co.paralleluniverse.strands.Strand
import co.paralleluniverse.strands.channels.Channel
import com.codahale.metrics.Counter
import net.corda.core.concurrent.CordaFuture
import net.corda.core.context.InvocationContext
import net.corda.core.flows.*

View File

@ -268,6 +268,8 @@ class MultiThreadedStateMachineManager(
}
}
private val stateMachineConfiguration = serviceHub.configuration.enterpriseConfiguration.tuning.stateMachine
private fun checkQuasarJavaAgentPresence() {
check(SuspendableHelper.isJavaAgentActive(), {
"""Missing the '-javaagent' JVM argument. Make sure you run the tests with the Quasar java agent attached to your JVM.
@ -488,12 +490,12 @@ class MultiThreadedStateMachineManager(
private fun createTransientValues(id: StateMachineRunId, resultFuture: CordaFuture<Any?>): FlowStateMachineImpl.TransientValues {
return FlowStateMachineImpl.TransientValues(
eventQueue = Channels.newChannel(16, Channels.OverflowPolicy.BLOCK),
eventQueue = Channels.newChannel(stateMachineConfiguration.eventQueueSize, Channels.OverflowPolicy.BLOCK),
resultFuture = resultFuture,
database = database,
transitionExecutor = transitionExecutor,
actionExecutor = actionExecutor!!,
stateMachine = StateMachine(id, StateMachineConfiguration.default, secureRandom),
stateMachine = StateMachine(id, stateMachineConfiguration, secureRandom),
serviceHub = serviceHub,
checkpointSerializationContext = checkpointSerializationContext!!
)

View File

@ -101,8 +101,6 @@ class SingleThreadedStateMachineManager(
private val totalStartedFlows = metrics.counter("Flows.Started")
private val totalFinishedFlows = metrics.counter("Flows.Finished")
private val totalSuccessFlows = metrics.counter("Flows.Success")
private val totalErrorFlows = metrics.counter("Flows.Error")
/**
* An observable that emits triples of the changing flow, the type of change, and a process-specific ID number
@ -269,6 +267,8 @@ class SingleThreadedStateMachineManager(
}
}
private val stateMachineConfiguration = serviceHub.configuration.enterpriseConfiguration.tuning.stateMachine
private fun checkQuasarJavaAgentPresence() {
check(SuspendableHelper.isJavaAgentActive(), {
"""Missing the '-javaagent' JVM argument. Make sure you run the tests with the Quasar java agent attached to your JVM.
@ -489,12 +489,12 @@ class SingleThreadedStateMachineManager(
private fun createTransientValues(id: StateMachineRunId, resultFuture: CordaFuture<Any?>): FlowStateMachineImpl.TransientValues {
return FlowStateMachineImpl.TransientValues(
eventQueue = Channels.newChannel(16, Channels.OverflowPolicy.BLOCK),
eventQueue = Channels.newChannel(stateMachineConfiguration.eventQueueSize, Channels.OverflowPolicy.BLOCK),
resultFuture = resultFuture,
database = database,
transitionExecutor = transitionExecutor,
actionExecutor = actionExecutor!!,
stateMachine = StateMachine(id, StateMachineConfiguration.default, secureRandom),
stateMachine = StateMachine(id, stateMachineConfiguration, secureRandom),
serviceHub = serviceHub,
checkpointSerializationContext = checkpointSerializationContext!!
)

View File

@ -4,17 +4,43 @@ import net.corda.core.flows.*
import net.corda.node.services.statemachine.*
import java.security.SecureRandom
/**
* Specifies what strategy to use to persist received messages.
*
* - [OnDeliver] means the received message should be persisted in a checkpoint as soon as possible. This means that the
* next time the flow enters the state machine a checkpoint will be created with the current state and the received
* message. Note that the deduplication ID of the received message will be committed together with the checkpoint.
* This means that for each [FlowSession.receive] *two* checkpoints will be created, one when receive() is called,
* and one when the message is received. It also means that internal session messages not exposed to the flow also
* create checkpoints.
* - [OnNextCommit] means that instead of creating an explicit checkpoint we wait for the next one that would happen
* anyway. During this time the message will not be acknowledged.
* Note that this also means that if the flow is completely idempotent then the message will never be persisted as
* no checkpoints are ever committed (unless the flow errors). In this case the message will be acknowledged at the
* very end of the flow.
* In general turning this on is safe and much more efficient than [OnDeliver]. However if the flow is hogging the
* fiber (for example doing some IO) then the acknowledgement window of the received message will be extended to
* an arbitrary length.
*/
enum class SessionDeliverPersistenceStrategy {
OnDeliver,
OnNextCommit
}
/**
* @property sessionDeliverPersistenceStrategy see [SessionDeliverPersistenceStrategy]
* @property eventQueueSize the size of a flow's event queue. If the queue gets full the thread scheduling the event
* will block. An example scenario would be if the flow is waiting for a lot of messages at once, but is slow at
* processing each.
*/
data class StateMachineConfiguration(
val sessionDeliverPersistenceStrategy: SessionDeliverPersistenceStrategy
val sessionDeliverPersistenceStrategy: SessionDeliverPersistenceStrategy,
val eventQueueSize: Int
) {
companion object {
val default = StateMachineConfiguration(
sessionDeliverPersistenceStrategy = SessionDeliverPersistenceStrategy.OnDeliver
sessionDeliverPersistenceStrategy = SessionDeliverPersistenceStrategy.OnDeliver,
eventQueueSize = 16
)
}
}

View File

@ -37,6 +37,10 @@ enterpriseConfiguration = {
maximumMessagingBatchSize = 256
p2pConfirmationWindowSize = 1048576
brokerConnectionTtlCheckIntervalMs = 20
stateMachine = {
eventQueueSize = 16
sessionDeliverPersistenceStrategy = "OnNextCommit"
}
}
useMultiThreadedSMM = true
}