ENT-1127 mutual exclusion so a node cannot start if another is running (#111)

* Dont lock row
* Add config for RunOnceService
* Don't invalidate Hibernate L2 cache on native queries
* Change column names to avoid name clash
This commit is contained in:
cburlinchon 2017-12-05 14:46:35 +00:00 committed by GitHub
parent 0c667fe9d3
commit c87f37af3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 440 additions and 10 deletions

View File

@ -105,6 +105,7 @@ dependencies {
// Unit testing helpers. // Unit testing helpers.
testCompile "junit:junit:$junit_version" testCompile "junit:junit:$junit_version"
testCompile "org.assertj:assertj-core:${assertj_version}" testCompile "org.assertj:assertj-core:${assertj_version}"
testCompile 'com.github.stefanbirkner:system-rules:1.16.0'
testCompile project(':test-utils') testCompile project(':test-utils')
testCompile project(':client:jfx') testCompile project(':client:jfx')
testCompile project(':finance') testCompile project(':finance')

View File

@ -70,6 +70,7 @@ import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry
import org.slf4j.Logger import org.slf4j.Logger
import rx.Observable import rx.Observable
import java.io.IOException import java.io.IOException
import java.lang.management.ManagementFactory
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.security.KeyPair import java.security.KeyPair
import java.security.KeyStoreException import java.security.KeyStoreException
@ -194,6 +195,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
val transactionStorage = makeTransactionStorage(database) val transactionStorage = makeTransactionStorage(database)
val stateLoader = StateLoaderImpl(transactionStorage) val stateLoader = StateLoaderImpl(transactionStorage)
val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader, database, info, identityService) val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader, database, info, identityService)
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
if (mutualExclusionConfiguration.on) {
RunOnceService(database, mutualExclusionConfiguration.machineName,
ManagementFactory.getRuntimeMXBean().name.split("@")[0],
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
}
val notaryService = makeNotaryService(nodeServices, database) val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database) val smm = makeStateMachineManager(database)
val flowStarter = FlowStarterImpl(serverThread, smm) val flowStarter = FlowStarterImpl(serverThread, smm)
@ -582,7 +589,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
} }
open protected fun checkNetworkMapIsInitialized() { open protected fun checkNetworkMapIsInitialized() {
if (!services.networkMapCache.loadDBSuccess ) { if (!services.networkMapCache.loadDBSuccess) {
// TODO: There should be a consistent approach to configuration error exceptions. // TODO: There should be a consistent approach to configuration error exceptions.
throw NetworkMapCacheEmptyException() throw NetworkMapCacheEmptyException()
} }

View File

@ -0,0 +1,5 @@
package net.corda.node.services.config
data class EnterpriseConfiguration(val mutualExclusionConfiguration: MutualExclusionConfiguration)
data class MutualExclusionConfiguration(val on: Boolean = false, val machineName: String, val updateInterval: Long, val waitInterval: Long)

View File

@ -35,6 +35,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val p2pAddress: NetworkHostAndPort val p2pAddress: NetworkHostAndPort
val rpcAddress: NetworkHostAndPort? val rpcAddress: NetworkHostAndPort?
val messagingServerAddress: NetworkHostAndPort? val messagingServerAddress: NetworkHostAndPort?
val enterpriseConfiguration: EnterpriseConfiguration
// TODO Move into DevModeOptions // TODO Move into DevModeOptions
val useTestClock: Boolean get() = false val useTestClock: Boolean get() = false
val detectPublicIp: Boolean get() = true val detectPublicIp: Boolean get() = true
@ -105,6 +106,7 @@ data class NodeConfigurationImpl(
// TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker. // TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker.
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one // Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
override val messagingServerAddress: NetworkHostAndPort?, override val messagingServerAddress: NetworkHostAndPort?,
override val enterpriseConfiguration: EnterpriseConfiguration,
override val notary: NotaryConfig?, override val notary: NotaryConfig?,
override val certificateChainCheckPolicies: List<CertChainPolicyConfig>, override val certificateChainCheckPolicies: List<CertChainPolicyConfig>,
override val devMode: Boolean = false, override val devMode: Boolean = false,

View File

@ -0,0 +1,165 @@
package net.corda.node.services.persistence
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.loggerFor
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import org.hibernate.Session
import java.time.Duration
import java.util.*
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import javax.persistence.*
const val TABLE = "${NODE_DATABASE_PREFIX}mutual_exclusion"
const val ID = "mutual_exclusion_id"
const val MACHINE_NAME = "machine_name"
const val PID = "pid"
const val TIMESTAMP = "mutual_exclusion_timestamp"
/**
* Makes sure only one node is able to write to database.
* Running node updates row whilst running. When a node starts up, it checks no one has updated row within a specified time frame.
*
* @property pid process id
* @property updateInterval rate(milliseconds) at which the running node updates row.
* @property waitInterval amount of time(milliseconds) to wait since last row update before being able to become the master node.
* @property updateExecutor runs a row update every [updateInterval] milliseconds
*/
class RunOnceService(private val database: CordaPersistence, private val machineName: String, private val pid: String,
private val updateInterval: Long, private val waitInterval: Long,
private val updateExecutor: ScheduledExecutorService =
AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1)) : SingletonSerializeAsToken() {
private val log = loggerFor<RunOnceService>()
private val running = AtomicBoolean(false)
init {
if (waitInterval <= updateInterval) {
throw RuntimeException("Configuration Error: Node must wait longer than update rate otherwise someone else might be running!" +
" Wait interval: $waitInterval, Update interval: $updateInterval")
}
}
@Entity
@Table(name = TABLE)
class MutualExclusion(machineNameInit: String, pidInit: String) {
@Column(name = ID, insertable = false, updatable = false)
@Id
val id: Char = 'X'
@Column(name = MACHINE_NAME)
val machineName = machineNameInit
@Column(name = PID)
val pid = pidInit
@Column(name = TIMESTAMP)
@Temporal(TemporalType.TIMESTAMP)
val timestamp: Date? = null
}
fun start() {
database.transaction {
val mutualExclusion = getMutualExclusion(session)
when {
mutualExclusion == null -> {
log.info("No other node running before")
insertMutualExclusion(session)
}
mutualExclusion.machineName == machineName -> {
log.info("Node last run on same machine:$machineName")
updateTimestamp(session, mutualExclusion)
}
else -> {
log.info("Node last run on different machine:${mutualExclusion.machineName} PID: ${mutualExclusion.pid}. " +
"Now running on $machineName PID: $pid")
updateTimestamp(session, mutualExclusion)
}
}
}
updateExecutor.scheduleAtFixedRate({
if (running.compareAndSet(false, true)) {
try {
database.transaction {
val mutualExclusion = getMutualExclusion(session)
if (mutualExclusion == null) {
log.error("$machineName PID: $pid failed mutual exclusion update. " +
"Expected to have a row in $TABLE table. " +
"Check if another node is running")
System.exit(1)
} else if (mutualExclusion.machineName != machineName || mutualExclusion.pid != pid) {
log.error("Expected $machineName PID: $pid but was ${mutualExclusion.machineName} PID: ${mutualExclusion.pid}. " +
"Check if another node is running")
System.exit(1)
}
updateTimestamp(session, mutualExclusion!!)
}
} finally {
running.set(false)
}
}
}, updateInterval, updateInterval, TimeUnit.MILLISECONDS)
}
private fun insertMutualExclusion(session: Session) {
val query = session.createNativeQuery("INSERT INTO $TABLE VALUES ('X', :machineName, :pid, CURRENT_TIMESTAMP)", MutualExclusion::class.java)
query.unwrap(org.hibernate.SQLQuery::class.java).addSynchronizedEntityClass(MutualExclusion::class.java)
query.setParameter("pid", pid)
query.setParameter("machineName", machineName)
val returnValue = query.executeUpdate()
if (returnValue != 1) {
log.error("$machineName PID: $pid failed to insert mutual exclusion. Check if another node is running")
System.exit(1)
}
}
private fun getMutualExclusion(session: Session): MutualExclusion? {
val query = session.createNativeQuery("SELECT * FROM $TABLE WHERE $ID='X'", MutualExclusion::class.java)
val result = query.resultList.singleOrNull()
return if (result != null) result as MutualExclusion else null
}
private fun updateTimestamp(session: Session, mutualExclusion: MutualExclusion): Boolean {
val query = session.createNativeQuery("UPDATE $TABLE SET $MACHINE_NAME=:machineName, $TIMESTAMP=CURRENT_TIMESTAMP, $PID=:pid " +
"WHERE $ID='X' AND " +
// we are master node
"($MACHINE_NAME=:machineName) OR " +
// change master node
"($MACHINE_NAME!=:machineName AND " +
// no one else has updated timestamp whilst we attempted this update
"$TIMESTAMP=:mutualExclusionTimestamp AND " +
// old timestamp
"CURRENT_TIMESTAMP>:waitTime)", MutualExclusion::class.java)
query.unwrap(org.hibernate.SQLQuery::class.java).addSynchronizedEntityClass(MutualExclusion::class.java)
query.setParameter("pid", pid)
query.setParameter("machineName", machineName)
query.setParameter("mutualExclusionTimestamp", mutualExclusion.timestamp)
query.setParameter("waitTime", Date(mutualExclusion.timestamp!!.time + waitInterval))
val returnValue = query.executeUpdate()
if (returnValue != 1) {
if (machineName == mutualExclusion.machineName) {
log.error("$machineName PID: $pid failed mutual exclusion update. Check if another node is running")
} else {
log.error("$machineName PID: $pid failed to become the master node. " +
"Check if ${mutualExclusion.machineName}, PID: ${mutualExclusion.pid} is still running. " +
"Try again in ${Duration.ofMillis(waitInterval)}")
}
System.exit(1)
}
return returnValue == 1
}
}

View File

@ -15,10 +15,7 @@ import net.corda.node.services.events.NodeSchedulerService
import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.identity.PersistentIdentityService
import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.keys.PersistentKeyManagementService
import net.corda.node.services.messaging.P2PMessagingClient import net.corda.node.services.messaging.P2PMessagingClient
import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.*
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
import net.corda.node.services.persistence.NodeAttachmentService
import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.node.services.transactions.BFTNonValidatingNotaryService
import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider
@ -52,7 +49,8 @@ class NodeSchemaService(extraSchemas: Set<MappedSchema> = emptySet()) : SchemaSe
BFTNonValidatingNotaryService.PersistedCommittedState::class.java, BFTNonValidatingNotaryService.PersistedCommittedState::class.java,
PersistentIdentityService.PersistentIdentity::class.java, PersistentIdentityService.PersistentIdentity::class.java,
PersistentIdentityService.PersistentIdentityNames::class.java, PersistentIdentityService.PersistentIdentityNames::class.java,
ContractUpgradeServiceImpl.DBContractUpgrade::class.java ContractUpgradeServiceImpl.DBContractUpgrade::class.java,
RunOnceService.MutualExclusion::class.java
)) ))
// Required schemas are those used by internal Corda services // Required schemas are those used by internal Corda services

