diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 6397a27458..bb45162494 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -3,12 +3,17 @@ package net.corda.client.rpc import net.corda.client.rpc.internal.RPCClient import net.corda.core.context.Trace import net.corda.core.crypto.random63BitValue +import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.internal.concurrent.fork import net.corda.core.internal.concurrent.transpose import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.serialize -import net.corda.core.utilities.* +import net.corda.core.utilities.NetworkHostAndPort +import net.corda.core.utilities.Try +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.millis +import net.corda.core.utilities.seconds import net.corda.node.services.rpc.RPCServerConfiguration import net.corda.nodeapi.RPCApi import net.corda.testing.common.internal.eventually @@ -16,11 +21,21 @@ import net.corda.testing.common.internal.succeeds import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.internal.testThreadFactory -import net.corda.testing.node.internal.* +import net.corda.testing.node.internal.RPCDriverDSL +import net.corda.testing.node.internal.RpcBrokerHandle +import net.corda.testing.node.internal.RpcServerHandle +import net.corda.testing.node.internal.poll +import net.corda.testing.node.internal.rpcDriver +import net.corda.testing.node.internal.rpcTestUser +import net.corda.testing.node.internal.startRandomRpcClient +import net.corda.testing.node.internal.startRpcClient import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.SimpleString import org.junit.After -import org.junit.Assert.* +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNotNull +import org.junit.Assert.assertTrue +import org.junit.Assert.fail import org.junit.Ignore import org.junit.Rule import org.junit.Test @@ -286,6 +301,31 @@ class RPCStabilityTests { } } + @Test + fun `connection exits when bad config means the exception is unrecoverable`() { + rpcDriver { + val ops = object : ReconnectOps { + override val protocolVersion = 1000 + override fun ping() = "pong" + } + + val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! + val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = PLATFORM_VERSION + 1) + val clientFollower = shutdownManager.follower() + val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() + try { + client.ping() + } catch (e: Exception) { + assertTrue(e is RPCException) + assertTrue(e.cause is UnrecoverableRPCException) + assertEquals(e.message, "Requested minimum protocol version " + + "(${PLATFORM_VERSION}) is higher than the server's supported protocol version (${PLATFORM_VERSION +1})") + } + clientFollower.unfollow() + clientFollower.shutdown() // Driver would do this after the new server, causing hang. + } + } + interface NoOps : RPCOps { fun subscribe(): Observable } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCException.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCException.kt index e1bcdeafaf..35ea942f5e 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCException.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/RPCException.kt @@ -9,6 +9,11 @@ open class RPCException(message: String?, cause: Throwable?) : CordaRuntimeExcep constructor(msg: String) : this(msg, null) } +/** + * Thrown to indicate a fatal error in the RPC system which cannot be recovered from and so needs some manual support. + */ +open class UnrecoverableRPCException(message: String?, cause: Throwable? = null) : RPCException(message, cause) + /** * Thrown to indicate an RPC operation has been retried for the [maxNumberOfRetries] unsuccessfully. * @param maxNumberOfRetries the number of retries that had been performed. @@ -20,4 +25,4 @@ class MaxRpcRetryException(maxNumberOfRetries: Int, cause: Throwable?): /** * Signals that the underlying [RPCConnection] dropped. */ -open class ConnectionFailureException(cause: Throwable? = null) : RPCException("Connection failure detected.", cause) \ No newline at end of file +open class ConnectionFailureException(cause: Throwable? = null) : RPCException("Connection failure detected.", cause) diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 34d562a75d..e728bfcca6 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -2,7 +2,7 @@ package net.corda.client.rpc.internal import net.corda.client.rpc.CordaRPCClientConfiguration import net.corda.client.rpc.RPCConnection -import net.corda.client.rpc.RPCException +import net.corda.client.rpc.UnrecoverableRPCException import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.crypto.random63BitValue @@ -94,7 +94,8 @@ class RPCClient( val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler)) val serverProtocolVersion = ops.protocolVersion if (serverProtocolVersion < rpcConfiguration.minimumServerProtocolVersion) { - throw RPCException("Requested minimum protocol version (${rpcConfiguration.minimumServerProtocolVersion}) is higher" + + throw UnrecoverableRPCException("Requested minimum protocol version " + + "(${rpcConfiguration.minimumServerProtocolVersion}) is higher" + " than the server's supported protocol version ($serverProtocolVersion)") } proxyHandler.setServerProtocolVersion(serverProtocolVersion) @@ -129,3 +130,4 @@ class RPCClient( } } } + diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index ee2f5052a6..aabea195fc 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -16,8 +16,12 @@ import net.corda.core.context.Actor import net.corda.core.context.Trace import net.corda.core.context.Trace.InvocationId import net.corda.core.identity.CordaX500Name -import net.corda.core.internal.* +import net.corda.core.internal.LazyStickyPool +import net.corda.core.internal.LifeCycle +import net.corda.core.internal.NamedCacheFactory +import net.corda.core.internal.ThreadBox import net.corda.core.internal.messaging.InternalCordaRPCOps +import net.corda.core.internal.times import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationContext @@ -33,15 +37,26 @@ import org.apache.activemq.artemis.api.core.ActiveMQException import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.SimpleString -import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE +import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientProducer +import org.apache.activemq.artemis.api.core.client.ClientSession +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory +import org.apache.activemq.artemis.api.core.client.FailoverEventType +import org.apache.activemq.artemis.api.core.client.ServerLocator import rx.Notification import rx.Observable import rx.subjects.UnicastSubject import java.lang.reflect.InvocationHandler import java.lang.reflect.Method import java.util.* -import java.util.concurrent.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicLong import kotlin.reflect.jvm.javaMethod diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt index 7f0a46efe4..7eaa096b6f 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/ReconnectingCordaRPCOps.kt @@ -123,6 +123,7 @@ class ReconnectingCordaRPCOps private constructor( ) } } + /** * Helper class useful for reconnecting to a Node. */ @@ -216,10 +217,7 @@ class ReconnectingCordaRPCOps private constructor( // Deliberately not logging full stack trace as it will be full of internal stacktraces. log.debug { "Exception upon establishing connection: ${ex.message}" } } - is ActiveMQConnectionTimedOutException -> { - // Deliberately not logging full stack trace as it will be full of internal stacktraces. - log.debug { "Exception upon establishing connection: ${ex.message}" } - } + is ActiveMQConnectionTimedOutException, is ActiveMQUnBlockedException -> { // Deliberately not logging full stack trace as it will be full of internal stacktraces. log.debug { "Exception upon establishing connection: ${ex.message}" } diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 61feef9a80..80f90b6c00 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -323,9 +323,11 @@ thread, however note that two separate Observables may invoke their respective c Error handling -------------- -If something goes wrong with the RPC infrastructure itself, an ``RPCException`` is thrown. If you call a method that -requires a higher version of the protocol than the server supports, ``UnsupportedOperationException`` is thrown. -Otherwise the behaviour depends on the ``devMode`` node configuration option. +If something goes wrong with the RPC infrastructure itself, an ``RPCException`` is thrown. If something +goes wrong that needs a manual intervention to resolve (e.g. a configuration error), an +``UnrecoverableRPCException`` is thrown. If you call a method that requires a higher version of the protocol +than the server supports, ``UnsupportedOperationException`` is thrown. Otherwise the behaviour depends +on the ``devMode`` node configuration option. If the server implementation throws an exception, that exception is serialised and rethrown on the client side as if it was thrown from inside the called RPC method. These exceptions can be caught as normal.