mirror of
https://github.com/corda/corda.git
synced 2025-06-19 23:53:52 +00:00
Fix intermittent failure in ScheduledFlowTests (#140)
Allow a single latch for all activity in the MockNetwork
This commit is contained in:
@ -49,6 +49,7 @@ import net.corda.node.utilities.AddOrRemove.ADD
|
|||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.node.utilities.configureDatabase
|
import net.corda.node.utilities.configureDatabase
|
||||||
import net.corda.node.utilities.databaseTransaction
|
import net.corda.node.utilities.databaseTransaction
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||||
import org.jetbrains.exposed.sql.Database
|
import org.jetbrains.exposed.sql.Database
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import java.nio.file.FileAlreadyExistsException
|
import java.nio.file.FileAlreadyExistsException
|
||||||
@ -74,7 +75,8 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
|
|||||||
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
|
// AbstractNode. It should be possible to generate the NodeInfo outside of AbstractNode, so it can be passed in.
|
||||||
abstract class AbstractNode(open val configuration: NodeConfiguration,
|
abstract class AbstractNode(open val configuration: NodeConfiguration,
|
||||||
val advertisedServices: Set<ServiceInfo>,
|
val advertisedServices: Set<ServiceInfo>,
|
||||||
val platformClock: Clock) : SingletonSerializeAsToken() {
|
val platformClock: Clock,
|
||||||
|
@VisibleForTesting val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
|
||||||
companion object {
|
companion object {
|
||||||
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
|
val PRIVATE_KEY_FILE_NAME = "identity-private-key"
|
||||||
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
|
val PUBLIC_IDENTITY_FILE_NAME = "identity-public"
|
||||||
@ -219,7 +221,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
keyManagement = makeKeyManagementService()
|
keyManagement = makeKeyManagementService()
|
||||||
api = APIServerImpl(this@AbstractNode)
|
api = APIServerImpl(this@AbstractNode)
|
||||||
flowLogicFactory = initialiseFlowLogicFactory()
|
flowLogicFactory = initialiseFlowLogicFactory()
|
||||||
scheduler = NodeSchedulerService(database, services, flowLogicFactory)
|
scheduler = NodeSchedulerService(database, services, flowLogicFactory, unfinishedSchedules = busyNodeLatch)
|
||||||
|
|
||||||
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
|
val tokenizableServices = mutableListOf(storage, net, vault, keyManagement, identity, platformClock, scheduler)
|
||||||
|
|
||||||
@ -237,7 +239,8 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
|
|||||||
listOf(tokenizableServices),
|
listOf(tokenizableServices),
|
||||||
checkpointStorage,
|
checkpointStorage,
|
||||||
serverThread,
|
serverThread,
|
||||||
database)
|
database,
|
||||||
|
busyNodeLatch)
|
||||||
if (serverThread is ExecutorService) {
|
if (serverThread is ExecutorService) {
|
||||||
runOnStop += Runnable {
|
runOnStop += Runnable {
|
||||||
// We wait here, even though any in-flight messages should have been drained away because the
|
// We wait here, even though any in-flight messages should have been drained away because the
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package net.corda.node.services.events
|
package net.corda.node.services.events
|
||||||
|
|
||||||
import co.paralleluniverse.fibers.Suspendable
|
import co.paralleluniverse.fibers.Suspendable
|
||||||
import com.google.common.annotations.VisibleForTesting
|
|
||||||
import com.google.common.util.concurrent.SettableFuture
|
import com.google.common.util.concurrent.SettableFuture
|
||||||
import kotlinx.support.jdk8.collections.compute
|
import kotlinx.support.jdk8.collections.compute
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.ThreadBox
|
||||||
@ -48,7 +47,8 @@ import javax.annotation.concurrent.ThreadSafe
|
|||||||
class NodeSchedulerService(private val database: Database,
|
class NodeSchedulerService(private val database: Database,
|
||||||
private val services: ServiceHubInternal,
|
private val services: ServiceHubInternal,
|
||||||
private val flowLogicRefFactory: FlowLogicRefFactory,
|
private val flowLogicRefFactory: FlowLogicRefFactory,
|
||||||
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor())
|
private val schedulerTimerExecutor: Executor = Executors.newSingleThreadExecutor(),
|
||||||
|
private val unfinishedSchedules: ReusableLatch = ReusableLatch())
|
||||||
: SchedulerService, SingletonSerializeAsToken() {
|
: SchedulerService, SingletonSerializeAsToken() {
|
||||||
|
|
||||||
private val log = loggerFor<NodeSchedulerService>()
|
private val log = loggerFor<NodeSchedulerService>()
|
||||||
@ -89,9 +89,6 @@ class NodeSchedulerService(private val database: Database,
|
|||||||
|
|
||||||
private val mutex = ThreadBox(InnerState())
|
private val mutex = ThreadBox(InnerState())
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
val unfinishedSchedules = ReusableLatch()
|
|
||||||
|
|
||||||
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
|
// We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
|
||||||
fun start() {
|
fun start() {
|
||||||
mutex.locked {
|
mutex.locked {
|
||||||
@ -124,13 +121,13 @@ class NodeSchedulerService(private val database: Database,
|
|||||||
val removedAction = scheduledStates.remove(ref)
|
val removedAction = scheduledStates.remove(ref)
|
||||||
if (removedAction != null) {
|
if (removedAction != null) {
|
||||||
unfinishedSchedules.countDown()
|
unfinishedSchedules.countDown()
|
||||||
}
|
if (removedAction == earliestState) {
|
||||||
if (removedAction == earliestState && removedAction != null) {
|
|
||||||
recomputeEarliest()
|
recomputeEarliest()
|
||||||
rescheduleWakeUp()
|
rescheduleWakeUp()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method first cancels the [Future] for any pending action so that the [awaitWithDeadline] used below
|
* This method first cancels the [Future] for any pending action so that the [awaitWithDeadline] used below
|
||||||
@ -182,6 +179,7 @@ class NodeSchedulerService(private val database: Database,
|
|||||||
val scheduledLogic: FlowLogic<*>? = getScheduledLogic()
|
val scheduledLogic: FlowLogic<*>? = getScheduledLogic()
|
||||||
if (scheduledLogic != null) {
|
if (scheduledLogic != null) {
|
||||||
subFlow(scheduledLogic)
|
subFlow(scheduledLogic)
|
||||||
|
scheduler.unfinishedSchedules.countDown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -215,10 +213,7 @@ class NodeSchedulerService(private val database: Database,
|
|||||||
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
|
// TODO: FlowLogicRefFactory needs to sort out the class loader etc
|
||||||
val logic = scheduler.flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
|
val logic = scheduler.flowLogicRefFactory.toFlowLogic(scheduledActivity.logicRef)
|
||||||
logger.trace { "Scheduler starting FlowLogic $logic" }
|
logger.trace { "Scheduler starting FlowLogic $logic" }
|
||||||
// FlowLogic will be checkpointed by the time this returns.
|
|
||||||
//scheduler.services.startFlowAndForget(logic)
|
|
||||||
scheduledLogic = logic
|
scheduledLogic = logic
|
||||||
scheduler.unfinishedSchedules.countDown()
|
|
||||||
null
|
null
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -19,7 +19,6 @@ import net.corda.node.services.statemachine.StateMachineManager.FlowSession
|
|||||||
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState
|
import net.corda.node.services.statemachine.StateMachineManager.FlowSessionState
|
||||||
import net.corda.node.utilities.StrandLocalTransactionManager
|
import net.corda.node.utilities.StrandLocalTransactionManager
|
||||||
import net.corda.node.utilities.createDatabaseTransaction
|
import net.corda.node.utilities.createDatabaseTransaction
|
||||||
import net.corda.node.utilities.databaseTransaction
|
|
||||||
import org.jetbrains.exposed.sql.Database
|
import org.jetbrains.exposed.sql.Database
|
||||||
import org.jetbrains.exposed.sql.Transaction
|
import org.jetbrains.exposed.sql.Transaction
|
||||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||||
@ -87,14 +86,12 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
logic.call()
|
logic.call()
|
||||||
} catch (t: Throwable) {
|
} catch (t: Throwable) {
|
||||||
actionOnEnd()
|
actionOnEnd()
|
||||||
commitTransaction()
|
|
||||||
_resultFuture?.setException(t)
|
_resultFuture?.setException(t)
|
||||||
throw ExecutionException(t)
|
throw ExecutionException(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is to prevent actionOnEnd being called twice if it throws an exception
|
// This is to prevent actionOnEnd being called twice if it throws an exception
|
||||||
actionOnEnd()
|
actionOnEnd()
|
||||||
commitTransaction()
|
|
||||||
_resultFuture?.set(result)
|
_resultFuture?.set(result)
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
@ -270,9 +267,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
|
|
||||||
private fun processException(t: Throwable) {
|
private fun processException(t: Throwable) {
|
||||||
// This can get called in actionOnSuspend *after* we commit the database transaction, so optionally open a new one here.
|
// This can get called in actionOnSuspend *after* we commit the database transaction, so optionally open a new one here.
|
||||||
databaseTransaction(database) {
|
createDatabaseTransaction(database)
|
||||||
actionOnEnd()
|
actionOnEnd()
|
||||||
}
|
|
||||||
_resultFuture?.setException(t)
|
_resultFuture?.setException(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,7 +6,6 @@ import co.paralleluniverse.io.serialization.kryo.KryoSerializer
|
|||||||
import co.paralleluniverse.strands.Strand
|
import co.paralleluniverse.strands.Strand
|
||||||
import com.codahale.metrics.Gauge
|
import com.codahale.metrics.Gauge
|
||||||
import com.esotericsoftware.kryo.Kryo
|
import com.esotericsoftware.kryo.Kryo
|
||||||
import com.google.common.annotations.VisibleForTesting
|
|
||||||
import com.google.common.util.concurrent.ListenableFuture
|
import com.google.common.util.concurrent.ListenableFuture
|
||||||
import kotlinx.support.jdk8.collections.removeIf
|
import kotlinx.support.jdk8.collections.removeIf
|
||||||
import net.corda.core.ThreadBox
|
import net.corda.core.ThreadBox
|
||||||
@ -69,7 +68,8 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
tokenizableServices: List<Any>,
|
tokenizableServices: List<Any>,
|
||||||
val checkpointStorage: CheckpointStorage,
|
val checkpointStorage: CheckpointStorage,
|
||||||
val executor: AffinityExecutor,
|
val executor: AffinityExecutor,
|
||||||
val database: Database) {
|
val database: Database,
|
||||||
|
private val unfinishedFibers: ReusableLatch = ReusableLatch()) {
|
||||||
|
|
||||||
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
|
inner class FiberScheduler : FiberExecutorScheduler("Same thread scheduler", executor)
|
||||||
|
|
||||||
@ -102,8 +102,7 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
@Volatile private var stopping = false
|
@Volatile private var stopping = false
|
||||||
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
|
// How many Fibers are running and not suspended. If zero and stopping is true, then we are halted.
|
||||||
private val liveFibers = ReusableLatch()
|
private val liveFibers = ReusableLatch()
|
||||||
@VisibleForTesting
|
|
||||||
val unfinishedFibers = ReusableLatch()
|
|
||||||
|
|
||||||
// Monitoring support.
|
// Monitoring support.
|
||||||
private val metrics = serviceHub.monitoringService.metrics
|
private val metrics = serviceHub.monitoringService.metrics
|
||||||
@ -336,13 +335,14 @@ class StateMachineManager(val serviceHub: ServiceHubInternal,
|
|||||||
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
fiber.logic.progressTracker?.currentStep = ProgressTracker.DONE
|
||||||
mutex.locked {
|
mutex.locked {
|
||||||
stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) }
|
stateMachines.remove(fiber)?.let { checkpointStorage.removeCheckpoint(it) }
|
||||||
totalFinishedFlows.inc()
|
|
||||||
unfinishedFibers.countDown()
|
|
||||||
notifyChangeObservers(fiber, AddOrRemove.REMOVE)
|
notifyChangeObservers(fiber, AddOrRemove.REMOVE)
|
||||||
}
|
}
|
||||||
endAllFiberSessions(fiber)
|
endAllFiberSessions(fiber)
|
||||||
} finally {
|
} finally {
|
||||||
|
fiber.commitTransaction()
|
||||||
decrementLiveFibers()
|
decrementLiveFibers()
|
||||||
|
totalFinishedFlows.inc()
|
||||||
|
unfinishedFibers.countDown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mutex.locked {
|
mutex.locked {
|
||||||
|
@ -4,7 +4,6 @@ import co.paralleluniverse.fibers.Suspendable
|
|||||||
import net.corda.core.contracts.*
|
import net.corda.core.contracts.*
|
||||||
import net.corda.core.crypto.CompositeKey
|
import net.corda.core.crypto.CompositeKey
|
||||||
import net.corda.core.crypto.Party
|
import net.corda.core.crypto.Party
|
||||||
import net.corda.core.crypto.SecureHash
|
|
||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.flows.FlowLogicRefFactory
|
import net.corda.core.flows.FlowLogicRefFactory
|
||||||
import net.corda.core.node.CordaPluginRegistry
|
import net.corda.core.node.CordaPluginRegistry
|
||||||
@ -20,7 +19,6 @@ import net.corda.testing.node.MockNetwork
|
|||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Assert.assertTrue
|
import org.junit.Assert.assertTrue
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
import org.junit.Ignore
|
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.security.PublicKey
|
import java.security.PublicKey
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
@ -128,10 +126,7 @@ class ScheduledFlowTests {
|
|||||||
assertTrue("Must be processed", stateFromB.state.data.processed)
|
assertTrue("Must be processed", stateFromB.state.data.processed)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Ignore
|
|
||||||
@Test
|
@Test
|
||||||
// TODO I need to investigate why we get very very occasional SessionInit failures
|
|
||||||
// during notarisation.
|
|
||||||
fun `Run a whole batch of scheduled flows`() {
|
fun `Run a whole batch of scheduled flows`() {
|
||||||
val N = 100
|
val N = 100
|
||||||
for (i in 0..N - 1) {
|
for (i in 0..N - 1) {
|
||||||
|
@ -44,7 +44,8 @@ import kotlin.concurrent.thread
|
|||||||
@ThreadSafe
|
@ThreadSafe
|
||||||
class InMemoryMessagingNetwork(
|
class InMemoryMessagingNetwork(
|
||||||
val sendManuallyPumped: Boolean,
|
val sendManuallyPumped: Boolean,
|
||||||
val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random()
|
val servicePeerAllocationStrategy: ServicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.Random(),
|
||||||
|
private val messagesInFlight: ReusableLatch = ReusableLatch()
|
||||||
) : SingletonSerializeAsToken() {
|
) : SingletonSerializeAsToken() {
|
||||||
companion object {
|
companion object {
|
||||||
val MESSAGES_LOG_NAME = "messages"
|
val MESSAGES_LOG_NAME = "messages"
|
||||||
@ -78,8 +79,6 @@ class InMemoryMessagingNetwork(
|
|||||||
// Holds the mapping from services to peers advertising the service.
|
// Holds the mapping from services to peers advertising the service.
|
||||||
private val serviceToPeersMapping = HashMap<ServiceHandle, LinkedHashSet<PeerHandle>>()
|
private val serviceToPeersMapping = HashMap<ServiceHandle, LinkedHashSet<PeerHandle>>()
|
||||||
|
|
||||||
val messagesInFlight = ReusableLatch()
|
|
||||||
|
|
||||||
@Suppress("unused") // Used by the visualiser tool.
|
@Suppress("unused") // Used by the visualiser tool.
|
||||||
/** A stream of (sender, message, recipients) triples */
|
/** A stream of (sender, message, recipients) triples */
|
||||||
val receivedMessages: Observable<MessageTransfer>
|
val receivedMessages: Observable<MessageTransfer>
|
||||||
|
@ -26,6 +26,7 @@ import net.corda.node.services.vault.NodeVaultService
|
|||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor.ServiceAffinityExecutor
|
||||||
import net.corda.testing.TestNodeConfiguration
|
import net.corda.testing.TestNodeConfiguration
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import java.nio.file.FileSystem
|
import java.nio.file.FileSystem
|
||||||
import java.security.KeyPair
|
import java.security.KeyPair
|
||||||
@ -53,7 +54,8 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
|||||||
private val defaultFactory: Factory = MockNetwork.DefaultFactory) {
|
private val defaultFactory: Factory = MockNetwork.DefaultFactory) {
|
||||||
private var nextNodeId = 0
|
private var nextNodeId = 0
|
||||||
val filesystem: FileSystem = Jimfs.newFileSystem(unix())
|
val filesystem: FileSystem = Jimfs.newFileSystem(unix())
|
||||||
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped, servicePeerAllocationStrategy)
|
private val busyLatch: ReusableLatch = ReusableLatch()
|
||||||
|
val messagingNetwork = InMemoryMessagingNetwork(networkSendManuallyPumped, servicePeerAllocationStrategy, busyLatch)
|
||||||
|
|
||||||
// A unique identifier for this network to segregate databases with the same nodeID but different networks.
|
// A unique identifier for this network to segregate databases with the same nodeID but different networks.
|
||||||
private val networkId = random63BitValue()
|
private val networkId = random63BitValue()
|
||||||
@ -111,7 +113,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
|||||||
override val networkMapAddress: SingleMessageRecipient?,
|
override val networkMapAddress: SingleMessageRecipient?,
|
||||||
advertisedServices: Set<ServiceInfo>,
|
advertisedServices: Set<ServiceInfo>,
|
||||||
val id: Int,
|
val id: Int,
|
||||||
val keyPair: KeyPair?) : AbstractNode(config, advertisedServices, TestClock()) {
|
val keyPair: KeyPair?) : AbstractNode(config, advertisedServices, TestClock(), mockNet.busyLatch) {
|
||||||
override val log: Logger = loggerFor<MockNode>()
|
override val log: Logger = loggerFor<MockNode>()
|
||||||
override val serverThread: AffinityExecutor =
|
override val serverThread: AffinityExecutor =
|
||||||
if (mockNet.threadPerNode)
|
if (mockNet.threadPerNode)
|
||||||
@ -291,17 +293,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
|
|||||||
|
|
||||||
// Test method to block until all scheduled activity, active flows
|
// Test method to block until all scheduled activity, active flows
|
||||||
// and network activity has ceased.
|
// and network activity has ceased.
|
||||||
// TODO This is not perfect in that certain orderings my skip over the scanning loop.
|
|
||||||
// However, in practice it works well for testing of scheduled flows.
|
|
||||||
fun waitQuiescent() {
|
fun waitQuiescent() {
|
||||||
while(nodes.any { it.smm.unfinishedFibers.count > 0
|
busyLatch.await()
|
||||||
|| it.scheduler.unfinishedSchedules.count > 0}
|
|
||||||
|| messagingNetwork.messagesInFlight.count > 0) {
|
|
||||||
for (node in nodes) {
|
|
||||||
node.smm.unfinishedFibers.await()
|
|
||||||
node.scheduler.unfinishedSchedules.await()
|
|
||||||
}
|
|
||||||
messagingNetwork.messagesInFlight.await()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user