CORDA-1693: Ensure that DemoBench's RPC connections terminate on shutdown. (#3468)

This commit is contained in:
Chris Rankin 2018-06-28 16:37:51 +01:00 committed by GitHub
parent d5b5825c09
commit 4267513332
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 39 deletions

View File

@ -12,6 +12,7 @@ import net.corda.demobench.readErrorLines
import tornadofx.*
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.StandardCopyOption.REPLACE_EXISTING
import java.util.concurrent.Executors
@ -123,7 +124,7 @@ class Explorer internal constructor(private val explorerController: ExplorerCont
class ExplorerController : Controller() {
private val jvm by inject<JVMConfig>()
private val explorerPath = jvm.applicationDir.resolve("explorer").resolve("node-explorer.jar")
private val explorerPath: Path = jvm.applicationDir.resolve("explorer").resolve("node-explorer.jar")
init {
log.info("Explorer JAR: $explorerPath")

View File

@ -21,7 +21,7 @@ class R3Pty(val name: CordaX500Name, settings: SettingsProvider, dimension: Dime
val terminal = JediTermWidget(dimension, settings)
val isConnected: Boolean get() = terminal.ttyConnector?.isConnected == true
val isConnected: Boolean get() = terminal.ttyConnector?.isConnected ?: false
override fun close() {
log.info("Closing terminal '{}'", name)
@ -65,5 +65,5 @@ class R3Pty(val name: CordaX500Name, settings: SettingsProvider, dimension: Dime
@Suppress("unused")
@Throws(InterruptedException::class)
fun waitFor(): Int? = terminal.ttyConnector?.waitFor()
fun waitFor(): Int = terminal.ttyConnector?.waitFor() ?: -1
}

View File

@ -12,37 +12,46 @@ import java.util.concurrent.TimeUnit.SECONDS
class NodeRPC(config: NodeConfigWrapper, start: (NodeConfigWrapper, CordaRPCOps) -> Unit, invoke: (CordaRPCOps) -> Unit) : AutoCloseable {
private companion object {
private val log = contextLogger()
val oneSecond = SECONDS.toMillis(1)
private val oneSecond = SECONDS.toMillis(1)
}
private val rpcClient = CordaRPCClient(NetworkHostAndPort("localhost", config.nodeConfig.rpcAddress.port))
@Volatile
private var rpcConnection: CordaRPCConnection? = null
private val timer = Timer()
private val timer = Timer("DemoBench NodeRPC (${config.key})", true)
@Volatile
private var timerThread: Thread? = null
init {
val setupTask = object : TimerTask() {
override fun run() {
try {
val user = config.nodeConfig.rpcUsers[0]
val connection = rpcClient.start(user.username, user.password)
rpcConnection = connection
val ops = connection.proxy
// Grab the timer's thread so that we know which one to interrupt.
// This looks like the simplest way of getting the thread. (Ugh)
timerThread = Thread.currentThread()
// Cancel the "setup" task now that we've created the RPC client.
this.cancel()
// Run "start-up" task, now that the RPC client is ready.
start(config, ops)
// Schedule a new task that will refresh the display once per second.
timer.schedule(object : TimerTask() {
override fun run() {
invoke(ops)
}
}, 0, oneSecond)
val user = config.nodeConfig.rpcUsers[0]
val ops: CordaRPCOps = try {
rpcClient.start(user.username, user.password).let { connection ->
rpcConnection = connection
connection.proxy
}
} catch (e: Exception) {
log.warn("Node '{}' not ready yet (Error: {})", config.nodeConfig.myLegalName, e.message)
return
}
// Cancel the "setup" task now that we've created the RPC client.
cancel()
// Run "start-up" task, now that the RPC client is ready.
start(config, ops)
// Schedule a new task that will refresh the display once per second.
timer.schedule(object : TimerTask() {
override fun run() {
invoke(ops)
}
}, 0, oneSecond)
}
}
@ -53,9 +62,11 @@ class NodeRPC(config: NodeConfigWrapper, start: (NodeConfigWrapper, CordaRPCOps)
override fun close() {
timer.cancel()
try {
// No need to notify the node because it's also shutting down.
rpcConnection?.forceClose()
} catch (e: Exception) {
log.error("Failed to close RPC connection (Error: {})", e.message)
}
timerThread?.interrupt()
}
}

View File

@ -42,7 +42,7 @@ class NodeTerminalView : Fragment() {
override val root by fxml<VBox>()
private companion object {
val pageSpecification = PageSpecification(1, 1)
private val pageSpecification = PageSpecification(1, 1)
}
private val nodeController by inject<NodeController>()
@ -86,7 +86,7 @@ class NodeTerminalView : Fragment() {
root.children.add(stack)
root.isVisible = true
SwingUtilities.invokeLater({
SwingUtilities.invokeLater {
val r3pty = R3Pty(config.nodeConfig.myLegalName, TerminalSettingsProvider(), Dimension(160, 80), onExit)
pty = r3pty
@ -112,7 +112,7 @@ class NodeTerminalView : Fragment() {
rpc?.close()
}
}
})
}
}
/*
@ -181,9 +181,9 @@ class NodeTerminalView : Fragment() {
}
private fun launchRPC(config: NodeConfigWrapper) = NodeRPC(
config = config,
start = this::initialise,
invoke = this::pollCashBalances
config = config,
start = ::initialise,
invoke = ::pollCashBalances
)
private fun initialise(config: NodeConfigWrapper, ops: CordaRPCOps) {
@ -234,8 +234,8 @@ class NodeTerminalView : Fragment() {
private fun pollCashBalances(ops: CordaRPCOps) {
try {
val cashBalances = ops.getCashBalances().entries.joinToString(
separator = ", ",
transform = { e -> e.value.toString() }
separator = ", ",
transform = { e -> e.value.toString() }
)
Platform.runLater {
@ -252,21 +252,18 @@ class NodeTerminalView : Fragment() {
header.isDisable = true
subscriptions.forEach {
// Don't allow any exceptions here to halt tab destruction.
try {
it.unsubscribe()
} catch (e: Exception) {
}
ignoreExceptions { it.unsubscribe() }
}
webServer.close()
explorer.close()
viewer.close()
rpc?.close()
ignoreExceptions { webServer.close() }
ignoreExceptions { explorer.close() }
ignoreExceptions { viewer.close() }
ignoreExceptions { rpc?.close() }
}
fun destroy() {
if (!isDestroyed) {
shutdown()
pty?.close()
ignoreExceptions { pty?.close() }
isDestroyed = true
}
}
@ -279,6 +276,10 @@ class NodeTerminalView : Fragment() {
}
}
private fun ignoreExceptions(op: () -> Unit) {
try { op() } catch (e: Exception) {}
}
class TerminalSettingsProvider : DefaultSettingsProvider() {
override fun getDefaultStyle() = TextStyle(TerminalColor.WHITE, TerminalColor.rgb(50, 50, 50))
override fun emulateX11CopyPaste() = true