diff --git a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultQueryIntegrationTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultQueryIntegrationTests.kt index 1c21e14617..3bea4ac7c0 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultQueryIntegrationTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/vault/VaultQueryIntegrationTests.kt @@ -13,8 +13,10 @@ package net.corda.node.services.vault import net.corda.core.identity.CordaX500Name import net.corda.testing.core.TestIdentity -import net.corda.testing.internal.* -import org.junit.* +import net.corda.testing.internal.GlobalDatabaseRule +import net.corda.testing.internal.toDatabaseSchemaName +import org.junit.ClassRule +import org.junit.Rule import org.junit.rules.RuleChain class VaultQueryIntegrationTests : VaultQueryTestsBase(), VaultQueryParties by vaultQueryTestRule { @@ -29,4 +31,9 @@ class VaultQueryIntegrationTests : VaultQueryTestsBase(), VaultQueryParties by v @ClassRule @JvmField val ruleChain = RuleChain.outerRule(globalDatabaseRule).around(vaultQueryTestRule) } + + @Suppress("LeakingThis") + @Rule + @JvmField + val transactionRule = VaultQueryRollbackRule(this) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt index 5392273f03..e9f14b82ad 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/MultiThreadedStateMachineManager.kt @@ -23,12 +23,20 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId import net.corda.core.identity.Party -import net.corda.core.internal.* +import net.corda.core.internal.ConcurrentBox +import net.corda.core.internal.FlowStateMachine +import 
net.corda.core.internal.LifeCycle +import net.corda.core.internal.bufferUntilSubscribed +import net.corda.core.internal.castIfPossible import net.corda.core.internal.concurrent.OpenFuture import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.openFuture import net.corda.core.messaging.DataFeed -import net.corda.core.serialization.* +import net.corda.core.serialization.SerializationContext +import net.corda.core.serialization.SerializationDefaults +import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.serialize import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.Try import net.corda.core.utilities.contextLogger @@ -39,7 +47,12 @@ import net.corda.node.services.api.ServiceHubInternal import net.corda.node.services.config.shouldCheckCheckpoints import net.corda.node.services.messaging.DeduplicationHandler import net.corda.node.services.messaging.ReceivedMessage -import net.corda.node.services.statemachine.interceptors.* +import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor +import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker +import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor +import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor +import net.corda.node.services.statemachine.interceptors.MetricInterceptor +import net.corda.node.services.statemachine.interceptors.PrintingInterceptor import net.corda.node.services.statemachine.transitions.StateMachine import net.corda.node.utilities.AffinityExecutor import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -53,13 +66,15 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ExecutorService import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.locks.ReentrantLock import 
javax.annotation.concurrent.ThreadSafe import kotlin.collections.ArrayList +import kotlin.concurrent.withLock import kotlin.streams.toList /** * The StateMachineManagerImpl will always invoke the flow fibers on the given [AffinityExecutor], regardless of which - * thread actually starts them via [startFlow]. + * thread actually starts them via [deliverExternalEvent]. */ @ThreadSafe class MultiThreadedStateMachineManager( @@ -101,6 +116,7 @@ class MultiThreadedStateMachineManager( private val flowMessaging: FlowMessaging = FlowMessagingImpl(serviceHub) private val fiberDeserializationChecker = if (serviceHub.configuration.shouldCheckCheckpoints()) FiberDeserializationChecker() else null private val transitionExecutor = makeTransitionExecutor() + private val ourSenderUUID = serviceHub.networkService.ourSenderUUID private var checkpointSerializationContext: SerializationContext? = null private var tokenizableServices: List? = null @@ -142,7 +158,7 @@ class MultiThreadedStateMachineManager( resumeRestoredFlows(fibers) flowMessaging.start { receivedMessage, deduplicationHandler -> lifeCycle.requireState(State.STARTED) { - onSessionMessage(receivedMessage, deduplicationHandler) + deliverExternalEvent(deduplicationHandler.externalCause) } } } @@ -187,7 +203,7 @@ class MultiThreadedStateMachineManager( } } - override fun startFlow( + private fun startFlow( flowLogic: FlowLogic, context: InvocationContext, ourIdentity: Party?, @@ -322,7 +338,73 @@ class MultiThreadedStateMachineManager( } } - private fun onSessionMessage(message: ReceivedMessage, deduplicationHandler: DeduplicationHandler) { + override fun retryFlowFromSafePoint(currentState: StateMachineState) { + // Retry from the last safe point: find the old fiber's event queue so its unprocessed external events can be re-delivered + val flowId = currentState.flowLogic.runId + val oldFlowLeftOver = concurrentBox.concurrent { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue + if (oldFlowLeftOver == null) { + logger.error("Unable to find flow for flow $flowId. Something is very wrong. 
The flow will not retry.") + return + } + val flow = if (currentState.isAnyCheckpointPersisted) { + val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId) + if (serializedCheckpoint == null) { + logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.") + return + } + val checkpoint = deserializeCheckpoint(serializedCheckpoint) + if (checkpoint == null) { + logger.error("Unable to deserialize database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.") + return + } + // Resurrect the flow from the checkpoint that was just loaded from checkpoint storage + createFlowFromCheckpoint( + id = flowId, + checkpoint = checkpoint, + initialDeduplicationHandler = null, + isAnyCheckpointPersisted = true, + isStartIdempotent = false, + senderUUID = null + ) + } else { + // No checkpoint persisted: nothing to resurrect, only the external events below are re-delivered (presumably just the flow initiation) + null + } + externalEventMutex.withLock { + if (flow != null) addAndStartFlow(flowId, flow) + // Drain the old flow's queue, then re-deliver the pending deduplication handlers' causes followed by the drained external events. + val unprocessedExternalEvents = mutableListOf<ExternalEvent>() + do { + val event = oldFlowLeftOver.tryReceive() + if (event is Event.GeneratedByExternalEvent) { + unprocessedExternalEvents += event.deduplicationHandler.externalCause + } + } while (event != null) + val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents + for (externalEvent in externalEvents) { + deliverExternalEvent(externalEvent) + } + } + } + + private val externalEventMutex = ReentrantLock() // reentrant: retryFlowFromSafePoint calls deliverExternalEvent while already holding it + override fun deliverExternalEvent(event: ExternalEvent) { + externalEventMutex.withLock { + when (event) { + is ExternalEvent.ExternalMessageEvent -> onSessionMessage(event) + is ExternalEvent.ExternalStartFlowEvent<*> -> onExternalStartFlow(event) + } + } + } + + private fun onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent) { + val future = startFlow(event.flowLogic, event.context, ourIdentity = null, deduplicationHandler = event.deduplicationHandler) + 
event.wireUpFuture(future) + } + + private fun onSessionMessage(event: ExternalEvent.ExternalMessageEvent) { + val message: ReceivedMessage = event.receivedMessage + val deduplicationHandler: DeduplicationHandler = event.deduplicationHandler val peer = message.peer val sessionMessage = try { message.data.deserialize() @@ -396,7 +478,7 @@ class MultiThreadedStateMachineManager( } if (replyError != null) { - flowMessaging.sendSessionMessage(sender, replyError, DeduplicationId.createRandom(secureRandom)) + flowMessaging.sendSessionMessage(sender, replyError, SenderDeduplicationId(DeduplicationId.createRandom(secureRandom), ourSenderUUID)) deduplicationHandler.afterDatabaseTransaction() } } @@ -470,7 +552,8 @@ class MultiThreadedStateMachineManager( isAnyCheckpointPersisted = false, isStartIdempotent = isStartIdempotent, isRemoved = false, - flowLogic = flowLogic + flowLogic = flowLogic, + senderUUID = ourSenderUUID ) flowStateMachineImpl.transientState = TransientReference(initialState) concurrentBox.concurrent { @@ -505,7 +588,7 @@ class MultiThreadedStateMachineManager( private fun createTransientValues(id: StateMachineRunId, resultFuture: CordaFuture): FlowStateMachineImpl.TransientValues { return FlowStateMachineImpl.TransientValues( - eventQueue = Channels.newChannel(stateMachineConfiguration.eventQueueSize, Channels.OverflowPolicy.BLOCK), + eventQueue = Channels.newChannel(-1, Channels.OverflowPolicy.BLOCK), resultFuture = resultFuture, database = database, transitionExecutor = transitionExecutor, @@ -521,7 +604,8 @@ class MultiThreadedStateMachineManager( checkpoint: Checkpoint, isAnyCheckpointPersisted: Boolean, isStartIdempotent: Boolean, - initialDeduplicationHandler: DeduplicationHandler? + initialDeduplicationHandler: DeduplicationHandler?, + senderUUID: String? 
= ourSenderUUID ): Flow { val flowState = checkpoint.flowState val resultFuture = openFuture() @@ -536,7 +620,8 @@ class MultiThreadedStateMachineManager( isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, isRemoved = false, - flowLogic = logic + flowLogic = logic, + senderUUID = senderUUID ) val fiber = FlowStateMachineImpl(id, logic, scheduler) fiber.transientValues = TransientReference(createTransientValues(id, resultFuture)) @@ -554,7 +639,8 @@ class MultiThreadedStateMachineManager( isAnyCheckpointPersisted = isAnyCheckpointPersisted, isStartIdempotent = isStartIdempotent, isRemoved = false, - flowLogic = fiber.logic + flowLogic = fiber.logic, + senderUUID = senderUUID ) fiber.transientValues = TransientReference(createTransientValues(id, resultFuture)) fiber.transientState = TransientReference(state) @@ -574,9 +660,13 @@ class MultiThreadedStateMachineManager( sessionToFlow.put(sessionId, id) } concurrentBox.concurrent { - incrementLiveFibers() - unfinishedFibers.countUp() - flows.put(id, flow) + val oldFlow = flows.put(id, flow) + if (oldFlow == null) { + incrementLiveFibers() + unfinishedFibers.countUp() + } else { + oldFlow.resultFuture.captureLater(flow.resultFuture) + } flow.fiber.scheduleEvent(Event.DoRemainingWork) when (checkpoint.flowState) { is FlowState.Unstarted -> { @@ -611,7 +701,7 @@ class MultiThreadedStateMachineManager( private fun makeTransitionExecutor(): TransitionExecutor { val interceptors = ArrayList() - interceptors.add { HospitalisingInterceptor(PropagatingFlowHospital, it) } + interceptors.add { HospitalisingInterceptor(StaffedFlowHospital, it) } if (serviceHub.configuration.devMode) { interceptors.add { DumpHistoryOnErrorInterceptor(it) } interceptors.add { MetricInterceptor(metrics, it) } diff --git a/node/src/test/resources/working-config.conf b/node/src/test/resources/working-config.conf index 45ca6ef647..3b3b51af89 100644 --- a/node/src/test/resources/working-config.conf +++ 
b/node/src/test/resources/working-config.conf @@ -28,4 +28,23 @@ p2pMessagingRetry { messageRedeliveryDelay = 30 seconds maxRetryCount = 3 backoffBase = 2.0 +} +enterpriseConfiguration = { + mutualExclusionConfiguration = { + on = false + updateInterval = 20000 + waitInterval = 40000 + } + tuning = { + flowThreadPoolSize = 1 + rpcThreadPoolSize = 4 + maximumMessagingBatchSize = 256 + p2pConfirmationWindowSize = 1048576 + brokerConnectionTtlCheckIntervalMs = 20 + stateMachine = { + eventQueueSize = 16 + sessionDeliverPersistenceStrategy = "OnNextCommit" + } + } + useMultiThreadedSMM = true } \ No newline at end of file