flesh out entities for new checkpointing

This commit is contained in:
stefano 2020-01-29 17:10:34 +00:00
parent 78d83e9583
commit 7b3da95456
5 changed files with 80 additions and 9 deletions

View File

@ -45,6 +45,7 @@ sealed class FlowIORequest<out R : Any> {
* @property shouldRetrySend specifies whether the send should be retried. * @property shouldRetrySend specifies whether the send should be retried.
* @return a map from session to received message. * @return a map from session to received message.
*/ */
//net.corda.core.internal.FlowIORequest.SendAndReceive
data class SendAndReceive( data class SendAndReceive(
val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>, val sessionToMessage: Map<FlowSession, SerializedBytes<Any>>,
val shouldRetrySend: Boolean val shouldRetrySend: Boolean
@ -80,7 +81,15 @@ sealed class FlowIORequest<out R : Any> {
/** /**
* Suspend the flow until all Initiating sessions are confirmed. * Suspend the flow until all Initiating sessions are confirmed.
*/ */
object WaitForSessionConfirmations : FlowIORequest<Unit>() class WaitForSessionConfirmations : FlowIORequest<Unit>() {
override fun equals(other: Any?): Boolean {
return this === other
}
override fun hashCode(): Int {
return System.identityHashCode(this)
}
}
/** /**
* Execute the specified [operation], suspend the flow until completion. * Execute the specified [operation], suspend the flow until completion.

View File

@ -1,7 +1,9 @@
package net.corda.node.services.persistence package net.corda.node.services.persistence
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.internal.FlowIORequest
import net.corda.core.serialization.SerializedBytes import net.corda.core.serialization.SerializedBytes
import net.corda.core.utilities.ProgressTracker
import net.corda.core.utilities.debug import net.corda.core.utilities.debug
import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.statemachine.Checkpoint import net.corda.node.services.statemachine.Checkpoint
@ -16,8 +18,13 @@ import javax.persistence.Column
import javax.persistence.Entity import javax.persistence.Entity
import javax.persistence.Id import javax.persistence.Id
import org.hibernate.annotations.Type import org.hibernate.annotations.Type
import java.math.BigInteger
import java.sql.Connection import java.sql.Connection
import java.sql.SQLException import java.sql.SQLException
import java.time.Instant
import javax.persistence.FetchType
import javax.persistence.JoinColumn
import javax.persistence.OneToOne
/** /**
* Simple checkpoint key value storage in DB. * Simple checkpoint key value storage in DB.
@ -25,16 +32,71 @@ import java.sql.SQLException
class DBCheckpointStorage : CheckpointStorage { class DBCheckpointStorage : CheckpointStorage {
val log: Logger = LoggerFactory.getLogger(this::class.java) val log: Logger = LoggerFactory.getLogger(this::class.java)
enum class FlowStatus {
RUNNABLE,
FAILED,
COMPLETED,
HOSPITALIZED,
KILLED,
PAUSED
}
@Entity @Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints")
class DBFlowCheckpoint( class DBFlowCheckpoint(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
private var id: String? = null,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "id")
private var blob: DBFlowCheckpointBlob? = null,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "id")
private var result: DBFlowResult? = null,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "id")
private var exceptionDetails: DBFlowException? = null,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "flow_id")
private var flowMetadata: DBFlowMetadata? = null,
@Column(name = "status")
private var status: FlowStatus? = null,
@Column(name = "compatible")
private var compatible: Boolean? = null,
@Column(name = "progress_step")
private var progressStep: String? = null,
@Column(name = "flow_io_request")
private val ioRequestType: Class<FlowIORequest<*>>? = null,
@Column(name = "timestamp")
private val checkpointInstant: Instant? = null
) )
@Entity @Entity
@javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs") @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs")
class DBFlowCheckpointBlob( class DBFlowCheckpointBlob(
@Id
@Column(name = "id", nullable = false)
private var id: BigInteger? = null,
@Type(type = "corda-blob")
@Column(name = "checkpoint_value", nullable = false)
var checkpoint: ByteArray = EMPTY_BYTE_ARRAY,
@Type(type = "corda-blob")
@Column(name = "flow_state", nullable = false)
var flowStack: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "timestamp")
private val instant: Instant? = null
) )
@Entity @Entity
@ -86,7 +148,6 @@ class DBCheckpointStorage : CheckpointStorage {
}) })
} }
override fun removeCheckpoint(id: StateMachineRunId): Boolean { override fun removeCheckpoint(id: StateMachineRunId): Boolean {
val session = currentDBSession() val session = currentDBSession()
val criteriaBuilder = session.criteriaBuilder val criteriaBuilder = session.criteriaBuilder

View File

@ -81,7 +81,7 @@ internal class FlowMonitor(
is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}"
is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}"
is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}"
FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" is FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed"
is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete"
FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow" FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow"
} }

View File

@ -269,7 +269,7 @@ class FlowStateMachineImpl<R>(override val id: StateMachineRunId,
Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader
val result = logic.call() val result = logic.call()
suspend(FlowIORequest.WaitForSessionConfirmations, maySkipCheckpoint = true) suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true)
Try.Success(result) Try.Success(result)
} catch (t: Throwable) { } catch (t: Throwable) {
if(t.isUnrecoverable()) { if(t.isUnrecoverable()) {

View File

@ -22,7 +22,7 @@
<column name="error_id" type="BIGINT"> <column name="error_id" type="BIGINT">
<constraints nullable="true"/> <constraints nullable="true"/>
</column> </column>
<column name="status" type="NVARCHAR(16)"> <column name="status" type="BIGINT">
<constraints nullable="false"/> <constraints nullable="false"/>
</column> </column>
<column name="compatible" type="BOOLEAN"> <column name="compatible" type="BOOLEAN">
@ -31,7 +31,8 @@
<column name="progress_step" type="NVARCHAR(256)"> <column name="progress_step" type="NVARCHAR(256)">
<constraints nullable="true"/> <constraints nullable="true"/>
</column> </column>
<column name="flow_io_request" type="NVARCHAR(32)"> <!-- net.corda.core.internal.FlowIORequest.SendAndReceive -->
<column name="flow_io_request" type="NVARCHAR(128)">
<constraints nullable="true"/> <constraints nullable="true"/>
</column> </column>
<column name="timestamp" type="java.sql.Types.TIMESTAMP"> <column name="timestamp" type="java.sql.Types.TIMESTAMP">