From ab000e05338b35b90923842b9ec18fe361b6718f Mon Sep 17 00:00:00 2001 From: williamvigorr3 <58432369+williamvigorr3@users.noreply.github.com> Date: Mon, 2 Mar 2020 10:04:48 +0000 Subject: [PATCH] CORDA-3597 Replace old Checkpoint table with new one. (#5992) * Replace old Checkpoint table with new one. Adds some of the new fields into the table where needed (I have guessed this stuff but we can update it as we go along). * Fix database constraints + name table correctly opps. * Fixed typos in Liquidbase script Also corrected constraints and added missed fields in hibernate checkpoint class and liquibase scripts. * Update CheckpointStorage to pass in serialization context. This is cleaner than passing both the checkpoint and the serialized checkpoint into the methods. Also fixed CordaPersistanceServiceTests which I accidentally broke. * Fix detekt problem * Revert "Update CheckpointStorage to pass in serialization context." This reverts commit b71e78f20274ab0f5b3cf3fda1451ae2bd7a6797. * Fix test broken by reverting commit * CORDA-3597 Update metadata join, timestamp columns and serialization - Change the metadata join to the checkpoints table to use `invocation_id` instead of `flow_id`. There were issues joining between the tables because `flow_id` was not the primary key of the metadata table. Switching over to `invocation_id` has at least allowed us to bypass this issue. The information about the `invocation_id` is stored in the `Checkpoint` class which makes it simple to save at runtime. - Some of timestamp columns were nullable when they should always be populated, the nullable flags have now been removed. - Previously the whole checkpoint was being serialized and stored into the `checkpoints_blob.checkpoint` column. This meant duplicated saving as the `flow_state` was contained in this object. Only the `CheckpointState` property of `Checkpoint` is now being serialized and saved to this field. Furthermore, it now uses the default `STORAGE_CONTEXT` serialization (AMQP) instead of Kryo (which is only used for serializing the `flow_state` / flow stack). - The checkpoint database performance metrics recording has been abstracted to its own class. * CORDA-3597 Make metadata join non optional Remove the nullable declaration on the metadata field of `DBFlowCheckpoint` * CORDA-3597 Rename `node_checkpoints_blobs` to `node_checkpoint_blobs` * CORDA-3597 Update some kdocs Co-authored-by: Dan Newton --- .../persistence/MissingSchemaMigrationTest.kt | 6 +- .../CordaPersistenceServiceTests.kt | 54 ++- .../net/corda/node/internal/AbstractNode.kt | 5 +- .../corda/node/internal/CheckpointVerifier.kt | 2 +- .../node/services/api/CheckpointStorage.kt | 24 +- .../DBCheckpointPerformanceRecorder.kt | 60 +++ .../persistence/DBCheckpointStorage.kt | 433 ++++++++++++------ .../node/services/rpc/CheckpointDumperImpl.kt | 4 +- .../node/services/schema/NodeSchemaService.kt | 5 +- .../statemachine/ActionExecutorImpl.kt | 30 +- .../SingleThreadedStateMachineManager.kt | 18 +- .../statemachine/StateMachineState.kt | 70 ++- .../node-core.changelog-v17-keys.xml | 33 +- .../node-core.changelog-v17-postgres.xml | 12 +- .../migration/node-core.changelog-v17.xml | 12 +- .../persistence/DBCheckpointStorageTests.kt | 270 +++++++++-- .../services/rpc/CheckpointDumperImplTest.kt | 24 +- 17 files changed, 785 insertions(+), 277 deletions(-) create mode 100644 node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointPerformanceRecorder.kt diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt index 1c975cd93a..d5d803d9c1 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt @@ -47,7 +47,7 @@ class MissingSchemaMigrationTest { fun `test that an error is thrown when forceThrowOnMissingMigration is set and a mapped schema is missing a migration`() { assertThatThrownBy { createSchemaMigration(setOf(GoodSchema), true) - .nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + .nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) }.isInstanceOf(MissingMigrationException::class.java) } @@ -55,7 +55,7 @@ class MissingSchemaMigrationTest { fun `test that an error is not thrown when forceThrowOnMissingMigration is not set and a mapped schema is missing a migration`() { assertDoesNotThrow { createSchemaMigration(setOf(GoodSchema), false) - .nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + .nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) } } @@ -64,7 +64,7 @@ class MissingSchemaMigrationTest { assertDoesNotThrow("This test failure indicates " + "a new table has been added to the node without the appropriate migration scripts being present") { createSchemaMigration(NodeSchemaService().internalSchemas(), false) - .nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + .nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt index 7d6aa1edf8..ff36d5e7a2 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt @@ -3,11 +3,17 @@ package net.corda.node.services.persistence import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC +import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.messaging.startFlow import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService +import net.corda.core.node.services.vault.SessionScope import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.getOrThrow +import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.Checkpoint.FlowStatus +import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver @@ -15,6 +21,8 @@ import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.node.internal.enclosedCordapp import org.junit.Test import java.sql.DriverManager +import java.time.Instant +import java.util.UUID import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -50,16 +58,52 @@ class CordaPersistenceServiceTests { @CordaService class MultiThreadedDbLoader(private val services: AppServiceHub) : SingletonSerializeAsToken() { - fun createObjects(count: Int) : Int { + fun createObjects(count: Int): Int { (1..count).toList().parallelStream().forEach { + val now = Instant.now() services.database.transaction { - session.save(DBCheckpointStorage.DBCheckpoint().apply { - checkpointId = it.toString() - }) + session.save( + DBCheckpointStorage.DBFlowCheckpoint( + id = it.toString(), + blob = DBCheckpointStorage.DBFlowCheckpointBlob( + checkpoint = ByteArray(8192), + flowStack = ByteArray(8192), + hmac = ByteArray(16), + persistedInstant = now + ), + result = DBCheckpointStorage.DBFlowResult(value = ByteArray(16), persistedInstant = now), + exceptionDetails = null, + status = FlowStatus.RUNNABLE, + compatible = false, + progressStep = "", + ioRequestType = FlowIORequest.ForceCheckpoint.javaClass, + checkpointInstant = Instant.now(), + flowMetadata = createMetadataRecord(UUID.randomUUID(), now) + ) + ) } } return count } + + private fun SessionScope.createMetadataRecord(invocationId: UUID, timestamp: Instant): DBCheckpointStorage.DBFlowMetadata { + val metadata = DBCheckpointStorage.DBFlowMetadata( + invocationId = invocationId.toString(), + flowId = null, + flowName = "random.flow", + userSuppliedIdentifier = null, + startType = DBCheckpointStorage.StartReason.RPC, + launchingCordapp = "this cordapp", + platformVersion = PLATFORM_VERSION, + rpcUsername = "Batman", + invocationInstant = Instant.now(), + receivedInstant = Instant.now(), + startInstant = timestamp, + finishInstant = null + ) + session.save(metadata) + return metadata + } } -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 38675eb979..6164c92ccd 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -109,6 +109,7 @@ import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.persistence.AbstractPartyDescriptor import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter import net.corda.node.services.persistence.AttachmentStorageInternal +import net.corda.node.services.persistence.DBCheckpointPerformanceRecorder import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage @@ -251,7 +252,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } val networkMapCache = PersistentNetworkMapCache(cacheFactory, database, identityService).tokenize() - val checkpointStorage = DBCheckpointStorage() @Suppress("LeakingThis") val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize() val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) } @@ -318,6 +318,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, @Suppress("LeakingThis") protected val network: MessagingService = makeMessagingService().tokenize() val services = ServiceHubInternalImpl().tokenize() + val checkpointStorage = DBCheckpointStorage(DBCheckpointPerformanceRecorder(services.monitoringService.metrics)) @Suppress("LeakingThis") val smm = makeStateMachineManager() val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory) @@ -1276,7 +1277,7 @@ fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfi try { val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry) val schemaMigration = SchemaMigration(schemas, dataSource, databaseConfig, cordappLoader, currentDir, ourName) - schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) start(dataSource) } catch (ex: Exception) { when { diff --git a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt index cb7fe99b6f..340226492e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt @@ -39,7 +39,7 @@ object CheckpointVerifier { checkpointStorage.getAllCheckpoints().use { it.forEach { (_, serializedCheckpoint) -> val checkpoint = try { - serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) + serializedCheckpoint.deserialize(checkpointSerializationContext) } catch (e: ClassNotFoundException) { val message = e.message if (message != null) { diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index b463372909..8268322233 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -3,7 +3,7 @@ package net.corda.node.services.api import net.corda.core.flows.StateMachineRunId import net.corda.core.serialization.SerializedBytes import net.corda.node.services.statemachine.Checkpoint -import java.sql.Connection +import net.corda.node.services.statemachine.FlowState import java.util.stream.Stream /** @@ -13,12 +13,12 @@ interface CheckpointStorage { /** * Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id */ - fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) + fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) /** * Update an existing checkpoint. Will throw if there is not checkpoint for this id. */ - fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) + fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) /** * Remove existing checkpoint from the store. @@ -28,21 +28,17 @@ interface CheckpointStorage { /** * Load an existing checkpoint from the store. - * @return the checkpoint, still in serialized form, or null if not found. + * + * The checkpoint returned from this function will be a _clean_ checkpoint. No error information is loaded into the checkpoint + * even if the previous status of the checkpoint was [Checkpoint.FlowStatus.FAILED] or [Checkpoint.FlowStatus.HOSPITALIZED]. + * + * @return The checkpoint, in a partially serialized form, or null if not found. */ - fun getCheckpoint(id: StateMachineRunId): SerializedBytes? + fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? /** * Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the * underlying database connection is closed, so any processing should happen before it is closed. */ - fun getAllCheckpoints(): Stream>> - - /** - * This needs to run before Hibernate is initialised. - * - * @param connection The SQL Connection. - * @return the number of checkpoints stored in the database. - */ - fun getCheckpointCount(connection: Connection): Long + fun getAllCheckpoints(): Stream> } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointPerformanceRecorder.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointPerformanceRecorder.kt new file mode 100644 index 0000000000..7bc30ec804 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointPerformanceRecorder.kt @@ -0,0 +1,60 @@ +package net.corda.node.services.persistence + +import com.codahale.metrics.Gauge +import com.codahale.metrics.Histogram +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.Reservoir +import com.codahale.metrics.SlidingTimeWindowArrayReservoir +import com.codahale.metrics.SlidingTimeWindowReservoir +import net.corda.core.serialization.SerializedBytes +import net.corda.node.services.statemachine.CheckpointState +import net.corda.node.services.statemachine.FlowState +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong + +interface CheckpointPerformanceRecorder { + + /** + * Record performance metrics regarding the serialized size of [CheckpointState] and [FlowState] + */ + fun record(serializedCheckpointState: SerializedBytes, serializedFlowState: SerializedBytes) +} + +class DBCheckpointPerformanceRecorder(metrics: MetricRegistry) : CheckpointPerformanceRecorder { + + private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate") + private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS) + private val lastBandwidthUpdate = AtomicLong(0) + private val checkpointBandwidthHist = metrics.register( + "Flows.CheckpointVolumeBytesPerSecondHist", Histogram( + SlidingTimeWindowArrayReservoir(1, TimeUnit.DAYS) + ) + ) + private val checkpointBandwidth = metrics.register( + "Flows.CheckpointVolumeBytesPerSecondCurrent", + LatchedGauge(checkpointSizesThisSecond) + ) + + /** + * This [Gauge] just reports the sum of the bytes checkpointed during the last second. + */ + private class LatchedGauge(private val reservoir: Reservoir) : Gauge { + override fun getValue(): Long { + return reservoir.snapshot.values.sum() + } + } + + override fun record(serializedCheckpointState: SerializedBytes, serializedFlowState: SerializedBytes) { + val totalSize = serializedCheckpointState.size.toLong() + serializedFlowState.size.toLong() + checkpointingMeter.mark() + checkpointSizesThisSecond.update(totalSize) + var lastUpdateTime = lastBandwidthUpdate.get() + while (System.nanoTime() - lastUpdateTime > TimeUnit.SECONDS.toNanos(1)) { + if (lastBandwidthUpdate.compareAndSet(lastUpdateTime, System.nanoTime())) { + val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum() + checkpointBandwidthHist.update(checkpointVolume) + } + lastUpdateTime = lastBandwidthUpdate.get() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 13b5b2ace7..45a6739b2a 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -2,245 +2,392 @@ package net.corda.node.services.persistence import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.PLATFORM_VERSION +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes -import net.corda.core.utilities.ProgressTracker -import net.corda.core.utilities.debug +import net.corda.core.serialization.serialize +import net.corda.core.utilities.contextLogger import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.Checkpoint.FlowStatus +import net.corda.node.services.statemachine.CheckpointState +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.FlowState import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.currentDBSession import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY +import org.hibernate.annotations.Type import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.util.* -import java.util.stream.Stream -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.Id -import org.hibernate.annotations.Type -import java.lang.Exception -import java.math.BigInteger import java.sql.Connection import java.sql.SQLException import java.time.Instant +import java.util.UUID +import java.util.stream.Stream +import javax.persistence.CascadeType +import javax.persistence.Column +import javax.persistence.Entity import javax.persistence.FetchType +import javax.persistence.GeneratedValue +import javax.persistence.GenerationType +import javax.persistence.Id import javax.persistence.JoinColumn import javax.persistence.OneToOne /** * Simple checkpoint key value storage in DB. */ -class DBCheckpointStorage : CheckpointStorage { - val log: Logger = LoggerFactory.getLogger(this::class.java) +class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointPerformanceRecorder) : CheckpointStorage { + + companion object { + val log = contextLogger() + + private const val HMAC_SIZE_BYTES = 16 + + /** + * This needs to run before Hibernate is initialised. + * + * No need to set up [DBCheckpointStorage] fully for this function + * + * @param connection The SQL Connection. + * @return the number of checkpoints stored in the database. + */ + fun getCheckpointCount(connection: Connection): Long { + // No need to set up [DBCheckpointStorage] fully for this function + return try { + connection.prepareStatement("select count(*) from node_checkpoints").use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getLong(1) + } + } + } catch (e: SQLException) { + // Happens when the table was not created yet. + 0L + } + } + } enum class StartReason { RPC, FLOW, SERVICE, SCHEDULED, INITIATED } @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_new") + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") class DBFlowCheckpoint( - @Id - @Column(name = "flow_id", length = 64, nullable = false) - var id: String, + @Id + @Column(name = "flow_id", length = 64, nullable = false) + var id: String, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") - var blob: DBFlowCheckpointBlob, + @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") + var blob: DBFlowCheckpointBlob, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "result_id", referencedColumnName = "id") - var result: DBFlowResult?, + @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @JoinColumn(name = "result_id", referencedColumnName = "id") + var result: DBFlowResult?, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "error_id", referencedColumnName = "id") - var exceptionDetails: DBFlowException?, + @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @JoinColumn(name = "error_id", referencedColumnName = "id") + var exceptionDetails: DBFlowException?, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "flow_id", referencedColumnName = "flow_id") - var flowMetadata: DBFlowMetadata, + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "invocation_id", referencedColumnName = "invocation_id") + var flowMetadata: DBFlowMetadata, - @Column(name = "status") - var status: FlowStatus, + @Column(name = "status", nullable = false) + var status: FlowStatus, - @Column(name = "compatible") - var compatible: Boolean, + @Column(name = "compatible", nullable = false) + var compatible: Boolean, - @Column(name = "progress_step") - var progressStep: String, + @Column(name = "progress_step") + var progressStep: String?, - @Column(name = "flow_io_request") - var ioRequestType: Class>, + @Column(name = "flow_io_request") + var ioRequestType: Class>?, - @Column(name = "timestamp") - var checkpointInstant: Instant + @Column(name = "timestamp", nullable = false) + var checkpointInstant: Instant ) @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs") + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoint_blobs") class DBFlowCheckpointBlob( - @Id - @Column(name = "id", nullable = false) - var id: BigInteger? = null, + @Id + @GeneratedValue(strategy = GenerationType.SEQUENCE) + @Column(name = "id", nullable = false) + private var id: Long = 0, - @Type(type = "corda-blob") - @Column(name = "checkpoint_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + @Type(type = "corda-blob") + @Column(name = "checkpoint_value", nullable = false) + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, - @Type(type = "corda-blob") - @Column(name = "flow_state", nullable = false) - var flowStack: ByteArray = EMPTY_BYTE_ARRAY, + // A future task will make this nullable + @Type(type = "corda-blob") + @Column(name = "flow_state", nullable = false) + var flowStack: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "timestamp") - var persistedInstant: Instant? = null + @Column(name = "hmac") + var hmac: ByteArray, + + @Column(name = "timestamp") + var persistedInstant: Instant ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results") class DBFlowResult( - @Id - @Column(name = "id", nullable = false) - var id: BigInteger? = null, + @Id + @Column(name = "id", nullable = false) + @GeneratedValue(strategy = GenerationType.SEQUENCE) + private var id: Long = 0, - @Type(type = "corda-blob") - @Column(name = "result_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + @Type(type = "corda-blob") + @Column(name = "result_value", nullable = false) + var value: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "timestamp") - val persistedInstant: Instant? = null + @Column(name = "timestamp") + val persistedInstant: Instant ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions") class DBFlowException( - @Id - @Column(name = "id", nullable = false) - var id: BigInteger? = null, + @Id + @Column(name = "id", nullable = false) + @GeneratedValue(strategy = GenerationType.SEQUENCE) + private var id: Long = 0, - @Column(name = "type", nullable = false) - var type: Class, + @Column(name = "type", nullable = false) + var type: Class, - @Type(type = "corda-blob") - @Column(name = "exception_value", nullable = false) - var value: ByteArray = EMPTY_BYTE_ARRAY, + @Type(type = "corda-blob") + @Column(name = "exception_value", nullable = false) + var value: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "exception_message") - var message: String? = null, + @Column(name = "exception_message") + var message: String? = null, - @Column(name = "timestamp") - val persistedInstant: Instant? = null + @Column(name = "timestamp") + val persistedInstant: Instant ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata") class DBFlowMetadata( - @Id - @Column(name = "flow_id", length = 64, nullable = false) - var flowId: String, + @Id + @Column(name = "invocation_id", nullable = false) + var invocationId: String, - @Column(name = "flow_name", nullable = false) - var flowName: String, + @Column(name = "flow_id", nullable = true) + var flowId: String?, - @Column(name = "flow_identifier", nullable = true) - var userSuppliedIdentifier: String?, + @Column(name = "flow_name", nullable = false) + var flowName: String, - @Column(name = "started_type", nullable = false) - var startType: StartReason, + @Column(name = "flow_identifier", nullable = true) + var userSuppliedIdentifier: String?, - @Column(name = "flow_parameters", nullable = false) - var initialParameters: ByteArray = EMPTY_BYTE_ARRAY, + @Column(name = "started_type", nullable = false) + var startType: StartReason, - @Column(name = "cordapp_name", nullable = false) - var launchingCordapp: String, + @Column(name = "flow_parameters", nullable = false) + var initialParameters: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "platform_version", nullable = false) - var platformVersion: Int, + @Column(name = "cordapp_name", nullable = false) + var launchingCordapp: String, - @Column(name = "rpc_user", nullable = false) - var rpcUsername: String, + @Column(name = "platform_version", nullable = false) + var platformVersion: Int, - @Column(name = "invocation_time", nullable = false) - var invocationInstant: Instant, + @Column(name = "rpc_user", nullable = false) + var rpcUsername: String, - @Column(name = "received_time", nullable = false) - var receivedInstant: Instant, + @Column(name = "invocation_time", nullable = false) + var invocationInstant: Instant, - @Column(name = "start_time", nullable = true) - var startInstant: Instant?, + @Column(name = "received_time", nullable = false) + var receivedInstant: Instant, - @Column(name = "finish_time", nullable = true) - var finishInstant: Instant? + @Column(name = "start_time", nullable = true) + var startInstant: Instant?, + + @Column(name = "finish_time", nullable = true) + var finishInstant: Instant? ) - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") - class DBCheckpoint( - @Id - @Suppress("MagicNumber") // database column width - @Column(name = "checkpoint_id", length = 64, nullable = false) - var checkpointId: String = "", - - @Type(type = "corda-blob") - @Column(name = "checkpoint_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY - ) { - override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})" + override fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) { + currentDBSession().save(createDBCheckpoint(id, checkpoint, serializedFlowState)) } - override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) { - currentDBSession().save(DBCheckpoint().apply { - checkpointId = id.uuid.toString() - this.checkpoint = checkpoint.bytes - log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" } - }) - } - - override fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) { - currentDBSession().update(DBCheckpoint().apply { - checkpointId = id.uuid.toString() - this.checkpoint = checkpoint.bytes - log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" } - }) + override fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) { + currentDBSession().update(updateDBCheckpoint(id, checkpoint, serializedFlowState)) } override fun removeCheckpoint(id: StateMachineRunId): Boolean { val session = currentDBSession() val criteriaBuilder = session.criteriaBuilder - val delete = criteriaBuilder.createCriteriaDelete(DBCheckpoint::class.java) - val root = delete.from(DBCheckpoint::class.java) - delete.where(criteriaBuilder.equal(root.get(DBCheckpoint::checkpointId.name), id.uuid.toString())) + val delete = criteriaBuilder.createCriteriaDelete(DBFlowCheckpoint::class.java) + val root = delete.from(DBFlowCheckpoint::class.java) + delete.where(criteriaBuilder.equal(root.get(DBFlowCheckpoint::id.name), id.uuid.toString())) return session.createQuery(delete).executeUpdate() > 0 } - override fun getCheckpoint(id: StateMachineRunId): SerializedBytes? { - val bytes = currentDBSession().get(DBCheckpoint::class.java, id.uuid.toString())?.checkpoint ?: return null - return SerializedBytes(bytes) + override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? { + return currentDBSession().get(DBFlowCheckpoint::class.java, id.uuid.toString())?.toSerializedCheckpoint() } - override fun getAllCheckpoints(): Stream>> { + override fun getAllCheckpoints(): Stream> { val session = currentDBSession() - val criteriaQuery = session.criteriaBuilder.createQuery(DBCheckpoint::class.java) - val root = criteriaQuery.from(DBCheckpoint::class.java) + val criteriaQuery = session.criteriaBuilder.createQuery(DBFlowCheckpoint::class.java) + val root = criteriaQuery.from(DBFlowCheckpoint::class.java) criteriaQuery.select(root) return session.createQuery(criteriaQuery).stream().map { - StateMachineRunId(UUID.fromString(it.checkpointId)) to SerializedBytes(it.checkpoint) + StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint() } } - override fun getCheckpointCount(connection: Connection): Long { - return try { - connection.prepareStatement("select count(*) from node_checkpoints").use { ps -> - ps.executeQuery().use { rs -> - rs.next() - rs.getLong(1) - } - } - } catch (e: SQLException) { - // Happens when the table was not created yet. - 0L + private fun createDBCheckpoint( + id: StateMachineRunId, + checkpoint: Checkpoint, + serializedFlowState: SerializedBytes + ): DBFlowCheckpoint { + val flowId = id.uuid.toString() + val now = Instant.now() + val invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value + + val serializedCheckpointState = checkpoint.checkpointState.storageSerialize() + checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) + + val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now) + // Need to update the metadata record to join it to the main checkpoint record + + // This code needs to be added back in once the metadata record is properly created (remove the code below it) + // val metadata = requireNotNull(currentDBSession().find( + // DBFlowMetadata::class.java, + // invocationId + // )) { "The flow metadata record for flow [$flowId] with invocation id [$invocationId] does not exist"} + val metadata = (currentDBSession().find( + DBFlowMetadata::class.java, + invocationId + )) ?: createTemporaryMetadata(checkpoint) + metadata.flowId = flowId + currentDBSession().update(metadata) + // Most fields are null as they cannot have been set when creating the initial checkpoint + return DBFlowCheckpoint( + id = flowId, + blob = blob, + result = null, + exceptionDetails = null, + flowMetadata = metadata, + status = checkpoint.status, + compatible = checkpoint.compatible, + progressStep = null, + ioRequestType = null, + checkpointInstant = Instant.now() + ) + } + + // Remove this when saving of metadata is properly handled + private fun createTemporaryMetadata(checkpoint: Checkpoint): DBFlowMetadata { + return DBFlowMetadata( + invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value, + flowId = null, + flowName = "random.flow", + userSuppliedIdentifier = null, + startType = DBCheckpointStorage.StartReason.RPC, + launchingCordapp = "this cordapp", + platformVersion = PLATFORM_VERSION, + rpcUsername = "Batman", + invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp, + receivedInstant = Instant.now(), + startInstant = null, + finishInstant = null + ).apply { + currentDBSession().save(this) } } + + private fun updateDBCheckpoint( + id: StateMachineRunId, + checkpoint: Checkpoint, + serializedFlowState: SerializedBytes + ): DBFlowCheckpoint { + val flowId = id.uuid.toString() + val now = Instant.now() + + val serializedCheckpointState = checkpoint.checkpointState.storageSerialize() + checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) + + val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now) + val result = checkpoint.result?.let { createDBFlowResult(it, now) } + val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(it, now) } + // Load the previous entity from the hibernate cache so the meta data join does not get updated + val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId) + return entity.apply { + this.blob = blob + this.result = result + this.exceptionDetails = exceptionDetails + // Do not update the meta data relationship on updates + this.flowMetadata = entity.flowMetadata + this.status = checkpoint.status + this.compatible = checkpoint.compatible + this.progressStep = checkpoint.progressStep + this.ioRequestType = checkpoint.flowIoRequest + this.checkpointInstant = now + } + } + + private fun createDBCheckpointBlob( + serializedCheckpointState: SerializedBytes, + serializedFlowState: SerializedBytes, + now: Instant + ): DBFlowCheckpointBlob { + return DBFlowCheckpointBlob( + checkpoint = serializedCheckpointState.bytes, + flowStack = serializedFlowState.bytes, + hmac = ByteArray(HMAC_SIZE_BYTES), + persistedInstant = now + ) + } + + private fun createDBFlowResult(result: Any, now: Instant): DBFlowResult { + return DBFlowResult( + value = result.storageSerialize().bytes, + persistedInstant = now + ) + } + + private fun createDBFlowException(errorState: ErrorState.Errored, now: Instant): DBFlowException { + return errorState.errors.last().exception.let { + DBFlowException( + type = it::class.java, + message = it.message, + value = it.storageSerialize().bytes, + persistedInstant = now + ) + } + } + + private fun DBFlowCheckpoint.toSerializedCheckpoint(): Checkpoint.Serialized { + return Checkpoint.Serialized( + serializedCheckpointState = SerializedBytes(blob.checkpoint), + serializedFlowState = SerializedBytes(blob.flowStack), + // Always load as a [Clean] checkpoint to represent that the checkpoint is the last _good_ checkpoint + errorState = ErrorState.Clean, + // A checkpoint with a result should not normally be loaded (it should be [null] most of the time) + result = result?.let { SerializedBytes(it.value) }, + status = status, + progressStep = progressStep, + flowIoRequest = ioRequestType, + compatible = compatible + ) + } + + private fun T.storageSerialize(): SerializedBytes { + return serialize(context = SerializationDefaults.STORAGE_CONTEXT) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index d27a480a5a..c3979612ab 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -149,8 +149,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri instrumentCheckpointAgent(runId) val (bytes, fileName) = try { - val checkpoint = - serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) + val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext) val json = checkpoint.toJson(runId.uuid, now) val jsonBytes = writer.writeValueAsBytes(json) jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json" @@ -229,6 +228,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri origin = checkpointState.invocationContext.origin.toOrigin(), ourIdentity = checkpointState.ourIdentity, activeSessions = checkpointState.sessions.mapNotNull { it.value.toActiveSession(it.key) }, + // This can only ever return as [ErrorState.Clean] which causes it to become [null] errored = errorState as? ErrorState.Errored ) } diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 73d7cbb3d9..d38c6371ef 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -33,10 +33,7 @@ class NodeSchemaService(private val extraSchemas: Set = emptySet() object NodeCore object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, - mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, - - //new checkpoints - keeping old around to allow testing easily (for now) - DBCheckpointStorage.DBFlowCheckpoint::class.java, + mappedTypes = listOf(DBCheckpointStorage.DBFlowCheckpoint::class.java, DBCheckpointStorage.DBFlowCheckpointBlob::class.java, DBCheckpointStorage.DBFlowResult::class.java, DBCheckpointStorage.DBFlowException::class.java, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 3ffdd4b709..e7a45b145b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -31,8 +31,7 @@ class ActionExecutorImpl( private val checkpointStorage: CheckpointStorage, private val flowMessaging: FlowMessaging, private val stateMachineManager: StateMachineManagerInternal, - private val checkpointSerializationContext: CheckpointSerializationContext, - metrics: MetricRegistry + private val checkpointSerializationContext: CheckpointSerializationContext ) : ActionExecutor { private companion object { @@ -48,12 +47,6 @@ class ActionExecutorImpl( } } - private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate") - private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS) - private val lastBandwidthUpdate = AtomicLong(0) - private val checkpointBandwidthHist = metrics.register("Flows.CheckpointVolumeBytesPerSecondHist", Histogram(SlidingTimeWindowArrayReservoir(1, TimeUnit.DAYS))) - private val checkpointBandwidth = metrics.register("Flows.CheckpointVolumeBytesPerSecondCurrent", LatchedGauge(checkpointSizesThisSecond)) - @Suspendable override fun executeAction(fiber: FlowFiber, action: Action) { log.trace { "Flow ${fiber.id} executing $action" } @@ -100,21 +93,12 @@ class ActionExecutorImpl( @Suspendable private fun executePersistCheckpoint(action: Action.PersistCheckpoint) { - val checkpointBytes = serializeCheckpoint(action.checkpoint) + val checkpoint = action.checkpoint + val serializedFlowState = checkpoint.flowState.checkpointSerialize(checkpointSerializationContext) if (action.isCheckpointUpdate) { - checkpointStorage.updateCheckpoint(action.id, checkpointBytes) + checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState) } else { - checkpointStorage.addCheckpoint(action.id, checkpointBytes) - } - checkpointingMeter.mark() - checkpointSizesThisSecond.update(checkpointBytes.size.toLong()) - var lastUpdateTime = lastBandwidthUpdate.get() - while (System.nanoTime() - lastUpdateTime > TimeUnit.SECONDS.toNanos(1)) { - if (lastBandwidthUpdate.compareAndSet(lastUpdateTime, System.nanoTime())) { - val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum() - checkpointBandwidthHist.update(checkpointVolume) - } - lastUpdateTime = lastBandwidthUpdate.get() + checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState) } } @@ -258,10 +242,6 @@ class ActionExecutorImpl( stateMachineManager.retryFlowFromSafePoint(action.currentState) } - private fun serializeCheckpoint(checkpoint: Checkpoint): SerializedBytes { - return checkpoint.checkpointSerialize(context = checkpointSerializationContext) - } - private fun cancelFlowTimeout(action: Action.CancelFlowTimeout) { stateMachineManager.cancelFlowTimeout(action.flowId) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 1352fb6ec6..d722ad9fdb 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -590,7 +590,7 @@ class SingleThreadedStateMachineManager( // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint -> - val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId) + val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) if (checkpoint == null) { return openFuture>().mapError { IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " + @@ -758,14 +758,23 @@ class SingleThreadedStateMachineManager( } } + private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? { + return try { + serializedCheckpoint.deserialize(checkpointSerializationContext!!) + } catch (e: Exception) { + logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) + null + } + } + private fun createFlowFromCheckpoint( id: StateMachineRunId, - serializedCheckpoint: SerializedBytes, + serializedCheckpoint: Checkpoint.Serialized, isAnyCheckpointPersisted: Boolean, isStartIdempotent: Boolean, initialDeduplicationHandler: DeduplicationHandler? ): Flow? { - val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, id) ?: return null + val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return null val flowState = checkpoint.flowState val resultFuture = openFuture() val fiber = when (flowState) { @@ -865,8 +874,7 @@ class SingleThreadedStateMachineManager( checkpointStorage, flowMessaging, this, - checkpointSerializationContext, - metrics + checkpointSerializationContext ) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index f4c9e790b6..99882aa022 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -7,7 +7,12 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.internal.CheckpointSerializationContext +import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler import java.time.Instant @@ -57,9 +62,10 @@ data class Checkpoint( val result: Any? = null, val status: FlowStatus = FlowStatus.RUNNABLE, val progressStep: String? = null, - val flowIoRequest: FlowIORequest<*>? = null, + val flowIoRequest: Class>? = null, val compatible: Boolean = true ) { + @CordaSerializable enum class FlowStatus { RUNNABLE, FAILED, @@ -84,9 +90,15 @@ data class Checkpoint( ): Try { return SubFlow.create(flowLogicClass, subFlowVersion, isEnabledTimedFlow).map { topLevelSubFlow -> Checkpoint( - checkpointState = CheckpointState(invocationContext, ourIdentity, emptyMap(), listOf(topLevelSubFlow), numberOfSuspends = 0), - flowState = FlowState.Unstarted(flowStart, frozenFlowLogic), - errorState = ErrorState.Clean + checkpointState = CheckpointState( + invocationContext, + ourIdentity, + emptyMap(), + listOf(topLevelSubFlow), + numberOfSuspends = 0 + ), + errorState = ErrorState.Clean, + flowState = FlowState.Unstarted(flowStart, frozenFlowLogic) ) } } @@ -123,6 +135,41 @@ data class Checkpoint( fun addSubflow(subFlow: SubFlow) : Checkpoint { return copy(checkpointState = checkpointState.copy(subFlowStack = checkpointState.subFlowStack + subFlow)) } + + /** + * A partially serialized form of [Checkpoint]. + * + * [Checkpoint.Serialized] contains the same fields as [Checkpoint] except that some of its fields are still serialized. The checkpoint + * can then be deserialized as needed. + */ + data class Serialized( + val serializedCheckpointState: SerializedBytes, + val serializedFlowState: SerializedBytes, + val errorState: ErrorState, + val result: SerializedBytes?, + val status: FlowStatus, + val progressStep: String?, + val flowIoRequest: Class>?, + val compatible: Boolean + ) { + /** + * Deserializes the serialized fields contained in [Checkpoint.Serialized]. + * + * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized] + */ + fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint { + return Checkpoint( + checkpointState = serializedCheckpointState.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), + flowState = serializedFlowState.checkpointDeserialize(checkpointSerializationContext), + errorState = errorState, + result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), + status = status, + progressStep = progressStep, + flowIoRequest = flowIoRequest, + compatible = compatible + ) + } + } } /** @@ -132,12 +179,13 @@ data class Checkpoint( * @param subFlowStack the stack of currently executing subflows. * @param numberOfSuspends the number of flow suspends due to IO API calls. */ +@CordaSerializable data class CheckpointState( - val invocationContext: InvocationContext, - val ourIdentity: Party, - val sessions: SessionMap, // This must preserve the insertion order! - val subFlowStack: List, - val numberOfSuspends: Int + val invocationContext: InvocationContext, + val ourIdentity: Party, + val sessions: SessionMap, // This must preserve the insertion order! + val subFlowStack: List, + val numberOfSuspends: Int ) /** @@ -254,17 +302,20 @@ sealed class FlowState { * @param exception the exception itself. Note that this may not contain information about the source error depending * on whether the source error was a FlowException or otherwise. */ +@CordaSerializable data class FlowError(val errorId: Long, val exception: Throwable) /** * The flow's error state. */ +@CordaSerializable sealed class ErrorState { abstract fun addErrors(newErrors: List): ErrorState /** * The flow is in a clean state. */ + @CordaSerializable object Clean : ErrorState() { override fun addErrors(newErrors: List): ErrorState { return Errored(newErrors, 0, false) @@ -281,6 +332,7 @@ sealed class ErrorState { * @param propagating true if error propagation was triggered. If this is set the dirtiness is permanent as the * sessions associated with the flow have been (or about to be) dirtied in counter-flows. */ + @CordaSerializable data class Errored( val errors: List, val propagatedIndex: Int, diff --git a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml index fdc812a02c..482c4d6418 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml @@ -5,30 +5,31 @@ logicalFilePath="migration/node-services.changelog-init.xml"> - + - - - + + - + + + - + + - - \ No newline at end of file + diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml index 9602ec51b3..acb3557060 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -5,7 +5,8 @@ logicalFilePath="migration/node-services.changelog-init.xml"> - + + @@ -18,6 +19,9 @@ + + + @@ -84,7 +88,7 @@ - + @@ -94,7 +98,7 @@ - + @@ -136,4 +140,4 @@ - \ No newline at end of file + diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 2c1789fc95..266361124e 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -5,7 +5,8 @@ logicalFilePath="migration/node-services.changelog-init.xml"> - + + @@ -18,6 +19,9 @@ + + + @@ -84,7 +88,7 @@ - + @@ -94,7 +98,7 @@ - + @@ -136,4 +140,4 @@ - \ No newline at end of file + diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index c7ceb785da..c30044cb02 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -3,6 +3,7 @@ package net.corda.node.services.persistence import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.checkpointSerialize @@ -10,11 +11,16 @@ import net.corda.node.internal.CheckpointIncompatibleException import net.corda.node.internal.CheckpointVerifier import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.CheckpointState +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.FlowError import net.corda.node.services.statemachine.FlowStart +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.SubFlowVersion import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity @@ -28,9 +34,15 @@ import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test +import java.lang.IllegalStateException +import java.time.Instant import kotlin.streams.toList +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue -internal fun CheckpointStorage.checkpoints(): List> { +internal fun CheckpointStorage.checkpoints(): List { return getAllCheckpoints().use { it.map { it.second }.toList() } @@ -61,26 +73,42 @@ class DBCheckpointStorageTests { LogHelper.reset(PersistentUniquenessProvider::class) } - @Test(timeout=300_000) - fun `add new checkpoint`() { + @Test(timeout = 300_000) + fun `add new checkpoint`() { val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint) + assertEquals( + checkpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } newCheckpointStorage() database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint) + assertEquals( + checkpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) + session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).also { + assertNotNull(it) + assertNotNull(it.blob) + } } } - @Test(timeout=300_000) - fun `remove checkpoint`() { + @Test(timeout = 300_000) + fun `remove checkpoint`() { val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { checkpointStorage.removeCheckpoint(id) @@ -94,58 +122,87 @@ class DBCheckpointStorageTests { } } - @Test(timeout=300_000) - fun `add and remove checkpoint in single commit operate`() { + @Test(timeout = 300_000) + fun `add and remove checkpoint in single commit operation`() { val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) val (id2, checkpoint2) = newCheckpoint() + val serializedFlowState2 = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) - checkpointStorage.addCheckpoint(id2, checkpoint2) + createMetadataRecord(checkpoint) + createMetadataRecord(checkpoint2) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + checkpointStorage.addCheckpoint(id2, checkpoint2, serializedFlowState2) checkpointStorage.removeCheckpoint(id) } database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint2) + assertEquals( + checkpoint2, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } newCheckpointStorage() database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint2) + assertEquals( + checkpoint2, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } } - @Test(timeout=300_000) - fun `add two checkpoints then remove first one`() { + @Test(timeout = 300_000) + fun `add two checkpoints then remove first one`() { val (id, firstCheckpoint) = newCheckpoint() + val serializedFirstFlowState = + firstCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { - checkpointStorage.addCheckpoint(id, firstCheckpoint) + createMetadataRecord(firstCheckpoint) + checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstFlowState) } val (id2, secondCheckpoint) = newCheckpoint() + val serializedSecondFlowState = + secondCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id2, secondCheckpoint) + createMetadataRecord(secondCheckpoint) + checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondFlowState) } database.transaction { checkpointStorage.removeCheckpoint(id) } database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint) + assertEquals( + secondCheckpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } newCheckpointStorage() database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint) + assertEquals( + secondCheckpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } } - @Test(timeout=300_000) - fun `add checkpoint and then remove after 'restart'`() { + @Test(timeout = 300_000) + fun `add checkpoint and then remove after 'restart'`() { val (id, originalCheckpoint) = newCheckpoint() + val serializedOriginalFlowState = + originalCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, originalCheckpoint) + createMetadataRecord(originalCheckpoint) + checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalFlowState) } newCheckpointStorage() val reconstructedCheckpoint = database.transaction { checkpointStorage.checkpoints().single() } database.transaction { - assertThat(reconstructedCheckpoint).isEqualTo(originalCheckpoint).isNotSameAs(originalCheckpoint) + assertEquals(originalCheckpoint, reconstructedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)) + assertThat(reconstructedCheckpoint.serializedFlowState).isEqualTo(serializedOriginalFlowState) + .isNotSameAs(serializedOriginalFlowState) } database.transaction { checkpointStorage.removeCheckpoint(id) @@ -155,12 +212,15 @@ class DBCheckpointStorageTests { } } - @Test(timeout=300_000) - fun `verify checkpoints compatible`() { + @Test(timeout = 300_000) + fun `verify checkpoints compatible`() { val mockServices = MockServices(emptyList(), ALICE.name) database.transaction { val (id, checkpoint) = newCheckpoint(1) - checkpointStorage.addCheckpoint(id, checkpoint) + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { @@ -169,7 +229,10 @@ class DBCheckpointStorageTests { database.transaction { val (id1, checkpoint1) = newCheckpoint(2) - checkpointStorage.addCheckpoint(id1, checkpoint1) + val serializedFlowState1 = + checkpoint1.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + createMetadataRecord(checkpoint1) + checkpointStorage.addCheckpoint(id1, checkpoint1, serializedFlowState1) } assertThatThrownBy { @@ -179,21 +242,158 @@ class DBCheckpointStorageTests { }.isInstanceOf(CheckpointIncompatibleException::class.java) } - private fun newCheckpointStorage() { + @Test(timeout = 300_000) + fun `checkpoint can be recreated from database record`() { + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage = DBCheckpointStorage() + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + database.transaction { + assertEquals(serializedFlowState, checkpointStorage.checkpoints().single().serializedFlowState) + } + database.transaction { + assertEquals(checkpoint, checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)) } } - private fun newCheckpoint(version: Int = 1): Pair> { + @Test(timeout = 300_000) + fun `update checkpoint with result information`() { + val result = "This is the result" + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.copy(result = result) + val updatedSerializedFlowState = + updatedCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) + } + database.transaction { + assertEquals( + result, + checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).result + ) + assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) + } + } + + @Test(timeout = 300_000) + fun `update checkpoint with error information`() { + val exception = IllegalStateException("I am a naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.copy( + errorState = ErrorState.Errored( + listOf( + FlowError( + 0, + exception + ) + ), 0, false + ) + ) + val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + database.transaction { + // Checkpoint always returns clean error state when retrieved via [getCheckpoint] + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails) + } + } + + @Test(timeout = 300_000) + fun `clean checkpoints clear out error information from the database`() { + val exception = IllegalStateException("I am a naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.copy( + errorState = ErrorState.Errored( + listOf( + FlowError( + 0, + exception + ) + ), 0, false + ) + ) + val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + database.transaction { + // Checkpoint always returns clean error state when retrieved via [getCheckpoint] + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + } + // Set back to clean + database.transaction { checkpointStorage.updateCheckpoint(id, checkpoint, serializedFlowState) } + database.transaction { + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails) + } + } + + private fun newCheckpointStorage() { + database.transaction { + checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder { + override fun record( + serializedCheckpointState: SerializedBytes, + serializedFlowState: SerializedBytes + ) { + // do nothing + } + }) + } + } + + private fun newCheckpoint(version: Int = 1): Pair { val id = StateMachineRunId.createRandom() val logic: FlowLogic<*> = object : FlowLogic() { override fun call() {} } val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) - val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, SubFlowVersion.CoreFlow(version), false) - .getOrThrow() - return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val checkpoint = Checkpoint.create( + InvocationContext.shell(), + FlowStart.Explicit, + logic.javaClass, + frozenLogic, + ALICE, + SubFlowVersion.CoreFlow(version), + false + ) + .getOrThrow() + return id to checkpoint } + private fun DatabaseTransaction.createMetadataRecord(checkpoint: Checkpoint) { + val metadata = DBCheckpointStorage.DBFlowMetadata( + invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value, + flowId = null, + flowName = "random.flow", + userSuppliedIdentifier = null, + startType = DBCheckpointStorage.StartReason.RPC, + launchingCordapp = "this cordapp", + platformVersion = PLATFORM_VERSION, + rpcUsername = "Batman", + invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp, + receivedInstant = Instant.now(), + startInstant = null, + finishInstant = null + ) + session.save(metadata) + } } diff --git a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt index bf9a482e66..14c4586ee6 100644 --- a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt @@ -23,9 +23,12 @@ import net.corda.core.serialization.internal.checkpointSerialize import net.corda.nodeapi.internal.lifecycle.NodeServicesContext import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent import net.corda.node.internal.NodeStartup +import net.corda.node.services.persistence.CheckpointPerformanceRecorder import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.CheckpointState import net.corda.node.services.statemachine.FlowStart +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.SubFlowVersion import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.core.SerializationEnvironmentRule @@ -104,7 +107,7 @@ class CheckpointDumperImplTest { // add a checkpoint val (id, checkpoint) = newCheckpoint() database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint)) } dumper.dumpCheckpoints() @@ -130,7 +133,7 @@ class CheckpointDumperImplTest { // add a checkpoint val (id, checkpoint) = newCheckpoint() database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint)) } dumper.dumpCheckpoints() @@ -140,11 +143,18 @@ class CheckpointDumperImplTest { private fun newCheckpointStorage() { database.transaction { - checkpointStorage = DBCheckpointStorage() + checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder { + override fun record( + serializedCheckpointState: SerializedBytes, + serializedFlowState: SerializedBytes + ) { + // do nothing + } + }) } } - private fun newCheckpoint(version: Int = 1): Pair> { + private fun newCheckpoint(version: Int = 1): Pair { val id = StateMachineRunId.createRandom() val logic: FlowLogic<*> = object : FlowLogic() { override fun call() {} @@ -152,6 +162,10 @@ class CheckpointDumperImplTest { val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, myself.identity.party, SubFlowVersion.CoreFlow(version), false) .getOrThrow() - return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + return id to checkpoint + } + + private fun serializeFlowState(checkpoint: Checkpoint): SerializedBytes { + return checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) } } \ No newline at end of file