Mirror of https://github.com/corda/corda.git, synced 2025-06-19 15:43:52 +00:00
rewrite NodeSchedulerService to use Hibernate
net/corda/node/services/events/NodeSchedulerService.kt

@@ -1,13 +1,12 @@
 package net.corda.node.services.events
 
 import com.google.common.util.concurrent.SettableFuture
+import net.corda.core.contracts.*
 import net.corda.core.internal.ThreadBox
-import net.corda.core.contracts.SchedulableState
-import net.corda.core.contracts.ScheduledActivity
-import net.corda.core.contracts.ScheduledStateRef
-import net.corda.core.contracts.StateRef
+import net.corda.core.crypto.SecureHash
 import net.corda.core.flows.FlowInitiator
 import net.corda.core.flows.FlowLogic
+import net.corda.core.schemas.PersistentStateRef
 import net.corda.core.serialization.SingletonSerializeAsToken
 import net.corda.core.utilities.loggerFor
 import net.corda.core.utilities.trace
@@ -16,12 +15,12 @@ import net.corda.node.services.api.ServiceHubInternal
 import net.corda.node.services.statemachine.FlowLogicRefFactoryImpl
 import net.corda.node.utilities.*
 import org.apache.activemq.artemis.utils.ReusableLatch
-import org.jetbrains.exposed.sql.ResultRow
-import org.jetbrains.exposed.sql.statements.InsertStatement
 import java.time.Instant
+import java.util.*
 import java.util.concurrent.Executor
 import java.util.concurrent.Executors
 import javax.annotation.concurrent.ThreadSafe
+import javax.persistence.*
 
 /**
  * A first pass of a simple [SchedulerService] that works with [MutableClock]s for testing, demonstrations and simulations
@@ -48,40 +47,44 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
 
     companion object {
         private val log = loggerFor<NodeSchedulerService>()
 
+        fun createMap(): PersistentMap<StateRef, ScheduledStateRef, PersistentScheduledState, PersistentStateRef> {
+            return PersistentMap(
+                    toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
+                    fromPersistentEntity = {
+                        //TODO null check will become obsolete after making DB/JPA columns not nullable
+                        var txId = it.output.txId ?: throw IllegalStateException("DB returned null SecureHash transactionId")
+                        var index = it.output.index ?: throw IllegalStateException("DB returned null SecureHash index")
+                        Pair(StateRef(SecureHash.parse(txId), index),
+                                ScheduledStateRef(StateRef(SecureHash.parse(txId), index), it.scheduledAt))
+                    },
+                    toPersistentEntity = { key: StateRef, value: ScheduledStateRef ->
+                        PersistentScheduledState().apply {
+                            output = PersistentStateRef(key.txhash.toString(), key.index)
+                            scheduledAt = value.scheduledAt
+                        }
+                    },
+                    persistentEntityClass = PersistentScheduledState::class.java
+            )
+        }
     }
 
-    private object Table : JDBCHashedTable("${NODE_DATABASE_PREFIX}scheduled_states") {
-        val output = stateRef("transaction_id", "output_index")
-        val scheduledAt = instant("scheduled_at")
-    }
+    @Entity
+    @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}scheduled_states")
+    class PersistentScheduledState(
+            @EmbeddedId
+            var output: PersistentStateRef = PersistentStateRef(),
+
+            @Column(name = "scheduled_at", nullable = false)
+            var scheduledAt: Instant = Instant.now()
+    )
 
-    // Variables inside InnerState are protected with a lock by the ThreadBox and aren't in scope unless you're
-    // inside mutex.locked {} code block. So we can't forget to take the lock unless we accidentally leak a reference
-    // to somewhere.
     private class InnerState {
-        var scheduledStates = object : AbstractJDBCHashMap<StateRef, ScheduledStateRef, Table>(Table, loadOnInit = true) {
-            override fun keyFromRow(row: ResultRow): StateRef = StateRef(row[table.output.txId], row[table.output.index])
-
-            override fun valueFromRow(row: ResultRow): ScheduledStateRef {
-                return ScheduledStateRef(StateRef(row[table.output.txId], row[table.output.index]), row[table.scheduledAt])
-            }
-
-            override fun addKeyToInsert(insert: InsertStatement, entry: Map.Entry<StateRef, ScheduledStateRef>, finalizables: MutableList<() -> Unit>) {
-                insert[table.output.txId] = entry.key.txhash
-                insert[table.output.index] = entry.key.index
-            }
-
-            override fun addValueToInsert(insert: InsertStatement, entry: Map.Entry<StateRef, ScheduledStateRef>, finalizables: MutableList<() -> Unit>) {
-                insert[table.scheduledAt] = entry.value.scheduledAt
-            }
-        }
-        var earliestState: ScheduledStateRef? = null
+        var scheduledStates = createMap()
+
+        var scheduledStatesQueue: PriorityQueue<ScheduledStateRef> = PriorityQueue( { a, b -> a.scheduledAt.compareTo(b.scheduledAt) } )
+
         var rescheduled: SettableFuture<Boolean>? = null
-
-        internal fun recomputeEarliest() {
-            earliestState = scheduledStates.values.sortedBy { it.scheduledAt }.firstOrNull()
-        }
     }
 
     private val mutex = ThreadBox(InnerState())
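The hunk above swaps the Exposed table definition for a JPA entity plus a PersistentMap built from three converter lambdas, so the scheduler keeps working with StateRef/ScheduledStateRef while Hibernate only ever sees PersistentScheduledState. The standalone sketch below illustrates that converter pattern; the types are hypothetical simplified stand-ins (not the Corda classes) and an in-memory map stands in for the Hibernate session.

import java.time.Instant

// Hypothetical stand-ins for the domain and entity types used in the diff above.
data class StateRef(val txhash: String, val index: Int)
data class ScheduledStateRef(val ref: StateRef, val scheduledAt: Instant)
data class PersistentStateRef(val txId: String, val index: Int)
data class PersistentScheduledState(val output: PersistentStateRef, val scheduledAt: Instant)

// A converter-based map: callers use domain types, storage sees only entity types.
class ConverterBackedMap<K, V, E, EK>(
        val toPersistentEntityKey: (K) -> EK,
        val fromPersistentEntity: (E) -> Pair<K, V>,
        val toPersistentEntity: (K, V) -> E,
        private val storage: MutableMap<EK, E> = mutableMapOf()   // stand-in for the database session
) {
    operator fun set(key: K, value: V) {
        storage[toPersistentEntityKey(key)] = toPersistentEntity(key, value)
    }

    operator fun get(key: K): V? =
            storage[toPersistentEntityKey(key)]?.let { fromPersistentEntity(it).second }
}

fun main() {
    val map = ConverterBackedMap<StateRef, ScheduledStateRef, PersistentScheduledState, PersistentStateRef>(
            toPersistentEntityKey = { PersistentStateRef(it.txhash, it.index) },
            fromPersistentEntity = {
                val ref = StateRef(it.output.txId, it.output.index)
                Pair(ref, ScheduledStateRef(ref, it.scheduledAt))
            },
            toPersistentEntity = { key, value ->
                PersistentScheduledState(PersistentStateRef(key.txhash, key.index), value.scheduledAt)
            }
    )
    val ref = StateRef("ABCD", 0)
    map[ref] = ScheduledStateRef(ref, Instant.parse("2017-07-01T00:00:00Z"))
    println(map[ref])   // round-trips through the entity representation
}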
@@ -89,7 +92,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
     // We need the [StateMachineManager] to be constructed before this is called in case it schedules a flow.
     fun start() {
         mutex.locked {
-            recomputeEarliest()
+            scheduledStatesQueue.addAll(scheduledStates.all().map { it.second } .toMutableList())
             rescheduleWakeUp()
         }
     }
@@ -97,16 +100,20 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
     override fun scheduleStateActivity(action: ScheduledStateRef) {
         log.trace { "Schedule $action" }
         mutex.locked {
-            if (scheduledStates.put(action.ref, action) == null) {
+            val previousState = scheduledStates[action.ref]
+            scheduledStates[action.ref] = action
+            var previousEarliest = scheduledStatesQueue.peek()
+            scheduledStatesQueue.remove(previousState)
+            scheduledStatesQueue.add(action)
+            if (previousState == null) {
                 unfinishedSchedules.countUp()
             }
-            if (action.scheduledAt.isBefore(earliestState?.scheduledAt ?: Instant.MAX)) {
+
+            if (action.scheduledAt.isBefore(previousEarliest?.scheduledAt ?: Instant.MAX)) {
                 // We are earliest
-                earliestState = action
                 rescheduleWakeUp()
-            } else if (earliestState?.ref == action.ref && earliestState!!.scheduledAt != action.scheduledAt) {
+            } else if(previousEarliest?.ref == action.ref && previousEarliest.scheduledAt != action.scheduledAt) {
                 // We were earliest but might not be any more
-                recomputeEarliest()
                 rescheduleWakeUp()
             }
         }
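The rewrite drops the earliestState field and the recomputeEarliest() scan in favour of a PriorityQueue ordered by scheduledAt, so the next wake-up is always available via peek() and a reschedule is just a remove plus an add. A minimal, self-contained sketch of that bookkeeping (hypothetical types, not the node code):

import java.time.Instant
import java.util.PriorityQueue

data class Scheduled(val ref: String, val scheduledAt: Instant)

fun main() {
    // Queue ordered by scheduledAt; peek() is the next state to wake up for.
    val queue = PriorityQueue<Scheduled>(compareBy { it.scheduledAt })

    val a = Scheduled("state-a", Instant.parse("2017-07-01T10:00:00Z"))
    val b = Scheduled("state-b", Instant.parse("2017-07-01T09:00:00Z"))
    queue.add(a)
    queue.add(b)
    println(queue.peek())          // state-b: the earliest entry, with no re-sorting

    // Rescheduling an entry is a remove of the old value plus an add of the new one,
    // mirroring scheduleStateActivity() in the hunk above.
    queue.remove(b)
    queue.add(b.copy(scheduledAt = Instant.parse("2017-07-01T11:00:00Z")))
    println(queue.peek())          // now state-a is earliest
}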
@@ -117,9 +124,9 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
         mutex.locked {
             val removedAction = scheduledStates.remove(ref)
             if (removedAction != null) {
+                scheduledStatesQueue.remove(removedAction)
                 unfinishedSchedules.countDown()
-                if (removedAction == earliestState) {
-                    recomputeEarliest()
+                if (removedAction == scheduledStatesQueue.peek()) {
                     rescheduleWakeUp()
                 }
             }
@@ -139,7 +146,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
         val (scheduledState, ourRescheduledFuture) = mutex.alreadyLocked {
             rescheduled?.cancel(false)
             rescheduled = SettableFuture.create()
-            Pair(earliestState, rescheduled!!)
+            Pair(scheduledStatesQueue.peek(), rescheduled!!)
         }
         if (scheduledState != null) {
             schedulerTimerExecutor.execute {
@@ -157,7 +164,7 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
     }
 
     private fun onTimeReached(scheduledState: ScheduledStateRef) {
-        serverThread.fetchFrom {
+        serverThread.execute {
             services.database.transaction {
                 val scheduledFlow = getScheduledFlow(scheduledState)
                 if (scheduledFlow != null) {
@@ -175,28 +182,28 @@ class NodeSchedulerService(private val services: ServiceHubInternal,
         var scheduledFlow: FlowLogic<*>? = null
         mutex.locked {
             // need to remove us from those scheduled, but only if we are still next
-            scheduledStates.compute(scheduledState.ref) { _, value ->
-                if (value === scheduledState) {
+            val previousState = scheduledStates[scheduledState.ref]
+            if (previousState != null && previousState === scheduledState) {
                 if (scheduledActivity == null) {
                     log.info("Scheduled state $scheduledState has rescheduled to never.")
                     unfinishedSchedules.countDown()
-                    null
+                    scheduledStates.remove(scheduledState.ref)
+                    scheduledStatesQueue.remove(scheduledState)
                 } else if (scheduledActivity.scheduledAt.isAfter(services.clock.instant())) {
                     log.info("Scheduled state $scheduledState has rescheduled to ${scheduledActivity.scheduledAt}.")
-                    ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
+                    var newState = ScheduledStateRef(scheduledState.ref, scheduledActivity.scheduledAt)
+                    scheduledStates[scheduledState.ref] = newState
+                    scheduledStatesQueue.remove(scheduledState)
+                    scheduledStatesQueue.add(newState)
                 } else {
-                    // TODO: FlowLogicRefFactory needs to sort out the class loader etc
                     val flowLogic = FlowLogicRefFactoryImpl.toFlowLogic(scheduledActivity.logicRef)
                     log.trace { "Scheduler starting FlowLogic $flowLogic" }
                     scheduledFlow = flowLogic
-                    null
-                }
-            } else {
-                value
+                    scheduledStates.remove(scheduledState.ref)
+                    scheduledStatesQueue.remove(scheduledState)
                 }
             }
             // and schedule the next one
-            recomputeEarliest()
             rescheduleWakeUp()
         }
         return scheduledFlow
NodeSchemaService.kt

@@ -9,6 +9,7 @@ import net.corda.core.schemas.PersistentState
 import net.corda.core.schemas.QueryableState
 import net.corda.core.serialization.SingletonSerializeAsToken
 import net.corda.node.services.api.SchemaService
+import net.corda.node.services.events.NodeSchedulerService
 import net.corda.node.services.keys.PersistentKeyManagementService
 import net.corda.node.services.persistence.DBCheckpointStorage
 import net.corda.node.services.persistence.DBTransactionMappingStorage
@@ -35,7 +36,8 @@ class NodeSchemaService(customSchemas: Set<MappedSchema> = emptySet()) : SchemaS
             DBTransactionStorage.DBTransaction::class.java,
             DBTransactionMappingStorage.DBTransactionMapping::class.java,
             PersistentKeyManagementService.PersistentKey::class.java,
-            PersistentUniquenessProvider.PersistentUniqueness::class.java
+            PersistentUniquenessProvider.PersistentUniqueness::class.java,
+            NodeSchedulerService.PersistentScheduledState::class.java
 ))
 
 // Required schemas are those used by internal Corda services
AppendOnlyPersistentMap.kt

@@ -79,8 +79,8 @@ class AppendOnlyPersistentMap<K, V, E, EK> (
     }
 
     /**
-     * Puts the value into the map and the underlying storage.
-     * Inserting the duplicated key may be unpredictable.
+     * Associates the specified value with the specified key in this map and persists it.
+     * If the map previously contained a mapping for the key, the behaviour is unpredictable and may throw an error from the underlying storage.
      */
     operator fun set(key: K, value: V) =
             set(key, value, logWarning = false) {
@@ -89,8 +89,8 @@ class AppendOnlyPersistentMap<K, V, E, EK> (
     }
 
     /**
-     * Puts the value into the map and underlying storage.
-     * Duplicated key is not added into the map and underlying storage.
+     * Associates the specified value with the specified key in this map and persists it.
+     * If the map previously contained a mapping for the key, the old value is not replaced.
      * @return true if added key was unique, otherwise false
      */
     fun addWithDuplicatesAllowed(key: K, value: V): Boolean =
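The reworded KDoc separates the two insert paths: operator set persists unconditionally and leaves a duplicate key to the underlying storage, while addWithDuplicatesAllowed checks for an existing row first and reports whether the key was new. A toy illustration of that contract, with an in-memory map standing in for the persistent table (not the Corda class):

class ToyMap<K, V> {
    private val storage = mutableMapOf<K, V>()   // stand-in for the persistent table

    // Unconditional insert: a pre-existing key is left to the storage layer to handle.
    operator fun set(key: K, value: V) {
        storage[key] = value
    }

    // Insert only if absent; returns true when the key was new, leaving the old value otherwise.
    fun addWithDuplicatesAllowed(key: K, value: V): Boolean {
        val existing = storage[key]
        return if (existing == null) {
            storage[key] = value
            true
        } else false
    }
}

fun main() {
    val map = ToyMap<String, Int>()
    println(map.addWithDuplicatesAllowed("a", 1))   // true, key was unique
    println(map.addWithDuplicatesAllowed("a", 2))   // false, old value kept
    map["a"] = 3                                    // blind overwrite, semantics left to the store
}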
NonInvalidatingUnboundCache.kt (new file)

@@ -0,0 +1,36 @@
+package net.corda.node.utilities
+
+import com.google.common.cache.*
+import com.google.common.util.concurrent.ListenableFuture
+
+
+class NonInvalidatingUnboundCache<K, V> private constructor(
+        val cache: LoadingCache<K, V>
+): LoadingCache<K, V> by cache {
+
+    constructor(concurrencyLevel: Int, loadFunction: (K) -> V) :
+            this(buildCache(concurrencyLevel, loadFunction, RemovalListener<K, V> {
+                //no removal
+            }))
+
+    constructor(concurrencyLevel: Int, loadFunction: (K) -> V, removalListener: RemovalListener<K, V>) :
+            this(buildCache(concurrencyLevel, loadFunction, removalListener))
+
+    private companion object {
+        private fun <K, V> buildCache(concurrencyLevel: Int, loadFunction: (K) -> V, removalListener: RemovalListener<K, V>): LoadingCache<K, V> {
+            val builder = CacheBuilder.newBuilder().concurrencyLevel(concurrencyLevel).removalListener(removalListener)
+            return builder.build(NonInvalidatingCacheLoader(loadFunction))
+        }
+    }
+
+    // TODO look into overriding loadAll() if we ever use it
+    private class NonInvalidatingCacheLoader<K, V>(val loadFunction: (K) -> V) : CacheLoader<K, V>() {
+        override fun reload(key: K, oldValue: V): ListenableFuture<V> {
+            throw IllegalStateException("Non invalidating cache refreshed")
+        }
+        override fun load(key: K) = loadFunction(key)
+        override fun loadAll(keys: Iterable<K>): MutableMap<K, V> {
+            return super.loadAll(keys)
+        }
+    }
+}
node/src/main/kotlin/net/corda/node/utilities/PersistentMap.kt (new file, 129 lines)

@@ -0,0 +1,129 @@
+package net.corda.node.utilities
+
+
+import com.google.common.cache.RemovalCause
+import com.google.common.cache.RemovalListener
+import com.google.common.cache.RemovalNotification
+import net.corda.core.utilities.loggerFor
+import java.util.*
+
+
+/**
+ * Implements an unbound caching layer on top of a table accessed via Hibernate mapping.
+ */
+class PersistentMap<K, V, E, EK> (
+        val toPersistentEntityKey: (K) -> EK,
+        val fromPersistentEntity: (E) -> Pair<K,V>,
+        val toPersistentEntity: (key: K, value: V) -> E,
+        val persistentEntityClass: Class<E>
+) {
+
+    private companion object {
+        val log = loggerFor<PersistentMap<*, *, *, *>>()
+    }
+
+    private val cache = NonInvalidatingUnboundCache(
+            concurrencyLevel = 8,
+            loadFunction = { key -> Optional.ofNullable(loadValue(key)) },
+            removalListener = ExplicitRemoval(toPersistentEntityKey, persistentEntityClass)
+    )
+
+    class ExplicitRemoval<K, V, E, EK>(val toPersistentEntityKey: (K) -> EK, val persistentEntityClass: Class<E>): RemovalListener<K,V> {
+        override fun onRemoval(notification: RemovalNotification<K, V>?) {
+            when (notification?.cause) {
+                RemovalCause.EXPLICIT -> {
+                    val session = DatabaseTransactionManager.current().session
+                    val elem = session.find(persistentEntityClass, toPersistentEntityKey(notification.key))
+                    if (elem != null) {
+                        session.remove(elem)
+                    }
+                }
+                RemovalCause.EXPIRED, RemovalCause.SIZE, RemovalCause.COLLECTED -> {
+                    log.error("Entry was removed from cache!!!")
+                }
+                //else do nothing for RemovalCause.REPLACED
+            }
+        }
+    }
+
+    operator fun get(key: K): V? {
+        return cache.get(key).orElse(null)
+    }
+
+    fun all(): Sequence<Pair<K, V>> {
+        return cache.asMap().map { entry -> Pair(entry.key as K, entry.value as V) }.asSequence()
+    }
+
+    private tailrec fun set(key: K, value: V, logWarning: Boolean = true, store: (K,V) -> V?): Boolean {
+        var insertionAttempt = false
+        var isUnique = true
+        val existingInCache = cache.get(key) { // Thread safe, if multiple threads may wait until the first one has loaded.
+            insertionAttempt = true
+            // Key wasn't in the cache and might be in the underlying storage.
+            // Depending on 'store' method, this may insert without checking key duplication or it may avoid inserting a duplicated key.
+            val existingInDb = store(key, value)
+            if (existingInDb != null) { // Always reuse an existing value from the storage of a duplicated key.
+                Optional.of(existingInDb)
+            } else {
+                Optional.of(value)
+            }
+        }
+        if (!insertionAttempt) {
+            if (existingInCache.isPresent) {
+                // Key already exists in cache, do nothing.
+                isUnique = false
+            } else {
+                // This happens when the key was queried before with no value associated. We invalidate the cached null
+                // value and recursively call set again. This is to avoid race conditions where another thread queries after
+                // the invalidate but before the set.
+                cache.invalidate(key)
+                return set(key, value, logWarning, store)
+            }
+        }
+        if (logWarning && !isUnique) {
+            log.warn("Double insert in ${this.javaClass.name} for entity class $persistentEntityClass key $key, not inserting the second time")
+        }
+        return isUnique
+    }
+
+    /**
+     * Associates the specified value with the specified key in this map and persists it.
+     * If the map previously contained a mapping for the key, the behaviour is unpredictable and may throw an error from the underlying storage.
+     */
+    operator fun set(key: K, value: V) =
+            set(key, value, logWarning = false) {
+                key, value -> DatabaseTransactionManager.current().session.save(toPersistentEntity(key,value))
+                null
+            }
+
+    /**
+     * Associates the specified value with the specified key in this map and persists it.
+     * If the map previously contained a mapping for the key, the old value is not replaced.
+     * @return true if added key was unique, otherwise false
+     */
+    fun addWithDuplicatesAllowed(key: K, value: V): Boolean =
+            set(key, value) {
+                key, value ->
+                val existingEntry = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(key))
+                if (existingEntry == null) {
+                    DatabaseTransactionManager.current().session.save(toPersistentEntity(key,value))
+                    null
+                } else {
+                    fromPersistentEntity(existingEntry).second
+                }
+            }
+
+    private fun loadValue(key: K): V? {
+        val result = DatabaseTransactionManager.current().session.find(persistentEntityClass, toPersistentEntityKey(key))
+        return result?.let(fromPersistentEntity)?.second
+    }
+
+    /**
+     * Removes the mapping for the specified key from this map and underlying storage if present.
+     */
+    fun remove(key: K): V? {
+        val result = cache.get(key).orElse(null)
+        cache.invalidate(key)
+        return result
+    }
+}
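The subtle part of PersistentMap.set() above is its interaction with the cache of Optional values: a cache miss runs the store lambda inside the loader, while a cached empty Optional (left over from an earlier negative lookup) is invalidated and the call retried. A self-contained sketch of that invalidate-and-retry pattern, using a plain HashMap as the "database" (hypothetical, not the node code):

import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import java.util.Optional

class CachedStore(private val db: MutableMap<String, String> = mutableMapOf()) {
    // Values are cached as Optionals so that "looked up and absent" is itself cacheable.
    private val cache = CacheBuilder.newBuilder().build(object : CacheLoader<String, Optional<String>>() {
        override fun load(key: String) = Optional.ofNullable(db[key])
    })

    operator fun get(key: String): String? = cache.get(key).orElse(null)

    // Mirrors the shape of PersistentMap.set(): write through on a cache miss,
    // invalidate and retry when a stale empty Optional is already cached.
    tailrec fun set(key: String, value: String): Boolean {
        var insertionAttempt = false
        val existing = cache.get(key) {
            insertionAttempt = true
            db[key] = value                  // the 'store' step, done inside the loader
            Optional.of(value)
        }
        if (!insertionAttempt) {
            if (existing.isPresent) return false   // key already present, nothing written
            cache.invalidate(key)                  // cached negative lookup: drop it and retry
            return set(key, value)
        }
        return true
    }
}

fun main() {
    val store = CachedStore()
    println(store["x"])           // null, caches an empty Optional
    println(store.set("x", "1"))  // true: invalidates the empty entry and writes
    println(store["x"])           // 1
    println(store.set("x", "2"))  // false: key already present, old value kept
}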