From cbef5d6a2ddf57f3d70de939f18fa02ddd79522b Mon Sep 17 00:00:00 2001 From: Rick Parker Date: Mon, 21 Aug 2017 13:05:25 +0100 Subject: [PATCH] Fix flakey RPC tests (#1292) --- .../corda/kotlin/rpc/StandaloneCordaRPClientTest.kt | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt index 79d0117557..c23b9b533b 100644 --- a/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt +++ b/client/rpc/src/smoke-test/kotlin/net/corda/kotlin/rpc/StandaloneCordaRPClientTest.kt @@ -34,6 +34,7 @@ import java.io.FilterInputStream import java.io.InputStream import java.nio.file.Paths import java.util.* +import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.AtomicInteger import kotlin.streams.toList import kotlin.test.assertEquals @@ -119,11 +120,14 @@ class StandaloneCordaRPClientTest { val handle = rpcProxy.startTrackedFlow( ::CashIssueFlow, 429.DOLLARS, OpaqueBytes.of(0), notaryNode.notaryIdentity ) + val updateLatch = CountDownLatch(1) handle.progress.subscribe { msg -> log.info("Flow>> $msg") ++trackCount + updateLatch.countDown() } handle.returnValue.getOrThrow(timeout) + updateLatch.await() assertNotEquals(0, trackCount) } @@ -137,17 +141,20 @@ class StandaloneCordaRPClientTest { val (stateMachines, updates) = rpcProxy.stateMachinesFeed() assertEquals(0, stateMachines.size) + val updateLatch = CountDownLatch(1) val updateCount = AtomicInteger(0) updates.subscribe { update -> if (update is StateMachineUpdate.Added) { log.info("StateMachine>> Id=${update.id}") updateCount.incrementAndGet() + updateLatch.countDown() } } // Now issue some cash rpcProxy.startFlow(::CashIssueFlow, 513.SWISS_FRANCS, OpaqueBytes.of(0), notaryNode.notaryIdentity) .returnValue.getOrThrow(timeout) + updateLatch.await() assertEquals(1, updateCount.get()) } @@ -156,16 +163,16 @@ class StandaloneCordaRPClientTest { val (vault, vaultUpdates) = rpcProxy.vaultTrackBy(paging = PageSpecification(DEFAULT_PAGE_NUM)) assertEquals(0, vault.totalStatesAvailable) - val updateCount = AtomicInteger(0) + val updateLatch = CountDownLatch(1) vaultUpdates.subscribe { update -> log.info("Vault>> FlowId=${update.flowId}") - updateCount.incrementAndGet() + updateLatch.countDown() } // Now issue some cash rpcProxy.startFlow(::CashIssueFlow, 629.POUNDS, OpaqueBytes.of(0), notaryNode.notaryIdentity) .returnValue.getOrThrow(timeout) - assertNotEquals(0, updateCount.get()) + updateLatch.await() // Check that this cash exists in the vault val cashBalance = rpcProxy.getCashBalances()