Fix RPC flaky test (#2849)

* fix rpc reconnect flaky test; remove obsolete rpcproxy log message

* rpc client proxy: replace lock with atomic variable

* rpc client proxy: removed volatile property

* RPCStabilityTests: used eventually() method to test async response

* RPCStabilityTests: remove unused import
This commit is contained in:
bpaunescu
2018-03-20 16:23:29 +00:00
committed by GitHub
parent 144632818c
commit ac9cb59a6e
4 changed files with 23 additions and 19 deletions

View File

@ -78,6 +78,7 @@ dependencies {
testCompile project(':node-driver') testCompile project(':node-driver')
testCompile project(':client:mock') testCompile project(':client:mock')
integrationTestCompile project(path: ':node-api', configuration: 'testArtifacts')
// Smoke tests do NOT have any Node code on the classpath! // Smoke tests do NOT have any Node code on the classpath!
smokeTestCompile project(':smoke-test-utils') smokeTestCompile project(':smoke-test-utils')

View File

@ -12,6 +12,7 @@ import net.corda.core.serialization.serialize
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.eventually
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.testThreadFactory import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.internal.* import net.corda.testing.node.internal.*
@ -247,9 +248,8 @@ class RPCStabilityTests {
assertEquals("pong", client.ping()) assertEquals("pong", client.ping())
serverFollower.shutdown() serverFollower.shutdown()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow() startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
Thread.sleep(1000) //wait for the server to come back up val response = eventually<RPCException, String>(10.seconds) { client.ping() }
val pingFuture = pool.fork(client::ping) assertEquals("pong", response)
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
clientFollower.shutdown() // Driver would do this after the new server, causing hang. clientFollower.shutdown() // Driver would do this after the new server, causing hang.
} }
} }

View File

@ -41,10 +41,9 @@ import java.lang.reflect.Method
import java.time.Instant import java.time.Instant
import java.util.* import java.util.*
import java.util.concurrent.* import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.reflect.jvm.javaMethod import kotlin.reflect.jvm.javaMethod
/** /**
@ -173,9 +172,7 @@ class RPCClientProxyHandler(
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry) private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
private val deduplicationSequenceNumber = AtomicLong(0) private val deduplicationSequenceNumber = AtomicLong(0)
private val lock = ReentrantReadWriteLock() private val sendingEnabled = AtomicBoolean(true)
@Volatile
private var sendingEnabled = true
/** /**
* Start the client. This creates the per-client queue, starts the consumer session and the reaper. * Start the client. This creates the per-client queue, starts the consumer session and the reaper.
@ -219,10 +216,8 @@ class RPCClientProxyHandler(
throw RPCException("RPC Proxy is closed") throw RPCException("RPC Proxy is closed")
} }
lock.readLock().withLock { if (!sendingEnabled.get())
if (!sendingEnabled)
throw RPCException("RPC server is not available.") throw RPCException("RPC server is not available.")
}
val replyId = InvocationId.newInstance() val replyId = InvocationId.newInstance()
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>")) callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
@ -411,11 +406,8 @@ class RPCClientProxyHandler(
private fun failoverHandler(event: FailoverEventType) { private fun failoverHandler(event: FailoverEventType) {
when (event) { when (event) {
FailoverEventType.FAILURE_DETECTED -> { FailoverEventType.FAILURE_DETECTED -> {
lock.writeLock().withLock { sendingEnabled.set(false)
sendingEnabled = false
}
log.warn("RPC server unavailable. RPC calls are being buffered.")
log.warn("Terminating observables.") log.warn("Terminating observables.")
val m = observableContext.observableMap.asMap() val m = observableContext.observableMap.asMap()
m.keys.forEach { k -> m.keys.forEach { k ->
@ -434,9 +426,7 @@ class RPCClientProxyHandler(
} }
FailoverEventType.FAILOVER_COMPLETED -> { FailoverEventType.FAILOVER_COMPLETED -> {
lock.writeLock().withLock { sendingEnabled.set(true)
sendingEnabled = true
}
log.info("RPC server available.") log.info("RPC server available.")
} }

View File

@ -49,6 +49,19 @@ dependencies {
testCompile project(':node-driver') testCompile project(':node-driver')
} }
configurations {
testArtifacts.extendsFrom testRuntime
}
task testJar(type: Jar) {
classifier "tests"
from sourceSets.test.output
}
artifacts {
testArtifacts testJar
}
jar { jar {
baseName 'corda-node-api' baseName 'corda-node-api'
} }