Store raft commit index in DistributedImmutableMap

This commit is contained in:
Thomas Schroeter 2017-11-10 15:48:12 +00:00
parent 2577c75f28
commit ecd09b7e6c
2 changed files with 16 additions and 15 deletions

View File

@ -3,10 +3,7 @@ package net.corda.node.services.transactions
import io.atomix.copycat.Command import io.atomix.copycat.Command
import io.atomix.copycat.Query import io.atomix.copycat.Query
import io.atomix.copycat.server.Commit import io.atomix.copycat.server.Commit
import io.atomix.copycat.server.Snapshottable
import io.atomix.copycat.server.StateMachine import io.atomix.copycat.server.StateMachine
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
import net.corda.core.utilities.loggerFor import net.corda.core.utilities.loggerFor
import net.corda.node.utilities.* import net.corda.node.utilities.*
import java.util.LinkedHashMap import java.util.LinkedHashMap
@ -15,11 +12,10 @@ import java.util.LinkedHashMap
* A distributed map state machine that doesn't allow overriding values. The state machine is replicated * A distributed map state machine that doesn't allow overriding values. The state machine is replicated
* across a Copycat Raft cluster. * across a Copycat Raft cluster.
* *
* The map contents are backed by a JDBC table. State re-synchronisation is achieved by periodically persisting snapshots * The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the
* to disk, and sharing them across the cluster. A new node joining the cluster will have to obtain and install a snapshot * new (or re-joining) cluster member.
* containing the entire JDBC table contents.
*/ */
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, V, E, EK>) : StateMachine() { class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, Pair<Long, V>, E, EK>) : StateMachine() {
companion object { companion object {
private val log = loggerFor<DistributedImmutableMap<*, *, *, *>>() private val log = loggerFor<DistributedImmutableMap<*, *, *, *>>()
} }
@ -50,7 +46,7 @@ class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence,
fun get(commit: Commit<Commands.Get<K, V>>): V? { fun get(commit: Commit<Commands.Get<K, V>>): V? {
commit.use { commit.use {
val key = it.operation().key val key = it.operation().key
return db.transaction { map[key] } return db.transaction { map[key]?.second }
} }
} }
@ -61,12 +57,13 @@ class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence,
*/ */
fun put(commit: Commit<Commands.PutAll<K, V>>): Map<K, V> { fun put(commit: Commit<Commands.PutAll<K, V>>): Map<K, V> {
commit.use { commit.use {
val index = commit.index()
val conflicts = LinkedHashMap<K, V>() val conflicts = LinkedHashMap<K, V>()
db.transaction { db.transaction {
val entries = commit.operation().entries val entries = commit.operation().entries
log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})") log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})")
for (key in entries.keys) map[key]?.let { conflicts[key] = it } for (key in entries.keys) map[key]?.let { conflicts[key] = it.second }
if (conflicts.isEmpty()) map.putAll(entries) if (conflicts.isEmpty()) map.putAll(entries.mapValues { Pair(index, it.value) })
} }
return conflicts return conflicts
} }

View File

@ -48,16 +48,17 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v
companion object { companion object {
private val log = loggerFor<RaftUniquenessProvider>() private val log = loggerFor<RaftUniquenessProvider>()
fun createMap(): AppendOnlyPersistentMap<String, Any, RaftState, String> = fun createMap(): AppendOnlyPersistentMap<String, Pair<Long, Any>, RaftState, String> =
AppendOnlyPersistentMap( AppendOnlyPersistentMap(
toPersistentEntityKey = { it }, toPersistentEntityKey = { it },
fromPersistentEntity = { fromPersistentEntity = {
Pair(it.key, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)) Pair(it.key, Pair(it.index, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)))
}, },
toPersistentEntity = { k: String, v: Any -> toPersistentEntity = { k: String, v: Pair<Long, Any> ->
RaftState().apply { RaftState().apply {
key = k key = k
value = v.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes value = v.second.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes
index = v.first
} }
}, },
persistentEntityClass = RaftState::class.java persistentEntityClass = RaftState::class.java
@ -73,7 +74,10 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v
@Lob @Lob
@Column @Column
var value: ByteArray = ByteArray(0) var value: ByteArray = ByteArray(0),
@Column
var index: Long = 0
) )
/** Directory storing the Raft log and state machine snapshots */ /** Directory storing the Raft log and state machine snapshots */