From 652cbb0d9f50b491804179dc5ef95e87ce89b60e Mon Sep 17 00:00:00 2001 From: Andras Slemmer Date: Thu, 4 May 2017 16:28:49 +0100 Subject: [PATCH] #592: RPCServer lifecycle --- .../net/corda/node/driver/DriverTests.kt | 2 ++ .../node/services/messaging/RPCServer.kt | 22 ++++++++++++++++++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt index cd39098c83..54f77b30a9 100644 --- a/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/driver/DriverTests.kt @@ -75,8 +75,10 @@ class DriverTests { driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) { val baseDirectory = startNode(DUMMY_BANK_A.name).getOrThrow().configuration.baseDirectory val logFile = (baseDirectory / LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } + println("ASD $logFile") val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } assertThat(debugLinesPresent).isTrue() + println("hmm.") } } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt index cb3f44ab28..9a0c0f70fe 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/RPCServer.kt @@ -18,6 +18,7 @@ import net.corda.core.messaging.RPCOps import net.corda.core.random63BitValue import net.corda.core.serialization.KryoPoolWithContext import net.corda.core.utilities.LazyStickyPool +import net.corda.core.utilities.LifeCycle import net.corda.core.utilities.debug import net.corda.core.utilities.loggerFor import net.corda.node.services.RPCUserService @@ -28,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage +import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.api.core.client.ServerLocator import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.CoreNotificationType @@ -84,6 +86,12 @@ class RPCServer( val log = loggerFor() val kryoPool = KryoPool.Builder { RPCKryo(RpcServerObservableSerializer) }.build() } + private enum class State { + UNSTARTED, + STARTED, + FINISHED + } + private val lifeCycle = LifeCycle(State.UNSTARTED) // The methodname->Method map to use for dispatching. private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() } // The observable subscription mapping. @@ -134,17 +142,24 @@ class RPCServer( rpcConfiguration.reapIntervalMs, TimeUnit.MILLISECONDS ) + val sessions = ArrayList() for (i in 1 .. rpcConfiguration.consumerPoolSize) { val sessionFactory = serverLocator.createSessionFactory() val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME) consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler) - session.start() sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer)) + sessions.add(session) } clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS) clientBindingRemovalConsumer.setMessageHandler(this::bindingRemovalArtemisMessageHandler) serverControl = activeMqServerControl + lifeCycle.transition(State.UNSTARTED, State.STARTED) + // We delay the consumer session start because Artemis starts delivering messages immediately, so we need to be + // fully initialised. + sessions.forEach { + it.start() + } } fun close() { @@ -165,9 +180,11 @@ class RPCServer( it.session.close() it.sessionFactory.close() } + lifeCycle.transition(State.STARTED, State.FINISHED) } private fun bindingRemovalArtemisMessageHandler(artemisMessage: ClientMessage) { + lifeCycle.requireState(State.STARTED) val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) require(notificationType == CoreNotificationType.BINDING_REMOVED.name) val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME) @@ -178,11 +195,13 @@ class RPCServer( // Note that this function operates on the *current* view of client observables. During invalidation further // Observables may be serialised and thus registered. private fun invalidateClient(clientAddress: SimpleString) { + lifeCycle.requireState(State.STARTED) val observableIds = clientAddressToObservables.removeAll(clientAddress) observableMap.invalidateAll(observableIds) } private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { + lifeCycle.requireState(State.STARTED) val clientToServer = RPCApi.ClientToServer.fromClientMessage(kryoPool, artemisMessage) log.debug { "-> RPC -> $clientToServer" } when (clientToServer) { @@ -234,6 +253,7 @@ class RPCServer( } private fun reapSubscriptions() { + lifeCycle.requireState(State.STARTED) observableMap.cleanUp() }