CORDA-296: added handler for connection failover (#2639)

* CORDA-296: added handler for connection failover
print rpc address during node startup

* address PR comments: typo, whitespace, onError observables

* reworked rpc client msg buffering to avoid race conditions when failure happens while the request is being prepared to be sent; added unit test for 8 threads sharing same client sending 1000 requests

* decreased sleep time in rpc test, code cleanup
This commit is contained in:
bpaunescu 2018-03-08 08:21:35 +00:00 committed by GitHub
parent 17eb30965d
commit 98a6c71480
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 153 additions and 1 deletions

View File

@ -7,13 +7,16 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCApi
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.testThreadFactory import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.User
import net.corda.testing.node.internal.* import net.corda.testing.node.internal.*
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After import org.junit.After
@ -25,12 +28,15 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread
import kotlin.test.assertNotNull
class RPCStabilityTests { class RPCStabilityTests {
@Rule @Rule
@ -253,6 +259,114 @@ class RPCStabilityTests {
} }
} }
@Test
fun `client reconnects to server and resends buffered messages`() {
rpcDriver(startNodesInProcess = false) {
var nodeInfo: NodeInfo? = null
var nodeTime: Instant? = null
val alice = startNode(providedName = ALICE_NAME,
rpcUsers = listOf(User("alice", "alice", setOf("ALL")))).getOrThrow()
CordaRPCClient(alice.rpcAddress).use("alice", "alice") { connection ->
val proxy = connection.proxy
alice.stop()
val nodeInfoThread = thread {
nodeInfo = proxy.nodeInfo()
}
val currentTimeThread = thread {
nodeTime = proxy.currentNodeTime()
}
Thread.sleep(5000)
startNode(providedName = ALICE_NAME,
rpcUsers = listOf(User("alice", "alice", setOf("ALL"))),
customOverrides = mapOf("rpcSettings" to mapOf("address" to "localhost:${alice.rpcAddress.port}")))
currentTimeThread.join()
nodeInfoThread.join()
assertNotNull(nodeInfo)
assertNotNull(nodeTime)
}
}
}
@Test
fun `connection failover fails, rpc calls throw`() {
rpcDriver {
val ops = object : ReconnectOps {
override val protocolVersion = 0
override fun ping() = "pong"
}
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds, maxReconnectAttempts = 5)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
assertEquals("pong", client.ping())
serverFollower.shutdown()
try {
client.ping()
} catch (e: Exception) {
assertTrue(e is RPCException)
}
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}
interface ThreadOps : RPCOps {
fun sendMessage(id: Int, msgNo: Int): String
}
@Test
fun `multiple threads with 1000 messages for each thread`() {
val messageNo = 1000
val threadNo = 8
val ops = object : ThreadOps {
override val protocolVersion = 0
override fun sendMessage(id: Int, msgNo: Int): String {
return "($id-$msgNo)"
}
}
rpcDriver(startNodesInProcess = false) {
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val proxy = RPCClient<ThreadOps>(serverPort).start(ThreadOps::class.java, "alice", "alice").proxy
val expectedMap = mutableMapOf<Int, StringBuilder>()
val resultsMap = mutableMapOf<Int, StringBuilder>()
(1 until threadNo).forEach { nr ->
(1 until messageNo).forEach { msgNo ->
expectedMap[nr] = expectedMap.getOrDefault(nr, StringBuilder()).append("($nr-$msgNo)")
}
}
val threads = mutableMapOf<Int, Thread>()
(1 until threadNo).forEach { nr ->
val thread = thread {
(1 until messageNo).forEach { msgNo ->
resultsMap[nr] = resultsMap.getOrDefault(nr, StringBuilder()).append(proxy.sendMessage(nr, msgNo))
}
}
threads[nr] = thread
}
// give the threads a chance to start sending some messages
Thread.sleep(50)
serverFollower.shutdown()
startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
ops = ops, customPort = serverPort).getOrThrow()
threads.values.forEach { it.join() }
(1 until threadNo).forEach { assertEquals(expectedMap[it].toString(), resultsMap[it].toString()) }
}
}
interface TrackSubscriberOps : RPCOps { interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit> fun subscribe(): Observable<Unit>
} }

View File

@ -161,6 +161,9 @@ class RPCClientProxyHandler(
build() build()
} }
// Used to buffer client requests if the server is unavailable
private val outgoingRequestBuffer = ConcurrentHashMap<InvocationId, RPCApi.ClientToServer>()
private var sessionFactory: ClientSessionFactory? = null private var sessionFactory: ClientSessionFactory? = null
private var producerSession: ClientSession? = null private var producerSession: ClientSession? = null
private var consumerSession: ClientSession? = null private var consumerSession: ClientSession? = null
@ -195,6 +198,7 @@ class RPCClientProxyHandler(
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress) consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
rpcConsumer = consumerSession!!.createConsumer(clientAddress) rpcConsumer = consumerSession!!.createConsumer(clientAddress)
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler) rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
producerSession!!.addFailoverListener(this::failoverHandler)
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET) lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
consumerSession!!.start() consumerSession!!.start()
producerSession!!.start() producerSession!!.start()
@ -228,8 +232,14 @@ class RPCClientProxyHandler(
require(rpcReplyMap.put(replyId, replyFuture) == null) { require(rpcReplyMap.put(replyId, replyFuture) == null) {
"Generated several RPC requests with same ID $replyId" "Generated several RPC requests with same ID $replyId"
} }
outgoingRequestBuffer[replyId] = request
// try and send the request
sendMessage(request) sendMessage(request)
return replyFuture.getOrThrow() val result = replyFuture.getOrThrow()
// at this point the server responded, remove the buffered request
outgoingRequestBuffer.remove(replyId)
return result
} catch (e: RuntimeException) { } catch (e: RuntimeException) {
// Already an unchecked exception, so just rethrow it // Already an unchecked exception, so just rethrow it
throw e throw e
@ -393,6 +403,34 @@ class RPCClientProxyHandler(
sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds)) sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds))
} }
} }
private fun failoverHandler(event: FailoverEventType) {
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
log.warn("RPC server unavailable. RPC calls are being buffered.")
}
FailoverEventType.FAILOVER_COMPLETED -> {
log.info("RPC server available. Draining request buffer.")
outgoingRequestBuffer.keys.forEach { replyId ->
outgoingRequestBuffer[replyId]?.let { sendMessage(it) }
}
}
FailoverEventType.FAILOVER_FAILED -> {
log.error("Could not reconnect to the RPC server. All buffered requests will be discarded and RPC calls " +
"will throw an RPCException.")
rpcReplyMap.forEach { id, replyFuture ->
replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed."))
val observable = observableContext.observableMap.getIfPresent(id)
observable?.onError(RPCException("Could not re-connect to RPC server. Failover failed."))
}
outgoingRequestBuffer.clear()
rpcReplyMap.clear()
callSiteMap?.clear()
}
}
}
} }
private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>> private typealias RpcObservableMap = Cache<InvocationId, UnicastSubject<Notification<*>>>