CORDA-3597 Replace old Checkpoint table with new one. (#5992)

* Replace old Checkpoint table with new one.

Adds some of the new fields into the table where needed (I have
guessed this stuff but we can update it as we go along).

* Fix database constraints + name table correctly opps.

* Fixed typos in Liquidbase script

Also corrected constraints and added missed fields in hibernate
checkpoint class and liquibase scripts.

* Update CheckpointStorage to pass in serialization context.

This is cleaner than passing both the checkpoint and the
serialized checkpoint into the methods. Also fixed
CordaPersistanceServiceTests which I accidentally broke.

* Fix detekt problem

* Revert "Update CheckpointStorage to pass in serialization context."

This reverts commit b71e78f20274ab0f5b3cf3fda1451ae2bd7a6797.

* Fix test broken by reverting commit

* CORDA-3597 Update metadata join, timestamp columns and serialization

- Change the metadata join to the checkpoints table to use
  `invocation_id` instead of `flow_id`. There were issues joining
  between the tables because `flow_id` was not the primary key of the
  metadata table. Switching over to `invocation_id` has at least allowed
  us to bypass this issue. The information about the `invocation_id` is
  stored in the `Checkpoint` class which makes it simple to save at
  runtime.
- Some of timestamp columns were nullable when they should always be
  populated, the nullable flags have now been removed.
- Previously the whole checkpoint was being serialized and stored into
  the `checkpoints_blob.checkpoint` column. This meant duplicated saving
  as the `flow_state` was contained in this object. Only the
  `CheckpointState` property of `Checkpoint` is now being serialized and
  saved to this field. Furthermore, it now uses the default
  `STORAGE_CONTEXT` serialization (AMQP) instead of Kryo (which is only
  used for serializing the `flow_state` / flow stack).
- The checkpoint database performance metrics recording has been
  abstracted to its own class.

* CORDA-3597 Make metadata join non optional

Remove the nullable declaration on the metadata field of
`DBFlowCheckpoint`

* CORDA-3597 Rename `node_checkpoints_blobs` to `node_checkpoint_blobs`

* CORDA-3597 Update some kdocs

Co-authored-by: Dan Newton <danknewton@hotmail.com>
This commit is contained in:
williamvigorr3 2020-03-02 10:04:48 +00:00 committed by GitHub
parent 9d4d128f4e
commit ab000e0533
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 785 additions and 277 deletions

View File

@ -47,7 +47,7 @@ class MissingSchemaMigrationTest {
fun `test that an error is thrown when forceThrowOnMissingMigration is set and a mapped schema is missing a migration`() {
assertThatThrownBy {
createSchemaMigration(setOf(GoodSchema), true)
.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
}.isInstanceOf(MissingMigrationException::class.java)
}
@ -55,7 +55,7 @@ class MissingSchemaMigrationTest {
fun `test that an error is not thrown when forceThrowOnMissingMigration is not set and a mapped schema is missing a migration`() {
assertDoesNotThrow {
createSchemaMigration(setOf(GoodSchema), false)
.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
}
}
@ -64,7 +64,7 @@ class MissingSchemaMigrationTest {
assertDoesNotThrow("This test failure indicates " +
"a new table has been added to the node without the appropriate migration scripts being present") {
createSchemaMigration(NodeSchemaService().internalSchemas(), false)
.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
}
}

View File

@ -3,11 +3,17 @@ package net.corda.node.services.persistence
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService
import net.corda.core.node.services.vault.SessionScope
import net.corda.core.serialization.SingletonSerializeAsToken
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
@ -15,6 +21,8 @@ import net.corda.testing.driver.internal.incrementalPortAllocation
import net.corda.testing.node.internal.enclosedCordapp
import org.junit.Test
import java.sql.DriverManager
import java.time.Instant
import java.util.UUID
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@ -50,16 +58,52 @@ class CordaPersistenceServiceTests {
@CordaService
class MultiThreadedDbLoader(private val services: AppServiceHub) : SingletonSerializeAsToken() {
fun createObjects(count: Int) : Int {
fun createObjects(count: Int): Int {
(1..count).toList().parallelStream().forEach {
val now = Instant.now()
services.database.transaction {
session.save(DBCheckpointStorage.DBCheckpoint().apply {
checkpointId = it.toString()
})
session.save(
DBCheckpointStorage.DBFlowCheckpoint(
id = it.toString(),
blob = DBCheckpointStorage.DBFlowCheckpointBlob(
checkpoint = ByteArray(8192),
flowStack = ByteArray(8192),
hmac = ByteArray(16),
persistedInstant = now
),
result = DBCheckpointStorage.DBFlowResult(value = ByteArray(16), persistedInstant = now),
exceptionDetails = null,
status = FlowStatus.RUNNABLE,
compatible = false,
progressStep = "",
ioRequestType = FlowIORequest.ForceCheckpoint.javaClass,
checkpointInstant = Instant.now(),
flowMetadata = createMetadataRecord(UUID.randomUUID(), now)
)
)
}
}
return count
}
private fun SessionScope.createMetadataRecord(invocationId: UUID, timestamp: Instant): DBCheckpointStorage.DBFlowMetadata {
val metadata = DBCheckpointStorage.DBFlowMetadata(
invocationId = invocationId.toString(),
flowId = null,
flowName = "random.flow",
userSuppliedIdentifier = null,
startType = DBCheckpointStorage.StartReason.RPC,
launchingCordapp = "this cordapp",
platformVersion = PLATFORM_VERSION,
rpcUsername = "Batman",
invocationInstant = Instant.now(),
receivedInstant = Instant.now(),
startInstant = timestamp,
finishInstant = null
)
session.save(metadata)
return metadata
}
}
}
}

View File

@ -109,6 +109,7 @@ import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.node.services.persistence.AbstractPartyDescriptor
import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter
import net.corda.node.services.persistence.AttachmentStorageInternal
import net.corda.node.services.persistence.DBCheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.persistence.DBTransactionMappingStorage
import net.corda.node.services.persistence.DBTransactionStorage
@ -251,7 +252,6 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
}
val networkMapCache = PersistentNetworkMapCache(cacheFactory, database, identityService).tokenize()
val checkpointStorage = DBCheckpointStorage()
@Suppress("LeakingThis")
val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize()
val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) }
@ -318,6 +318,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
@Suppress("LeakingThis")
protected val network: MessagingService = makeMessagingService().tokenize()
val services = ServiceHubInternalImpl().tokenize()
val checkpointStorage = DBCheckpointStorage(DBCheckpointPerformanceRecorder(services.monitoringService.metrics))
@Suppress("LeakingThis")
val smm = makeStateMachineManager()
val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory)
@ -1276,7 +1277,7 @@ fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfi
try {
val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry)
val schemaMigration = SchemaMigration(schemas, dataSource, databaseConfig, cordappLoader, currentDir, ourName)
schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L })
schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L })
start(dataSource)
} catch (ex: Exception) {
when {

View File

@ -39,7 +39,7 @@ object CheckpointVerifier {
checkpointStorage.getAllCheckpoints().use {
it.forEach { (_, serializedCheckpoint) ->
val checkpoint = try {
serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
serializedCheckpoint.deserialize(checkpointSerializationContext)
} catch (e: ClassNotFoundException) {
val message = e.message
if (message != null) {

View File

@ -3,7 +3,7 @@ package net.corda.node.services.api
import net.corda.core.flows.StateMachineRunId
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.Checkpoint
import java.sql.Connection
import net.corda.node.services.statemachine.FlowState
import java.util.stream.Stream
/**
@ -13,12 +13,12 @@ interface CheckpointStorage {
/**
* Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id
*/
fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>)
fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>)
/**
* Update an existing checkpoint. Will throw if there is not checkpoint for this id.
*/
fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>)
fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>)
/**
* Remove existing checkpoint from the store.
@ -28,21 +28,17 @@ interface CheckpointStorage {
/**
* Load an existing checkpoint from the store.
* @return the checkpoint, still in serialized form, or null if not found.
*
* The checkpoint returned from this function will be a _clean_ checkpoint. No error information is loaded into the checkpoint
* even if the previous status of the checkpoint was [Checkpoint.FlowStatus.FAILED] or [Checkpoint.FlowStatus.HOSPITALIZED].
*
* @return The checkpoint, in a partially serialized form, or null if not found.
*/
fun getCheckpoint(id: StateMachineRunId): SerializedBytes<Checkpoint>?
fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized?
/**
* Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the
* underlying database connection is closed, so any processing should happen before it is closed.
*/
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>>
/**
* This needs to run before Hibernate is initialised.
*
* @param connection The SQL Connection.
* @return the number of checkpoints stored in the database.
*/
fun getCheckpointCount(connection: Connection): Long
fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>>
}

View File

@ -0,0 +1,60 @@
package net.corda.node.services.persistence
import com.codahale.metrics.Gauge
import com.codahale.metrics.Histogram
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.Reservoir
import com.codahale.metrics.SlidingTimeWindowArrayReservoir
import com.codahale.metrics.SlidingTimeWindowReservoir
import net.corda.core.serialization.SerializedBytes
import net.corda.node.services.statemachine.CheckpointState
import net.corda.node.services.statemachine.FlowState
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
interface CheckpointPerformanceRecorder {
/**
* Record performance metrics regarding the serialized size of [CheckpointState] and [FlowState]
*/
fun record(serializedCheckpointState: SerializedBytes<CheckpointState>, serializedFlowState: SerializedBytes<FlowState>)
}
class DBCheckpointPerformanceRecorder(metrics: MetricRegistry) : CheckpointPerformanceRecorder {
private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate")
private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS)
private val lastBandwidthUpdate = AtomicLong(0)
private val checkpointBandwidthHist = metrics.register(
"Flows.CheckpointVolumeBytesPerSecondHist", Histogram(
SlidingTimeWindowArrayReservoir(1, TimeUnit.DAYS)
)
)
private val checkpointBandwidth = metrics.register(
"Flows.CheckpointVolumeBytesPerSecondCurrent",
LatchedGauge(checkpointSizesThisSecond)
)
/**
* This [Gauge] just reports the sum of the bytes checkpointed during the last second.
*/
private class LatchedGauge(private val reservoir: Reservoir) : Gauge<Long> {
override fun getValue(): Long {
return reservoir.snapshot.values.sum()
}
}
override fun record(serializedCheckpointState: SerializedBytes<CheckpointState>, serializedFlowState: SerializedBytes<FlowState>) {
val totalSize = serializedCheckpointState.size.toLong() + serializedFlowState.size.toLong()
checkpointingMeter.mark()
checkpointSizesThisSecond.update(totalSize)
var lastUpdateTime = lastBandwidthUpdate.get()
while (System.nanoTime() - lastUpdateTime > TimeUnit.SECONDS.toNanos(1)) {
if (lastBandwidthUpdate.compareAndSet(lastUpdateTime, System.nanoTime())) {
val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum()
checkpointBandwidthHist.update(checkpointVolume)
}
lastUpdateTime = lastBandwidthUpdate.get()
}
}
}

View File

@ -2,245 +2,392 @@ package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.debug
import net.corda.core.serialization.serialize
import net.corda.core.utilities.contextLogger
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.Checkpoint.FlowStatus
import net.corda.node.services.statemachine.CheckpointState
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowState
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
import net.corda.nodeapi.internal.persistence.currentDBSession
import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY
import org.hibernate.annotations.Type
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.*
import java.util.stream.Stream
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.Id
import org.hibernate.annotations.Type
import java.lang.Exception
import java.math.BigInteger
import java.sql.Connection
import java.sql.SQLException
import java.time.Instant
import java.util.UUID
import java.util.stream.Stream
import javax.persistence.CascadeType
import javax.persistence.Column
import javax.persistence.Entity
import javax.persistence.FetchType
import javax.persistence.GeneratedValue
import javax.persistence.GenerationType
import javax.persistence.Id
import javax.persistence.JoinColumn
import javax.persistence.OneToOne
/**
* Simple checkpoint key value storage in DB.
*/
class DBCheckpointStorage : CheckpointStorage {
val log: Logger = LoggerFactory.getLogger(this::class.java)
class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointPerformanceRecorder) : CheckpointStorage {
companion object {
val log = contextLogger()
private const val HMAC_SIZE_BYTES = 16
/**
* This needs to run before Hibernate is initialised.
*
* No need to set up [DBCheckpointStorage] fully for this function
*
* @param connection The SQL Connection.
* @return the number of checkpoints stored in the database.
*/
fun getCheckpointCount(connection: Connection): Long {
// No need to set up [DBCheckpointStorage] fully for this function
return try {
connection.prepareStatement("select count(*) from node_checkpoints").use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
} catch (e: SQLException) {
// Happens when the table was not created yet.
0L
}
}
}
enum class StartReason {
RPC, FLOW, SERVICE, SCHEDULED, INITIATED
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_new")
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
class DBFlowCheckpoint(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var id: String,
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var id: String,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id")
var blob: DBFlowCheckpointBlob,
@OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true)
@JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id")
var blob: DBFlowCheckpointBlob,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "result_id", referencedColumnName = "id")
var result: DBFlowResult?,
@OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true)
@JoinColumn(name = "result_id", referencedColumnName = "id")
var result: DBFlowResult?,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "error_id", referencedColumnName = "id")
var exceptionDetails: DBFlowException?,
@OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true)
@JoinColumn(name = "error_id", referencedColumnName = "id")
var exceptionDetails: DBFlowException?,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "flow_id", referencedColumnName = "flow_id")
var flowMetadata: DBFlowMetadata,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "invocation_id", referencedColumnName = "invocation_id")
var flowMetadata: DBFlowMetadata,
@Column(name = "status")
var status: FlowStatus,
@Column(name = "status", nullable = false)
var status: FlowStatus,
@Column(name = "compatible")
var compatible: Boolean,
@Column(name = "compatible", nullable = false)
var compatible: Boolean,
@Column(name = "progress_step")
var progressStep: String,
@Column(name = "progress_step")
var progressStep: String?,
@Column(name = "flow_io_request")
var ioRequestType: Class<FlowIORequest<*>>,
@Column(name = "flow_io_request")
var ioRequestType: Class<out FlowIORequest<*>>?,
@Column(name = "timestamp")
var checkpointInstant: Instant
@Column(name = "timestamp", nullable = false)
var checkpointInstant: Instant
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs")
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoint_blobs")
class DBFlowCheckpointBlob(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Id
@GeneratedValue(strategy = GenerationType.SEQUENCE)
@Column(name = "id", nullable = false)
private var id: Long = 0,
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "flow_state", nullable = false)
var flowStack: ByteArray = EMPTY_BYTE_ARRAY,
// A future task will make this nullable
@Type(type = "corda-blob")
@Column(name = "flow_state", nullable = false)
var flowStack: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "timestamp")
var persistedInstant: Instant? = null
@Column(name = "hmac")
var hmac: ByteArray,
@Column(name = "timestamp")
var persistedInstant: Instant
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results")
class DBFlowResult(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Id
@Column(name = "id", nullable = false)
@GeneratedValue(strategy = GenerationType.SEQUENCE)
private var id: Long = 0,
@Type(type = "corda-blob")
@Column(name = "result_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "result_value", nullable = false)
var value: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "timestamp")
val persistedInstant: Instant? = null
@Column(name = "timestamp")
val persistedInstant: Instant
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions")
class DBFlowException(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Id
@Column(name = "id", nullable = false)
@GeneratedValue(strategy = GenerationType.SEQUENCE)
private var id: Long = 0,
@Column(name = "type", nullable = false)
var type: Class<Exception>,
@Column(name = "type", nullable = false)
var type: Class<out Throwable>,
@Type(type = "corda-blob")
@Column(name = "exception_value", nullable = false)
var value: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "exception_value", nullable = false)
var value: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "exception_message")
var message: String? = null,
@Column(name = "exception_message")
var message: String? = null,
@Column(name = "timestamp")
val persistedInstant: Instant? = null
@Column(name = "timestamp")
val persistedInstant: Instant
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata")
class DBFlowMetadata(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var flowId: String,
@Id
@Column(name = "invocation_id", nullable = false)
var invocationId: String,
@Column(name = "flow_name", nullable = false)
var flowName: String,
@Column(name = "flow_id", nullable = true)
var flowId: String?,
@Column(name = "flow_identifier", nullable = true)
var userSuppliedIdentifier: String?,
@Column(name = "flow_name", nullable = false)
var flowName: String,
@Column(name = "started_type", nullable = false)
var startType: StartReason,
@Column(name = "flow_identifier", nullable = true)
var userSuppliedIdentifier: String?,
@Column(name = "flow_parameters", nullable = false)
var initialParameters: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "started_type", nullable = false)
var startType: StartReason,
@Column(name = "cordapp_name", nullable = false)
var launchingCordapp: String,
@Column(name = "flow_parameters", nullable = false)
var initialParameters: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "platform_version", nullable = false)
var platformVersion: Int,
@Column(name = "cordapp_name", nullable = false)
var launchingCordapp: String,
@Column(name = "rpc_user", nullable = false)
var rpcUsername: String,
@Column(name = "platform_version", nullable = false)
var platformVersion: Int,
@Column(name = "invocation_time", nullable = false)
var invocationInstant: Instant,
@Column(name = "rpc_user", nullable = false)
var rpcUsername: String,
@Column(name = "received_time", nullable = false)
var receivedInstant: Instant,
@Column(name = "invocation_time", nullable = false)
var invocationInstant: Instant,
@Column(name = "start_time", nullable = true)
var startInstant: Instant?,
@Column(name = "received_time", nullable = false)
var receivedInstant: Instant,
@Column(name = "finish_time", nullable = true)
var finishInstant: Instant?
@Column(name = "start_time", nullable = true)
var startInstant: Instant?,
@Column(name = "finish_time", nullable = true)
var finishInstant: Instant?
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
class DBCheckpoint(
@Id
@Suppress("MagicNumber") // database column width
@Column(name = "checkpoint_id", length = 64, nullable = false)
var checkpointId: String = "",
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY
) {
override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})"
override fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>) {
currentDBSession().save(createDBCheckpoint(id, checkpoint, serializedFlowState))
}
override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>) {
currentDBSession().save(DBCheckpoint().apply {
checkpointId = id.uuid.toString()
this.checkpoint = checkpoint.bytes
log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" }
})
}
override fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes<Checkpoint>) {
currentDBSession().update(DBCheckpoint().apply {
checkpointId = id.uuid.toString()
this.checkpoint = checkpoint.bytes
log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" }
})
override fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes<FlowState>) {
currentDBSession().update(updateDBCheckpoint(id, checkpoint, serializedFlowState))
}
override fun removeCheckpoint(id: StateMachineRunId): Boolean {
val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder
val delete = criteriaBuilder.createCriteriaDelete(DBCheckpoint::class.java)
val root = delete.from(DBCheckpoint::class.java)
delete.where(criteriaBuilder.equal(root.get<String>(DBCheckpoint::checkpointId.name), id.uuid.toString()))
val delete = criteriaBuilder.createCriteriaDelete(DBFlowCheckpoint::class.java)
val root = delete.from(DBFlowCheckpoint::class.java)
delete.where(criteriaBuilder.equal(root.get<String>(DBFlowCheckpoint::id.name), id.uuid.toString()))
return session.createQuery(delete).executeUpdate() > 0
}
override fun getCheckpoint(id: StateMachineRunId): SerializedBytes<Checkpoint>? {
val bytes = currentDBSession().get(DBCheckpoint::class.java, id.uuid.toString())?.checkpoint ?: return null
return SerializedBytes(bytes)
override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? {
return currentDBSession().get(DBFlowCheckpoint::class.java, id.uuid.toString())?.toSerializedCheckpoint()
}
override fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, SerializedBytes<Checkpoint>>> {
override fun getAllCheckpoints(): Stream<Pair<StateMachineRunId, Checkpoint.Serialized>> {
val session = currentDBSession()
val criteriaQuery = session.criteriaBuilder.createQuery(DBCheckpoint::class.java)
val root = criteriaQuery.from(DBCheckpoint::class.java)
val criteriaQuery = session.criteriaBuilder.createQuery(DBFlowCheckpoint::class.java)
val root = criteriaQuery.from(DBFlowCheckpoint::class.java)
criteriaQuery.select(root)
return session.createQuery(criteriaQuery).stream().map {
StateMachineRunId(UUID.fromString(it.checkpointId)) to SerializedBytes<Checkpoint>(it.checkpoint)
StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint()
}
}
override fun getCheckpointCount(connection: Connection): Long {
return try {
connection.prepareStatement("select count(*) from node_checkpoints").use { ps ->
ps.executeQuery().use { rs ->
rs.next()
rs.getLong(1)
}
}
} catch (e: SQLException) {
// Happens when the table was not created yet.
0L
private fun createDBCheckpoint(
id: StateMachineRunId,
checkpoint: Checkpoint,
serializedFlowState: SerializedBytes<FlowState>
): DBFlowCheckpoint {
val flowId = id.uuid.toString()
val now = Instant.now()
val invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value
val serializedCheckpointState = checkpoint.checkpointState.storageSerialize()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
// Need to update the metadata record to join it to the main checkpoint record
// This code needs to be added back in once the metadata record is properly created (remove the code below it)
// val metadata = requireNotNull(currentDBSession().find(
// DBFlowMetadata::class.java,
// invocationId
// )) { "The flow metadata record for flow [$flowId] with invocation id [$invocationId] does not exist"}
val metadata = (currentDBSession().find(
DBFlowMetadata::class.java,
invocationId
)) ?: createTemporaryMetadata(checkpoint)
metadata.flowId = flowId
currentDBSession().update(metadata)
// Most fields are null as they cannot have been set when creating the initial checkpoint
return DBFlowCheckpoint(
id = flowId,
blob = blob,
result = null,
exceptionDetails = null,
flowMetadata = metadata,
status = checkpoint.status,
compatible = checkpoint.compatible,
progressStep = null,
ioRequestType = null,
checkpointInstant = Instant.now()
)
}
// Remove this when saving of metadata is properly handled
private fun createTemporaryMetadata(checkpoint: Checkpoint): DBFlowMetadata {
return DBFlowMetadata(
invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value,
flowId = null,
flowName = "random.flow",
userSuppliedIdentifier = null,
startType = DBCheckpointStorage.StartReason.RPC,
launchingCordapp = "this cordapp",
platformVersion = PLATFORM_VERSION,
rpcUsername = "Batman",
invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp,
receivedInstant = Instant.now(),
startInstant = null,
finishInstant = null
).apply {
currentDBSession().save(this)
}
}
private fun updateDBCheckpoint(
id: StateMachineRunId,
checkpoint: Checkpoint,
serializedFlowState: SerializedBytes<FlowState>
): DBFlowCheckpoint {
val flowId = id.uuid.toString()
val now = Instant.now()
val serializedCheckpointState = checkpoint.checkpointState.storageSerialize()
checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState)
val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now)
val result = checkpoint.result?.let { createDBFlowResult(it, now) }
val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(it, now) }
// Load the previous entity from the hibernate cache so the meta data join does not get updated
val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId)
return entity.apply {
this.blob = blob
this.result = result
this.exceptionDetails = exceptionDetails
// Do not update the meta data relationship on updates
this.flowMetadata = entity.flowMetadata
this.status = checkpoint.status
this.compatible = checkpoint.compatible
this.progressStep = checkpoint.progressStep
this.ioRequestType = checkpoint.flowIoRequest
this.checkpointInstant = now
}
}
private fun createDBCheckpointBlob(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>,
now: Instant
): DBFlowCheckpointBlob {
return DBFlowCheckpointBlob(
checkpoint = serializedCheckpointState.bytes,
flowStack = serializedFlowState.bytes,
hmac = ByteArray(HMAC_SIZE_BYTES),
persistedInstant = now
)
}
private fun createDBFlowResult(result: Any, now: Instant): DBFlowResult {
return DBFlowResult(
value = result.storageSerialize().bytes,
persistedInstant = now
)
}
private fun createDBFlowException(errorState: ErrorState.Errored, now: Instant): DBFlowException {
return errorState.errors.last().exception.let {
DBFlowException(
type = it::class.java,
message = it.message,
value = it.storageSerialize().bytes,
persistedInstant = now
)
}
}
private fun DBFlowCheckpoint.toSerializedCheckpoint(): Checkpoint.Serialized {
return Checkpoint.Serialized(
serializedCheckpointState = SerializedBytes(blob.checkpoint),
serializedFlowState = SerializedBytes(blob.flowStack),
// Always load as a [Clean] checkpoint to represent that the checkpoint is the last _good_ checkpoint
errorState = ErrorState.Clean,
// A checkpoint with a result should not normally be loaded (it should be [null] most of the time)
result = result?.let { SerializedBytes<Any>(it.value) },
status = status,
progressStep = progressStep,
flowIoRequest = ioRequestType,
compatible = compatible
)
}
private fun <T : Any> T.storageSerialize(): SerializedBytes<T> {
return serialize(context = SerializationDefaults.STORAGE_CONTEXT)
}
}

