From 70a1a3a3d4c0c903a3279948fb2a815efab96fa3 Mon Sep 17 00:00:00 2001
From: cburlinchon
Date: Wed, 13 Jun 2018 13:31:02 +0100
Subject: [PATCH] [ENT-2039] Move mutual exclusion check to run first (#946)

* Don't sleep inside database transaction
---
 .../net/corda/node/internal/AbstractNode.kt   | 25 ++++++++++++-------
 .../services/persistence/RunOnceService.kt    | 10 +++-----
 .../persistence/RunOnceServiceTest.kt         | 10 +++-----
 3 files changed, 24 insertions(+), 21 deletions(-)

diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
index ddb13096b6..1e4aa0819a 100644
--- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
+++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
@@ -295,6 +295,22 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
                 schemaService,
                 { name -> identityServiceRef.get().wellKnownPartyFromX500Name(name) },
                 { party -> identityServiceRef.get().wellKnownPartyFromAnonymous(party) })
+
+        val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
+        if (mutualExclusionConfiguration.on) {
+            // Ensure uniqueness in case nodes are hosted on the same machine.
+            val extendedMachineName = "${configuration.baseDirectory}/${mutualExclusionConfiguration.machineName}"
+            try {
+                RunOnceService(database, extendedMachineName,
+                        ManagementFactory.getRuntimeMXBean().name.split("@")[0],
+                        mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
+            } catch (exception: RunOnceService.RunOnceServiceWaitIntervalSleepException) {
+                log.info("Will sleep for ${mutualExclusionConfiguration.waitInterval} milliseconds till lease expires then shutting down this process.")
+                Thread.sleep(mutualExclusionConfiguration.waitInterval) // waitInterval is expressed in milliseconds
+                System.exit(1)
+            }
+        }
+
         val identityService = makeIdentityService(identity.certificate, database).also(identityServiceRef::set)
         networkMapClient = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, identityService.trustRoot) }
         val networkParameteresReader = NetworkParametersReader(identityService.trustRoot, networkMapClient, configuration.baseDirectory)
@@ -302,7 +318,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
         check(networkParameters.minimumPlatformVersion <= versionInfo.platformVersion) {
             "Node's platform version is lower than network's required minimumPlatformVersion"
         }
-
         val (startedImpl, schedulerService) = database.transaction {
             val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService, database)
             val (keyPairs, nodeInfo) = updateNodeInfo(networkMapCache, networkMapClient, identity, identityKeyPair)
@@ -329,14 +344,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
                     nodeProperties,
                     cordappProvider,
                     networkParameters)
-            val mutualExclusionConfiguration = configuration.enterpriseConfiguration.mutualExclusionConfiguration
-            if (mutualExclusionConfiguration.on) {
-                // Ensure uniqueness in case nodes are hosted on the same machine.
-                val extendedMachineName = "${configuration.baseDirectory}/${mutualExclusionConfiguration.machineName}"
-                RunOnceService(database, extendedMachineName,
-                        ManagementFactory.getRuntimeMXBean().name.split("@")[0],
-                        mutualExclusionConfiguration.updateInterval, mutualExclusionConfiguration.waitInterval).start()
-            }
             val notaryService = makeNotaryService(nodeServices, database)
             val smm = makeStateMachineManager(database)
             val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt
index 384422b8fb..590ded135a 100644
--- a/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt
+++ b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt
@@ -41,13 +41,11 @@ const val VERSION = "version"
  * @property updateInterval rate(milliseconds) at which the running node updates row.
  * @property waitInterval amount of time(milliseconds) to wait since last row update before being able to become the master node.
  * @property updateExecutor runs a row update every [updateInterval] milliseconds.
- * @property shutdownWait function executed before shutdown.
  */
 class RunOnceService(private val database: CordaPersistence, private val machineName: String, private val pid: String,
                      private val updateInterval: Long, private val waitInterval: Long,
                      private val updateExecutor: ScheduledExecutorService =
-                     AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1),
-                     private val shutdownWait: (Long) -> Unit = Thread::sleep) : SingletonSerializeAsToken() {
+                     AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1)) : SingletonSerializeAsToken() {
 
     private val log = loggerFor<RunOnceService>()
     private val running = AtomicBoolean(false)
@@ -178,11 +176,11 @@ class RunOnceService(private val database: CordaPersistence, private val machine
                         "Check if ${mutualExclusion.machineName}, PID: ${mutualExclusion.pid} is still running. " +
                         "Try again in ${Duration.ofMillis(waitInterval)}")
             }
-            log.info("Will sleep for $waitInterval seconds till lease expires then shutting down this process.")
-            shutdownWait(waitInterval)
-            System.exit(1)
+            throw RunOnceServiceWaitIntervalSleepException()
         }
 
         return returnValue == 1
     }
+
+    class RunOnceServiceWaitIntervalSleepException : Exception() // raised instead of sleeping and exiting here; the caller waits and shuts down
 }
diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt
index 5a983fc39c..68b0692549 100644
--- a/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt
+++ b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService
 import java.util.concurrent.ScheduledFuture
 import javax.persistence.Query
 import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
 import kotlin.test.assertTrue
 
 class RunOnceServiceTest {
@@ -60,17 +61,14 @@ class RunOnceServiceTest {
 
     @Test
     fun `change of master node exits if failed to update row`() {
-        exit.expectSystemExitWithStatus(1)
-
         runOnceServiceMachine1.start()
 
-        val waitInterval = 20000000000000000;
-        val shutdownWaitStub = { waitTime: Long -> assertEquals(waitInterval, waitTime) }
+        val waitInterval = 20000000000000000
         val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1,
-                waitInterval, mockUpdateExecutor, shutdownWaitStub)
+                waitInterval, mockUpdateExecutor)
 
         // fails as didn't wait long enough, someone else could still be running
-        runOnceServiceLongWait.start()
+        assertFailsWith<RunOnceService.RunOnceServiceWaitIntervalSleepException> { runOnceServiceLongWait.start() }
     }
 
     @Test