Update CheckpointStorage to pass in serialization context.

This is cleaner than passing both the checkpoint and the
serialized checkpoint into the methods. Also fixed
CordaPersistanceServiceTests which I accidentally broke.
This commit is contained in:
Will Vigor 2020-02-26 10:42:54 +00:00
parent 4e64ac37d4
commit b71e78f202
6 changed files with 37 additions and 31 deletions

View File

@ -59,7 +59,8 @@ class CordaPersistenceServiceTests {
id = it.toString(),
blob = DBCheckpointStorage.DBFlowCheckpointBlob(
checkpoint = ByteArray(8192),
flowStack = ByteArray(8192)
flowStack = ByteArray(8192),
hmac = ByteArray(16)
),
result = DBCheckpointStorage.DBFlowResult(),
exceptionDetails = null,
@ -67,7 +68,8 @@ class CordaPersistenceServiceTests {
compatible = false,
progressStep = "",
ioRequestType = FlowIORequest.ForceCheckpoint.javaClass,
checkpointInstant = Instant.now()
checkpointInstant = Instant.now(),
flowMetadata = null
))
}
}

View File

@ -2,6 +2,7 @@ package net.corda.node.services.api
import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.node.services.statemachine.Checkpoint
import java.sql.Connection
import java.util.stream.Stream
@ -13,12 +14,12 @@ interface CheckpointStorage {
/**
* Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
*/
fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedCheckpoint: SerializedBytes<Checkpoint>)
fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializationContext : CheckpointSerializationContext)
/**
* Update an existing checkpoint. Will throw if there is not checkpoint for this id.
*/
fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedCheckpoint: SerializedBytes<Checkpoint>)
fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializationContext : CheckpointSerializationContext)
/**
* Remove existing checkpoint from the store.

View File

@ -3,6 +3,8 @@ package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus
@ -207,12 +209,12 @@ class DBCheckpointStorage : CheckpointStorage {
flowMetadata = null)
}
override fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedCheckpoint: SerializedBytes<Checkpoint>) {
currentDBSession().save(createDBCheckpoint(id, checkpoint, serializedCheckpoint))
override fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializationContext : CheckpointSerializationContext) {
currentDBSession().save(createDBCheckpoint(id, checkpoint, checkpoint.checkpointSerialize(context = serializationContext)))
}
override fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedCheckpoint: SerializedBytes<Checkpoint>) {
currentDBSession().update(createDBCheckpoint(id, checkpoint, serializedCheckpoint))
override fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializationContext : CheckpointSerializationContext) {
currentDBSession().update(createDBCheckpoint(id, checkpoint, checkpoint.checkpointSerialize(context = serializationContext)))
}
override fun removeCheckpoint(id: StateMachineRunId): Boolean {

View File

@ -100,13 +100,14 @@ class ActionExecutorImpl(
@Suspendable
private fun executePersistCheckpoint(action: Action.PersistCheckpoint) {
val checkpointBytes = serializeCheckpoint(action.checkpoint)
if (action.isCheckpointUpdate) {
checkpointStorage.updateCheckpoint(action.id, action.checkpoint, checkpointBytes)
checkpointStorage.updateCheckpoint(action.id, action.checkpoint,checkpointSerializationContext)
} else {
checkpointStorage.addCheckpoint(action.id, action.checkpoint, checkpointBytes)
checkpointStorage.addCheckpoint(action.id, action.checkpoint, checkpointSerializationContext)
}
checkpointingMeter.mark()
// We should clean this up, when we move all of the serialization into DBCheckpointStorage
val checkpointBytes = serializeCheckpoint(action.checkpoint)
checkpointSizesThisSecond.update(checkpointBytes.size.toLong())
var lastUpdateTime = lastBandwidthUpdate.get()
while (System.nanoTime() - lastUpdateTime > TimeUnit.SECONDS.toNanos(1)) {

View File

@ -49,6 +49,8 @@ class DBCheckpointStorageTests {
private lateinit var checkpointStorage: DBCheckpointStorage
private lateinit var database: CordaPersistence
private val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
@Before
fun setUp() {
LogHelper.setLevel(PersistentUniquenessProvider::class)
@ -65,9 +67,9 @@ class DBCheckpointStorageTests {
@Test(timeout=300_000)
fun `add new checkpoint`() {
val (id, checkpoint) = newCheckpoint()
val serializedCheckpoint = checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedCheckpoint = checkpoint.checkpointSerialize(context = checkpointSerializationContext)
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedCheckpoint)
checkpointStorage.addCheckpoint(id, checkpoint, checkpointSerializationContext)
}
database.transaction {
assertThat(checkpointStorage.checkpoints()).containsExactly(serializedCheckpoint)
@ -81,9 +83,8 @@ class DBCheckpointStorageTests {
@Test(timeout=300_000)
fun `remove checkpoint`() {
val (id, checkpoint) = newCheckpoint()
val serializedCheckpoint = checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedCheckpoint)
checkpointStorage.addCheckpoint(id, checkpoint, checkpointSerializationContext)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
@ -100,12 +101,12 @@ class DBCheckpointStorageTests {
@Test(timeout=300_000)
fun `add and remove checkpoint in single commit operate`() {
val (id, checkpoint) = newCheckpoint()
val serializedCheckpoint = checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedCheckpoint = checkpoint.checkpointSerialize(context = checkpointSerializationContext)
val (id2, checkpoint2) = newCheckpoint()
val serializedCheckpoint2 = checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedCheckpoint2 = checkpoint.checkpointSerialize(context = checkpointSerializationContext)
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializedCheckpoint)
checkpointStorage.addCheckpoint(id2, checkpoint2, serializedCheckpoint2)
checkpointStorage.addCheckpoint(id, checkpoint, checkpointSerializationContext)
checkpointStorage.addCheckpoint(id2, checkpoint2, checkpointSerializationContext)
checkpointStorage.removeCheckpoint(id)
}
database.transaction {
@ -120,15 +121,15 @@ class DBCheckpointStorageTests {
@Test(timeout=300_000)
fun `add two checkpoints then remove first one`() {
val (id, firstCheckpoint) = newCheckpoint()
val serializedFirstCheckpoint = firstCheckpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedFirstCheckpoint = firstCheckpoint.checkpointSerialize(context = checkpointSerializationContext)
database.transaction {
checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstCheckpoint)
checkpointStorage.addCheckpoint(id, firstCheckpoint, checkpointSerializationContext)
}
val (id2, secondCheckpoint) = newCheckpoint()
val serializedSecondCheckpoint = secondCheckpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedSecondCheckpoint = secondCheckpoint.checkpointSerialize(context = checkpointSerializationContext)
database.transaction {
checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondCheckpoint)
checkpointStorage.addCheckpoint(id2, secondCheckpoint, checkpointSerializationContext)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
@ -145,9 +146,9 @@ class DBCheckpointStorageTests {
@Test(timeout=300_000)
fun `add checkpoint and then remove after 'restart'`() {
val (id, originalCheckpoint) = newCheckpoint()
val serializedOriginalCheckpoint = originalCheckpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val serializedOriginalCheckpoint = originalCheckpoint.checkpointSerialize(context = checkpointSerializationContext)
database.transaction {
checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalCheckpoint)
checkpointStorage.addCheckpoint(id, originalCheckpoint, checkpointSerializationContext)
}
newCheckpointStorage()
val reconstructedCheckpoint = database.transaction {
@ -169,8 +170,7 @@ class DBCheckpointStorageTests {
val mockServices = MockServices(emptyList(), ALICE.name)
database.transaction {
val (id, checkpoint) = newCheckpoint(1)
val serializedCheckpoint = checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(id, checkpoint, serializedCheckpoint)
checkpointStorage.addCheckpoint(id, checkpoint, checkpointSerializationContext)
}
database.transaction {
@ -179,8 +179,7 @@ class DBCheckpointStorageTests {
database.transaction {
val (id1, checkpoint1) = newCheckpoint(2)
val serializedCheckpoint1 = checkpoint1.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
checkpointStorage.addCheckpoint(id1, checkpoint1, serializedCheckpoint1)
checkpointStorage.addCheckpoint(id1, checkpoint1, checkpointSerializationContext)
}
assertThatThrownBy {

View File

@ -54,6 +54,7 @@ class CheckpointDumperImplTest {
private val baseDirectory = Files.createTempDirectory("CheckpointDumperTest")
private val file = baseDirectory / NodeStartup.LOGS_DIRECTORY_NAME /
"checkpoints_dump-${CheckpointDumperImpl.TIME_FORMATTER.format(currentTimestamp)}.zip"
private val checkpointSerializationContext = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT
private lateinit var database: CordaPersistence
private lateinit var services: ServiceHub
@ -104,7 +105,7 @@ class CheckpointDumperImplTest {
// add a checkpoint
val (id, checkpoint) = newCheckpoint()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializeCheckpoint(checkpoint))
checkpointStorage.addCheckpoint(id, checkpoint, checkpointSerializationContext)
}
dumper.dumpCheckpoints()
@ -130,7 +131,7 @@ class CheckpointDumperImplTest {
// add a checkpoint
val (id, checkpoint) = newCheckpoint()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint, serializeCheckpoint(checkpoint))
checkpointStorage.addCheckpoint(id, checkpoint, checkpointSerializationContext)
}
dumper.dumpCheckpoints()