Enforce node death on failure to register with network map (#905)

* Give up polling when result future cancelled
commit e5395fe1b7 (parent 83fdf678ab)
Author: Andrzej Cichocki (committed by GitHub)
Date: 2017-06-28 12:07:53 +01:00
9 changed files with 208 additions and 56 deletions


@@ -7,7 +7,7 @@ import com.typesafe.config.ConfigValueFactory;
 import java.util.List;
 import java.util.Map;
 
-public class CordformNode {
+public class CordformNode implements NodeDefinition {
     protected static final String DEFAULT_HOST = "localhost";
 
     /**


@@ -0,0 +1,9 @@
+package net.corda.cordform;
+
+import com.typesafe.config.Config;
+
+public interface NodeDefinition {
+    String getName();
+
+    Config getConfig();
+}


@@ -362,7 +362,7 @@ data class ErrorOr<out A> private constructor(val value: A?, val error: Throwable?) {
     companion object {
         /** Runs the given lambda and wraps the result. */
-        inline fun <T : Any> catch(body: () -> T): ErrorOr<T> {
+        inline fun <T> catch(body: () -> T): ErrorOr<T> {
             return try {
                 ErrorOr(body())
             } catch (t: Throwable) {


@@ -0,0 +1,37 @@
+package net.corda.core.concurrent
+
+import com.google.common.annotations.VisibleForTesting
+import com.google.common.util.concurrent.ListenableFuture
+import com.google.common.util.concurrent.SettableFuture
+import net.corda.core.catch
+import net.corda.core.failure
+import net.corda.core.then
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * As soon as a given future becomes done, the handler is invoked with that future as its argument.
+ * The result of the handler is copied into the result future, and the handler isn't invoked again.
+ * If a given future errors after the result future is done, the error is automatically logged.
+ */
+fun <S, T> firstOf(vararg futures: ListenableFuture<out S>, handler: (ListenableFuture<out S>) -> T) = firstOf(futures, defaultLog, handler)
+
+private val defaultLog = LoggerFactory.getLogger("net.corda.core.concurrent")
+@VisibleForTesting
+internal val shortCircuitedTaskFailedMessage = "Short-circuited task failed:"
+
+internal fun <S, T> firstOf(futures: Array<out ListenableFuture<out S>>, log: Logger, handler: (ListenableFuture<out S>) -> T): ListenableFuture<T> {
+    val resultFuture = SettableFuture.create<T>()
+    val winnerChosen = AtomicBoolean()
+    futures.forEach {
+        it.then {
+            if (winnerChosen.compareAndSet(false, true)) {
+                resultFuture.catch { handler(it) }
+            } else if (!it.isCancelled) {
+                it.failure { log.error(shortCircuitedTaskFailedMessage, it) }
+            }
+        }
+    }
+    return resultFuture
+}
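For orientation, here is a minimal usage sketch of the firstOf API added above. It is not part of the commit, and the future names are illustrative; getOrThrow is the existing net.corda.core extension used throughout this diff.

    import com.google.common.util.concurrent.ListenableFuture
    import com.google.common.util.concurrent.SettableFuture
    import net.corda.core.concurrent.firstOf
    import net.corda.core.getOrThrow

    fun firstOfDemo() {
        val fast = SettableFuture.create<Int>()
        val slow = SettableFuture.create<Int>()
        // The handler runs exactly once, with whichever future completes first:
        val winner: ListenableFuture<Int> = firstOf(fast, slow) { it.getOrThrow() }
        fast.set(42)
        println(winner.getOrThrow()) // 42; if slow later fails, the error is merely logged.
    }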


@@ -0,0 +1,78 @@
+package net.corda.core.concurrent
+
+import com.google.common.util.concurrent.SettableFuture
+import com.nhaarman.mockito_kotlin.*
+import net.corda.core.getOrThrow
+import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.junit.Test
+import org.slf4j.Logger
+import java.io.EOFException
+import java.util.concurrent.CancellationException
+import kotlin.test.assertEquals
+import kotlin.test.assertTrue
+
+class ConcurrencyUtilsTest {
+    private val f1 = SettableFuture.create<Int>()
+    private val f2 = SettableFuture.create<Double>()
+    private var invocations = 0
+    private val log: Logger = mock<Logger>()
+
+    @Test
+    fun `firstOf short circuit`() {
+        // Order not significant in this case:
+        val g = firstOf(arrayOf(f2, f1), log) {
+            ++invocations
+            it.getOrThrow()
+        }
+        f1.set(100)
+        assertEquals(100, g.getOrThrow())
+        assertEquals(1, invocations)
+        verifyNoMoreInteractions(log)
+        val throwable = EOFException("log me")
+        f2.setException(throwable)
+        assertEquals(1, invocations) // Least astonishing to skip handler side-effects.
+        verify(log).error(eq(shortCircuitedTaskFailedMessage), same(throwable))
+    }
+
+    @Test
+    fun `firstOf re-entrant handler attempt due to cancel`() {
+        val futures = arrayOf(f1, f2)
+        val g = firstOf(futures, log) {
+            ++invocations
+            futures.forEach { it.cancel(false) } // One handler invocation queued here.
+            it.getOrThrow()
+        }
+        f1.set(100)
+        assertEquals(100, g.getOrThrow())
+        assertEquals(1, invocations) // Handler didn't run as g was already done.
+        verifyNoMoreInteractions(log) // CancellationException is not logged (if due to cancel).
+        assertTrue(f2.isCancelled)
+    }
+
+    @Test
+    fun `firstOf re-entrant handler attempt not due to cancel`() {
+        val futures = arrayOf(f1, f2)
+        val fakeCancel = CancellationException()
+        val g = firstOf(futures, log) {
+            ++invocations
+            futures.forEach { it.setException(fakeCancel) } // One handler attempt here.
+            it.getOrThrow()
+        }
+        f1.set(100)
+        assertEquals(100, g.getOrThrow())
+        assertEquals(1, invocations) // Handler didn't run as g was already done.
+        verify(log).error(eq(shortCircuitedTaskFailedMessage), same(fakeCancel))
+        assertThatThrownBy { f2.getOrThrow() }.isSameAs(fakeCancel)
+    }
+
+    @Test
+    fun `firstOf cancel is not special`() {
+        val g = firstOf(arrayOf(f2, f1), log) {
+            ++invocations
+            it.getOrThrow() // This can always do something fancy if 'it' was cancelled.
+        }
+        f1.cancel(false)
+        assertThatThrownBy { g.getOrThrow() }.isInstanceOf(CancellationException::class.java)
+        assertEquals(1, invocations)
+        verifyNoMoreInteractions(log)
+    }
+}


@@ -6,11 +6,15 @@ import net.corda.core.flows.FlowLogic
 import net.corda.core.flows.StartableByRPC
 import net.corda.core.getOrThrow
 import net.corda.core.messaging.startFlow
+import net.corda.core.node.services.ServiceInfo
+import net.corda.core.node.services.ServiceType
 import net.corda.core.utilities.ALICE
 import net.corda.testing.driver.driver
 import net.corda.node.internal.NodeStartup
 import net.corda.node.services.startFlowPermission
 import net.corda.nodeapi.User
+import net.corda.testing.driver.ListenProcessDeathException
+import net.corda.testing.driver.NetworkMapStartStrategy
 import net.corda.testing.ProjectStructure.projectRootDir
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.Assertions.assertThatThrownBy
@@ -18,6 +22,7 @@ import org.junit.Test
 import java.io.*
 import java.nio.file.Files
 import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
 
 class BootTests {
@@ -48,6 +53,15 @@ class BootTests {
             assertEquals(1, numberOfNodesThatLogged)
         }
     }
 
+    @Test
+    fun `node quits on failure to register with network map`() {
+        val tooManyAdvertisedServices = (1..100).map { ServiceInfo(ServiceType.regulator.getSubType("$it")) }.toSet()
+        driver(networkMapStartStrategy = NetworkMapStartStrategy.Nominated(ALICE.name)) {
+            val future = startNode(ALICE.name, advertisedServices = tooManyAdvertisedServices)
+            assertFailsWith(ListenProcessDeathException::class) { future.getOrThrow() }
+        }
+    }
 }
 
 @StartableByRPC


@@ -118,9 +118,6 @@ open class NodeStartup(val args: Array<String>) {
                     logger.error("Shell failed to start", e)
                 }
             }
-        } failure {
-            logger.error("Error during network map registration", it)
-            exitProcess(1)
         }
         node.run()
     }


@@ -2,7 +2,7 @@ package net.corda.node.services.messaging
 
 import com.google.common.net.HostAndPort
 import com.google.common.util.concurrent.ListenableFuture
-import net.corda.core.ThreadBox
+import net.corda.core.*
 import net.corda.core.messaging.CordaRPCOps
 import net.corda.core.messaging.MessageRecipients
 import net.corda.core.messaging.RPCOps
@@ -10,9 +10,7 @@ import net.corda.core.messaging.SingleMessageRecipient
 import net.corda.core.node.VersionInfo
 import net.corda.core.node.services.PartyInfo
 import net.corda.core.node.services.TransactionVerifierService
-import net.corda.core.random63BitValue
 import net.corda.core.serialization.opaque
-import net.corda.core.success
 import net.corda.core.transactions.LedgerTransaction
 import net.corda.core.utilities.loggerFor
 import net.corda.core.utilities.trace
@@ -236,7 +234,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
         }
     }
 
-    private var shutdownLatch = CountDownLatch(1)
+    private val shutdownLatch = CountDownLatch(1)
 
     private fun processMessage(consumer: ClientConsumer): Boolean {
         // Two possibilities here:
@@ -286,6 +284,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
         while (!networkMapRegistrationFuture.isDone && processMessage(consumer)) {
         }
+        with(networkMapRegistrationFuture) {
+            if (isDone) getOrThrow() else andForget(log) // Trigger node shutdown here to avoid deadlock in shutdown hooks.
+        }
     }
 
     private fun runPostNetworkMap() {
@@ -306,12 +307,15 @@ class NodeMessagingClient(override val config: NodeConfiguration,
      * consume all messages via a new consumer without a filter applied.
      */
     fun run(serverControl: ActiveMQServerControl) {
-        // Build the network map.
-        runPreNetworkMap(serverControl)
-        // Process everything else once we have the network map.
-        runPostNetworkMap()
-        shutdownLatch.countDown()
+        try {
+            // Build the network map.
+            runPreNetworkMap(serverControl)
+            // Process everything else once we have the network map.
+            runPostNetworkMap()
+        } finally {
+            shutdownLatch.countDown()
+        }
     }
 
     private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? {
         try {
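The try/finally added above matters because runPreNetworkMap can now throw when network map registration fails, and the latch must still be released for any thread blocked waiting on shutdown. A minimal sketch of the idiom, assuming a stop() that awaits the latch (Worker, doWork and stop are illustrative names, not code from this commit):

    import java.util.concurrent.CountDownLatch

    class Worker {
        private val shutdownLatch = CountDownLatch(1)

        fun run() {
            try {
                doWork() // May throw, e.g. on failed registration.
            } finally {
                shutdownLatch.countDown() // Always release anyone blocked in stop().
            }
        }

        fun stop() {
            // Without the finally above, a failure in run() would leave this blocked forever.
            shutdownLatch.await()
        }

        private fun doWork() {}
    }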


@@ -9,7 +9,9 @@ import com.typesafe.config.ConfigRenderOptions
 import net.corda.client.rpc.CordaRPCClient
 import net.corda.cordform.CordformContext
 import net.corda.cordform.CordformNode
+import net.corda.cordform.NodeDefinition
 import net.corda.core.*
+import net.corda.core.concurrent.firstOf
 import net.corda.core.crypto.X509Utilities
 import net.corda.core.crypto.appendToCommonName
 import net.corda.core.crypto.commonName
@@ -31,7 +33,6 @@ import net.corda.nodeapi.ArtemisMessagingComponent
 import net.corda.nodeapi.User
 import net.corda.nodeapi.config.SSLConfiguration
 import net.corda.nodeapi.config.parseAs
-import net.corda.nodeapi.internal.ShutdownHook
 import net.corda.nodeapi.internal.addShutdownHook
 import net.corda.testing.MOCK_VERSION_INFO
 import okhttp3.OkHttpClient
@@ -275,19 +276,16 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericDriver(
         coerce: (D) -> DI,
         dsl: DI.() -> A
 ): A {
-    var shutdownHook: ShutdownHook? = null
+    val shutdownHook = addShutdownHook(driverDsl::shutdown)
     try {
         driverDsl.start()
-        shutdownHook = addShutdownHook {
-            driverDsl.shutdown()
-        }
         return dsl(coerce(driverDsl))
     } catch (exception: Throwable) {
         log.error("Driver shutting down because of exception", exception)
         throw exception
     } finally {
         driverDsl.shutdown()
-        shutdownHook?.cancel()
+        shutdownHook.cancel()
     }
 }
@@ -295,7 +293,7 @@ fun getTimestampAsDirectoryName(): String {
     return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now())
 }
 
-class ListenProcessDeathException(message: String) : Exception(message)
+class ListenProcessDeathException(hostAndPort: HostAndPort, listenProcess: Process) : Exception("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}")
 
 /**
  * @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended.
@@ -307,7 +305,7 @@ fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null) {
 fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null): ListenableFuture<Unit> {
     return poll(executorService, "address $hostAndPort to bind") {
         if (listenProcess != null && !listenProcess.isAlive) {
-            throw ListenProcessDeathException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}")
+            throw ListenProcessDeathException(hostAndPort, listenProcess)
         }
         try {
             Socket(hostAndPort.host, hostAndPort.port).close()
@@ -340,33 +338,26 @@ fun <A> poll(
         warnCount: Int = 120,
         check: () -> A?
 ): ListenableFuture<A> {
-    val initialResult = check()
     val resultFuture = SettableFuture.create<A>()
-    if (initialResult != null) {
-        resultFuture.set(initialResult)
-        return resultFuture
-    }
-    var counter = 0
-    fun schedulePoll() {
-        executorService.schedule(task@ {
-            counter++
-            if (counter == warnCount) {
+    val task = object : Runnable {
+        var counter = -1
+        override fun run() {
+            if (resultFuture.isCancelled) return // Give up, caller can no longer get the result.
+            if (++counter == warnCount) {
                 log.warn("Been polling $pollName for ${pollInterval.multipliedBy(warnCount.toLong()).seconds} seconds...")
             }
-            val result = try {
-                check()
-            } catch (t: Throwable) {
-                resultFuture.setException(t)
-                return@task
-            }
-            if (result == null) {
-                schedulePoll()
-            } else {
-                resultFuture.set(result)
-            }
-        }, pollInterval.toMillis(), MILLISECONDS)
+            ErrorOr.catch(check).match(onValue = {
+                if (it != null) {
+                    resultFuture.set(it)
+                } else {
+                    executorService.schedule(this, pollInterval.toMillis(), MILLISECONDS)
+                }
+            }, onError = {
+                resultFuture.setException(it)
+            })
+        }
     }
-    schedulePoll()
+    executorService.submit(task) // The check may be expensive, so always run it in the background even the first time.
     return resultFuture
 }
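The resultFuture.isCancelled check is what delivers the commit message's "give up polling when result future cancelled": each scheduled run bails out before checking or rescheduling. A hedged usage sketch of the poll function above (the executor, file path and check are illustrative, not from the diff):

    import java.nio.file.Files
    import java.nio.file.Paths
    import java.util.concurrent.Executors

    fun pollDemo() {
        val executor = Executors.newSingleThreadScheduledExecutor()
        // Repeatedly runs the check until it returns non-null:
        val ready = poll(executor, "marker file") {
            if (Files.exists(Paths.get("/tmp/ready"))) Unit else null
        }
        // If the caller loses interest, the next scheduled run exits without rescheduling:
        ready.cancel(false)
    }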
@@ -518,21 +509,28 @@ class DriverDSL(
         _executorService?.shutdownNow()
     }
 
-    private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture<CordaRPCOps> {
+    private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration, processDeathFuture: ListenableFuture<out Throwable>): ListenableFuture<CordaRPCOps> {
         val client = CordaRPCClient(nodeAddress, sslConfig)
-        return poll(executorService, "for RPC connection") {
+        val connectionFuture = poll(executorService, "RPC connection") {
             try {
-                val connection = client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
-                shutdownManager.registerShutdown { connection.close() }
-                return@poll connection.proxy
-            } catch(e: Exception) {
+                client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
+            } catch (e: Exception) {
+                if (processDeathFuture.isDone) throw e
                 log.error("Exception $e, Retrying RPC connection at $nodeAddress")
                 null
             }
         }
+        return firstOf(connectionFuture, processDeathFuture) {
+            if (it == processDeathFuture) {
+                throw processDeathFuture.getOrThrow()
+            }
+            val connection = connectionFuture.getOrThrow()
+            shutdownManager.registerShutdown(connection::close)
+            connection.proxy
+        }
     }
 
-    private fun networkMapServiceConfigLookup(networkMapCandidates: List<CordformNode>): (X500Name) -> Map<String, String>? {
+    private fun networkMapServiceConfigLookup(networkMapCandidates: List<NodeDefinition>): (X500Name) -> Map<String, String>? {
         return networkMapStartStrategy.run {
             when (this) {
                 is NetworkMapStartStrategy.Dedicated -> {
@@ -564,6 +562,10 @@ class DriverDSL(
         val webAddress = portAllocation.nextHostAndPort()
         // TODO: Derive name from the full picked name, don't just wrap the common name
         val name = providedName ?: X509Utilities.getX509Name("${oneOf(names).commonName}-${p2pAddress.port}","London","demo@r3.com",null)
+        val networkMapServiceConfigLookup = networkMapServiceConfigLookup(listOf(object : NodeDefinition {
+            override fun getName() = name.toString()
+            override fun getConfig() = configOf("p2pAddress" to p2pAddress.toString())
+        }))
         val config = ConfigHelper.loadConfig(
                 baseDirectory = baseDirectory(name),
                 allowMissingConfig = true,
@@ -573,7 +575,7 @@ class DriverDSL(
                 "rpcAddress" to rpcAddress.toString(),
                 "webAddress" to webAddress.toString(),
                 "extraAdvertisedServiceIds" to advertisedServices.map { it.toString() },
-                "networkMapService" to networkMapServiceConfigLookup(emptyList())(name),
+                "networkMapService" to networkMapServiceConfigLookup(name),
                 "useTestClock" to useTestClock,
                 "rpcUsers" to rpcUsers.map { it.toMap() },
                 "verifierType" to verifierType.name
@@ -708,7 +710,7 @@ class DriverDSL(
             } }
         )
         return nodeAndThreadFuture.flatMap { (node, thread) ->
-            establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc ->
+            establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, SettableFuture.create()).flatMap { rpc ->
                 rpc.waitUntilRegisteredWithNetworkMap().map {
                     NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread)
                 }
@@ -719,9 +721,20 @@ class DriverDSL(
         val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, callerPackage)
         registerProcess(processFuture)
         return processFuture.flatMap { process ->
+            val processDeathFuture = poll(executorService, "process death") {
+                if (process.isAlive) null else ListenProcessDeathException(nodeConfiguration.p2pAddress, process)
+            }
             // We continue to use SSL enabled port for RPC when its for node user.
-            establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc ->
-                rpc.waitUntilRegisteredWithNetworkMap().map {
+            establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, processDeathFuture).flatMap { rpc ->
+                // Call waitUntilRegisteredWithNetworkMap in background in case RPC is failing over:
+                val networkMapFuture = executorService.submit(Callable {
+                    rpc.waitUntilRegisteredWithNetworkMap()
+                }).flatMap { it }
+                firstOf(processDeathFuture, networkMapFuture) {
+                    if (it == processDeathFuture) {
+                        throw processDeathFuture.getOrThrow()
+                    }
+                    processDeathFuture.cancel(false)
                     NodeHandle.OutOfProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, debugPort, process)
                 }
             }