diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt index 21d4742482..26d9b9e3fe 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/RunOnceService.kt @@ -29,6 +29,7 @@ const val ID = "mutual_exclusion_id" const val MACHINE_NAME = "machine_name" const val PID = "pid" const val TIMESTAMP = "mutual_exclusion_timestamp" +const val VERSION = "version" /** * Makes sure only one node is able to write to database. @@ -56,7 +57,7 @@ class RunOnceService(private val database: CordaPersistence, private val machine @Entity @Table(name = TABLE) - class MutualExclusion(machineNameInit: String, pidInit: String, timeStampInit: LocalDateTime) { + class MutualExclusion(machineNameInit: String, pidInit: String, timeStampInit: LocalDateTime, versionInit: Long = 0) { @Column(name = ID, insertable = false, updatable = false) @Id val id: Char = 'X' @@ -69,6 +70,9 @@ class RunOnceService(private val database: CordaPersistence, private val machine @Column(name = TIMESTAMP) val timestamp = timeStampInit + + @Column(name = VERSION) + val version = versionInit } fun start() { @@ -119,10 +123,11 @@ class RunOnceService(private val database: CordaPersistence, private val machine } private fun insertMutualExclusion(session: Session) { - val query = session.createNativeQuery("INSERT INTO $TABLE VALUES ('X', :machineName, :pid, CURRENT_TIMESTAMP)", MutualExclusion::class.java) + val query = session.createNativeQuery("INSERT INTO $TABLE VALUES ('X', :machineName, :pid, CURRENT_TIMESTAMP, :version)", MutualExclusion::class.java) query.unwrap(org.hibernate.SQLQuery::class.java).addSynchronizedEntityClass(MutualExclusion::class.java) query.setParameter("pid", pid) query.setParameter("machineName", machineName) + query.setParameter("version", 0) val returnValue = query.executeUpdate() if (returnValue != 1) { @@ -138,23 +143,27 @@ class RunOnceService(private val database: CordaPersistence, private val machine } private fun updateTimestamp(session: Session, mutualExclusion: MutualExclusion): Boolean { - val minWaitTime = mutualExclusion.timestamp.plus(waitInterval, ChronoField.MILLI_OF_SECOND.baseUnit) - val hql = "UPDATE RunOnceService\$MutualExclusion SET $MACHINE_NAME = :machineName, $TIMESTAMP = CURRENT_TIMESTAMP, $PID = :pid " + + + val hql = "UPDATE RunOnceService\$MutualExclusion SET $MACHINE_NAME = :machineName, $TIMESTAMP = CURRENT_TIMESTAMP, $PID = :pid, $VERSION = :newVersion " + "WHERE $ID = 'X' AND " + // we are master node "($MACHINE_NAME = :machineName OR " + // change master node "($MACHINE_NAME != :machineName AND " + // no one else has updated timestamp whilst we attempted this update - "$TIMESTAMP = CAST(:mutualExclusionTimestamp AS LocalDateTime) AND " + + "$VERSION = :oldVersion AND " + // old timestamp - "CURRENT_TIMESTAMP > CAST(:waitTime AS LocalDateTime)))" + "CAST(CURRENT_TIMESTAMP as LocalDateTime) > CAST(:waitTime as LocalDateTime)))" val query = session.createQuery(hql) + val oldVersion = mutualExclusion.version + val minWaitTime = mutualExclusion.timestamp.plus(waitInterval, ChronoField.MILLI_OF_SECOND.baseUnit) + query.setParameter("pid", pid) query.setParameter("machineName", machineName) - query.setParameter("mutualExclusionTimestamp", mutualExclusion.timestamp) + query.setParameter("oldVersion", oldVersion) + query.setParameter("newVersion", oldVersion+1) query.setParameter("waitTime", minWaitTime) val returnValue = query.executeUpdate() diff --git a/node/src/main/resources/migration/node-core.changelog-v3.xml b/node/src/main/resources/migration/node-core.changelog-v3.xml index dc2761a97c..9e0cc17293 100644 --- a/node/src/main/resources/migration/node-core.changelog-v3.xml +++ b/node/src/main/resources/migration/node-core.changelog-v3.xml @@ -20,4 +20,13 @@ + + + + + + + + + diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt index d8f3a4f970..4cb6a4a928 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/RunOnceServiceTest.kt @@ -146,11 +146,14 @@ class RunOnceServiceTest { var secondTimestamp = LocalDateTime.now() var firstTimestamp = LocalDateTime.now() + var firstVersion = -1L + var secondVersion = -1L database.transaction { val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) val result = machine1RowCheck(query) firstTimestamp = result.timestamp + firstVersion = result.version } runnable.run() @@ -159,9 +162,11 @@ class RunOnceServiceTest { val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java) val result = machine1RowCheck(query) secondTimestamp = result.timestamp + secondVersion = result.version } assertTrue(secondTimestamp.isAfter(firstTimestamp)) + assertTrue(secondVersion > firstVersion) mock>() } @@ -171,7 +176,6 @@ class RunOnceServiceTest { verify(mockUpdateExecutor).scheduleAtFixedRate(any(), any(), any(), any()) } - @Test fun `timer exits if no row`() { exit.expectSystemExitWithStatus(1) @@ -255,3 +259,4 @@ class RunOnceServiceTest { return result } } +