CORDA-1875 - backport tweaks made to RPC client for handling loss of node (#3916)

* CORDA-1875: backport tweaks made to RPC client for handling loss of connection to the node

* CORDA-1875: backport missing code
This commit is contained in:
bpaunescu 2018-09-10 10:20:16 +01:00 committed by Katelyn Baker
parent c7d5e094e7
commit a3ec8bf9bd
6 changed files with 180 additions and 13 deletions

View File

@ -13,12 +13,12 @@ import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.internal.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Assert.*
import org.junit.Rule
import org.junit.Test
import rx.Observable
@ -37,6 +37,8 @@ class RPCStabilityTests {
@JvmField
val testSerialization = SerializationEnvironmentRule(true)
private val pool = Executors.newFixedThreadPool(10, testThreadFactory())
private val portAllocation = PortAllocation.Incremental(10000)
@After
fun shutdown() {
pool.shutdown()
@ -247,12 +249,99 @@ class RPCStabilityTests {
assertEquals("pong", client.ping())
serverFollower.shutdown()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
val pingFuture = pool.fork(client::ping)
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
val response = eventually<RPCException, String>(10.seconds) { client.ping() }
assertEquals("pong", response)
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
@Test
fun `connection failover fails, rpc calls throw`() {
rpcDriver {
val ops = object : ReconnectOps {
override val protocolVersion = 1000
override fun ping() = "pong"
}
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
assertEquals("pong", client.ping())
serverFollower.shutdown()
try {
client.ping()
} catch (e: Exception) {
assertTrue(e is RPCException)
}
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
interface NoOps : RPCOps {
fun subscribe(): Observable<Nothing>
}
@Test
fun `observables error when connection breaks`() {
rpcDriver {
val ops = object : NoOps {
override val protocolVersion = 1000
override fun subscribe(): Observable<Nothing> {
return PublishSubject.create<Nothing>()
}
}
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<NoOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 500.millis, maxReconnectAttempts = 1)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<NoOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
var terminateHandlerCalled = false
var errorHandlerCalled = false
var exceptionMessage: String? = null
val subscription = client.subscribe()
.doOnTerminate{ terminateHandlerCalled = true }
.subscribe({}, {
errorHandlerCalled = true
//log exception
exceptionMessage = it.message
})
serverFollower.shutdown()
Thread.sleep(100)
assertTrue(terminateHandlerCalled)
assertTrue(errorHandlerCalled)
assertEquals("Connection failure detected.", exceptionMessage)
assertTrue(subscription.isUnsubscribed)
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
@Test
fun `client throws RPCException after initial connection attempt fails`() {
val client = CordaRPCClient(portAllocation.nextHostAndPort())
var exceptionMessage: String? = null
try {
client.start("user", "pass").proxy
} catch (e1: RPCException) {
exceptionMessage = e1.message
} catch (e2: Exception) {
fail("Expected RPCException to be thrown. Received ${e2.javaClass.simpleName} instead.")
}
assertNotNull(exceptionMessage)
assertEquals("Cannot connect to server(s). Tried with all available servers.", exceptionMessage)
}
interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit>
}
@ -432,4 +521,26 @@ fun RPCDriverDSL.pollUntilClientNumber(server: RpcServerHandle, expected: Int) {
val clientAddresses = server.broker.serverControl.addressNames.filter { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
clientAddresses.size == expected
}.get()
}
/**
* Ideas borrowed from "io.kotlintest" with some improvements made
* This is meant for use from Kotlin code use only mainly due to it's inline/reified nature
*/
inline fun <reified E : Throwable, R> eventually(duration: Duration, f: () -> R): R {
val end = System.nanoTime() + duration.toNanos()
var times = 0
while (System.nanoTime() < end) {
try {
return f()
} catch (e: Throwable) {
when (e) {
is E -> {
}// ignore and continue
else -> throw e // unexpected exception type - rethrow
}
}
times++
}
throw AssertionError("Test failed after $duration; attempted $times times")
}

View File

@ -66,6 +66,10 @@ data class CordaRPCClientConfiguration(val connectionMaxRetryInterval: Duration)
* with an error, the observable is closed and you can't then re-subscribe again: you'll have to re-request a fresh
* observable with another RPC.
*
* In case of loss of connection to the server, the client will try to reconnect using the settings provided via
* [CordaRPCClientConfiguration]. While attempting failover, current and future RPC calls will throw
* [RPCException] and previously returned observables will call onError().
*
* @param hostAndPort The network address to connect to.
* @param configuration An optional configuration used to tweak client behaviour.
* @param sslConfiguration An optional [SSLConfiguration] used to enable secure communication with the server.

View File

@ -27,6 +27,7 @@ import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.DeduplicationChecker
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
@ -39,6 +40,7 @@ import java.lang.reflect.Method
import java.time.Instant
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.reflect.jvm.javaMethod
@ -170,6 +172,8 @@ class RPCClientProxyHandler(
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
private val deduplicationSequenceNumber = AtomicLong(0)
private val sendingEnabled = AtomicBoolean(true)
/**
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
*/
@ -188,7 +192,12 @@ class RPCClientProxyHandler(
rpcConfiguration.reapInterval.toMillis(),
TimeUnit.MILLISECONDS
)
sessionFactory = serverLocator.createSessionFactory()
try {
sessionFactory = serverLocator.createSessionFactory()
} catch (e: ActiveMQNotConnectedException) {
throw (RPCException("Cannot connect to server(s). Tried with all available servers.", e))
}
sessionFactory!!.addFailoverListener(this::failoverHandler)
producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
@ -211,6 +220,9 @@ class RPCClientProxyHandler(
throw RPCException("RPC Proxy is closed")
}
if (!sendingEnabled.get())
throw RPCException("RPC server is not available.")
val replyId = InvocationId.newInstance()
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
try {
@ -393,6 +405,46 @@ class RPCClientProxyHandler(
sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds))
}
}
private fun failoverHandler(event: FailoverEventType) {
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
cleanUpOnConnectionLoss()
}
FailoverEventType.FAILOVER_COMPLETED -> {
sendingEnabled.set(true)
log.info("RPC server available.")
}
FailoverEventType.FAILOVER_FAILED -> {
log.error("Could not reconnect to the RPC server.")
}
}
}
private fun cleanUpOnConnectionLoss() {
sendingEnabled.set(false)
log.warn("Terminating observables.")
val m = observableContext.observableMap.asMap()
m.keys.forEach { k ->
observationExecutorPool.run(k) {
try {
m[k]?.onError(RPCException("Connection failure detected."))
} catch (th: Throwable) {
log.error("Unexpected exception when RPC connection failure handling", th)
}
}
}
observableContext.observableMap.invalidateAll()
rpcReplyMap.forEach { _, replyFuture ->
replyFuture.setException(RPCException("Connection failure detected."))
}
rpcReplyMap.clear()
callSiteMap?.clear()
}
}
private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>>

View File

@ -1,5 +1,6 @@
package net.corda.node.services.rpc
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.internal.RPCClient
import net.corda.core.context.AuthServiceId
import net.corda.core.identity.CordaX500Name
@ -21,7 +22,6 @@ import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.driver.internal.RandomFree
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
@ -94,7 +94,7 @@ class ArtemisRpcTests {
// client.trustStore["cordaclienttls"] = rootCertificate
withKeyStores(server, client) { brokerSslOptions, clientSslOptions ->
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(ActiveMQNotConnectedException::class))
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(RPCException::class))
}
}
}
@ -115,7 +115,7 @@ class ArtemisRpcTests {
client.trustStore["cordaclienttls"] = rootCertificate
withKeyStores(server, client) { brokerSslOptions, clientSslOptions ->
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(ActiveMQNotConnectedException::class))
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(RPCException::class))
}
}
}
@ -158,7 +158,7 @@ class ArtemisRpcTests {
client.trustStore["cordaclienttls"] = rootCertificate
withKeyStores(server, client) { brokerSslOptions, clientSslOptions ->
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(ActiveMQNotConnectedException::class))
testSslCommunication(brokerSslOptions, true, clientSslOptions, clientConnectionSpy = expectExceptionOfType(RPCException::class))
}
}
}

