Bogdan - small rework of RPC client sending (#2812)

* rework message handling while connection is down: do not send anything, throw on all obs and futures when failure detected

* RPC calls will throw if used during failover; adapted tests
This commit is contained in:
bpaunescu 2018-03-14 11:45:18 +00:00 committed by GitHub
parent df536cee86
commit 3502186572
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 29 additions and 109 deletions

View File

@ -7,16 +7,13 @@ import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.fork
import net.corda.core.internal.concurrent.transpose import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.serialize import net.corda.core.serialization.serialize
import net.corda.core.utilities.* import net.corda.core.utilities.*
import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCApi
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.testThreadFactory import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.User
import net.corda.testing.node.internal.* import net.corda.testing.node.internal.*
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After import org.junit.After
@ -28,15 +25,12 @@ import rx.Observable
import rx.subjects.PublishSubject import rx.subjects.PublishSubject
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.time.Duration import java.time.Duration
import java.time.Instant
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.concurrent.thread
import kotlin.test.assertNotNull
class RPCStabilityTests { class RPCStabilityTests {
@Rule @Rule
@ -253,42 +247,13 @@ class RPCStabilityTests {
assertEquals("pong", client.ping()) assertEquals("pong", client.ping())
serverFollower.shutdown() serverFollower.shutdown()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow() startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
Thread.sleep(1000) //wait for the server to come back up
val pingFuture = pool.fork(client::ping) val pingFuture = pool.fork(client::ping)
assertEquals("pong", pingFuture.getOrThrow(10.seconds)) assertEquals("pong", pingFuture.getOrThrow(10.seconds))
clientFollower.shutdown() // Driver would do this after the new server, causing hang. clientFollower.shutdown() // Driver would do this after the new server, causing hang.
} }
} }
@Test
fun `client reconnects to server and resends buffered messages`() {
rpcDriver(startNodesInProcess = false) {
var nodeInfo: NodeInfo? = null
var nodeTime: Instant? = null
val alice = startNode(providedName = ALICE_NAME,
rpcUsers = listOf(User("alice", "alice", setOf("ALL")))).getOrThrow()
CordaRPCClient(alice.rpcAddress).use("alice", "alice") { connection ->
val proxy = connection.proxy
alice.stop()
val nodeInfoThread = thread {
nodeInfo = proxy.nodeInfo()
}
val currentTimeThread = thread {
nodeTime = proxy.currentNodeTime()
}
Thread.sleep(5000)
startNode(providedName = ALICE_NAME,
rpcUsers = listOf(User("alice", "alice", setOf("ALL"))),
customOverrides = mapOf("rpcSettings" to mapOf("address" to "localhost:${alice.rpcAddress.port}")))
currentTimeThread.join()
nodeInfoThread.join()
assertNotNull(nodeInfo)
assertNotNull(nodeTime)
}
}
}
@Test @Test
fun `connection failover fails, rpc calls throw`() { fun `connection failover fails, rpc calls throw`() {
rpcDriver { rpcDriver {
@ -356,57 +321,6 @@ class RPCStabilityTests {
} }
} }
interface ThreadOps : RPCOps {
fun sendMessage(id: Int, msgNo: Int): String
}
@Test
fun `multiple threads with 1000 messages for each thread`() {
val messageNo = 1000
val threadNo = 8
val ops = object : ThreadOps {
override val protocolVersion = 0
override fun sendMessage(id: Int, msgNo: Int): String {
return "($id-$msgNo)"
}
}
rpcDriver(startNodesInProcess = false) {
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
val proxy = RPCClient<ThreadOps>(serverPort).start(ThreadOps::class.java, "alice", "alice").proxy
val expectedMap = mutableMapOf<Int, StringBuilder>()
val resultsMap = mutableMapOf<Int, StringBuilder>()
(1 until threadNo).forEach { nr ->
(1 until messageNo).forEach { msgNo ->
expectedMap[nr] = expectedMap.getOrDefault(nr, StringBuilder()).append("($nr-$msgNo)")
}
}
val threads = mutableMapOf<Int, Thread>()
(1 until threadNo).forEach { nr ->
val thread = thread {
(1 until messageNo).forEach { msgNo ->
resultsMap[nr] = resultsMap.getOrDefault(nr, StringBuilder()).append(proxy.sendMessage(nr, msgNo))
}
}
threads[nr] = thread
}
// give the threads a chance to start sending some messages
Thread.sleep(50)
serverFollower.shutdown()
startRpcServer<ThreadOps>(rpcUser = User("alice", "alice", setOf("ALL")),
ops = ops, customPort = serverPort).getOrThrow()
threads.values.forEach { it.join() }
(1 until threadNo).forEach { assertEquals(expectedMap[it].toString(), resultsMap[it].toString()) }
}
}
interface TrackSubscriberOps : RPCOps { interface TrackSubscriberOps : RPCOps {
fun subscribe(): Observable<Unit> fun subscribe(): Observable<Unit>
} }

View File

@ -41,6 +41,8 @@ import java.util.*
import java.util.concurrent.* import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.concurrent.withLock
import kotlin.reflect.jvm.javaMethod import kotlin.reflect.jvm.javaMethod
/** /**
@ -161,9 +163,6 @@ class RPCClientProxyHandler(
build() build()
} }
// Used to buffer client requests if the server is unavailable
private val outgoingRequestBuffer = ConcurrentHashMap<InvocationId, RPCApi.ClientToServer>()
private var sessionFactory: ClientSessionFactory? = null private var sessionFactory: ClientSessionFactory? = null
private var producerSession: ClientSession? = null private var producerSession: ClientSession? = null
private var consumerSession: ClientSession? = null private var consumerSession: ClientSession? = null
@ -173,6 +172,9 @@ class RPCClientProxyHandler(
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry) private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
private val deduplicationSequenceNumber = AtomicLong(0) private val deduplicationSequenceNumber = AtomicLong(0)
private val lock = ReentrantReadWriteLock()
private var sendingEnabled = true
/** /**
* Start the client. This creates the per-client queue, starts the consumer session and the reaper. * Start the client. This creates the per-client queue, starts the consumer session and the reaper.
*/ */
@ -215,6 +217,11 @@ class RPCClientProxyHandler(
throw RPCException("RPC Proxy is closed") throw RPCException("RPC Proxy is closed")
} }
lock.readLock().withLock {
if (!sendingEnabled)
throw RPCException("RPC server is not available.")
}
val replyId = InvocationId.newInstance() val replyId = InvocationId.newInstance()
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>")) callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
try { try {
@ -233,13 +240,8 @@ class RPCClientProxyHandler(
"Generated several RPC requests with same ID $replyId" "Generated several RPC requests with same ID $replyId"
} }
outgoingRequestBuffer[replyId] = request
// try and send the request
sendMessage(request) sendMessage(request)
val result = replyFuture.getOrThrow() return replyFuture.getOrThrow()
// at this point the server responded, remove the buffered request
outgoingRequestBuffer.remove(replyId)
return result
} catch (e: RuntimeException) { } catch (e: RuntimeException) {
// Already an unchecked exception, so just rethrow it // Already an unchecked exception, so just rethrow it
throw e throw e
@ -407,8 +409,12 @@ class RPCClientProxyHandler(
private fun failoverHandler(event: FailoverEventType) { private fun failoverHandler(event: FailoverEventType) {
when (event) { when (event) {
FailoverEventType.FAILURE_DETECTED -> { FailoverEventType.FAILURE_DETECTED -> {
log.warn("RPC server unavailable. RPC calls are being buffered.") lock.writeLock().withLock {
log.warn("Terminating observables.") sendingEnabled = false
}
log.warn("RPC server unavailable.")
log.warn("Terminating observables and in flight RPCs.")
val m = observableContext.observableMap.asMap() val m = observableContext.observableMap.asMap()
m.keys.forEach { k -> m.keys.forEach { k ->
observationExecutorPool.run(k) { observationExecutorPool.run(k) {
@ -416,24 +422,24 @@ class RPCClientProxyHandler(
} }
} }
observableContext.observableMap.invalidateAll() observableContext.observableMap.invalidateAll()
rpcReplyMap.forEach { _, replyFuture ->
replyFuture.setException(RPCException("Connection failure detected."))
}
rpcReplyMap.clear()
callSiteMap?.clear()
} }
FailoverEventType.FAILOVER_COMPLETED -> { FailoverEventType.FAILOVER_COMPLETED -> {
log.info("RPC server available. Draining request buffer.") lock.writeLock().withLock {
outgoingRequestBuffer.keys.forEach { replyId -> sendingEnabled = true
outgoingRequestBuffer[replyId]?.let { sendMessage(it) }
} }
log.info("RPC server available.")
} }
FailoverEventType.FAILOVER_FAILED -> { FailoverEventType.FAILOVER_FAILED -> {
log.error("Could not reconnect to the RPC server. All buffered requests will be discarded and RPC calls " + log.error("Could not reconnect to the RPC server.")
"will throw an RPCException.")
rpcReplyMap.forEach { id, replyFuture ->
replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed."))
}
outgoingRequestBuffer.clear()
rpcReplyMap.clear()
callSiteMap?.clear()
} }
} }
} }