diff --git a/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt b/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt index 1c975cd93a..d5d803d9c1 100644 --- a/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt +++ b/node-api/src/test/kotlin/net/corda/nodeapi/internal/persistence/MissingSchemaMigrationTest.kt @@ -47,7 +47,7 @@ class MissingSchemaMigrationTest { fun `test that an error is thrown when forceThrowOnMissingMigration is set and a mapped schema is missing a migration`() { assertThatThrownBy { createSchemaMigration(setOf(GoodSchema), true) - .nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + .nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) }.isInstanceOf(MissingMigrationException::class.java) } @@ -55,7 +55,7 @@ class MissingSchemaMigrationTest { fun `test that an error is not thrown when forceThrowOnMissingMigration is not set and a mapped schema is missing a migration`() { assertDoesNotThrow { createSchemaMigration(setOf(GoodSchema), false) - .nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + .nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) } } @@ -64,7 +64,7 @@ class MissingSchemaMigrationTest { assertDoesNotThrow("This test failure indicates " + "a new table has been added to the node without the appropriate migration scripts being present") { createSchemaMigration(NodeSchemaService().internalSchemas(), false) - .nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + .nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) } } diff --git a/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt index 7d6aa1edf8..ff36d5e7a2 100644 --- a/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt +++ b/node/src/integration-test/kotlin/net/corda/node/services/persistence/CordaPersistenceServiceTests.kt @@ -3,11 +3,17 @@ package net.corda.node.services.persistence import co.paralleluniverse.fibers.Suspendable import net.corda.core.flows.FlowLogic import net.corda.core.flows.StartableByRPC +import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.messaging.startFlow import net.corda.core.node.AppServiceHub import net.corda.core.node.services.CordaService +import net.corda.core.node.services.vault.SessionScope import net.corda.core.serialization.SingletonSerializeAsToken import net.corda.core.utilities.getOrThrow +import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.Checkpoint.FlowStatus +import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.testing.driver.DriverParameters import net.corda.testing.driver.driver @@ -15,6 +21,8 @@ import net.corda.testing.driver.internal.incrementalPortAllocation import net.corda.testing.node.internal.enclosedCordapp import org.junit.Test import java.sql.DriverManager +import java.time.Instant +import java.util.UUID import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -50,16 +58,52 @@ class CordaPersistenceServiceTests { @CordaService class MultiThreadedDbLoader(private val services: AppServiceHub) : SingletonSerializeAsToken() { - fun createObjects(count: Int) : Int { + fun createObjects(count: Int): Int { (1..count).toList().parallelStream().forEach { + val now = Instant.now() services.database.transaction { - session.save(DBCheckpointStorage.DBCheckpoint().apply { - checkpointId = it.toString() - }) + session.save( + DBCheckpointStorage.DBFlowCheckpoint( + id = it.toString(), + blob = DBCheckpointStorage.DBFlowCheckpointBlob( + checkpoint = ByteArray(8192), + flowStack = ByteArray(8192), + hmac = ByteArray(16), + persistedInstant = now + ), + result = DBCheckpointStorage.DBFlowResult(value = ByteArray(16), persistedInstant = now), + exceptionDetails = null, + status = FlowStatus.RUNNABLE, + compatible = false, + progressStep = "", + ioRequestType = FlowIORequest.ForceCheckpoint.javaClass, + checkpointInstant = Instant.now(), + flowMetadata = createMetadataRecord(UUID.randomUUID(), now) + ) + ) } } return count } + + private fun SessionScope.createMetadataRecord(invocationId: UUID, timestamp: Instant): DBCheckpointStorage.DBFlowMetadata { + val metadata = DBCheckpointStorage.DBFlowMetadata( + invocationId = invocationId.toString(), + flowId = null, + flowName = "random.flow", + userSuppliedIdentifier = null, + startType = DBCheckpointStorage.StartReason.RPC, + launchingCordapp = "this cordapp", + platformVersion = PLATFORM_VERSION, + rpcUsername = "Batman", + invocationInstant = Instant.now(), + receivedInstant = Instant.now(), + startInstant = timestamp, + finishInstant = null + ) + session.save(metadata) + return metadata + } } -} \ No newline at end of file +} diff --git a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt index 38675eb979..6164c92ccd 100644 --- a/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt @@ -109,6 +109,7 @@ import net.corda.node.services.network.PersistentNetworkMapCache import net.corda.node.services.persistence.AbstractPartyDescriptor import net.corda.node.services.persistence.AbstractPartyToX500NameAsStringConverter import net.corda.node.services.persistence.AttachmentStorageInternal +import net.corda.node.services.persistence.DBCheckpointPerformanceRecorder import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.persistence.DBTransactionMappingStorage import net.corda.node.services.persistence.DBTransactionStorage @@ -251,7 +252,6 @@ abstract class AbstractNode(val configuration: NodeConfiguration, } val networkMapCache = PersistentNetworkMapCache(cacheFactory, database, identityService).tokenize() - val checkpointStorage = DBCheckpointStorage() @Suppress("LeakingThis") val transactionStorage = makeTransactionStorage(configuration.transactionCacheSizeBytes).tokenize() val networkMapClient: NetworkMapClient? = configuration.networkServices?.let { NetworkMapClient(it.networkMapURL, versionInfo) } @@ -318,6 +318,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration, @Suppress("LeakingThis") protected val network: MessagingService = makeMessagingService().tokenize() val services = ServiceHubInternalImpl().tokenize() + val checkpointStorage = DBCheckpointStorage(DBCheckpointPerformanceRecorder(services.monitoringService.metrics)) @Suppress("LeakingThis") val smm = makeStateMachineManager() val flowStarter = FlowStarterImpl(smm, flowLogicRefFactory) @@ -1276,7 +1277,7 @@ fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfi try { val dataSource = DataSourceFactory.createDataSource(hikariProperties, metricRegistry = metricRegistry) val schemaMigration = SchemaMigration(schemas, dataSource, databaseConfig, cordappLoader, currentDir, ourName) - schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage().getCheckpointCount(it) != 0L }) + schemaMigration.nodeStartup(dataSource.connection.use { DBCheckpointStorage.getCheckpointCount(it) != 0L }) start(dataSource) } catch (ex: Exception) { when { diff --git a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt index cb7fe99b6f..340226492e 100644 --- a/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt +++ b/node/src/main/kotlin/net/corda/node/internal/CheckpointVerifier.kt @@ -39,7 +39,7 @@ object CheckpointVerifier { checkpointStorage.getAllCheckpoints().use { it.forEach { (_, serializedCheckpoint) -> val checkpoint = try { - serializedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) + serializedCheckpoint.deserialize(checkpointSerializationContext) } catch (e: ClassNotFoundException) { val message = e.message if (message != null) { diff --git a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt index b463372909..8268322233 100644 --- a/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/api/CheckpointStorage.kt @@ -3,7 +3,7 @@ package net.corda.node.services.api import net.corda.core.flows.StateMachineRunId import net.corda.core.serialization.SerializedBytes import net.corda.node.services.statemachine.Checkpoint -import java.sql.Connection +import net.corda.node.services.statemachine.FlowState import java.util.stream.Stream /** @@ -13,12 +13,12 @@ interface CheckpointStorage { /** * Add a checkpoint for a new id to the store. Will throw if there is already a checkpoint for this id */ - fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) + fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) /** * Update an existing checkpoint. Will throw if there is not checkpoint for this id. */ - fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) + fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) /** * Remove existing checkpoint from the store. @@ -28,21 +28,17 @@ interface CheckpointStorage { /** * Load an existing checkpoint from the store. - * @return the checkpoint, still in serialized form, or null if not found. + * + * The checkpoint returned from this function will be a _clean_ checkpoint. No error information is loaded into the checkpoint + * even if the previous status of the checkpoint was [Checkpoint.FlowStatus.FAILED] or [Checkpoint.FlowStatus.HOSPITALIZED]. + * + * @return The checkpoint, in a partially serialized form, or null if not found. */ - fun getCheckpoint(id: StateMachineRunId): SerializedBytes? + fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? /** * Stream all checkpoints from the store. If this is backed by a database the stream will be valid until the * underlying database connection is closed, so any processing should happen before it is closed. */ - fun getAllCheckpoints(): Stream>> - - /** - * This needs to run before Hibernate is initialised. - * - * @param connection The SQL Connection. - * @return the number of checkpoints stored in the database. - */ - fun getCheckpointCount(connection: Connection): Long + fun getAllCheckpoints(): Stream> } \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointPerformanceRecorder.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointPerformanceRecorder.kt new file mode 100644 index 0000000000..7bc30ec804 --- /dev/null +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointPerformanceRecorder.kt @@ -0,0 +1,60 @@ +package net.corda.node.services.persistence + +import com.codahale.metrics.Gauge +import com.codahale.metrics.Histogram +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.Reservoir +import com.codahale.metrics.SlidingTimeWindowArrayReservoir +import com.codahale.metrics.SlidingTimeWindowReservoir +import net.corda.core.serialization.SerializedBytes +import net.corda.node.services.statemachine.CheckpointState +import net.corda.node.services.statemachine.FlowState +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicLong + +interface CheckpointPerformanceRecorder { + + /** + * Record performance metrics regarding the serialized size of [CheckpointState] and [FlowState] + */ + fun record(serializedCheckpointState: SerializedBytes, serializedFlowState: SerializedBytes) +} + +class DBCheckpointPerformanceRecorder(metrics: MetricRegistry) : CheckpointPerformanceRecorder { + + private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate") + private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS) + private val lastBandwidthUpdate = AtomicLong(0) + private val checkpointBandwidthHist = metrics.register( + "Flows.CheckpointVolumeBytesPerSecondHist", Histogram( + SlidingTimeWindowArrayReservoir(1, TimeUnit.DAYS) + ) + ) + private val checkpointBandwidth = metrics.register( + "Flows.CheckpointVolumeBytesPerSecondCurrent", + LatchedGauge(checkpointSizesThisSecond) + ) + + /** + * This [Gauge] just reports the sum of the bytes checkpointed during the last second. + */ + private class LatchedGauge(private val reservoir: Reservoir) : Gauge { + override fun getValue(): Long { + return reservoir.snapshot.values.sum() + } + } + + override fun record(serializedCheckpointState: SerializedBytes, serializedFlowState: SerializedBytes) { + val totalSize = serializedCheckpointState.size.toLong() + serializedFlowState.size.toLong() + checkpointingMeter.mark() + checkpointSizesThisSecond.update(totalSize) + var lastUpdateTime = lastBandwidthUpdate.get() + while (System.nanoTime() - lastUpdateTime > TimeUnit.SECONDS.toNanos(1)) { + if (lastBandwidthUpdate.compareAndSet(lastUpdateTime, System.nanoTime())) { + val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum() + checkpointBandwidthHist.update(checkpointVolume) + } + lastUpdateTime = lastBandwidthUpdate.get() + } + } +} \ No newline at end of file diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index 13b5b2ace7..45a6739b2a 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -2,245 +2,392 @@ package net.corda.node.services.persistence import net.corda.core.flows.StateMachineRunId import net.corda.core.internal.FlowIORequest +import net.corda.core.internal.PLATFORM_VERSION +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes -import net.corda.core.utilities.ProgressTracker -import net.corda.core.utilities.debug +import net.corda.core.serialization.serialize +import net.corda.core.utilities.contextLogger import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.Checkpoint.FlowStatus +import net.corda.node.services.statemachine.CheckpointState +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.FlowState import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX import net.corda.nodeapi.internal.persistence.currentDBSession import org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY +import org.hibernate.annotations.Type import org.slf4j.Logger import org.slf4j.LoggerFactory -import java.util.* -import java.util.stream.Stream -import javax.persistence.Column -import javax.persistence.Entity -import javax.persistence.Id -import org.hibernate.annotations.Type -import java.lang.Exception -import java.math.BigInteger import java.sql.Connection import java.sql.SQLException import java.time.Instant +import java.util.UUID +import java.util.stream.Stream +import javax.persistence.CascadeType +import javax.persistence.Column +import javax.persistence.Entity import javax.persistence.FetchType +import javax.persistence.GeneratedValue +import javax.persistence.GenerationType +import javax.persistence.Id import javax.persistence.JoinColumn import javax.persistence.OneToOne /** * Simple checkpoint key value storage in DB. */ -class DBCheckpointStorage : CheckpointStorage { - val log: Logger = LoggerFactory.getLogger(this::class.java) +class DBCheckpointStorage(private val checkpointPerformanceRecorder: CheckpointPerformanceRecorder) : CheckpointStorage { + + companion object { + val log = contextLogger() + + private const val HMAC_SIZE_BYTES = 16 + + /** + * This needs to run before Hibernate is initialised. + * + * No need to set up [DBCheckpointStorage] fully for this function + * + * @param connection The SQL Connection. + * @return the number of checkpoints stored in the database. + */ + fun getCheckpointCount(connection: Connection): Long { + // No need to set up [DBCheckpointStorage] fully for this function + return try { + connection.prepareStatement("select count(*) from node_checkpoints").use { ps -> + ps.executeQuery().use { rs -> + rs.next() + rs.getLong(1) + } + } + } catch (e: SQLException) { + // Happens when the table was not created yet. + 0L + } + } + } enum class StartReason { RPC, FLOW, SERVICE, SCHEDULED, INITIATED } @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_new") + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") class DBFlowCheckpoint( - @Id - @Column(name = "flow_id", length = 64, nullable = false) - var id: String, + @Id + @Column(name = "flow_id", length = 64, nullable = false) + var id: String, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") - var blob: DBFlowCheckpointBlob, + @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") + var blob: DBFlowCheckpointBlob, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "result_id", referencedColumnName = "id") - var result: DBFlowResult?, + @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @JoinColumn(name = "result_id", referencedColumnName = "id") + var result: DBFlowResult?, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "error_id", referencedColumnName = "id") - var exceptionDetails: DBFlowException?, + @OneToOne(fetch = FetchType.LAZY, cascade = [CascadeType.ALL], optional = true) + @JoinColumn(name = "error_id", referencedColumnName = "id") + var exceptionDetails: DBFlowException?, - @OneToOne(fetch = FetchType.LAZY) - @JoinColumn(name = "flow_id", referencedColumnName = "flow_id") - var flowMetadata: DBFlowMetadata, + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "invocation_id", referencedColumnName = "invocation_id") + var flowMetadata: DBFlowMetadata, - @Column(name = "status") - var status: FlowStatus, + @Column(name = "status", nullable = false) + var status: FlowStatus, - @Column(name = "compatible") - var compatible: Boolean, + @Column(name = "compatible", nullable = false) + var compatible: Boolean, - @Column(name = "progress_step") - var progressStep: String, + @Column(name = "progress_step") + var progressStep: String?, - @Column(name = "flow_io_request") - var ioRequestType: Class>, + @Column(name = "flow_io_request") + var ioRequestType: Class>?, - @Column(name = "timestamp") - var checkpointInstant: Instant + @Column(name = "timestamp", nullable = false) + var checkpointInstant: Instant ) @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs") + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoint_blobs") class DBFlowCheckpointBlob( - @Id - @Column(name = "id", nullable = false) - var id: BigInteger? = null, + @Id + @GeneratedValue(strategy = GenerationType.SEQUENCE) + @Column(name = "id", nullable = false) + private var id: Long = 0, - @Type(type = "corda-blob") - @Column(name = "checkpoint_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + @Type(type = "corda-blob") + @Column(name = "checkpoint_value", nullable = false) + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, - @Type(type = "corda-blob") - @Column(name = "flow_state", nullable = false) - var flowStack: ByteArray = EMPTY_BYTE_ARRAY, + // A future task will make this nullable + @Type(type = "corda-blob") + @Column(name = "flow_state", nullable = false) + var flowStack: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "timestamp") - var persistedInstant: Instant? = null + @Column(name = "hmac") + var hmac: ByteArray, + + @Column(name = "timestamp") + var persistedInstant: Instant ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results") class DBFlowResult( - @Id - @Column(name = "id", nullable = false) - var id: BigInteger? = null, + @Id + @Column(name = "id", nullable = false) + @GeneratedValue(strategy = GenerationType.SEQUENCE) + private var id: Long = 0, - @Type(type = "corda-blob") - @Column(name = "result_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + @Type(type = "corda-blob") + @Column(name = "result_value", nullable = false) + var value: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "timestamp") - val persistedInstant: Instant? = null + @Column(name = "timestamp") + val persistedInstant: Instant ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions") class DBFlowException( - @Id - @Column(name = "id", nullable = false) - var id: BigInteger? = null, + @Id + @Column(name = "id", nullable = false) + @GeneratedValue(strategy = GenerationType.SEQUENCE) + private var id: Long = 0, - @Column(name = "type", nullable = false) - var type: Class, + @Column(name = "type", nullable = false) + var type: Class, - @Type(type = "corda-blob") - @Column(name = "exception_value", nullable = false) - var value: ByteArray = EMPTY_BYTE_ARRAY, + @Type(type = "corda-blob") + @Column(name = "exception_value", nullable = false) + var value: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "exception_message") - var message: String? = null, + @Column(name = "exception_message") + var message: String? = null, - @Column(name = "timestamp") - val persistedInstant: Instant? = null + @Column(name = "timestamp") + val persistedInstant: Instant ) @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata") class DBFlowMetadata( - @Id - @Column(name = "flow_id", length = 64, nullable = false) - var flowId: String, + @Id + @Column(name = "invocation_id", nullable = false) + var invocationId: String, - @Column(name = "flow_name", nullable = false) - var flowName: String, + @Column(name = "flow_id", nullable = true) + var flowId: String?, - @Column(name = "flow_identifier", nullable = true) - var userSuppliedIdentifier: String?, + @Column(name = "flow_name", nullable = false) + var flowName: String, - @Column(name = "started_type", nullable = false) - var startType: StartReason, + @Column(name = "flow_identifier", nullable = true) + var userSuppliedIdentifier: String?, - @Column(name = "flow_parameters", nullable = false) - var initialParameters: ByteArray = EMPTY_BYTE_ARRAY, + @Column(name = "started_type", nullable = false) + var startType: StartReason, - @Column(name = "cordapp_name", nullable = false) - var launchingCordapp: String, + @Column(name = "flow_parameters", nullable = false) + var initialParameters: ByteArray = EMPTY_BYTE_ARRAY, - @Column(name = "platform_version", nullable = false) - var platformVersion: Int, + @Column(name = "cordapp_name", nullable = false) + var launchingCordapp: String, - @Column(name = "rpc_user", nullable = false) - var rpcUsername: String, + @Column(name = "platform_version", nullable = false) + var platformVersion: Int, - @Column(name = "invocation_time", nullable = false) - var invocationInstant: Instant, + @Column(name = "rpc_user", nullable = false) + var rpcUsername: String, - @Column(name = "received_time", nullable = false) - var receivedInstant: Instant, + @Column(name = "invocation_time", nullable = false) + var invocationInstant: Instant, - @Column(name = "start_time", nullable = true) - var startInstant: Instant?, + @Column(name = "received_time", nullable = false) + var receivedInstant: Instant, - @Column(name = "finish_time", nullable = true) - var finishInstant: Instant? + @Column(name = "start_time", nullable = true) + var startInstant: Instant?, + + @Column(name = "finish_time", nullable = true) + var finishInstant: Instant? ) - @Entity - @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") - class DBCheckpoint( - @Id - @Suppress("MagicNumber") // database column width - @Column(name = "checkpoint_id", length = 64, nullable = false) - var checkpointId: String = "", - - @Type(type = "corda-blob") - @Column(name = "checkpoint_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY - ) { - override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})" + override fun addCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) { + currentDBSession().save(createDBCheckpoint(id, checkpoint, serializedFlowState)) } - override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) { - currentDBSession().save(DBCheckpoint().apply { - checkpointId = id.uuid.toString() - this.checkpoint = checkpoint.bytes - log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" } - }) - } - - override fun updateCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) { - currentDBSession().update(DBCheckpoint().apply { - checkpointId = id.uuid.toString() - this.checkpoint = checkpoint.bytes - log.debug { "Checkpoint $checkpointId, size=${this.checkpoint.size}" } - }) + override fun updateCheckpoint(id: StateMachineRunId, checkpoint: Checkpoint, serializedFlowState: SerializedBytes) { + currentDBSession().update(updateDBCheckpoint(id, checkpoint, serializedFlowState)) } override fun removeCheckpoint(id: StateMachineRunId): Boolean { val session = currentDBSession() val criteriaBuilder = session.criteriaBuilder - val delete = criteriaBuilder.createCriteriaDelete(DBCheckpoint::class.java) - val root = delete.from(DBCheckpoint::class.java) - delete.where(criteriaBuilder.equal(root.get(DBCheckpoint::checkpointId.name), id.uuid.toString())) + val delete = criteriaBuilder.createCriteriaDelete(DBFlowCheckpoint::class.java) + val root = delete.from(DBFlowCheckpoint::class.java) + delete.where(criteriaBuilder.equal(root.get(DBFlowCheckpoint::id.name), id.uuid.toString())) return session.createQuery(delete).executeUpdate() > 0 } - override fun getCheckpoint(id: StateMachineRunId): SerializedBytes? { - val bytes = currentDBSession().get(DBCheckpoint::class.java, id.uuid.toString())?.checkpoint ?: return null - return SerializedBytes(bytes) + override fun getCheckpoint(id: StateMachineRunId): Checkpoint.Serialized? { + return currentDBSession().get(DBFlowCheckpoint::class.java, id.uuid.toString())?.toSerializedCheckpoint() } - override fun getAllCheckpoints(): Stream>> { + override fun getAllCheckpoints(): Stream> { val session = currentDBSession() - val criteriaQuery = session.criteriaBuilder.createQuery(DBCheckpoint::class.java) - val root = criteriaQuery.from(DBCheckpoint::class.java) + val criteriaQuery = session.criteriaBuilder.createQuery(DBFlowCheckpoint::class.java) + val root = criteriaQuery.from(DBFlowCheckpoint::class.java) criteriaQuery.select(root) return session.createQuery(criteriaQuery).stream().map { - StateMachineRunId(UUID.fromString(it.checkpointId)) to SerializedBytes(it.checkpoint) + StateMachineRunId(UUID.fromString(it.id)) to it.toSerializedCheckpoint() } } - override fun getCheckpointCount(connection: Connection): Long { - return try { - connection.prepareStatement("select count(*) from node_checkpoints").use { ps -> - ps.executeQuery().use { rs -> - rs.next() - rs.getLong(1) - } - } - } catch (e: SQLException) { - // Happens when the table was not created yet. - 0L + private fun createDBCheckpoint( + id: StateMachineRunId, + checkpoint: Checkpoint, + serializedFlowState: SerializedBytes + ): DBFlowCheckpoint { + val flowId = id.uuid.toString() + val now = Instant.now() + val invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value + + val serializedCheckpointState = checkpoint.checkpointState.storageSerialize() + checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) + + val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now) + // Need to update the metadata record to join it to the main checkpoint record + + // This code needs to be added back in once the metadata record is properly created (remove the code below it) + // val metadata = requireNotNull(currentDBSession().find( + // DBFlowMetadata::class.java, + // invocationId + // )) { "The flow metadata record for flow [$flowId] with invocation id [$invocationId] does not exist"} + val metadata = (currentDBSession().find( + DBFlowMetadata::class.java, + invocationId + )) ?: createTemporaryMetadata(checkpoint) + metadata.flowId = flowId + currentDBSession().update(metadata) + // Most fields are null as they cannot have been set when creating the initial checkpoint + return DBFlowCheckpoint( + id = flowId, + blob = blob, + result = null, + exceptionDetails = null, + flowMetadata = metadata, + status = checkpoint.status, + compatible = checkpoint.compatible, + progressStep = null, + ioRequestType = null, + checkpointInstant = Instant.now() + ) + } + + // Remove this when saving of metadata is properly handled + private fun createTemporaryMetadata(checkpoint: Checkpoint): DBFlowMetadata { + return DBFlowMetadata( + invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value, + flowId = null, + flowName = "random.flow", + userSuppliedIdentifier = null, + startType = DBCheckpointStorage.StartReason.RPC, + launchingCordapp = "this cordapp", + platformVersion = PLATFORM_VERSION, + rpcUsername = "Batman", + invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp, + receivedInstant = Instant.now(), + startInstant = null, + finishInstant = null + ).apply { + currentDBSession().save(this) } } + + private fun updateDBCheckpoint( + id: StateMachineRunId, + checkpoint: Checkpoint, + serializedFlowState: SerializedBytes + ): DBFlowCheckpoint { + val flowId = id.uuid.toString() + val now = Instant.now() + + val serializedCheckpointState = checkpoint.checkpointState.storageSerialize() + checkpointPerformanceRecorder.record(serializedCheckpointState, serializedFlowState) + + val blob = createDBCheckpointBlob(serializedCheckpointState, serializedFlowState, now) + val result = checkpoint.result?.let { createDBFlowResult(it, now) } + val exceptionDetails = (checkpoint.errorState as? ErrorState.Errored)?.let { createDBFlowException(it, now) } + // Load the previous entity from the hibernate cache so the meta data join does not get updated + val entity = currentDBSession().find(DBFlowCheckpoint::class.java, flowId) + return entity.apply { + this.blob = blob + this.result = result + this.exceptionDetails = exceptionDetails + // Do not update the meta data relationship on updates + this.flowMetadata = entity.flowMetadata + this.status = checkpoint.status + this.compatible = checkpoint.compatible + this.progressStep = checkpoint.progressStep + this.ioRequestType = checkpoint.flowIoRequest + this.checkpointInstant = now + } + } + + private fun createDBCheckpointBlob( + serializedCheckpointState: SerializedBytes, + serializedFlowState: SerializedBytes, + now: Instant + ): DBFlowCheckpointBlob { + return DBFlowCheckpointBlob( + checkpoint = serializedCheckpointState.bytes, + flowStack = serializedFlowState.bytes, + hmac = ByteArray(HMAC_SIZE_BYTES), + persistedInstant = now + ) + } + + private fun createDBFlowResult(result: Any, now: Instant): DBFlowResult { + return DBFlowResult( + value = result.storageSerialize().bytes, + persistedInstant = now + ) + } + + private fun createDBFlowException(errorState: ErrorState.Errored, now: Instant): DBFlowException { + return errorState.errors.last().exception.let { + DBFlowException( + type = it::class.java, + message = it.message, + value = it.storageSerialize().bytes, + persistedInstant = now + ) + } + } + + private fun DBFlowCheckpoint.toSerializedCheckpoint(): Checkpoint.Serialized { + return Checkpoint.Serialized( + serializedCheckpointState = SerializedBytes(blob.checkpoint), + serializedFlowState = SerializedBytes(blob.flowStack), + // Always load as a [Clean] checkpoint to represent that the checkpoint is the last _good_ checkpoint + errorState = ErrorState.Clean, + // A checkpoint with a result should not normally be loaded (it should be [null] most of the time) + result = result?.let { SerializedBytes(it.value) }, + status = status, + progressStep = progressStep, + flowIoRequest = ioRequestType, + compatible = compatible + ) + } + + private fun T.storageSerialize(): SerializedBytes { + return serialize(context = SerializationDefaults.STORAGE_CONTEXT) + } } diff --git a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt index d27a480a5a..c3979612ab 100644 --- a/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/rpc/CheckpointDumperImpl.kt @@ -149,8 +149,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri instrumentCheckpointAgent(runId) val (bytes, fileName) = try { - val checkpoint = - serialisedCheckpoint.checkpointDeserialize(context = checkpointSerializationContext) + val checkpoint = serialisedCheckpoint.deserialize(checkpointSerializationContext) val json = checkpoint.toJson(runId.uuid, now) val jsonBytes = writer.writeValueAsBytes(json) jsonBytes to "${json.topLevelFlowClass.simpleName}-${runId.uuid}.json" @@ -229,6 +228,7 @@ class CheckpointDumperImpl(private val checkpointStorage: CheckpointStorage, pri origin = checkpointState.invocationContext.origin.toOrigin(), ourIdentity = checkpointState.ourIdentity, activeSessions = checkpointState.sessions.mapNotNull { it.value.toActiveSession(it.key) }, + // This can only ever return as [ErrorState.Clean] which causes it to become [null] errored = errorState as? ErrorState.Errored ) } diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index 73d7cbb3d9..d38c6371ef 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -33,10 +33,7 @@ class NodeSchemaService(private val extraSchemas: Set = emptySet() object NodeCore object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, - mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, - - //new checkpoints - keeping old around to allow testing easily (for now) - DBCheckpointStorage.DBFlowCheckpoint::class.java, + mappedTypes = listOf(DBCheckpointStorage.DBFlowCheckpoint::class.java, DBCheckpointStorage.DBFlowCheckpointBlob::class.java, DBCheckpointStorage.DBFlowResult::class.java, DBCheckpointStorage.DBFlowException::class.java, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt index 3ffdd4b709..e7a45b145b 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/ActionExecutorImpl.kt @@ -31,8 +31,7 @@ class ActionExecutorImpl( private val checkpointStorage: CheckpointStorage, private val flowMessaging: FlowMessaging, private val stateMachineManager: StateMachineManagerInternal, - private val checkpointSerializationContext: CheckpointSerializationContext, - metrics: MetricRegistry + private val checkpointSerializationContext: CheckpointSerializationContext ) : ActionExecutor { private companion object { @@ -48,12 +47,6 @@ class ActionExecutorImpl( } } - private val checkpointingMeter = metrics.meter("Flows.Checkpointing Rate") - private val checkpointSizesThisSecond = SlidingTimeWindowReservoir(1, TimeUnit.SECONDS) - private val lastBandwidthUpdate = AtomicLong(0) - private val checkpointBandwidthHist = metrics.register("Flows.CheckpointVolumeBytesPerSecondHist", Histogram(SlidingTimeWindowArrayReservoir(1, TimeUnit.DAYS))) - private val checkpointBandwidth = metrics.register("Flows.CheckpointVolumeBytesPerSecondCurrent", LatchedGauge(checkpointSizesThisSecond)) - @Suspendable override fun executeAction(fiber: FlowFiber, action: Action) { log.trace { "Flow ${fiber.id} executing $action" } @@ -100,21 +93,12 @@ class ActionExecutorImpl( @Suspendable private fun executePersistCheckpoint(action: Action.PersistCheckpoint) { - val checkpointBytes = serializeCheckpoint(action.checkpoint) + val checkpoint = action.checkpoint + val serializedFlowState = checkpoint.flowState.checkpointSerialize(checkpointSerializationContext) if (action.isCheckpointUpdate) { - checkpointStorage.updateCheckpoint(action.id, checkpointBytes) + checkpointStorage.updateCheckpoint(action.id, checkpoint, serializedFlowState) } else { - checkpointStorage.addCheckpoint(action.id, checkpointBytes) - } - checkpointingMeter.mark() - checkpointSizesThisSecond.update(checkpointBytes.size.toLong()) - var lastUpdateTime = lastBandwidthUpdate.get() - while (System.nanoTime() - lastUpdateTime > TimeUnit.SECONDS.toNanos(1)) { - if (lastBandwidthUpdate.compareAndSet(lastUpdateTime, System.nanoTime())) { - val checkpointVolume = checkpointSizesThisSecond.snapshot.values.sum() - checkpointBandwidthHist.update(checkpointVolume) - } - lastUpdateTime = lastBandwidthUpdate.get() + checkpointStorage.addCheckpoint(action.id, checkpoint, serializedFlowState) } } @@ -258,10 +242,6 @@ class ActionExecutorImpl( stateMachineManager.retryFlowFromSafePoint(action.currentState) } - private fun serializeCheckpoint(checkpoint: Checkpoint): SerializedBytes { - return checkpoint.checkpointSerialize(context = checkpointSerializationContext) - } - private fun cancelFlowTimeout(action: Action.CancelFlowTimeout) { stateMachineManager.cancelFlowTimeout(action.flowId) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 1352fb6ec6..d722ad9fdb 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -590,7 +590,7 @@ class SingleThreadedStateMachineManager( // The checkpoint will be missing if the flow failed before persisting the original checkpoint // CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay) checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint -> - val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId) + val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, flowId) if (checkpoint == null) { return openFuture>().mapError { IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " + @@ -758,14 +758,23 @@ class SingleThreadedStateMachineManager( } } + private fun tryDeserializeCheckpoint(serializedCheckpoint: Checkpoint.Serialized, flowId: StateMachineRunId): Checkpoint? { + return try { + serializedCheckpoint.deserialize(checkpointSerializationContext!!) + } catch (e: Exception) { + logger.error("Unable to deserialize checkpoint for flow $flowId. Something is very wrong and this flow will be ignored.", e) + null + } + } + private fun createFlowFromCheckpoint( id: StateMachineRunId, - serializedCheckpoint: SerializedBytes, + serializedCheckpoint: Checkpoint.Serialized, isAnyCheckpointPersisted: Boolean, isStartIdempotent: Boolean, initialDeduplicationHandler: DeduplicationHandler? ): Flow? { - val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, id) ?: return null + val checkpoint = tryDeserializeCheckpoint(serializedCheckpoint, id) ?: return null val flowState = checkpoint.flowState val resultFuture = openFuture() val fiber = when (flowState) { @@ -865,8 +874,7 @@ class SingleThreadedStateMachineManager( checkpointStorage, flowMessaging, this, - checkpointSerializationContext, - metrics + checkpointSerializationContext ) } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt index f4c9e790b6..99882aa022 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/StateMachineState.kt @@ -7,7 +7,12 @@ import net.corda.core.flows.FlowInfo import net.corda.core.flows.FlowLogic import net.corda.core.identity.Party import net.corda.core.internal.FlowIORequest +import net.corda.core.serialization.CordaSerializable +import net.corda.core.serialization.SerializationDefaults import net.corda.core.serialization.SerializedBytes +import net.corda.core.serialization.deserialize +import net.corda.core.serialization.internal.CheckpointSerializationContext +import net.corda.core.serialization.internal.checkpointDeserialize import net.corda.core.utilities.Try import net.corda.node.services.messaging.DeduplicationHandler import java.time.Instant @@ -57,9 +62,10 @@ data class Checkpoint( val result: Any? = null, val status: FlowStatus = FlowStatus.RUNNABLE, val progressStep: String? = null, - val flowIoRequest: FlowIORequest<*>? = null, + val flowIoRequest: Class>? = null, val compatible: Boolean = true ) { + @CordaSerializable enum class FlowStatus { RUNNABLE, FAILED, @@ -84,9 +90,15 @@ data class Checkpoint( ): Try { return SubFlow.create(flowLogicClass, subFlowVersion, isEnabledTimedFlow).map { topLevelSubFlow -> Checkpoint( - checkpointState = CheckpointState(invocationContext, ourIdentity, emptyMap(), listOf(topLevelSubFlow), numberOfSuspends = 0), - flowState = FlowState.Unstarted(flowStart, frozenFlowLogic), - errorState = ErrorState.Clean + checkpointState = CheckpointState( + invocationContext, + ourIdentity, + emptyMap(), + listOf(topLevelSubFlow), + numberOfSuspends = 0 + ), + errorState = ErrorState.Clean, + flowState = FlowState.Unstarted(flowStart, frozenFlowLogic) ) } } @@ -123,6 +135,41 @@ data class Checkpoint( fun addSubflow(subFlow: SubFlow) : Checkpoint { return copy(checkpointState = checkpointState.copy(subFlowStack = checkpointState.subFlowStack + subFlow)) } + + /** + * A partially serialized form of [Checkpoint]. + * + * [Checkpoint.Serialized] contains the same fields as [Checkpoint] except that some of its fields are still serialized. The checkpoint + * can then be deserialized as needed. + */ + data class Serialized( + val serializedCheckpointState: SerializedBytes, + val serializedFlowState: SerializedBytes, + val errorState: ErrorState, + val result: SerializedBytes?, + val status: FlowStatus, + val progressStep: String?, + val flowIoRequest: Class>?, + val compatible: Boolean + ) { + /** + * Deserializes the serialized fields contained in [Checkpoint.Serialized]. + * + * @return A [Checkpoint] with all its fields filled in from [Checkpoint.Serialized] + */ + fun deserialize(checkpointSerializationContext: CheckpointSerializationContext): Checkpoint { + return Checkpoint( + checkpointState = serializedCheckpointState.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), + flowState = serializedFlowState.checkpointDeserialize(checkpointSerializationContext), + errorState = errorState, + result = result?.deserialize(context = SerializationDefaults.STORAGE_CONTEXT), + status = status, + progressStep = progressStep, + flowIoRequest = flowIoRequest, + compatible = compatible + ) + } + } } /** @@ -132,12 +179,13 @@ data class Checkpoint( * @param subFlowStack the stack of currently executing subflows. * @param numberOfSuspends the number of flow suspends due to IO API calls. */ +@CordaSerializable data class CheckpointState( - val invocationContext: InvocationContext, - val ourIdentity: Party, - val sessions: SessionMap, // This must preserve the insertion order! - val subFlowStack: List, - val numberOfSuspends: Int + val invocationContext: InvocationContext, + val ourIdentity: Party, + val sessions: SessionMap, // This must preserve the insertion order! + val subFlowStack: List, + val numberOfSuspends: Int ) /** @@ -254,17 +302,20 @@ sealed class FlowState { * @param exception the exception itself. Note that this may not contain information about the source error depending * on whether the source error was a FlowException or otherwise. */ +@CordaSerializable data class FlowError(val errorId: Long, val exception: Throwable) /** * The flow's error state. */ +@CordaSerializable sealed class ErrorState { abstract fun addErrors(newErrors: List): ErrorState /** * The flow is in a clean state. */ + @CordaSerializable object Clean : ErrorState() { override fun addErrors(newErrors: List): ErrorState { return Errored(newErrors, 0, false) @@ -281,6 +332,7 @@ sealed class ErrorState { * @param propagating true if error propagation was triggered. If this is set the dirtiness is permanent as the * sessions associated with the flow have been (or about to be) dirtied in counter-flows. */ + @CordaSerializable data class Errored( val errors: List, val propagatedIndex: Int, diff --git a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml index fdc812a02c..482c4d6418 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml @@ -5,30 +5,31 @@ logicalFilePath="migration/node-services.changelog-init.xml"> - + - - - + + - + + + - + + - - \ No newline at end of file + diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml index 9602ec51b3..acb3557060 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -5,7 +5,8 @@ logicalFilePath="migration/node-services.changelog-init.xml"> - + + @@ -18,6 +19,9 @@ + + + @@ -84,7 +88,7 @@ - + @@ -94,7 +98,7 @@ - + @@ -136,4 +140,4 @@ - \ No newline at end of file + diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml index 2c1789fc95..266361124e 100644 --- a/node/src/main/resources/migration/node-core.changelog-v17.xml +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -5,7 +5,8 @@ logicalFilePath="migration/node-services.changelog-init.xml"> - + + @@ -18,6 +19,9 @@ + + + @@ -84,7 +88,7 @@ - + @@ -94,7 +98,7 @@ - + @@ -136,4 +140,4 @@ - \ No newline at end of file + diff --git a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt index c7ceb785da..c30044cb02 100644 --- a/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/persistence/DBCheckpointStorageTests.kt @@ -3,6 +3,7 @@ package net.corda.node.services.persistence import net.corda.core.context.InvocationContext import net.corda.core.flows.FlowLogic import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.internal.CheckpointSerializationDefaults import net.corda.core.serialization.internal.checkpointSerialize @@ -10,11 +11,16 @@ import net.corda.node.internal.CheckpointIncompatibleException import net.corda.node.internal.CheckpointVerifier import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.CheckpointState +import net.corda.node.services.statemachine.ErrorState +import net.corda.node.services.statemachine.FlowError import net.corda.node.services.statemachine.FlowStart +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.SubFlowVersion import net.corda.node.services.transactions.PersistentUniquenessProvider import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.nodeapi.internal.persistence.DatabaseConfig +import net.corda.nodeapi.internal.persistence.DatabaseTransaction import net.corda.testing.core.ALICE_NAME import net.corda.testing.core.SerializationEnvironmentRule import net.corda.testing.core.TestIdentity @@ -28,9 +34,15 @@ import org.junit.After import org.junit.Before import org.junit.Rule import org.junit.Test +import java.lang.IllegalStateException +import java.time.Instant import kotlin.streams.toList +import kotlin.test.assertEquals +import kotlin.test.assertNotNull +import kotlin.test.assertNull +import kotlin.test.assertTrue -internal fun CheckpointStorage.checkpoints(): List> { +internal fun CheckpointStorage.checkpoints(): List { return getAllCheckpoints().use { it.map { it.second }.toList() } @@ -61,26 +73,42 @@ class DBCheckpointStorageTests { LogHelper.reset(PersistentUniquenessProvider::class) } - @Test(timeout=300_000) - fun `add new checkpoint`() { + @Test(timeout = 300_000) + fun `add new checkpoint`() { val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint) + assertEquals( + checkpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } newCheckpointStorage() database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint) + assertEquals( + checkpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) + session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).also { + assertNotNull(it) + assertNotNull(it.blob) + } } } - @Test(timeout=300_000) - fun `remove checkpoint`() { + @Test(timeout = 300_000) + fun `remove checkpoint`() { val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { checkpointStorage.removeCheckpoint(id) @@ -94,58 +122,87 @@ class DBCheckpointStorageTests { } } - @Test(timeout=300_000) - fun `add and remove checkpoint in single commit operate`() { + @Test(timeout = 300_000) + fun `add and remove checkpoint in single commit operation`() { val (id, checkpoint) = newCheckpoint() + val serializedFlowState = checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) val (id2, checkpoint2) = newCheckpoint() + val serializedFlowState2 = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) - checkpointStorage.addCheckpoint(id2, checkpoint2) + createMetadataRecord(checkpoint) + createMetadataRecord(checkpoint2) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + checkpointStorage.addCheckpoint(id2, checkpoint2, serializedFlowState2) checkpointStorage.removeCheckpoint(id) } database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint2) + assertEquals( + checkpoint2, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } newCheckpointStorage() database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(checkpoint2) + assertEquals( + checkpoint2, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } } - @Test(timeout=300_000) - fun `add two checkpoints then remove first one`() { + @Test(timeout = 300_000) + fun `add two checkpoints then remove first one`() { val (id, firstCheckpoint) = newCheckpoint() + val serializedFirstFlowState = + firstCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { - checkpointStorage.addCheckpoint(id, firstCheckpoint) + createMetadataRecord(firstCheckpoint) + checkpointStorage.addCheckpoint(id, firstCheckpoint, serializedFirstFlowState) } val (id2, secondCheckpoint) = newCheckpoint() + val serializedSecondFlowState = + secondCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id2, secondCheckpoint) + createMetadataRecord(secondCheckpoint) + checkpointStorage.addCheckpoint(id2, secondCheckpoint, serializedSecondFlowState) } database.transaction { checkpointStorage.removeCheckpoint(id) } database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint) + assertEquals( + secondCheckpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } newCheckpointStorage() database.transaction { - assertThat(checkpointStorage.checkpoints()).containsExactly(secondCheckpoint) + assertEquals( + secondCheckpoint, + checkpointStorage.checkpoints().single().deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + ) } } - @Test(timeout=300_000) - fun `add checkpoint and then remove after 'restart'`() { + @Test(timeout = 300_000) + fun `add checkpoint and then remove after 'restart'`() { val (id, originalCheckpoint) = newCheckpoint() + val serializedOriginalFlowState = + originalCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage.addCheckpoint(id, originalCheckpoint) + createMetadataRecord(originalCheckpoint) + checkpointStorage.addCheckpoint(id, originalCheckpoint, serializedOriginalFlowState) } newCheckpointStorage() val reconstructedCheckpoint = database.transaction { checkpointStorage.checkpoints().single() } database.transaction { - assertThat(reconstructedCheckpoint).isEqualTo(originalCheckpoint).isNotSameAs(originalCheckpoint) + assertEquals(originalCheckpoint, reconstructedCheckpoint.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)) + assertThat(reconstructedCheckpoint.serializedFlowState).isEqualTo(serializedOriginalFlowState) + .isNotSameAs(serializedOriginalFlowState) } database.transaction { checkpointStorage.removeCheckpoint(id) @@ -155,12 +212,15 @@ class DBCheckpointStorageTests { } } - @Test(timeout=300_000) - fun `verify checkpoints compatible`() { + @Test(timeout = 300_000) + fun `verify checkpoints compatible`() { val mockServices = MockServices(emptyList(), ALICE.name) database.transaction { val (id, checkpoint) = newCheckpoint(1) - checkpointStorage.addCheckpoint(id, checkpoint) + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) } database.transaction { @@ -169,7 +229,10 @@ class DBCheckpointStorageTests { database.transaction { val (id1, checkpoint1) = newCheckpoint(2) - checkpointStorage.addCheckpoint(id1, checkpoint1) + val serializedFlowState1 = + checkpoint1.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + createMetadataRecord(checkpoint1) + checkpointStorage.addCheckpoint(id1, checkpoint1, serializedFlowState1) } assertThatThrownBy { @@ -179,21 +242,158 @@ class DBCheckpointStorageTests { }.isInstanceOf(CheckpointIncompatibleException::class.java) } - private fun newCheckpointStorage() { + @Test(timeout = 300_000) + fun `checkpoint can be recreated from database record`() { + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) database.transaction { - checkpointStorage = DBCheckpointStorage() + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + database.transaction { + assertEquals(serializedFlowState, checkpointStorage.checkpoints().single().serializedFlowState) + } + database.transaction { + assertEquals(checkpoint, checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT)) } } - private fun newCheckpoint(version: Int = 1): Pair> { + @Test(timeout = 300_000) + fun `update checkpoint with result information`() { + val result = "This is the result" + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.copy(result = result) + val updatedSerializedFlowState = + updatedCheckpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) + } + database.transaction { + assertEquals( + result, + checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).result + ) + assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).result) + } + } + + @Test(timeout = 300_000) + fun `update checkpoint with error information`() { + val exception = IllegalStateException("I am a naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.copy( + errorState = ErrorState.Errored( + listOf( + FlowError( + 0, + exception + ) + ), 0, false + ) + ) + val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + database.transaction { + // Checkpoint always returns clean error state when retrieved via [getCheckpoint] + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + assertNotNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails) + } + } + + @Test(timeout = 300_000) + fun `clean checkpoints clear out error information from the database`() { + val exception = IllegalStateException("I am a naughty exception") + val (id, checkpoint) = newCheckpoint() + val serializedFlowState = + checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { + createMetadataRecord(checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializedFlowState) + } + val updatedCheckpoint = checkpoint.copy( + errorState = ErrorState.Errored( + listOf( + FlowError( + 0, + exception + ) + ), 0, false + ) + ) + val updatedSerializedFlowState = updatedCheckpoint.flowState.checkpointSerialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + database.transaction { checkpointStorage.updateCheckpoint(id, updatedCheckpoint, updatedSerializedFlowState) } + database.transaction { + // Checkpoint always returns clean error state when retrieved via [getCheckpoint] + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + } + // Set back to clean + database.transaction { checkpointStorage.updateCheckpoint(id, checkpoint, serializedFlowState) } + database.transaction { + assertTrue(checkpointStorage.getCheckpoint(id)!!.deserialize(CheckpointSerializationDefaults.CHECKPOINT_CONTEXT).errorState is ErrorState.Clean) + assertNull(session.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, id.uuid.toString()).exceptionDetails) + } + } + + private fun newCheckpointStorage() { + database.transaction { + checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder { + override fun record( + serializedCheckpointState: SerializedBytes, + serializedFlowState: SerializedBytes + ) { + // do nothing + } + }) + } + } + + private fun newCheckpoint(version: Int = 1): Pair { val id = StateMachineRunId.createRandom() val logic: FlowLogic<*> = object : FlowLogic() { override fun call() {} } val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) - val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, ALICE, SubFlowVersion.CoreFlow(version), false) - .getOrThrow() - return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + val checkpoint = Checkpoint.create( + InvocationContext.shell(), + FlowStart.Explicit, + logic.javaClass, + frozenLogic, + ALICE, + SubFlowVersion.CoreFlow(version), + false + ) + .getOrThrow() + return id to checkpoint } + private fun DatabaseTransaction.createMetadataRecord(checkpoint: Checkpoint) { + val metadata = DBCheckpointStorage.DBFlowMetadata( + invocationId = checkpoint.checkpointState.invocationContext.trace.invocationId.value, + flowId = null, + flowName = "random.flow", + userSuppliedIdentifier = null, + startType = DBCheckpointStorage.StartReason.RPC, + launchingCordapp = "this cordapp", + platformVersion = PLATFORM_VERSION, + rpcUsername = "Batman", + invocationInstant = checkpoint.checkpointState.invocationContext.trace.invocationId.timestamp, + receivedInstant = Instant.now(), + startInstant = null, + finishInstant = null + ) + session.save(metadata) + } } diff --git a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt index bf9a482e66..14c4586ee6 100644 --- a/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt +++ b/node/src/test/kotlin/net/corda/node/services/rpc/CheckpointDumperImplTest.kt @@ -23,9 +23,12 @@ import net.corda.core.serialization.internal.checkpointSerialize import net.corda.nodeapi.internal.lifecycle.NodeServicesContext import net.corda.nodeapi.internal.lifecycle.NodeLifecycleEvent import net.corda.node.internal.NodeStartup +import net.corda.node.services.persistence.CheckpointPerformanceRecorder import net.corda.node.services.persistence.DBCheckpointStorage import net.corda.node.services.statemachine.Checkpoint +import net.corda.node.services.statemachine.CheckpointState import net.corda.node.services.statemachine.FlowStart +import net.corda.node.services.statemachine.FlowState import net.corda.node.services.statemachine.SubFlowVersion import net.corda.nodeapi.internal.persistence.CordaPersistence import net.corda.testing.core.SerializationEnvironmentRule @@ -104,7 +107,7 @@ class CheckpointDumperImplTest { // add a checkpoint val (id, checkpoint) = newCheckpoint() database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint)) } dumper.dumpCheckpoints() @@ -130,7 +133,7 @@ class CheckpointDumperImplTest { // add a checkpoint val (id, checkpoint) = newCheckpoint() database.transaction { - checkpointStorage.addCheckpoint(id, checkpoint) + checkpointStorage.addCheckpoint(id, checkpoint, serializeFlowState(checkpoint)) } dumper.dumpCheckpoints() @@ -140,11 +143,18 @@ class CheckpointDumperImplTest { private fun newCheckpointStorage() { database.transaction { - checkpointStorage = DBCheckpointStorage() + checkpointStorage = DBCheckpointStorage(object : CheckpointPerformanceRecorder { + override fun record( + serializedCheckpointState: SerializedBytes, + serializedFlowState: SerializedBytes + ) { + // do nothing + } + }) } } - private fun newCheckpoint(version: Int = 1): Pair> { + private fun newCheckpoint(version: Int = 1): Pair { val id = StateMachineRunId.createRandom() val logic: FlowLogic<*> = object : FlowLogic() { override fun call() {} @@ -152,6 +162,10 @@ class CheckpointDumperImplTest { val frozenLogic = logic.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) val checkpoint = Checkpoint.create(InvocationContext.shell(), FlowStart.Explicit, logic.javaClass, frozenLogic, myself.identity.party, SubFlowVersion.CoreFlow(version), false) .getOrThrow() - return id to checkpoint.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) + return id to checkpoint + } + + private fun serializeFlowState(checkpoint: Checkpoint): SerializedBytes { + return checkpoint.flowState.checkpointSerialize(context = CheckpointSerializationDefaults.CHECKPOINT_CONTEXT) } } \ No newline at end of file