From d53d9106937238d453bbb649c98ad6a6aa71bcf8 Mon Sep 17 00:00:00 2001 From: cburlinchon Date: Tue, 15 May 2018 16:48:37 +0100 Subject: [PATCH] ENT-1824 Mutual Exclusion sleep (#841) * Another host is detected as leader, sleep till lease expires --- .../net/corda/node/services/persistence/RunOnceService.kt | 7 +++++-- .../corda/node/services/persistence/RunOnceServiceTest.kt | 6 +++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt index cb7ea4fe8f..38d2c3839f 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt @@ -41,11 +41,13 @@ const val VERSION = "version" * @property updateInterval rate(milliseconds) at which the running node updates row. * @property waitInterval amount of time(milliseconds) to wait since last row update before being able to become the master node. * @property updateExecutor runs a row update every [updateInterval] milliseconds. + * @property shutdownWait function executed before shutdown. */ class RunOnceService(private val database: CordaPersistence, private val machineName: String, private val pid: String, private val updateInterval: Long, private val waitInterval: Long, private val updateExecutor: ScheduledExecutorService = - AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1)) : SingletonSerializeAsToken() { + AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1), + private val shutdownWait: (Long) -> Unit = Thread::sleep) : SingletonSerializeAsToken() { private val log = loggerFor() private val running = AtomicBoolean(false) @@ -145,7 +147,6 @@ class RunOnceService(private val database: CordaPersistence, private val machine } private fun updateTimestamp(session: Session, mutualExclusion: MutualExclusion): Boolean { - val hql = "UPDATE RunOnceService\$MutualExclusion SET $MACHINE_NAME = :machineName, $TIMESTAMP = CURRENT_TIMESTAMP, $PID = :pid, $VERSION = :newVersion " + "WHERE $ID = 'X' AND " + // we are master node @@ -177,6 +178,8 @@ class RunOnceService(private val database: CordaPersistence, private val machine "Check if ${mutualExclusion.machineName}, PID: ${mutualExclusion.pid} is still running. " + "Try again in ${Duration.ofMillis(waitInterval)}") } + log.info("Will sleep for $waitInterval seconds till lease expires then shutting down this process.") + shutdownWait(waitInterval) System.exit(1) } diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt index 862139647a..4daee2c91d 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt @@ -65,7 +65,11 @@ class RunOnceServiceTest { runOnceServiceMachine1.start() - val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1, 20000000000000000, mockUpdateExecutor) + val waitInterval = 20000000000000000; + val shutdownWaitStub = { waitTime: Long -> assertEquals(waitInterval, waitTime) } + + val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1, + waitInterval, mockUpdateExecutor, shutdownWaitStub) // fails as didn't wait long enough, someone else could still be running runOnceServiceLongWait.start() }