diff --git a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt index 83d5965028..4cf43156d2 100644 --- a/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt +++ b/client/rpc/src/integration-test/kotlin/net/corda/client/rpc/RPCStabilityTests.kt @@ -5,7 +5,6 @@ import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.pool.KryoPool -import com.google.common.base.Stopwatch import com.google.common.net.HostAndPort import com.google.common.util.concurrent.Futures import net.corda.client.rpc.internal.RPCClient @@ -17,7 +16,6 @@ import net.corda.node.services.messaging.RPCServerConfiguration import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCKryo import net.corda.testing.* -import org.apache.activemq.artemis.ArtemisConstants import org.apache.activemq.artemis.api.core.SimpleString import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue @@ -28,8 +26,6 @@ import rx.subjects.UnicastSubject import java.time.Duration import java.util.concurrent.* import java.util.concurrent.atomic.AtomicInteger -import kotlin.concurrent.thread -import kotlin.test.fail class RPCStabilityTests { @@ -218,65 +214,27 @@ class RPCStabilityTests { @Test fun `client reconnects to rebooted server`() { - // TODO: Remove multiple trials when we fix the Artemis bug (which should have its own test(s)). 
- if (ArtemisConstants::class.java.`package`.implementationVersion == "1.5.3") { - // The test fails maybe 1 in 100 times, so to stay green until we upgrade Artemis, retry if it fails: - for (i in (1..3)) { - try { - `client reconnects to rebooted server`(1) - } catch (e: TimeoutException) { - continue - } - return - } - fail("Test failed 3 times, which is vanishingly unlikely unless something has changed.") - } else { - // We've upgraded Artemis so make the test fail reliably, in the 2.1.0 case that takes 25 trials: - `client reconnects to rebooted server`(25) - } - } - - private fun `client reconnects to rebooted server`(trials: Int) { rpcDriver { - val coreBurner = thread { - while (!Thread.interrupted()) { - // Spin. - } + val ops = object : ReconnectOps { + override val protocolVersion = 0 + override fun ping() = "pong" } - try { - val ops = object : ReconnectOps { - override val protocolVersion = 0 - override fun ping() = "pong" - } - var serverFollower = shutdownManager.follower() - val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! - serverFollower.unfollow() - val clientFollower = shutdownManager.follower() - val client = startRpcClient(serverPort).getOrThrow() - clientFollower.unfollow() - assertEquals("pong", client.ping()) - val background = Executors.newSingleThreadExecutor() - (1..trials).forEach { - System.err.println("Start trial $it of $trials.") - serverFollower.shutdown() - serverFollower = shutdownManager.follower() - startRpcServer(ops = ops, customPort = serverPort).getOrThrow() - serverFollower.unfollow() - val stopwatch = Stopwatch.createStarted() - val pingFuture = background.submit(Callable { - client.ping() // Would also hang in foreground, we need it in background so we can timeout. - }) - assertEquals("pong", pingFuture.getOrThrow(10.seconds)) - System.err.println("Took ${stopwatch.elapsed(TimeUnit.MILLISECONDS)} millis.") - } - background.shutdown() // No point in the hanging case. 
- clientFollower.shutdown() // Driver would do this after the current server, causing 'legit' failover hang. - } finally { - with(coreBurner) { - interrupt() - join() - } + val serverFollower = shutdownManager.follower() + val serverPort = startRpcServer(ops = ops).getOrThrow().broker.hostAndPort!! + serverFollower.unfollow() + // Set retry interval to 1s to reduce test duration + val clientConfiguration = RPCClientConfiguration.default.copy(connectionRetryInterval = 1.seconds) + val clientFollower = shutdownManager.follower() + val client = startRpcClient(serverPort, configuration = clientConfiguration).getOrThrow() + clientFollower.unfollow() + assertEquals("pong", client.ping()) + serverFollower.shutdown() + startRpcServer(ops = ops, customPort = serverPort).getOrThrow() + val pingFuture = future { + client.ping() } + assertEquals("pong", pingFuture.getOrThrow(10.seconds)) + clientFollower.shutdown() // Driver would do this after the new server, causing hang. } } diff --git a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt index b3820c154b..836315752a 100644 --- a/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt +++ b/node-api/src/main/kotlin/net/corda/nodeapi/RPCApi.kt @@ -65,10 +65,14 @@ object RPCApi { val RPC_SERVER_QUEUE_NAME = "rpc.server" val RPC_CLIENT_QUEUE_NAME_PREFIX = "rpc.client" val RPC_CLIENT_BINDING_REMOVALS = "rpc.clientqueueremovals" + val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions" val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION = "${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " + "${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'" + val RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION = + "${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_ADDED.name}' AND " + + "${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'" data class RpcRequestId(val 
toLong: Long) data class ObservableId(val toLong: Long) diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt index 1fee1f6f2d..5b2d5f9c3c 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/ArtemisMessagingServer.kt @@ -200,6 +200,12 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, address = NOTIFICATIONS_ADDRESS, filter = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION, durable = false + ), + queueConfig( + name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS, + address = NOTIFICATIONS_ADDRESS, + filter = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION, + durable = false ) ) addressesSettings = mapOf( diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index e001d4a248..1306c4206b 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -42,8 +42,8 @@ import rx.Subscription import java.lang.reflect.InvocationTargetException import java.lang.reflect.Method import java.time.Duration -import java.util.* import java.util.concurrent.* +import kotlin.collections.ArrayList data class RPCServerConfiguration( /** The number of threads to use for handling RPC requests */ @@ -91,14 +91,22 @@ class RPCServer( STARTED, FINISHED } + + private sealed class BufferOrNone { + data class Buffer(val container: MutableCollection) : BufferOrNone() + object None : BufferOrNone() + } + + private data class MessageAndContext(val message: RPCApi.ServerToClient.RpcReply, val context: ObservableContext) + private val lifeCycle = LifeCycle(State.UNSTARTED) - // The methodname->Method map to use for dispatching. 
+ /** The methodname->Method map to use for dispatching. */ private val methodTable: Map - // The observable subscription mapping. + /** The observable subscription mapping. */ private val observableMap = createObservableSubscriptionMap() - // A mapping from client addresses to IDs of associated Observables + /** A mapping from client addresses to IDs of associated Observables */ private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create()) - // The scheduled reaper handle. + /** The scheduled reaper handle. */ private var reaperScheduledFuture: ScheduledFuture<*>? = null private var observationSendExecutor: ExecutorService? = null @@ -113,13 +121,16 @@ class RPCServer( ArtemisProducer(sessionFactory, session, session.createProducer()) } private var clientBindingRemovalConsumer: ClientConsumer? = null + private var clientBindingAdditionConsumer: ClientConsumer? = null private var serverControl: ActiveMQServerControl? = null + private val responseMessageBuffer = ConcurrentHashMap() + init { val groupedMethods = ops.javaClass.declaredMethods.groupBy { it.name } groupedMethods.forEach { name, methods -> if (methods.size > 1) { - throw IllegalArgumentException("Encountered more than one method called ${name} on ${ops.javaClass.name}") + throw IllegalArgumentException("Encountered more than one method called $name on ${ops.javaClass.name}") } } methodTable = groupedMethods.mapValues { it.value.single() } @@ -155,17 +166,8 @@ class RPCServer( rpcConfiguration.reapInterval.toMillis(), TimeUnit.MILLISECONDS ) - val sessions = ArrayList() - for (i in 1 .. 
rpcConfiguration.consumerPoolSize) { - val sessionFactory = serverLocator.createSessionFactory() - val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) - val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME) - consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler) - sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer)) - sessions.add(session) - } - clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS) - clientBindingRemovalConsumer!!.setMessageHandler(this::bindingRemovalArtemisMessageHandler) + val sessions = createConsumerSessions() + createNotificationConsumers() serverControl = activeMqServerControl lifeCycle.transition(State.UNSTARTED, State.STARTED) // We delay the consumer session start because Artemis starts delivering messages immediately, so we need to be @@ -179,6 +181,26 @@ class RPCServer( } } + private fun createConsumerSessions(): ArrayList { + val sessions = ArrayList() + for (i in 1..rpcConfiguration.consumerPoolSize) { + val sessionFactory = serverLocator.createSessionFactory() + val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) + val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME) + consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler) + sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer)) + sessions.add(session) + } + return sessions + } + + private fun createNotificationConsumers() { + clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS) + clientBindingRemovalConsumer!!.setMessageHandler(this::bindingRemovalArtemisMessageHandler) + clientBindingAdditionConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_ADDITIONS) + 
clientBindingAdditionConsumer!!.setMessageHandler(this::bindingAdditionArtemisMessageHandler) + } + fun close() { reaperScheduledFuture?.cancel(false) rpcExecutor?.shutdownNow() @@ -203,12 +225,38 @@ class RPCServer( invalidateClient(SimpleString(clientAddress)) } + private fun bindingAdditionArtemisMessageHandler(artemisMessage: ClientMessage) { + lifeCycle.requireState(State.STARTED) + val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) + require(notificationType == CoreNotificationType.BINDING_ADDED.name) + val clientAddress = SimpleString(artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)) + log.debug("RPC client queue created on address $clientAddress") + + val buffer = stopBuffering(clientAddress) + buffer?.let { drainBuffer(it) } + } + + /** + * Disables message buffering for [clientAddress] by storing [BufferOrNone.None] for it and returns the previous buffer, + * or `null` if there was nothing buffered (no entry existed yet, or buffering was already disabled). + */ + private fun stopBuffering(clientAddress: SimpleString): BufferOrNone.Buffer? { + return responseMessageBuffer.put(clientAddress, BufferOrNone.None) as? BufferOrNone.Buffer + } + + private fun drainBuffer(buffer: BufferOrNone.Buffer) { + buffer.container.forEach { + it.context.sendMessage(it.message) + } + } + // Note that this function operates on the *current* view of client observables. During invalidation further // Observables may be serialised and thus registered. 
private fun invalidateClient(clientAddress: SimpleString) { lifeCycle.requireState(State.STARTED) val observableIds = clientAddressToObservables.removeAll(clientAddress) observableMap.invalidateAll(observableIds) + responseMessageBuffer.remove(clientAddress) } private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { @@ -221,17 +269,7 @@ class RPCServer( currentUser = getUser(artemisMessage) ) rpcExecutor!!.submit { - val result = ErrorOr.catch { - try { - CURRENT_RPC_CONTEXT.set(rpcContext) - log.debug { "Calling ${clientToServer.methodName}" } - val method = methodTable[clientToServer.methodName] ?: - throw RPCException("Received RPC for unknown method ${clientToServer.methodName} - possible client/server version skew?") - method.invoke(ops, *clientToServer.arguments.toTypedArray()) - } finally { - CURRENT_RPC_CONTEXT.remove() - } - } + val result = invokeRpc(rpcContext, clientToServer.methodName, clientToServer.arguments) val resultWithExceptionUnwrapped = result.mapError { if (it is InvocationTargetException) { it.cause ?: RPCException("Caught InvocationTargetException without cause") @@ -239,21 +277,7 @@ class RPCServer( it } } - val reply = RPCApi.ServerToClient.RpcReply( - id = clientToServer.id, - result = resultWithExceptionUnwrapped - ) - val observableContext = ObservableContext( - clientToServer.id, - observableMap, - clientAddressToObservables, - clientToServer.clientAddress, - serverControl!!, - sessionAndProducerPool, - observationSendExecutor!!, - kryoPool - ) - observableContext.sendMessage(reply) + sendReply(clientToServer.id, clientToServer.clientAddress, resultWithExceptionUnwrapped) } } is RPCApi.ClientToServer.ObservablesClosed -> { @@ -263,6 +287,62 @@ class RPCServer( artemisMessage.acknowledge() } + private fun invokeRpc(rpcContext: RpcContext, methodName: String, arguments: List): ErrorOr { + return ErrorOr.catch { + try { + CURRENT_RPC_CONTEXT.set(rpcContext) + log.debug { "Calling $methodName" } + val method = 
methodTable[methodName] ?: + throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?") + method.invoke(ops, *arguments.toTypedArray()) + } finally { + CURRENT_RPC_CONTEXT.remove() + } + } + } + + private fun sendReply(requestId: RPCApi.RpcRequestId, clientAddress: SimpleString, resultWithExceptionUnwrapped: ErrorOr) { + val reply = RPCApi.ServerToClient.RpcReply( + id = requestId, + result = resultWithExceptionUnwrapped + ) + val observableContext = ObservableContext( + requestId, + observableMap, + clientAddressToObservables, + clientAddress, + serverControl!!, + sessionAndProducerPool, + observationSendExecutor!!, + kryoPool + ) + + val buffered = bufferIfQueueNotBound(clientAddress, reply, observableContext) + if (!buffered) observableContext.sendMessage(reply) + } + + /** + * Buffers the message if the queue at [clientAddress] is not yet bound. + * + * This can happen after server restart when the client consumer session initiates failover, + * but the client queue is not yet set up. We buffer the messages and flush the buffer only once + * we receive a notification that the client queue bindings were added. Returns `true` if [message] was buffered, `false` if buffering is disabled for [clientAddress] and the caller must send it. 
+ */ + private fun bufferIfQueueNotBound(clientAddress: SimpleString, message: RPCApi.ServerToClient.RpcReply, context: ObservableContext): Boolean { + val clientBuffer = responseMessageBuffer.compute(clientAddress, { _, value -> + when (value) { + null -> BufferOrNone.Buffer(ArrayList()).apply { + container.add(MessageAndContext(message, context)) + } + is BufferOrNone.Buffer -> value.apply { + container.add(MessageAndContext(message, context)) + } + is BufferOrNone.None -> value + } + }) + return clientBuffer is BufferOrNone.Buffer + } + private fun reapSubscriptions() { observableMap.cleanUp() } diff --git a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt index 925df7a4ad..326d01fb74 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt @@ -284,6 +284,12 @@ data class RPCDriverDSL( address = notificationAddress filterString = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION isDurable = false + }, + CoreQueueConfiguration().apply { + name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS + address = notificationAddress + filterString = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION + isDurable = false } ) addressesSettings = mapOf(