RPC: Improve the client class with a convenience function and some startup time logging.

This reveals that building the first RPC client is ludicrously slow (like 1.8 seconds) but subsequent builds are more like 30 msec. This might be interpreter overhead, or it might be due to Artemis/SSL doing lots of piggy lazy initialisation or something. But at any rate it may be worth investigating a bit later.
This commit is contained in:
Mike Hearn 2017-01-06 13:58:56 +01:00
parent ecb460b698
commit 32523d376e

View File

@ -2,7 +2,9 @@ package net.corda.node.services.messaging
import com.google.common.net.HostAndPort
import net.corda.core.ThreadBox
import net.corda.core.logElapsedTime
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.utilities.loggerFor
import net.corda.node.services.config.NodeSSLConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.ConnectionDirection.Outbound
import org.apache.activemq.artemis.api.core.ActiveMQException
@ -23,6 +25,10 @@ import javax.annotation.concurrent.ThreadSafe
*/
@ThreadSafe
class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfiguration?) : Closeable, ArtemisMessagingComponent() {
private companion object {
val log = loggerFor<CordaRPCClient>()
}
// TODO: Certificate handling for clients needs more work.
private inner class State {
var running = false
@ -33,30 +39,54 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
private val state = ThreadBox(State())
/** Opens the connection to the server and registers a JVM shutdown hook to cleanly disconnect. */
/**
* Opens the connection to the server with the given username and password, then returns itself.
* Registers a JVM shutdown hook to cleanly disconnect.
*/
@Throws(ActiveMQException::class)
fun start(username: String, password: String) {
fun start(username: String, password: String): CordaRPCClient {
state.locked {
check(!running)
checkStorePasswords()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
// This will allow reconnection in case of server restart/network outages/IP address changes, etc.
// See http://activemq.apache.org/artemis/docs/1.5.0/client-reconnection.html
sessionFactory = serverLocator.createSessionFactory()
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()
clientImpl = CordaRPCClientImpl(session, state.lock, username)
running = true
log.logElapsedTime("Startup") {
checkStorePasswords()
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(Outbound(), host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
// TODO: Configure session reconnection, confirmation window sizes and other Artemis features.
// This will allow reconnection in case of server restart/network outages/IP address changes, etc.
// See http://activemq.apache.org/artemis/docs/1.5.0/client-reconnection.html
sessionFactory = serverLocator.createSessionFactory()
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()
clientImpl = CordaRPCClientImpl(session, state.lock, username)
running = true
}
}
Runtime.getRuntime().addShutdownHook(Thread {
close()
})
return this
}
/** Shuts down the client and lets the server know it can free the used resources (in a nice way) */
/**
* A convenience function that opens a connection with the given credentials, executes the given code block with all
* available RPCs in scope and shuts down the RPC connection again. It's meant for quick prototyping and demos. For
* more control you probably want to control the lifecycle of the client and proxies independently, as well as
* configuring a timeout and other such features via the [proxy] method.
*
* After this method returns the client is closed and can't be restarted.
*/
@Throws(ActiveMQException::class)
fun <T> use(username: String, password: String, block: CordaRPCOps.() -> T): T {
require(!state.locked { running })
start(username, password)
(this as Closeable).use {
return proxy().block()
}
}
/** Shuts down the client and lets the server know it can free the used resources (in a nice way). */
override fun close() {
state.locked {
if (!running) return
@ -105,7 +135,9 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
fun proxy(timeout: Duration? = null, minVersion: Int = 0): CordaRPCOps {
return state.locked {
check(running) { "Client must have been started first" }
clientImpl.proxyFor(CordaRPCOps::class.java, timeout, minVersion)
log.logElapsedTime("Proxy build") {
clientImpl.proxyFor(CordaRPCOps::class.java, timeout, minVersion)
}
}
}