CORDA-2802 use eventually to wait (#4932)

* CORDA-2802 use eventually to wait

* Catch Exception, not Throwable
This commit is contained in:
Dominic Fox 2019-03-26 16:01:06 +00:00 committed by josecoll
parent 3e4a5976d8
commit c2ad64ccde
7 changed files with 153 additions and 101 deletions

View File

@ -11,7 +11,8 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.*
import net.corda.node.services.rpc.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.eventually
import net.corda.testing.common.internal.eventually
import net.corda.testing.common.internal.succeeds
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.internal.testThreadFactory
@ -249,7 +250,9 @@ class RPCStabilityTests {
assertEquals("pong", client.ping())
serverFollower.shutdown()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
val response = eventually<RPCException, String>(10.seconds) { client.ping() }
val response = eventually {
succeeds { client.ping() }
}
assertEquals("pong", response)
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
@ -316,13 +319,13 @@ class RPCStabilityTests {
})
serverFollower.shutdown()
Thread.sleep(100)
assertTrue(terminateHandlerCalled)
assertTrue(errorHandlerCalled)
assertEquals("Connection failure detected.", exceptionMessage)
assertTrue(subscription.isUnsubscribed)
eventually {
assertTrue(terminateHandlerCalled)
assertTrue(errorHandlerCalled)
assertEquals("Connection failure detected.", exceptionMessage)
assertTrue(subscription.isUnsubscribed)
}
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}

View File

@ -1,25 +0,0 @@
package net.corda.nodeapi
import java.time.Duration
/**
* Ideas borrowed from "io.kotlintest" with some improvements made
* This is meant for use from Kotlin code use only mainly due to it's inline/reified nature
*/
inline fun <reified E : Throwable, R> eventually(duration: Duration, f: () -> R): R {
val end = System.nanoTime() + duration.toNanos()
var times = 0
while (System.nanoTime() < end) {
try {
return f()
} catch (e: Throwable) {
when (e) {
is E -> {
}// ignore and continue
else -> throw e // unexpected exception type - rethrow
}
}
times++
}
throw AssertionError("Test failed after $duration; attempted $times times")
}

View File

