Merge pull request #644 from corda/aslemmer-fix-issue-55

Fix issue #55
This commit is contained in:
Andras Slemmer 2017-05-10 10:53:11 +01:00 committed by GitHub
commit f4354d25dd
6 changed files with 85 additions and 54 deletions

View File

@ -1,15 +1,21 @@
package net.corda.node package net.corda.node
import net.corda.core.div
import net.corda.core.flows.FlowLogic import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow import net.corda.core.getOrThrow
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.CordaPluginRegistry import net.corda.core.node.CordaPluginRegistry
import net.corda.core.utilities.ALICE
import net.corda.node.driver.driver import net.corda.node.driver.driver
import net.corda.node.services.startFlowPermission import net.corda.node.services.startFlowPermission
import net.corda.nodeapi.User import net.corda.nodeapi.User
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test import org.junit.Test
import java.io.* import java.io.*
import java.nio.file.Files
import java.nio.file.Paths
import kotlin.test.assertEquals
class BootTests { class BootTests {
@ -23,6 +29,23 @@ class BootTests {
} }
} }
@Test
fun `double node start doesn't write into log file`() {
val logConfigFile = Paths.get("..", "config", "dev", "log4j2.xml").toAbsolutePath()
assertThat(logConfigFile).isRegularFile()
driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) {
val alice = startNode(ALICE.name).get()
val logFolder = alice.configuration.baseDirectory / "logs"
val logFile = logFolder.toFile().listFiles { _, name -> name.endsWith(".log") }.single()
// Start second Alice, should fail
assertThatThrownBy {
startNode(ALICE.name).getOrThrow()
}
// We count the number of nodes that wrote into the logfile by counting "Logs can be found in"
val numberOfNodesThatLogged = Files.lines(logFile.toPath()).filter { it.contains(LOGS_CAN_BE_FOUND_IN_STRING) }.count()
assertEquals(1, numberOfNodesThatLogged)
}
}
} }
class ObjectInputStreamFlow : FlowLogic<Unit>() { class ObjectInputStreamFlow : FlowLogic<Unit>() {

View File

@ -75,10 +75,8 @@ class DriverTests {
driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) { driver(isDebug = true, systemProperties = mapOf("log4j.configurationFile" to logConfigFile.toString())) {
val baseDirectory = startNode(DUMMY_BANK_A.name).getOrThrow().configuration.baseDirectory val baseDirectory = startNode(DUMMY_BANK_A.name).getOrThrow().configuration.baseDirectory
val logFile = (baseDirectory / LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() } val logFile = (baseDirectory / LOGS_DIRECTORY_NAME).list { it.sorted().findFirst().get() }
println("ASD $logFile")
val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } } val debugLinesPresent = logFile.readLines { lines -> lines.anyMatch { line -> line.startsWith("[DEBUG]") } }
assertThat(debugLinesPresent).isTrue() assertThat(debugLinesPresent).isTrue()
println("hmm.")
} }
} }

View File

