Fix flakey RPC tests (#1292)

This commit is contained in:
Rick Parker 2017-08-21 13:05:25 +01:00 committed by GitHub
parent c3c2ffcc2f
commit cbef5d6a2d

View File

@ -34,6 +34,7 @@ import java.io.FilterInputStream
import java.io.InputStream import java.io.InputStream
import java.nio.file.Paths import java.nio.file.Paths
import java.util.* import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import kotlin.streams.toList import kotlin.streams.toList
import kotlin.test.assertEquals import kotlin.test.assertEquals
@ -119,11 +120,14 @@ class StandaloneCordaRPClientTest {
val handle = rpcProxy.startTrackedFlow( val handle = rpcProxy.startTrackedFlow(
::CashIssueFlow, 429.DOLLARS, OpaqueBytes.of(0), notaryNode.notaryIdentity ::CashIssueFlow, 429.DOLLARS, OpaqueBytes.of(0), notaryNode.notaryIdentity
) )
val updateLatch = CountDownLatch(1)
handle.progress.subscribe { msg -> handle.progress.subscribe { msg ->
log.info("Flow>> $msg") log.info("Flow>> $msg")
++trackCount ++trackCount
updateLatch.countDown()
} }
handle.returnValue.getOrThrow(timeout) handle.returnValue.getOrThrow(timeout)
updateLatch.await()
assertNotEquals(0, trackCount) assertNotEquals(0, trackCount)
} }
@ -137,17 +141,20 @@ class StandaloneCordaRPClientTest {
val (stateMachines, updates) = rpcProxy.stateMachinesFeed() val (stateMachines, updates) = rpcProxy.stateMachinesFeed()
assertEquals(0, stateMachines.size) assertEquals(0, stateMachines.size)
val updateLatch = CountDownLatch(1)
val updateCount = AtomicInteger(0) val updateCount = AtomicInteger(0)
updates.subscribe { update -> updates.subscribe { update ->
if (update is StateMachineUpdate.Added) { if (update is StateMachineUpdate.Added) {
log.info("StateMachine>> Id=${update.id}") log.info("StateMachine>> Id=${update.id}")
updateCount.incrementAndGet() updateCount.incrementAndGet()
updateLatch.countDown()
} }
} }
// Now issue some cash // Now issue some cash
rpcProxy.startFlow(::CashIssueFlow, 513.SWISS_FRANCS, OpaqueBytes.of(0), notaryNode.notaryIdentity) rpcProxy.startFlow(::CashIssueFlow, 513.SWISS_FRANCS, OpaqueBytes.of(0), notaryNode.notaryIdentity)
.returnValue.getOrThrow(timeout) .returnValue.getOrThrow(timeout)
updateLatch.await()
assertEquals(1, updateCount.get()) assertEquals(1, updateCount.get())
} }
@ -156,16 +163,16 @@ class StandaloneCordaRPClientTest {
val (vault, vaultUpdates) = rpcProxy.vaultTrackBy<Cash.State>(paging = PageSpecification(DEFAULT_PAGE_NUM)) val (vault, vaultUpdates) = rpcProxy.vaultTrackBy<Cash.State>(paging = PageSpecification(DEFAULT_PAGE_NUM))
assertEquals(0, vault.totalStatesAvailable) assertEquals(0, vault.totalStatesAvailable)
val updateCount = AtomicInteger(0) val updateLatch = CountDownLatch(1)
vaultUpdates.subscribe { update -> vaultUpdates.subscribe { update ->
log.info("Vault>> FlowId=${update.flowId}") log.info("Vault>> FlowId=${update.flowId}")
updateCount.incrementAndGet() updateLatch.countDown()
} }
// Now issue some cash // Now issue some cash
rpcProxy.startFlow(::CashIssueFlow, 629.POUNDS, OpaqueBytes.of(0), notaryNode.notaryIdentity) rpcProxy.startFlow(::CashIssueFlow, 629.POUNDS, OpaqueBytes.of(0), notaryNode.notaryIdentity)
.returnValue.getOrThrow(timeout) .returnValue.getOrThrow(timeout)
assertNotEquals(0, updateCount.get()) updateLatch.await()
// Check that this cash exists in the vault // Check that this cash exists in the vault
val cashBalance = rpcProxy.getCashBalances() val cashBalance = rpcProxy.getCashBalances()