From 7f1bfac8b092519a53b1f059b73c7a9b62c13d35 Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Mon, 19 Feb 2018 14:53:28 +0000 Subject: [PATCH] Write better test for dupes --- .../net/corda/client/rpc/RPCStabilityTests.kt | 101 ++++++++++++------ .../main/kotlin/net/corda/nodeapi/RPCApi.kt | 4 +- .../node/services/messaging/RPCServer.kt | 62 ++++++----- 3 files changed, 101 insertions(+), 66 deletions(-) diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index de74d563cb..0ac85510f9 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -13,13 +13,12 @@ import net.corda.core.utilities.* import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi import net.corda.testing.core.SerializationEnvironmentRule -import net.corda.testing.internal.* +import net.corda.testing.internal.testThreadFactory import net.corda.testing.node.internal.* import org.apache.activemq.artemis.api.core.SimpleString import org.junit.After import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue -import org.junit.Ignore import org.junit.Rule import org.junit.Test import rx.Observable @@ -31,6 +30,7 @@ import java.util.concurrent.Executors import java.util.concurrent.ScheduledExecutorService import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong class RPCStabilityTests { @Rule @@ -352,44 +352,77 @@ class RPCStabilityTests { } } - interface StreamOps : RPCOps { - fun stream(streamInterval: Duration): Observable - } - class StreamOpsImpl : StreamOps { - override val protocolVersion = 0 - override fun stream(streamInterval: Duration): Observable { - return Observable.interval(streamInterval.toNanos(), TimeUnit.NANOSECONDS) + @Test + fun `deduplication in the server`() { + rpcDriver { + val server = startRpcServer(ops = SlowConsumerRPCOpsImpl()).getOrThrow() + + // Construct an RPC client session manually + val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}" + val session = startArtemisSession(server.broker.hostAndPort!!) + session.createTemporaryQueue(myQueue, myQueue) + val consumer = session.createConsumer(myQueue, null, -1, -1, false) + val replies = ArrayList() + consumer.setMessageHandler { + replies.add(it) + it.acknowledge() + } + + val producer = session.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME) + session.start() + + pollUntilClientNumber(server, 1) + + val message = session.createMessage(false) + val request = RPCApi.ClientToServer.RpcRequest( + clientAddress = SimpleString(myQueue), + methodName = DummyOps::protocolVersion.name, + serialisedArguments = emptyList().serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT), + replyId = Trace.InvocationId.newInstance(), + sessionId = Trace.SessionId.newInstance() + ) + request.writeToClientMessage(message) + message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, 0) + producer.send(message) + // duplicate the message + producer.send(message) + + pollUntilTrue("Number of replies is 1") { + replies.size == 1 + }.getOrThrow() } } - @Ignore("This is flaky as sometimes artemis delivers out of order messages after the kick") + @Test - fun `deduplication on the client side`() { + fun `deduplication in the client`() { rpcDriver { - val server = startRpcServer(ops = StreamOpsImpl()).getOrThrow() - val proxy = startRpcClient( - server.broker.hostAndPort!!, - configuration = RPCClientConfiguration.default.copy( - connectionRetryInterval = 1.days // switch off failover - ) - ).getOrThrow() - // Find the internal address of the client - val clientAddress = server.broker.serverControl.addressNames.find { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) } - val events = ArrayList() - // Start streaming an incrementing value 2000 times per second from the server. - val subscription = proxy.stream(streamInterval = Duration.ofNanos(500_000)).subscribe { - events.add(it) - } - // These sleeps are *fine*, the invariant should hold regardless of any delays - Thread.sleep(50) - // Kick the client. This seems to trigger redelivery of (presumably non-acked) messages. - server.broker.serverControl.closeConsumerConnectionsForAddress(clientAddress) - Thread.sleep(50) - subscription.unsubscribe() - for (i in 0 until events.size) { - require(events[i] == i.toLong()) { - "Events not incremental, possible duplicate, ${events[i]} != ${i.toLong()}\nExpected: ${(0..i).toList()}\nGot : $events\n" + val broker = startRpcBroker().getOrThrow() + + // Construct an RPC server session manually + val session = startArtemisSession(broker.hostAndPort!!) + val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME) + val producer = session.createProducer() + val dedupeId = AtomicLong(0) + consumer.setMessageHandler { + it.acknowledge() + val request = RPCApi.ClientToServer.fromClientMessage(it) + when (request) { + is RPCApi.ClientToServer.RpcRequest -> { + val reply = RPCApi.ServerToClient.RpcReply(request.replyId, Try.Success(0), "server") + val message = session.createMessage(false) + reply.writeToClientMessage(SerializationDefaults.RPC_SERVER_CONTEXT, message) + message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, dedupeId.getAndIncrement()) + producer.send(request.clientAddress, message) + // duplicate the reply + producer.send(request.clientAddress, message) + } + is RPCApi.ClientToServer.ObservablesClosed -> { + } } } + session.start() + + startRpcClient(broker.hostAndPort!!).getOrThrow() } } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt index 338bac1bfd..9392c176c7 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt @@ -75,7 +75,6 @@ object RPCApi { const val DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME = "deduplication-sequence-number" - val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION = "${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " + "${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'" @@ -181,12 +180,11 @@ object RPCApi { abstract fun writeToClientMessage(context: SerializationContext, message: ClientMessage) + /** The identity used to identify the deduplication ID sequence. This should be unique per server JVM run */ abstract val deduplicationIdentity: String /** * Reply in response to an [ClientToServer.RpcRequest]. - * @property deduplicationSequenceNumber a sequence number strictly incrementing with each message. Use this for - * duplicate detection on the client. */ data class RpcReply( val id: InvocationId, diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index 58788db721..bc47992147 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -299,40 +299,44 @@ class RPCServer( lifeCycle.requireState(State.STARTED) val clientToServer = RPCApi.ClientToServer.fromClientMessage(artemisMessage) log.debug { "-> RPC -> $clientToServer" } - when (clientToServer) { - is RPCApi.ClientToServer.RpcRequest -> { - val deduplicationSequenceNumber = artemisMessage.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME) - if (deduplicationChecker.checkDuplicateMessageId( - identity = clientToServer.clientAddress, - sequenceNumber = deduplicationSequenceNumber - )) { - log.info("Message duplication detected, discarding message") - return - } - val arguments = Try.on { - clientToServer.serialisedArguments.deserialize>(context = RPC_SERVER_CONTEXT) - } - val context = artemisMessage.context(clientToServer.sessionId) - context.invocation.pushToLoggingContext() - when (arguments) { - is Try.Success -> { - rpcExecutor!!.submit { - val result = invokeRpc(context, clientToServer.methodName, arguments.value) - sendReply(clientToServer.replyId, clientToServer.clientAddress, result) + try { + when (clientToServer) { + is RPCApi.ClientToServer.RpcRequest -> { + val deduplicationSequenceNumber = artemisMessage.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME) + if (deduplicationChecker.checkDuplicateMessageId( + identity = clientToServer.clientAddress, + sequenceNumber = deduplicationSequenceNumber + )) { + log.info("Message duplication detected, discarding message") + return + } + val arguments = Try.on { + clientToServer.serialisedArguments.deserialize>(context = RPC_SERVER_CONTEXT) + } + val context = artemisMessage.context(clientToServer.sessionId) + context.invocation.pushToLoggingContext() + when (arguments) { + is Try.Success -> { + log.info("SUBMITTING") + rpcExecutor!!.submit { + val result = invokeRpc(context, clientToServer.methodName, arguments.value) + sendReply(clientToServer.replyId, clientToServer.clientAddress, result) + } + } + is Try.Failure -> { + // We failed to deserialise the arguments, route back the error + log.warn("Inbound RPC failed", arguments.exception) + sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments) } } - is Try.Failure -> { - // We failed to deserialise the arguments, route back the error - log.warn("Inbound RPC failed", arguments.exception) - sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments) - } + } + is RPCApi.ClientToServer.ObservablesClosed -> { + observableMap.invalidateAll(clientToServer.ids) } } - is RPCApi.ClientToServer.ObservablesClosed -> { - observableMap.invalidateAll(clientToServer.ids) - } + } finally { + artemisMessage.acknowledge() } - artemisMessage.acknowledge() } private fun invokeRpc(context: RpcAuthContext, methodName: String, arguments: List): Try {