@ -3,8 +3,8 @@ package net.corda.nodeapi.internal.network
import net.corda.core.internal.div
import net.corda.core.internal.list
import net.corda.core.internal.write
import net.corda.nodeapi.eventually
import net.corda.core.internal.NODE_INFO_DIRECTORY
import net.corda.testing.common.internal.eventually
import org.assertj.core.api.Assertions.assertThat
import org.junit.Before
import org.junit.Rule
@ -63,7 +63,7 @@ class NodeInfoFilesCopierTest {
nodeInfoFilesCopier.addConfig(node2RootPath)
advanceTime()
eventually<AssertionError, Unit>(Duration.ofMinutes(1)) {
eventually(Duration.ofMinutes(1)) {
// Check only one file is copied.
checkDirectoryContainsSingleFile(node2AdditionalNodeInfoPath, GOOD_NODE_INFO_NAME)
}
@ -81,7 +81,7 @@ class NodeInfoFilesCopierTest {
(node2RootPath / BAD_NODE_INFO_NAME).write(content)
advanceTime()
eventually<AssertionError, Unit>(Duration.ofMinutes(1)) {
eventually(Duration.ofMinutes(1)) {
// Check only one file is copied to the other node.
checkDirectoryContainsSingleFile(node1AdditionalNodeInfoPath, GOOD_NODE_INFO_NAME)
}
@ -105,7 +105,7 @@ class NodeInfoFilesCopierTest {
(node2RootPath / GOOD_NODE_INFO_NAME).write(content)
advanceTime()
eventually<AssertionError, Unit>(Duration.ofMinutes(1)) {
eventually(Duration.ofMinutes(1)) {
// Check only one file is copied to the other node.
checkDirectoryContainsSingleFile(node1AdditionalNodeInfoPath, GOOD_NODE_INFO_NAME)
}
@ -124,8 +124,9 @@ class NodeInfoFilesCopierTest {
(node2RootPath / GOOD_NODE_INFO_NAME_2).write(content)
// Give some time to the filesystem to report the change.
Thread.sleep(100)
assertThat(node1AdditionalNodeInfoPath.list()).isEmpty()
eventually {
assertThat(node1AdditionalNodeInfoPath.list()).isEmpty()
}
}
private fun advanceTime() {

View File

@ -11,6 +11,7 @@ import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_FILE_NAME
import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.testing.common.internal.eventually
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
import net.corda.testing.driver.NodeHandle
@ -113,20 +114,21 @@ class NetworkMapTest(var initFunc: (URL, NetworkMapServer) -> CompatibilityZoneP
)
val laterHash = laterParams.serialize().hash
networkMapServer.scheduleParametersUpdate(laterParams, "Another update", Instant.ofEpochMilli(random63BitValue()))
Thread.sleep(cacheTimeout.toMillis() * 2)
updates.expectEvents(isStrict = false) {
sequence(
expect { update: ParametersUpdateInfo ->
assertEquals(update.description, "Next parameters")
assertEquals(update.hash, nextHash)
assertEquals(update.parameters, nextParams)
},
expect { update: ParametersUpdateInfo ->
assertEquals(update.description, "Another update")
assertEquals(update.hash, laterHash)
assertEquals(update.parameters, laterParams)
}
)
eventually {
updates.expectEvents(isStrict = false) {
sequence(
expect { update: ParametersUpdateInfo ->
assertEquals(update.description, "Next parameters")
assertEquals(update.hash, nextHash)
assertEquals(update.parameters, nextParams)
},
expect { update: ParametersUpdateInfo ->
assertEquals(update.description, "Another update")
assertEquals(update.hash, laterHash)
assertEquals(update.parameters, laterParams)
}
)
}
}
// This should throw, because the nextHash has been replaced by laterHash
assertThatThrownBy { alice.rpc.acceptNewNetworkParameters(nextHash) }.hasMessageContaining("Refused to accept parameters with hash")

View File

@ -27,6 +27,7 @@ import net.corda.nodeapi.internal.network.NETWORK_PARAMS_UPDATE_FILE_NAME
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier
import net.corda.nodeapi.internal.network.SignedNetworkParameters
import net.corda.nodeapi.internal.network.verifiedNetworkParametersCert
import net.corda.testing.common.internal.eventually
import net.corda.testing.common.internal.testNetworkParameters
import net.corda.testing.core.*
import net.corda.testing.internal.DEV_ROOT_CA
@ -38,6 +39,7 @@ import net.corda.testing.node.makeTestIdentityService
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Assert.fail
import org.junit.Before
import org.junit.Rule
import org.junit.Test
@ -45,6 +47,7 @@ import rx.schedulers.TestScheduler
import java.io.IOException
import java.net.URL
import java.security.KeyPair
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
@ -126,9 +129,8 @@ class NetworkMapUpdaterTest {
networkMapClient.publish(signedNodeInfo2)
assertThat(nodeReadyFuture).isNotDone()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(2)).addNode(any())
eventually { verify(networkMapCache, times(2)).addNode(any()) }
verify(networkMapCache, times(1)).addNode(nodeInfo1)
verify(networkMapCache, times(1)).addNode(nodeInfo2)
assertThat(nodeReadyFuture).isDone()
@ -137,10 +139,9 @@ class NetworkMapUpdaterTest {
networkMapClient.publish(signedNodeInfo3)
networkMapClient.publish(signedNodeInfo4)
advanceTime()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// 4 node info from network map, and 1 from file.
verify(networkMapCache, times(5)).addNode(any())
eventually { verify(networkMapCache, times(5)).addNode(any()) }
verify(networkMapCache, times(1)).addNode(nodeInfo3)
verify(networkMapCache, times(1)).addNode(nodeInfo4)
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
@ -164,20 +165,17 @@ class NetworkMapUpdaterTest {
startUpdater()
advanceTime()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
// 4 node info from network map, and 1 from file.
verify(networkMapCache, times(5)).addNode(any())
eventually { verify(networkMapCache, times(5)).addNode(any()) }
verify(networkMapCache, times(1)).addNode(fileNodeInfoAndSigned.nodeInfo)
// Test remove node.
listOf(nodeInfo1, nodeInfo2, nodeInfo3, nodeInfo4).forEach {
server.removeNodeInfo(it)
}
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(4)).removeNode(any())
eventually { verify(networkMapCache, times(4)).removeNode(any()) }
verify(networkMapCache, times(1)).removeNode(nodeInfo1)
verify(networkMapCache, times(1)).removeNode(nodeInfo2)
verify(networkMapCache, times(1)).removeNode(nodeInfo3)
@ -237,13 +235,12 @@ class NetworkMapUpdaterTest {
val newParameters = testNetworkParameters(epoch = 314, maxMessageSize = 10485761)
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val newHash = newParameters.serialize().hash
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
assert(!updateFile.exists()) { "network parameters should not be auto accepted" }
assertNever("network parameters should not be auto accepted") { updateFile.exists() }
updater!!.acceptNewNetworkParameters(newHash) { it.serialize().sign(ourKeyPair) }
verify(networkParametersStorage, times(1)).saveParameters(any())
eventually { verify(networkParametersStorage, times(1)).saveParameters(any()) }
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate)
assertEquals(newParameters, paramsFromFile)
@ -258,14 +255,15 @@ class NetworkMapUpdaterTest {
whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256())))
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val newHash = newParameters.serialize().hash
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate)
assertEquals(newParameters, paramsFromFile)
assertEquals(newHash, server.latestParametersAccepted(ourKeyPair.public))
eventually {
assertTrue(updateFile.exists(), "Update file should be created")
val signedNetworkParams = updateFile.readObject<SignedNetworkParameters>()
val paramsFromFile = signedNetworkParams.verifiedNetworkParametersCert(DEV_ROOT_CA.certificate)
assertEquals(newParameters, paramsFromFile)
assertEquals(newHash, server.latestParametersAccepted(ourKeyPair.public))
}
}
@Test
@ -276,10 +274,9 @@ class NetworkMapUpdaterTest {
whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256())))
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater(excludedAutoAcceptNetworkParameters = setOf("whitelistedContractImplementations"))
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
assert(!updateFile.exists()) { "network parameters should not be auto accepted" }
assertNever("network parameters should not be auto accepted") { updateFile.exists() }
}
@Test
@ -290,10 +287,9 @@ class NetworkMapUpdaterTest {
whitelistedContractImplementations = mapOf("key" to listOf(SecureHash.randomSHA256())))
server.scheduleParametersUpdate(newParameters, "Test update", Instant.MIN)
startUpdater(autoAcceptNetworkParameters = false)
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
val updateFile = baseDir / NETWORK_PARAMS_UPDATE_FILE_NAME
assert(!updateFile.exists()) { "network parameters should not be auto accepted" }
assertNever("network parameters should not be auto accepted") { updateFile.exists() }
}
@Test
@ -354,17 +350,20 @@ class NetworkMapUpdaterTest {
startUpdater()
advanceTime()
verify(networkMapCache, times(1)).addNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs)
// Node from file has higher serial than the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash)
eventually { assertThat(networkMapCache.allNodeHashes).containsOnly(localSignedNodeInfo.signed.raw.hash) }
val fileName = "${NodeInfoFilesCopier.NODE_INFO_FILE_NAME_PREFIX}${localNodeInfo.legalIdentities[0].name.serialize().hash}"
(nodeInfoDir / fileName).delete()
advanceTime()
verify(networkMapCache, times(1)).removeNode(any())
verify(networkMapCache).removeNode(localNodeInfo)
Thread.sleep(2L * cacheExpiryMs)
// Instead of node from file we should have now the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash)
eventually {
// Instead of node from file we should have now the one from NetworkMapServer
assertThat(networkMapCache.allNodeHashes).containsOnly(serverSignedNodeInfo.raw.hash)
}
}
// Test fix for ENT-1882
@ -378,8 +377,9 @@ class NetworkMapUpdaterTest {
networkMapCache.addNode(myInfo) // Simulate behaviour on node startup when our node info is added to cache
networkMapClient.publish(signedOtherInfo)
startUpdater(ourNodeInfo = signedMyInfo)
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, never()).removeNode(myInfo)
assertAlways("Node must never be removed") {
verify(networkMapCache, never()).removeNode(myInfo)
}
assertThat(server.networkMapHashes()).containsOnly(signedOtherInfo.raw.hash)
assertThat(networkMapCache.allNodeHashes).containsExactlyInAnyOrder(signedMyInfo.raw.hash, signedOtherInfo.raw.hash)
}
@ -402,18 +402,17 @@ class NetworkMapUpdaterTest {
startUpdater()
// TODO: Remove sleep in unit test.
Thread.sleep(2L * cacheExpiryMs)
verify(networkMapCache, times(1)).addNode(signedNodeInfo1.verified())
eventually { verify(networkMapCache, times(1)).addNode(signedNodeInfo1.verified()) }
assert(networkMapCache.allNodeHashes.size == 1)
networkMapClient.publish(signedNodeInfo2)
Thread.sleep(2L * cacheExpiryMs)
advanceTime()
verify(networkMapCache, times(1)).addNode(signedNodeInfo2.verified())
verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified())
assert(networkMapCache.allNodeHashes.size == 1)
eventually {
verify(networkMapCache, times(1)).addNode(signedNodeInfo2.verified())
verify(networkMapCache, times(1)).removeNode(signedNodeInfo1.verified())
}
assertEquals(1, networkMapCache.allNodeHashes.size)
}
@Test
@ -471,4 +470,24 @@ class NetworkMapUpdaterTest {
private fun advanceTime() {
scheduler.advanceTimeBy(10, TimeUnit.SECONDS)
}
private fun assertNever(condition: String, check: () -> Boolean) {
val timeoutMillis = 2L * cacheExpiryMs
val start = Instant.now()
while (Duration.between(start, Instant.now()).toMillis() < timeoutMillis) {
Thread.sleep(100)
if (check()) fail(condition)
}
}
private fun assertAlways(condition: String, check: () -> Unit) {
assertNever(condition) {
try {
check()
false
} catch (e: Exception) {
true
}
}
}
}

