mirror of
https://github.com/corda/corda.git
synced 2025-05-02 00:39:53 +00:00
Merge pull request #2046 from thschroeter/thomas-remove-snapshotting
CORDA-767: Raft Notary: remove snapshotting
This commit is contained in:
commit
2577c75f28
@ -19,7 +19,7 @@ import java.util.LinkedHashMap
|
|||||||
* to disk, and sharing them across the cluster. A new node joining the cluster will have to obtain and install a snapshot
|
* to disk, and sharing them across the cluster. A new node joining the cluster will have to obtain and install a snapshot
|
||||||
* containing the entire JDBC table contents.
|
* containing the entire JDBC table contents.
|
||||||
*/
|
*/
|
||||||
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, V, E, EK>) : StateMachine(), Snapshottable {
|
class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap<K, V, E, EK>) : StateMachine() {
|
||||||
companion object {
|
companion object {
|
||||||
private val log = loggerFor<DistributedImmutableMap<*, *, *, *>>()
|
private val log = loggerFor<DistributedImmutableMap<*, *, *, *>>()
|
||||||
}
|
}
|
||||||
@ -27,9 +27,16 @@ class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence,
|
|||||||
object Commands {
|
object Commands {
|
||||||
class PutAll<K, V>(val entries: Map<K, V>) : Command<Map<K, V>> {
|
class PutAll<K, V>(val entries: Map<K, V>) : Command<Map<K, V>> {
|
||||||
override fun compaction(): Command.CompactionMode {
|
override fun compaction(): Command.CompactionMode {
|
||||||
// The SNAPSHOT compaction mode indicates that a command can be removed from the Raft log once
|
// The FULL compaction mode retains the command in the log until it has been stored and applied on all
|
||||||
// a snapshot of the state machine has been written to disk
|
// servers in the cluster. Once the commit has been applied to a state machine and closed it may be
|
||||||
return Command.CompactionMode.SNAPSHOT
|
// removed from the log during minor or major compaction.
|
||||||
|
//
|
||||||
|
// Note that we are not closing the commits, thus our log grows without bounds. We let the log grow on
|
||||||
|
// purpose to be able to increase the size of a running cluster, e.g. to add and decommission nodes.
|
||||||
|
// TODO: Cluster membership changes need testing.
|
||||||
|
// TODO: I'm wondering if we should support resizing notary clusters, or if we could require users to
|
||||||
|
// setup a new cluster of the desired size and transfer the data.
|
||||||
|
return Command.CompactionMode.FULL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -70,29 +77,4 @@ class DistributedImmutableMap<K : Any, V : Any, E, EK>(val db: CordaPersistence,
|
|||||||
return db.transaction { map.size }
|
return db.transaction { map.size }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the
|
|
||||||
* [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a
|
|
||||||
* fixed number of recently accessed entries to ever be kept in memory.
|
|
||||||
*/
|
|
||||||
override fun snapshot(writer: SnapshotWriter) {
|
|
||||||
db.transaction {
|
|
||||||
writer.writeInt(map.size)
|
|
||||||
map.allPersisted().forEach { writer.writeObject(it.first to it.second) }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Reads entries from disk and adds them to [map]. */
|
|
||||||
override fun install(reader: SnapshotReader) {
|
|
||||||
val size = reader.readInt()
|
|
||||||
db.transaction {
|
|
||||||
map.clear()
|
|
||||||
// TODO: read & put entries in batches
|
|
||||||
for (i in 1..size) {
|
|
||||||
val (key, value) = reader.readObject<Pair<K, V>>()
|
|
||||||
map[key] = value
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user