View File

@ -24,3 +24,11 @@ activeMQServer = {
maxRetryIntervalMin = 3 maxRetryIntervalMin = 3
} }
} }
enterpriseConfiguration = {
mutualExclusionConfiguration = {
on = false
machineName = ""
updateInterval = 20000
waitInterval = 40000
}
}

View File

@ -49,5 +49,6 @@ class NodeConfigurationImplTest {
certificateChainCheckPolicies = emptyList(), certificateChainCheckPolicies = emptyList(),
devMode = true, devMode = true,
activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)), activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)),
relay = null) relay = null,
enterpriseConfiguration = EnterpriseConfiguration((MutualExclusionConfiguration(false, "", 20000, 40000))))
} }

View File

@ -0,0 +1,244 @@
package net.corda.node.services.persistence
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import net.corda.node.internal.configureDatabase
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties
import net.corda.testing.rigorousMock
import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.contrib.java.lang.system.ExpectedSystemExit
import java.util.*
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture
import javax.persistence.Query
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class RunOnceServiceTest {
@Rule
@JvmField
val exit: ExpectedSystemExit = ExpectedSystemExit.none()
private lateinit var database: CordaPersistence
private val selectQuery = "SELECT * FROM $TABLE WHERE $ID='X'"
private val deleteQuery = "DELETE FROM $TABLE"
private val updateMachineNameQuery = "UPDATE $TABLE SET $MACHINE_NAME='someOtherMachine'"
private val updateMachinePidQuery = "UPDATE $TABLE SET $PID='999'"
private lateinit var runOnceServiceMachine1: RunOnceService
private lateinit var runOnceServiceMachine2: RunOnceService
private val mockUpdateExecutor = mock<ScheduledExecutorService>()
@Before
fun setup() {
database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), rigorousMock())
runOnceServiceMachine1 = RunOnceService(database, "machine1", "123", 1, 2, mockUpdateExecutor)
runOnceServiceMachine2 = RunOnceService(database, "machine2", "789", 1, 2, mockUpdateExecutor)
}
@After
fun cleanUp() {
database.close()
}
@Test
fun `change of master node exits if failed to update row`() {
exit.expectSystemExitWithStatus(1)
runOnceServiceMachine1.start()
val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1, 20000000000000000, mockUpdateExecutor)
// fails as didn't wait long enough, someone else could still be running
runOnceServiceLongWait.start()
}
@Test
fun `row updated when change of master node`() {
runOnceServiceMachine1.start()
var firstTimestamp: Date? = null
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
val result = machine1RowCheck(query)
firstTimestamp = result.timestamp
}
// runOnceServiceMachine2 changes to master node if we have waited more than 2 millis
Thread.sleep(3)
runOnceServiceMachine2.start()
var secondTimestamp: Date? = null
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
val result = machine2RowCheck(query)
secondTimestamp = result.timestamp
}
assertTrue(secondTimestamp!!.toInstant().isAfter(firstTimestamp!!.toInstant()))
}
@Test
fun `row created if none exist`() {
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
assertEquals(0, query.resultList.size)
}
runOnceServiceMachine1.start()
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
machine1RowCheck(query)
}
}
@Test
fun `row updated when last run was same machine`() {
runOnceServiceMachine1.start()
var firstTimestamp: Date? = null
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
val result = machine1RowCheck(query)
firstTimestamp = result.timestamp
}
// make sure to wait so secondTimestamp is after first
Thread.sleep(1)
runOnceServiceMachine1.start()
var secondTimestamp: Date? = null
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
val result = machine1RowCheck(query)
secondTimestamp = result.timestamp
}
assertTrue(secondTimestamp!!.toInstant().isAfter(firstTimestamp!!.toInstant()))
}
@Test
fun `timer updates row`() {
whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation ->
val runnable = invocation.arguments[0] as Runnable
var firstTimestamp: Date? = null
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
val result = machine1RowCheck(query)
firstTimestamp = result.timestamp
}
runnable.run()
var secondTimestamp: Date? = null
database.transaction {
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
val result = machine1RowCheck(query)
secondTimestamp = result.timestamp
}
assertTrue(secondTimestamp!!.toInstant().isAfter(firstTimestamp!!.toInstant()))
mock<ScheduledFuture<*>>()
}
runOnceServiceMachine1.start()
verify(mockUpdateExecutor).scheduleAtFixedRate(any(), any(), any(), any())
}
@Test
fun `timer exits if no row`() {
exit.expectSystemExitWithStatus(1)
whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation ->
val runnable = invocation.arguments[0] as Runnable
// delete row
database.transaction {
val query = session.createNativeQuery(deleteQuery, RunOnceService.MutualExclusion::class.java)
query.executeUpdate()
}
runnable.run()
mock<ScheduledFuture<*>>()
}
runOnceServiceMachine1.start()
}
@Test
fun `timer exits if different machine name`() {
exit.expectSystemExitWithStatus(1)
whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation ->
val runnable = invocation.arguments[0] as Runnable
database.transaction {
val query = session.createNativeQuery(updateMachineNameQuery, RunOnceService.MutualExclusion::class.java)
query.executeUpdate()
}
runnable.run()
mock<ScheduledFuture<*>>()
}
runOnceServiceMachine1.start()
}
@Test
fun `timer exits if different machine pid`() {
exit.expectSystemExitWithStatus(1)
whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation ->
val runnable = invocation.arguments[0] as Runnable
database.transaction {
val query = session.createNativeQuery(updateMachinePidQuery, RunOnceService.MutualExclusion::class.java)
query.executeUpdate()
}
runnable.run()
mock<ScheduledFuture<*>>()
}
runOnceServiceMachine1.start()
}
@Test(expected = RuntimeException::class)
fun `wait interval greater than update interval`() {
RunOnceService(database, "machine1", "123", 2, 2, mockUpdateExecutor)
}
private fun machine1RowCheck(query: Query): RunOnceService.MutualExclusion {
assertEquals(1, query.resultList.size)
val result = query.resultList[0] as RunOnceService.MutualExclusion
assertEquals('X', result.id)
assertEquals("machine1", result.machineName)
assertEquals("123", result.pid)
assertTrue(result.timestamp is Date)
return result
}
private fun machine2RowCheck(query: Query): RunOnceService.MutualExclusion {
assertEquals(1, query.resultList.size)
val result = query.resultList[0] as RunOnceService.MutualExclusion
assertEquals('X', result.id)
assertEquals("machine2", result.machineName)
assertEquals("789", result.pid)
assertTrue(result.timestamp is Date)
return result
}
}

View File

@ -17,9 +17,7 @@ import net.corda.core.transactions.TransactionBuilder
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.seconds import net.corda.core.utilities.seconds
import net.corda.node.services.api.StartedNodeServices import net.corda.node.services.api.StartedNodeServices
import net.corda.node.services.config.CertChainPolicyConfig import net.corda.node.services.config.*
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.VerifierType
import net.corda.nodeapi.User import net.corda.nodeapi.User
import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
@ -78,6 +76,7 @@ fun testNodeConfiguration(
doCallRealMethod().whenever(it).trustStoreFile doCallRealMethod().whenever(it).trustStoreFile
doCallRealMethod().whenever(it).sslKeystore doCallRealMethod().whenever(it).sslKeystore
doCallRealMethod().whenever(it).nodeKeystore doCallRealMethod().whenever(it).nodeKeystore
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
} }
} }