mirror of
https://github.com/corda/corda.git
synced 2025-06-01 15:10:54 +00:00
Merge pull request #7173 from corda/ENT-6511-gracefulShutdown-backport-from-4.6
ENT-6511: graceful shutdown backport from 4.6
This commit is contained in:
commit
3c70454d86
@ -293,4 +293,27 @@ class CordaRPCClientReconnectionTest {
|
|||||||
.isInstanceOf(RPCException::class.java)
|
.isInstanceOf(RPCException::class.java)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=300_000)
|
||||||
|
fun `rpc client does not attempt to reconnect after shutdown`() {
|
||||||
|
driver(DriverParameters(cordappsForAllNodes = emptyList())) {
|
||||||
|
val address = NetworkHostAndPort("localhost", portAllocator.nextPort())
|
||||||
|
fun startNode(): NodeHandle {
|
||||||
|
return startNode(
|
||||||
|
providedName = CHARLIE_NAME,
|
||||||
|
rpcUsers = listOf(CordaRPCClientTest.rpcUser),
|
||||||
|
customOverrides = mapOf("rpcSettings.address" to address.toString())
|
||||||
|
).getOrThrow()
|
||||||
|
}
|
||||||
|
|
||||||
|
val node = startNode()
|
||||||
|
val client = CordaRPCClient(node.rpcAddress, config)
|
||||||
|
(client.start(rpcUser.username, rpcUser.password, gracefulReconnect = gracefulReconnect)).use {
|
||||||
|
val rpcOps = it.proxy as ReconnectingCordaRPCOps
|
||||||
|
rpcOps.shutdown()
|
||||||
|
// If we get here we know we're not stuck in a reconnect cycle with a node that's been shut down
|
||||||
|
assertThat(rpcOps.reconnectingRPCConnection.isClosed())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -292,6 +292,7 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
}
|
}
|
||||||
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
|
private class ErrorInterceptingHandler(val reconnectingRPCConnection: ReconnectingRPCConnection) : InvocationHandler {
|
||||||
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
|
private fun Method.isStartFlow() = name.startsWith("startFlow") || name.startsWith("startTrackedFlow")
|
||||||
|
private fun Method.isShutdown() = name == "shutdown" || name == "gracefulShutdown" || name == "terminate"
|
||||||
|
|
||||||
private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) {
|
private fun checkIfIsStartFlow(method: Method, e: InvocationTargetException) {
|
||||||
if (method.isStartFlow()) {
|
if (method.isStartFlow()) {
|
||||||
@ -306,12 +307,12 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
*
|
*
|
||||||
* A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed.
|
* A negative number for [maxNumberOfAttempts] means an unlimited number of retries will be performed.
|
||||||
*/
|
*/
|
||||||
@Suppress("ThrowsCount", "ComplexMethod")
|
@Suppress("ThrowsCount", "ComplexMethod", "NestedBlockDepth")
|
||||||
private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? {
|
private fun doInvoke(method: Method, args: Array<out Any>?, maxNumberOfAttempts: Int): Any? {
|
||||||
checkIfClosed()
|
checkIfClosed()
|
||||||
var remainingAttempts = maxNumberOfAttempts
|
var remainingAttempts = maxNumberOfAttempts
|
||||||
var lastException: Throwable? = null
|
var lastException: Throwable? = null
|
||||||
while (remainingAttempts != 0) {
|
while (remainingAttempts != 0 && !reconnectingRPCConnection.isClosed()) {
|
||||||
try {
|
try {
|
||||||
log.debug { "Invoking RPC $method..." }
|
log.debug { "Invoking RPC $method..." }
|
||||||
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
|
return method.invoke(reconnectingRPCConnection.proxy, *(args ?: emptyArray())).also {
|
||||||
@ -324,9 +325,14 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
reconnectingRPCConnection.reconnectOnError(e)
|
reconnectingRPCConnection.reconnectOnError(e)
|
||||||
}
|
}
|
||||||
is ConnectionFailureException -> {
|
is ConnectionFailureException -> {
|
||||||
log.warn("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
|
if (method.isShutdown()) {
|
||||||
reconnectingRPCConnection.reconnectOnError(e)
|
log.debug("Shutdown invoked, stop reconnecting.", e)
|
||||||
checkIfIsStartFlow(method, e)
|
reconnectingRPCConnection.notifyServerAndClose()
|
||||||
|
} else {
|
||||||
|
log.warn("Failed to perform operation ${method.name}. Connection dropped. Retrying....", e)
|
||||||
|
reconnectingRPCConnection.reconnectOnError(e)
|
||||||
|
checkIfIsStartFlow(method, e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
is RPCException -> {
|
is RPCException -> {
|
||||||
rethrowIfUnrecoverable(e.targetException as RPCException)
|
rethrowIfUnrecoverable(e.targetException as RPCException)
|
||||||
@ -349,6 +355,7 @@ class ReconnectingCordaRPCOps private constructor(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (reconnectingRPCConnection.isClosed()) return null
|
||||||
throw MaxRpcRetryException(maxNumberOfAttempts, method, lastException)
|
throw MaxRpcRetryException(maxNumberOfAttempts, method, lastException)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,6 +111,8 @@ object InteractiveShell {
|
|||||||
YAML
|
YAML
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun isShutdownCmd(cmd: String) = cmd == "shutdown" || cmd == "gracefulShutdown" || cmd == "terminate"
|
||||||
|
|
||||||
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
|
fun startShell(configuration: ShellConfiguration, classLoader: ClassLoader? = null, standalone: Boolean = false) {
|
||||||
makeRPCConnection = { username: String, password: String ->
|
makeRPCConnection = { username: String, password: String ->
|
||||||
val connection = if (standalone) {
|
val connection = if (standalone) {
|
||||||
@ -623,6 +625,10 @@ object InteractiveShell {
|
|||||||
throw e.rootCause
|
throw e.rootCause
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (isShutdownCmd(cmd)) {
|
||||||
|
out.println("Called 'shutdown' on the node.\nQuitting the shell now.").also { out.flush() }
|
||||||
|
onExit.invoke()
|
||||||
|
}
|
||||||
} catch (e: StringToMethodCallParser.UnparseableCallException) {
|
} catch (e: StringToMethodCallParser.UnparseableCallException) {
|
||||||
out.println(e.message, Decoration.bold, Color.red)
|
out.println(e.message, Decoration.bold, Color.red)
|
||||||
if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) {
|
if (e !is StringToMethodCallParser.UnparseableCallException.NoSuchFile) {
|
||||||
@ -634,10 +640,6 @@ object InteractiveShell {
|
|||||||
InputStreamSerializer.invokeContext = null
|
InputStreamSerializer.invokeContext = null
|
||||||
InputStreamDeserializer.closeAll()
|
InputStreamDeserializer.closeAll()
|
||||||
}
|
}
|
||||||
if (cmd == "shutdown") {
|
|
||||||
out.println("Called 'shutdown' on the node.\nQuitting the shell now.").also { out.flush() }
|
|
||||||
onExit.invoke()
|
|
||||||
}
|
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user