Fix RPC observation vs reply ordering

This commit is contained in:
Andras Slemmer 2018-02-14 15:16:59 +00:00
parent 7924a5a834
commit 81b16776f3

View File

@ -221,11 +221,16 @@ class RPCServer(
private fun handleSendJob(sequenceNumber: Long, job: RpcSendJob.Send) { private fun handleSendJob(sequenceNumber: Long, job: RpcSendJob.Send) {
try { try {
job.artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, sequenceNumber) val artemisMessage = producerSession!!.createMessage(false)
rpcProducer!!.send(job.clientAddress, job.artemisMessage) // We must do the serialisation here as any encountered Observables may already have events, which would
log.debug { "<- RPC <- ${job.originalMessage}" } // trigger more sends. We must make sure that the root of the Observables (e.g. the RPC reply) is sent
// before any child observations.
job.message.writeToClientMessage(job.serializationContext, artemisMessage)
artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, sequenceNumber)
rpcProducer!!.send(job.clientAddress, artemisMessage)
log.debug { "<- RPC <- ${job.message}" }
} catch (throwable: Throwable) { } catch (throwable: Throwable) {
log.error("Failed to send message, kicking client. Message was ${job.originalMessage}", throwable) log.error("Failed to send message, kicking client. Message was ${job.message}", throwable)
serverControl!!.closeConsumerConnectionsForAddress(job.clientAddress.toString()) serverControl!!.closeConsumerConnectionsForAddress(job.clientAddress.toString())
invalidateClient(job.clientAddress) invalidateClient(job.clientAddress)
} }
@ -413,17 +418,15 @@ class RPCServer(
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this) private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
fun sendMessage(serverToClient: RPCApi.ServerToClient) { fun sendMessage(serverToClient: RPCApi.ServerToClient) {
val artemisMessage = producerSession!!.createMessage(false) sendJobQueue.put(RpcSendJob.Send(clientAddress, serializationContextWithObservableContext, serverToClient))
serverToClient.writeToClientMessage(serializationContextWithObservableContext, artemisMessage)
sendJobQueue.put(RpcSendJob.Send(clientAddress, artemisMessage, serverToClient))
} }
} }
private sealed class RpcSendJob { private sealed class RpcSendJob {
data class Send( data class Send(
val clientAddress: SimpleString, val clientAddress: SimpleString,
val artemisMessage: ClientMessage, val serializationContext: SerializationContext,
val originalMessage: RPCApi.ServerToClient val message: RPCApi.ServerToClient
) : RpcSendJob() ) : RpcSendJob()
object Stop : RpcSendJob() object Stop : RpcSendJob()
} }