From 354977ffeaab45b316f041880379ccce1767eaa3 Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Thu, 12 Jan 2017 09:54:08 +0000 Subject: [PATCH] Fix intermittent failure in ScheduledFlowTests (#140) Allow a single latch for all activity in the MockNetwork --- .../net/corda/node/internal/AbstractNode.kt | 9 ++++++--- .../services/events/NodeSchedulerService.kt | 19 +++++++------------ .../statemachine/FlowStateMachineImpl.kt | 8 ++------ .../statemachine/StateMachineManager.kt | 12 ++++++------ .../corda/node/services/ScheduledFlowTests.kt | 5 ----- .../testing/node/InMemoryMessagingNetwork.kt | 5 ++--- .../kotlin/net/corda/testing/node/MockNode.kt | 18 +++++------------- 7 files changed, 28 insertions(+), 48 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 1a79e553ca..9b8a5ce628 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -49,6 +49,7 @@ import net.corda.node.utilities.AddOrRemove.ADD import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.configureDatabase import net.corda.node.utilities.databaseTransaction +import org.apache.activemq.artemis.utils.ReusableLatch import org.jetbrains.exposed.sql.Database import org.slf4j.Logger import java.nio.file.FileAlreadyExistsException @@ -74,7 +75,8 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair // AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in. abstract class AbstractNode(open val configuration: NodeConfiguration, val advertisedServices: Set, - val platformClock: Clock) : SingletonSerializeAsToken() { + val platformClock: Clock, + @VisibleForTesting val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() { companion object { val PRIVATE_KEY_FILE_NAME = "identity-private-key" val PUBLIC_IDENTITY_FILE_NAME = "identity-public" @@ -219,7 +221,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, keyManagement = makeKeyManagementService() api = APIServerImpl(this@AbstractNode) flowLogicFactory = initialiseFlowLogicFactory() - scheduler = NodeSchedulerService(database, services, flowLogicFactory) + scheduler = NodeSchedulerService(database, services, flowLogicFactory, unfinishedSchedules = busyNodeLatch) val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler) @@ -237,7 +239,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, listOf(tokenizableServices), checkpointStorage, serverThread, - database) + database, + busyNodeLatch) if (serverThread is ExecutorService) { runOnStop += Runnable { // We wait here, even though any in-flight messages should have been drained away because the diff --git a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt index 08e30b9e6f..51de4aaf76 100644 --- a/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt +++ b/node/src/main/kotlin/net/corda/node/services/events/NodeSchedulerService.kt @@ -1,7 +1,6 @@ package net.corda.node.services.events import co.paralleluniverse.fibers.Suspendable -import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.SettableFuture import kotlinx.support.jdk8.collections.compute import net.corda.core.ThreadBox @@ -48,7 +47,8 @@ import javax.annotation.concurrent.ThreadSafe class NodeSchedulerService(private val database: Database, private val services: ServiceHubInternal, private val flowLogicRefFactory: FlowLogicRefFactory, - private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor()) + private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(), + private val unfinishedSchedules: ReusableLatch = ReusableLatch()) : SchedulerService, SingletonSerializeAsToken() { private val log = loggerFor() @@ -89,9 +89,6 @@ class NodeSchedulerService(private val database: Database, private val mutex = ThreadBox(InnerState()) - @VisibleForTesting - val unfinishedSchedules = ReusableLatch() - // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow. fun start() { mutex.locked { @@ -124,10 +121,10 @@ class NodeSchedulerService(private val database: Database, val removedAction = scheduledStates.remove(ref) if (removedAction != null) { unfinishedSchedules.countDown() - } - if (removedAction == earliestState && removedAction != null) { - recomputeEarliest() - rescheduleWakeUp() + if (removedAction == earliestState) { + recomputeEarliest() + rescheduleWakeUp() + } } } } @@ -182,6 +179,7 @@ class NodeSchedulerService(private val database: Database, val scheduledLogic: FlowLogic<*>? = getScheduledLogic() if (scheduledLogic != null) { subFlow(scheduledLogic) + scheduler.unfinishedSchedules.countDown() } } @@ -215,10 +213,7 @@ class NodeSchedulerService(private val database: Database, // TODO: FlowLogicRefFactory needs to sort out the class loader etc val logic = scheduler.flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef) logger.trace { "Scheduler starting FlowLogic $logic" } - // FlowLogic will be checkpointed by the time this returns. - //scheduler.services.startFlowAndForget(logic) scheduledLogic = logic - scheduler.unfinishedSchedules.countDown() null } } else { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 9b71110da3..a3e55f04b4 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -19,7 +19,6 @@ import net.corda.node.services.statemachine.StateMachineManager.FlowSession import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState import net.corda.node.utilities.StrandLocalTransactionManager import net.corda.node.utilities.createDatabaseTransaction -import net.corda.node.utilities.databaseTransaction import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Transaction import org.jetbrains.exposed.sql.transactions.TransactionManager @@ -87,14 +86,12 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, logic.call() } catch (t: Throwable) { actionOnEnd() - commitTransaction() _resultFuture?.setException(t) throw ExecutionException(t) } // This is to prevent actionOnEnd being called twice if it throws an exception actionOnEnd() - commitTransaction() _resultFuture?.set(result) return result } @@ -270,9 +267,8 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, private fun processException(t: Throwable) { // This can get called in actionOnSuspend *after* we commit the database transaction, so optionally open a new one here. - databaseTransaction(database) { - actionOnEnd() - } + createDatabaseTransaction(database) + actionOnEnd() _resultFuture?.setException(t) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt index cb0a0bef6e..fdfa200f1d 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineManager.kt @@ -6,7 +6,6 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer import co.paralleluniverse.strands.Strand import com.codahale.metrics.Gauge import com.esotericsoftware.kryo.Kryo -import com.google.common.annotations.VisibleForTesting import com.google.common.util.concurrent.ListenableFuture import kotlinx.support.jdk8.collections.removeIf import net.corda.core.ThreadBox @@ -69,7 +68,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, tokenizableServices: List, val checkpointStorage: CheckpointStorage, val executor: AffinityExecutor, - val database: Database) { + val database: Database, + private val unfinishedFibers: ReusableLatch = ReusableLatch()) { inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor) @@ -102,8 +102,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, @Volatile private var stopping = false // How many Fibers are running and not suspended. If zero and stopping is true, then we are halted. private val liveFibers = ReusableLatch() - @VisibleForTesting - val unfinishedFibers = ReusableLatch() + // Monitoring support. private val metrics = serviceHub.monitoringService.metrics @@ -336,13 +335,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal, fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE mutex.locked { stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) } - totalFinishedFlows.inc() - unfinishedFibers.countDown() notifyChangeObservers(fiber, AddOrRemove.REMOVE) } endAllFiberSessions(fiber) } finally { + fiber.commitTransaction() decrementLiveFibers() + totalFinishedFlows.inc() + unfinishedFibers.countDown() } } mutex.locked { diff --git a/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt b/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt index 9e60c223db..dd1f47b1ee 100644 --- a/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/ScheduledFlowTests.kt @@ -4,7 +4,6 @@ import co.paralleluniverse.fibers.Suspendable import net.corda.core.contracts.* import net.corda.core.crypto.CompositeKey import net.corda.core.crypto.Party -import net.corda.core.crypto.SecureHash import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogicRefFactory import net.corda.core.node.CordaPluginRegistry @@ -20,7 +19,6 @@ import net.corda.testing.node.MockNetwork import org.junit.After import org.junit.Assert.assertTrue import org.junit.Before -import org.junit.Ignore import org.junit.Test import java.security.PublicKey import java.time.Instant @@ -128,10 +126,7 @@ class ScheduledFlowTests { assertTrue("Must be processed", stateFromB.state.data.processed) } - @Ignore @Test - // TODO I need to investigate why we get very very occasional SessionInit failures - // during notarisation. fun `Run a whole batch of scheduled flows`() { val N = 100 for (i in 0..N - 1) { diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt index cb83d6e0ed..d055730748 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/InMemoryMessagingNetwork.kt @@ -44,7 +44,8 @@ import kotlin.concurrent.thread @ThreadSafe class InMemoryMessagingNetwork( val sendManuallyPumped: Boolean, - val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random() + val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(), + private val messagesInFlight: ReusableLatch = ReusableLatch() ) : SingletonSerializeAsToken() { companion object { val MESSAGES_LOG_NAME = "messages" @@ -78,8 +79,6 @@ class InMemoryMessagingNetwork( // Holds the mapping from services to peers advertising the service. private val serviceToPeersMapping = HashMap>() - val messagesInFlight = ReusableLatch() - @Suppress("unused") // Used by the visualiser tool. /** A stream of (sender, message, recipients) triples */ val receivedMessages: Observable diff --git a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt index 121d0b1ecc..ea5da50ab9 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/node/MockNode.kt @@ -26,6 +26,7 @@ import net.corda.node.services.vault.NodeVaultService import net.corda.node.utilities.AffinityExecutor import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor import net.corda.testing.TestNodeConfiguration +import org.apache.activemq.artemis.utils.ReusableLatch import org.slf4j.Logger import java.nio.file.FileSystem import java.security.KeyPair @@ -53,7 +54,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, private val defaultFactory: Factory = MockNetwork.DefaultFactory) { private var nextNodeId = 0 val filesystem: FileSystem = Jimfs.newFileSystem(unix()) - val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped, servicePeerAllocationStrategy) + private val busyLatch: ReusableLatch = ReusableLatch() + val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped, servicePeerAllocationStrategy, busyLatch) // A unique identifier for this network to segregate databases with the same nodeID but different networks. private val networkId = random63BitValue() @@ -111,7 +113,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override val networkMapAddress: SingleMessageRecipient?, advertisedServices: Set, val id: Int, - val keyPair: KeyPair?) : AbstractNode(config, advertisedServices, TestClock()) { + val keyPair: KeyPair?) : AbstractNode(config, advertisedServices, TestClock(), mockNet.busyLatch) { override val log: Logger = loggerFor() override val serverThread: AffinityExecutor = if (mockNet.threadPerNode) @@ -291,17 +293,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, // Test method to block until all scheduled activity, active flows // and network activity has ceased. - // TODO This is not perfect in that certain orderings my skip over the scanning loop. - // However, in practice it works well for testing of scheduled flows. fun waitQuiescent() { - while(nodes.any { it.smm.unfinishedFibers.count > 0 - || it.scheduler.unfinishedSchedules.count > 0} - || messagingNetwork.messagesInFlight.count > 0) { - for (node in nodes) { - node.smm.unfinishedFibers.await() - node.scheduler.unfinishedSchedules.await() - } - messagingNetwork.messagesInFlight.await() - } + busyLatch.await() } }