diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt index 94fa65a018..cca836d858 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClient.kt @@ -5,10 +5,10 @@ import net.corda.core.internal.logElapsedTime import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationContext import net.corda.core.serialization.SerializationDefaults -import net.corda.core.utilities.minutes -import net.corda.core.utilities.seconds import net.corda.core.utilities.NetworkHostAndPort import net.corda.core.utilities.loggerFor +import net.corda.core.utilities.minutes +import net.corda.core.utilities.seconds import net.corda.nodeapi.ArtemisTcpTransport.Companion.tcpTransport import net.corda.nodeapi.ConnectionDirection import net.corda.nodeapi.RPCApi @@ -110,6 +110,21 @@ class RPCClient( val proxy: I /** The RPC protocol version reported by the server */ val serverProtocolVersion: Int + + /** + * Closes this client without notifying the server. + * The server will eventually clear out the RPC message queue and disconnect subscribed observers, + * but this may take longer than desired, so to conserve resources you should normally use [notifyServerAndClose]. + * This method is helpful when the node may be shutting down or + * have already shut down and you don't want to block waiting for it to come back. + */ + fun forceClose() + + /** + * Closes this client gracefully by sending a notification to the server, so it can immediately clean up resources. + * If the server is not available this method may block for a short period until it's clear the server is not coming back. + */ + fun notifyServerAndClose() } /** @@ -168,13 +183,30 @@ class RPCClient( object : RPCConnection { override val proxy = ops override val serverProtocolVersion = serverProtocolVersion - override fun close() { - proxyHandler.close() + + private fun close(notify: Boolean) { + if (notify) { + proxyHandler.notifyServerAndClose() + } else { + proxyHandler.forceClose() + } serverLocator.close() } + + override fun notifyServerAndClose() { + close(true) + } + + override fun forceClose() { + close(false) + } + + override fun close() { + close(true) + } } } catch (exception: Throwable) { - proxyHandler.close() + proxyHandler.notifyServerAndClose() serverLocator.close() throw exception } diff --git a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt index 47a20df007..1cd7974718 100644 --- a/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt +++ b/client/rpc/src/main/kotlin/net/corda/client/rpc/internal/RPCClientProxyHandler.kt @@ -10,14 +10,17 @@ import com.google.common.cache.RemovalCause import com.google.common.cache.RemovalListener import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.ThreadFactoryBuilder -import net.corda.core.internal.ThreadBox import net.corda.core.crypto.random63BitValue import net.corda.core.internal.LazyPool import net.corda.core.internal.LazyStickyPool import net.corda.core.internal.LifeCycle +import net.corda.core.internal.ThreadBox import net.corda.core.messaging.RPCOps import net.corda.core.serialization.SerializationContext -import net.corda.core.utilities.* +import net.corda.core.utilities.Try +import net.corda.core.utilities.debug +import net.corda.core.utilities.getOrThrow +import net.corda.core.utilities.loggerFor import net.corda.nodeapi.* import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration import org.apache.activemq.artemis.api.core.SimpleString @@ -159,7 +162,7 @@ class RPCClientProxyHandler( ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").setDaemon(true).build() ) reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate( - this::reapObservables, + this::reapObservablesAndNotify, rpcConfiguration.reapInterval.toMillis(), rpcConfiguration.reapInterval.toMillis(), TimeUnit.MILLISECONDS @@ -268,13 +271,36 @@ class RPCClientProxyHandler( } /** - * Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors. + * Closes this handler without notifying observables. + * This method clears up only local resources and as such does not block on any network resources. */ - fun close() { + fun forceClose() { + close(false) + } + + /** + * Closes this handler and sends notifications to all observables, so it can immediately clean up resources. + * Notifications sent to observables are to be acknowledged, therefore this call blocks until all acknowledgements are received. + * If this is not convenient see the [forceClose] method. + * If an observable is not accessible this method may block for a duration of the message broker timeout. + */ + fun notifyServerAndClose() { + close(true) + } + + /** + * Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors. + * When observables are to be notified (i.e. the [notify] parameter is true), + * the method blocks until all the messages are acknowledged by the observables. + * Note: If any of the observables is inaccessible, the method blocks for the duration of the timeout set on the message broker. + * + * @param notify whether to notify observables or not. + */ + private fun close(notify: Boolean = true) { sessionAndConsumer?.sessionFactory?.close() reaperScheduledFuture?.cancel(false) observableContext.observableMap.invalidateAll() - reapObservables() + reapObservables(notify) reaperExecutor?.shutdownNow() sessionAndProducerPool.close().forEach { it.sessionFactory.close() @@ -315,8 +341,11 @@ class RPCClientProxyHandler( lifeCycle.transition(State.SERVER_VERSION_NOT_SET, State.STARTED) } - private fun reapObservables() { + private fun reapObservablesAndNotify() = reapObservables() + + private fun reapObservables(notify: Boolean = true) { observableContext.observableMap.cleanUp() + if (!notify) return val observableIds = observablesToReap.locked { if (observables.isNotEmpty()) { val temporary = observables diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/pty/R3Pty.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/pty/R3Pty.kt index 734aec73b4..64961f747a 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/pty/R3Pty.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/pty/R3Pty.kt @@ -58,8 +58,6 @@ class R3Pty(val name: X500Name, settings: SettingsProvider, dimension: Dimension executor.submit { val exitValue = connector.waitFor() log.info("Terminal has exited (value={})", exitValue) - // TODO: Remove this arbitrary sleep when https://github.com/corda/corda/issues/689 is fixed. - try { Thread.sleep(SECONDS.toMillis(2)) } catch (e: InterruptedException) {} onExit(exitValue) } diff --git a/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt b/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt index baeb40ba59..339eed00ad 100644 --- a/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt +++ b/tools/demobench/src/main/kotlin/net/corda/demobench/rpc/NodeRPC.kt @@ -54,7 +54,7 @@ class NodeRPC(config: NodeConfig, start: (NodeConfig, CordaRPCOps) -> Unit, invo override fun close() { timer.cancel() try { - rpcConnection?.close() + rpcConnection?.forceClose() } catch (e: Exception) { log.error("Failed to close RPC connection (Error: {})", e.message) }