View File

@ -149,8 +149,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
instrumentCheckpointAgent(runId)
val (bytes, fileName) = try {
val checkpoint =
serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext)
val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext)
val json = checkpoint.toJson(runId.uuid, now)
val jsonBytes = writer.writeValueAsBytes(json)
jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json"
@ -229,6 +228,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri
origin = checkpointState.invocationContext.origin.toOrigin(),
ourIdentity = checkpointState.ourIdentity,
activeSessions = checkpointState.sessions.mapNotNull { it.value.toActiveSession(it.key) },
// This can only ever return as [ErrorState.Clean] which causes it to become [null]
errored = errorState as? ErrorState.Errored
)
}

View File

@ -33,10 +33,7 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
object NodeCore
object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1,
mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java,
//new checkpoints - keeping old around to allow testing easily (for now)
DBCheckpointStorage.DBFlowCheckpoint::class.java,
mappedTypes = listOf(DBCheckpointStorage.DBFlowCheckpoint::class.java,
DBCheckpointStorage.DBFlowCheckpointBlob::class.java,
DBCheckpointStorage.DBFlowResult::class.java,
DBCheckpointStorage.DBFlowException::class.java,

View File

@ -31,8 +31,7 @@ class ActionExecutorImpl(
private val checkpointStorage: CheckpointStorage,
private val flowMessaging: FlowMessaging,
private val stateMachineManager: StateMachineManagerInternal,
private val checkpointSerializationContext: CheckpointSerializationContext,
metrics: MetricRegistry
private val checkpointSerializationContext: CheckpointSerializationContext
) : ActionExecutor {
private companion object {
@ -48,12 +47,6 @@ class ActionExecutorImpl(
}
}
private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate")
private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS)
private val lastBandwidthUpdate = AtomicLong(0)
private val checkpointBandwidthHist = metrics.register("Flows.CheckpointVolumeBytesPerSecondHist", Histogram(SlidingTimeWindowArrayReservoir(1, TimeUnit.DAYS)))
private val checkpointBandwidth = metrics.register("Flows.CheckpointVolumeBytesPerSecondCurrent", LatchedGauge(checkpointSizesThisSecond))
@Suspendable
override fun executeAction(fiber: FlowFiber, action: Action) {
log.trace { "Flow ${fiber.id} executing $action" }
@ -100,21 +93,12 @@ class ActionExecutorImpl(
@Suspendable
private fun executePersistCheckpoint(action: Action.PersistCheckpoint) {
val checkpointBytes = serializeCheckpoint(action.checkpoint)
val checkpoint = action.checkpoint
val serializedFlowState = checkpoint.flowState.checkpointSerialize(checkpointSerializationContext)
if (action.isCheckpointUpdate) {
checkpointStorage.updateCheckpoint(action.id, checkpointBytes)
checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState)
} else {
checkpointStorage.addCheckpoint(action.id, checkpointBytes)
}
checkpointingMeter.mark()
checkpointSizesThisSecond.update(checkpointBytes.size.toLong())
var lastUpdateTime = lastBandwidthUpdate.get()
while (System.nanoTime() - lastUpdateTime > TimeUnit.SECONDS.toNanos(1)) {
if (lastBandwidthUpdate.compareAndSet(lastUpdateTime, System.nanoTime())) {
val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum()
checkpointBandwidthHist.update(checkpointVolume)
}
lastUpdateTime = lastBandwidthUpdate.get()
checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState)
}
}
@ -258,10 +242,6 @@ class ActionExecutorImpl(
stateMachineManager.retryFlowFromSafePoint(action.currentState)
}
private fun serializeCheckpoint(checkpoint: Checkpoint): SerializedBytes<Checkpoint> {
return checkpoint.checkpointSerialize(context = checkpointSerializationContext)
}
private fun cancelFlowTimeout(action: Action.CancelFlowTimeout) {
stateMachineManager.cancelFlowTimeout(action.flowId)
}

View File

@ -590,7 +590,7 @@ class SingleThreadedStateMachineManager(
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint ->
val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId)
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId)
if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError {
IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " +
@ -758,14 +758,23 @@ class SingleThreadedStateMachineManager(
}
}
private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? {
return try {
serializedCheckpoint.deserialize(checkpointSerializationContext!!)
} catch (e: Exception) {
logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e)
null
}
}
private fun createFlowFromCheckpoint(
id: StateMachineRunId,
serializedCheckpoint: SerializedBytes<Checkpoint>,
serializedCheckpoint: Checkpoint.Serialized,
isAnyCheckpointPersisted: Boolean,
isStartIdempotent: Boolean,
initialDeduplicationHandler: DeduplicationHandler?
): Flow? {
val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, id) ?: return null
val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return null
val flowState = checkpoint.flowState
val resultFuture = openFuture<Any?>()
val fiber = when (flowState) {
@ -865,8 +874,7 @@ class SingleThreadedStateMachineManager(
checkpointStorage,
flowMessaging,
this,
checkpointSerializationContext,
metrics
checkpointSerializationContext
)
}

