diff --git a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt index 494c5099aa..0d54a4715a 100644 --- a/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt +++ b/core/src/main/kotlin/net/corda/core/internal/FlowIORequest.kt @@ -45,6 +45,7 @@ sealed class FlowIORequest { * @property shouldRetrySend specifies whether the send should be retried. * @return a map from session to received message. */ + //net.corda.core.internal.FlowIORequest.SendAndReceive data class SendAndReceive( val sessionToMessage: Map>, val shouldRetrySend: Boolean @@ -80,7 +81,15 @@ sealed class FlowIORequest { /** * Suspend the flow until all Initiating sessions are confirmed. */ - object WaitForSessionConfirmations : FlowIORequest() + class WaitForSessionConfirmations : FlowIORequest() { + override fun equals(other: Any?): Boolean { + return this === other + } + + override fun hashCode(): Int { + return System.identityHashCode(this) + } + } /** * Execute the specified [operation], suspend the flow until completion. diff --git a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt index b1eec763f6..c9d883a5b0 100644 --- a/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt +++ b/node/src/main/kotlin/net/corda/node/services/persistence/DBCheckpointStorage.kt @@ -1,7 +1,9 @@ package net.corda.node.services.persistence import net.corda.core.flows.StateMachineRunId +import net.corda.core.internal.FlowIORequest import net.corda.core.serialization.SerializedBytes +import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.debug import net.corda.node.services.api.CheckpointStorage import net.corda.node.services.statemachine.Checkpoint @@ -16,8 +18,14 @@ import javax.persistence.Column import javax.persistence.Entity import javax.persistence.Id import org.hibernate.annotations.Type +import java.lang.Exception +import java.math.BigInteger import java.sql.Connection import java.sql.SQLException +import java.time.Instant +import javax.persistence.FetchType +import javax.persistence.JoinColumn +import javax.persistence.OneToOne /** * Simple checkpoint key value storage in DB. @@ -25,6 +33,156 @@ import java.sql.SQLException class DBCheckpointStorage : CheckpointStorage { val log: Logger = LoggerFactory.getLogger(this::class.java) + enum class FlowStatus { + RUNNABLE, + FAILED, + COMPLETED, + HOSPITALIZED, + KILLED, + PAUSED + } + + enum class StartReason { + RPC, FLOW, SERVICE, SCHEDULED, INITIATED + } + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_new") + class DBFlowCheckpoint( + @Id + @Column(name = "flow_id", length = 64, nullable = false) + var id: String, + + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id") + var blob: DBFlowCheckpointBlob, + + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "result_id", referencedColumnName = "id") + var result: DBFlowResult?, + + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "error_id", referencedColumnName = "id") + var exceptionDetails: DBFlowException?, + + @OneToOne(fetch = FetchType.LAZY) + @JoinColumn(name = "flow_id", referencedColumnName = "flow_id") + var flowMetadata: DBFlowMetadata, + + @Column(name = "status") + var status: FlowStatus, + + @Column(name = "compatible") + var compatible: Boolean, + + @Column(name = "progress_step") + var progressStep: String, + + @Column(name = "flow_io_request") + var ioRequestType: Class>, + + @Column(name = "timestamp") + var checkpointInstant: Instant + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints_blobs") + class DBFlowCheckpointBlob( + @Id + @Column(name = "id", nullable = false) + var id: BigInteger? = null, + + @Type(type = "corda-blob") + @Column(name = "checkpoint_value", nullable = false) + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + + @Type(type = "corda-blob") + @Column(name = "flow_state", nullable = false) + var flowStack: ByteArray = EMPTY_BYTE_ARRAY, + + @Column(name = "timestamp") + var persistedInstant: Instant? = null + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_results") + class DBFlowResult( + @Id + @Column(name = "id", nullable = false) + var id: BigInteger? = null, + + @Type(type = "corda-blob") + @Column(name = "result_value", nullable = false) + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY, + + @Column(name = "timestamp") + val persistedInstant: Instant? = null + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_exceptions") + class DBFlowException( + @Id + @Column(name = "id", nullable = false) + var id: BigInteger? = null, + + @Column(name = "type", nullable = false) + var type: Class, + + @Type(type = "corda-blob") + @Column(name = "exception_value", nullable = false) + var value: ByteArray = EMPTY_BYTE_ARRAY, + + @Column(name = "exception_message") + var message: String? = null, + + @Column(name = "timestamp") + val persistedInstant: Instant? = null + ) + + @Entity + @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}flow_metadata") + class DBFlowMetadata( + + @Id + @Column(name = "flow_id", length = 64, nullable = false) + var flowId: String, + + @Column(name = "flow_name", nullable = false) + var flowName: String, + + @Column(name = "flow_identifier", nullable = true) + var userSuppliedIdentifier: String?, + + @Column(name = "started_type", nullable = false) + var startType: StartReason, + + @Column(name = "flow_parameters", nullable = false) + var initialParameters: ByteArray = EMPTY_BYTE_ARRAY, + + @Column(name = "cordapp_name", nullable = false) + var launchingCordapp: String, + + @Column(name = "platform_version", nullable = false) + var platformVersion: Int, + + @Column(name = "rpc_user", nullable = false) + var rpcUsername: String, + + @Column(name = "invocation_time", nullable = false) + var invocationInstant: Instant, + + @Column(name = "received_time", nullable = false) + var receivedInstant: Instant, + + @Column(name = "start_time", nullable = true) + var startInstant: Instant?, + + @Column(name = "finish_time", nullable = true) + var finishInstant: Instant? + + ) + @Entity @javax.persistence.Table(name = "${NODE_DATABASE_PREFIX}checkpoints") class DBCheckpoint( @@ -35,10 +193,10 @@ class DBCheckpointStorage : CheckpointStorage { @Type(type = "corda-blob") @Column(name = "checkpoint_value", nullable = false) - var checkpoint: ByteArray = EMPTY_BYTE_ARRAY + var checkpoint: ByteArray = EMPTY_BYTE_ARRAY ) { - override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})" - } + override fun toString() = "DBCheckpoint(checkpointId = ${checkpointId}, checkpointSize = ${checkpoint.size})" + } override fun addCheckpoint(id: StateMachineRunId, checkpoint: SerializedBytes) { currentDBSession().save(DBCheckpoint().apply { @@ -56,7 +214,6 @@ class DBCheckpointStorage : CheckpointStorage { }) } - override fun removeCheckpoint(id: StateMachineRunId): Boolean { val session = currentDBSession() val criteriaBuilder = session.criteriaBuilder diff --git a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt index a834dbb3ab..73d7cbb3d9 100644 --- a/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt +++ b/node/src/main/kotlin/net/corda/node/services/schema/NodeSchemaService.kt @@ -34,6 +34,14 @@ class NodeSchemaService(private val extraSchemas: Set = emptySet() object NodeCoreV1 : MappedSchema(schemaFamily = NodeCore.javaClass, version = 1, mappedTypes = listOf(DBCheckpointStorage.DBCheckpoint::class.java, + + //new checkpoints - keeping old around to allow testing easily (for now) + DBCheckpointStorage.DBFlowCheckpoint::class.java, + DBCheckpointStorage.DBFlowCheckpointBlob::class.java, + DBCheckpointStorage.DBFlowResult::class.java, + DBCheckpointStorage.DBFlowException::class.java, + DBCheckpointStorage.DBFlowMetadata::class.java, + DBTransactionStorage.DBTransaction::class.java, BasicHSMKeyManagementService.PersistentKey::class.java, NodeSchedulerService.PersistentScheduledState::class.java, diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt index b947f62f2b..9f80005880 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowMonitor.kt @@ -81,7 +81,7 @@ internal class FlowMonitor( is FlowIORequest.WaitForLedgerCommit -> "for the ledger to commit transaction with hash ${request.hash}" is FlowIORequest.GetFlowInfo -> "to get flow information from parties ${request.sessions.partiesInvolved()}" is FlowIORequest.Sleep -> "to wake up from sleep ending at ${LocalDateTime.ofInstant(request.wakeUpAfter, ZoneId.systemDefault())}" - FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" + is FlowIORequest.WaitForSessionConfirmations -> "for sessions to be confirmed" is FlowIORequest.ExecuteAsyncOperation -> "for asynchronous operation of type ${request.operation::javaClass} to complete" FlowIORequest.ForceCheckpoint -> "for forcing a checkpoint at an arbitrary point in a flow" } diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt index 4a9a407473..2bbdae6ba3 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/FlowStateMachineImpl.kt @@ -269,7 +269,7 @@ class FlowStateMachineImpl(override val id: StateMachineRunId, Thread.currentThread().contextClassLoader = (serviceHub.cordappProvider as CordappProviderImpl).cordappLoader.appClassLoader val result = logic.call() - suspend(FlowIORequest.WaitForSessionConfirmations, maySkipCheckpoint = true) + suspend(FlowIORequest.WaitForSessionConfirmations(), maySkipCheckpoint = true) Try.Success(result) } catch (t: Throwable) { if(t.isUnrecoverable()) { diff --git a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt index 34c9b77582..371d5ed92a 100644 --- a/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt +++ b/node/src/main/kotlin/net/corda/node/services/statemachine/SingleThreadedStateMachineManager.kt @@ -370,11 +370,11 @@ class SingleThreadedStateMachineManager( } // Resurrect flow createFlowFromCheckpoint( - id = flowId, - serializedCheckpoint = serializedCheckpoint, - initialDeduplicationHandler = null, - isAnyCheckpointPersisted = true, - isStartIdempotent = false + id = flowId, + serializedCheckpoint = serializedCheckpoint, + initialDeduplicationHandler = null, + isAnyCheckpointPersisted = true, + isStartIdempotent = false ) ?: return } else { // Just flow initiation message @@ -409,9 +409,9 @@ class SingleThreadedStateMachineManager( // Failed to retry - manually put the flow in for observation rather than // relying on the [HospitalisingInterceptor] to do so val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored) - ?.errors - ?.map { it.exception } - ?.plus(e) ?: emptyList() + ?.errors + ?.map { it.exception } + ?.plus(e) ?: emptyList() logger.info("Failed to retry flow $flowId, keeping in for observation and aborting") flowHospital.forceIntoOvernightObservation(flowId, exceptions) throw e @@ -431,11 +431,11 @@ class SingleThreadedStateMachineManager( private fun onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent) { val future = startFlow( - event.flowId, - event.flowLogic, - event.context, - ourIdentity = null, - deduplicationHandler = event.deduplicationHandler + event.flowId, + event.flowLogic, + event.context, + ourIdentity = null, + deduplicationHandler = event.deduplicationHandler ) event.wireUpFuture(future) } @@ -503,14 +503,14 @@ class SingleThreadedStateMachineManager( is InitiatedFlowFactory.CorDapp -> null } startInitiatedFlow( - event.flowId, - flowLogic, - event.deduplicationHandler, - senderSession, - initiatedSessionId, - sessionMessage, - senderCoreFlowVersion, - initiatedFlowInfo + event.flowId, + flowLogic, + event.deduplicationHandler, + senderSession, + initiatedSessionId, + sessionMessage, + senderCoreFlowVersion, + initiatedFlowInfo ) } catch (t: Throwable) { logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " + @@ -605,13 +605,13 @@ class SingleThreadedStateMachineManager( null } val checkpoint = existingCheckpoint ?: Checkpoint.create( - invocationContext, - flowStart, - flowLogic.javaClass, - frozenFlowLogic, - ourIdentity, - flowCorDappVersion, - flowLogic.isEnabledTimedFlow() + invocationContext, + flowStart, + flowLogic.javaClass, + frozenFlowLogic, + ourIdentity, + flowCorDappVersion, + flowLogic.isEnabledTimedFlow() ).getOrThrow() val startedFuture = openFuture() diff --git a/node/src/main/resources/migration/node-core.changelog-master.xml b/node/src/main/resources/migration/node-core.changelog-master.xml index 28842e0825..8c5a7916e1 100644 --- a/node/src/main/resources/migration/node-core.changelog-master.xml +++ b/node/src/main/resources/migration/node-core.changelog-master.xml @@ -31,4 +31,8 @@ + + + + diff --git a/node/src/main/resources/migration/node-core.changelog-v17-keys.xml b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml new file mode 100644 index 0000000000..fdc812a02c --- /dev/null +++ b/node/src/main/resources/migration/node-core.changelog-v17-keys.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml new file mode 100644 index 0000000000..9602ec51b3 --- /dev/null +++ b/node/src/main/resources/migration/node-core.changelog-v17-postgres.xml @@ -0,0 +1,139 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/node/src/main/resources/migration/node-core.changelog-v17.xml b/node/src/main/resources/migration/node-core.changelog-v17.xml new file mode 100644 index 0000000000..2c1789fc95 --- /dev/null +++ b/node/src/main/resources/migration/node-core.changelog-v17.xml @@ -0,0 +1,139 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file