mirror of
https://github.com/corda/corda.git
synced 2025-02-22 18:12:53 +00:00
ENT-6743: Contains method of flow hospital now correctly returns if flow is in hospital. Historic patient records not used.
This commit is contained in:
parent
35986f7ad5
commit
fa607024c2
@ -103,17 +103,6 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
* Flows should be removed from [flowsInHospital] when they have completed a successful transition.
|
* Flows should be removed from [flowsInHospital] when they have completed a successful transition.
|
||||||
*/
|
*/
|
||||||
private val flowsInHospital = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
|
private val flowsInHospital = ConcurrentHashMap<StateMachineRunId, FlowFiber>()
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns true if the flow is currently being treated in the hospital.
|
|
||||||
* The differs to flows with a medical history (which can accessed via [StaffedFlowHospital.contains]).
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
internal fun flowInHospital(runId: StateMachineRunId): Boolean {
|
|
||||||
// The .keys avoids https://youtrack.jetbrains.com/issue/KT-18053
|
|
||||||
return runId in flowsInHospital.keys
|
|
||||||
}
|
|
||||||
|
|
||||||
private val mutex = ThreadBox(object {
|
private val mutex = ThreadBox(object {
|
||||||
/**
|
/**
|
||||||
* Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital,
|
* Contains medical history of every flow (a patient) that has entered the hospital. A flow can leave the hospital,
|
||||||
@ -347,7 +336,7 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
operator fun contains(flowId: StateMachineRunId) = mutex.locked { flowId in flowPatients }
|
operator fun contains(flowId: StateMachineRunId) = flowId in flowsInHospital.keys
|
||||||
|
|
||||||
override fun close() {
|
override fun close() {
|
||||||
hospitalJobTimer.cancel()
|
hospitalJobTimer.cancel()
|
||||||
|
@ -6,6 +6,7 @@ import net.corda.core.flows.Destination
|
|||||||
import net.corda.core.flows.FlowInfo
|
import net.corda.core.flows.FlowInfo
|
||||||
import net.corda.core.flows.FlowLogic
|
import net.corda.core.flows.FlowLogic
|
||||||
import net.corda.core.flows.FlowSession
|
import net.corda.core.flows.FlowSession
|
||||||
|
import net.corda.core.flows.HospitalizeFlowException
|
||||||
import net.corda.core.flows.InitiatedBy
|
import net.corda.core.flows.InitiatedBy
|
||||||
import net.corda.core.flows.InitiatingFlow
|
import net.corda.core.flows.InitiatingFlow
|
||||||
import net.corda.core.flows.KilledFlowException
|
import net.corda.core.flows.KilledFlowException
|
||||||
@ -29,6 +30,7 @@ import net.corda.testing.node.internal.MessagingServiceSpy
|
|||||||
import net.corda.testing.node.internal.TestStartedNode
|
import net.corda.testing.node.internal.TestStartedNode
|
||||||
import net.corda.testing.node.internal.enclosedCordapp
|
import net.corda.testing.node.internal.enclosedCordapp
|
||||||
import net.corda.testing.node.internal.newContext
|
import net.corda.testing.node.internal.newContext
|
||||||
|
import net.corda.testing.node.internal.startFlow
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
import org.assertj.core.api.Assertions.assertThatExceptionOfType
|
||||||
import org.h2.util.Utils
|
import org.h2.util.Utils
|
||||||
@ -47,6 +49,7 @@ import java.util.concurrent.Semaphore
|
|||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
import kotlin.test.assertFailsWith
|
import kotlin.test.assertFailsWith
|
||||||
|
import kotlin.test.assertFalse
|
||||||
import kotlin.test.assertNotNull
|
import kotlin.test.assertNotNull
|
||||||
import kotlin.test.assertNull
|
import kotlin.test.assertNull
|
||||||
|
|
||||||
@ -324,6 +327,25 @@ class RetryFlowMockTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class HospitalizeThenSucceedFlow : FlowLogic<Boolean>() {
|
||||||
|
companion object {
|
||||||
|
var runs = 0
|
||||||
|
var flowRetried = Semaphore(0)
|
||||||
|
var flowWillReturn = Semaphore(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Suspendable
|
||||||
|
override fun call(): Boolean {
|
||||||
|
if (runs == 0) {
|
||||||
|
runs++
|
||||||
|
throw HospitalizeFlowException("Hospitalize on first run")
|
||||||
|
}
|
||||||
|
flowRetried.release()
|
||||||
|
flowWillReturn.acquire()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@InitiatingFlow
|
@InitiatingFlow
|
||||||
class UnbalancedSendAndReceiveFlow(private val other: Party) : FlowLogic<Unit>() {
|
class UnbalancedSendAndReceiveFlow(private val other: Party) : FlowLogic<Unit>() {
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user