This commit is contained in:
Andras Slemmer
2017-05-08 15:23:41 +01:00
parent b198c68304
commit 5db4e4723d
6 changed files with 86 additions and 51 deletions

View File

@ -1,15 +1,21 @@
package net.corda.node package net.corda.node
import net.corda.core.div
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.CordaPluginRegistry
import net.corda.core.utilities.ALICE
import net.corda.node.driver.driver import net.corda.node.driver.driver
import net.corda.node.services.startFlowPermission import net.corda.node.services.startFlowPermission
import net.corda.nodeapi.User import net.corda.nodeapi.User
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test import org.junit.Test
import java.io.* import java.io.*
import java.nio.file.Files
import java.nio.file.Paths
import kotlin.test.assertEquals
class BootTests { class BootTests {
@ -23,6 +29,23 @@ class BootTests {
} }
} }
@Test
fun `double node start doesn't write into log file`() {
val logConfigFile = Paths.get("..", "config", "dev", "log4j2.xml").toAbsolutePath()
assertThat(logConfigFile).isRegularFile()
driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) {
val alice = startNode(ALICE.name).get()
val logFolder = alice.configuration.baseDirectory / "logs"
val logFile = logFolder.toFile().listFiles { _, name -> name.endsWith(".log") }.single()
// Start second Alice, should fail
assertThatThrownBy {
startNode(ALICE.name).getOrThrow()
}
// We count the number of nodes that wrote into the logfile by counting "Logs can be found in"
val numberOfNodesThatLogged = Files.lines(logFile.toPath()).filter { it.contains("Logs can be found in") }.count()
assertEquals(1, numberOfNodesThatLogged)
}
}
} }
class ObjectInputStreamFlow : FlowLogic<Unit>() { class ObjectInputStreamFlow : FlowLogic<Unit>() {

View File

@ -75,10 +75,8 @@ class DriverTests {
driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) { driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) {
val baseDirectory = startNode(DUMMY_BANK_A.name).getOrThrow().configuration.baseDirectory val baseDirectory = startNode(DUMMY_BANK_A.name).getOrThrow().configuration.baseDirectory
val logFile = (baseDirectory / LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } val logFile = (baseDirectory / LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() }
println("ASD $logFile")
val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } }
assertThat(debugLinesPresent).isTrue() assertThat(debugLinesPresent).isTrue()
println("hmm.")
} }
} }

View File

@ -3,13 +3,13 @@
package net.corda.node package net.corda.node
import com.jcabi.manifests.Manifests import com.jcabi.manifests.Manifests
import com.sun.org.apache.xml.internal.serializer.utils.Utils.messages
import com.typesafe.config.ConfigException import com.typesafe.config.ConfigException
import joptsimple.OptionException import joptsimple.OptionException
import net.corda.core.* import net.corda.core.*
import net.corda.core.node.VersionInfo import net.corda.core.node.VersionInfo
import net.corda.core.utilities.Emoji import net.corda.core.utilities.Emoji
import net.corda.core.utilities.LogHelper.withLevel import net.corda.core.utilities.LogHelper.withLevel
import net.corda.node.internal.EnforceSingleNodeIsRunning
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.shell.InteractiveShell import net.corda.node.shell.InteractiveShell
@ -25,7 +25,6 @@ import java.net.InetAddress
import java.nio.file.Paths import java.nio.file.Paths
import java.util.* import java.util.*
import kotlin.system.exitProcess import kotlin.system.exitProcess
import kotlin.system.measureTimeMillis
private var renderBasicInfoToConsole = true private var renderBasicInfoToConsole = true
@ -65,6 +64,11 @@ fun main(args: Array<String>) {
exitProcess(1) exitProcess(1)
} }
val enforceSingleNodeIsRunning = EnforceSingleNodeIsRunning(cmdlineOptions.baseDirectory)
// We do the single node check before we initialise logging so that in case of a double-node start it doesn't mess
// with the running node's logs.
enforceSingleNodeIsRunning.start()
initLogging(cmdlineOptions) initLogging(cmdlineOptions)
disableJavaDeserialization() // Should be after initLogging to avoid TMI. disableJavaDeserialization() // Should be after initLogging to avoid TMI.

View File

