CORDA-3549: Re-write flaky CordaServiceLifecycleFatalTests (#5879)

* CORDA-3549: Improve stability of `CordaServiceLifecycleFatalTests`

* CORDA-3549: Bump-up reps count to ensure that test is definitely not flaky when executed by CI
(once proved the number of reps will be reduced)

* CORDA-3549: Making Detekt happier

* CORDA-2942: Ensure `NodeLifecycleEventsDistributor` cleans-up smoothly when node shuts down
This commit is contained in:
Viktor Kolomeyko
2020-01-23 16:35:49 +00:00
committed by Matthew Nesbit
parent c1909722f6
commit b72f71e7ac
5 changed files with 105 additions and 56 deletions

View File

@ -7,6 +7,7 @@ import net.corda.core.internal.concurrent.openFuture
import net.corda.core.node.services.CordaServiceCriticalFailureException
import net.corda.core.utilities.Try
import net.corda.core.utilities.contextLogger
import java.io.Closeable
import java.util.Collections.singleton
import java.util.LinkedList
import java.util.concurrent.Executors
@ -23,7 +24,7 @@ import kotlin.system.exitProcess
*
* The class is safe for concurrent use from multiple threads.
*/
class NodeLifecycleEventsDistributor {
class NodeLifecycleEventsDistributor : Closeable {
companion object {
private val log = contextLogger()
@ -43,7 +44,7 @@ class NodeLifecycleEventsDistributor {
private val readWriteLock: ReadWriteLock = ReentrantReadWriteLock()
private val executor = Executors.newSingleThreadExecutor(
ThreadFactoryBuilder().setNameFormat("NodeLifecycleEventsDistributor-%d").build())
ThreadFactoryBuilder().setNameFormat("NodeLifecycleEventsDistributor-%d").setDaemon(true).build())
/**
* Adds observer to the distribution list.
@ -88,19 +89,24 @@ class NodeLifecycleEventsDistributor {
val result = openFuture<Any?>()
executor.execute {
val orderedSnapshot = if (event.reversedPriority) snapshot.reversed() else snapshot
orderedSnapshot.forEach {
log.debug("Distributing event $event to: $it")
val updateResult = it.update(event)
if (updateResult.isSuccess) {
log.debug("Event $event distribution outcome: $updateResult")
} else {
log.error("Failed to distribute event $event, failure outcome: $updateResult")
handlePossibleFatalTermination(event, updateResult as Try.Failure<String>)
}
}
if(executor.isShutdown || executor.isTerminated) {
log.warn("Not distributing $event as executor been already shutdown. Double close() case?")
result.set(null)
} else {
executor.execute {
val orderedSnapshot = if (event.reversedPriority) snapshot.reversed() else snapshot
orderedSnapshot.forEach {
log.debug("Distributing event $event to: $it")
val updateResult = it.update(event)
if (updateResult.isSuccess) {
log.debug("Event $event distribution outcome: $updateResult")
} else {
log.error("Failed to distribute event $event, failure outcome: $updateResult")
handlePossibleFatalTermination(event, updateResult as Try.Failure<String>)
}
}
result.set(null)
}
}
return result.map { }
}
@ -114,6 +120,10 @@ class NodeLifecycleEventsDistributor {
}
}
override fun close() {
executor.shutdown()
}
/**
* Custom implementation vs. using [kotlin.concurrent.withLock] to allow interruption during lock acquisition.
*/

View File

@ -15,8 +15,6 @@ internal class NodeLifecycleEventsDistributorMultiThreadedTest {
private val logger = contextLogger()
}
private val instance = NodeLifecycleEventsDistributor()
private val addedCounter = AtomicLong()
private val eventsDeliveredCounter = AtomicLong()
@ -24,29 +22,32 @@ internal class NodeLifecycleEventsDistributorMultiThreadedTest {
@Test
fun addAndDistributeConcurrently() {
val initialObserversCount = 10
repeat(initialObserversCount) { instance.add(MyObserver(it)) }
NodeLifecycleEventsDistributor().use { instance ->
val operationsCount = 100_000
val event = NodeLifecycleEvent.BeforeNodeStart(mock())
val additionFreq = 1000
val distributionFutures = (1..operationsCount).stream(true).mapToObj {
if(it % additionFreq == 0) {
logger.debug("Adding observer")
instance.add(MyObserver(it))
addedCounter.incrementAndGet()
logger.info("Progress so far: $it")
val initialObserversCount = 10
repeat(initialObserversCount) { instance.add(MyObserver(it)) }
val operationsCount = 100_000
val event = NodeLifecycleEvent.BeforeNodeStart(mock())
val additionFreq = 1000
val distributionFutures = (1..operationsCount).stream(true).mapToObj {
if (it % additionFreq == 0) {
logger.debug("Adding observer")
instance.add(MyObserver(it))
addedCounter.incrementAndGet()
logger.info("Progress so far: $it")
}
logger.debug("Distributing event")
instance.distributeEvent(event)
}
logger.debug("Distributing event")
instance.distributeEvent(event)
}
distributionFutures.forEach { it.get() }
distributionFutures.forEach { it.get() }
with(eventsDeliveredCounter.get()) {
// Greater than original observers times events
assertTrue("$this") { this > initialObserversCount.toLong() * operationsCount }
// Less than ever added observers times events
assertTrue("$this") { this < (initialObserversCount.toLong() + addedCounter.get()) * operationsCount }
with(eventsDeliveredCounter.get()) {
// Greater than original observers times events
assertTrue("$this") { this > initialObserversCount.toLong() * operationsCount }
// Less than ever added observers times events
assertTrue("$this") { this < (initialObserversCount.toLong() + addedCounter.get()) * operationsCount }
}
}
}