mirror of
https://github.com/corda/corda.git
synced 2025-04-28 15:02:59 +00:00
CORDA-3192: Add an exception for Unrecoverable RPC errors (#5558)
CORDA-3192: Add an exception for Unrecoverable RPC errors (#5558)
This commit is contained in:
commit
1ba3aebeba
@ -3,12 +3,17 @@ package net.corda.client.rpc
|
|||||||
import net.corda.client.rpc.internal.RPCClient
|
import net.corda.client.rpc.internal.RPCClient
|
||||||
import net.corda.core.context.Trace
|
import net.corda.core.context.Trace
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
|
import net.corda.core.internal.PLATFORM_VERSION
|
||||||
import net.corda.core.internal.concurrent.fork
|
import net.corda.core.internal.concurrent.fork
|
||||||
import net.corda.core.internal.concurrent.transpose
|
import net.corda.core.internal.concurrent.transpose
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.core.serialization.serialize
|
import net.corda.core.serialization.serialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
import net.corda.core.utilities.Try
|
||||||
|
import net.corda.core.utilities.getOrThrow
|
||||||
|
import net.corda.core.utilities.millis
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.node.services.rpc.RPCServerConfiguration
|
import net.corda.node.services.rpc.RPCServerConfiguration
|
||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
import net.corda.testing.common.internal.eventually
|
import net.corda.testing.common.internal.eventually
|
||||||
@ -16,11 +21,21 @@ import net.corda.testing.common.internal.succeeds
|
|||||||
import net.corda.testing.core.SerializationEnvironmentRule
|
import net.corda.testing.core.SerializationEnvironmentRule
|
||||||
import net.corda.testing.driver.internal.incrementalPortAllocation
|
import net.corda.testing.driver.internal.incrementalPortAllocation
|
||||||
import net.corda.testing.internal.testThreadFactory
|
import net.corda.testing.internal.testThreadFactory
|
||||||
import net.corda.testing.node.internal.*
|
import net.corda.testing.node.internal.RPCDriverDSL
|
||||||
|
import net.corda.testing.node.internal.RpcBrokerHandle
|
||||||
|
import net.corda.testing.node.internal.RpcServerHandle
|
||||||
|
import net.corda.testing.node.internal.poll
|
||||||
|
import net.corda.testing.node.internal.rpcDriver
|
||||||
|
import net.corda.testing.node.internal.rpcTestUser
|
||||||
|
import net.corda.testing.node.internal.startRandomRpcClient
|
||||||
|
import net.corda.testing.node.internal.startRpcClient
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Assert.*
|
import org.junit.Assert.assertEquals
|
||||||
|
import org.junit.Assert.assertNotNull
|
||||||
|
import org.junit.Assert.assertTrue
|
||||||
|
import org.junit.Assert.fail
|
||||||
import org.junit.Ignore
|
import org.junit.Ignore
|
||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
@ -286,6 +301,31 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `connection exits when bad config means the exception is unrecoverable`() {
|
||||||
|
rpcDriver {
|
||||||
|
val ops = object : ReconnectOps {
|
||||||
|
override val protocolVersion = 1000
|
||||||
|
override fun ping() = "pong"
|
||||||
|
}
|
||||||
|
|
||||||
|
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
|
||||||
|
val clientConfiguration = CordaRPCClientConfiguration.DEFAULT.copy(minimumServerProtocolVersion = PLATFORM_VERSION + 1)
|
||||||
|
val clientFollower = shutdownManager.follower()
|
||||||
|
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
|
||||||
|
try {
|
||||||
|
client.ping()
|
||||||
|
} catch (e: Exception) {
|
||||||
|
assertTrue(e is RPCException)
|
||||||
|
assertTrue(e.cause is UnrecoverableRPCException)
|
||||||
|
assertEquals(e.message, "Requested minimum protocol version " +
|
||||||
|
"(${PLATFORM_VERSION}) is higher than the server's supported protocol version (${PLATFORM_VERSION +1})")
|
||||||
|
}
|
||||||
|
clientFollower.unfollow()
|
||||||
|
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
interface NoOps : RPCOps {
|
interface NoOps : RPCOps {
|
||||||
fun subscribe(): Observable<Nothing>
|
fun subscribe(): Observable<Nothing>
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,11 @@ open class RPCException(message: String?, cause: Throwable?) : CordaRuntimeExcep
|
|||||||
constructor(msg: String) : this(msg, null)
|
constructor(msg: String) : this(msg, null)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Thrown to indicate a fatal error in the RPC system which cannot be recovered from and so needs some manual support.
|
||||||
|
*/
|
||||||
|
open class UnrecoverableRPCException(message: String?, cause: Throwable? = null) : RPCException(message, cause)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Thrown to indicate an RPC operation has been retried for the [maxNumberOfRetries] unsuccessfully.
|
* Thrown to indicate an RPC operation has been retried for the [maxNumberOfRetries] unsuccessfully.
|
||||||
* @param maxNumberOfRetries the number of retries that had been performed.
|
* @param maxNumberOfRetries the number of retries that had been performed.
|
||||||
|
@ -2,7 +2,7 @@ package net.corda.client.rpc.internal
|
|||||||
|
|
||||||
import net.corda.client.rpc.CordaRPCClientConfiguration
|
import net.corda.client.rpc.CordaRPCClientConfiguration
|
||||||
import net.corda.client.rpc.RPCConnection
|
import net.corda.client.rpc.RPCConnection
|
||||||
import net.corda.client.rpc.RPCException
|
import net.corda.client.rpc.UnrecoverableRPCException
|
||||||
import net.corda.core.context.Actor
|
import net.corda.core.context.Actor
|
||||||
import net.corda.core.context.Trace
|
import net.corda.core.context.Trace
|
||||||
import net.corda.core.crypto.random63BitValue
|
import net.corda.core.crypto.random63BitValue
|
||||||
@ -94,7 +94,8 @@ class RPCClient<I : RPCOps>(
|
|||||||
val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler))
|
val ops: I = uncheckedCast(Proxy.newProxyInstance(rpcOpsClass.classLoader, arrayOf(rpcOpsClass), proxyHandler))
|
||||||
val serverProtocolVersion = ops.protocolVersion
|
val serverProtocolVersion = ops.protocolVersion
|
||||||
if (serverProtocolVersion < rpcConfiguration.minimumServerProtocolVersion) {
|
if (serverProtocolVersion < rpcConfiguration.minimumServerProtocolVersion) {
|
||||||
throw RPCException("Requested minimum protocol version (${rpcConfiguration.minimumServerProtocolVersion}) is higher" +
|
throw UnrecoverableRPCException("Requested minimum protocol version " +
|
||||||
|
"(${rpcConfiguration.minimumServerProtocolVersion}) is higher" +
|
||||||
" than the server's supported protocol version ($serverProtocolVersion)")
|
" than the server's supported protocol version ($serverProtocolVersion)")
|
||||||
}
|
}
|
||||||
proxyHandler.setServerProtocolVersion(serverProtocolVersion)
|
proxyHandler.setServerProtocolVersion(serverProtocolVersion)
|
||||||
@ -129,3 +130,4 @@ class RPCClient<I : RPCOps>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,8 +16,12 @@ import net.corda.core.context.Actor
|
|||||||
import net.corda.core.context.Trace
|
import net.corda.core.context.Trace
|
||||||
import net.corda.core.context.Trace.InvocationId
|
import net.corda.core.context.Trace.InvocationId
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.*
|
import net.corda.core.internal.LazyStickyPool
|
||||||
|
import net.corda.core.internal.LifeCycle
|
||||||
|
import net.corda.core.internal.NamedCacheFactory
|
||||||
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
import net.corda.core.internal.messaging.InternalCordaRPCOps
|
||||||
|
import net.corda.core.internal.times
|
||||||
import net.corda.core.messaging.CordaRPCOps
|
import net.corda.core.messaging.CordaRPCOps
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.serialization.SerializationContext
|
import net.corda.core.serialization.SerializationContext
|
||||||
@ -33,15 +37,26 @@ import org.apache.activemq.artemis.api.core.ActiveMQException
|
|||||||
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.*
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
|
||||||
|
import org.apache.activemq.artemis.api.core.client.FailoverEventType
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||||
import rx.Notification
|
import rx.Notification
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.UnicastSubject
|
import rx.subjects.UnicastSubject
|
||||||
import java.lang.reflect.InvocationHandler
|
import java.lang.reflect.InvocationHandler
|
||||||
import java.lang.reflect.Method
|
import java.lang.reflect.Method
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.*
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.ExecutorService
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
|
import java.util.concurrent.ScheduledFuture
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
import kotlin.reflect.jvm.javaMethod
|
import kotlin.reflect.jvm.javaMethod
|
||||||
|
@ -123,6 +123,7 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper class useful for reconnecting to a Node.
|
* Helper class useful for reconnecting to a Node.
|
||||||
*/
|
*/
|
||||||
@ -216,10 +217,7 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||||
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
||||||
}
|
}
|
||||||
is ActiveMQConnectionTimedOutException -> {
|
is ActiveMQConnectionTimedOutException,
|
||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
|
||||||
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
|
||||||
}
|
|
||||||
is ActiveMQUnBlockedException -> {
|
is ActiveMQUnBlockedException -> {
|
||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||||
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
log.debug { "Exception upon establishing connection: ${ex.message}" }
|
||||||
|
@ -323,9 +323,11 @@ thread, however note that two separate Observables may invoke their respective c
|
|||||||
|
|
||||||
Error handling
|
Error handling
|
||||||
--------------
|
--------------
|
||||||
If something goes wrong with the RPC infrastructure itself, an ``RPCException`` is thrown. If you call a method that
|
If something goes wrong with the RPC infrastructure itself, an ``RPCException`` is thrown. If something
|
||||||
requires a higher version of the protocol than the server supports, ``UnsupportedOperationException`` is thrown.
|
goes wrong that needs a manual intervention to resolve (e.g. a configuration error), an
|
||||||
Otherwise the behaviour depends on the ``devMode`` node configuration option.
|
``UnrecoverableRPCException`` is thrown. If you call a method that requires a higher version of the protocol
|
||||||
|
than the server supports, ``UnsupportedOperationException`` is thrown. Otherwise the behaviour depends
|
||||||
|
on the ``devMode`` node configuration option.
|
||||||
|
|
||||||
If the server implementation throws an exception, that exception is serialised and rethrown on the client
|
If the server implementation throws an exception, that exception is serialised and rethrown on the client
|
||||||
side as if it was thrown from inside the called RPC method. These exceptions can be caught as normal.
|
side as if it was thrown from inside the called RPC method. These exceptions can be caught as normal.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user