address dan review comments

This commit is contained in:
stefano 2020-02-10 11:52:54 +00:00
parent 546166e057
commit a28c15c2fd
2 changed files with 51 additions and 93 deletions

View File

@ -51,38 +51,38 @@ class DBCheckpointStorage : CheckpointStorage {
class DBFlowCheckpoint(
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var id: String? = null,
var id: String,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "checkpoint_blob_id", referencedColumnName = "id")
var blob: DBFlowCheckpointBlob? = null,
var blob: DBFlowCheckpointBlob,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "result_id", referencedColumnName = "id")
var result: DBFlowResult? = null,
var result: DBFlowResult,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "error_id", referencedColumnName = "id")
var exceptionDetails: DBFlowException? = null,
var exceptionDetails: DBFlowException,
@OneToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "flow_id", referencedColumnName = "flow_id")
var flowMetadata: DBFlowMetadata? = null,
var flowMetadata: DBFlowMetadata,
@Column(name = "status")
var status: FlowStatus? = null,
var status: FlowStatus,
@Column(name = "compatible")
var compatible: Boolean? = null,
var compatible: Boolean,
@Column(name = "progress_step")
var progressStep: String? = null,
var progressStep: String,
@Column(name = "flow_io_request")
var ioRequestType: Class<FlowIORequest<*>>? = null,
var ioRequestType: Class<FlowIORequest<*>>,
@Column(name = "timestamp")
var checkpointInstant: Instant? = null
var checkpointInstant: Instant
)
@Entity
@ -131,7 +131,7 @@ class DBCheckpointStorage : CheckpointStorage {
@Type(type = "corda-blob")
@Column(name = "exception_value", nullable = false)
var value: ByteArray,
var value: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "exception_message")
var message: String? = null,
@ -146,40 +146,40 @@ class DBCheckpointStorage : CheckpointStorage {
@Id
@Column(name = "flow_id", length = 64, nullable = false)
var flowId: String? = null,
var flowId: String,
@Column(name = "flow_name", nullable = false)
var flowName: String? = null,
var flowName: String,
@Column(name = "flow_identifier", nullable = true)
var userSuppliedIdentifier: String? = null,
var userSuppliedIdentifier: String,
@Column(name = "started_type", nullable = true)
var startType: StartReason? = null,
var startType: StartReason,
@Column(name = "flow_parameters", nullable = true)
var initialParameters: ByteArray? = null,
var initialParameters: ByteArray = EMPTY_BYTE_ARRAY,
@Column(name = "cordapp_name", nullable = true)
var launchingCordapp: String? = null,
var launchingCordapp: String,
@Column(name = "platform_version", nullable = true)
var platformVersion: Int? = null,
var platformVersion: Int,
@Column(name = "rpc_user", nullable = true)
var rpcUsername: String? = null,
var rpcUsername: String,
@Column(name = "invocation_time", nullable = true)
var invocationInstant: Instant? = null,
var invocationInstant: Instant,
@Column(name = "received_time", nullable = true)
var receivedInstant: Instant? = null,
var receivedInstant: Instant,
@Column(name = "start_time", nullable = true)
var startInstant: Instant? = null,
var startInstant: Instant,
@Column(name = "finish_time", nullable = true)
var finishInstant: Instant? = null
var finishInstant: Instant
)

View File

@ -34,7 +34,6 @@ import net.corda.node.services.api.CheckpointStorage
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.config.shouldCheckCheckpoints
import net.corda.node.services.messaging.DeduplicationHandler
import net.corda.node.services.persistence.DBCheckpointStorage
import net.corda.node.services.statemachine.FlowStateMachineImpl.Companion.createSubFlowVersion
import net.corda.node.services.statemachine.interceptors.*
import net.corda.node.services.statemachine.transitions.StateMachine
@ -43,7 +42,6 @@ import net.corda.node.utilities.errorAndTerminate
import net.corda.node.utilities.injectOldProgressTracker
import net.corda.node.utilities.isEnabledTimedFlow
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.currentDBSession
import net.corda.nodeapi.internal.persistence.wrapWithDatabaseTransaction
import net.corda.serialization.internal.CheckpointSerializeAsTokenContextImpl
import net.corda.serialization.internal.withTokenContext
@ -372,11 +370,11 @@ class SingleThreadedStateMachineManager(
}
// Resurrect flow
createFlowFromCheckpoint(
id = flowId,
serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
id = flowId,
serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
) ?: return
} else {
// Just flow initiation message
@ -411,9 +409,9 @@ class SingleThreadedStateMachineManager(
// Failed to retry - manually put the flow in for observation rather than
// relying on the [HospitalisingInterceptor] to do so
val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(flowId, exceptions)
throw e
@ -433,11 +431,11 @@ class SingleThreadedStateMachineManager(
private fun <T> onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent<T>) {
val future = startFlow(
event.flowId,
event.flowLogic,
event.context,
ourIdentity = null,
deduplicationHandler = event.deduplicationHandler
event.flowId,
event.flowLogic,
event.context,
ourIdentity = null,
deduplicationHandler = event.deduplicationHandler
)
event.wireUpFuture(future)
}
@ -505,14 +503,14 @@ class SingleThreadedStateMachineManager(
is InitiatedFlowFactory.CorDapp -> null
}
startInitiatedFlow(
event.flowId,
flowLogic,
event.deduplicationHandler,
senderSession,
initiatedSessionId,
sessionMessage,
senderCoreFlowVersion,
initiatedFlowInfo
event.flowId,
flowLogic,
event.deduplicationHandler,
senderSession,
initiatedSessionId,
sessionMessage,
senderCoreFlowVersion,
initiatedFlowInfo
)
} catch (t: Throwable) {
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
@ -588,46 +586,6 @@ class SingleThreadedStateMachineManager(
val flowAlreadyExists = mutex.locked { flows[flowId] != null }
val existingCheckpoint = if (flowAlreadyExists) {
val currentDBSession = currentDBSession()
val dbFlowCheckpoint = currentDBSession.get(DBCheckpointStorage.DBFlowCheckpoint::class.java, flowId.toString())
?: DBCheckpointStorage.DBFlowCheckpoint(
flowId.toString(),
null,
null,
null,
null,
DBCheckpointStorage.FlowStatus.RUNNABLE,
true,
flowLogic.progressTracker?.currentStep?.toString(),
null,
null
)
var cordappName : String? = null
var initiatingParty: String? = null
var startReason: DBCheckpointStorage.StartReason = DBCheckpointStorage.StartReason.FLOW
when (flowStart) {
is FlowStart.Initiated -> {
cordappName = flowStart.initiatedFlowInfo.appName
initiatingParty = flowStart.peerSession.counterparty.name.toString()
startReason = DBCheckpointStorage.StartReason.INITIATED
}
}
currentDBSession.get(DBCheckpointStorage.DBFlowMetadata::class.java, flowId.toString()) ?: DBCheckpointStorage.DBFlowMetadata(
flowId = flowId.toString(),
flowName = "thisIsAPlaceholder",
userSuppliedIdentifier = "thisIsAnotherPlaceholder",
startType = startReason,
initialParameters = null,
launchingCordapp = cordappName
)
currentDBSession.persist(dbFlowCheckpoint)
// Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
@ -647,13 +605,13 @@ class SingleThreadedStateMachineManager(
null
}
val checkpoint = existingCheckpoint ?: Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
).getOrThrow()
val startedFuture = openFuture<Unit>()