[ENT-2039] Move mutual exclusion check to run first (#946)

* Don't sleep inside database transaction
This commit is contained in:
cburlinchon 2018-06-13 13:31:02 +01:00 committed by GitHub
parent a4a75cf22d
commit 70a1a3a3d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 21 deletions

View File

@ -295,6 +295,22 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
schemaService, schemaService,
{ name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) }, { name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) },
{ party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) }) { party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
if (mutualExclusionConfiguration.on) {
// Ensure uniqueness in case nodes are hosted on the same machine.
val extendedMachineName = "${configuration.baseDirectory}/${mutualExclusionConfiguration.machineName}"
try {
RunOnceService(database, extendedMachineName,
ManagementFactory.getRuntimeMXBean().name.split("@")[0],
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
} catch (exception: RunOnceService.RunOnceServiceWaitIntervalSleepException) {
log.info("Will sleep for $mutualExclusionConfiguration.waitInterval seconds till lease expires then shutting down this process.")
Thread.sleep(mutualExclusionConfiguration.waitInterval)
System.exit(1)
}
}
val identityService = makeIdentityService(identity.certificate, database).also(identityServiceRef::set) val identityService = makeIdentityService(identity.certificate, database).also(identityServiceRef::set)
networkMapClient = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, identityService.trustRoot) } networkMapClient = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, identityService.trustRoot) }
val networkParameteresReader = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory) val networkParameteresReader = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory)
@ -302,7 +318,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) { check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
"Node's platform version is lower than network's required minimumPlatformVersion" "Node's platform version is lower than network's required minimumPlatformVersion"
} }
val (startedImpl, schedulerService) = database.transaction { val (startedImpl, schedulerService) = database.transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService, database) val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService, database)
val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair) val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
@ -329,14 +344,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
nodeProperties, nodeProperties,
cordappProvider, cordappProvider,
networkParameters) networkParameters)
val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
if (mutualExclusionConfiguration.on) {
// Ensure uniqueness in case nodes are hosted on the same machine.
val extendedMachineName = "${configuration.baseDirectory}/${mutualExclusionConfiguration.machineName}"
RunOnceService(database, extendedMachineName,
ManagementFactory.getRuntimeMXBean().name.split("@")[0],
mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
}
val notaryService = makeNotaryService(nodeServices, database) val notaryService = makeNotaryService(nodeServices, database)
val smm = makeStateMachineManager(database) val smm = makeStateMachineManager(database)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader) val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)

View File

@ -41,13 +41,11 @@ const val VERSION = "version"
* @property updateInterval rate(milliseconds) at which the running node updates row. * @property updateInterval rate(milliseconds) at which the running node updates row.
* @property waitInterval amount of time(milliseconds) to wait since last row update before being able to become the master node. * @property waitInterval amount of time(milliseconds) to wait since last row update before being able to become the master node.
* @property updateExecutor runs a row update every [updateInterval] milliseconds. * @property updateExecutor runs a row update every [updateInterval] milliseconds.
* @property shutdownWait function executed before shutdown.
*/ */
class RunOnceService(private val database: CordaPersistence, private val machineName: String, private val pid: String, class RunOnceService(private val database: CordaPersistence, private val machineName: String, private val pid: String,
private val updateInterval: Long, private val waitInterval: Long, private val updateInterval: Long, private val waitInterval: Long,
private val updateExecutor: ScheduledExecutorService = private val updateExecutor: ScheduledExecutorService =
AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1), AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1)) : SingletonSerializeAsToken() {
private val shutdownWait: (Long) -> Unit = Thread::sleep) : SingletonSerializeAsToken() {
private val log = loggerFor<RunOnceService>() private val log = loggerFor<RunOnceService>()
private val running = AtomicBoolean(false) private val running = AtomicBoolean(false)
@ -178,11 +176,11 @@ class RunOnceService(private val database: CordaPersistence, private val machine
"Check if ${mutualExclusion.machineName}, PID: ${mutualExclusion.pid} is still running. " + "Check if ${mutualExclusion.machineName}, PID: ${mutualExclusion.pid} is still running. " +
"Try again in ${Duration.ofMillis(waitInterval)}") "Try again in ${Duration.ofMillis(waitInterval)}")
} }
log.info("Will sleep for $waitInterval seconds till lease expires then shutting down this process.") throw RunOnceServiceWaitIntervalSleepException()
shutdownWait(waitInterval)
System.exit(1)
} }
return returnValue == 1 return returnValue == 1
} }
class RunOnceServiceWaitIntervalSleepException : Exception()
} }

View File

@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.ScheduledFuture import java.util.concurrent.ScheduledFuture
import javax.persistence.Query import javax.persistence.Query
import kotlin.test.assertEquals import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue import kotlin.test.assertTrue
class RunOnceServiceTest { class RunOnceServiceTest {
@ -60,17 +61,14 @@ class RunOnceServiceTest {
@Test @Test
fun `change of master node exits if failed to update row`() { fun `change of master node exits if failed to update row`() {
exit.expectSystemExitWithStatus(1)
runOnceServiceMachine1.start() runOnceServiceMachine1.start()
val waitInterval = 20000000000000000; val waitInterval = 20000000000000000
val shutdownWaitStub = { waitTime: Long -> assertEquals(waitInterval, waitTime) }
val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1, val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1,
waitInterval, mockUpdateExecutor, shutdownWaitStub) waitInterval, mockUpdateExecutor)
// fails as didn't wait long enough, someone else could still be running // fails as didn't wait long enough, someone else could still be running
runOnceServiceLongWait.start() assertFailsWith<RunOnceService.RunOnceServiceWaitIntervalSleepException> { runOnceServiceLongWait.start() }
} }
@Test @Test