diff --git a/node/build.gradle b/node/build.gradle index 94687cfbe9..2b69308580 100644 --- a/node/build.gradle +++ b/node/build.gradle @@ -105,6 +105,7 @@ dependencies { // Unit testing helpers. testCompile "junit:junit:$junit_version" testCompile "org.assertj:assertj-core:${assertj_version}" + testCompile 'com.github.stefanbirkner:system-rules:1.16.0' testCompile project(':test-utils') testCompile project(':client:jfx') testCompile project(':finance') diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index de4e986dab..1045807798 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -70,6 +70,7 @@ import org.hibernate.type.descriptor.java.JavaTypeDescriptorRegistry import org.slf4j.Logger import rx.Observable import java.io.IOException +import java.lang.management.ManagementFactory import java.lang.reflect.InvocationTargetException import java.security.KeyPair import java.security.KeyStoreException @@ -194,6 +195,12 @@ abstract class AbstractNode(val configuration: NodeConfiguration, val transactionStorage = makeTransactionStorage(database) val stateLoader = StateLoaderImpl(transactionStorage) val nodeServices = makeServices(keyPairs, schemaService, transactionStorage, stateLoader, database, info, identityService) + val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration + if (mutualExclusionConfiguration.on) { + RunOnceService(database, mutualExclusionConfiguration.machineName, + ManagementFactory.getRuntimeMXBean().name.split("@")[0], + mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start() + } val notaryService = makeNotaryService(nodeServices, database) val smm = makeStateMachineManager(database) val flowStarter = FlowStarterImpl(serverThread, smm) @@ -582,7 +589,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } open protected fun checkNetworkMapIsInitialized() { - if (!services.networkMapCache.loadDBSuccess ) { + if (!services.networkMapCache.loadDBSuccess) { // TODO: There should be a consistent approach to configuration error exceptions. throw NetworkMapCacheEmptyException() } diff --git a/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt new file mode 100644 index 0000000000..f8aee69d48 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/config/EnterpriseConfiguration.kt @@ -0,0 +1,5 @@ +package net.corda.node.services.config + +data class EnterpriseConfiguration(val mutualExclusionConfiguration: MutualExclusionConfiguration) + +data class MutualExclusionConfiguration(val on: Boolean = false, val machineName: String, val updateInterval: Long, val waitInterval: Long) \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt index e87d547749..a40937ccfa 100644 --- a/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/net/corda/node/services/config/NodeConfiguration.kt @@ -35,6 +35,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val p2pAddress: NetworkHostAndPort val rpcAddress: NetworkHostAndPort? val messagingServerAddress: NetworkHostAndPort? + val enterpriseConfiguration: EnterpriseConfiguration // TODO Move into DevModeOptions val useTestClock: Boolean get() = false val detectPublicIp: Boolean get() = true @@ -105,6 +106,7 @@ data class NodeConfigurationImpl( // TODO This field is slightly redundant as p2pAddress is sufficient to hold the address of the node's MQ broker. // Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one override val messagingServerAddress: NetworkHostAndPort?, + override val enterpriseConfiguration: EnterpriseConfiguration, override val notary: NotaryConfig?, override val certificateChainCheckPolicies: List, override val devMode: Boolean = false, diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt new file mode 100644 index 0000000000..e986160cc1 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt @@ -0,0 +1,165 @@ +package net.corda.node.services.persistence + +import net.corda.core.serialization.SingletonSerializeAsToken +import net.corda.core.utilities.loggerFor +import net.corda.node.utilities.AffinityExecutor +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX +import org.hibernate.Session +import java.time.Duration +import java.util.* +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import javax.persistence.* + +const val TABLE = "${NODE_DATABASE_PREFIX}mutual_exclusion" +const val ID = "mutual_exclusion_id" +const val MACHINE_NAME = "machine_name" +const val PID = "pid" +const val TIMESTAMP = "mutual_exclusion_timestamp" + +/** + * Makes sure only one node is able to write to database. + * Running node updates row whilst running. When a node starts up, it checks no one has updated row within a specified time frame. + * + * @property pid process id + * @property updateInterval rate(milliseconds) at which the running node updates row. + * @property waitInterval amount of time(milliseconds) to wait since last row update before being able to become the master node. + * @property updateExecutor runs a row update every [updateInterval] milliseconds + */ +class RunOnceService(private val database: CordaPersistence, private val machineName: String, private val pid: String, + private val updateInterval: Long, private val waitInterval: Long, + private val updateExecutor: ScheduledExecutorService = + AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1)) : SingletonSerializeAsToken() { + + private val log = loggerFor() + private val running = AtomicBoolean(false) + + init { + if (waitInterval <= updateInterval) { + throw RuntimeException("Configuration Error: Node must wait longer than update rate otherwise someone else might be running!" + + " Wait interval: $waitInterval, Update interval: $updateInterval") + } + } + + @Entity + @Table(name = TABLE) + class MutualExclusion(machineNameInit: String, pidInit: String) { + @Column(name = ID, insertable = false, updatable = false) + @Id + val id: Char = 'X' + + @Column(name = MACHINE_NAME) + val machineName = machineNameInit + + @Column(name = PID) + val pid = pidInit + + @Column(name = TIMESTAMP) + @Temporal(TemporalType.TIMESTAMP) + val timestamp: Date? = null + } + + fun start() { + database.transaction { + val mutualExclusion = getMutualExclusion(session) + + when { + mutualExclusion == null -> { + log.info("No other node running before") + insertMutualExclusion(session) + } + mutualExclusion.machineName == machineName -> { + log.info("Node last run on same machine:$machineName") + updateTimestamp(session, mutualExclusion) + } + else -> { + log.info("Node last run on different machine:${mutualExclusion.machineName} PID: ${mutualExclusion.pid}. " + + "Now running on $machineName PID: $pid") + updateTimestamp(session, mutualExclusion) + } + } + } + + updateExecutor.scheduleAtFixedRate({ + if (running.compareAndSet(false, true)) { + try { + database.transaction { + val mutualExclusion = getMutualExclusion(session) + + if (mutualExclusion == null) { + log.error("$machineName PID: $pid failed mutual exclusion update. " + + "Expected to have a row in $TABLE table. " + + "Check if another node is running") + System.exit(1) + } else if (mutualExclusion.machineName != machineName || mutualExclusion.pid != pid) { + log.error("Expected $machineName PID: $pid but was ${mutualExclusion.machineName} PID: ${mutualExclusion.pid}. " + + "Check if another node is running") + System.exit(1) + } + + updateTimestamp(session, mutualExclusion!!) + } + } finally { + running.set(false) + } + } + }, updateInterval, updateInterval, TimeUnit.MILLISECONDS) + } + + private fun insertMutualExclusion(session: Session) { + val query = session.createNativeQuery("INSERT INTO $TABLE VALUES ('X', :machineName, :pid, CURRENT_TIMESTAMP)", MutualExclusion::class.java) + query.unwrap(org.hibernate.SQLQuery::class.java).addSynchronizedEntityClass(MutualExclusion::class.java) + query.setParameter("pid", pid) + query.setParameter("machineName", machineName) + val returnValue = query.executeUpdate() + + if (returnValue != 1) { + log.error("$machineName PID: $pid failed to insert mutual exclusion. Check if another node is running") + System.exit(1) + } + } + + private fun getMutualExclusion(session: Session): MutualExclusion? { + val query = session.createNativeQuery("SELECT * FROM $TABLE WHERE $ID='X'", MutualExclusion::class.java) + val result = query.resultList.singleOrNull() + return if (result != null) result as MutualExclusion else null + } + + private fun updateTimestamp(session: Session, mutualExclusion: MutualExclusion): Boolean { + val query = session.createNativeQuery("UPDATE $TABLE SET $MACHINE_NAME=:machineName, $TIMESTAMP=CURRENT_TIMESTAMP, $PID=:pid " + + "WHERE $ID='X' AND " + + + // we are master node + "($MACHINE_NAME=:machineName) OR " + + + // change master node + "($MACHINE_NAME!=:machineName AND " + + // no one else has updated timestamp whilst we attempted this update + "$TIMESTAMP=:mutualExclusionTimestamp AND " + + // old timestamp + "CURRENT_TIMESTAMP>:waitTime)", MutualExclusion::class.java) + + query.unwrap(org.hibernate.SQLQuery::class.java).addSynchronizedEntityClass(MutualExclusion::class.java) + + query.setParameter("pid", pid) + query.setParameter("machineName", machineName) + query.setParameter("mutualExclusionTimestamp", mutualExclusion.timestamp) + query.setParameter("waitTime", Date(mutualExclusion.timestamp!!.time + waitInterval)) + val returnValue = query.executeUpdate() + + if (returnValue != 1) { + if (machineName == mutualExclusion.machineName) { + log.error("$machineName PID: $pid failed mutual exclusion update. Check if another node is running") + } else { + log.error("$machineName PID: $pid failed to become the master node. " + + "Check if ${mutualExclusion.machineName}, PID: ${mutualExclusion.pid} is still running. " + + "Try again in ${Duration.ofMillis(waitInterval)}") + } + System.exit(1) + } + + return returnValue == 1 + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index df4b85c3c5..72749b8b48 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -15,10 +15,7 @@ import net.corda.node.services.events.NodeSchedulerService import net.corda.node.services.identity.PersistentIdentityService import net.corda.node.services.keys.PersistentKeyManagementService import net.corda.node.services.messaging.P2PMessagingClient -import net.corda.node.services.persistence.DBCheckpointStorage -import net.corda.node.services.persistence.DBTransactionMappingStorage -import net.corda.node.services.persistence.DBTransactionStorage -import net.corda.node.services.persistence.NodeAttachmentService +import net.corda.node.services.persistence.* import net.corda.node.services.transactions.BFTNonValidatingNotaryService import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.node.services.transactions.RaftUniquenessProvider @@ -52,7 +49,8 @@ class NodeSchemaService(extraSchemas: Set = emptySet()) : SchemaSe BFTNonValidatingNotaryService.PersistedCommittedState::class.java, PersistentIdentityService.PersistentIdentity::class.java, PersistentIdentityService.PersistentIdentityNames::class.java, - ContractUpgradeServiceImpl.DBContractUpgrade::class.java + ContractUpgradeServiceImpl.DBContractUpgrade::class.java, + RunOnceService.MutualExclusion::class.java )) // Required schemas are those used by internal Corda services diff --git a/node/src/main/resources/reference.conf b/node/src/main/resources/reference.conf index 8097e35be9..a14b06d28a 100644 --- a/node/src/main/resources/reference.conf +++ b/node/src/main/resources/reference.conf @@ -24,3 +24,11 @@ activeMQServer = { maxRetryIntervalMin = 3 } } +enterpriseConfiguration = { + mutualExclusionConfiguration = { + on = false + machineName = "" + updateInterval = 20000 + waitInterval = 40000 + } +} diff --git a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt index b4075a77d7..f6fffd581b 100644 --- a/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/config/NodeConfigurationImplTest.kt @@ -49,5 +49,6 @@ class NodeConfigurationImplTest { certificateChainCheckPolicies = emptyList(), devMode = true, activeMQServer = ActiveMqServerConfiguration(BridgeConfiguration(0, 0, 0.0)), - relay = null) + relay = null, + enterpriseConfiguration = EnterpriseConfiguration((MutualExclusionConfiguration(false, "", 20000, 40000)))) } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt new file mode 100644 index 0000000000..d853b9608e --- /dev/null +++ b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt @@ -0,0 +1,244 @@ +package net.corda.node.services.persistence + +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.verify +import com.nhaarman.mockito_kotlin.whenever +import net.corda.node.internal.configureDatabase +import net.corda.nodeapi.internal.persistence.CordaPersistence +import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties +import net.corda.testing.node.MockServices.Companion.makeTestDatabaseProperties +import net.corda.testing.rigorousMock +import org.junit.After +import org.junit.Before +import org.junit.Rule +import org.junit.Test +import org.junit.contrib.java.lang.system.ExpectedSystemExit +import java.util.* +import java.util.concurrent.ScheduledExecutorService +import java.util.concurrent.ScheduledFuture +import javax.persistence.Query +import kotlin.test.assertEquals +import kotlin.test.assertTrue + +class RunOnceServiceTest { + + @Rule + @JvmField + val exit: ExpectedSystemExit = ExpectedSystemExit.none() + + private lateinit var database: CordaPersistence + private val selectQuery = "SELECT * FROM $TABLE WHERE $ID='X'" + private val deleteQuery = "DELETE FROM $TABLE" + private val updateMachineNameQuery = "UPDATE $TABLE SET $MACHINE_NAME='someOtherMachine'" + private val updateMachinePidQuery = "UPDATE $TABLE SET $PID='999'" + + private lateinit var runOnceServiceMachine1: RunOnceService + private lateinit var runOnceServiceMachine2: RunOnceService + private val mockUpdateExecutor = mock() + + @Before + fun setup() { + database = configureDatabase(makeTestDataSourceProperties(), makeTestDatabaseProperties(), rigorousMock()) + runOnceServiceMachine1 = RunOnceService(database, "machine1", "123", 1, 2, mockUpdateExecutor) + runOnceServiceMachine2 = RunOnceService(database, "machine2", "789", 1, 2, mockUpdateExecutor) + } + + @After + fun cleanUp() { + database.close() + } + + @Test + fun `change of master node exits if failed to update row`() { + exit.expectSystemExitWithStatus(1) + + runOnceServiceMachine1.start() + + val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1, 20000000000000000, mockUpdateExecutor) + // fails as didn't wait long enough, someone else could still be running + runOnceServiceLongWait.start() + } + + @Test + fun `row updated when change of master node`() { + runOnceServiceMachine1.start() + + var firstTimestamp: Date? = null + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + val result = machine1RowCheck(query) + firstTimestamp = result.timestamp + } + + // runOnceServiceMachine2 changes to master node if we have waited more than 2 millis + Thread.sleep(3) + + runOnceServiceMachine2.start() + + var secondTimestamp: Date? = null + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + val result = machine2RowCheck(query) + secondTimestamp = result.timestamp + } + + assertTrue(secondTimestamp!!.toInstant().isAfter(firstTimestamp!!.toInstant())) + } + + @Test + fun `row created if none exist`() { + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + assertEquals(0, query.resultList.size) + } + + runOnceServiceMachine1.start() + + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + machine1RowCheck(query) + } + } + + @Test + fun `row updated when last run was same machine`() { + runOnceServiceMachine1.start() + + var firstTimestamp: Date? = null + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + val result = machine1RowCheck(query) + firstTimestamp = result.timestamp + } + + // make sure to wait so secondTimestamp is after first + Thread.sleep(1) + + runOnceServiceMachine1.start() + + var secondTimestamp: Date? = null + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + val result = machine1RowCheck(query) + secondTimestamp = result.timestamp + } + + assertTrue(secondTimestamp!!.toInstant().isAfter(firstTimestamp!!.toInstant())) + } + + @Test + fun `timer updates row`() { + whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation -> + val runnable = invocation.arguments[0] as Runnable + + var firstTimestamp: Date? = null + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + val result = machine1RowCheck(query) + firstTimestamp = result.timestamp + } + + runnable.run() + + var secondTimestamp: Date? = null + database.transaction { + val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) + val result = machine1RowCheck(query) + secondTimestamp = result.timestamp + } + + assertTrue(secondTimestamp!!.toInstant().isAfter(firstTimestamp!!.toInstant())) + + mock>() + } + + runOnceServiceMachine1.start() + + verify(mockUpdateExecutor).scheduleAtFixedRate(any(), any(), any(), any()) + } + + + @Test + fun `timer exits if no row`() { + exit.expectSystemExitWithStatus(1) + + whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation -> + val runnable = invocation.arguments[0] as Runnable + + // delete row + database.transaction { + val query = session.createNativeQuery(deleteQuery, RunOnceService.MutualExclusion::class.java) + query.executeUpdate() + } + + runnable.run() + mock>() + } + + runOnceServiceMachine1.start() + } + + @Test + fun `timer exits if different machine name`() { + exit.expectSystemExitWithStatus(1) + + whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation -> + val runnable = invocation.arguments[0] as Runnable + + database.transaction { + val query = session.createNativeQuery(updateMachineNameQuery, RunOnceService.MutualExclusion::class.java) + query.executeUpdate() + } + + runnable.run() + mock>() + } + + runOnceServiceMachine1.start() + } + + @Test + fun `timer exits if different machine pid`() { + exit.expectSystemExitWithStatus(1) + + whenever(mockUpdateExecutor.scheduleAtFixedRate(any(), any(), any(), any())).thenAnswer { invocation -> + val runnable = invocation.arguments[0] as Runnable + + database.transaction { + val query = session.createNativeQuery(updateMachinePidQuery, RunOnceService.MutualExclusion::class.java) + query.executeUpdate() + } + + runnable.run() + mock>() + } + + runOnceServiceMachine1.start() + } + + @Test(expected = RuntimeException::class) + fun `wait interval greater than update interval`() { + RunOnceService(database, "machine1", "123", 2, 2, mockUpdateExecutor) + } + + private fun machine1RowCheck(query: Query): RunOnceService.MutualExclusion { + assertEquals(1, query.resultList.size) + val result = query.resultList[0] as RunOnceService.MutualExclusion + assertEquals('X', result.id) + assertEquals("machine1", result.machineName) + assertEquals("123", result.pid) + assertTrue(result.timestamp is Date) + return result + } + + private fun machine2RowCheck(query: Query): RunOnceService.MutualExclusion { + assertEquals(1, query.resultList.size) + val result = query.resultList[0] as RunOnceService.MutualExclusion + assertEquals('X', result.id) + assertEquals("machine2", result.machineName) + assertEquals("789", result.pid) + assertTrue(result.timestamp is Date) + return result + } +} \ No newline at end of file diff --git a/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt b/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt index 012116f961..ce353ee79d 100644 --- a/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt +++ b/testing/node-driver/src/main/kotlin/net/corda/testing/NodeTestUtils.kt @@ -17,9 +17,7 @@ import net.corda.core.transactions.TransactionBuilder import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.seconds import net.corda.node.services.api.StartedNodeServices -import net.corda.node.services.config.CertChainPolicyConfig -import net.corda.node.services.config.NodeConfiguration -import net.corda.node.services.config.VerifierType +import net.corda.node.services.config.* import net.corda.nodeapi.User import net.corda.testing.node.MockServices import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties @@ -78,6 +76,7 @@ fun testNodeConfiguration( doCallRealMethod().whenever(it).trustStoreFile doCallRealMethod().whenever(it).sslKeystore doCallRealMethod().whenever(it).nodeKeystore + doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration } }