mirror of
https://github.com/corda/corda.git
synced 2025-01-15 17:30:02 +00:00
[ENT-1639] Add version
column (#585)
* Add `version` column The equality check of the timestamp was tricky to get right. Better to use a logical timestamp.
This commit is contained in:
parent
24366012e2
commit
5def901980
@ -29,6 +29,7 @@ const val ID = "mutual_exclusion_id"
|
||||
const val MACHINE_NAME = "machine_name"
|
||||
const val PID = "pid"
|
||||
const val TIMESTAMP = "mutual_exclusion_timestamp"
|
||||
const val VERSION = "version"
|
||||
|
||||
/**
|
||||
* Makes sure only one node is able to write to database.
|
||||
@ -56,7 +57,7 @@ class RunOnceService(private val database: CordaPersistence, private val machine
|
||||
|
||||
@Entity
|
||||
@Table(name = TABLE)
|
||||
class MutualExclusion(machineNameInit: String, pidInit: String, timeStampInit: LocalDateTime) {
|
||||
class MutualExclusion(machineNameInit: String, pidInit: String, timeStampInit: LocalDateTime, versionInit: Long = 0) {
|
||||
@Column(name = ID, insertable = false, updatable = false)
|
||||
@Id
|
||||
val id: Char = 'X'
|
||||
@ -69,6 +70,9 @@ class RunOnceService(private val database: CordaPersistence, private val machine
|
||||
|
||||
@Column(name = TIMESTAMP)
|
||||
val timestamp = timeStampInit
|
||||
|
||||
@Column(name = VERSION)
|
||||
val version = versionInit
|
||||
}
|
||||
|
||||
fun start() {
|
||||
@ -119,10 +123,11 @@ class RunOnceService(private val database: CordaPersistence, private val machine
|
||||
}
|
||||
|
||||
private fun insertMutualExclusion(session: Session) {
|
||||
val query = session.createNativeQuery("INSERT INTO $TABLE VALUES ('X', :machineName, :pid, CURRENT_TIMESTAMP)", MutualExclusion::class.java)
|
||||
val query = session.createNativeQuery("INSERT INTO $TABLE VALUES ('X', :machineName, :pid, CURRENT_TIMESTAMP, :version)", MutualExclusion::class.java)
|
||||
query.unwrap(org.hibernate.SQLQuery::class.java).addSynchronizedEntityClass(MutualExclusion::class.java)
|
||||
query.setParameter("pid", pid)
|
||||
query.setParameter("machineName", machineName)
|
||||
query.setParameter("version", 0)
|
||||
val returnValue = query.executeUpdate()
|
||||
|
||||
if (returnValue != 1) {
|
||||
@ -138,23 +143,27 @@ class RunOnceService(private val database: CordaPersistence, private val machine
|
||||
}
|
||||
|
||||
private fun updateTimestamp(session: Session, mutualExclusion: MutualExclusion): Boolean {
|
||||
val minWaitTime = mutualExclusion.timestamp.plus(waitInterval, ChronoField.MILLI_OF_SECOND.baseUnit)
|
||||
val hql = "UPDATE RunOnceService\$MutualExclusion SET $MACHINE_NAME = :machineName, $TIMESTAMP = CURRENT_TIMESTAMP, $PID = :pid " +
|
||||
|
||||
val hql = "UPDATE RunOnceService\$MutualExclusion SET $MACHINE_NAME = :machineName, $TIMESTAMP = CURRENT_TIMESTAMP, $PID = :pid, $VERSION = :newVersion " +
|
||||
"WHERE $ID = 'X' AND " +
|
||||
// we are master node
|
||||
"($MACHINE_NAME = :machineName OR " +
|
||||
// change master node
|
||||
"($MACHINE_NAME != :machineName AND " +
|
||||
// no one else has updated timestamp whilst we attempted this update
|
||||
"$TIMESTAMP = CAST(:mutualExclusionTimestamp AS LocalDateTime) AND " +
|
||||
"$VERSION = :oldVersion AND " +
|
||||
// old timestamp
|
||||
"CURRENT_TIMESTAMP > CAST(:waitTime AS LocalDateTime)))"
|
||||
"CAST(CURRENT_TIMESTAMP as LocalDateTime) > CAST(:waitTime as LocalDateTime)))"
|
||||
|
||||
val query = session.createQuery(hql)
|
||||
|
||||
val oldVersion = mutualExclusion.version
|
||||
val minWaitTime = mutualExclusion.timestamp.plus(waitInterval, ChronoField.MILLI_OF_SECOND.baseUnit)
|
||||
|
||||
query.setParameter("pid", pid)
|
||||
query.setParameter("machineName", machineName)
|
||||
query.setParameter("mutualExclusionTimestamp", mutualExclusion.timestamp)
|
||||
query.setParameter("oldVersion", oldVersion)
|
||||
query.setParameter("newVersion", oldVersion+1)
|
||||
query.setParameter("waitTime", minWaitTime)
|
||||
val returnValue = query.executeUpdate()
|
||||
|
||||
|
@ -20,4 +20,13 @@
|
||||
<!--this is needed because pre-v3 attachments can't be used-->
|
||||
<delete tableName="node_attachments"/>
|
||||
</changeSet>
|
||||
|
||||
<changeSet author="R3.Corda" id="add_version_column_to_mutual_exclusion_table">
|
||||
<addColumn tableName="node_mutual_exclusion">
|
||||
<column name="version" type="BIGINT">
|
||||
<constraints nullable="false"/>
|
||||
</column>
|
||||
</addColumn>
|
||||
</changeSet>
|
||||
|
||||
</databaseChangeLog>
|
||||
|
@ -146,11 +146,14 @@ class RunOnceServiceTest {
|
||||
|
||||
var secondTimestamp = LocalDateTime.now()
|
||||
var firstTimestamp = LocalDateTime.now()
|
||||
var firstVersion = -1L
|
||||
var secondVersion = -1L
|
||||
|
||||
database.transaction {
|
||||
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
|
||||
val result = machine1RowCheck(query)
|
||||
firstTimestamp = result.timestamp
|
||||
firstVersion = result.version
|
||||
}
|
||||
|
||||
runnable.run()
|
||||
@ -159,9 +162,11 @@ class RunOnceServiceTest {
|
||||
val query = session.createNativeQuery(selectQuery, RunOnceService.MutualExclusion::class.java)
|
||||
val result = machine1RowCheck(query)
|
||||
secondTimestamp = result.timestamp
|
||||
secondVersion = result.version
|
||||
}
|
||||
|
||||
assertTrue(secondTimestamp.isAfter(firstTimestamp))
|
||||
assertTrue(secondVersion > firstVersion)
|
||||
|
||||
mock<ScheduledFuture<*>>()
|
||||
}
|
||||
@ -171,7 +176,6 @@ class RunOnceServiceTest {
|
||||
verify(mockUpdateExecutor).scheduleAtFixedRate(any(), any(), any(), any())
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
fun `timer exits if no row`() {
|
||||
exit.expectSystemExitWithStatus(1)
|
||||
@ -255,3 +259,4 @@ class RunOnceServiceTest {
|
||||
return result
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user