diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index d616f34bb3..246358999c 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -7,16 +7,13 @@ import net.corda.core.crypto.random63BitValue import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose import net.corda.core.messaging.RPCOps -import net.corda.core.node.NodeInfo import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.serialize import net.corda.core.utilities.* import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi -import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.internal.testThreadFactory -import net.corda.testing.node.User import net.corda.testing.node.internal.* import org.apache.activemq.artemis.api.core.SimpleString import org.junit.After @@ -28,15 +25,12 @@ import rx.Observable import rx.subjects.PublishSubject import rx.subjects.UnicastSubject import java.time.Duration -import java.time.Instant import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong -import kotlin.concurrent.thread -import kotlin.test.assertNotNull class RPCStabilityTests { @Rule @@ -253,42 +247,13 @@ class RPCStabilityTests { assertEquals("pong", client.ping()) serverFollower.shutdown() startRpcServer(ops = ops, customPort = serverPort).getOrThrow() + Thread.sleep(1000) //wait for the server to come back up val pingFuture = pool.fork(client::ping) assertEquals("pong", pingFuture.getOrThrow(10.seconds)) clientFollower.shutdown() // Driver would do this after the new server, causing hang. } } - @Test - fun `client reconnects to server and resends buffered messages`() { - rpcDriver(startNodesInProcess = false) { - var nodeInfo: NodeInfo? = null - var nodeTime: Instant? = null - val alice = startNode(providedName = ALICE_NAME, - rpcUsers = listOf(User("alice", "alice", setOf("ALL")))).getOrThrow() - CordaRPCClient(alice.rpcAddress).use("alice", "alice") { connection -> - val proxy = connection.proxy - alice.stop() - val nodeInfoThread = thread { - nodeInfo = proxy.nodeInfo() - } - - val currentTimeThread = thread { - nodeTime = proxy.currentNodeTime() - } - - Thread.sleep(5000) - startNode(providedName = ALICE_NAME, - rpcUsers = listOf(User("alice", "alice", setOf("ALL"))), - customOverrides = mapOf("rpcSettings" to mapOf("address" to "localhost:${alice.rpcAddress.port}"))) - currentTimeThread.join() - nodeInfoThread.join() - assertNotNull(nodeInfo) - assertNotNull(nodeTime) - } - } - } - @Test fun `connection failover fails, rpc calls throw`() { rpcDriver { @@ -356,57 +321,6 @@ class RPCStabilityTests { } } - interface ThreadOps : RPCOps { - fun sendMessage(id: Int, msgNo: Int): String - } - - @Test - fun `multiple threads with 1000 messages for each thread`() { - val messageNo = 1000 - val threadNo = 8 - val ops = object : ThreadOps { - override val protocolVersion = 0 - override fun sendMessage(id: Int, msgNo: Int): String { - return "($id-$msgNo)" - } - } - - rpcDriver(startNodesInProcess = false) { - val serverFollower = shutdownManager.follower() - val serverPort = startRpcServer(rpcUser = User("alice", "alice", setOf("ALL")), - ops = ops).getOrThrow().broker.hostAndPort!! - - serverFollower.unfollow() - val proxy = RPCClient(serverPort).start(ThreadOps::class.java, "alice", "alice").proxy - val expectedMap = mutableMapOf() - val resultsMap = mutableMapOf() - - (1 until threadNo).forEach { nr -> - (1 until messageNo).forEach { msgNo -> - expectedMap[nr] = expectedMap.getOrDefault(nr, StringBuilder()).append("($nr-$msgNo)") - } - } - - val threads = mutableMapOf() - (1 until threadNo).forEach { nr -> - val thread = thread { - (1 until messageNo).forEach { msgNo -> - resultsMap[nr] = resultsMap.getOrDefault(nr, StringBuilder()).append(proxy.sendMessage(nr, msgNo)) - } - } - threads[nr] = thread - } - // give the threads a chance to start sending some messages - Thread.sleep(50) - serverFollower.shutdown() - startRpcServer(rpcUser = User("alice", "alice", setOf("ALL")), - ops = ops, customPort = serverPort).getOrThrow() - threads.values.forEach { it.join() } - (1 until threadNo).forEach { assertEquals(expectedMap[it].toString(), resultsMap[it].toString()) } - } - - } - interface TrackSubscriberOps : RPCOps { fun subscribe(): Observable } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 274dd63941..54b931ed8d 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -41,6 +41,8 @@ import java.util.* import java.util.concurrent.* import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.locks.ReentrantReadWriteLock +import kotlin.concurrent.withLock import kotlin.reflect.jvm.javaMethod /** @@ -161,9 +163,6 @@ class RPCClientProxyHandler( build() } - // Used to buffer client requests if the server is unavailable - private val outgoingRequestBuffer = ConcurrentHashMap() - private var sessionFactory: ClientSessionFactory? = null private var producerSession: ClientSession? = null private var consumerSession: ClientSession? = null @@ -173,6 +172,9 @@ class RPCClientProxyHandler( private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry) private val deduplicationSequenceNumber = AtomicLong(0) + private val lock = ReentrantReadWriteLock() + private var sendingEnabled = true + /** * Start the client. This creates the per-client queue, starts the consumer session and the reaper. */ @@ -215,6 +217,11 @@ class RPCClientProxyHandler( throw RPCException("RPC Proxy is closed") } + lock.readLock().withLock { + if (!sendingEnabled) + throw RPCException("RPC server is not available.") + } + val replyId = InvocationId.newInstance() callSiteMap?.set(replyId, Throwable("")) try { @@ -233,13 +240,8 @@ class RPCClientProxyHandler( "Generated several RPC requests with same ID $replyId" } - outgoingRequestBuffer[replyId] = request - // try and send the request sendMessage(request) - val result = replyFuture.getOrThrow() - // at this point the server responded, remove the buffered request - outgoingRequestBuffer.remove(replyId) - return result + return replyFuture.getOrThrow() } catch (e: RuntimeException) { // Already an unchecked exception, so just rethrow it throw e @@ -407,8 +409,12 @@ class RPCClientProxyHandler( private fun failoverHandler(event: FailoverEventType) { when (event) { FailoverEventType.FAILURE_DETECTED -> { - log.warn("RPC server unavailable. RPC calls are being buffered.") - log.warn("Terminating observables.") + lock.writeLock().withLock { + sendingEnabled = false + } + + log.warn("RPC server unavailable.") + log.warn("Terminating observables and in flight RPCs.") val m = observableContext.observableMap.asMap() m.keys.forEach { k -> observationExecutorPool.run(k) { @@ -416,24 +422,24 @@ class RPCClientProxyHandler( } } observableContext.observableMap.invalidateAll() + + rpcReplyMap.forEach { _, replyFuture -> + replyFuture.setException(RPCException("Connection failure detected.")) + } + + rpcReplyMap.clear() + callSiteMap?.clear() } FailoverEventType.FAILOVER_COMPLETED -> { - log.info("RPC server available. Draining request buffer.") - outgoingRequestBuffer.keys.forEach { replyId -> - outgoingRequestBuffer[replyId]?.let { sendMessage(it) } + lock.writeLock().withLock { + sendingEnabled = true } + log.info("RPC server available.") } FailoverEventType.FAILOVER_FAILED -> { - log.error("Could not reconnect to the RPC server. All buffered requests will be discarded and RPC calls " + - "will throw an RPCException.") - rpcReplyMap.forEach { id, replyFuture -> - replyFuture.setException(RPCException("Could not re-connect to RPC server. Failover failed.")) - } - outgoingRequestBuffer.clear() - rpcReplyMap.clear() - callSiteMap?.clear() + log.error("Could not reconnect to the RPC server.") } } }