ENT-1824 Mutual Exclusion sleep (#841)

* Another host is detected as leader, sleep till lease expires
This commit is contained in:
cburlinchon 2018-05-15 16:48:37 +01:00 committed by GitHub
parent 361569901a
commit d53d910693
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 10 additions and 3 deletions

View File

@ -41,11 +41,13 @@ const val VERSION = "version"
* @property updateInterval rate(milliseconds) at which the running node updates row.
* @property waitInterval amount of time(milliseconds) to wait since last row update before being able to become the master node.
* @property updateExecutor runs a row update every [updateInterval] milliseconds.
* @property shutdownWait function executed before shutdown.
*/
class RunOnceService(private val database: CordaPersistence, private val machineName: String, private val pid: String,
private val updateInterval: Long, private val waitInterval: Long,
private val updateExecutor: ScheduledExecutorService =
AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1)) : SingletonSerializeAsToken() {
AffinityExecutor.ServiceAffinityExecutor("RunOnceService", 1),
private val shutdownWait: (Long) -> Unit = Thread::sleep) : SingletonSerializeAsToken() {
private val log = loggerFor<RunOnceService>()
private val running = AtomicBoolean(false)
@ -145,7 +147,6 @@ class RunOnceService(private val database: CordaPersistence, private val machine
}
private fun updateTimestamp(session: Session, mutualExclusion: MutualExclusion): Boolean {
val hql = "UPDATE RunOnceService\$MutualExclusion SET $MACHINE_NAME = :machineName, $TIMESTAMP = CURRENT_TIMESTAMP, $PID = :pid, $VERSION = :newVersion " +
"WHERE $ID = 'X' AND " +
// we are master node
@ -177,6 +178,8 @@ class RunOnceService(private val database: CordaPersistence, private val machine
"Check if ${mutualExclusion.machineName}, PID: ${mutualExclusion.pid} is still running. " +
"Try again in ${Duration.ofMillis(waitInterval)}")
}
log.info("Will sleep for $waitInterval seconds till lease expires then shutting down this process.")
shutdownWait(waitInterval)
System.exit(1)
}

View File

@ -65,7 +65,11 @@ class RunOnceServiceTest {
runOnceServiceMachine1.start()
val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1, 20000000000000000, mockUpdateExecutor)
val waitInterval = 20000000000000000;
val shutdownWaitStub = { waitTime: Long -> assertEquals(waitInterval, waitTime) }
val runOnceServiceLongWait = RunOnceService(database, "machineLongWait", "99999", 1,
waitInterval, mockUpdateExecutor, shutdownWaitStub)
// fails as didn't wait long enough, someone else could still be running
runOnceServiceLongWait.start()
}