#592: RPCServer lifecycle

This commit is contained in:
Andras Slemmer
2017-05-04 16:28:49 +01:00
parent 34517f653a
commit 652cbb0d9f
2 changed files with 23 additions and 1 deletions

View File

@ -75,8 +75,10 @@ class DriverTests {
driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) { driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) {
val baseDirectory = startNode(DUMMY_BANK_A.name).getOrThrow().configuration.baseDirectory val baseDirectory = startNode(DUMMY_BANK_A.name).getOrThrow().configuration.baseDirectory
val logFile = (baseDirectory / LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } val logFile = (baseDirectory / LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() }
println("ASD $logFile")
val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } }
assertThat(debugLinesPresent).isTrue() assertThat(debugLinesPresent).isTrue()
println("hmm.")
} }
} }

View File

@ -18,6 +18,7 @@ import net.corda.core.messaging.RPCOps
import net.corda.core.random63BitValue import net.corda.core.random63BitValue
import net.corda.core.serialization.KryoPoolWithContext import net.corda.core.serialization.KryoPoolWithContext
import net.corda.core.utilities.LazyStickyPool import net.corda.core.utilities.LazyStickyPool
import net.corda.core.utilities.LifeCycle
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.services.RPCUserService import net.corda.node.services.RPCUserService
@ -28,6 +29,7 @@ import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.api.core.client.ServerLocator import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.api.core.management.CoreNotificationType import org.apache.activemq.artemis.api.core.management.CoreNotificationType
@ -84,6 +86,12 @@ class RPCServer(
val log = loggerFor<RPCServer>() val log = loggerFor<RPCServer>()
val kryoPool = KryoPool.Builder { RPCKryo(RpcServerObservableSerializer) }.build() val kryoPool = KryoPool.Builder { RPCKryo(RpcServerObservableSerializer) }.build()
} }
private enum class State {
UNSTARTED,
STARTED,
FINISHED
}
private val lifeCycle = LifeCycle(State.UNSTARTED)
// The methodname->Method map to use for dispatching. // The methodname->Method map to use for dispatching.
private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() } private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() }
// The observable subscription mapping. // The observable subscription mapping.
@ -134,17 +142,24 @@ class RPCServer(
rpcConfiguration.reapIntervalMs, rpcConfiguration.reapIntervalMs,
TimeUnit.MILLISECONDS TimeUnit.MILLISECONDS
) )
val sessions = ArrayList<ClientSession>()
for (i in 1 .. rpcConfiguration.consumerPoolSize) { for (i in 1 .. rpcConfiguration.consumerPoolSize) {
val sessionFactory = serverLocator.createSessionFactory() val sessionFactory = serverLocator.createSessionFactory()
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE) val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME) val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler) consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler)
session.start()
sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer)) sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer))
sessions.add(session)
} }
clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS) clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
clientBindingRemovalConsumer.setMessageHandler(this::bindingRemovalArtemisMessageHandler) clientBindingRemovalConsumer.setMessageHandler(this::bindingRemovalArtemisMessageHandler)
serverControl = activeMqServerControl serverControl = activeMqServerControl
lifeCycle.transition(State.UNSTARTED, State.STARTED)
// We delay the consumer session start because Artemis starts delivering messages immediately, so we need to be
// fully initialised.
sessions.forEach {
it.start()
}
} }
fun close() { fun close() {
@ -165,9 +180,11 @@ class RPCServer(
it.session.close() it.session.close()
it.sessionFactory.close() it.sessionFactory.close()
} }
lifeCycle.transition(State.STARTED, State.FINISHED)
} }
private fun bindingRemovalArtemisMessageHandler(artemisMessage: ClientMessage) { private fun bindingRemovalArtemisMessageHandler(artemisMessage: ClientMessage) {
lifeCycle.requireState(State.STARTED)
val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE) val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
require(notificationType == CoreNotificationType.BINDING_REMOVED.name) require(notificationType == CoreNotificationType.BINDING_REMOVED.name)
val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME) val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)
@ -178,11 +195,13 @@ class RPCServer(
// Note that this function operates on the *current* view of client observables. During invalidation further // Note that this function operates on the *current* view of client observables. During invalidation further
// Observables may be serialised and thus registered. // Observables may be serialised and thus registered.
private fun invalidateClient(clientAddress: SimpleString) { private fun invalidateClient(clientAddress: SimpleString) {
lifeCycle.requireState(State.STARTED)
val observableIds = clientAddressToObservables.removeAll(clientAddress) val observableIds = clientAddressToObservables.removeAll(clientAddress)
observableMap.invalidateAll(observableIds) observableMap.invalidateAll(observableIds)
} }
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) { private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
lifeCycle.requireState(State.STARTED)
val clientToServer = RPCApi.ClientToServer.fromClientMessage(kryoPool, artemisMessage) val clientToServer = RPCApi.ClientToServer.fromClientMessage(kryoPool, artemisMessage)
log.debug { "-> RPC -> $clientToServer" } log.debug { "-> RPC -> $clientToServer" }
when (clientToServer) { when (clientToServer) {
@ -234,6 +253,7 @@ class RPCServer(
} }
private fun reapSubscriptions() { private fun reapSubscriptions() {
lifeCycle.requireState(State.STARTED)
observableMap.cleanUp() observableMap.cleanUp()
} }