Merge pull request #5892 from corda/new_checkpoint_schema

New checkpoint schema
This commit is contained in:
Stefano Franz 2020-02-10 15:05:59 +00:00 committed by GitHub
commit c9f055aab4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 525 additions and 35 deletions

View File

@ -45,6 +45,7 @@ sealed class FlowIORequest<out R : Any> {
* @property shouldRetrySend specifies whether the send should be retried. * @property shouldRetrySend specifies whether the send should be retried.
* @return a map from session to received message. * @return a map from session to received message.
*/ */
//net.corda.core.internal.FlowIORequest.SendAndReceive
data class SendAndReceive( data class SendAndReceive(
val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>, val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>,
val shouldRetrySend: Boolean val shouldRetrySend: Boolean
@ -80,7 +81,15 @@ sealed class FlowIORequest<out R : Any> {
/** /**
* Suspend the flow until all Initiating sessions are confirmed. * Suspend the flow until all Initiating sessions are confirmed.
*/ */
object WaitForSessionConfirmations : FlowIORequest<Unit>() class WaitForSessionConfirmations : FlowIORequest<Unit>() {
override fun equals(other: Any?): Boolean {
return this === other
}
override fun hashCode(): Int {
return System.identityHashCode(this)
}
}
/** /**
* Execute the specified [operation], suspend the flow until completion. * Execute the specified [operation], suspend the flow until completion.

View File

@ -1,7 +1,9 @@
package net.corda.node.services.persistence package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.Checkpoint
@ -16,8 +18,14 @@ import javax.persistence.Column
import javax.persistence.Entity import javax.persistence.Entity
import javax.persistence.Id import javax.persistence.Id
import org.hibernate.annotations.Type import org.hibernate.annotations.Type
import java.lang.Exception
import java.math.BigInteger
import java.sql.Connection import java.sql.Connection
import java.sql.SQLException import java.sql.SQLException
import java.time.Instant
import javax.persistence.FetchType
import javax.persistence.JoinColumn
import javax.persistence.OneToOne
/** /**
* Simple checkpoint key value storage in DB. * Simple checkpoint key value storage in DB.
@ -25,6 +33,156 @@ import java.sql.SQLException
class DBCheckpointStorage : CheckpointStorage { class DBCheckpointStorage : CheckpointStorage {
val log: Logger = LoggerFactory.getLogger(this::class.java) val log: Logger = LoggerFactory.getLogger(this::class.java)
enum class FlowStatus {
RUNNABLE,
FAILED,
COMPLETED,
HOSPITALIZED,
KILLED,
PAUSED
}
enum class StartReason {
RPC, FLOW, SERVICE, SCHEDULED, INITIATED
}
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_new")
class DBFlowCheckpoint(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var id: String,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id")
var blob: DBFlowCheckpointBlob,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "result_id", referencedColumnName = "id")
var result: DBFlowResult?,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "error_id", referencedColumnName = "id")
var exceptionDetails: DBFlowException?,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "flow_id", referencedColumnName = "flow_id")
var flowMetadata: DBFlowMetadata,
@Column(name = "status")
var status: FlowStatus,
@Column(name = "compatible")
var compatible: Boolean,
@Column(name = "progress_step")
var progressStep: String,
@Column(name = "flow_io_request")
var ioRequestType: Class<FlowIORequest<*>>,
@Column(name = "timestamp")
var checkpointInstant: Instant
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs")
class DBFlowCheckpointBlob(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "flow_state", nullable = false)
var flowStack: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "timestamp")
var persistedInstant: Instant? = null
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results")
class DBFlowResult(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Type(type = "corda-blob")
@Column(name = "result_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "timestamp")
val persistedInstant: Instant? = null
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions")
class DBFlowException(
@Id
@Column(name = "id", nullable = false)
var id: BigInteger? = null,
@Column(name = "type", nullable = false)
var type: Class<Exception>,
@Type(type = "corda-blob")
@Column(name = "exception_value", nullable = false)
var value: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "exception_message")
var message: String? = null,
@Column(name = "timestamp")
val persistedInstant: Instant? = null
)
@Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata")
class DBFlowMetadata(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var flowId: String,
@Column(name = "flow_name", nullable = false)
var flowName: String,
@Column(name = "flow_identifier", nullable = true)
var userSuppliedIdentifier: String?,
@Column(name = "started_type", nullable = false)
var startType: StartReason,
@Column(name = "flow_parameters", nullable = false)
var initialParameters: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "cordapp_name", nullable = false)
var launchingCordapp: String,
@Column(name = "platform_version", nullable = false)
var platformVersion: Int,
@Column(name = "rpc_user", nullable = false)
var rpcUsername: String,
@Column(name = "invocation_time", nullable = false)
var invocationInstant: Instant,
@Column(name = "received_time", nullable = false)
var receivedInstant: Instant,
@Column(name = "start_time", nullable = true)
var startInstant: Instant?,
@Column(name = "finish_time", nullable = true)
var finishInstant: Instant?
)
@Entity @Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
class DBCheckpoint( class DBCheckpoint(
@ -56,7 +214,6 @@ class DBCheckpointStorage : CheckpointStorage {
}) })
} }
override fun removeCheckpoint(id: StateMachineRunId): Boolean { override fun removeCheckpoint(id: StateMachineRunId): Boolean {
val session = currentDBSession() val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder val criteriaBuilder = session.criteriaBuilder

View File

@ -34,6 +34,14 @@ class NodeSchemaService(private val extraSchemas: Set<MappedSchema> = emptySet()
object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1,
mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java,
//new checkpoints - keeping old around to allow testing easily (for now)
DBCheckpointStorage.DBFlowCheckpoint::class.java,
DBCheckpointStorage.DBFlowCheckpointBlob::class.java,
DBCheckpointStorage.DBFlowResult::class.java,
DBCheckpointStorage.DBFlowException::class.java,
DBCheckpointStorage.DBFlowMetadata::class.java,
DBTransactionStorage.DBTransaction::class.java, DBTransactionStorage.DBTransaction::class.java,
BasicHSMKeyManagementService.PersistentKey::class.java, BasicHSMKeyManagementService.PersistentKey::class.java,
NodeSchedulerService.PersistentScheduledState::class.java, NodeSchedulerService.PersistentScheduledState::class.java,

View File

@ -81,7 +81,7 @@ internal class FlowMonitor(
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" is FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"
FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow" FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow"
} }

View File

@ -269,7 +269,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader
val result = logic.call() val result = logic.call()
suspend(FlowIORequest.WaitForSessionConfirmations, maySkipCheckpoint = true) suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true)
Try.Success(result) Try.Success(result)
} catch (t: Throwable) { } catch (t: Throwable) {
if(t.isUnrecoverable()) { if(t.isUnrecoverable()) {

View File

@ -31,4 +31,8 @@
<include file="migration/node-core.changelog-v14-data.xml"/> <include file="migration/node-core.changelog-v14-data.xml"/>
<include file="migration/node-core.changelog-v17.xml"/>
<include file="migration/node-core.changelog-v17-postgres.xml"/>
<include file="migration/node-core.changelog-v17-keys.xml"/>
</databaseChangeLog> </databaseChangeLog>

View File

@ -0,0 +1,34 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_primary_keys">
<addPrimaryKey columnNames="flow_id" constraintName="node_checkpoints_pk" tableName="node_checkpoints_new"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_blobs_pk" tableName="node_checkpoint_blobs"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_exceptions_pk" tableName="node_flow_exceptions"/>
<addPrimaryKey columnNames="id" constraintName="node_checkpoint_results_pk" tableName="node_flow_results"/>
<addPrimaryKey columnNames="invocation_id" constraintName="node_flow_metadata_pk" tableName="node_flow_metadata"/>
</changeSet>
<changeSet author="R3.Corda" id="add_new_checkpoint_schema_foreign_keys">
<addForeignKeyConstraint baseColumnNames="checkpoint_blob_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoint_blob_id_to_blob_table_fk"
referencedColumnNames="id" referencedTableName="node_checkpoint_blobs"/>
<addForeignKeyConstraint baseColumnNames="error_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoints_to_exceptions_fk"
referencedColumnNames="id" referencedTableName="node_flow_exceptions"/>
<addForeignKeyConstraint baseColumnNames="result_id" baseTableName="node_checkpoints_new"
constraintName="node_checkpoint_to_result_fk"
referencedColumnNames="id" referencedTableName="node_flow_results"/>
<addForeignKeyConstraint baseColumnNames="flow_id" baseTableName="node_flow_metadata"
constraintName="node_metadata_to_checkpoints_fk"
referencedColumnNames="flow_id" referencedTableName="node_checkpoints_new"/>
</changeSet>
</databaseChangeLog>

View File

@ -0,0 +1,139 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoints_table-pg" dbms="postgresql">
<createTable tableName="node_checkpoints_new">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="checkpoint_blob_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="result_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="error_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="status" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="compatible" type="BOOLEAN">
<constraints nullable="false"/>
</column>
<column name="progress_step" type="NVARCHAR(256)">
<constraints nullable="true"/>
</column>
<!-- net.corda.core.internal.FlowIORequest.SendAndReceive -->
<column name="flow_io_request" type="NVARCHAR(128)">
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_checkpoint_blob_table-pg" dbms="postgresql">
<createTable tableName="node_checkpoint_blobs">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="checkpoint_value" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="flow_state" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="hmac" type="NVARCHAR(32)">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_result_table-pg" dbms="postgresql">
<createTable tableName="node_flow_results">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="result_value" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_exceptions_table-pg" dbms="postgresql">
<createTable tableName="node_flow_exceptions">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="type" type="NVARCHAR(256)">
<constraints nullable="false"/>
</column>
<column name="exception_value" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_metadata_table-pg" dbms="postgresql">
<createTable tableName="node_flow_metadata">
<column name="invocation_id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="true"/>
</column>
<column name="flow_name" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="flow_identifier" type="NVARCHAR(512)">
<constraints nullable="true"/>
</column>
<column name="started_type" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="flow_parameters" type="varbinary(33554432)">
<constraints nullable="false"/>
</column>
<column name="cordapp_name" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="platform_version" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="rpc_user" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="invocation_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="received_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="start_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
<column name="finish_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
</createTable>
</changeSet>
</databaseChangeLog>

View File

@ -0,0 +1,139 @@
<?xml version="1.1" encoding="UTF-8" standalone="no"?>
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.5.xsd"
logicalFilePath="migration/node-services.changelog-init.xml">
<changeSet author="R3.Corda" id="add_new_checkpoints_table" dbms="!postgresql">
<createTable tableName="node_checkpoints_new">
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="checkpoint_blob_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="result_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="error_id" type="BIGINT">
<constraints nullable="true"/>
</column>
<column name="status" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="compatible" type="BOOLEAN">
<constraints nullable="false"/>
</column>
<column name="progress_step" type="NVARCHAR(256)">
<constraints nullable="true"/>
</column>
<!-- net.corda.core.internal.FlowIORequest.SendAndReceive -->
<column name="flow_io_request" type="NVARCHAR(128)">
<constraints nullable="true"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_checkpoint_blob_table" dbms="!postgresql">
<createTable tableName="node_checkpoint_blobs">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="checkpoint_value" type="blob">
<constraints nullable="false"/>
</column>
<column name="flow_state" type="blob">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="hmac" type="NVARCHAR(32)">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_result_table" dbms="!postgresql">
<createTable tableName="node_flow_results">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="result_value" type="blob">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_exceptions_table" dbms="!postgresql">
<createTable tableName="node_flow_exceptions">
<column name="id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="type" type="NVARCHAR(256)">
<constraints nullable="false"/>
</column>
<column name="exception_value" type="blob">
<constraints nullable="false"/>
</column>
<column name="exception_message" type="NVARCHAR(512)">
<constraints nullable="false"/>
</column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
</createTable>
</changeSet>
<changeSet author="R3.Corda" id="add_new_flow_metadata_table" dbms="!postgresql">
<createTable tableName="node_flow_metadata">
<column name="invocation_id" type="BIGINT">
<constraints nullable="false"/>
</column>
<column name="flow_id" type="NVARCHAR(64)">
<constraints nullable="true"/>
</column>
<column name="flow_name" type="NVARCHAR(128)">
<constraints nullable="false"/>
</column>
<column name="flow_identifier" type="NVARCHAR(512)">
<constraints nullable="true"/>
</column>
<column name="started_type" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="flow_parameters" type="blob">
<constraints nullable="false"/>
</column>
<column name="cordapp_name" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="platform_version" type="TINYINT">
<constraints nullable="false"/>
</column>
<column name="rpc_user" type="NVARCHAR(64)">
<constraints nullable="false"/>
</column>
<column name="invocation_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="received_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="false"/>
</column>
<column name="start_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
<column name="finish_time" type="java.sql.Types.TIMESTAMP">
<constraints nullable="true"/>
</column>
</createTable>
</changeSet>
</databaseChangeLog>