mirror of
https://github.com/corda/corda.git
synced 2025-06-11 03:41:41 +00:00
CORDA-1599 Fix a small race we have with waiting for mock network to … (#3348)
* CORDA-1599 Fix a small race we have with waiting for mock network to become inactive that is affecting tests. * Missed println
This commit is contained in:
parent
0f3453d1c7
commit
dd564cfc79
@ -27,6 +27,7 @@ import net.corda.nodeapi.internal.persistence.CordaPersistence
|
|||||||
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
|
||||||
import net.corda.nodeapi.internal.persistence.contextTransaction
|
import net.corda.nodeapi.internal.persistence.contextTransaction
|
||||||
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
import net.corda.nodeapi.internal.persistence.contextTransactionOrNull
|
||||||
|
import org.apache.activemq.artemis.utils.ReusableLatch
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import org.slf4j.MDC
|
import org.slf4j.MDC
|
||||||
@ -68,7 +69,8 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
val actionExecutor: ActionExecutor,
|
val actionExecutor: ActionExecutor,
|
||||||
val stateMachine: StateMachine,
|
val stateMachine: StateMachine,
|
||||||
val serviceHub: ServiceHubInternal,
|
val serviceHub: ServiceHubInternal,
|
||||||
val checkpointSerializationContext: SerializationContext
|
val checkpointSerializationContext: SerializationContext,
|
||||||
|
val unfinishedFibers: ReusableLatch
|
||||||
)
|
)
|
||||||
|
|
||||||
internal var transientValues: TransientReference<TransientValues>? = null
|
internal var transientValues: TransientReference<TransientValues>? = null
|
||||||
@ -239,6 +241,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
|
|||||||
}
|
}
|
||||||
|
|
||||||
recordDuration(startTime)
|
recordDuration(startTime)
|
||||||
|
getTransientField(TransientValues::unfinishedFibers).countDown()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Suspendable
|
@Suspendable
|
||||||
|
@ -13,20 +13,12 @@ import net.corda.core.flows.FlowInfo
|
|||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.FlowStateMachine
|
import net.corda.core.internal.*
|
||||||
import net.corda.core.internal.ThreadBox
|
|
||||||
import net.corda.core.internal.TimedFlow
|
|
||||||
import net.corda.core.internal.bufferUntilSubscribed
|
|
||||||
import net.corda.core.internal.castIfPossible
|
|
||||||
import net.corda.core.internal.concurrent.OpenFuture
|
import net.corda.core.internal.concurrent.OpenFuture
|
||||||
import net.corda.core.internal.concurrent.map
|
import net.corda.core.internal.concurrent.map
|
||||||
import net.corda.core.internal.concurrent.openFuture
|
import net.corda.core.internal.concurrent.openFuture
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.*
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
|
||||||
import net.corda.core.serialization.SerializedBytes
|
|
||||||
import net.corda.core.serialization.deserialize
|
|
||||||
import net.corda.core.serialization.serialize
|
|
||||||
import net.corda.core.utilities.ProgressTracker
|
import net.corda.core.utilities.ProgressTracker
|
||||||
import net.corda.core.utilities.Try
|
import net.corda.core.utilities.Try
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
@ -38,11 +30,7 @@ import net.corda.node.services.config.shouldCheckCheckpoints
|
|||||||
import net.corda.node.services.messaging.DeduplicationHandler
|
import net.corda.node.services.messaging.DeduplicationHandler
|
||||||
import net.corda.node.services.messaging.ReceivedMessage
|
import net.corda.node.services.messaging.ReceivedMessage
|
||||||
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
|
||||||
import net.corda.node.services.statemachine.interceptors.DumpHistoryOnErrorInterceptor
|
import net.corda.node.services.statemachine.interceptors.*
|
||||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationChecker
|
|
||||||
import net.corda.node.services.statemachine.interceptors.FiberDeserializationCheckingInterceptor
|
|
||||||
import net.corda.node.services.statemachine.interceptors.HospitalisingInterceptor
|
|
||||||
import net.corda.node.services.statemachine.interceptors.PrintingInterceptor
|
|
||||||
import net.corda.node.services.statemachine.transitions.StateMachine
|
import net.corda.node.services.statemachine.transitions.StateMachine
|
||||||
import net.corda.node.utilities.AffinityExecutor
|
import net.corda.node.utilities.AffinityExecutor
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
@ -54,16 +42,10 @@ import rx.Observable
|
|||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.security.SecureRandom
|
import java.security.SecureRandom
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.*
|
||||||
import java.util.concurrent.ExecutorService
|
|
||||||
import java.util.concurrent.Executors
|
|
||||||
import java.util.concurrent.ScheduledFuture
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
import kotlin.collections.ArrayList
|
import kotlin.collections.ArrayList
|
||||||
import kotlin.collections.HashMap
|
import kotlin.collections.HashMap
|
||||||
import kotlin.concurrent.withLock
|
|
||||||
import kotlin.streams.toList
|
import kotlin.streams.toList
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -228,7 +210,6 @@ class SingleThreadedStateMachineManager(
|
|||||||
logger.debug("Killing flow known to physical node.")
|
logger.debug("Killing flow known to physical node.")
|
||||||
decrementLiveFibers()
|
decrementLiveFibers()
|
||||||
totalFinishedFlows.inc()
|
totalFinishedFlows.inc()
|
||||||
unfinishedFibers.countDown()
|
|
||||||
try {
|
try {
|
||||||
flow.fiber.interrupt()
|
flow.fiber.interrupt()
|
||||||
true
|
true
|
||||||
@ -237,6 +218,7 @@ class SingleThreadedStateMachineManager(
|
|||||||
checkpointStorage.removeCheckpoint(id)
|
checkpointStorage.removeCheckpoint(id)
|
||||||
}
|
}
|
||||||
transitionExecutor.forceRemoveFlow(id)
|
transitionExecutor.forceRemoveFlow(id)
|
||||||
|
unfinishedFibers.countDown()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO replace with a clustered delete after we'll support clustered nodes
|
// TODO replace with a clustered delete after we'll support clustered nodes
|
||||||
@ -280,7 +262,6 @@ class SingleThreadedStateMachineManager(
|
|||||||
if (flow != null) {
|
if (flow != null) {
|
||||||
decrementLiveFibers()
|
decrementLiveFibers()
|
||||||
totalFinishedFlows.inc()
|
totalFinishedFlows.inc()
|
||||||
unfinishedFibers.countDown()
|
|
||||||
return when (removalReason) {
|
return when (removalReason) {
|
||||||
is FlowRemovalReason.OrderlyFinish -> removeFlowOrderly(flow, removalReason, lastState)
|
is FlowRemovalReason.OrderlyFinish -> removeFlowOrderly(flow, removalReason, lastState)
|
||||||
is FlowRemovalReason.ErrorFinish -> removeFlowError(flow, removalReason, lastState)
|
is FlowRemovalReason.ErrorFinish -> removeFlowError(flow, removalReason, lastState)
|
||||||
@ -661,7 +642,8 @@ class SingleThreadedStateMachineManager(
|
|||||||
actionExecutor = actionExecutor!!,
|
actionExecutor = actionExecutor!!,
|
||||||
stateMachine = StateMachine(id, secureRandom),
|
stateMachine = StateMachine(id, secureRandom),
|
||||||
serviceHub = serviceHub,
|
serviceHub = serviceHub,
|
||||||
checkpointSerializationContext = checkpointSerializationContext!!
|
checkpointSerializationContext = checkpointSerializationContext!!,
|
||||||
|
unfinishedFibers = unfinishedFibers
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,6 +105,8 @@ class RetryFlowMockTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `Patient records do not leak in hospital when using killFlow`() {
|
fun `Patient records do not leak in hospital when using killFlow`() {
|
||||||
|
// Make sure we have seen an update from the hospital, and thus the flow went there.
|
||||||
|
val records = nodeA.smm.flowHospital.track().updates.toBlocking().toIterable().iterator()
|
||||||
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
|
val flow: FlowStateMachine<Unit> = nodeA.services.startFlow(FinalityHandler(object : FlowSession() {
|
||||||
override val counterparty: Party
|
override val counterparty: Party
|
||||||
get() = TODO("not implemented")
|
get() = TODO("not implemented")
|
||||||
@ -141,8 +143,9 @@ class RetryFlowMockTest {
|
|||||||
TODO("not implemented")
|
TODO("not implemented")
|
||||||
}
|
}
|
||||||
}), nodeA.services.newContext()).get()
|
}), nodeA.services.newContext()).get()
|
||||||
// Make sure we have seen an update from the hospital, and thus the flow went there.
|
// Should be 2 records, one for admission and one for keep in.
|
||||||
nodeA.smm.flowHospital.track().updates.toBlocking().first()
|
records.next()
|
||||||
|
records.next()
|
||||||
// Killing it should remove it.
|
// Killing it should remove it.
|
||||||
nodeA.smm.killFlow(flow.id)
|
nodeA.smm.killFlow(flow.id)
|
||||||
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
|
assertThat(nodeA.smm.flowHospital.track().snapshot).isEmpty()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user