RPC server: buffer response messages until the client queue is fully set up.

The issue arises when the server restarts, and the client is sometimes
not able to recreate its queue in time, so the server is unable to send
back a response message and just drops it, causing the client to hang.
This commit is contained in:
Andrius Dagys 2017-06-15 15:19:33 +01:00
parent 4f16512dcf
commit aaf7de0d02
5 changed files with 157 additions and 103 deletions

View File

@ -5,7 +5,6 @@ import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.base.Stopwatch
import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures
import net.corda.client.rpc.internal.RPCClient
@ -17,7 +16,6 @@ import net.corda.node.services.messaging.RPCServerConfiguration
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.RPCKryo
import net.corda.testing.*
import org.apache.activemq.artemis.ArtemisConstants
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
@ -28,8 +26,6 @@ import rx.subjects.UnicastSubject
import java.time.Duration
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import kotlin.test.fail
class RPCStabilityTests {
@ -218,65 +214,27 @@ class RPCStabilityTests {
@Test
fun `client reconnects to rebooted server`() {
// TODO: Remove multiple trials when we fix the Artemis bug (which should have its own test(s)).
if (ArtemisConstants::class.java.`package`.implementationVersion == "1.5.3") {
// The test fails maybe 1 in 100 times, so to stay green until we upgrade Artemis, retry if it fails:
for (i in (1..3)) {
try {
`client reconnects to rebooted server`(1)
} catch (e: TimeoutException) {
continue
}
return
}
fail("Test failed 3 times, which is vanishingly unlikely unless something has changed.")
} else {
// We've upgraded Artemis so make the test fail reliably, in the 2.1.0 case that takes 25 trials:
`client reconnects to rebooted server`(25)
}
}
private fun `client reconnects to rebooted server`(trials: Int) {
rpcDriver {
val coreBurner = thread {
while (!Thread.interrupted()) {
// Spin.
}
}
try {
val ops = object : ReconnectOps {
override val protocolVersion = 0
override fun ping() = "pong"
}
var serverFollower = shutdownManager.follower()
val serverFollower = shutdownManager.follower()
val serverPort = startRpcServer<ReconnectOps>(ops = ops).getOrThrow().broker.hostAndPort!!
serverFollower.unfollow()
// Set retry interval to 1s to reduce test duration
val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds)
val clientFollower = shutdownManager.follower()
val client = startRpcClient<ReconnectOps>(serverPort).getOrThrow()
val client = startRpcClient<ReconnectOps>(serverPort, configuration = clientConfiguration).getOrThrow()
clientFollower.unfollow()
assertEquals("pong", client.ping())
val background = Executors.newSingleThreadExecutor()
(1..trials).forEach {
System.err.println("Start trial $it of $trials.")
serverFollower.shutdown()
serverFollower = shutdownManager.follower()
startRpcServer<ReconnectOps>(ops = ops, customPort = serverPort).getOrThrow()
serverFollower.unfollow()
val stopwatch = Stopwatch.createStarted()
val pingFuture = background.submit(Callable {
client.ping() // Would also hang in foreground, we need it in background so we can timeout.
})
val pingFuture = future {
client.ping()
}
assertEquals("pong", pingFuture.getOrThrow(10.seconds))
System.err.println("Took ${stopwatch.elapsed(TimeUnit.MILLISECONDS)} millis.")
}
background.shutdown() // No point in the hanging case.
clientFollower.shutdown() // Driver would do this after the current server, causing 'legit' failover hang.
} finally {
with(coreBurner) {
interrupt()
join()
}
}
clientFollower.shutdown() // Driver would do this after the new server, causing hang.
}
}

View File

@ -65,10 +65,14 @@ object RPCApi {
val RPC_SERVER_QUEUE_NAME = "rpc.server"
val RPC_CLIENT_QUEUE_NAME_PREFIX = "rpc.client"
val RPC_CLIENT_BINDING_REMOVALS = "rpc.clientqueueremovals"
val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions"
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
"${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'"
val RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION =
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_ADDED.name}' AND " +
"${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'"
data class RpcRequestId(val toLong: Long)
data class ObservableId(val toLong: Long)

View File

@ -200,6 +200,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
address = NOTIFICATIONS_ADDRESS,
filter = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION,
durable = false
),
queueConfig(
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS,
address = NOTIFICATIONS_ADDRESS,
filter = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION,
durable = false
)
)
addressesSettings = mapOf(

View File

@ -42,8 +42,8 @@ import rx.Subscription
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Method
import java.time.Duration
import java.util.*
import java.util.concurrent.*
import kotlin.collections.ArrayList
data class RPCServerConfiguration(
/** The number of threads to use for handling RPC requests */
@ -91,14 +91,22 @@ class RPCServer(
STARTED,
FINISHED
}
private sealed class BufferOrNone {
data class Buffer(val container: MutableCollection<MessageAndContext>) : BufferOrNone()
object None : BufferOrNone()
}
private data class MessageAndContext(val message: RPCApi.ServerToClient.RpcReply, val context: ObservableContext)
private val lifeCycle = LifeCycle(State.UNSTARTED)
// The methodname->Method map to use for dispatching.
/** The methodname->Method map to use for dispatching. */
private val methodTable: Map<String, Method>
// The observable subscription mapping.
/** The observable subscription mapping. */
private val observableMap = createObservableSubscriptionMap()
// A mapping from client addresses to IDs of associated Observables
/** A mapping from client addresses to IDs of associated Observables */
private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create<SimpleString, RPCApi.ObservableId>())
// The scheduled reaper handle.
/** The scheduled reaper handle. */
private var reaperScheduledFuture: ScheduledFuture<*>? = null
private var observationSendExecutor: ExecutorService? = null
@ -113,13 +121,16 @@ class RPCServer(
ArtemisProducer(sessionFactory, session, session.createProducer())
}
private var clientBindingRemovalConsumer: ClientConsumer? = null
private var clientBindingAdditionConsumer: ClientConsumer? = null
private var serverControl: ActiveMQServerControl? = null
private val responseMessageBuffer = ConcurrentHashMap<SimpleString, BufferOrNone>()
init {
val groupedMethods = ops.javaClass.declaredMethods.groupBy { it.name }
groupedMethods.forEach { name, methods ->
if (methods.size > 1) {
throw IllegalArgumentException("Encountered more than one method called ${name} on ${ops.javaClass.name}")
throw IllegalArgumentException("Encountered more than one method called $name on ${ops.javaClass.name}")
}
}
methodTable = groupedMethods.mapValues { it.value.single() }
@ -155,17 +166,8 @@ class RPCServer(
rpcConfiguration.reapInterval.toMillis(),
TimeUnit.MILLISECONDS
)
val sessions = ArrayList<ClientSession>()
for (i in 1 .. rpcConfiguration.consumerPoolSize) {
val sessionFactory = serverLocator.createSessionFactory()
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler)
sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer))
sessions.add(session)
}
clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
clientBindingRemovalConsumer!!.setMessageHandler(this::bindingRemovalArtemisMessageHandler)
val sessions = createConsumerSessions()
createNotificationConsumers()
serverControl = activeMqServerControl
lifeCycle.transition(State.UNSTARTED, State.STARTED)
// We delay the consumer session start because Artemis starts delivering messages immediately, so we need to be
@ -179,6 +181,26 @@ class RPCServer(
}
}
private fun createConsumerSessions(): ArrayList<ClientSession> {
val sessions = ArrayList<ClientSession>()
for (i in 1..rpcConfiguration.consumerPoolSize) {
val sessionFactory = serverLocator.createSessionFactory()
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler)
sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer))
sessions.add(session)
}
return sessions
}
private fun createNotificationConsumers() {
clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
clientBindingRemovalConsumer!!.setMessageHandler(this::bindingRemovalArtemisMessageHandler)
clientBindingAdditionConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_ADDITIONS)
clientBindingAdditionConsumer!!.setMessageHandler(this::bindingAdditionArtemisMessageHandler)
}
fun close() {
reaperScheduledFuture?.cancel(false)
rpcExecutor?.shutdownNow()
@ -203,12 +225,38 @@ class RPCServer(
invalidateClient(SimpleString(clientAddress))
}
private fun bindingAdditionArtemisMessageHandler(artemisMessage: ClientMessage) {
lifeCycle.requireState(State.STARTED)
val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
require(notificationType == CoreNotificationType.BINDING_ADDED.name)
val clientAddress = SimpleString(artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME))
log.debug("RPC client queue created on address $clientAddress")
val buffer = stopBuffering(clientAddress)
buffer?.let { drainBuffer(it) }
}
/**
* Disables message buffering for [clientAddress] and returns the existing buffer
* or `null` if no requests were ever received.
*/
private fun stopBuffering(clientAddress: SimpleString): BufferOrNone.Buffer? {
return responseMessageBuffer.put(clientAddress, BufferOrNone.None) as? BufferOrNone.Buffer
}
private fun drainBuffer(buffer: BufferOrNone.Buffer) {
buffer.container.forEach {
it.context.sendMessage(it.message)
}
}
// Note that this function operates on the *current* view of client observables. During invalidation further
// Observables may be serialised and thus registered.
private fun invalidateClient(clientAddress: SimpleString) {
lifeCycle.requireState(State.STARTED)
val observableIds = clientAddressToObservables.removeAll(clientAddress)
observableMap.invalidateAll(observableIds)
responseMessageBuffer.remove(clientAddress)
}
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
@ -221,17 +269,7 @@ class RPCServer(
currentUser = getUser(artemisMessage)
)
rpcExecutor!!.submit {
val result = ErrorOr.catch {
try {
CURRENT_RPC_CONTEXT.set(rpcContext)
log.debug { "Calling ${clientToServer.methodName}" }
val method = methodTable[clientToServer.methodName] ?:
throw RPCException("Received RPC for unknown method ${clientToServer.methodName} - possible client/server version skew?")
method.invoke(ops, *clientToServer.arguments.toTypedArray())
} finally {
CURRENT_RPC_CONTEXT.remove()
}
}
val result = invokeRpc(rpcContext, clientToServer.methodName, clientToServer.arguments)
val resultWithExceptionUnwrapped = result.mapError {
if (it is InvocationTargetException) {
it.cause ?: RPCException("Caught InvocationTargetException without cause")
@ -239,21 +277,7 @@ class RPCServer(
it
}
}
val reply = RPCApi.ServerToClient.RpcReply(
id = clientToServer.id,
result = resultWithExceptionUnwrapped
)
val observableContext = ObservableContext(
clientToServer.id,
observableMap,
clientAddressToObservables,
clientToServer.clientAddress,
serverControl!!,
sessionAndProducerPool,
observationSendExecutor!!,
kryoPool
)
observableContext.sendMessage(reply)
sendReply(clientToServer.id, clientToServer.clientAddress, resultWithExceptionUnwrapped)
}
}
is RPCApi.ClientToServer.ObservablesClosed -> {
@ -263,6 +287,62 @@ class RPCServer(
artemisMessage.acknowledge()
}
private fun invokeRpc(rpcContext: RpcContext, methodName: String, arguments: List<Any?>): ErrorOr<Any> {
return ErrorOr.catch {
try {
CURRENT_RPC_CONTEXT.set(rpcContext)
log.debug { "Calling $methodName" }
val method = methodTable[methodName] ?:
throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
method.invoke(ops, *arguments.toTypedArray())
} finally {
CURRENT_RPC_CONTEXT.remove()
}
}
}
private fun sendReply(requestId: RPCApi.RpcRequestId, clientAddress: SimpleString, resultWithExceptionUnwrapped: ErrorOr<Any>) {
val reply = RPCApi.ServerToClient.RpcReply(
id = requestId,
result = resultWithExceptionUnwrapped
)
val observableContext = ObservableContext(
requestId,
observableMap,
clientAddressToObservables,
clientAddress,
serverControl!!,
sessionAndProducerPool,
observationSendExecutor!!,
kryoPool
)
val buffered = bufferIfQueueNotBound(clientAddress, reply, observableContext)
if (!buffered) observableContext.sendMessage(reply)
}
/**
* Buffer the message if the queue at [clientAddress] is not yet bound.
*
* This can happen after server restart when the client consumer session initiates failover,
* but the client queue is not yet set up. We buffer the messages and flush the buffer only once
* we receive a notification that the client queue bindings were added.
*/
private fun bufferIfQueueNotBound(clientAddress: SimpleString, message: RPCApi.ServerToClient.RpcReply, context: ObservableContext): Boolean {
val clientBuffer = responseMessageBuffer.compute(clientAddress, { _, value ->
when (value) {
null -> BufferOrNone.Buffer(ArrayList<MessageAndContext>()).apply {
container.add(MessageAndContext(message, context))
}
is BufferOrNone.Buffer -> value.apply {
container.add(MessageAndContext(message, context))
}
is BufferOrNone.None -> value
}
})
return clientBuffer is BufferOrNone.Buffer
}
private fun reapSubscriptions() {
observableMap.cleanUp()
}

View File

@ -284,6 +284,12 @@ data class RPCDriverDSL(
address = notificationAddress
filterString = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION
isDurable = false
},
CoreQueueConfiguration().apply {
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS
address = notificationAddress
filterString = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION
isDurable = false
}
)
addressesSettings = mapOf(