From 4257891c98a9a0bf115cbdd391534fead47ca438 Mon Sep 17 00:00:00 2001 From: Thomas Schroeter Date: Fri, 26 Jan 2018 16:23:59 +0000 Subject: [PATCH] Revert "Raft Notary: remove snapshotting" (#2423) This reverts commit cf33be66fffa5d547b6d2370d8dbf0fe4121d714. --- .../transactions/DistributedImmutableMap.kt | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt b/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt index 4e6bc0ac89..4cb2082fb4 100644 --- a/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt +++ b/node/src/main/kotlin/net/corda/node/services/transactions/DistributedImmutableMap.kt @@ -3,7 +3,10 @@ package net.corda.node.services.transactions import io.atomix.copycat.Command import io.atomix.copycat.Query import io.atomix.copycat.server.Commit +import io.atomix.copycat.server.Snapshottable import io.atomix.copycat.server.StateMachine +import io.atomix.copycat.server.storage.snapshot.SnapshotReader +import io.atomix.copycat.server.storage.snapshot.SnapshotWriter import net.corda.core.utilities.contextLogger import net.corda.node.utilities.AppendOnlyPersistentMap import net.corda.nodeapi.internal.persistence.CordaPersistence @@ -16,7 +19,7 @@ import java.util.* * The map contents are backed by a JDBC table. State re-synchronisation is achieved by replaying the command log to the * new (or re-joining) cluster member. */ -class DistributedImmutableMap(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap, E, EK>) : StateMachine() { +class DistributedImmutableMap(val db: CordaPersistence, createMap: () -> AppendOnlyPersistentMap, E, EK>) : StateMachine(), Snapshottable { companion object { private val log = contextLogger() } @@ -75,4 +78,29 @@ class DistributedImmutableMap(val db: CordaPersistence, return db.transaction { map.size } } } + + /** + * Writes out all [map] entries to disk. Note that this operation does not load all entries into memory, as the + * [SnapshotWriter] is using a disk-backed buffer internally, and iterating map entries results in only a + * fixed number of recently accessed entries to ever be kept in memory. + */ + override fun snapshot(writer: SnapshotWriter) { + db.transaction { + writer.writeInt(map.size) + map.allPersisted().forEach { writer.writeObject(it.first to it.second) } + } + } + + /** Reads entries from disk and adds them to [map]. */ + override fun install(reader: SnapshotReader) { + val size = reader.readInt() + db.transaction { + map.clear() + // TODO: read & put entries in batches + for (i in 1..size) { + val (key, value) = reader.readObject>>() + map[key] = value + } + } + } }