Port ledger integrity work to SingleThreadedStateMachineManager

- Port ledger integrity work to `SingleThreadedStateMachineManager`
- Fix `StatemachineErrorHandlingTest`
- Fix compile errors in `RetryFlowMockTest` + `VaultObserverExceptionTest`
- Add method to `StaffedFlowHospital` that was missed during original merge
This commit is contained in:
LankyDan 2019-11-01 07:59:09 +00:00
parent bedfba8c3d
commit e737b01184
5 changed files with 150 additions and 67 deletions

View File

@ -39,8 +39,6 @@ import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.NodeHandle
import net.corda.testing.driver.NodeParameters
import net.corda.testing.driver.driver
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.NotarySpec
import net.corda.testing.node.TestCordapp
import net.corda.testing.node.User
@ -59,10 +57,9 @@ import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
@Suppress("MaxLineLength") // Byteman rules cannot be easily wrapped
class StatemachineErrorHandlingTest : IntegrationTest() {
class StatemachineErrorHandlingTest {
companion object {
val databaseSchemas = IntegrationTestSchemas(CHARLIE_NAME, ALICE_NAME, DUMMY_NOTARY_NAME)
val rpcUser = User("user1", "test", permissions = setOf(Permissions.all()))
var counter = 0
}
@ -715,7 +712,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
ENDRULE
RULE Throw exception on retry
CLASS ${MultiThreadedStateMachineManager::class.java.name}
CLASS ${SingleThreadedStateMachineManager::class.java.name}
METHOD addAndStartFlow
AT ENTRY
IF flagged("suspend_flag") && flagged("commit_flag") && !flagged("retry_exception_flag")
@ -899,7 +896,7 @@ class StatemachineErrorHandlingTest : IntegrationTest() {
ENDRULE
RULE Throw exception on retry
CLASS ${MultiThreadedStateMachineManager::class.java.name}
CLASS ${SingleThreadedStateMachineManager::class.java.name}
METHOD onExternalStartFlow
AT ENTRY
IF flagged("commit_exception_flag") && !flagged("retry_exception_flag")

View File

@ -12,11 +12,8 @@ import net.corda.core.utilities.seconds
import net.corda.node.services.Permissions
import net.corda.node.services.statemachine.StaffedFlowHospital
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.DUMMY_NOTARY_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import net.corda.testing.internal.IntegrationTest
import net.corda.testing.internal.IntegrationTestSchemas
import net.corda.testing.node.User
import net.corda.testing.node.internal.findCordapp
import org.junit.After
@ -31,11 +28,10 @@ import java.util.concurrent.TimeoutException
import javax.persistence.PersistenceException
import kotlin.test.assertFailsWith
class VaultObserverExceptionTest : IntegrationTest() {
class VaultObserverExceptionTest {
companion object {
@ClassRule
@JvmField
val databaseSchemas = IntegrationTestSchemas(ALICE_NAME, DUMMY_NOTARY_NAME)
val log = contextLogger()
@ -46,8 +42,7 @@ class VaultObserverExceptionTest : IntegrationTest() {
}
@After
override fun tearDown() {
super.tearDown()
fun tearDown() {
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
StaffedFlowHospital.onFlowAdmitted.clear()

View File

@ -16,6 +16,7 @@ import net.corda.core.identity.Party
import net.corda.core.internal.*
import net.corda.core.internal.concurrent.OpenFuture
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.mapError
import net.corda.core.internal.concurrent.openFuture
import net.corda.core.messaging.DataFeed
import net.corda.core.serialization.SerializedBytes
@ -210,12 +211,14 @@ class SingleThreadedStateMachineManager(
}
private fun <A> startFlow(
flowId: StateMachineRunId,
flowLogic: FlowLogic<A>,
context: InvocationContext,
ourIdentity: Party?,
deduplicationHandler: DeduplicationHandler?
): CordaFuture<FlowStateMachine<A>> {
return startFlowInternal(
flowId,
invocationContext = context,
flowLogic = flowLogic,
flowStart = FlowStart.Explicit,
@ -230,7 +233,10 @@ class SingleThreadedStateMachineManager(
cancelTimeoutIfScheduled(id)
val flow = flows.remove(id)
if (flow != null) {
logger.debug("Killing flow known to physical node.")
flow.fiber.transientState?.let {
flow.fiber.transientState = TransientReference(it.value.copy(isRemoved = true))
}
logger.info("Killing flow $id known to this node.")
decrementLiveFibers()
totalFinishedFlows.inc()
try {
@ -343,58 +349,71 @@ class SingleThreadedStateMachineManager(
}
}
@Suppress("TooGenericExceptionCaught", "ComplexMethod") // this is fully intentional here, see comment in the catch clause
override fun retryFlowFromSafePoint(currentState: StateMachineState) {
// Get set of external events
val flowId = currentState.flowLogic.runId
val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
try {
val oldFlowLeftOver = mutex.locked { flows[flowId] }?.fiber?.transientValues?.value?.eventQueue
if (oldFlowLeftOver == null) {
logger.error("Unable to find flow for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
// Resurrect flow
createFlowFromCheckpoint(
val flow = if (currentState.isAnyCheckpointPersisted) {
// We intentionally grab the checkpoint from storage rather than relying on the one referenced by currentState. This is so that
// we mirror exactly what happens when restarting the node.
val serializedCheckpoint = checkpointStorage.getCheckpoint(flowId)
if (serializedCheckpoint == null) {
logger.error("Unable to find database checkpoint for flow $flowId. Something is very wrong. The flow will not retry.")
return
}
// Resurrect flow
createFlowFromCheckpoint(
id = flowId,
serializedCheckpoint = serializedCheckpoint,
initialDeduplicationHandler = null,
isAnyCheckpointPersisted = true,
isStartIdempotent = false
) ?: return
} else {
// Just flow initiation message
null
}
mutex.locked {
if (stopping) {
return
) ?: return
} else {
// Just flow initiation message
null
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
mutex.locked {
if (stopping) {
return
}
// Remove any sessions the old flow has.
for (sessionId in getFlowSessionIds(currentState.checkpoint)) {
sessionToFlow.remove(sessionId)
}
if (flow != null) {
injectOldProgressTracker(currentState.flowLogic.progressTracker, flow.fiber.logic)
addAndStartFlow(flowId, flow)
}
// Deliver all the external events from the old flow instance.
val unprocessedExternalEvents = mutableListOf<ExternalEvent>()
do {
val event = oldFlowLeftOver.tryReceive()
if (event is Event.GeneratedByExternalEvent) {
unprocessedExternalEvents += event.deduplicationHandler.externalCause
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
} while (event != null)
val externalEvents = currentState.pendingDeduplicationHandlers.map { it.externalCause } + unprocessedExternalEvents
for (externalEvent in externalEvents) {
deliverExternalEvent(externalEvent)
}
} catch (e: Exception) {
// Failed to retry - manually put the flow in for observation rather than
// relying on the [HospitalisingInterceptor] to do so
val exceptions = (currentState.checkpoint.errorState as? ErrorState.Errored)
?.errors
?.map { it.exception }
?.plus(e) ?: emptyList()
logger.info("Failed to retry flow $flowId, keeping in for observation and aborting")
flowHospital.forceIntoOvernightObservation(flowId, exceptions)
throw e
}
}
@ -410,7 +429,13 @@ class SingleThreadedStateMachineManager(
}
private fun <T> onExternalStartFlow(event: ExternalEvent.ExternalStartFlowEvent<T>) {
val future = startFlow(event.flowLogic, event.context, ourIdentity = null, deduplicationHandler = event.deduplicationHandler)
val future = startFlow(
event.flowId,
event.flowLogic,
event.context,
ourIdentity = null,
deduplicationHandler = event.deduplicationHandler
)
event.wireUpFuture(future)
}
@ -476,7 +501,16 @@ class SingleThreadedStateMachineManager(
is InitiatedFlowFactory.Core -> event.receivedMessage.platformVersion
is InitiatedFlowFactory.CorDapp -> null
}
startInitiatedFlow(flowLogic, event.deduplicationHandler, senderSession, initiatedSessionId, sessionMessage, senderCoreFlowVersion, initiatedFlowInfo)
startInitiatedFlow(
event.flowId,
flowLogic,
event.deduplicationHandler,
senderSession,
initiatedSessionId,
sessionMessage,
senderCoreFlowVersion,
initiatedFlowInfo
)
} catch (t: Throwable) {
logger.warn("Unable to initiate flow from $sender (appName=${sessionMessage.appName} " +
"flowVersion=${sessionMessage.flowVersion}), sending to the flow hospital", t)
@ -503,7 +537,9 @@ class SingleThreadedStateMachineManager(
return serviceHub.getFlowFactory(initiatorFlowClass) ?: throw SessionRejectException.NotRegistered(initiatorFlowClass)
}
@Suppress("LongParameterList")
private fun <A> startInitiatedFlow(
flowId: StateMachineRunId,
flowLogic: FlowLogic<A>,
initiatingMessageDeduplicationHandler: DeduplicationHandler,
peerSession: FlowSessionImpl,
@ -515,13 +551,19 @@ class SingleThreadedStateMachineManager(
val flowStart = FlowStart.Initiated(peerSession, initiatedSessionId, initiatingMessage, senderCoreFlowVersion, initiatedFlowInfo)
val ourIdentity = ourFirstIdentity
startFlowInternal(
InvocationContext.peer(peerSession.counterparty.name), flowLogic, flowStart, ourIdentity,
flowId,
InvocationContext.peer(peerSession.counterparty.name),
flowLogic,
flowStart,
ourIdentity,
initiatingMessageDeduplicationHandler,
isStartIdempotent = false
)
}
@Suppress("LongParameterList")
private fun <A> startFlowInternal(
flowId: StateMachineRunId,
invocationContext: InvocationContext,
flowLogic: FlowLogic<A>,
flowStart: FlowStart,
@ -529,7 +571,6 @@ class SingleThreadedStateMachineManager(
deduplicationHandler: DeduplicationHandler?,
isStartIdempotent: Boolean
): CordaFuture<FlowStateMachine<A>> {
val flowId = StateMachineRunId.createRandom()
// Before we construct the state machine state by freezing the FlowLogic we need to make sure that lazy properties
// have access to the fiber (and thereby the service hub)
@ -541,22 +582,44 @@ class SingleThreadedStateMachineManager(
val flowCorDappVersion = createSubFlowVersion(serviceHub.cordappProvider.getCordappForFlow(flowLogic), serviceHub.myInfo.platformVersion)
val initialCheckpoint = Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
val flowAlreadyExists = mutex.locked { flows[flowId] != null }
val existingCheckpoint = if (flowAlreadyExists) {
// Load the flow's checkpoint
// The checkpoint will be missing if the flow failed before persisting the original checkpoint
// CORDA-3359 - Do not start/retry a flow that failed after deleting its checkpoint (the whole of the flow might replay)
checkpointStorage.getCheckpoint(flowId)?.let { serializedCheckpoint ->
val checkpoint = tryCheckpointDeserialize(serializedCheckpoint, flowId)
if (checkpoint == null) {
return openFuture<FlowStateMachine<A>>().mapError {
IllegalStateException("Unable to deserialize database checkpoint for flow $flowId. " +
"Something is very wrong. The flow will not retry.")
}
} else {
checkpoint
}
}
} else {
// This is a brand new flow
null
}
val checkpoint = existingCheckpoint ?: Checkpoint.create(
invocationContext,
flowStart,
flowLogic.javaClass,
frozenFlowLogic,
ourIdentity,
flowCorDappVersion,
flowLogic.isEnabledTimedFlow()
).getOrThrow()
val startedFuture = openFuture<Unit>()
val initialState = StateMachineState(
checkpoint = initialCheckpoint,
checkpoint = checkpoint,
pendingDeduplicationHandlers = deduplicationHandler?.let { listOf(it) } ?: emptyList(),
isFlowResumed = false,
isTransactionTracked = false,
isAnyCheckpointPersisted = false,
isAnyCheckpointPersisted = existingCheckpoint != null,
isStartIdempotent = isStartIdempotent,
isRemoved = false,
flowLogic = flowLogic,

View File

@ -158,6 +158,33 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
return true
}
/**
 * Places a flow that the hospital is already treating into overnight observation.
 *
 * A new medical record is derived from the flow's most recent record, stamped with the current time, the supplied
 * [errors] and an outcome of [Outcome.OVERNIGHT_OBSERVATION]. Every registered overnight-observation hook is invoked
 * before the new record is appended to the flow's history and pushed to the records publisher.
 *
 * @param id The [StateMachineRunId] of the flow being forced into observation
 * @param errors The errors to store on the newly created medical record
 * @throws IllegalArgumentException if the flow is not currently in the hospital or has no medical history
 */
fun forceIntoOvernightObservation(id: StateMachineRunId, errors: List<Throwable>) {
    mutex.locked {
        // Either precondition failing means the flow is in an invalid state or this function was called from the
        // wrong place; both checks raise an IllegalArgumentException.
        require(flowsInHospital[id] != null) { "Flow must already be in the hospital before forcing into overnight observation" }
        val medicalHistory = requireNotNull(flowPatients[id]) { "Flow must already have history before forcing into overnight observation" }
        // The new record is copied from the latest one, so it keeps that record's staff members (`by`).
        // NOTE(review): presumably the history always holds at least one record here; `last()` would fail otherwise - confirm.
        val latestRecord = medicalHistory.records.last()
        val observationRecord = latestRecord.copy(
            time = clock.instant(),
            errors = errors,
            outcome = Outcome.OVERNIGHT_OBSERVATION
        )
        for (hook in onFlowKeptForOvernightObservation) {
            hook.invoke(id, observationRecord.by.map { staff -> staff.toString() })
        }
        medicalHistory.records += observationRecord
        recordsPublisher.onNext(observationRecord)
    }
}
/**
* Request treatment for the [flowFiber]. A flow can only be added to the hospital if they are not already being
* treated.

View File

@ -2,6 +2,7 @@ package net.corda.node.services.statemachine
import co.paralleluniverse.fibers.Suspendable
import net.corda.core.concurrent.CordaFuture
import net.corda.core.flows.Destination
import net.corda.core.flows.FlowInfo
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.FlowSession