View File

@ -7,7 +7,12 @@ import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.CordaSerializable
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.internal.CheckpointSerializationContext
import net.corda.core.serialization.internal.checkpointDeserialize
import net.corda.core.utilities.Try
import net.corda.node.services.messaging.DeduplicationHandler
import java.time.Instant
@ -57,9 +62,10 @@ data class Checkpoint(
val result: Any? = null,
val status: FlowStatus = FlowStatus.RUNNABLE,
val progressStep: String? = null,
val flowIoRequest: FlowIORequest<*>? = null,
val flowIoRequest: Class<out FlowIORequest<*>>? = null,
val compatible: Boolean = true
) {
@CordaSerializable
enum class FlowStatus {
RUNNABLE,
FAILED,
@ -84,9 +90,15 @@ data class Checkpoint(
): Try<Checkpoint> {
return SubFlow.create(flowLogicClass, subFlowVersion, isEnabledTimedFlow).map { topLevelSubFlow ->
Checkpoint(
checkpointState = CheckpointState(invocationContext, ourIdentity, emptyMap(), listOf(topLevelSubFlow), numberOfSuspends = 0),
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic),
errorState = ErrorState.Clean
checkpointState = CheckpointState(
invocationContext,
ourIdentity,
emptyMap(),
listOf(topLevelSubFlow),
numberOfSuspends = 0
),
errorState = ErrorState.Clean,
flowState = FlowState.Unstarted(flowStart, frozenFlowLogic)
)
}
}
@ -123,6 +135,41 @@ data class Checkpoint(
fun addSubflow(subFlow: SubFlow) : Checkpoint {
return copy(checkpointState = checkpointState.copy(subFlowStack = checkpointState.subFlowStack + subFlow))
}
/**
* A partially serialized form of [Checkpoint].
*
* [Checkpoint.Serialized] contains the same fields as [Checkpoint] except that some of its fields are still serialized. The checkpoint
* can then be deserialized as needed.
*/
data class Serialized(
val serializedCheckpointState: SerializedBytes<CheckpointState>,
val serializedFlowState: SerializedBytes<FlowState>,
val errorState: ErrorState,
val result: SerializedBytes<Any>?,
val status: FlowStatus,
val progressStep: String?,
val flowIoRequest: Class<out FlowIORequest<*>>?,
val compatible: Boolean
) {
/**
* Deserializes the serialized fields contained in [Checkpoint.Serialized].
*
* @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized]
*/
fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint {
return Checkpoint(
checkpointState = serializedCheckpointState.deserialize(context = SerializationDefaults.STORAGE_CONTEXT),
flowState = serializedFlowState.checkpointDeserialize(checkpointSerializationContext),
errorState = errorState,
result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT),
status = status,
progressStep = progressStep,
flowIoRequest = flowIoRequest,
compatible = compatible
)
}
}
}
/**
@ -132,12 +179,13 @@ data class Checkpoint(
* @param subFlowStack the stack of currently executing subflows.
* @param numberOfSuspends the number of flow suspends due to IO API calls.
*/
@CordaSerializable
data class CheckpointState(
val invocationContext: InvocationContext,
val ourIdentity: Party,
val sessions: SessionMap, // This must preserve the insertion order!
val subFlowStack: List<SubFlow>,
val numberOfSuspends: Int
val invocationContext: InvocationContext,
val ourIdentity: Party,
val sessions: SessionMap, // This must preserve the insertion order!
val subFlowStack: List<SubFlow>,
val numberOfSuspends: Int
)
/**
@ -254,17 +302,20 @@ sealed class FlowState {
* @param exception the exception itself. Note that this may not contain information about the source error depending
* on whether the source error was a FlowException or otherwise.
*/
@CordaSerializable
data class FlowError(val errorId: Long, val exception: Throwable)
/**
* The flow's error state.
*/
@CordaSerializable
sealed class ErrorState {
abstract fun addErrors(newErrors: List<FlowError>): ErrorState
/**
* The flow is in a clean state.
*/
@CordaSerializable
object Clean : ErrorState() {
override fun addErrors(newErrors: List<FlowError>): ErrorState {
return Errored(newErrors, 0, false)
@ -281,6 +332,7 @@ sealed class ErrorState {
* @param propagating true if error propagation was triggered. If this is set the dirtiness is permanent as the
* sessions associated with the flow have been (or about to be) dirtied in counter-flows.
*/
@CordaSerializable
data class Errored(
val errors: List<FlowError>,
val propagatedIndex: Int,

View File

@ -5,30 +5,31 @@
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_primary_keys">
<addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints_new"/>
<addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_blobs_pk" tableName="node_checkpoint_blobs"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_exceptions_pk" tableName="node_flow_exceptions"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_results_pk" tableName="node_flow_results"/>
<addPrimaryKey columnNames="invocation_id" constraintName="node_flow_metadata_pk" tableName="node_flow_metadata"/>
</changeSet>
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_foreign_keys">
<addForeignKeyConstraint baseColumnNames="checkpoint_blob_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoint_blob_id_to_blob_table_fk"
referencedColumnNames="id" referencedTableName="node_checkpoint_blobs"/>
<addForeignKeyConstraint baseColumnNames="error_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoints_to_exceptions_fk"
referencedColumnNames="id" referencedTableName="node_flow_exceptions"/>
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_foreign_keys">
<addForeignKeyConstraint baseColumnNames="checkpoint_blob_id" baseTableName="node_checkpoints"
constraintName="node_checkpoint_blob_id_to_blob_table_fk"
referencedColumnNames="id" referencedTableName="node_checkpoint_blobs"/>
<addForeignKeyConstraint baseColumnNames="result_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoint_to_result_fk"
referencedColumnNames="id" referencedTableName="node_flow_results"/>
<addForeignKeyConstraint baseColumnNames="error_id" baseTableName="node_checkpoints"
constraintName="node_checkpoints_to_exceptions_fk"
referencedColumnNames="id" referencedTableName="node_flow_exceptions"/>
<addForeignKeyConstraint baseColumnNames="result_id" baseTableName="node_checkpoints"
constraintName="node_checkpoint_to_result_fk"
referencedColumnNames="id" referencedTableName="node_flow_results"/>
<addForeignKeyConstraint baseColumnNames="flow_id" baseTableName="node_flow_metadata"
constraintName="node_metadata_to_checkpoints_fk"
referencedColumnNames="flow_id" referencedTableName="node_checkpoints_new"/>
<addForeignKeyConstraint baseColumnNames="invocation_id" baseTableName="node_checkpoints"
constraintName="node_metadata_to_checkpoints_fk"
referencedColumnNames="invocation_id" referencedTableName="node_flow_metadata"/>
</changeSet>
</changeSet>
</databaseChangeLog>
</databaseChangeLog>

View File

@ -5,7 +5,8 @@
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoints_table-pg" dbms="postgresql">
<createTable tableName="node_checkpoints_new">
<dropTable tableName="node_checkpoints"></dropTable>
<createTable tableName="node_checkpoints">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
@ -18,6 +19,9 @@
<column name="error_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="status" type="TINYINT">
<constraints nullable="false"/>
</column>
@ -84,7 +88,7 @@
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<constraints nullable="false"/>
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
@ -94,7 +98,7 @@
<changeSet author="R3.Corda" id="add_new_flow_metadata_table-pg" dbms="postgresql">
<createTable tableName="node_flow_metadata">
<column name="invocation_id" type="BIGINT">
<column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="flow_id" type="NVARCHAR(64)">
@ -136,4 +140,4 @@
</createTable>
</changeSet>
</databaseChangeLog>
</databaseChangeLog>

View File

@ -5,7 +5,8 @@
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoints_table" dbms="!postgresql">
<createTable tableName="node_checkpoints_new">
<dropTable tableName="node_checkpoints"></dropTable>
<createTable tableName="node_checkpoints">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
@ -18,6 +19,9 @@
<column name="error_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="status" type="TINYINT">
<constraints nullable="false"/>
</column>
@ -84,7 +88,7 @@
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<constraints nullable="false"/>
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
@ -94,7 +98,7 @@
<changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="!postgresql">
<createTable tableName="node_flow_metadata">
<column name="invocation_id" type="BIGINT">
<column name="invocation_id" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="flow_id" type="NVARCHAR(64)">
@ -136,4 +140,4 @@
</createTable>
</changeSet>
</databaseChangeLog>
</databaseChangeLog>

View File

@ -3,6 +3,7 @@ package net.corda.node.services.persistence
import net.corda.core.context.InvocationContext
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.internal.CheckpointSerializationDefaults
import net.corda.core.serialization.internal.checkpointSerialize
@ -10,11 +11,16 @@ import net.corda.node.internal.CheckpointIncompatibleException
import net.corda.node.internal.CheckpointVerifier
import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.CheckpointState
import net.corda.node.services.statemachine.ErrorState
import net.corda.node.services.statemachine.FlowError
import net.corda.node.services.statemachine.FlowStart
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.nodeapi.internal.persistence.DatabaseTransaction
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.core.TestIdentity
@ -28,9 +34,15 @@ import org.junit.After
import org.junit.Before
import org.junit.Rule
import org.junit.Test
import java.lang.IllegalStateException
import java.time.Instant
import kotlin.streams.toList
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
internal fun CheckpointStorage.checkpoints(): List<SerializedBytes<Checkpoint>> {
internal fun CheckpointStorage.checkpoints(): List<Checkpoint.Serialized> {
return getAllCheckpoints().use {
it.map { it.second }.toList()
}
@ -61,26 +73,42 @@ class DBCheckpointStorageTests {
LogHelper.reset(PersistentUniquenessProvider::class)
}
@Test(timeout=300_000)
fun `add new checkpoint`() {
@Test(timeout = 300_000)
fun `add new checkpoint`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint)
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
database.transaction {
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
assertEquals(
checkpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
)
}
newCheckpointStorage()
database.transaction {
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint)
assertEquals(
checkpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
)
session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).also {
assertNotNull(it)
assertNotNull(it.blob)
}
}
}
@Test(timeout=300_000)
fun `remove checkpoint`() {
@Test(timeout = 300_000)
fun `remove checkpoint`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint)
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
@ -94,58 +122,87 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout=300_000)
fun `add and remove checkpoint in single commit operate`() {
@Test(timeout = 300_000)
fun `add and remove checkpoint in single commit operation`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val (id2, checkpoint2) = newCheckpoint()
val serializedFlowState2 =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint)
checkpointStorage.addCheckpoint(id2, checkpoint2)
createMetadataRecord(checkpoint)
createMetadataRecord(checkpoint2)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
checkpointStorage.addCheckpoint(id2, checkpoint2, serializedFlowState2)
checkpointStorage.removeCheckpoint(id)
}
database.transaction {
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint2)
assertEquals(
checkpoint2,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
)
}
newCheckpointStorage()
database.transaction {
assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint2)
assertEquals(
checkpoint2,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
)
}
}
@Test(timeout=300_000)
fun `add two checkpoints then remove first one`() {
@Test(timeout = 300_000)
fun `add two checkpoints then remove first one`() {
val (id, firstCheckpoint) = newCheckpoint()
val serializedFirstFlowState =
firstCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.addCheckpoint(id, firstCheckpoint)
createMetadataRecord(firstCheckpoint)
checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstFlowState)
}
val (id2, secondCheckpoint) = newCheckpoint()
val serializedSecondFlowState =
secondCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.addCheckpoint(id2, secondCheckpoint)
createMetadataRecord(secondCheckpoint)
checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondFlowState)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
}
database.transaction {
assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint)
assertEquals(
secondCheckpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
)
}
newCheckpointStorage()
database.transaction {
assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint)
assertEquals(
secondCheckpoint,
checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
)
}
}
@Test(timeout=300_000)
fun `add checkpoint and then remove after 'restart'`() {
@Test(timeout = 300_000)
fun `add checkpoint and then remove after 'restart'`() {
val (id, originalCheckpoint) = newCheckpoint()
val serializedOriginalFlowState =
originalCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.addCheckpoint(id, originalCheckpoint)
createMetadataRecord(originalCheckpoint)
checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalFlowState)
}
newCheckpointStorage()
val reconstructedCheckpoint = database.transaction {
checkpointStorage.checkpoints().single()
}
database.transaction {
assertThat(reconstructedCheckpoint).isEqualTo(originalCheckpoint).isNotSameAs(originalCheckpoint)
assertEquals(originalCheckpoint, reconstructedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT))
assertThat(reconstructedCheckpoint.serializedFlowState).isEqualTo(serializedOriginalFlowState)
.isNotSameAs(serializedOriginalFlowState)
}
database.transaction {
checkpointStorage.removeCheckpoint(id)
@ -155,12 +212,15 @@ class DBCheckpointStorageTests {
}
}
@Test(timeout=300_000)
fun `verify checkpoints compatible`() {
@Test(timeout = 300_000)
fun `verify checkpoints compatible`() {
val mockServices = MockServices(emptyList(), ALICE.name)
database.transaction {
val (id, checkpoint) = newCheckpoint(1)
checkpointStorage.addCheckpoint(id, checkpoint)
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
database.transaction {
@ -169,7 +229,10 @@ class DBCheckpointStorageTests {
database.transaction {
val (id1, checkpoint1) = newCheckpoint(2)
checkpointStorage.addCheckpoint(id1, checkpoint1)
val serializedFlowState1 =
checkpoint1.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
createMetadataRecord(checkpoint1)
checkpointStorage.addCheckpoint(id1, checkpoint1, serializedFlowState1)
}
assertThatThrownBy {
@ -179,21 +242,158 @@ class DBCheckpointStorageTests {
}.isInstanceOf(CheckpointIncompatibleException::class.java)
}
private fun newCheckpointStorage() {
@Test(timeout = 300_000)
fun `checkpoint can be recreated from database record`() {
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage = DBCheckpointStorage()
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
database.transaction {
assertEquals(serializedFlowState, checkpointStorage.checkpoints().single().serializedFlowState)
}
database.transaction {
assertEquals(checkpoint, checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT))
}
}
private fun newCheckpoint(version: Int = 1): Pair<StateMachineRunId, SerializedBytes<Checkpoint>> {
@Test(timeout = 300_000)
fun `update checkpoint with result information`() {
val result = "This is the result"
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.copy(result = result)
val updatedSerializedFlowState =
updatedCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState)
}
database.transaction {
assertEquals(
result,
checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).result
)
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result)
}
}
@Test(timeout = 300_000)
fun `update checkpoint with error information`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.copy(
errorState = ErrorState.Errored(
listOf(
FlowError(
0,
exception
)
), 0, false
)
)
val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
database.transaction {
// Checkpoint always returns clean error state when retrieved via [getCheckpoint]
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean)
assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails)
}
}
@Test(timeout = 300_000)
fun `clean checkpoints clear out error information from the database`() {
val exception = IllegalStateException("I am a naughty exception")
val (id, checkpoint) = newCheckpoint()
val serializedFlowState =
checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction {
createMetadataRecord(checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState)
}
val updatedCheckpoint = checkpoint.copy(
errorState = ErrorState.Errored(
listOf(
FlowError(
0,
exception
)
), 0, false
)
)
val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) }
database.transaction {
// Checkpoint always returns clean error state when retrieved via [getCheckpoint]
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean)
}
// Set back to clean
database.transaction { checkpointStorage.updateCheckpoint(id, checkpoint, serializedFlowState) }
database.transaction {
assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean)
assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails)
}
}
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
}
}
private fun newCheckpoint(version: Int = 1): Pair<StateMachineRunId, Checkpoint> {
val id = StateMachineRunId.createRandom()
val logic: FlowLogic<*> = object : FlowLogic<Unit>() {
override fun call() {}
}
val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, SubFlowVersion.CoreFlow(version), false)
.getOrThrow()
return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val checkpoint = Checkpoint.create(
InvocationContext.shell(),
FlowStart.Explicit,
logic.javaClass,
frozenLogic,
ALICE,
SubFlowVersion.CoreFlow(version),
false
)
.getOrThrow()
return id to checkpoint
}
private fun DatabaseTransaction.createMetadataRecord(checkpoint: Checkpoint) {
val metadata = DBCheckpointStorage.DBFlowMetadata(
invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value,
flowId = null,
flowName = "random.flow",
userSuppliedIdentifier = null,
startType = DBCheckpointStorage.StartReason.RPC,
launchingCordapp = "this cordapp",
platformVersion = PLATFORM_VERSION,
rpcUsername = "Batman",
invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp,
receivedInstant = Instant.now(),
startInstant = null,
finishInstant = null
)
session.save(metadata)
}
}

