From a2d65f1a6bb32cafb6380b1dea6567670c0fb28d Mon Sep 17 00:00:00 2001 From: Viktor Kolomeyko Date: Fri, 16 Mar 2018 10:55:48 +0000 Subject: [PATCH] Improve error reporting in case of failure of flaky tests (#2828) * Improve error reporting in case of failure of flaky tests Also eliminate warnings and other minor changes. * Address code review comments raised by @exFalso Also improve output for stack traces. --- .../net/corda/client/rpc/RPCStabilityTests.kt | 72 +++++++++---------- 1 file changed, 36 insertions(+), 36 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 246358999c..6597df2915 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -15,6 +15,7 @@ import net.corda.nodeapi.RPCApi import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.internal.testThreadFactory import net.corda.testing.node.internal.* +import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.SimpleString import org.junit.After import org.junit.Assert.assertEquals @@ -46,15 +47,15 @@ class RPCStabilityTests { override val protocolVersion = 0 } - private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Int { - val values = ConcurrentLinkedQueue<Int>() + private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Map<Thread, List<StackTraceElement>> { + val values = ConcurrentLinkedQueue<Map<Thread, List<StackTraceElement>>>() return poll(executorService, "number of threads to become stable", 250.millis) { - values.add(Thread.activeCount()) + values.add(Thread.getAllStackTraces().mapValues { it.value.toList() }) if (values.size > 5) { values.poll() } val first = values.peek() - if (values.size == 5 && values.all { it == first }) { + if (values.size 
== 5 && values.all { it.keys.size == first.keys.size }) { first } else { null @@ -64,30 +65,39 @@ class RPCStabilityTests { @Test fun `client and server dont leak threads`() { - val executor = Executors.newScheduledThreadPool(1) fun startAndStop() { rpcDriver { val server = startRpcServer(ops = DummyOps).get() startRpcClient(server.broker.hostAndPort!!).get() } } - repeat(5) { - startAndStop() + + runBlockAndCheckThreads(::startAndStop) + } + + private fun runBlockAndCheckThreads(block: () -> Unit) { + val executor = Executors.newScheduledThreadPool(1) + + try { + // Warm-up so that all the thread pools & co. are created + block() + + val threadsBefore = waitUntilNumberOfThreadsStable(executor) + repeat(5) { + block() + } + val threadsAfter = waitUntilNumberOfThreadsStable(executor) + // This is a less than check because threads from other tests may be shutting down while this test is running. + // This is therefore a "best effort" check. When this test is run on its own this should be a strict equality. + // In case of failure we output the threads along with their stacktraces to get an idea of what was running at the time. + assert(threadsBefore.keys.size >= threadsAfter.keys.size, { "threadsBefore: $threadsBefore\nthreadsAfter: $threadsAfter" }) + } finally { + executor.shutdownNow() } - val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor) - repeat(5) { - startAndStop() - } - val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor) - // This is a less than check because threads from other tests may be shutting down while this test is running. - // This is therefore a "best effort" check. When this test is run on its own this should be a strict equality. 
- assertTrue(numberOfThreadsBefore >= numberOfThreadsAfter) - executor.shutdownNow() } @Test fun `client doesnt leak threads when it fails to start`() { - val executor = Executors.newScheduledThreadPool(1) fun startAndStop() { rpcDriver { Try.on { startRpcClient(NetworkHostAndPort("localhost", 9999)).get() } @@ -100,20 +110,10 @@ class RPCStabilityTests { } } } - repeat(5) { - startAndStop() - } - val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor) - repeat(5) { - startAndStop() - } - val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor) - - assertTrue(numberOfThreadsBefore >= numberOfThreadsAfter) - executor.shutdownNow() + runBlockAndCheckThreads(::startAndStop) } - fun RpcBrokerHandle.getStats(): Map { + private fun RpcBrokerHandle.getStats(): Map { return serverControl.run { mapOf( "connections" to listConnectionIDs().toSet(), @@ -211,14 +211,14 @@ class RPCStabilityTests { val server = startRpcServer(ops = leakObservableOpsImpl) val proxy = startRpcClient(server.get().broker.hostAndPort!!).get() // Leak many observables - val N = 200 - (1..N).map { + val count = 200 + (1..count).map { pool.fork { proxy.leakObservable(); Unit } }.transpose().getOrThrow() // In a loop force GC and check whether the server is notified while (true) { System.gc() - if (leakObservableOpsImpl.leakedUnsubscribedCount.get() == N) break + if (leakObservableOpsImpl.leakedUnsubscribedCount.get() == count) break Thread.sleep(100) } } @@ -361,11 +361,11 @@ class RPCStabilityTests { clients[0].destroyForcibly() pollUntilClientNumber(server, numberOfClients - 1) // Kill the rest - (1..numberOfClients - 1).forEach { + (1 until numberOfClients).forEach { clients[it].destroyForcibly() } pollUntilClientNumber(server, 0) - // Now poll until the server detects the disconnects and unsubscribes from all obserables. + // Now poll until the server detects the disconnects and un-subscribes from all observables. 
pollUntilTrue("number of times subscribe() has been called") { trackSubscriberOpsImpl.subscriberCount.get() == 0 }.get() } } @@ -391,7 +391,7 @@ class RPCStabilityTests { // Construct an RPC session manually so that we can hang in the message handler val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val session = startArtemisSession(server.broker.hostAndPort!!) - session.createTemporaryQueue(myQueue, myQueue) + session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue) val consumer = session.createConsumer(myQueue, null, -1, -1, false) consumer.setMessageHandler { Thread.sleep(50) // 5x slower than the server producer @@ -428,7 +428,7 @@ class RPCStabilityTests { // Construct an RPC client session manually val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" val session = startArtemisSession(server.broker.hostAndPort!!) - session.createTemporaryQueue(myQueue, myQueue) + session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue) val consumer = session.createConsumer(myQueue, null, -1, -1, false) val replies = ArrayList() consumer.setMessageHandler {