View File

@ -15,12 +15,14 @@ import net.corda.node.services.FinalityHandler
import net.corda.node.services.messaging.Message
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.nodeapi.internal.persistence.contextTransaction
import net.corda.testing.common.internal.eventually
import net.corda.testing.core.TestIdentity
import net.corda.testing.node.internal.*
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.hibernate.exception.ConstraintViolationException
import org.junit.After
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import java.sql.SQLException
@ -99,10 +101,10 @@ class RetryFlowMockTest {
})
val count = 10000 // Lots of iterations so the flow keeps going long enough
nodeA.startFlow(KeepSendingFlow(count, partyB))
while (messagesSent.size < 1) {
Thread.sleep(10)
eventually(waitBetween = Duration.ofMillis(10)) {
assertTrue(messagesSent.isNotEmpty())
assertNotNull(messagesSent.first().senderUUID)
}
assertNotNull(messagesSent.first().senderUUID)
nodeA = mockNet.restartNode(nodeA)
// This is a bit racy because restarting the node actually starts it, so we need to make sure there's enough iterations we get here with flow still going.
nodeA.setMessagingServiceSpy(object : MessagingServiceSpy() {
@ -113,8 +115,8 @@ class RetryFlowMockTest {
})
// Now short circuit the iterations so the flow finishes soon.
KeepSendingFlow.count.set(count - 2)
while (nodeA.smm.allStateMachines.isNotEmpty()) {
Thread.sleep(10)
eventually(waitBetween = Duration.ofMillis(10)) {
assertTrue(nodeA.smm.allStateMachines.isEmpty())
}
assertNull(messagesSent.last().senderUUID)
}

View File

@ -0,0 +1,50 @@
package net.corda.testing.common.internal
import java.time.Duration
/**
* Ideas borrowed from "io.kotlintest" with some improvements made
* This is meant for use from Kotlin code use only mainly due to it's inline/reified nature
*
* @param duration How long to wait for, before returning the last test failure. The default is 5 seconds.
* @param waitBetween How long to wait before retrying the test condition. The default is 1/10th of a second.
* @param waitBefore How long to wait before trying the test condition for the first time. It's assumed that [eventually]
* is being used because the condition is not _immediately_ fulfilled, so this defaults to the value of [waitBetween].
* @param test A test which should pass within the given [duration].
*
* @throws AssertionError, if the test does not pass within the given [duration].
*/
inline fun <R> eventually(
duration: Duration = Duration.ofSeconds(5),
waitBetween: Duration = Duration.ofMillis(100),
waitBefore: Duration = waitBetween,
test: () -> R): R {
val end = System.nanoTime() + duration.toNanos()
var times = 0
var lastFailure: AssertionError? = null
if (!waitBefore.isZero) Thread.sleep(waitBefore.toMillis())
while (System.nanoTime() < end) {
try {
return test()
} catch (e: AssertionError) {
if (!waitBetween.isZero) Thread.sleep(waitBetween.toMillis())
lastFailure = e
}
times++
}
throw AssertionError("Test failed with \"${lastFailure?.message}\" after $duration; attempted $times times")
}
/**
* Use when the action you want to retry until it succeeds throws an exception, rather than failing a test.
*/
inline fun <R> succeeds(action: () -> R): R =
try {
action()
} catch (e: Exception) {
throw AssertionError(e.message)
}