View File

@ -3,10 +3,10 @@ package net.corda.irs.web
import com.fasterxml.jackson.databind.ObjectMapper
import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.RPCException
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.finance.plugin.registerFinanceJSONMappers
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
@ -39,7 +39,7 @@ class IrsDemoWebApplication {
do {
try {
return CordaRPCClient(NetworkHostAndPort.parse(cordaHost)).start(cordaUser, cordaPassword).proxy
} catch (ex: ActiveMQNotConnectedException) {
} catch (ex: RPCException) {
if (maxRetries-- > 0) {
Thread.sleep(1000)
} else {

View File

@ -3,13 +3,13 @@ package net.corda.webserver.internal
import com.google.common.html.HtmlEscapers.htmlEscaper
import net.corda.client.jackson.JacksonSupport
import net.corda.client.rpc.CordaRPCClient
import net.corda.client.rpc.RPCException
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.contextLogger
import net.corda.webserver.WebServerConfig
import net.corda.webserver.converters.CordaConverterProvider
import net.corda.webserver.services.WebServerPluginRegistry
import net.corda.webserver.servlets.*
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.eclipse.jetty.server.*
import org.eclipse.jetty.server.handler.ErrorHandler
import org.eclipse.jetty.server.handler.HandlerCollection
@ -170,7 +170,7 @@ class NodeWebServer(val config: WebServerConfig) {
while (true) {
try {
return connectLocalRpcAsNodeUser()
} catch (e: ActiveMQNotConnectedException) {
} catch (e: RPCException) {
log.debug("Could not connect to ${config.rpcAddress} due to exception: ", e)
Thread.sleep(retryDelay)
// This error will happen if the server has yet to create the keystore