NOTICK Migrate recent RPC related changes to OS, as these were initially imp… (#6532)

* Migrate recent RPC related changes to OS, as these were initially implemented in ENT only

* tests cleanup

* cleanup imports
This commit is contained in:
Tamas Veingartner 2020-08-07 09:18:09 +01:00 committed by GitHub
parent 0d5ee8b0fa
commit e6af60edda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 67 additions and 41 deletions

View File

@ -6,6 +6,7 @@ import net.corda.client.rpc.CordaRPCClientTest
import net.corda.client.rpc.GracefulReconnect
import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.UnrecoverableRPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps
import net.corda.core.messaging.startTrackedFlow
import net.corda.core.utilities.NetworkHostAndPort
@ -82,6 +83,29 @@ class CordaRPCClientReconnectionTest {
}
@Test(timeout=300_000)
fun `minimum server protocol version should cause exception if higher than allowed`() {
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) {
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
fun startNode(): NodeHandle {
return startNode(
providedName = CHARLIE_NAME,
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
customOverrides = mapOf("rpcSettings.address" to address.toString())
).getOrThrow()
}
assertThatThrownBy {
val node = startNode ()
val client = CordaRPCClient(node.rpcAddress, config.copy(minimumServerProtocolVersion = 100, maxReconnectAttempts = 1))
client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)
}
.isInstanceOf(UnrecoverableRPCException::class.java)
.hasMessageStartingWith("Requested minimum protocol version (100) is higher than the server's supported protocol version ")
}
}
@Test(timeout=300_000)
fun `rpc client calls and returned observables continue working when the server crashes and restarts`() {
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) {
@ -292,7 +316,7 @@ class CordaRPCClientReconnectionTest {
val node = startNode()
CordaRPCClient(node.rpcAddress, config).start(rpcUser.username, rpcUser.password, gracefulReconnect).use {
node.stop()
thread() {
thread {
it.proxy.startTrackedFlow(
::CashIssueFlow,
10.DOLLARS,
@ -349,4 +373,4 @@ class CordaRPCClientReconnectionTest {
}
}
}
}
}

View File

@ -46,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
import org.apache.activemq.artemis.api.core.client.FailoverEventListener
import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Notification
@ -124,7 +125,7 @@ internal class RPCClientProxyHandler(
val toStringMethod: Method = Object::toString.javaMethod!!
val equalsMethod: Method = Object::equals.javaMethod!!
val hashCodeMethod: Method = Object::hashCode.javaMethod!!
var terminating = false
private fun addRpcCallSiteToThrowable(throwable: Throwable, callSite: CallSite) {
var currentThrowable = throwable
while (true) {
@ -217,7 +218,7 @@ internal class RPCClientProxyHandler(
.weakValues()
.removalListener(onObservableRemove)
.executor(MoreExecutors.directExecutor()),
"RpcClientProxyHandler_rpcObservable"
"RpcClientProxyHandler_rpcObservable"
)
}
@ -233,6 +234,22 @@ internal class RPCClientProxyHandler(
private val sendingEnabled = AtomicBoolean(true)
// Used to interrupt failover thread (i.e. client is closed while failing over).
private var haFailoverThread: Thread? = null
private val haFailoverHandler: FailoverHandler = FailoverHandler(
detected = { log.warn("Connection failure. Attempting to reconnect using back-up addresses.")
cleanUpOnConnectionLoss()
sessionFactory?.apply {
connection.destroy()
cleanup()
close()
}
haFailoverThread = Thread.currentThread()
attemptReconnect()
})
private val defaultFailoverHandler: FailoverHandler = FailoverHandler(
detected = { cleanUpOnConnectionLoss() },
completed = { sendingEnabled.set(true)
log.info("RPC server available.")},
failed = { log.error("Could not reconnect to the RPC server.")})
/**
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
@ -265,15 +282,27 @@ internal class RPCClientProxyHandler(
}
// Depending on how the client is constructed, connection failure is treated differently
if (serverLocator.staticTransportConfigurations.size == 1) {
sessionFactory!!.addFailoverListener(this::failoverHandler)
sessionFactory!!.addFailoverListener(defaultFailoverHandler)
} else {
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
sessionFactory!!.addFailoverListener(haFailoverHandler)
}
initSessions()
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
startSessions()
}
class FailoverHandler(private val detected: () -> Unit = {},
private val completed: () -> Unit = {},
private val failed: () -> Unit = {}): FailoverEventListener {
override fun failoverEvent(eventType: FailoverEventType?) {
when (eventType) {
FailoverEventType.FAILURE_DETECTED -> { detected() }
FailoverEventType.FAILOVER_COMPLETED -> { completed() }
FailoverEventType.FAILOVER_FAILED -> { if (!terminating) failed() }
}
}
}
// This is the general function that transforms a client side RPC to internal Artemis messages.
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
lifeCycle.requireState { it == State.STARTED || it == State.SERVER_VERSION_NOT_SET }
@ -313,6 +342,10 @@ internal class RPCClientProxyHandler(
"Generated several RPC requests with same ID $replyId"
}
if (request.methodName.equals("terminate", true)){
terminating = true
}
sendMessage(request)
return replyFuture.getOrThrow()
} catch (e: RuntimeException) {
@ -573,7 +606,7 @@ internal class RPCClientProxyHandler(
log.debug { "Connected successfully after $reconnectAttempt attempts using ${transport.params}." }
log.info("RPC server available.")
sessionFactory!!.addFailoverListener(this::haFailoverHandler)
sessionFactory!!.addFailoverListener(haFailoverHandler)
initSessions()
startSessions()
sendingEnabled.set(true)
@ -602,38 +635,6 @@ internal class RPCClientProxyHandler(
producerSession!!.start()
}
private fun haFailoverHandler(event: FailoverEventType) {
if (event == FailoverEventType.FAILURE_DETECTED) {
log.warn("Connection failure. Attempting to reconnect using back-up addresses.")
cleanUpOnConnectionLoss()
sessionFactory?.apply {
connection.destroy()
cleanup()
close()
}
haFailoverThread = Thread.currentThread()
attemptReconnect()
}
// Other events are not considered as reconnection is not done by Artemis.
}
private fun failoverHandler(event: FailoverEventType) {
when (event) {
FailoverEventType.FAILURE_DETECTED -> {
cleanUpOnConnectionLoss()
}
FailoverEventType.FAILOVER_COMPLETED -> {
sendingEnabled.set(true)
log.info("RPC server available.")
}
FailoverEventType.FAILOVER_FAILED -> {
log.error("Could not reconnect to the RPC server.")
}
}
}
private fun cleanUpOnConnectionLoss() {
sendingEnabled.set(false)
log.warn("Terminating observables.")

View File

@ -9,6 +9,7 @@ import net.corda.client.rpc.MaxRpcRetryException
import net.corda.client.rpc.PermissionException
import net.corda.client.rpc.RPCConnection
import net.corda.client.rpc.RPCException
import net.corda.client.rpc.UnrecoverableRPCException
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CLOSED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTED
import net.corda.client.rpc.internal.ReconnectingCordaRPCOps.ReconnectingRPCConnection.CurrentState.CONNECTING
@ -211,7 +212,7 @@ class ReconnectingCordaRPCOps private constructor(
* Establishes a connection by automatically retrying if the attempt to establish a connection fails.
*
* @param retryInterval the interval between retries.
* @param roundRobinIndex index of the address that will be used for the connection.
* @param roundRobinIndex the index of the address that will be used for the connection.
* @param retries the number of retries remaining. A negative value implies infinite retries.
*/
private tailrec fun establishConnectionWithRetry(
@ -240,7 +241,7 @@ class ReconnectingCordaRPCOps private constructor(
}
} catch (ex: Exception) {
when (ex) {
is ActiveMQSecurityException -> {
is UnrecoverableRPCException, is ActiveMQSecurityException -> {
log.error("Failed to login to node.", ex)
throw ex
}