@ -331,20 +331,21 @@ class ShutdownManager(private val executorService: ExecutorService) {
fun shutdown() { fun shutdown() {
val shutdownFutures = state.locked { val shutdownFutures = state.locked {
require(!isShutdown) if (isShutdown) {
emptyList<ListenableFuture<() -> Unit>>()
} else {
isShutdown = true isShutdown = true
registeredShutdowns registeredShutdowns
} }
val shutdownsFuture = Futures.allAsList(shutdownFutures)
val shutdowns = try {
shutdownsFuture.get(1, SECONDS)
} catch (exception: TimeoutException) {
/** Could not get all of them, collect what we have */
shutdownFutures.filter { it.isDone }.map { it.get() }
} }
shutdowns.reversed().forEach { shutdown -> val shutdowns = shutdownFutures.map { ErrorOr.catch { it.get(1, SECONDS) } }
shutdowns.reversed().forEach { errorOrShutdown ->
try { try {
shutdown() if (errorOrShutdown.error == null) {
errorOrShutdown.value?.invoke()
} else {
log.error("Exception while getting shutdown method, disregarding", errorOrShutdown.error)
}
} catch (throwable: Throwable) { } catch (throwable: Throwable) {
log.error("Exception while shutting down", throwable) log.error("Exception while shutting down", throwable)
} }

View File

@ -0,0 +1,44 @@
package net.corda.node.internal
import net.corda.core.div
import net.corda.core.utilities.loggerFor
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.file.Path
/**
* This class enforces that only a single node is running using the given [baseDirectory] by using a file lock.
*/
class EnforceSingleNodeIsRunning(val baseDirectory: Path) {
private companion object {
val log = loggerFor<EnforceSingleNodeIsRunning>()
}
fun start() {
// Write out our process ID (which may or may not resemble a UNIX process id - to us it's just a string) to a
// file that we'll do our best to delete on exit. But if we don't, it'll be overwritten next time. If it already
// exists, we try to take the file lock first before replacing it and if that fails it means we're being started
// twice with the same directory: that's a user error and we should bail out.
val pidPath = baseDirectory / "process-id"
val pidFile = pidPath.toFile()
if (!pidFile.exists()) {
pidFile.createNewFile()
}
pidFile.deleteOnExit()
val pidFileRw = RandomAccessFile(pidFile, "rw")
val pidFileLock = pidFileRw.channel.tryLock()
if (pidFileLock == null) {
log.error("It appears there is already a node running with the specified data directory $baseDirectory")
log.error("Shut that other node down and try again. It may have process ID ${pidFile.readText()}")
System.exit(1)
}
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
Runtime.getRuntime().addShutdownHook(Thread {
pidFileLock.release()
})
val ourProcessID: String = ManagementFactory.getRuntimeMXBean().name.split("@")[0]
pidFileRw.setLength(0)
pidFileRw.write(ourProcessID.toByteArray())
}
}

View File

@ -5,7 +5,6 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.div
import net.corda.core.flatMap import net.corda.core.flatMap
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
@ -32,9 +31,6 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger import org.slf4j.Logger
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.channels.FileLock
import java.time.Clock import java.time.Clock
import javax.management.ObjectName import javax.management.ObjectName
import kotlin.concurrent.thread import kotlin.concurrent.thread
@ -102,10 +98,6 @@ class Node(override val configuration: FullNodeConfiguration,
var messageBroker: ArtemisMessagingServer? = null var messageBroker: ArtemisMessagingServer? = null
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
private var nodeFileLock: FileLock? = null
private var shutdownThread: Thread? = null private var shutdownThread: Thread? = null
private lateinit var userService: RPCUserService private lateinit var userService: RPCUserService
@ -221,7 +213,6 @@ class Node(override val configuration: FullNodeConfiguration,
val startupComplete: ListenableFuture<Unit> = SettableFuture.create() val startupComplete: ListenableFuture<Unit> = SettableFuture.create()
override fun start(): Node { override fun start(): Node {
alreadyRunningNodeCheck()
super.start() super.start()
networkMapRegistrationFuture.success(serverThread) { networkMapRegistrationFuture.success(serverThread) {
@ -286,34 +277,8 @@ class Node(override val configuration: FullNodeConfiguration,
// In particular this prevents premature shutdown of the Database by AbstractNode whilst the serverThread is active // In particular this prevents premature shutdown of the Database by AbstractNode whilst the serverThread is active
super.stop() super.stop()
nodeFileLock!!.release()
log.info("Shutdown complete") log.info("Shutdown complete")
} }
private fun alreadyRunningNodeCheck() {
// Write out our process ID (which may or may not resemble a UNIX process id - to us it's just a string) to a
// file that we'll do our best to delete on exit. But if we don't, it'll be overwritten next time. If it already
// exists, we try to take the file lock first before replacing it and if that fails it means we're being started
// twice with the same directory: that's a user error and we should bail out.
val pidPath = configuration.baseDirectory / "process-id"
val file = pidPath.toFile()
if (!file.exists()) {
file.createNewFile()
}
file.deleteOnExit()
val f = RandomAccessFile(file, "rw")
val l = f.channel.tryLock()
if (l == null) {
log.error("It appears there is already a node running with the specified data directory ${configuration.baseDirectory}")
log.error("Shut that other node down and try again. It may have process ID ${file.readText()}")
System.exit(1)
}
nodeFileLock = l
val ourProcessID: String = ManagementFactory.getRuntimeMXBean().name.split("@")[0]
f.setLength(0)
f.write(ourProcessID.toByteArray())
}
} }
class ConfigurationException(message: String) : Exception(message) class ConfigurationException(message: String) : Exception(message)