Provide an API to register callback on app shutdown (#2402)

Provide an API to register callback on app shutdown.
This commit is contained in:
Ben Wyeth 2018-01-24 15:19:24 +00:00 committed by Mike Hearn
parent 3c0e006456
commit d17670c747
7 changed files with 78 additions and 12 deletions

View File

@ -354,4 +354,19 @@ interface ServiceHub : ServicesForResolution {
* @return A new [Connection] * @return A new [Connection]
*/ */
fun jdbcSession(): Connection fun jdbcSession(): Connection
/**
* Allows the registration of a callback that may inform services when the app is shutting down.
*
* The intent is to allow the cleaning up of resources - e.g. releasing ports.
*
* You should not rely on this to clean up executing flows - that's what quasar is for.
*
* Please note that the shutdown handler is not guaranteed to be called. In production the node process may crash,
* be killed by the operating system and other forms of fatal termination may occur that result in this code never
* running. So you should use this functionality only for unit/integration testing or for code that can optimise
* this shutdown e.g. by cleaning up things that would otherwise trigger a slow recovery process next time the
* node starts.
*/
fun registerUnloadHandler(runOnStop: () -> Unit)
} }

View File

@ -162,6 +162,9 @@ UNRELEASED
However, assuming a clean reset of the artemis data and that the nodes are consistent versions, However, assuming a clean reset of the artemis data and that the nodes are consistent versions,
data persisted via the AMQP serializer will be forward compatible. data persisted via the AMQP serializer will be forward compatible.
* The ability for CordaServices to register callbacks so they can be notified of shutdown and clean up resource such as
open ports.
.. _changelog_v1: .. _changelog_v1:
Release 1.0 Release 1.0

View File

@ -31,13 +31,6 @@ import java.util.*
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import kotlin.streams.toList import kotlin.streams.toList
private fun checkQuasarAgent() {
if (!(ManagementFactory.getRuntimeMXBean().inputArguments.any { it.contains("quasar") })) {
throw IllegalStateException("No quasar agent")
}
}
@Ignore("Run these locally") @Ignore("Run these locally")
class NodePerformanceTests { class NodePerformanceTests {
@StartableByRPC @StartableByRPC
@ -52,11 +45,6 @@ class NodePerformanceTests {
val averageMs: Double val averageMs: Double
) )
@Before
fun before() {
checkQuasarAgent()
}
@Test @Test
fun `empty flow per second`() { fun `empty flow per second`() {
driver(startNodesInProcess = true) { driver(startNodesInProcess = true) {

View File

@ -0,0 +1,48 @@
package net.corda.node
import net.corda.core.node.ServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow
import net.corda.testing.DUMMY_BANK_A_NAME
import net.corda.testing.driver.driver
import org.junit.Assert
import org.junit.Test
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
class NodeUnloadHandlerTests {
companion object {
val latch = CountDownLatch(1)
}
@Test
fun `should be able to register run on stop lambda`() {
driver(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.node"), isDebug = true) {
startNode(providedName = DUMMY_BANK_A_NAME).getOrThrow()
// just want to fall off the end of this for the mo...
}
Assert.assertTrue("Timed out waiting for AbstractNode to invoke the test service shutdown callback",latch.await(30, TimeUnit.SECONDS))
}
@CordaService
class RunOnStopTestService(serviceHub: ServiceHub) : SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
}
init {
serviceHub.registerUnloadHandler(this::shutdown)
}
fun shutdown() {
log.info("shutting down")
latch.countDown()
}
}
}

View File

@ -804,6 +804,11 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
override fun jdbcSession(): Connection = database.createSession() override fun jdbcSession(): Connection = database.createSession()
// allows services to register handlers to be informed when the node stop method is called
override fun registerUnloadHandler(handler: () -> Unit) {
runOnStop += handler
}
} }
} }

View File

@ -57,6 +57,7 @@ open class MockServices private constructor(
private val initialIdentity: TestIdentity, private val initialIdentity: TestIdentity,
private val moreKeys: Array<out KeyPair> private val moreKeys: Array<out KeyPair>
) : ServiceHub, StateLoader by validatedTransactions { ) : ServiceHub, StateLoader by validatedTransactions {
companion object { companion object {
@JvmStatic @JvmStatic
val MOCK_VERSION_INFO = VersionInfo(1, "Mock release", "Mock revision", "Mock Vendor") val MOCK_VERSION_INFO = VersionInfo(1, "Mock release", "Mock revision", "Mock Vendor")
@ -157,6 +158,8 @@ open class MockServices private constructor(
} }
override fun jdbcSession(): Connection = throw UnsupportedOperationException() override fun jdbcSession(): Connection = throw UnsupportedOperationException()
override fun registerUnloadHandler(runOnStop: () -> Unit) = throw UnsupportedOperationException()
} }
class MockKeyManagementService(val identityService: IdentityServiceInternal, class MockKeyManagementService(val identityService: IdentityServiceInternal,

View File

@ -54,6 +54,7 @@ import okhttp3.Request
import rx.Observable import rx.Observable
import rx.observables.ConnectableObservable import rx.observables.ConnectableObservable
import rx.schedulers.Schedulers import rx.schedulers.Schedulers
import java.lang.management.ManagementFactory
import java.net.ConnectException import java.net.ConnectException
import java.net.URL import java.net.URL
import java.net.URLClassLoader import java.net.URLClassLoader
@ -737,6 +738,9 @@ class DriverDSLImpl(
): CordaFuture<Pair<StartedNode<Node>, Thread>> { ): CordaFuture<Pair<StartedNode<Node>, Thread>> {
return executorService.fork { return executorService.fork {
log.info("Starting in-process Node ${config.corda.myLegalName.organisation}") log.info("Starting in-process Node ${config.corda.myLegalName.organisation}")
if (!(ManagementFactory.getRuntimeMXBean().inputArguments.any { it.contains("quasar") })) {
throw IllegalStateException("No quasar agent: -javaagent:lib/quasar.jar and working directory project root might fix")
}
// Write node.conf // Write node.conf
writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe) writeConfig(config.corda.baseDirectory, "node.conf", config.typesafe)
// TODO pass the version in? // TODO pass the version in?