RPC: Report failures during deserialisation of method arguments to the client as an exception.

This commit is contained in:
Mike Hearn 2017-09-22 15:22:40 +02:00 committed by Mike Hearn
parent 4030903fee
commit 20a9892123
2 changed files with 47 additions and 10 deletions

View File

@ -128,15 +128,26 @@ object RPCApi {
}
companion object {
fun fromClientMessage(context: SerializationContext, message: ClientMessage): ClientToServer {
val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)]
return when (tag) {
RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest(
clientAddress = MessageUtil.getJMSReplyTo(message),
id = RpcRequestId(message.getLongProperty(RPC_ID_FIELD_NAME)),
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
arguments = message.getBodyAsByteArray().deserialize(context = context)
)
RPCApi.ClientToServer.Tag.RPC_REQUEST -> {
val partialReq = RpcRequest(
clientAddress = MessageUtil.getJMSReplyTo(message),
id = RpcRequestId(message.getLongProperty(RPC_ID_FIELD_NAME)),
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
arguments = emptyList()
)
// Deserialisation of the arguments can fail, but we'd like to return a response mapped to
// this specific RPC, so we throw the partial request with ID and method name.
try {
val arguments = message.getBodyAsByteArray().deserialize<List<Any?>>(context = context)
return partialReq.copy(arguments = arguments)
} catch (t: Throwable) {
throw ArgumentDeserialisationException(t, partialReq)
}
}
RPCApi.ClientToServer.Tag.OBSERVABLES_CLOSED -> {
val ids = ArrayList<ObservableId>()
val buffer = message.bodyBuffer
@ -149,6 +160,7 @@ object RPCApi {
}
}
}
}
/**
@ -215,6 +227,13 @@ object RPCApi {
}
}
}
/**
* Thrown when the arguments passed to an RPC couldn't be deserialised by the server. This will
* typically indicate a missing application on the server side, or different versions between
* client and server.
*/
class ArgumentDeserialisationException(cause: Throwable, val reqWithNoArguments: ClientToServer.RpcRequest) : RPCException("Failed to deserialise RPC arguments: version or app skew between client and server?", cause)
}
data class ArtemisProducer(

View File

@ -260,13 +260,31 @@ class RPCServer(
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
lifeCycle.requireState(State.STARTED)
val clientToServer = RPCApi.ClientToServer.fromClientMessage(RPC_SERVER_CONTEXT, artemisMessage)
// Attempt de-serialisation of the RPC request message, in such a way that we can route the error back to
// the RPC that was being tried if it fails in a method/rpc specific way.
val clientToServerTry = Try.on { RPCApi.ClientToServer.fromClientMessage(RPC_SERVER_CONTEXT, artemisMessage) }
val clientToServer = try {
clientToServerTry.getOrThrow()
} catch (e: RPCApi.ArgumentDeserialisationException) {
// The exception itself has a more informative error message, and this could be caused by buggy apps, so
// let's just log it as a warning instead of an error. Relay the failure to the client so they can see it.
log.warn("Inbound RPC failed", e)
sendReply(e.reqWithNoArguments.id, e.reqWithNoArguments.clientAddress, Try.Failure(e.cause!!))
return
} catch (e: Exception) {
// This path indicates something more fundamental went wrong, like a missing message header.
log.error("Failed to parse an inbound RPC: version skew between client and server?", e)
return
} finally {
artemisMessage.acknowledge()
}
// Now try dispatching the request itself.
log.debug { "-> RPC -> $clientToServer" }
when (clientToServer) {
is RPCApi.ClientToServer.RpcRequest -> {
val rpcContext = RpcContext(
currentUser = getUser(artemisMessage)
)
val rpcContext = RpcContext(currentUser = getUser(artemisMessage))
rpcExecutor!!.submit {
val result = invokeRpc(rpcContext, clientToServer.methodName, clientToServer.arguments)
sendReply(clientToServer.id, clientToServer.clientAddress, result)