mirror of
https://github.com/corda/corda.git
synced 2025-02-21 09:51:57 +00:00
CORDA-1686: Make Node Explorer release its RPC connection on shutdown. (#3457)
* Make Node Explorer release its RPC connection on shutdown. * Declare Explorer's login() function as tail-recursive. * Replace lateinit rpcConnection with oridinary var. * Notify the node when closing an RPC connection gracefully.
This commit is contained in:
parent
82f79387a3
commit
76c114502e
@ -40,7 +40,7 @@ data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val mess
|
|||||||
/**
|
/**
|
||||||
* This model exposes raw event streams to and from the node.
|
* This model exposes raw event streams to and from the node.
|
||||||
*/
|
*/
|
||||||
class NodeMonitorModel {
|
class NodeMonitorModel : AutoCloseable {
|
||||||
|
|
||||||
private val retryableStateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
|
private val retryableStateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
|
||||||
private val stateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
|
private val stateMachineUpdatesSubject = PublishSubject.create<StateMachineUpdate>()
|
||||||
@ -49,6 +49,7 @@ class NodeMonitorModel {
|
|||||||
private val stateMachineTransactionMappingSubject = PublishSubject.create<StateMachineTransactionMapping>()
|
private val stateMachineTransactionMappingSubject = PublishSubject.create<StateMachineTransactionMapping>()
|
||||||
private val progressTrackingSubject = PublishSubject.create<ProgressTrackingEvent>()
|
private val progressTrackingSubject = PublishSubject.create<ProgressTrackingEvent>()
|
||||||
private val networkMapSubject = PublishSubject.create<MapChange>()
|
private val networkMapSubject = PublishSubject.create<MapChange>()
|
||||||
|
private var rpcConnection: CordaRPCConnection? = null
|
||||||
|
|
||||||
val stateMachineUpdates: Observable<StateMachineUpdate> = stateMachineUpdatesSubject
|
val stateMachineUpdates: Observable<StateMachineUpdate> = stateMachineUpdatesSubject
|
||||||
val vaultUpdates: Observable<Vault.Update<ContractState>> = vaultUpdatesSubject
|
val vaultUpdates: Observable<Vault.Update<ContractState>> = vaultUpdatesSubject
|
||||||
@ -84,6 +85,17 @@ class NodeMonitorModel {
|
|||||||
*/
|
*/
|
||||||
class CordaRPCOpsWrapper(val cordaRPCOps: CordaRPCOps)
|
class CordaRPCOpsWrapper(val cordaRPCOps: CordaRPCOps)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disconnects from the Corda node for a clean client shutdown.
|
||||||
|
*/
|
||||||
|
override fun close() {
|
||||||
|
try {
|
||||||
|
rpcConnection?.notifyServerAndClose()
|
||||||
|
} catch (e: Exception) {
|
||||||
|
logger.error("Error closing RPC connection to node", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register for updates to/from a given vault.
|
* Register for updates to/from a given vault.
|
||||||
* TODO provide an unsubscribe mechanism
|
* TODO provide an unsubscribe mechanism
|
||||||
@ -145,9 +157,10 @@ class NodeMonitorModel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): List<StateMachineInfo> {
|
private fun performRpcReconnect(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): List<StateMachineInfo> {
|
||||||
|
val proxy = establishConnectionWithRetry(nodeHostAndPort, username, password, shouldRetry).let { connection ->
|
||||||
val connection = establishConnectionWithRetry(nodeHostAndPort, username, password, shouldRetry)
|
rpcConnection = connection
|
||||||
val proxy = connection.proxy
|
connection.proxy
|
||||||
|
}
|
||||||
|
|
||||||
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
val (stateMachineInfos, stateMachineUpdatesRaw) = proxy.stateMachinesFeed()
|
||||||
|
|
||||||
@ -162,7 +175,7 @@ class NodeMonitorModel {
|
|||||||
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
// It is good idea to close connection to properly mark the end of it. During re-connect we will create a new
|
||||||
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
// client and a new connection, so no going back to this one. Also the server might be down, so we are
|
||||||
// force closing the connection to avoid propagation of notification to the server side.
|
// force closing the connection to avoid propagation of notification to the server side.
|
||||||
connection.forceClose()
|
rpcConnection?.forceClose()
|
||||||
// Perform re-connect.
|
// Perform re-connect.
|
||||||
performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = true)
|
performRpcReconnect(nodeHostAndPort, username, password, shouldRetry = true)
|
||||||
})
|
})
|
||||||
@ -175,18 +188,17 @@ class NodeMonitorModel {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): CordaRPCConnection {
|
private fun establishConnectionWithRetry(nodeHostAndPort: NetworkHostAndPort, username: String, password: String, shouldRetry: Boolean): CordaRPCConnection {
|
||||||
|
|
||||||
val retryInterval = 5.seconds
|
val retryInterval = 5.seconds
|
||||||
|
|
||||||
|
val client = CordaRPCClient(
|
||||||
|
nodeHostAndPort,
|
||||||
|
CordaRPCClientConfiguration.DEFAULT.copy(
|
||||||
|
connectionMaxRetryInterval = retryInterval
|
||||||
|
)
|
||||||
|
)
|
||||||
do {
|
do {
|
||||||
val connection = try {
|
val connection = try {
|
||||||
logger.info("Connecting to: $nodeHostAndPort")
|
logger.info("Connecting to: $nodeHostAndPort")
|
||||||
val client = CordaRPCClient(
|
|
||||||
nodeHostAndPort,
|
|
||||||
CordaRPCClientConfiguration.DEFAULT.copy(
|
|
||||||
connectionMaxRetryInterval = retryInterval
|
|
||||||
)
|
|
||||||
)
|
|
||||||
val _connection = client.start(username, password)
|
val _connection = client.start(username, password)
|
||||||
// Check connection is truly operational before returning it.
|
// Check connection is truly operational before returning it.
|
||||||
val nodeInfo = _connection.proxy.nodeInfo()
|
val nodeInfo = _connection.proxy.nodeInfo()
|
||||||
@ -195,7 +207,7 @@ class NodeMonitorModel {
|
|||||||
} catch (throwable: Throwable) {
|
} catch (throwable: Throwable) {
|
||||||
if (shouldRetry) {
|
if (shouldRetry) {
|
||||||
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
// Deliberately not logging full stack trace as it will be full of internal stacktraces.
|
||||||
logger.info("Exception upon establishing connection: " + throwable.message)
|
logger.info("Exception upon establishing connection: {}", throwable.message)
|
||||||
null
|
null
|
||||||
} else {
|
} else {
|
||||||
throw throwable
|
throw throwable
|
||||||
|
@ -10,6 +10,7 @@ import javafx.stage.Stage
|
|||||||
import jfxtras.resources.JFXtrasFontRoboto
|
import jfxtras.resources.JFXtrasFontRoboto
|
||||||
import joptsimple.OptionParser
|
import joptsimple.OptionParser
|
||||||
import net.corda.client.jfx.model.Models
|
import net.corda.client.jfx.model.Models
|
||||||
|
import net.corda.client.jfx.model.NodeMonitorModel
|
||||||
import net.corda.client.jfx.model.observableValue
|
import net.corda.client.jfx.model.observableValue
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.explorer.model.CordaViewModel
|
import net.corda.explorer.model.CordaViewModel
|
||||||
@ -21,6 +22,7 @@ import org.controlsfx.dialog.ExceptionDialog
|
|||||||
import tornadofx.App
|
import tornadofx.App
|
||||||
import tornadofx.addStageIcon
|
import tornadofx.addStageIcon
|
||||||
import tornadofx.find
|
import tornadofx.find
|
||||||
|
import kotlin.system.exitProcess
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Main class for Explorer, you will need Tornado FX to run the explorer.
|
* Main class for Explorer, you will need Tornado FX to run the explorer.
|
||||||
@ -34,6 +36,8 @@ class Main : App(MainView::class) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun start(stage: Stage) {
|
override fun start(stage: Stage) {
|
||||||
|
var nodeModel: NodeMonitorModel? = null
|
||||||
|
|
||||||
// Login to Corda node
|
// Login to Corda node
|
||||||
super.start(stage)
|
super.start(stage)
|
||||||
stage.minHeight = 600.0
|
stage.minHeight = 600.0
|
||||||
@ -43,27 +47,29 @@ class Main : App(MainView::class) {
|
|||||||
val button = Alert(Alert.AlertType.CONFIRMATION, "Are you sure you want to exit Corda explorer?").apply {
|
val button = Alert(Alert.AlertType.CONFIRMATION, "Are you sure you want to exit Corda explorer?").apply {
|
||||||
initOwner(stage.scene.window)
|
initOwner(stage.scene.window)
|
||||||
}.showAndWait().get()
|
}.showAndWait().get()
|
||||||
if (button != ButtonType.OK) it.consume()
|
if (button == ButtonType.OK) {
|
||||||
|
nodeModel?.close()
|
||||||
|
} else {
|
||||||
|
it.consume()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val hostname = parameters.named["host"]
|
val hostname = parameters.named["host"]
|
||||||
val port = asInteger(parameters.named["port"])
|
val port = asInteger(parameters.named["port"])
|
||||||
val username = parameters.named["username"]
|
val username = parameters.named["username"]
|
||||||
val password = parameters.named["password"]
|
val password = parameters.named["password"]
|
||||||
var isLoggedIn = false
|
|
||||||
|
|
||||||
if ((hostname != null) && (port != null) && (username != null) && (password != null)) {
|
if ((hostname != null) && (port != null) && (username != null) && (password != null)) {
|
||||||
try {
|
try {
|
||||||
loginView.login(hostname, port, username, password)
|
nodeModel = loginView.login(hostname, port, username, password)
|
||||||
isLoggedIn = true
|
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
ExceptionDialog(e).apply { initOwner(stage.scene.window) }.showAndWait()
|
ExceptionDialog(e).apply { initOwner(stage.scene.window) }.showAndWait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isLoggedIn) {
|
if (nodeModel == null) {
|
||||||
stage.hide()
|
stage.hide()
|
||||||
loginView.login()
|
nodeModel = loginView.login()
|
||||||
stage.show()
|
stage.show()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -84,7 +90,7 @@ class Main : App(MainView::class) {
|
|||||||
runInFxApplicationThread {
|
runInFxApplicationThread {
|
||||||
// [showAndWait] need to be in the FX thread.
|
// [showAndWait] need to be in the FX thread.
|
||||||
ExceptionDialog(throwable).showAndWait()
|
ExceptionDialog(throwable).showAndWait()
|
||||||
System.exit(1)
|
exitProcess(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Do this first before creating the notification bar, so it can autosize itself properly.
|
// Do this first before creating the notification bar, so it can autosize itself properly.
|
||||||
|
@ -27,11 +27,14 @@ class LoginView : View(WINDOW_TITLE) {
|
|||||||
private val port by objectProperty(SettingsModel::portProperty)
|
private val port by objectProperty(SettingsModel::portProperty)
|
||||||
private val fullscreen by objectProperty(SettingsModel::fullscreenProperty)
|
private val fullscreen by objectProperty(SettingsModel::fullscreenProperty)
|
||||||
|
|
||||||
fun login(host: String, port: Int, username: String, password: String) {
|
fun login(host: String, port: Int, username: String, password: String): NodeMonitorModel {
|
||||||
getModel<NodeMonitorModel>().register(NetworkHostAndPort(host, port), username, password)
|
return getModel<NodeMonitorModel>().apply {
|
||||||
|
register(NetworkHostAndPort(host, port), username, password)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun login() {
|
tailrec fun login(): NodeMonitorModel? {
|
||||||
|
var nodeModel: NodeMonitorModel? = null
|
||||||
val status = Dialog<LoginStatus>().apply {
|
val status = Dialog<LoginStatus>().apply {
|
||||||
dialogPane = root
|
dialogPane = root
|
||||||
setResultConverter {
|
setResultConverter {
|
||||||
@ -39,7 +42,7 @@ class LoginView : View(WINDOW_TITLE) {
|
|||||||
ButtonBar.ButtonData.OK_DONE -> try {
|
ButtonBar.ButtonData.OK_DONE -> try {
|
||||||
root.isDisable = true
|
root.isDisable = true
|
||||||
// TODO : Run this async to avoid UI lockup.
|
// TODO : Run this async to avoid UI lockup.
|
||||||
login(hostTextField.text, portProperty.value, usernameTextField.text, passwordTextField.text)
|
nodeModel = login(hostTextField.text, portProperty.value, usernameTextField.text, passwordTextField.text)
|
||||||
if (!rememberMe.value) {
|
if (!rememberMe.value) {
|
||||||
username.value = ""
|
username.value = ""
|
||||||
host.value = ""
|
host.value = ""
|
||||||
@ -64,12 +67,13 @@ class LoginView : View(WINDOW_TITLE) {
|
|||||||
initOwner(root.scene.window)
|
initOwner(root.scene.window)
|
||||||
}.showAndWait().get()
|
}.showAndWait().get()
|
||||||
if (button == ButtonType.OK) {
|
if (button == ButtonType.OK) {
|
||||||
|
nodeModel?.close()
|
||||||
exitProcess(0)
|
exitProcess(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}.showAndWait().get()
|
}.showAndWait().get()
|
||||||
if (status != LoginStatus.loggedIn) login()
|
return if (status == LoginStatus.loggedIn) nodeModel else login()
|
||||||
}
|
}
|
||||||
|
|
||||||
init {
|
init {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user