mirror of
https://github.com/corda/corda.git
synced 2025-01-13 16:30:25 +00:00
CORDA-3195 Default behaviour of FlowHospital (WIP) (#2520)
* Add GP to flow hospital, and start working on a list of things the GP knows to be incurable. * Only hospitalise SQL and Persistence Exceptions (let's see if that is enough?), also rename to DatabaseDentist. * Disabled hospitalisation of SQL exceptions in flow retry tests * Fix RPC exception handling test by not using PersistenceException * Ignore flaky integration test * Code review: Rename staff member and add testing annotation * Revert compiler.xml
This commit is contained in:
parent
93ff072812
commit
938828b52f
@ -17,6 +17,7 @@ import net.corda.core.utilities.getOrThrow
|
|||||||
import net.corda.core.utilities.unwrap
|
import net.corda.core.utilities.unwrap
|
||||||
import net.corda.node.services.Permissions
|
import net.corda.node.services.Permissions
|
||||||
import net.corda.node.services.statemachine.FlowTimeoutException
|
import net.corda.node.services.statemachine.FlowTimeoutException
|
||||||
|
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.core.BOB_NAME
|
import net.corda.testing.core.BOB_NAME
|
||||||
import net.corda.testing.core.singleIdentity
|
import net.corda.testing.core.singleIdentity
|
||||||
@ -25,6 +26,7 @@ import net.corda.testing.driver.driver
|
|||||||
import net.corda.testing.node.User
|
import net.corda.testing.node.User
|
||||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||||
import org.hibernate.exception.ConstraintViolationException
|
import org.hibernate.exception.ConstraintViolationException
|
||||||
|
import org.junit.After
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import java.lang.management.ManagementFactory
|
import java.lang.management.ManagementFactory
|
||||||
@ -46,6 +48,12 @@ class FlowRetryTest {
|
|||||||
TransientConnectionFailureFlow.retryCount = -1
|
TransientConnectionFailureFlow.retryCount = -1
|
||||||
WrappedTransientConnectionFailureFlow.retryCount = -1
|
WrappedTransientConnectionFailureFlow.retryCount = -1
|
||||||
GeneralExternalFailureFlow.retryCount = -1
|
GeneralExternalFailureFlow.retryCount = -1
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> true }
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
fun cleanUp() {
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -140,7 +140,7 @@ class InitiatedFlow(private val initiatingSession: FlowSession) : FlowLogic<Unit
|
|||||||
@Suspendable
|
@Suspendable
|
||||||
override fun call() {
|
override fun call() {
|
||||||
initiatingSession.receive<String>().unwrap { it }
|
initiatingSession.receive<String>().unwrap { it }
|
||||||
throw GenericJDBCException("Something went wrong!", SQLException("Oops!"))
|
throw ClientRelevantException("Something went wrong!", SQLException("Oops!"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,10 +7,7 @@ import net.corda.core.flows.ReceiveTransactionFlow
|
|||||||
import net.corda.core.flows.StateMachineRunId
|
import net.corda.core.flows.StateMachineRunId
|
||||||
import net.corda.core.flows.UnexpectedFlowEndException
|
import net.corda.core.flows.UnexpectedFlowEndException
|
||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.internal.DeclaredField
|
import net.corda.core.internal.*
|
||||||
import net.corda.core.internal.ThreadBox
|
|
||||||
import net.corda.core.internal.TimedFlow
|
|
||||||
import net.corda.core.internal.bufferUntilSubscribed
|
|
||||||
import net.corda.core.messaging.DataFeed
|
import net.corda.core.messaging.DataFeed
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.core.utilities.seconds
|
import net.corda.core.utilities.seconds
|
||||||
@ -22,6 +19,7 @@ import java.sql.SQLTransientConnectionException
|
|||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
import javax.persistence.PersistenceException
|
||||||
import kotlin.math.pow
|
import kotlin.math.pow
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -31,11 +29,12 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
|||||||
private companion object {
|
private companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
private val staff = listOf(
|
private val staff = listOf(
|
||||||
DeadlockNurse,
|
DeadlockNurse,
|
||||||
DuplicateInsertSpecialist,
|
DuplicateInsertSpecialist,
|
||||||
DoctorTimeout,
|
DoctorTimeout,
|
||||||
FinalityDoctor,
|
FinalityDoctor,
|
||||||
TransientConnectionCardiologist
|
TransientConnectionCardiologist,
|
||||||
|
DatabaseEndocrinologist
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -259,7 +258,6 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
|||||||
NOT_MY_SPECIALTY
|
NOT_MY_SPECIALTY
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
interface Staff {
|
interface Staff {
|
||||||
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
|
fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis
|
||||||
}
|
}
|
||||||
@ -334,7 +332,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun isErrorPropagatedFromCounterparty(error: Throwable): Boolean {
|
private fun isErrorPropagatedFromCounterparty(error: Throwable): Boolean {
|
||||||
return when(error) {
|
return when (error) {
|
||||||
is UnexpectedFlowEndException -> {
|
is UnexpectedFlowEndException -> {
|
||||||
val peer = DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", error).value
|
val peer = DeclaredField<Party?>(UnexpectedFlowEndException::class.java, "peer", error).value
|
||||||
peer != null
|
peer != null
|
||||||
@ -361,7 +359,6 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
|||||||
return strippedStacktrace.isNotEmpty() &&
|
return strippedStacktrace.isNotEmpty() &&
|
||||||
strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!! )
|
strippedStacktrace.first().className.startsWith(ReceiveTransactionFlow::class.qualifiedName!! )
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -384,6 +381,23 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging, private val
|
|||||||
return exception.mentionsThrowable(SQLTransientConnectionException::class.java, "connection is not available")
|
return exception.mentionsThrowable(SQLTransientConnectionException::class.java, "connection is not available")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hospitalise any database (SQL) exception that wasn't handled otherwise, unless on the configurable whitelist
|
||||||
|
* Note that retry decisions from other specialists will not be affected as retries take precedence over hospitalisation.
|
||||||
|
*/
|
||||||
|
object DatabaseEndocrinologist : Staff {
|
||||||
|
override fun consult(flowFiber: FlowFiber, currentState: StateMachineState, newError: Throwable, history: FlowMedicalHistory): Diagnosis {
|
||||||
|
return if (( newError is SQLException || newError is PersistenceException )&& !customConditions.any{it(newError)}) {
|
||||||
|
Diagnosis.OVERNIGHT_OBSERVATION
|
||||||
|
} else {
|
||||||
|
Diagnosis.NOT_MY_SPECIALTY
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
val customConditions = mutableSetOf<(t: Throwable) -> Boolean>()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {
|
private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>, errorMessage: String? = null): Boolean {
|
||||||
@ -396,4 +410,5 @@ private fun <T : Throwable> Throwable?.mentionsThrowable(exceptionType: Class<T>
|
|||||||
true
|
true
|
||||||
}
|
}
|
||||||
return (exceptionType.isAssignableFrom(this::class.java) && containsMessage) || cause.mentionsThrowable(exceptionType, errorMessage)
|
return (exceptionType.isAssignableFrom(this::class.java) && containsMessage) || cause.mentionsThrowable(exceptionType, errorMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,6 +49,8 @@ class RetryFlowMockTest {
|
|||||||
SendAndRetryFlow.count = 0
|
SendAndRetryFlow.count = 0
|
||||||
RetryInsertFlow.count = 0
|
RetryInsertFlow.count = 0
|
||||||
KeepSendingFlow.count.set(0)
|
KeepSendingFlow.count.set(0)
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is LimitedRetryCausingError }
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add { t -> t is RetryCausingError }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun <T> TestStartedNode.startFlow(logic: FlowLogic<T>): CordaFuture<T> {
|
private fun <T> TestStartedNode.startFlow(logic: FlowLogic<T>): CordaFuture<T> {
|
||||||
@ -58,6 +60,7 @@ class RetryFlowMockTest {
|
|||||||
@After
|
@After
|
||||||
fun cleanUp() {
|
fun cleanUp() {
|
||||||
mockNet.stopNodes()
|
mockNet.stopNodes()
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -6,6 +6,7 @@ import net.corda.core.identity.CordaX500Name
|
|||||||
import net.corda.core.identity.Party
|
import net.corda.core.identity.Party
|
||||||
import net.corda.core.node.services.queryBy
|
import net.corda.core.node.services.queryBy
|
||||||
import net.corda.core.transactions.TransactionBuilder
|
import net.corda.core.transactions.TransactionBuilder
|
||||||
|
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||||
import net.corda.testing.core.DummyCommandData
|
import net.corda.testing.core.DummyCommandData
|
||||||
import net.corda.testing.core.singleIdentity
|
import net.corda.testing.core.singleIdentity
|
||||||
import net.corda.testing.internal.vault.DUMMY_DEAL_PROGRAM_ID
|
import net.corda.testing.internal.vault.DUMMY_DEAL_PROGRAM_ID
|
||||||
@ -48,10 +49,12 @@ class VaultFlowTest {
|
|||||||
@After
|
@After
|
||||||
fun tearDown() {
|
fun tearDown() {
|
||||||
mockNetwork.stopNodes()
|
mockNetwork.stopNodes()
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.clear()
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun `Unique column constraint failing causes states to not persist to vaults`() {
|
fun `Unique column constraint failing causes states to not persist to vaults`() {
|
||||||
|
StaffedFlowHospital.DatabaseEndocrinologist.customConditions.add( { t: Throwable -> t is javax.persistence.PersistenceException })
|
||||||
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
|
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
|
||||||
Assertions.assertThatExceptionOfType(ExecutionException::class.java).isThrownBy {
|
Assertions.assertThatExceptionOfType(ExecutionException::class.java).isThrownBy {
|
||||||
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
|
partyA.startFlow(Initiator(listOf(partyA.info.singleIdentity(), partyB.info.singleIdentity()))).get()
|
||||||
|
Loading…
Reference in New Issue
Block a user