Enterprise changes for OS merge.

This commit is contained in:
rick.parker 2018-05-25 15:50:44 +01:00
parent 033a5798a1
commit c830d50647
3 changed files with 135 additions and 19 deletions

View File

@ -13,8 +13,10 @@ package net.corda.node.services.vault
import net.corda.core.identity.CordaX500Name
import net.corda.testing.core.TestIdentity
import net.corda.testing.internal.*
import org.junit.*
import net.corda.testing.internal.GlobalDatabaseRule
import net.corda.testing.internal.toDatabaseSchemaName
import org.junit.ClassRule
import org.junit.Rule
import org.junit.rules.RuleChain
class VaultQueryIntegrationTests : VaultQueryTestsBase(), VaultQueryParties by vaultQueryTestRule {
@ -29,4 +31,9 @@ class VaultQueryIntegrationTests : VaultQueryTestsBase(), VaultQueryParties by v
@ClassRule @JvmField
val ruleChain = RuleChain.outerRule(globalDatabaseRule).around(vaultQueryTestRule)
}
@Suppress("LeakingThis")
@Rule
@JvmField
val transactionRule = VaultQueryRollbackRule(this)
}

View File

@ -23,12 +23,20 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.ConcurrentBox
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.bufferUntilSubscribed
import net.corda.core.internal.castIfPossible
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.*
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
@ -39,7 +47,12 @@ import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.messaging.ReceivedMessage
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
import net.corda.node.services.statemachine.interceptors.MetricInterceptor
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
import net.corda.node.services.statemachine.transitions.StateMachine
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
@ -53,13 +66,15 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.ThreadSafe
import kotlin.collections.ArrayList
import kotlin.concurrent.withLock
import kotlin.streams.toList
/**
* The StateMachineManagerImpl will always invoke the flow fibers on the given [AffinityExecutor], regardless of which
* thread actually starts them via [startFlow].
* thread actually starts them via [deliverExternalEvent].
*/
@ThreadSafe
class MultiThreadedStateMachineManager(
@ -101,6 +116,7 @@ class MultiThreadedStateMachineManager(
private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub)
private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null
private val transitionExecutor = makeTransitionExecutor()
private val ourSenderUUID = serviceHub.networkService.ourSenderUUID
private var checkpointSerializationContext: SerializationContext? = null
private var tokenizableServices: List<Any>? = null
@ -142,7 +158,7 @@ class MultiThreadedStateMachineManager(
resumeRestoredFlows(fibers)
flowMessaging.start { receivedMessage, deduplicationHandler ->
lifeCycle.requireState(State.STARTED) {
onSessionMessage(receivedMessage, deduplicationHandler)
deliverExternalEvent(deduplicationHandler.externalCause)
}
}
}
@ -187,7 +203,7 @@ class MultiThreadedStateMachineManager(
}
}
override fun <A> startFlow(
private fun <A> startFlow(
flowLogic: FlowLogic<A>,
context: InvocationContext,
ourIdentity: Party?,
@ -322,7 +338,73 @@ class MultiThreadedStateMachineManager(
}
}
private fun onSessionMessage(message: ReceivedMessage, deduplicationHandler: DeduplicationHandler) {
override fun retryFlowFromSafePoint(currentState: StateMachineState) {
// Get set of external events
val flowId = currentState.flowLogic.runId
val oldFlowLeftOver = concurrentBox.concurrent { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val checkpoint = deserializeCheckpoint(serializedCheckpoint)
if (checkpoint == null) {
logger.error("Unable to deserialize database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
// Resurrect flow
createFlowFromCheckpoint(
id = flowId,
checkpoint = checkpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false,
senderUUID = null
)
} else {
// Just flow initiation message
null
}
externalEventMutex.withLock {
if (flow != null) addAndStartFlow(flowId, flow)
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
}
}
private val externalEventMutex = ReentrantLock()
override fun deliverExternalEvent(event: ExternalEvent) {
externalEventMutex.withLock {
when (event) {
is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event)
is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event)
}
}
}
private fun <T> onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent<T>) {
val future = startFlow(event.flowLogic, event.context, ourIdentity = null, deduplicationHandler = event.deduplicationHandler)
event.wireUpFuture(future)
}
private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) {
val message: ReceivedMessage = event.receivedMessage
val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler
val peer = message.peer
val sessionMessage = try {
message.data.deserialize<SessionMessage>()
@ -396,7 +478,7 @@ class MultiThreadedStateMachineManager(
}
if (replyError != null) {
flowMessaging.sendSessionMessage(sender, replyError, DeduplicationId.createRandom(secureRandom))
flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID))
deduplicationHandler.afterDatabaseTransaction()
}
}
@ -470,7 +552,8 @@ class MultiThreadedStateMachineManager(
isAnyCheckpointPersisted = false,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = flowLogic
flowLogic = flowLogic,
senderUUID = ourSenderUUID
)
flowStateMachineImpl.transientState = TransientReference(initialState)
concurrentBox.concurrent {
@ -505,7 +588,7 @@ class MultiThreadedStateMachineManager(
private fun createTransientValues(id: StateMachineRunId, resultFuture: CordaFuture<Any?>): FlowStateMachineImpl.TransientValues {
return FlowStateMachineImpl.TransientValues(
eventQueue = Channels.newChannel(stateMachineConfiguration.eventQueueSize, Channels.OverflowPolicy.BLOCK),
eventQueue = Channels.newChannel(-1, Channels.OverflowPolicy.BLOCK),
resultFuture = resultFuture,
database = database,
transitionExecutor = transitionExecutor,
@ -521,7 +604,8 @@ class MultiThreadedStateMachineManager(
checkpoint: Checkpoint,
isAnyCheckpointPersisted: Boolean,
isStartIdempotent: Boolean,
initialDeduplicationHandler: DeduplicationHandler?
initialDeduplicationHandler: DeduplicationHandler?,
senderUUID: String? = ourSenderUUID
): Flow {
val flowState = checkpoint.flowState
val resultFuture = openFuture<Any?>()
@ -536,7 +620,8 @@ class MultiThreadedStateMachineManager(
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = logic
flowLogic = logic,
senderUUID = senderUUID
)
val fiber = FlowStateMachineImpl(id, logic, scheduler)
fiber.transientValues = TransientReference(createTransientValues(id, resultFuture))
@ -554,7 +639,8 @@ class MultiThreadedStateMachineManager(
isAnyCheckpointPersisted = isAnyCheckpointPersisted,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = fiber.logic
flowLogic = fiber.logic,
senderUUID = senderUUID
)
fiber.transientValues = TransientReference(createTransientValues(id, resultFuture))
fiber.transientState = TransientReference(state)
@ -574,9 +660,13 @@ class MultiThreadedStateMachineManager(
sessionToFlow.put(sessionId, id)
}
concurrentBox.concurrent {
incrementLiveFibers()
unfinishedFibers.countUp()
flows.put(id, flow)
val oldFlow = flows.put(id, flow)
if (oldFlow == null) {
incrementLiveFibers()
unfinishedFibers.countUp()
} else {
oldFlow.resultFuture.captureLater(flow.resultFuture)
}
flow.fiber.scheduleEvent(Event.DoRemainingWork)
when (checkpoint.flowState) {
is FlowState.Unstarted -> {
@ -611,7 +701,7 @@ class MultiThreadedStateMachineManager(
private fun makeTransitionExecutor(): TransitionExecutor {
val interceptors = ArrayList<TransitionInterceptor>()
interceptors.add { HospitalisingInterceptor(PropagatingFlowHospital, it) }
interceptors.add { HospitalisingInterceptor(StaffedFlowHospital, it) }
if (serviceHub.configuration.devMode) {
interceptors.add { DumpHistoryOnErrorInterceptor(it) }
interceptors.add { MetricInterceptor(metrics, it) }

View File

@ -28,4 +28,23 @@ p2pMessagingRetry {
messageRedeliveryDelay = 30 seconds
maxRetryCount = 3
backoffBase = 2.0
}
enterpriseConfiguration = {
mutualExclusionConfiguration = {
on = false
updateInterval = 20000
waitInterval = 40000
}
tuning = {
flowThreadPoolSize = 1
rpcThreadPoolSize = 4
maximumMessagingBatchSize = 256
p2pConfirmationWindowSize = 1048576
brokerConnectionTtlCheckIntervalMs = 20
stateMachine = {
eventQueueSize = 16
sessionDeliverPersistenceStrategy = "OnNextCommit"
}
}
useMultiThreadedSMM = true
}