mirror of
https://github.com/corda/corda.git
synced 2025-06-18 15:18:16 +00:00
Revert "Raft Notary: remove snapshotting" (#2423)
This reverts commit cf33be66ff
.
This commit is contained in:
@ -3,7 +3,10 @@ package net.corda.node.services.transactions
|
|||||||
import io.atomix.copycat.Command
|
import io.atomix.copycat.Command
|
||||||
import io.atomix.copycat.Query
|
import io.atomix.copycat.Query
|
||||||
import io.atomix.copycat.server.Commit
|
import io.atomix.copycat.server.Commit
|
||||||
|
import io.atomix.copycat.server.Snapshottable
|
||||||
import io.atomix.copycat.server.StateMachine
|
import io.atomix.copycat.server.StateMachine
|
||||||
|
import io.atomix.copycat.server.storage.snapshot.SnapshotReader
|
||||||
|
import io.atomix.copycat.server.storage.snapshot.SnapshotWriter
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.node.utilities.AppendOnlyPersistentMap
|
import net.corda.node.utilities.AppendOnlyPersistentMap
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
@ -16,7 +19,7 @@ import java.util.*
|
|||||||
* The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the
|
* The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the
|
||||||
* new (or re-joining) cluster member.
|
* new (or re-joining) cluster member.
|
||||||
*/
|
*/
|
||||||
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, Pair<Long, V>, E, EK>) : StateMachine() {
|
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, Pair<Long, V>, E, EK>) : StateMachine(), Snapshottable {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
}
|
}
|
||||||
@ -75,4 +78,29 @@ class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence,
|
|||||||
return db.transaction { map.size }
|
return db.transaction { map.size }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the
|
||||||
|
* [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a
|
||||||
|
* fixed number of recently accessed entries to ever be kept in memory.
|
||||||
|
*/
|
||||||
|
override fun snapshot(writer: SnapshotWriter) {
|
||||||
|
db.transaction {
|
||||||
|
writer.writeInt(map.size)
|
||||||
|
map.allPersisted().forEach { writer.writeObject(it.first to it.second) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Reads entries from disk and adds them to [map]. */
|
||||||
|
override fun install(reader: SnapshotReader) {
|
||||||
|
val size = reader.readInt()
|
||||||
|
db.transaction {
|
||||||
|
map.clear()
|
||||||
|
// TODO: read & put entries in batches
|
||||||
|
for (i in 1..size) {
|
||||||
|
val (key, value) = reader.readObject<Pair<K, Pair<Long, V>>>()
|
||||||
|
map[key] = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user