From 81b16776f3d5109e977d86d75287ef0a7f773eb5 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Wed, 14 Feb 2018 15:16:59 +0000 Subject: [PATCH] Fix RPC observation vs reply ordering --- .../node/services/messaging/RPCServer.kt | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index eca9ce1601..deac23fa84 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -221,11 +221,16 @@ class RPCServer( private fun handleSendJob(sequenceNumber: Long, job: RpcSendJob.Send) { try { - job.artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, sequenceNumber) - rpcProducer!!.send(job.clientAddress, job.artemisMessage) - log.debug { "<- RPC <- ${job.originalMessage}" } + val artemisMessage = producerSession!!.createMessage(false) + // We must do the serialisation here as any encountered Observables may already have events, which would + // trigger more sends. We must make sure that the root of the Observables (e.g. the RPC reply) is sent + // before any child observations. + job.message.writeToClientMessage(job.serializationContext, artemisMessage) + artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, sequenceNumber) + rpcProducer!!.send(job.clientAddress, artemisMessage) + log.debug { "<- RPC <- ${job.message}" } } catch (throwable: Throwable) { - log.error("Failed to send message, kicking client. Message was ${job.originalMessage}", throwable) + log.error("Failed to send message, kicking client. Message was ${job.message}", throwable) serverControl!!.closeConsumerConnectionsForAddress(job.clientAddress.toString()) invalidateClient(job.clientAddress) } @@ -413,17 +418,15 @@ class RPCServer( private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this) fun sendMessage(serverToClient: RPCApi.ServerToClient) { - val artemisMessage = producerSession!!.createMessage(false) - serverToClient.writeToClientMessage(serializationContextWithObservableContext, artemisMessage) - sendJobQueue.put(RpcSendJob.Send(clientAddress, artemisMessage, serverToClient)) + sendJobQueue.put(RpcSendJob.Send(clientAddress, serializationContextWithObservableContext, serverToClient)) } } private sealed class RpcSendJob { data class Send( val clientAddress: SimpleString, - val artemisMessage: ClientMessage, - val originalMessage: RPCApi.ServerToClient + val serializationContext: SerializationContext, + val message: RPCApi.ServerToClient ) : RpcSendJob() object Stop : RpcSendJob() }