Fixing demobench freeze on terminal command 'bye' (#1319)

* Fixing demobench freeze on terminal command 'bye'

* Addressing review comments

* Adding docs.
This commit is contained in:
mkit
2017-08-24 16:40:20 +01:00
committed by GitHub
parent bd48bdfd28
commit 412a54d5ac
4 changed files with 74 additions and 15 deletions

View File

@ -5,10 +5,10 @@ import net.corda.core.internal.logElapsedTime
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializationDefaults
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.minutes
import net.corda.core.utilities.seconds
import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport
import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.RPCApi import net.corda.nodeapi.RPCApi
@ -110,6 +110,21 @@ class RPCClient<I : RPCOps>(
val proxy: I val proxy: I
/** The RPC protocol version reported by the server */ /** The RPC protocol version reported by the server */
val serverProtocolVersion: Int val serverProtocolVersion: Int
/**
* Closes this client without notifying the server.
* The server will eventually clear out the RPC message queue and disconnect subscribed observers,
* but this may take longer than desired, so to conserve resources you should normally use [notifyServerAndClose].
* This method is helpful when the node may be shutting down or
* have already shut down and you don't want to block waiting for it to come back.
*/
fun forceClose()
/**
* Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources.
* If the server is not available this method may block for a short period until it's clear the server is not coming back.
*/
fun notifyServerAndClose()
} }
/** /**
@ -168,13 +183,30 @@ class RPCClient<I : RPCOps>(
object : RPCConnection<I> { object : RPCConnection<I> {
override val proxy = ops override val proxy = ops
override val serverProtocolVersion = serverProtocolVersion override val serverProtocolVersion = serverProtocolVersion
override fun close() {
proxyHandler.close() private fun close(notify: Boolean) {
if (notify) {
proxyHandler.notifyServerAndClose()
} else {
proxyHandler.forceClose()
}
serverLocator.close() serverLocator.close()
} }
override fun notifyServerAndClose() {
close(true)
}
override fun forceClose() {
close(false)
}
override fun close() {
close(true)
}
} }
} catch (exception: Throwable) { } catch (exception: Throwable) {
proxyHandler.close() proxyHandler.notifyServerAndClose()
serverLocator.close() serverLocator.close()
throw exception throw exception
} }

View File

@ -10,14 +10,17 @@ import com.google.common.cache.RemovalCause
import com.google.common.cache.RemovalListener import com.google.common.cache.RemovalListener
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import com.google.common.util.concurrent.ThreadFactoryBuilder import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.internal.ThreadBox
import net.corda.core.crypto.random63BitValue import net.corda.core.crypto.random63BitValue
import net.corda.core.internal.LazyPool import net.corda.core.internal.LazyPool
import net.corda.core.internal.LazyStickyPool import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle import net.corda.core.internal.LifeCycle
import net.corda.core.internal.ThreadBox
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationContext
import net.corda.core.utilities.* import net.corda.core.utilities.Try
import net.corda.core.utilities.debug
import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.* import net.corda.nodeapi.*
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.SimpleString
@ -159,7 +162,7 @@ class RPCClientProxyHandler(
ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").setDaemon(true).build() ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").setDaemon(true).build()
) )
reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate( reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate(
this::reapObservables, this::reapObservablesAndNotify,
rpcConfiguration.reapInterval.toMillis(), rpcConfiguration.reapInterval.toMillis(),
rpcConfiguration.reapInterval.toMillis(), rpcConfiguration.reapInterval.toMillis(),
TimeUnit.MILLISECONDS TimeUnit.MILLISECONDS
@ -268,13 +271,36 @@ class RPCClientProxyHandler(
} }
/** /**
* Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors. * Closes this handler without notifying observables.
* This method clears up only local resources and as such does not block on any network resources.
*/ */
fun close() { fun forceClose() {
close(false)
}
/**
* Closes this handler and sends notifications to all observables, so it can immediately clean up resources.
* Notifications sent to observables are to be acknowledged, therefore this call blocks until all acknowledgements are received.
* If this is not convenient see the [forceClose] method.
* If an observable is not accessible this method may block for a duration of the message broker timeout.
*/
fun notifyServerAndClose() {
close(true)
}
/**
* Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors.
* When observables are to be notified (i.e. the [notify] parameter is true),
* the method blocks until all the messages are acknowledged by the observables.
* Note: If any of the observables is inaccessible, the method blocks for the duration of the timeout set on the message broker.
*
* @param notify whether to notify observables or not.
*/
private fun close(notify: Boolean = true) {
sessionAndConsumer?.sessionFactory?.close() sessionAndConsumer?.sessionFactory?.close()
reaperScheduledFuture?.cancel(false) reaperScheduledFuture?.cancel(false)
observableContext.observableMap.invalidateAll() observableContext.observableMap.invalidateAll()
reapObservables() reapObservables(notify)
reaperExecutor?.shutdownNow() reaperExecutor?.shutdownNow()
sessionAndProducerPool.close().forEach { sessionAndProducerPool.close().forEach {
it.sessionFactory.close() it.sessionFactory.close()
@ -315,8 +341,11 @@ class RPCClientProxyHandler(
lifeCycle.transition(State.SERVER_VERSION_NOT_SET, State.STARTED) lifeCycle.transition(State.SERVER_VERSION_NOT_SET, State.STARTED)
} }
private fun reapObservables() { private fun reapObservablesAndNotify() = reapObservables()
private fun reapObservables(notify: Boolean = true) {
observableContext.observableMap.cleanUp() observableContext.observableMap.cleanUp()
if (!notify) return
val observableIds = observablesToReap.locked { val observableIds = observablesToReap.locked {
if (observables.isNotEmpty()) { if (observables.isNotEmpty()) {
val temporary = observables val temporary = observables

View File

@ -58,8 +58,6 @@ class R3Pty(val name: X500Name, settings: SettingsProvider, dimension: Dimension
executor.submit { executor.submit {
val exitValue = connector.waitFor() val exitValue = connector.waitFor()
log.info("Terminal has exited (value={})", exitValue) log.info("Terminal has exited (value={})", exitValue)
// TODO: Remove this arbitrary sleep when https://github.com/corda/corda/issues/689 is fixed.
try { Thread.sleep(SECONDS.toMillis(2)) } catch (e: InterruptedException) {}
onExit(exitValue) onExit(exitValue)
} }

View File

@ -54,7 +54,7 @@ class NodeRPC(config: NodeConfig, start: (NodeConfig, CordaRPCOps) -> Unit, invo
override fun close() { override fun close() {
timer.cancel() timer.cancel()
try { try {
rpcConnection?.close() rpcConnection?.forceClose()
} catch (e: Exception) { } catch (e: Exception) {
log.error("Failed to close RPC connection (Error: {})", e.message) log.error("Failed to close RPC connection (Error: {})", e.message)
} }