View File

@ -23,9 +23,12 @@ import net.corda.core.serialization.internal.checkpointSerialize
import net.corda.nodeapi.internal.lifecycle.NodeServicesContext
import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent
import net.corda.node.internal.NodeStartup
import net.corda.node.services.persistence.CheckpointPerformanceRecorder
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.Checkpoint
import net.corda.node.services.statemachine.CheckpointState
import net.corda.node.services.statemachine.FlowStart
import net.corda.node.services.statemachine.FlowState
import net.corda.node.services.statemachine.SubFlowVersion
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.testing.core.SerializationEnvironmentRule
@ -104,7 +107,7 @@ class CheckpointDumperImplTest {
// add a checkpoint
val (id, checkpoint) = newCheckpoint()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint))
}
dumper.dumpCheckpoints()
@ -130,7 +133,7 @@ class CheckpointDumperImplTest {
// add a checkpoint
val (id, checkpoint) = newCheckpoint()
database.transaction {
checkpointStorage.addCheckpoint(id, checkpoint)
checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint))
}
dumper.dumpCheckpoints()
@ -140,11 +143,18 @@ class CheckpointDumperImplTest {
private fun newCheckpointStorage() {
database.transaction {
checkpointStorage = DBCheckpointStorage()
checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder {
override fun record(
serializedCheckpointState: SerializedBytes<CheckpointState>,
serializedFlowState: SerializedBytes<FlowState>
) {
// do nothing
}
})
}
}
private fun newCheckpoint(version: Int = 1): Pair<StateMachineRunId, SerializedBytes<Checkpoint>> {
private fun newCheckpoint(version: Int = 1): Pair<StateMachineRunId, Checkpoint> {
val id = StateMachineRunId.createRandom()
val logic: FlowLogic<*> = object : FlowLogic<Unit>() {
override fun call() {}
@ -152,6 +162,10 @@ class CheckpointDumperImplTest {
val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, myself.identity.party, SubFlowVersion.CoreFlow(version), false)
.getOrThrow()
return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
return id to checkpoint
}
private fun serializeFlowState(checkpoint: Checkpoint): SerializedBytes<FlowState> {
return checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)
}
}