Merge pull request #3834 from corda/mike-rpc-propagate-deser-errors

Propagate RPC deserialisation faults back to the caller
This commit is contained in:
PokeyBot
2018-08-24 16:52:23 +01:00
committed by GitHub
3 changed files with 77 additions and 60 deletions

View File

@ -31,26 +31,15 @@ import org.apache.activemq.artemis.api.core.ActiveMQException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.RoutingType import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
import org.apache.activemq.artemis.api.core.client.FailoverEventType
import org.apache.activemq.artemis.api.core.client.ServerLocator
import rx.Notification import rx.Notification
import rx.Observable import rx.Observable
import rx.subjects.UnicastSubject import rx.subjects.UnicastSubject
import java.lang.reflect.InvocationHandler import java.lang.reflect.InvocationHandler
import java.lang.reflect.Method import java.lang.reflect.Method
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import kotlin.reflect.jvm.javaMethod import kotlin.reflect.jvm.javaMethod
@ -288,56 +277,71 @@ class RPCClientProxyHandler(
// The handler for Artemis messages. // The handler for Artemis messages.
private fun artemisMessageHandler(message: ClientMessage) { private fun artemisMessageHandler(message: ClientMessage) {
val serverToClient = RPCApi.ServerToClient.fromClientMessage(serializationContextWithObservableContext, message) fun completeExceptionally(id: InvocationId, e: Throwable, future: SettableFuture<Any?>?) {
val deduplicationSequenceNumber = message.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME) val rpcCallSite: Throwable? = callSiteMap?.get(id)
if (deduplicationChecker.checkDuplicateMessageId(serverToClient.deduplicationIdentity, deduplicationSequenceNumber)) { if (rpcCallSite != null) addRpcCallSiteToThrowable(e, rpcCallSite)
log.info("Message duplication detected, discarding message") future?.setException(e.cause ?: e)
return
} }
log.debug { "Got message from RPC server $serverToClient" }
when (serverToClient) { try {
is RPCApi.ServerToClient.RpcReply -> { // Deserialize the reply from the server, both the wrapping metadata and the actual body of the return value.
val replyFuture = rpcReplyMap.remove(serverToClient.id) val serverToClient: RPCApi.ServerToClient = try {
if (replyFuture == null) { RPCApi.ServerToClient.fromClientMessage(serializationContextWithObservableContext, message)
log.error("RPC reply arrived to unknown RPC ID ${serverToClient.id}, this indicates an internal RPC error.") } catch (e: RPCApi.ServerToClient.FailedToDeserializeReply) {
} else { // Might happen if something goes wrong during mapping the response to classes, evolution, class synthesis etc.
val result = serverToClient.result log.error("Failed to deserialize RPC body", e)
when (result) { completeExceptionally(e.id, e, rpcReplyMap.remove(e.id))
is Try.Success -> replyFuture.set(result.value) return
is Try.Failure -> { }
val rpcCallSite = callSiteMap?.get(serverToClient.id) val deduplicationSequenceNumber = message.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
if (rpcCallSite != null) addRpcCallSiteToThrowable(result.exception, rpcCallSite) if (deduplicationChecker.checkDuplicateMessageId(serverToClient.deduplicationIdentity, deduplicationSequenceNumber)) {
replyFuture.setException(result.exception) log.info("Message duplication detected, discarding message")
} return
} }
} log.debug { "Got message from RPC server $serverToClient" }
} when (serverToClient) {
is RPCApi.ServerToClient.Observation -> { is RPCApi.ServerToClient.RpcReply -> {
val observable = observableContext.observableMap.getIfPresent(serverToClient.id) val replyFuture = rpcReplyMap.remove(serverToClient.id)
if (observable == null) { if (replyFuture == null) {
log.debug("Observation ${serverToClient.content} arrived to unknown Observable with ID ${serverToClient.id}. " + log.error("RPC reply arrived to unknown RPC ID ${serverToClient.id}, this indicates an internal RPC error.")
"This may be due to an observation arriving before the server was " + } else {
"notified of observable shutdown") val result: Try<Any?> = serverToClient.result
} else { when (result) {
// We schedule the onNext() on an executor sticky-pooled based on the Observable ID. is Try.Success -> replyFuture.set(result.value)
observationExecutorPool.run(serverToClient.id) { executor -> is Try.Failure -> {
executor.submit { completeExceptionally(serverToClient.id, result.exception, replyFuture)
val content = serverToClient.content }
if (content.isOnCompleted || content.isOnError) { }
observableContext.observableMap.invalidate(serverToClient.id) }
} }
// Add call site information on error is RPCApi.ServerToClient.Observation -> {
if (content.isOnError) { val observable: UnicastSubject<Notification<*>>? = observableContext.observableMap.getIfPresent(serverToClient.id)
val rpcCallSite = callSiteMap?.get(serverToClient.id) if (observable == null) {
if (rpcCallSite != null) addRpcCallSiteToThrowable(content.throwable, rpcCallSite) log.debug("Observation ${serverToClient.content} arrived to unknown Observable with ID ${serverToClient.id}. " +
} "This may be due to an observation arriving before the server was " +
observable.onNext(content) "notified of observable shutdown")
} else {
// We schedule the onNext() on an executor sticky-pooled based on the Observable ID.
observationExecutorPool.run(serverToClient.id) { executor ->
executor.submit {
val content = serverToClient.content
if (content.isOnCompleted || content.isOnError) {
observableContext.observableMap.invalidate(serverToClient.id)
}
// Add call site information on error
if (content.isOnError) {
val rpcCallSite = callSiteMap?.get(serverToClient.id)
if (rpcCallSite != null) addRpcCallSiteToThrowable(content.throwable, rpcCallSite)
}
observable.onNext(content)
}
} }
} }
} }
} }
} finally {
message.acknowledge()
} }
message.acknowledge()
} }
/** /**

View File

@ -14,7 +14,7 @@ import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.Try import net.corda.core.utilities.Try
import org.apache.activemq.artemis.api.core.ActiveMQBuffer import org.apache.activemq.artemis.api.core.ActiveMQBuffer
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.* import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.management.CoreNotificationType import org.apache.activemq.artemis.api.core.management.CoreNotificationType
import org.apache.activemq.artemis.api.core.management.ManagementHelper import org.apache.activemq.artemis.api.core.management.ManagementHelper
import org.apache.activemq.artemis.reader.MessageUtil import org.apache.activemq.artemis.reader.MessageUtil
@ -212,6 +212,11 @@ object RPCApi {
} }
} }
/**
* Thrown if the RPC reply body couldn't be deserialized.
*/
class FailedToDeserializeReply(val id: InvocationId, cause: Throwable) : RuntimeException("Failed to deserialize RPC reply: ${cause.message}", cause)
companion object { companion object {
private fun Any.safeSerialize(context: SerializationContext, wrap: (Throwable) -> Any) = try { private fun Any.safeSerialize(context: SerializationContext, wrap: (Throwable) -> Any) = try {
serialize(context = context) serialize(context = context)
@ -226,10 +231,18 @@ object RPCApi {
RPCApi.ServerToClient.Tag.RPC_REPLY -> { RPCApi.ServerToClient.Tag.RPC_REPLY -> {
val id = message.invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.") val id = message.invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id) val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id)
// The result here is a Try<> that represents the attempt to try the operation on the server side.
// If anything goes wrong with deserialisation of the response, we propagate it differently because
// we also need to pass through the invocation and dedupe IDs.
val result: Try<Any?> = try {
message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
} catch (e: Exception) {
throw FailedToDeserializeReply(id, e)
}
RpcReply( RpcReply(
id = id, id = id,
deduplicationIdentity = deduplicationIdentity, deduplicationIdentity = deduplicationIdentity,
result = message.getBodyAsByteArray().deserialize(context = poolWithIdContext) result = result
) )
} }
RPCApi.ServerToClient.Tag.OBSERVATION -> { RPCApi.ServerToClient.Tag.OBSERVATION -> {

View File

@ -101,7 +101,7 @@ class DeserializationInput constructor(
} catch (nse: NotSerializableException) { } catch (nse: NotSerializableException) {
throw nse throw nse
} catch (t: Throwable) { } catch (t: Throwable) {
throw NotSerializableException("Unexpected throwable: ${t.message}").apply { initCause(t) } throw NotSerializableException("Internal deserialization failure: ${t.javaClass.name}: ${t.message}").apply { initCause(t) }
} finally { } finally {
objectHistory.clear() objectHistory.clear()
} }