From ecd09b7e6c174448220e20fa5d881c11574c619c Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Fri, 10 Nov 2017 15:48:12 +0000 Subject: [PATCH] Store raft commit index in DistributedImmutableMap --- .../transactions/DistributedImmutableMap.kt | 17 +++++++---------- .../transactions/RaftUniquenessProvider.kt | 14 +++++++++----- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt b/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt index 9a73e02b5e..0b8b00983f 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt @@ -3,10 +3,7 @@ package net.corda.node.services.transactions import io.atomix.copycat.Command import io.atomix.copycat.Query import io.atomix.copycat.server.Commit -import io.atomix.copycat.server.Snapshottable import io.atomix.copycat.server.StateMachine -import io.atomix.copycat.server.storage.snapshot.SnapshotReader -import io.atomix.copycat.server.storage.snapshot.SnapshotWriter import net.corda.core.utilities.loggerFor import net.corda.node.utilities.* import java.util.LinkedHashMap @@ -15,11 +12,10 @@ import java.util.LinkedHashMap * A distributed map state machine that doesn't allow overriding values. The state machine is replicated * across a Copycat Raft cluster. * - * The map contents are backed by a JDBC table. State re-synchronisation is achieved by periodically persisting snapshots - * to disk, and sharing them across the cluster. A new node joining the cluster will have to obtain and install a snapshot - * containing the entire JDBC table contents. + * The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the + * new (or re-joining) cluster member. */ -class DistributedImmutableMap(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap) : StateMachine() { +class DistributedImmutableMap(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap, E, EK>) : StateMachine() { companion object { private val log = loggerFor>() } @@ -50,7 +46,7 @@ class DistributedImmutableMap(val db: CordaPersistence, fun get(commit: Commit>): V? { commit.use { val key = it.operation().key - return db.transaction { map[key] } + return db.transaction { map[key]?.second } } } @@ -61,12 +57,13 @@ class DistributedImmutableMap(val db: CordaPersistence, */ fun put(commit: Commit>): Map { commit.use { + val index = commit.index() val conflicts = LinkedHashMap() db.transaction { val entries = commit.operation().entries log.debug("State machine commit: storing entries with keys (${entries.keys.joinToString()})") - for (key in entries.keys) map[key]?.let { conflicts[key] = it } - if (conflicts.isEmpty()) map.putAll(entries) + for (key in entries.keys) map[key]?.let { conflicts[key] = it.second } + if (conflicts.isEmpty()) map.putAll(entries.mapValues { Pair(index, it.value) }) } return conflicts } diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt index 130f954a50..97e58d8de9 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/RaftUniquenessProvider.kt @@ -48,16 +48,17 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v companion object { private val log = loggerFor() - fun createMap(): AppendOnlyPersistentMap = + fun createMap(): AppendOnlyPersistentMap, RaftState, String> = AppendOnlyPersistentMap( toPersistentEntityKey = { it }, fromPersistentEntity = { - Pair(it.key, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT)) + Pair(it.key, Pair(it.index, it.value.deserialize(context = SerializationDefaults.STORAGE_CONTEXT))) }, - toPersistentEntity = { k: String, v: Any -> + toPersistentEntity = { k: String, v: Pair -> RaftState().apply { key = k - value = v.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes + value = v.second.serialize(context = SerializationDefaults.STORAGE_CONTEXT).bytes + index = v.first } }, persistentEntityClass = RaftState::class.java @@ -73,7 +74,10 @@ class RaftUniquenessProvider(private val services: ServiceHubInternal, private v @Lob @Column - var value: ByteArray = ByteArray(0) + var value: ByteArray = ByteArray(0), + + @Column + var index: Long = 0 ) /** Directory storing the Raft log and state machine snapshots */