mirror of
https://github.com/corda/corda.git
synced 2025-04-16 07:27:17 +00:00
NOTICK Increase timeouts in external operation tests (#6076)
* NOTICK Increase timeouts in external operation tests It seems these tests are timing out when they shouldn't be when running on the CI servers. To resolve this, the timeouts of `getOrThrow` calls have been increased to a minute. Reliance on timeouts has also been reduced through using locks. This should speed up tests that don't need to wait for the timeout anymore to progress. * NOTICK Remove hospital checking code * NOTICK Change locking code
This commit is contained in:
parent
f9ccb88fea
commit
fb64e47326
@ -23,7 +23,6 @@ import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
import static net.corda.testing.driver.Driver.driver;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperationTest {
|
||||
|
||||
@ -32,16 +31,16 @@ public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperati
|
||||
driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
|
||||
NodeHandle alice = KotlinUtilsKt.getOrThrow(
|
||||
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)),
|
||||
Duration.of(20, ChronoUnit.SECONDS)
|
||||
Duration.of(1, ChronoUnit.MINUTES)
|
||||
);
|
||||
NodeHandle bob = KotlinUtilsKt.getOrThrow(
|
||||
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
|
||||
Duration.of(20, ChronoUnit.SECONDS)
|
||||
Duration.of(1, ChronoUnit.MINUTES)
|
||||
);
|
||||
return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
|
||||
FlowWithExternalOperationInJava.class,
|
||||
TestUtils.singleIdentity(bob.getNodeInfo())
|
||||
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
|
||||
).getReturnValue(), Duration.of(1, ChronoUnit.MINUTES));
|
||||
});
|
||||
}
|
||||
|
||||
@ -50,16 +49,16 @@ public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperati
|
||||
driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
|
||||
NodeHandle alice = KotlinUtilsKt.getOrThrow(
|
||||
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)),
|
||||
Duration.of(20, ChronoUnit.SECONDS)
|
||||
Duration.of(1, ChronoUnit.MINUTES)
|
||||
);
|
||||
NodeHandle bob = KotlinUtilsKt.getOrThrow(
|
||||
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
|
||||
Duration.of(20, ChronoUnit.SECONDS)
|
||||
Duration.of(1, ChronoUnit.MINUTES)
|
||||
);
|
||||
return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
|
||||
FlowWithExternalAsyncOperationInJava.class,
|
||||
TestUtils.singleIdentity(bob.getNodeInfo())
|
||||
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
|
||||
).getReturnValue(), Duration.of(1, ChronoUnit.MINUTES));
|
||||
});
|
||||
}
|
||||
|
||||
@ -68,22 +67,18 @@ public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperati
|
||||
driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
|
||||
NodeHandle alice = KotlinUtilsKt.getOrThrow(
|
||||
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)),
|
||||
Duration.of(20, ChronoUnit.SECONDS)
|
||||
Duration.of(1, ChronoUnit.MINUTES)
|
||||
);
|
||||
NodeHandle bob = KotlinUtilsKt.getOrThrow(
|
||||
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
|
||||
Duration.of(20, ChronoUnit.SECONDS)
|
||||
Duration.of(1, ChronoUnit.MINUTES)
|
||||
);
|
||||
KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
|
||||
FlowWithExternalOperationThatGetsRetriedInJava.class,
|
||||
TestUtils.singleIdentity(bob.getNodeInfo())
|
||||
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
|
||||
).getReturnValue(), Duration.of(1, ChronoUnit.MINUTES));
|
||||
|
||||
HospitalCounts counts = KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
|
||||
GetHospitalCountersFlow.class
|
||||
).getReturnValue(), Duration.of(20, ChronoUnit.SECONDS));
|
||||
assertEquals(1, counts.getDischarge());
|
||||
assertEquals(0, counts.getObservation());
|
||||
assertHospitalCounters(1, 0);
|
||||
|
||||
return null;
|
||||
});
|
||||
|
@ -12,26 +12,54 @@ import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.flows.StartableByService
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.internal.concurrent.doOnComplete
|
||||
import net.corda.core.messaging.FlowHandle
|
||||
import net.corda.core.node.AppServiceHub
|
||||
import net.corda.core.node.ServiceHub
|
||||
import net.corda.core.node.services.CordaService
|
||||
import net.corda.core.schemas.MappedSchema
|
||||
import net.corda.core.serialization.CordaSerializable
|
||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import org.junit.Before
|
||||
import java.sql.SQLTransientConnectionException
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.Semaphore
|
||||
import java.util.function.Supplier
|
||||
import javax.persistence.Column
|
||||
import javax.persistence.Entity
|
||||
import javax.persistence.Id
|
||||
import javax.persistence.Table
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
abstract class AbstractFlowExternalOperationTest {
|
||||
|
||||
var dischargeCounter = 0
|
||||
var observationCounter = 0
|
||||
|
||||
@Before
|
||||
fun before() {
|
||||
StaffedFlowHospital.onFlowDischarged.clear()
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++dischargeCounter }
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.clear()
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
|
||||
dischargeCounter = 0
|
||||
observationCounter = 0
|
||||
}
|
||||
|
||||
fun blockUntilFlowKeptInForObservation(flow: () -> FlowHandle<*>) {
|
||||
val lock = Semaphore(0)
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> lock.release() }
|
||||
flow()
|
||||
lock.acquire()
|
||||
}
|
||||
|
||||
fun assertHospitalCounters(discharge: Int, observation: Int) {
|
||||
assertEquals(discharge, dischargeCounter)
|
||||
assertEquals(observation, observationCounter)
|
||||
}
|
||||
|
||||
@StartableByRPC
|
||||
@InitiatingFlow
|
||||
@StartableByService
|
||||
@ -182,31 +210,6 @@ abstract class AbstractFlowExternalOperationTest {
|
||||
|
||||
object CustomMappedSchema : MappedSchema(CustomSchema::class.java, 1, listOf(CustomTableEntity::class.java))
|
||||
|
||||
// Internal use for testing only!!
|
||||
@StartableByRPC
|
||||
class GetHospitalCountersFlow : FlowLogic<HospitalCounts>() {
|
||||
override fun call(): HospitalCounts =
|
||||
HospitalCounts(
|
||||
serviceHub.cordaService(HospitalCounter::class.java).dischargeCounter,
|
||||
serviceHub.cordaService(HospitalCounter::class.java).observationCounter
|
||||
)
|
||||
}
|
||||
|
||||
@CordaSerializable
|
||||
data class HospitalCounts(val discharge: Int, val observation: Int)
|
||||
|
||||
@Suppress("UNUSED_PARAMETER")
|
||||
@CordaService
|
||||
class HospitalCounter(services: AppServiceHub) : SingletonSerializeAsToken() {
|
||||
var observationCounter: Int = 0
|
||||
var dischargeCounter: Int = 0
|
||||
|
||||
init {
|
||||
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++dischargeCounter }
|
||||
StaffedFlowHospital.onFlowKeptForOvernightObservation.add { _, _ -> ++observationCounter }
|
||||
}
|
||||
}
|
||||
|
||||
class MyCordaException(message: String) : CordaException(message)
|
||||
|
||||
class DirectlyAccessedServiceHubException : CordaException("Null pointer from accessing flow's serviceHub")
|
||||
|
@ -6,12 +6,7 @@ import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.DirectlyAccessedServiceHubException
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.ExternalAsyncOperation
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FlowWithExternalProcess
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FutureService
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.MyCordaException
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.node.services.statemachine.StateTransitionException
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
@ -21,28 +16,24 @@ import net.corda.testing.driver.driver
|
||||
import org.junit.Test
|
||||
import java.sql.SQLTransientConnectionException
|
||||
import java.util.concurrent.CompletableFuture
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::FlowWithExternalAsyncOperation, bob.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation that checks deduplicationId is not rerun when flow is retried`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation that checks deduplicationId is not rerun when flow is retried`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
@ -50,16 +41,14 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalAsyncOperationWithDeduplication,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(1, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(1, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation propagates exception to calling flow`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation propagates exception to calling flow`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
@ -68,100 +57,88 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
|
||||
::FlowWithExternalAsyncOperationPropagatesException,
|
||||
bob.nodeInfo.singleIdentity(),
|
||||
MyCordaException::class.java
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation exception can be caught in flow`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation exception can be caught in flow`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
val result = alice.rpc.startFlow(
|
||||
::FlowWithExternalAsyncOperationThatThrowsExceptionAndCaughtInFlow,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
assertTrue(result as Boolean)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation with exception that hospital keeps for observation does not fail`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation with exception that hospital keeps for observation does not fail`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
assertFailsWith<TimeoutException> {
|
||||
blockUntilFlowKeptInForObservation {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalAsyncOperationPropagatesException,
|
||||
bob.nodeInfo.singleIdentity(),
|
||||
HospitalizeFlowException::class.java
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(1, observation)
|
||||
assertHospitalCounters(0, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation with exception that hospital discharges is retried and runs the future again`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation with exception that hospital discharges is retried and runs the future again`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
assertFailsWith<TimeoutException> {
|
||||
blockUntilFlowKeptInForObservation {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalAsyncOperationPropagatesException,
|
||||
bob.nodeInfo.singleIdentity(),
|
||||
SQLTransientConnectionException::class.java
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(3, discharged)
|
||||
assertEquals(1, observation)
|
||||
assertHospitalCounters(3, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation that throws exception rather than completing future exceptionally fails with internal exception`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation that throws exception rather than completing future exceptionally fails with internal exception`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
assertFailsWith<StateTransitionException> {
|
||||
alice.rpc.startFlow(::FlowWithExternalAsyncOperationUnhandledException, bob.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation that passes serviceHub into process can be retried`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation that passes serviceHub into process can be retried`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
assertFailsWith<TimeoutException> {
|
||||
blockUntilFlowKeptInForObservation {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalAsyncOperationThatPassesInServiceHubCanRetry,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(3, discharged)
|
||||
assertEquals(1, observation)
|
||||
assertHospitalCounters(3, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
@ -169,23 +146,19 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalAsyncOperationThatDirectlyAccessesServiceHubFailsRetry,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(1, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(1, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `starting multiple futures and joining on their results`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `starting multiple futures and joining on their results`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::FlowThatStartsMultipleFuturesAndJoins, bob.nodeInfo.singleIdentity()).returnValue.getOrThrow(20.seconds)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
alice.rpc.startFlow(::FlowThatStartsMultipleFuturesAndJoins, bob.nodeInfo.singleIdentity()).returnValue.getOrThrow(1.minutes)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,40 +5,35 @@ import net.corda.core.flows.StartableByRPC
|
||||
import net.corda.core.identity.Party
|
||||
import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
import net.corda.testing.core.BOB_NAME
|
||||
import net.corda.testing.core.singleIdentity
|
||||
import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import org.junit.Test
|
||||
import kotlin.test.assertEquals
|
||||
|
||||
class FlowExternalOperationStartFlowTest : AbstractFlowExternalOperationTest() {
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `starting a flow inside of a flow that starts a future will succeed`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `starting a flow inside of a flow that starts a future will succeed`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::FlowThatStartsAnotherFlowInAnExternalOperation, bob.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(40.seconds)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `multiple flows can be started and their futures joined from inside a flow`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `multiple flows can be started and their futures joined from inside a flow`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::ForkJoinFlows, bob.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(40.seconds)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -10,13 +10,8 @@ import net.corda.core.messaging.startFlow
|
||||
import net.corda.core.node.services.queryBy
|
||||
import net.corda.core.transactions.TransactionBuilder
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.CustomTableEntity
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.DirectlyAccessedServiceHubException
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.ExternalOperation
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FlowWithExternalProcess
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.FutureService
|
||||
import net.corda.coretests.flows.AbstractFlowExternalOperationTest.MyCordaException
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.node.services.statemachine.StaffedFlowHospital
|
||||
import net.corda.testing.contracts.DummyContract
|
||||
import net.corda.testing.contracts.DummyState
|
||||
import net.corda.testing.core.ALICE_NAME
|
||||
@ -26,30 +21,26 @@ import net.corda.testing.driver.DriverParameters
|
||||
import net.corda.testing.driver.driver
|
||||
import net.corda.testing.node.internal.cordappsForPackages
|
||||
import org.junit.Test
|
||||
import java.lang.IllegalStateException
|
||||
import java.sql.SQLTransientConnectionException
|
||||
import java.util.concurrent.TimeoutException
|
||||
import kotlin.test.assertEquals
|
||||
import java.util.concurrent.Semaphore
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external operation`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external operation`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::FlowWithExternalOperation, bob.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external operation that checks deduplicationId is not rerun when flow is retried`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external operation that checks deduplicationId is not rerun when flow is retried`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
@ -57,16 +48,14 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalOperationWithDeduplication,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(1, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(1, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external operation propagates exception to calling flow`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external operation propagates exception to calling flow`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
@ -75,82 +64,72 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
|
||||
::FlowWithExternalOperationPropagatesException,
|
||||
bob.nodeInfo.singleIdentity(),
|
||||
MyCordaException::class.java
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external operation exception can be caught in flow`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external operation exception can be caught in flow`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
alice.rpc.startFlow(::FlowWithExternalOperationThatThrowsExceptionAndCaughtInFlow, bob.nodeInfo.singleIdentity())
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(0, observation)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertHospitalCounters(0, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external operation with exception that hospital keeps for observation does not fail`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external operation with exception that hospital keeps for observation does not fail`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
assertFailsWith<TimeoutException> {
|
||||
blockUntilFlowKeptInForObservation {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalOperationPropagatesException,
|
||||
bob.nodeInfo.singleIdentity(),
|
||||
HospitalizeFlowException::class.java
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(0, discharged)
|
||||
assertEquals(1, observation)
|
||||
assertHospitalCounters(0, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external operation with exception that hospital discharges is retried and runs the external operation again`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external operation with exception that hospital discharges is retried and runs the external operation again`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
assertFailsWith<TimeoutException> {
|
||||
blockUntilFlowKeptInForObservation {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalOperationPropagatesException,
|
||||
bob.nodeInfo.singleIdentity(),
|
||||
SQLTransientConnectionException::class.java
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(3, discharged)
|
||||
assertEquals(1, observation)
|
||||
assertHospitalCounters(3, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation that passes serviceHub into process can be retried`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation that passes serviceHub into process can be retried`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
assertFailsWith<TimeoutException> {
|
||||
blockUntilFlowKeptInForObservation {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalOperationThatPassesInServiceHubCanRetry,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(3, discharged)
|
||||
assertEquals(1, observation)
|
||||
assertHospitalCounters(3, 1)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
@ -158,16 +137,14 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
|
||||
alice.rpc.startFlow(
|
||||
::FlowWithExternalOperationThatDirectlyAccessesServiceHubFailsRetry,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
}
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(1, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(1, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `vault can be queried`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `vault can be queried`() {
|
||||
driver(
|
||||
DriverParameters(
|
||||
cordappsForAllNodes = cordappsForPackages(DummyState::class.packageName),
|
||||
@ -176,64 +153,62 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
|
||||
) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val success = alice.rpc.startFlow(::FlowWithWithExternalOperationThatQueriesVault)
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertTrue(success)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `data can be persisted to node database via entity manager`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `data can be persisted to node database via entity manager`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsViaEntityManager)
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertTrue(success)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `data can be persisted to node database via jdbc session`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `data can be persisted to node database via jdbc session`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsViaJdbcSession)
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertTrue(success)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `data can be persisted to node database via servicehub database transaction`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `data can be persisted to node database via servicehub database transaction`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsViaDatabaseTransaction)
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertTrue(success)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `data can be persisted to node database in external operation and read from another process once finished`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `data can be persisted to node database in external operation and read from another process once finished`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val success = alice.rpc.startFlow(::FlowWithExternalOperationThatPersistsToDatabaseAndReadsFromExternalOperation)
|
||||
.returnValue.getOrThrow(20.seconds)
|
||||
.returnValue.getOrThrow(1.minutes)
|
||||
assertTrue(success)
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=300_000)
|
||||
fun `external operation can be retried when an error occurs inside of database transaction`() {
|
||||
@Test(timeout = 300_000)
|
||||
fun `external operation can be retried when an error occurs inside of database transaction`() {
|
||||
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
|
||||
val alice = startNode(providedName = ALICE_NAME).getOrThrow()
|
||||
val bob = startNode(providedName = BOB_NAME).getOrThrow()
|
||||
val success = alice.rpc.startFlow(
|
||||
::FlowWithExternalOperationThatErrorsInsideOfDatabaseTransaction,
|
||||
bob.nodeInfo.singleIdentity()
|
||||
).returnValue.getOrThrow(20.seconds)
|
||||
).returnValue.getOrThrow(1.minutes)
|
||||
assertTrue(success as Boolean)
|
||||
val (discharged, observation) = alice.rpc.startFlow(::GetHospitalCountersFlow).returnValue.getOrThrow()
|
||||
assertEquals(1, discharged)
|
||||
assertEquals(0, observation)
|
||||
assertHospitalCounters(1, 0)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user