@ -10,6 +10,7 @@ import net.corda.core.node.VersionInfo
import net.corda.core.utilities.Emoji import net.corda.core.utilities.Emoji
import net.corda.core.utilities.LogHelper.withLevel import net.corda.core.utilities.LogHelper.withLevel
import net.corda.node.internal.Node import net.corda.node.internal.Node
import net.corda.node.internal.enforceSingleNodeIsRunning
import net.corda.node.services.config.FullNodeConfiguration import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.shell.InteractiveShell import net.corda.node.shell.InteractiveShell
import net.corda.node.utilities.registration.HTTPNetworkRegistrationService import net.corda.node.utilities.registration.HTTPNetworkRegistrationService
@ -35,6 +36,7 @@ fun printBasicNodeInfo(description: String, info: String? = null) {
} }
val LOGS_DIRECTORY_NAME = "logs" val LOGS_DIRECTORY_NAME = "logs"
val LOGS_CAN_BE_FOUND_IN_STRING = "Logs can be found in"
private val log by lazy { LoggerFactory.getLogger("Main") } private val log by lazy { LoggerFactory.getLogger("Main") }
private fun initLogging(cmdlineOptions: CmdLineOptions) { private fun initLogging(cmdlineOptions: CmdLineOptions) {
@ -63,6 +65,10 @@ fun main(args: Array<String>) {
exitProcess(1) exitProcess(1)
} }
// We do the single node check before we initialise logging so that in case of a double-node start it doesn't mess
// with the running node's logs.
enforceSingleNodeIsRunning(cmdlineOptions.baseDirectory)
initLogging(cmdlineOptions) initLogging(cmdlineOptions)
disableJavaDeserialization() // Should be after initLogging to avoid TMI. disableJavaDeserialization() // Should be after initLogging to avoid TMI.
@ -91,7 +97,7 @@ fun main(args: Array<String>) {
drawBanner(versionInfo) drawBanner(versionInfo)
printBasicNodeInfo("Logs can be found in", System.getProperty("log-path")) printBasicNodeInfo(LOGS_CAN_BE_FOUND_IN_STRING, System.getProperty("log-path"))
val conf = try { val conf = try {
cmdlineOptions.loadConfig() cmdlineOptions.loadConfig()

View File

@ -330,24 +330,28 @@ class ShutdownManager(private val executorService: ExecutorService) {
fun shutdown() { fun shutdown() {
val shutdownFutures = state.locked { val shutdownFutures = state.locked {
require(!isShutdown) if (isShutdown) {
isShutdown = true emptyList<ListenableFuture<() -> Unit>>()
registeredShutdowns } else {
} isShutdown = true
val shutdownsFuture = Futures.allAsList(shutdownFutures) registeredShutdowns
val shutdowns = try {
shutdownsFuture.get(1, SECONDS)
} catch (exception: TimeoutException) {
/** Could not get all of them, collect what we have */
shutdownFutures.filter { it.isDone }.map { it.get() }
}
shutdowns.reversed().forEach { shutdown ->
try {
shutdown()
} catch (throwable: Throwable) {
log.error("Exception while shutting down", throwable)
} }
} }
val shutdowns = shutdownFutures.map { ErrorOr.catch { it.get(1, SECONDS) } }
shutdowns.reversed().forEach { errorOrShutdown ->
errorOrShutdown.match(
onValue = { shutdown ->
try {
shutdown()
} catch (throwable: Throwable) {
log.error("Exception while shutting down", throwable)
}
},
onError = { error ->
log.error("Exception while getting shutdown method, disregarding", error)
}
)
}
} }
fun registerShutdown(shutdown: ListenableFuture<() -> Unit>) { fun registerShutdown(shutdown: ListenableFuture<() -> Unit>) {

View File

@ -0,0 +1,35 @@
package net.corda.node.internal
import net.corda.core.div
import net.corda.core.utilities.loggerFor
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.file.Path
/**
* This function enforces that only a single node is running using the given [baseDirectory] by using a file lock.
*/
fun enforceSingleNodeIsRunning(baseDirectory: Path) {
// Write out our process ID (which may or may not resemble a UNIX process id - to us it's just a string) to a
// file that we'll do our best to delete on exit. But if we don't, it'll be overwritten next time. If it already
// exists, we try to take the file lock first before replacing it and if that fails it means we're being started
// twice with the same directory: that's a user error and we should bail out.
val pidFile = (baseDirectory / "process-id").toFile()
pidFile.createNewFile()
pidFile.deleteOnExit()
val pidFileRw = RandomAccessFile(pidFile, "rw")
val pidFileLock = pidFileRw.channel.tryLock()
if (pidFileLock == null) {
println("It appears there is already a node running with the specified data directory $baseDirectory")
println("Shut that other node down and try again. It may have process ID ${pidFile.readText()}")
System.exit(1)
}
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
Runtime.getRuntime().addShutdownHook(Thread {
pidFileLock.release()
})
val ourProcessID: String = ManagementFactory.getRuntimeMXBean().name.split("@")[0]
pidFileRw.setLength(0)
pidFileRw.write(ourProcessID.toByteArray())
}

View File

@ -5,7 +5,6 @@ import com.google.common.net.HostAndPort
import com.google.common.util.concurrent.Futures import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.SettableFuture import com.google.common.util.concurrent.SettableFuture
import net.corda.core.div
import net.corda.core.flatMap import net.corda.core.flatMap
import net.corda.core.messaging.RPCOps import net.corda.core.messaging.RPCOps
import net.corda.core.node.ServiceHub import net.corda.core.node.ServiceHub
@ -32,9 +31,6 @@ import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress import net.corda.nodeapi.ArtemisMessagingComponent.NetworkMapAddress
import org.bouncycastle.asn1.x500.X500Name import org.bouncycastle.asn1.x500.X500Name
import org.slf4j.Logger import org.slf4j.Logger
import java.io.RandomAccessFile
import java.lang.management.ManagementFactory
import java.nio.channels.FileLock
import java.time.Clock import java.time.Clock
import javax.management.ObjectName import javax.management.ObjectName
import kotlin.concurrent.thread import kotlin.concurrent.thread
@ -102,10 +98,6 @@ class Node(override val configuration: FullNodeConfiguration,
var messageBroker: ArtemisMessagingServer? = null var messageBroker: ArtemisMessagingServer? = null
// Avoid the lock being garbage collected. We don't really need to release it as the OS will do so for us
// when our process shuts down, but we try in stop() anyway just to be nice.
private var nodeFileLock: FileLock? = null
private var shutdownThread: Thread? = null private var shutdownThread: Thread? = null
private lateinit var userService: RPCUserService private lateinit var userService: RPCUserService
@ -221,7 +213,6 @@ class Node(override val configuration: FullNodeConfiguration,
val startupComplete: ListenableFuture<Unit> = SettableFuture.create() val startupComplete: ListenableFuture<Unit> = SettableFuture.create()
override fun start(): Node { override fun start(): Node {
alreadyRunningNodeCheck()
super.start() super.start()
networkMapRegistrationFuture.success(serverThread) { networkMapRegistrationFuture.success(serverThread) {
@ -286,34 +277,8 @@ class Node(override val configuration: FullNodeConfiguration,
// In particular this prevents premature shutdown of the Database by AbstractNode whilst the serverThread is active // In particular this prevents premature shutdown of the Database by AbstractNode whilst the serverThread is active
super.stop() super.stop()
nodeFileLock!!.release()
log.info("Shutdown complete") log.info("Shutdown complete")
} }
private fun alreadyRunningNodeCheck() {
// Write out our process ID (which may or may not resemble a UNIX process id - to us it's just a string) to a
// file that we'll do our best to delete on exit. But if we don't, it'll be overwritten next time. If it already
// exists, we try to take the file lock first before replacing it and if that fails it means we're being started
// twice with the same directory: that's a user error and we should bail out.
val pidPath = configuration.baseDirectory / "process-id"
val file = pidPath.toFile()
if (!file.exists()) {
file.createNewFile()
}
file.deleteOnExit()
val f = RandomAccessFile(file, "rw")
val l = f.channel.tryLock()
if (l == null) {
log.error("It appears there is already a node running with the specified data directory ${configuration.baseDirectory}")
log.error("Shut that other node down and try again. It may have process ID ${file.readText()}")
System.exit(1)
}
nodeFileLock = l
val ourProcessID: String = ManagementFactory.getRuntimeMXBean().name.split("@")[0]
f.setLength(0)
f.write(ourProcessID.toByteArray())
}
} }
class ConfigurationException(message: String) : Exception(message) class ConfigurationException(message: String) : Exception(message)