mirror of
https://github.com/corda/corda.git
synced 2025-04-06 19:07:08 +00:00
Write better test for dupes
This commit is contained in:
parent
6a4f783106
commit
7f1bfac8b0
@ -13,13 +13,12 @@ import net.corda.core.utilities.*
|
||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.testing.core.SerializationEnvironmentRule
|
||||
import net.corda.testing.internal.*
|
||||
import net.corda.testing.internal.testThreadFactory
|
||||
import net.corda.testing.node.internal.*
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
@ -31,6 +30,7 @@ import java.util.concurrent.Executors
|
||||
import java.util.concurrent.ScheduledExecutorService
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
class RPCStabilityTests {
|
||||
@Rule
|
||||
@ -352,44 +352,77 @@ class RPCStabilityTests {
|
||||
}
|
||||
}
|
||||
|
||||
interface StreamOps : RPCOps {
|
||||
fun stream(streamInterval: Duration): Observable<Long>
|
||||
}
|
||||
class StreamOpsImpl : StreamOps {
|
||||
override val protocolVersion = 0
|
||||
override fun stream(streamInterval: Duration): Observable<Long> {
|
||||
return Observable.interval(streamInterval.toNanos(), TimeUnit.NANOSECONDS)
|
||||
@Test
|
||||
fun `deduplication in the server`() {
|
||||
rpcDriver {
|
||||
val server = startRpcServer(ops = SlowConsumerRPCOpsImpl()).getOrThrow()
|
||||
|
||||
// Construct an RPC client session manually
|
||||
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
|
||||
val session = startArtemisSession(server.broker.hostAndPort!!)
|
||||
session.createTemporaryQueue(myQueue, myQueue)
|
||||
val consumer = session.createConsumer(myQueue, null, -1, -1, false)
|
||||
val replies = ArrayList<Any>()
|
||||
consumer.setMessageHandler {
|
||||
replies.add(it)
|
||||
it.acknowledge()
|
||||
}
|
||||
|
||||
val producer = session.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
session.start()
|
||||
|
||||
pollUntilClientNumber(server, 1)
|
||||
|
||||
val message = session.createMessage(false)
|
||||
val request = RPCApi.ClientToServer.RpcRequest(
|
||||
clientAddress = SimpleString(myQueue),
|
||||
methodName = DummyOps::protocolVersion.name,
|
||||
serialisedArguments = emptyList<Any>().serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT),
|
||||
replyId = Trace.InvocationId.newInstance(),
|
||||
sessionId = Trace.SessionId.newInstance()
|
||||
)
|
||||
request.writeToClientMessage(message)
|
||||
message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, 0)
|
||||
producer.send(message)
|
||||
// duplicate the message
|
||||
producer.send(message)
|
||||
|
||||
pollUntilTrue("Number of replies is 1") {
|
||||
replies.size == 1
|
||||
}.getOrThrow()
|
||||
}
|
||||
}
|
||||
@Ignore("This is flaky as sometimes artemis delivers out of order messages after the kick")
|
||||
|
||||
@Test
|
||||
fun `deduplication on the client side`() {
|
||||
fun `deduplication in the client`() {
|
||||
rpcDriver {
|
||||
val server = startRpcServer(ops = StreamOpsImpl()).getOrThrow()
|
||||
val proxy = startRpcClient<StreamOps>(
|
||||
server.broker.hostAndPort!!,
|
||||
configuration = RPCClientConfiguration.default.copy(
|
||||
connectionRetryInterval = 1.days // switch off failover
|
||||
)
|
||||
).getOrThrow()
|
||||
// Find the internal address of the client
|
||||
val clientAddress = server.broker.serverControl.addressNames.find { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
|
||||
val events = ArrayList<Long>()
|
||||
// Start streaming an incrementing value 2000 times per second from the server.
|
||||
val subscription = proxy.stream(streamInterval = Duration.ofNanos(500_000)).subscribe {
|
||||
events.add(it)
|
||||
}
|
||||
// These sleeps are *fine*, the invariant should hold regardless of any delays
|
||||
Thread.sleep(50)
|
||||
// Kick the client. This seems to trigger redelivery of (presumably non-acked) messages.
|
||||
server.broker.serverControl.closeConsumerConnectionsForAddress(clientAddress)
|
||||
Thread.sleep(50)
|
||||
subscription.unsubscribe()
|
||||
for (i in 0 until events.size) {
|
||||
require(events[i] == i.toLong()) {
|
||||
"Events not incremental, possible duplicate, ${events[i]} != ${i.toLong()}\nExpected: ${(0..i).toList()}\nGot : $events\n"
|
||||
val broker = startRpcBroker().getOrThrow()
|
||||
|
||||
// Construct an RPC server session manually
|
||||
val session = startArtemisSession(broker.hostAndPort!!)
|
||||
val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
val producer = session.createProducer()
|
||||
val dedupeId = AtomicLong(0)
|
||||
consumer.setMessageHandler {
|
||||
it.acknowledge()
|
||||
val request = RPCApi.ClientToServer.fromClientMessage(it)
|
||||
when (request) {
|
||||
is RPCApi.ClientToServer.RpcRequest -> {
|
||||
val reply = RPCApi.ServerToClient.RpcReply(request.replyId, Try.Success(0), "server")
|
||||
val message = session.createMessage(false)
|
||||
reply.writeToClientMessage(SerializationDefaults.RPC_SERVER_CONTEXT, message)
|
||||
message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, dedupeId.getAndIncrement())
|
||||
producer.send(request.clientAddress, message)
|
||||
// duplicate the reply
|
||||
producer.send(request.clientAddress, message)
|
||||
}
|
||||
is RPCApi.ClientToServer.ObservablesClosed -> {
|
||||
}
|
||||
}
|
||||
}
|
||||
session.start()
|
||||
|
||||
startRpcClient<RPCOps>(broker.hostAndPort!!).getOrThrow()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -75,7 +75,6 @@ object RPCApi {
|
||||
|
||||
const val DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME = "deduplication-sequence-number"
|
||||
|
||||
|
||||
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
|
||||
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
|
||||
"${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'"
|
||||
@ -181,12 +180,11 @@ object RPCApi {
|
||||
|
||||
abstract fun writeToClientMessage(context: SerializationContext, message: ClientMessage)
|
||||
|
||||
/** The identity used to identify the deduplication ID sequence. This should be unique per server JVM run */
|
||||
abstract val deduplicationIdentity: String
|
||||
|
||||
/**
|
||||
* Reply in response to an [ClientToServer.RpcRequest].
|
||||
* @property deduplicationSequenceNumber a sequence number strictly incrementing with each message. Use this for
|
||||
* duplicate detection on the client.
|
||||
*/
|
||||
data class RpcReply(
|
||||
val id: InvocationId,
|
||||
|
@ -299,40 +299,44 @@ class RPCServer(
|
||||
lifeCycle.requireState(State.STARTED)
|
||||
val clientToServer = RPCApi.ClientToServer.fromClientMessage(artemisMessage)
|
||||
log.debug { "-> RPC -> $clientToServer" }
|
||||
when (clientToServer) {
|
||||
is RPCApi.ClientToServer.RpcRequest -> {
|
||||
val deduplicationSequenceNumber = artemisMessage.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
|
||||
if (deduplicationChecker.checkDuplicateMessageId(
|
||||
identity = clientToServer.clientAddress,
|
||||
sequenceNumber = deduplicationSequenceNumber
|
||||
)) {
|
||||
log.info("Message duplication detected, discarding message")
|
||||
return
|
||||
}
|
||||
val arguments = Try.on {
|
||||
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
|
||||
}
|
||||
val context = artemisMessage.context(clientToServer.sessionId)
|
||||
context.invocation.pushToLoggingContext()
|
||||
when (arguments) {
|
||||
is Try.Success -> {
|
||||
rpcExecutor!!.submit {
|
||||
val result = invokeRpc(context, clientToServer.methodName, arguments.value)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, result)
|
||||
try {
|
||||
when (clientToServer) {
|
||||
is RPCApi.ClientToServer.RpcRequest -> {
|
||||
val deduplicationSequenceNumber = artemisMessage.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
|
||||
if (deduplicationChecker.checkDuplicateMessageId(
|
||||
identity = clientToServer.clientAddress,
|
||||
sequenceNumber = deduplicationSequenceNumber
|
||||
)) {
|
||||
log.info("Message duplication detected, discarding message")
|
||||
return
|
||||
}
|
||||
val arguments = Try.on {
|
||||
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
|
||||
}
|
||||
val context = artemisMessage.context(clientToServer.sessionId)
|
||||
context.invocation.pushToLoggingContext()
|
||||
when (arguments) {
|
||||
is Try.Success -> {
|
||||
log.info("SUBMITTING")
|
||||
rpcExecutor!!.submit {
|
||||
val result = invokeRpc(context, clientToServer.methodName, arguments.value)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, result)
|
||||
}
|
||||
}
|
||||
is Try.Failure -> {
|
||||
// We failed to deserialise the arguments, route back the error
|
||||
log.warn("Inbound RPC failed", arguments.exception)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments)
|
||||
}
|
||||
}
|
||||
is Try.Failure -> {
|
||||
// We failed to deserialise the arguments, route back the error
|
||||
log.warn("Inbound RPC failed", arguments.exception)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments)
|
||||
}
|
||||
}
|
||||
is RPCApi.ClientToServer.ObservablesClosed -> {
|
||||
observableMap.invalidateAll(clientToServer.ids)
|
||||
}
|
||||
}
|
||||
is RPCApi.ClientToServer.ObservablesClosed -> {
|
||||
observableMap.invalidateAll(clientToServer.ids)
|
||||
}
|
||||
} finally {
|
||||
artemisMessage.acknowledge()
|
||||
}
|
||||
artemisMessage.acknowledge()
|
||||
}
|
||||
|
||||
private fun invokeRpc(context: RpcAuthContext, methodName: String, arguments: List<Any?>): Try<Any> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user