From e5395fe1b7c8a0918684c27e3d6ddec5a9dddbff Mon Sep 17 00:00:00 2001 From: Andrzej Cichocki Date: Wed, 28 Jun 2017 12:07:53 +0100 Subject: [PATCH] Enforce node death on failure to register with network map (#905) * Give up polling when result future cancelled --- .../java/net/corda/cordform/CordformNode.java | 2 +- .../net/corda/cordform/NodeDefinition.java | 9 ++ core/src/main/kotlin/net/corda/core/Utils.kt | 2 +- .../corda/core/concurrent/ConcurrencyUtils.kt | 37 +++++++ .../core/concurrent/ConcurrencyUtilsTest.kt | 78 +++++++++++++++ .../kotlin/net/corda/node/BootTests.kt | 14 +++ .../net/corda/node/internal/NodeStartup.kt | 3 - .../services/messaging/NodeMessagingClient.kt | 22 +++-- .../kotlin/net/corda/testing/driver/Driver.kt | 97 +++++++++++-------- 9 files changed, 208 insertions(+), 56 deletions(-) create mode 100644 cordform-common/src/main/java/net/corda/cordform/NodeDefinition.java create mode 100644 core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt create mode 100644 core/src/test/kotlin/net/corda/core/concurrent/ConcurrencyUtilsTest.kt diff --git a/cordform-common/src/main/java/net/corda/cordform/CordformNode.java b/cordform-common/src/main/java/net/corda/cordform/CordformNode.java index 66e0ba9ca0..80a9a3795a 100644 --- a/cordform-common/src/main/java/net/corda/cordform/CordformNode.java +++ b/cordform-common/src/main/java/net/corda/cordform/CordformNode.java @@ -7,7 +7,7 @@ import com.typesafe.config.ConfigValueFactory; import java.util.List; import java.util.Map; -public class CordformNode { +public class CordformNode implements NodeDefinition { protected static final String DEFAULT_HOST = "localhost"; /** diff --git a/cordform-common/src/main/java/net/corda/cordform/NodeDefinition.java b/cordform-common/src/main/java/net/corda/cordform/NodeDefinition.java new file mode 100644 index 0000000000..0b86b98627 --- /dev/null +++ b/cordform-common/src/main/java/net/corda/cordform/NodeDefinition.java @@ -0,0 +1,9 @@ +package net.corda.cordform; + +import com.typesafe.config.Config; + +public interface NodeDefinition { + String getName(); + + Config getConfig(); +} diff --git a/core/src/main/kotlin/net/corda/core/Utils.kt b/core/src/main/kotlin/net/corda/core/Utils.kt index 2c26c227c2..021e6e194d 100644 --- a/core/src/main/kotlin/net/corda/core/Utils.kt +++ b/core/src/main/kotlin/net/corda/core/Utils.kt @@ -362,7 +362,7 @@ data class ErrorOr private constructor(val value: A?, val error: Throwabl companion object { /** Runs the given lambda and wraps the result. */ - inline fun catch(body: () -> T): ErrorOr { + inline fun catch(body: () -> T): ErrorOr { return try { ErrorOr(body()) } catch (t: Throwable) { diff --git a/core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt b/core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt new file mode 100644 index 0000000000..11750fe3e8 --- /dev/null +++ b/core/src/main/kotlin/net/corda/core/concurrent/ConcurrencyUtils.kt @@ -0,0 +1,37 @@ +package net.corda.core.concurrent + +import com.google.common.annotations.VisibleForTesting +import com.google.common.util.concurrent.ListenableFuture +import com.google.common.util.concurrent.SettableFuture +import net.corda.core.catch +import net.corda.core.failure +import net.corda.core.then +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import java.util.concurrent.atomic.AtomicBoolean + +/** + * As soon as a given future becomes done, the handler is invoked with that future as its argument. + * The result of the handler is copied into the result future, and the handler isn't invoked again. + * If a given future errors after the result future is done, the error is automatically logged. + */ +fun firstOf(vararg futures: ListenableFuture, handler: (ListenableFuture) -> T) = firstOf(futures, defaultLog, handler) + +private val defaultLog = LoggerFactory.getLogger("net.corda.core.concurrent") +@VisibleForTesting +internal val shortCircuitedTaskFailedMessage = "Short-circuited task failed:" + +internal fun firstOf(futures: Array>, log: Logger, handler: (ListenableFuture) -> T): ListenableFuture { + val resultFuture = SettableFuture.create() + val winnerChosen = AtomicBoolean() + futures.forEach { + it.then { + if (winnerChosen.compareAndSet(false, true)) { + resultFuture.catch { handler(it) } + } else if (!it.isCancelled) { + it.failure { log.error(shortCircuitedTaskFailedMessage, it) } + } + } + } + return resultFuture +} diff --git a/core/src/test/kotlin/net/corda/core/concurrent/ConcurrencyUtilsTest.kt b/core/src/test/kotlin/net/corda/core/concurrent/ConcurrencyUtilsTest.kt new file mode 100644 index 0000000000..722d67184e --- /dev/null +++ b/core/src/test/kotlin/net/corda/core/concurrent/ConcurrencyUtilsTest.kt @@ -0,0 +1,78 @@ +package net.corda.core.concurrent + +import com.google.common.util.concurrent.SettableFuture +import com.nhaarman.mockito_kotlin.* +import net.corda.core.getOrThrow +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.Test +import org.slf4j.Logger +import java.io.EOFException +import java.util.concurrent.CancellationException +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class ConcurrencyUtilsTest { + private val f1 = SettableFuture.create() + private val f2 = SettableFuture.create() + private var invocations = 0 + private val log: Logger = mock() + @Test + fun `firstOf short circuit`() { + // Order not significant in this case: + val g = firstOf(arrayOf(f2, f1), log) { + ++invocations + it.getOrThrow() + } + f1.set(100) + assertEquals(100, g.getOrThrow()) + assertEquals(1, invocations) + verifyNoMoreInteractions(log) + val throwable = EOFException("log me") + f2.setException(throwable) + assertEquals(1, invocations) // Least astonishing to skip handler side-effects. + verify(log).error(eq(shortCircuitedTaskFailedMessage), same(throwable)) + } + + @Test + fun `firstOf re-entrant handler attempt due to cancel`() { + val futures = arrayOf(f1, f2) + val g = firstOf(futures, log) { + ++invocations + futures.forEach { it.cancel(false) } // One handler invocation queued here. + it.getOrThrow() + } + f1.set(100) + assertEquals(100, g.getOrThrow()) + assertEquals(1, invocations) // Handler didn't run as g was already done. + verifyNoMoreInteractions(log) // CancellationException is not logged (if due to cancel). + assertTrue(f2.isCancelled) + } + + @Test + fun `firstOf re-entrant handler attempt not due to cancel`() { + val futures = arrayOf(f1, f2) + val fakeCancel = CancellationException() + val g = firstOf(futures, log) { + ++invocations + futures.forEach { it.setException(fakeCancel) } // One handler attempt here. + it.getOrThrow() + } + f1.set(100) + assertEquals(100, g.getOrThrow()) + assertEquals(1, invocations) // Handler didn't run as g was already done. + verify(log).error(eq(shortCircuitedTaskFailedMessage), same(fakeCancel)) + assertThatThrownBy { f2.getOrThrow() }.isSameAs(fakeCancel) + } + + @Test + fun `firstOf cancel is not special`() { + val g = firstOf(arrayOf(f2, f1), log) { + ++invocations + it.getOrThrow() // This can always do something fancy if 'it' was cancelled. + } + f1.cancel(false) + assertThatThrownBy { g.getOrThrow() }.isInstanceOf(CancellationException::class.java) + assertEquals(1, invocations) + verifyNoMoreInteractions(log) + } +} diff --git a/node/src/integration-test/kotlin/net/corda/node/BootTests.kt b/node/src/integration-test/kotlin/net/corda/node/BootTests.kt index f356852b09..d2f8ebff8e 100644 --- a/node/src/integration-test/kotlin/net/corda/node/BootTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/BootTests.kt @@ -6,11 +6,15 @@ import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC import net.corda.core.getOrThrow import net.corda.core.messaging.startFlow +import net.corda.core.node.services.ServiceInfo +import net.corda.core.node.services.ServiceType import net.corda.core.utilities.ALICE import net.corda.testing.driver.driver import net.corda.node.internal.NodeStartup import net.corda.node.services.startFlowPermission import net.corda.nodeapi.User +import net.corda.testing.driver.ListenProcessDeathException +import net.corda.testing.driver.NetworkMapStartStrategy import net.corda.testing.ProjectStructure.projectRootDir import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy @@ -18,6 +22,7 @@ import org.junit.Test import java.io.* import java.nio.file.Files import kotlin.test.assertEquals +import kotlin.test.assertFailsWith class BootTests { @@ -48,6 +53,15 @@ class BootTests { assertEquals(1, numberOfNodesThatLogged) } } + + @Test + fun `node quits on failure to register with network map`() { + val tooManyAdvertisedServices = (1..100).map { ServiceInfo(ServiceType.regulator.getSubType("$it")) }.toSet() + driver(networkMapStartStrategy = NetworkMapStartStrategy.Nominated(ALICE.name)) { + val future = startNode(ALICE.name, advertisedServices = tooManyAdvertisedServices) + assertFailsWith(ListenProcessDeathException::class) { future.getOrThrow() } + } + } } @StartableByRPC diff --git a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt index 095318a92f..937e61d18e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt +++ b/node/src/main/kotlin/net/corda/node/internal/NodeStartup.kt @@ -118,9 +118,6 @@ open class NodeStartup(val args: Array) { logger.error("Shell failed to start", e) } } - } failure { - logger.error("Error during network map registration", it) - exitProcess(1) } node.run() } diff --git a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt index 9bc17e894c..bb46e06be6 100644 --- a/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/net/corda/node/services/messaging/NodeMessagingClient.kt @@ -2,7 +2,7 @@ package net.corda.node.services.messaging import com.google.common.net.HostAndPort import com.google.common.util.concurrent.ListenableFuture -import net.corda.core.ThreadBox +import net.corda.core.* import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.MessageRecipients import net.corda.core.messaging.RPCOps @@ -10,9 +10,7 @@ import net.corda.core.messaging.SingleMessageRecipient import net.corda.core.node.VersionInfo import net.corda.core.node.services.PartyInfo import net.corda.core.node.services.TransactionVerifierService -import net.corda.core.random63BitValue import net.corda.core.serialization.opaque -import net.corda.core.success import net.corda.core.transactions.LedgerTransaction import net.corda.core.utilities.loggerFor import net.corda.core.utilities.trace @@ -236,7 +234,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } - private var shutdownLatch = CountDownLatch(1) + private val shutdownLatch = CountDownLatch(1) private fun processMessage(consumer: ClientConsumer): Boolean { // Two possibilities here: @@ -286,6 +284,9 @@ class NodeMessagingClient(override val config: NodeConfiguration, while (!networkMapRegistrationFuture.isDone && processMessage(consumer)) { } + with(networkMapRegistrationFuture) { + if (isDone) getOrThrow() else andForget(log) // Trigger node shutdown here to avoid deadlock in shutdown hooks. + } } private fun runPostNetworkMap() { @@ -306,11 +307,14 @@ class NodeMessagingClient(override val config: NodeConfiguration, * consume all messages via a new consumer without a filter applied. */ fun run(serverControl: ActiveMQServerControl) { - // Build the network map. - runPreNetworkMap(serverControl) - // Process everything else once we have the network map. - runPostNetworkMap() - shutdownLatch.countDown() + try { + // Build the network map. + runPreNetworkMap(serverControl) + // Process everything else once we have the network map. + runPostNetworkMap() + } finally { + shutdownLatch.countDown() + } } private fun artemisToCordaMessage(message: ClientMessage): ReceivedMessage? { diff --git a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt index 4667a52bef..e90b25a655 100644 --- a/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt +++ b/test-utils/src/main/kotlin/net/corda/testing/driver/Driver.kt @@ -9,7 +9,9 @@ import com.typesafe.config.ConfigRenderOptions import net.corda.client.rpc.CordaRPCClient import net.corda.cordform.CordformContext import net.corda.cordform.CordformNode +import net.corda.cordform.NodeDefinition import net.corda.core.* +import net.corda.core.concurrent.firstOf import net.corda.core.crypto.X509Utilities import net.corda.core.crypto.appendToCommonName import net.corda.core.crypto.commonName @@ -31,7 +33,6 @@ import net.corda.nodeapi.ArtemisMessagingComponent import net.corda.nodeapi.User import net.corda.nodeapi.config.SSLConfiguration import net.corda.nodeapi.config.parseAs -import net.corda.nodeapi.internal.ShutdownHook import net.corda.nodeapi.internal.addShutdownHook import net.corda.testing.MOCK_VERSION_INFO import okhttp3.OkHttpClient @@ -275,19 +276,16 @@ fun genericD coerce: (D) -> DI, dsl: DI.() -> A ): A { - var shutdownHook: ShutdownHook? = null + val shutdownHook = addShutdownHook(driverDsl::shutdown) try { driverDsl.start() - shutdownHook = addShutdownHook { - driverDsl.shutdown() - } return dsl(coerce(driverDsl)) } catch (exception: Throwable) { log.error("Driver shutting down because of exception", exception) throw exception } finally { driverDsl.shutdown() - shutdownHook?.cancel() + shutdownHook.cancel() } } @@ -295,7 +293,7 @@ fun getTimestampAsDirectoryName(): String { return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now()) } -class ListenProcessDeathException(message: String) : Exception(message) +class ListenProcessDeathException(hostAndPort: HostAndPort, listenProcess: Process) : Exception("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}") /** * @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended. @@ -307,7 +305,7 @@ fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: H fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process? = null): ListenableFuture { return poll(executorService, "address $hostAndPort to bind") { if (listenProcess != null && !listenProcess.isAlive) { - throw ListenProcessDeathException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}") + throw ListenProcessDeathException(hostAndPort, listenProcess) } try { Socket(hostAndPort.host, hostAndPort.port).close() @@ -340,33 +338,26 @@ fun poll( warnCount: Int = 120, check: () -> A? ): ListenableFuture { - val initialResult = check() val resultFuture = SettableFuture.create() - if (initialResult != null) { - resultFuture.set(initialResult) - return resultFuture - } - var counter = 0 - fun schedulePoll() { - executorService.schedule(task@ { - counter++ - if (counter == warnCount) { + val task = object : Runnable { + var counter = -1 + override fun run() { + if (resultFuture.isCancelled) return // Give up, caller can no longer get the result. + if (++counter == warnCount) { log.warn("Been polling $pollName for ${pollInterval.multipliedBy(warnCount.toLong()).seconds} seconds...") } - val result = try { - check() - } catch (t: Throwable) { - resultFuture.setException(t) - return@task - } - if (result == null) { - schedulePoll() - } else { - resultFuture.set(result) - } - }, pollInterval.toMillis(), MILLISECONDS) + ErrorOr.catch(check).match(onValue = { + if (it != null) { + resultFuture.set(it) + } else { + executorService.schedule(this, pollInterval.toMillis(), MILLISECONDS) + } + }, onError = { + resultFuture.setException(it) + }) + } } - schedulePoll() + executorService.submit(task) // The check may be expensive, so always run it in the background even the first time. return resultFuture } @@ -518,21 +509,28 @@ class DriverDSL( _executorService?.shutdownNow() } - private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration): ListenableFuture { + private fun establishRpc(nodeAddress: HostAndPort, sslConfig: SSLConfiguration, processDeathFuture: ListenableFuture): ListenableFuture { val client = CordaRPCClient(nodeAddress, sslConfig) - return poll(executorService, "for RPC connection") { + val connectionFuture = poll(executorService, "RPC connection") { try { - val connection = client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER) - shutdownManager.registerShutdown { connection.close() } - return@poll connection.proxy - } catch(e: Exception) { + client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER) + } catch (e: Exception) { + if (processDeathFuture.isDone) throw e log.error("Exception $e, Retrying RPC connection at $nodeAddress") null } } + return firstOf(connectionFuture, processDeathFuture) { + if (it == processDeathFuture) { + throw processDeathFuture.getOrThrow() + } + val connection = connectionFuture.getOrThrow() + shutdownManager.registerShutdown(connection::close) + connection.proxy + } } - private fun networkMapServiceConfigLookup(networkMapCandidates: List): (X500Name) -> Map? { + private fun networkMapServiceConfigLookup(networkMapCandidates: List): (X500Name) -> Map? { return networkMapStartStrategy.run { when (this) { is NetworkMapStartStrategy.Dedicated -> { @@ -564,6 +562,10 @@ class DriverDSL( val webAddress = portAllocation.nextHostAndPort() // TODO: Derive name from the full picked name, don't just wrap the common name val name = providedName ?: X509Utilities.getX509Name("${oneOf(names).commonName}-${p2pAddress.port}","London","demo@r3.com",null) + val networkMapServiceConfigLookup = networkMapServiceConfigLookup(listOf(object : NodeDefinition { + override fun getName() = name.toString() + override fun getConfig() = configOf("p2pAddress" to p2pAddress.toString()) + })) val config = ConfigHelper.loadConfig( baseDirectory = baseDirectory(name), allowMissingConfig = true, @@ -573,7 +575,7 @@ class DriverDSL( "rpcAddress" to rpcAddress.toString(), "webAddress" to webAddress.toString(), "extraAdvertisedServiceIds" to advertisedServices.map { it.toString() }, - "networkMapService" to networkMapServiceConfigLookup(emptyList())(name), + "networkMapService" to networkMapServiceConfigLookup(name), "useTestClock" to useTestClock, "rpcUsers" to rpcUsers.map { it.toMap() }, "verifierType" to verifierType.name @@ -708,7 +710,7 @@ class DriverDSL( } } ) return nodeAndThreadFuture.flatMap { (node, thread) -> - establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc -> + establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, SettableFuture.create()).flatMap { rpc -> rpc.waitUntilRegisteredWithNetworkMap().map { NodeHandle.InProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, node, thread) } @@ -719,9 +721,20 @@ class DriverDSL( val processFuture = startOutOfProcessNode(executorService, nodeConfiguration, config, quasarJarPath, debugPort, systemProperties, callerPackage) registerProcess(processFuture) return processFuture.flatMap { process -> + val processDeathFuture = poll(executorService, "process death") { + if (process.isAlive) null else ListenProcessDeathException(nodeConfiguration.p2pAddress, process) + } // We continue to use SSL enabled port for RPC when its for node user. - establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration).flatMap { rpc -> - rpc.waitUntilRegisteredWithNetworkMap().map { + establishRpc(nodeConfiguration.p2pAddress, nodeConfiguration, processDeathFuture).flatMap { rpc -> + // Call waitUntilRegisteredWithNetworkMap in background in case RPC is failing over: + val networkMapFuture = executorService.submit(Callable { + rpc.waitUntilRegisteredWithNetworkMap() + }).flatMap { it } + firstOf(processDeathFuture, networkMapFuture) { + if (it == processDeathFuture) { + throw processDeathFuture.getOrThrow() + } + processDeathFuture.cancel(false) NodeHandle.OutOfProcess(rpc.nodeIdentity(), rpc, nodeConfiguration, webAddress, debugPort, process) } }