INFRA-477: Start nodes in parallel when possible (#6460)

Co-authored-by: Ross Nicoll <ross.nicoll@r3.com>
This commit is contained in:
Yiftach Kaplan 2020-07-23 16:35:34 +01:00 committed by GitHub
parent 0b1ccb48d0
commit 4acf41ea3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 397 additions and 283 deletions

View File

@ -8,6 +8,7 @@ import net.corda.core.identity.Party;
import net.corda.core.utilities.KotlinUtilsKt; import net.corda.core.utilities.KotlinUtilsKt;
import net.corda.testing.core.TestConstants; import net.corda.testing.core.TestConstants;
import net.corda.testing.core.TestUtils; import net.corda.testing.core.TestUtils;
import net.corda.testing.driver.DriverDSL;
import net.corda.testing.driver.DriverParameters; import net.corda.testing.driver.DriverParameters;
import net.corda.testing.driver.NodeHandle; import net.corda.testing.driver.NodeHandle;
import net.corda.testing.driver.NodeParameters; import net.corda.testing.driver.NodeParameters;
@ -19,8 +20,11 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable; import java.io.Serializable;
import java.time.Duration; import java.time.Duration;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.stream.Collectors;
import static net.corda.testing.driver.Driver.driver; import static net.corda.testing.driver.Driver.driver;
@ -29,14 +33,9 @@ public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperati
@Test @Test
public void awaitFlowExternalOperationInJava() { public void awaitFlowExternalOperationInJava() {
driver(new DriverParameters().withStartNodesInProcess(true), driver -> { driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
NodeHandle alice = KotlinUtilsKt.getOrThrow( List<NodeHandle> aliceAndBob = aliceAndBob(driver);
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)), NodeHandle alice = aliceAndBob.get(0);
Duration.of(1, ChronoUnit.MINUTES) NodeHandle bob = aliceAndBob.get(1);
);
NodeHandle bob = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
Duration.of(1, ChronoUnit.MINUTES)
);
return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic( return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
FlowWithExternalOperationInJava.class, FlowWithExternalOperationInJava.class,
TestUtils.singleIdentity(bob.getNodeInfo()) TestUtils.singleIdentity(bob.getNodeInfo())
@ -47,14 +46,9 @@ public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperati
@Test @Test
public void awaitFlowExternalAsyncOperationInJava() { public void awaitFlowExternalAsyncOperationInJava() {
driver(new DriverParameters().withStartNodesInProcess(true), driver -> { driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
NodeHandle alice = KotlinUtilsKt.getOrThrow( List<NodeHandle> aliceAndBob = aliceAndBob(driver);
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)), NodeHandle alice = aliceAndBob.get(0);
Duration.of(1, ChronoUnit.MINUTES) NodeHandle bob = aliceAndBob.get(1);
);
NodeHandle bob = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
Duration.of(1, ChronoUnit.MINUTES)
);
return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic( return KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
FlowWithExternalAsyncOperationInJava.class, FlowWithExternalAsyncOperationInJava.class,
TestUtils.singleIdentity(bob.getNodeInfo()) TestUtils.singleIdentity(bob.getNodeInfo())
@ -65,14 +59,9 @@ public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperati
@Test @Test
public void awaitFlowExternalOperationInJavaCanBeRetried() { public void awaitFlowExternalOperationInJavaCanBeRetried() {
driver(new DriverParameters().withStartNodesInProcess(true), driver -> { driver(new DriverParameters().withStartNodesInProcess(true), driver -> {
NodeHandle alice = KotlinUtilsKt.getOrThrow( List<NodeHandle> aliceAndBob = aliceAndBob(driver);
driver.startNode(new NodeParameters().withProvidedName(TestConstants.ALICE_NAME)), NodeHandle alice = aliceAndBob.get(0);
Duration.of(1, ChronoUnit.MINUTES) NodeHandle bob = aliceAndBob.get(1);
);
NodeHandle bob = KotlinUtilsKt.getOrThrow(
driver.startNode(new NodeParameters().withProvidedName(TestConstants.BOB_NAME)),
Duration.of(1, ChronoUnit.MINUTES)
);
KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic( KotlinUtilsKt.getOrThrow(alice.getRpc().startFlowDynamic(
FlowWithExternalOperationThatGetsRetriedInJava.class, FlowWithExternalOperationThatGetsRetriedInJava.class,
TestUtils.singleIdentity(bob.getNodeInfo()) TestUtils.singleIdentity(bob.getNodeInfo())
@ -190,4 +179,15 @@ public class FlowExternalOperationInJavaTest extends AbstractFlowExternalOperati
return operation.apply(futureService, deduplicationId); return operation.apply(futureService, deduplicationId);
} }
} }
private List<NodeHandle> aliceAndBob(DriverDSL driver) {
return Arrays.asList(TestConstants.ALICE_NAME, TestConstants.BOB_NAME)
.stream()
.map(nm -> driver.startNode(new NodeParameters().withProvidedName(nm)))
.collect(Collectors.toList())
.stream()
.map(future -> KotlinUtilsKt.getOrThrow(future,
Duration.of(1, ChronoUnit.MINUTES)))
.collect(Collectors.toList());
}
} }

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.HospitalizeFlowException import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes import net.corda.core.utilities.minutes
@ -24,8 +25,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation`() { fun `external async operation`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.startFlow(::FlowWithExternalAsyncOperation, bob.nodeInfo.singleIdentity()) alice.rpc.startFlow(::FlowWithExternalAsyncOperation, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(1.minutes) .returnValue.getOrThrow(1.minutes)
assertHospitalCounters(0, 0) assertHospitalCounters(0, 0)
@ -35,8 +38,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation that checks deduplicationId is not rerun when flow is retried`() { fun `external async operation that checks deduplicationId is not rerun when flow is retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
assertFailsWith<DuplicatedProcessException> { assertFailsWith<DuplicatedProcessException> {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalAsyncOperationWithDeduplication, ::FlowWithExternalAsyncOperationWithDeduplication,
@ -50,8 +55,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation propagates exception to calling flow`() { fun `external async operation propagates exception to calling flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
assertFailsWith<MyCordaException> { assertFailsWith<MyCordaException> {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalAsyncOperationPropagatesException, ::FlowWithExternalAsyncOperationPropagatesException,
@ -66,8 +73,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation exception can be caught in flow`() { fun `external async operation exception can be caught in flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
val result = alice.rpc.startFlow( val result = alice.rpc.startFlow(
::FlowWithExternalAsyncOperationThatThrowsExceptionAndCaughtInFlow, ::FlowWithExternalAsyncOperationThatThrowsExceptionAndCaughtInFlow,
bob.nodeInfo.singleIdentity() bob.nodeInfo.singleIdentity()
@ -80,8 +89,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation with exception that hospital keeps for observation does not fail`() { fun `external async operation with exception that hospital keeps for observation does not fail`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
blockUntilFlowKeptInForObservation { blockUntilFlowKeptInForObservation {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalAsyncOperationPropagatesException, ::FlowWithExternalAsyncOperationPropagatesException,
@ -96,8 +107,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation with exception that hospital discharges is retried and runs the future again`() { fun `external async operation with exception that hospital discharges is retried and runs the future again`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
blockUntilFlowKeptInForObservation { blockUntilFlowKeptInForObservation {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalAsyncOperationPropagatesException, ::FlowWithExternalAsyncOperationPropagatesException,
@ -112,8 +125,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation that throws exception rather than completing future exceptionally fails with internal exception`() { fun `external async operation that throws exception rather than completing future exceptionally fails with internal exception`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
assertFailsWith<StateTransitionException> { assertFailsWith<StateTransitionException> {
alice.rpc.startFlow(::FlowWithExternalAsyncOperationUnhandledException, bob.nodeInfo.singleIdentity()) alice.rpc.startFlow(::FlowWithExternalAsyncOperationUnhandledException, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(1.minutes) .returnValue.getOrThrow(1.minutes)
@ -125,8 +140,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation that passes serviceHub into process can be retried`() { fun `external async operation that passes serviceHub into process can be retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
blockUntilFlowKeptInForObservation { blockUntilFlowKeptInForObservation {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalAsyncOperationThatPassesInServiceHubCanRetry, ::FlowWithExternalAsyncOperationThatPassesInServiceHubCanRetry,
@ -140,8 +157,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() { fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
assertFailsWith<DirectlyAccessedServiceHubException> { assertFailsWith<DirectlyAccessedServiceHubException> {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalAsyncOperationThatDirectlyAccessesServiceHubFailsRetry, ::FlowWithExternalAsyncOperationThatDirectlyAccessesServiceHubFailsRetry,
@ -155,8 +174,10 @@ class FlowExternalAsyncOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `starting multiple futures and joining on their results`() { fun `starting multiple futures and joining on their results`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.startFlow(::FlowThatStartsMultipleFuturesAndJoins, bob.nodeInfo.singleIdentity()).returnValue.getOrThrow(1.minutes) alice.rpc.startFlow(::FlowThatStartsMultipleFuturesAndJoins, bob.nodeInfo.singleIdentity()).returnValue.getOrThrow(1.minutes)
assertHospitalCounters(0, 0) assertHospitalCounters(0, 0)
} }

View File

@ -3,6 +3,7 @@ package net.corda.coretests.flows
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes import net.corda.core.utilities.minutes
@ -18,8 +19,10 @@ class FlowExternalOperationStartFlowTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `starting a flow inside of a flow that starts a future will succeed`() { fun `starting a flow inside of a flow that starts a future will succeed`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.startFlow(::FlowThatStartsAnotherFlowInAnExternalOperation, bob.nodeInfo.singleIdentity()) alice.rpc.startFlow(::FlowThatStartsAnotherFlowInAnExternalOperation, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(1.minutes) .returnValue.getOrThrow(1.minutes)
assertHospitalCounters(0, 0) assertHospitalCounters(0, 0)
@ -29,8 +32,10 @@ class FlowExternalOperationStartFlowTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `multiple flows can be started and their futures joined from inside a flow`() { fun `multiple flows can be started and their futures joined from inside a flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.startFlow(::ForkJoinFlows, bob.nodeInfo.singleIdentity()) alice.rpc.startFlow(::ForkJoinFlows, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(1.minutes) .returnValue.getOrThrow(1.minutes)
assertHospitalCounters(0, 0) assertHospitalCounters(0, 0)

View File

@ -5,6 +5,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.HospitalizeFlowException import net.corda.core.flows.HospitalizeFlowException
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.packageName import net.corda.core.internal.packageName
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.services.queryBy import net.corda.core.node.services.queryBy
@ -29,8 +30,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external operation`() { fun `external operation`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.startFlow(::FlowWithExternalOperation, bob.nodeInfo.singleIdentity()) alice.rpc.startFlow(::FlowWithExternalOperation, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(1.minutes) .returnValue.getOrThrow(1.minutes)
assertHospitalCounters(0, 0) assertHospitalCounters(0, 0)
@ -40,8 +43,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external operation that checks deduplicationId is not rerun when flow is retried`() { fun `external operation that checks deduplicationId is not rerun when flow is retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
assertFailsWith<DuplicatedProcessException> { assertFailsWith<DuplicatedProcessException> {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalOperationWithDeduplication, ::FlowWithExternalOperationWithDeduplication,
@ -55,8 +60,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external operation propagates exception to calling flow`() { fun `external operation propagates exception to calling flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
assertFailsWith<MyCordaException> { assertFailsWith<MyCordaException> {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalOperationPropagatesException, ::FlowWithExternalOperationPropagatesException,
@ -71,8 +78,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external operation exception can be caught in flow`() { fun `external operation exception can be caught in flow`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
alice.rpc.startFlow(::FlowWithExternalOperationThatThrowsExceptionAndCaughtInFlow, bob.nodeInfo.singleIdentity()) alice.rpc.startFlow(::FlowWithExternalOperationThatThrowsExceptionAndCaughtInFlow, bob.nodeInfo.singleIdentity())
.returnValue.getOrThrow(1.minutes) .returnValue.getOrThrow(1.minutes)
assertHospitalCounters(0, 0) assertHospitalCounters(0, 0)
@ -82,8 +91,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external operation with exception that hospital keeps for observation does not fail`() { fun `external operation with exception that hospital keeps for observation does not fail`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
blockUntilFlowKeptInForObservation { blockUntilFlowKeptInForObservation {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalOperationPropagatesException, ::FlowWithExternalOperationPropagatesException,
@ -98,8 +109,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external operation with exception that hospital discharges is retried and runs the external operation again`() { fun `external operation with exception that hospital discharges is retried and runs the external operation again`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
blockUntilFlowKeptInForObservation { blockUntilFlowKeptInForObservation {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalOperationPropagatesException, ::FlowWithExternalOperationPropagatesException,
@ -114,8 +127,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation that passes serviceHub into process can be retried`() { fun `external async operation that passes serviceHub into process can be retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
blockUntilFlowKeptInForObservation { blockUntilFlowKeptInForObservation {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalOperationThatPassesInServiceHubCanRetry, ::FlowWithExternalOperationThatPassesInServiceHubCanRetry,
@ -129,8 +144,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external async operation that accesses serviceHub from flow directly will fail when retried`() { fun `external async operation that accesses serviceHub from flow directly will fail when retried`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
assertFailsWith<DirectlyAccessedServiceHubException> { assertFailsWith<DirectlyAccessedServiceHubException> {
alice.rpc.startFlow( alice.rpc.startFlow(
::FlowWithExternalOperationThatDirectlyAccessesServiceHubFailsRetry, ::FlowWithExternalOperationThatDirectlyAccessesServiceHubFailsRetry,
@ -199,8 +216,10 @@ class FlowExternalOperationTest : AbstractFlowExternalOperationTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `external operation can be retried when an error occurs inside of database transaction`() { fun `external operation can be retried when an error occurs inside of database transaction`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
val success = alice.rpc.startFlow( val success = alice.rpc.startFlow(
::FlowWithExternalOperationThatErrorsInsideOfDatabaseTransaction, ::FlowWithExternalOperationThatErrorsInsideOfDatabaseTransaction,
bob.nodeInfo.singleIdentity() bob.nodeInfo.singleIdentity()

View File

@ -10,6 +10,7 @@ import net.corda.core.flows.StartableByRPC
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes import net.corda.core.utilities.minutes
@ -56,9 +57,10 @@ class FlowIsKilledTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `manually handled killed flows propagate error to counter parties`() { fun `manually handled killed flows propagate error to counter parties`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow() .transpose()
.getOrThrow()
alice.rpc.let { rpc -> alice.rpc.let { rpc ->
val handle = rpc.startFlow( val handle = rpc.startFlow(
::AFlowThatWantsToDieAndKillsItsFriends, ::AFlowThatWantsToDieAndKillsItsFriends,
@ -85,8 +87,11 @@ class FlowIsKilledTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `a manually killed initiated flow will propagate the killed error to the initiator and its counter parties`() { fun `a manually killed initiated flow will propagate the killed error to the initiator and its counter parties`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
val handle = alice.rpc.startFlow( val handle = alice.rpc.startFlow(
::AFlowThatGetsMurderedByItsFriend, ::AFlowThatGetsMurderedByItsFriend,
bob.nodeInfo.singleIdentity() bob.nodeInfo.singleIdentity()

View File

@ -7,6 +7,7 @@ import net.corda.core.flows.InitiatedBy
import net.corda.core.flows.InitiatingFlow import net.corda.core.flows.InitiatingFlow
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.minutes import net.corda.core.utilities.minutes
@ -53,8 +54,10 @@ class FlowSleepTest {
fun `flow can sleep and perform other suspending functions`() { fun `flow can sleep and perform other suspending functions`() {
// ensures that events received while the flow is sleeping are not processed // ensures that events received while the flow is sleeping are not processed
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
val (start, finish) = alice.rpc.startFlow( val (start, finish) = alice.rpc.startFlow(
::SleepAndInteractWithPartyFlow, ::SleepAndInteractWithPartyFlow,
bob.nodeInfo.singleIdentity() bob.nodeInfo.singleIdentity()

View File

@ -23,8 +23,10 @@ class NodesStartStopSingleVmTests(@Suppress("unused") private val iteration: Int
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun nodesStartStop() { fun nodesStartStop() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
startNode(providedName = ALICE_NAME).getOrThrow() val alice = startNode(providedName = ALICE_NAME)
startNode(providedName = BOB_NAME).getOrThrow() val bob = startNode(providedName = BOB_NAME)
alice.getOrThrow()
bob.getOrThrow()
} }
} }
} }

View File

@ -1,5 +1,6 @@
package net.corda.node.logging package net.corda.node.logging
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
@ -22,8 +23,10 @@ class IssueCashLoggingTests {
fun `issuing and sending cash as payment do not result in duplicate insertion warnings`() { fun `issuing and sending cash as payment do not result in duplicate insertion warnings`() {
val user = User("mark", "dadada", setOf(all())) val user = User("mark", "dadada", setOf(all()))
driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) { driver(DriverParameters(cordappsForAllNodes = FINANCE_CORDAPPS)) {
val nodeA = startNode(rpcUsers = listOf(user)).getOrThrow() val (nodeA, nodeB) = listOf(startNode(rpcUsers = listOf(user)),
val nodeB = startNode().getOrThrow() startNode())
.transpose()
.getOrThrow()
val amount = 1.DOLLARS val amount = 1.DOLLARS
val ref = OpaqueBytes.of(0) val ref = OpaqueBytes.of(0)

View File

@ -62,30 +62,49 @@ abstract class StateMachineErrorHandlingTest {
} }
} }
internal fun DriverDSL.createBytemanNode( internal fun DriverDSL.createBytemanNode(nodeProvidedName: CordaX500Name): Pair<NodeHandle, Int> {
providedName: CordaX500Name, val port = nextPort()
val bytemanNodeHandle = (this as InternalDriverDSL).startNode(
NodeParameters(
providedName = nodeProvidedName,
rpcUsers = listOf(rpcUser)
),
bytemanPort = port
)
return bytemanNodeHandle.getOrThrow() to port
}
internal fun DriverDSL.createNode(nodeProvidedName: CordaX500Name): NodeHandle {
return (this as InternalDriverDSL).startNode(
NodeParameters(
providedName = nodeProvidedName,
rpcUsers = listOf(rpcUser)
)
).getOrThrow()
}
internal fun DriverDSL.createNodeAndBytemanNode(
nodeProvidedName: CordaX500Name,
bytemanNodeProvidedName: CordaX500Name,
additionalCordapps: Collection<TestCordapp> = emptyList() additionalCordapps: Collection<TestCordapp> = emptyList()
): Pair<NodeHandle, Int> { ): Triple<NodeHandle, NodeHandle, Int> {
val port = nextPort() val port = nextPort()
val nodeHandle = (this as InternalDriverDSL).startNode( val nodeHandle = (this as InternalDriverDSL).startNode(
NodeParameters( NodeParameters(
providedName = providedName, providedName = nodeProvidedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
)
)
val bytemanNodeHandle = startNode(
NodeParameters(
providedName = bytemanNodeProvidedName,
rpcUsers = listOf(rpcUser), rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps additionalCordapps = additionalCordapps
), ),
bytemanPort = port bytemanPort = port
).getOrThrow()
return nodeHandle to port
}
internal fun DriverDSL.createNode(providedName: CordaX500Name, additionalCordapps: Collection<TestCordapp> = emptyList()): NodeHandle {
return startNode(
NodeParameters(
providedName = providedName,
rpcUsers = listOf(rpcUser),
additionalCordapps = additionalCordapps
) )
).getOrThrow() return Triple(nodeHandle.getOrThrow(), bytemanNodeHandle.getOrThrow(), port)
} }
internal fun submitBytemanRules(rules: String, port: Int) { internal fun submitBytemanRules(rules: String, port: Int) {

View File

@ -35,8 +35,7 @@ class StateMachineFinalityErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() { fun `error recording a transaction inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work // could not get rule for FinalityDoctor + observation counter to work
val rules = """ val rules = """
@ -97,8 +96,7 @@ class StateMachineFinalityErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() { fun `error resolving a transaction's dependencies inside of ReceiveFinalityFlow will keep the flow in for observation`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
// could not get rule for FinalityDoctor + observation counter to work // could not get rule for FinalityDoctor + observation counter to work
val rules = """ val rules = """
@ -161,8 +159,7 @@ class StateMachineFinalityErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() { fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and complete successfully`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -229,8 +226,7 @@ class StateMachineFinalityErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation is error persists`() { fun `error during transition with CommitTransaction action while receiving a transaction inside of ReceiveFinalityFlow will be retried and be kept for observation is error persists`() {
startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) { startDriver(notarySpec = NotarySpec(DUMMY_NOTARY_NAME, validating = false)) {
val (charlie, port) = createBytemanNode(CHARLIE_NAME, FINANCE_CORDAPPS) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME, FINANCE_CORDAPPS)
val alice = createNode(ALICE_NAME, FINANCE_CORDAPPS)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter

View File

@ -40,8 +40,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() { fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -88,8 +87,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `unexpected error during flow initialisation throws exception to client`() { fun `unexpected error during flow initialisation throws exception to client`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
CLASS ${FlowStateMachineImpl::class.java.name} CLASS ${FlowStateMachineImpl::class.java.name}
@ -134,8 +132,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during initialisation when trying to rollback the flow's database transaction the flow is able to retry and complete successfully`() { fun `error during initialisation when trying to rollback the flow's database transaction the flow is able to retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -187,8 +184,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during initialisation when trying to close the flow's database transaction the flow is able to retry and complete successfully`() { fun `error during initialisation when trying to close the flow's database transaction the flow is able to retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -242,8 +238,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() { fun `error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -298,8 +293,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() { fun `error during retrying a flow that failed when committing its original checkpoint will retry the flow again and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Throw exception on executeCommitTransaction action after first suspend + commit RULE Throw exception on executeCommitTransaction action after first suspend + commit
@ -351,8 +345,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() { fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and complete successfully`() {
startDriver { startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -400,8 +393,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() { fun `responding flow - error during transition with CommitTransaction action that occurs during flow initialisation will retry and be kept for observation if error persists`() {
startDriver { startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -464,8 +456,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database`() { fun `responding flow - session init can be retried when there is a transient connection error to the database`() {
startDriver { startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -529,8 +520,7 @@ class StateMachineFlowInitErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() { fun `responding flow - session init can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver { startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter

View File

@ -35,8 +35,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() { fun `error during transition with SendInitial action is retried 3 times and kept for observation if error persists`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -87,8 +86,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() { fun `error during transition with SendInitial action that does not persist will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -135,8 +133,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() { fun `error during transition with AcknowledgeMessages action is swallowed and flow completes successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Set flag when inside executeAcknowledgeMessages RULE Set flag when inside executeAcknowledgeMessages
@ -230,8 +227,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during flow retry when executing retryFlowFromSafePoint the flow is able to retry and recover`() { fun `error during flow retry when executing retryFlowFromSafePoint the flow is able to retry and recover`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Set flag when executing first suspend RULE Set flag when executing first suspend
@ -296,8 +292,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() { fun `error during transition with CommitTransaction action that occurs after the first suspend will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
// seems to be restarting the flow from the beginning every time // seems to be restarting the flow from the beginning every time
val rules = """ val rules = """
@ -362,8 +357,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { fun `error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
// seems to be restarting the flow from the beginning every time // seems to be restarting the flow from the beginning every time
val rules = """ val rules = """
@ -419,8 +413,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() { fun `error during transition with CommitTransaction action and ConstraintViolationException that occurs when completing a flow will retry and be kept for observation if error persists`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -488,8 +481,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database`() { fun `flow can be retried when there is a transient connection error to the database`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -552,8 +544,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() { fun `flow can be retried when there is a transient connection error to the database goes to observation if error persists`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -610,8 +601,7 @@ class StateMachineGeneralErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() { fun `responding flow - error during transition with CommitTransaction action that occurs when completing a flow and deleting its checkpoint will retry and complete successfully`() {
startDriver { startDriver {
val (charlie, port) = createBytemanNode(CHARLIE_NAME) val (alice, charlie, port) = createNodeAndBytemanNode(ALICE_NAME, CHARLIE_NAME)
val alice = createNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter

View File

@ -103,8 +103,7 @@ class StateMachineKillFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `flow killed when it is in the flow hospital for observation is removed correctly`() { fun `flow killed when it is in the flow hospital for observation is removed correctly`() {
startDriver { startDriver {
val (alice, port) = createBytemanNode(ALICE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val charlie = createNode(CHARLIE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter

View File

@ -40,8 +40,7 @@ class StateMachineSubFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() { fun `initiating subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -119,8 +118,7 @@ class StateMachineSubFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() { fun `initiating subflow - error during transition with CommitTransaction action that occurs after the first receive will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -190,8 +188,7 @@ class StateMachineSubFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() { fun `inline subflow - error during transition with CommitTransaction action that occurs during the first send will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter
@ -253,8 +250,7 @@ class StateMachineSubFlowErrorHandlingTest : StateMachineErrorHandlingTest() {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() { fun `inline subflow - error during transition with CommitTransaction action that occurs during the first receive will retry and complete successfully`() {
startDriver { startDriver {
val charlie = createNode(CHARLIE_NAME) val (charlie, alice, port) = createNodeAndBytemanNode(CHARLIE_NAME, ALICE_NAME)
val (alice, port) = createBytemanNode(ALICE_NAME)
val rules = """ val rules = """
RULE Create Counter RULE Create Counter

View File

@ -12,6 +12,7 @@ import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.SignTransactionFlow import net.corda.core.flows.SignTransactionFlow
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub import net.corda.core.node.AppServiceHub
import net.corda.core.node.services.CordaService import net.corda.core.node.services.CordaService
@ -318,8 +319,10 @@ class FlowEntityManagerTest : AbstractFlowEntityManagerTest() {
StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter } StaffedFlowHospital.onFlowDischarged.add { _, _ -> ++counter }
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
val txId = val txId =
alice.rpc.startFlow(::EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow, bob.nodeInfo.singleIdentity()) alice.rpc.startFlow(::EntityManagerWithFlushCatchAndInteractWithOtherPartyFlow, bob.nodeInfo.singleIdentity())

View File

@ -3,6 +3,7 @@ package net.corda.node.flows
import co.paralleluniverse.fibers.Suspendable import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
@ -68,14 +69,14 @@ class FlowOverrideTests {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `should use the most specific implementation of a responding flow`() { fun `should use the most specific implementation of a responding flow`() {
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(NodeParameters( val (nodeA, nodeB) = listOf(ALICE_NAME, BOB_NAME)
providedName = ALICE_NAME, .map {
additionalCordapps = setOf(cordappForClasses(*nodeAClasses.toTypedArray())) NodeParameters(providedName = it,
)).getOrThrow() additionalCordapps = setOf(cordappForClasses(*nodeAClasses.toTypedArray())))
val nodeB = startNode(NodeParameters( }
providedName = BOB_NAME, .map { startNode(it) }
additionalCordapps = setOf(cordappForClasses(*nodeBClasses.toTypedArray())) .transpose()
)).getOrThrow() .getOrThrow()
assertThat(nodeB.rpc.startFlow(::Ping, nodeA.nodeInfo.singleIdentity()).returnValue.getOrThrow(), `is`(Pongiest.GORGONZOLA)) assertThat(nodeB.rpc.startFlow(::Ping, nodeA.nodeInfo.singleIdentity()).returnValue.getOrThrow(), `is`(Pongiest.GORGONZOLA))
} }
} }
@ -84,17 +85,16 @@ class FlowOverrideTests {
fun `should use the overriden implementation of a responding flow`() { fun `should use the overriden implementation of a responding flow`() {
val flowOverrides = mapOf(Ping::class.java to Pong::class.java) val flowOverrides = mapOf(Ping::class.java to Pong::class.java)
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = emptySet())) {
val nodeA = startNode(NodeParameters( val (nodeA, nodeB) = listOf(ALICE_NAME, BOB_NAME)
providedName = ALICE_NAME, .map {
additionalCordapps = setOf(cordappForClasses(*nodeAClasses.toTypedArray())), NodeParameters(providedName = it,
flowOverrides = flowOverrides flowOverrides = flowOverrides,
)).getOrThrow() additionalCordapps = setOf(cordappForClasses(*nodeAClasses.toTypedArray())))
val nodeB = startNode(NodeParameters( }
providedName = BOB_NAME, .map { startNode(it) }
additionalCordapps = setOf(cordappForClasses(*nodeBClasses.toTypedArray())) .transpose()
)).getOrThrow() .getOrThrow()
assertThat(nodeB.rpc.startFlow(::Ping, nodeA.nodeInfo.singleIdentity()).returnValue.getOrThrow(), `is`(Pong.PONG)) assertThat(nodeB.rpc.startFlow(::Ping, nodeA.nodeInfo.singleIdentity()).returnValue.getOrThrow(), `is`(Pong.PONG))
} }
} }
} }

View File

@ -7,6 +7,7 @@ import net.corda.core.CordaRuntimeException
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.IdempotentFlow import net.corda.core.internal.IdempotentFlow
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.serialization.CordaSerializable import net.corda.core.serialization.CordaSerializable
import net.corda.core.utilities.ProgressTracker import net.corda.core.utilities.ProgressTracker
@ -66,8 +67,10 @@ class FlowRetryTest {
startNodesInProcess = isQuasarAgentSpecified(), startNodesInProcess = isQuasarAgentSpecified(),
notarySpecs = emptyList() notarySpecs = emptyList()
)) { )) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { val result = CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow() it.proxy.startFlow(::InitiatorFlow, numSessions, numIterations, nodeBHandle.nodeInfo.singleIdentity()).returnValue.getOrThrow()
@ -134,8 +137,10 @@ class FlowRetryTest {
val user = User("mark", "dadada", setOf(Permissions.all())) val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
it.proxy.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) it.proxy.startFlow(::TransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
@ -152,8 +157,10 @@ class FlowRetryTest {
val user = User("mark", "dadada", setOf(Permissions.all())) val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<TimeoutException> { assertFailsWith<TimeoutException> {
it.proxy.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity()) it.proxy.startFlow(::WrappedTransientConnectionFailureFlow, nodeBHandle.nodeInfo.singleIdentity())
@ -170,8 +177,10 @@ class FlowRetryTest {
val user = User("mark", "dadada", setOf(Permissions.all())) val user = User("mark", "dadada", setOf(Permissions.all()))
driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) { driver(DriverParameters(isDebug = true, startNodesInProcess = isQuasarAgentSpecified())) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use { CordaRPCClient(nodeAHandle.rpcAddress, config).start(user.username, user.password).use {
assertFailsWith<CordaRuntimeException> { assertFailsWith<CordaRuntimeException> {

View File

@ -14,6 +14,7 @@ import net.corda.core.flows.StateMachineRunId
import net.corda.core.flows.UnexpectedFlowEndException import net.corda.core.flows.UnexpectedFlowEndException
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.CordaRPCOps import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.services.StatesNotAvailableException import net.corda.core.node.services.StatesNotAvailableException
@ -68,9 +69,10 @@ class KillFlowTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `a killed flow will propagate the killed error to counter parties when it reaches the next suspension point`() { fun `a killed flow will propagate the killed error to counter parties when it reaches the next suspension point`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow() .transpose()
.getOrThrow()
alice.rpc.let { rpc -> alice.rpc.let { rpc ->
val handle = rpc.startFlow( val handle = rpc.startFlow(
::AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends, ::AFlowThatGetsMurderedWhenItTriesToSuspendAndSomehowKillsItsFriends,
@ -118,8 +120,10 @@ class KillFlowTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `killing a flow suspended in send + receive + sendAndReceive ends the flow immediately`() { fun `killing a flow suspended in send + receive + sendAndReceive ends the flow immediately`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = false)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = false)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob) = listOf(ALICE_NAME, BOB_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
.transpose()
.getOrThrow()
val bobParty = bob.nodeInfo.singleIdentity() val bobParty = bob.nodeInfo.singleIdentity()
bob.stop() bob.stop()
val terminated = (bob as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS) val terminated = (bob as OutOfProcess).process.waitFor(30, TimeUnit.SECONDS)
@ -192,9 +196,10 @@ class KillFlowTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `a killed flow will propagate the killed error to counter parties if it was suspended`() { fun `a killed flow will propagate the killed error to counter parties if it was suspended`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow() .transpose()
.getOrThrow()
alice.rpc.let { rpc -> alice.rpc.let { rpc ->
val handle = rpc.startFlow( val handle = rpc.startFlow(
::AFlowThatGetsMurderedAndSomehowKillsItsFriends, ::AFlowThatGetsMurderedAndSomehowKillsItsFriends,
@ -224,9 +229,10 @@ class KillFlowTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `a killed initiated flow will propagate the killed error to the initiator and its counter parties`() { fun `a killed initiated flow will propagate the killed error to the initiator and its counter parties`() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) { driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
val alice = startNode(providedName = ALICE_NAME).getOrThrow() val (alice, bob, charlie) = listOf(ALICE_NAME, BOB_NAME, CHARLIE_NAME)
val bob = startNode(providedName = BOB_NAME).getOrThrow() .map { startNode(providedName = it) }
val charlie = startNode(providedName = CHARLIE_NAME).getOrThrow() .transpose()
.getOrThrow()
val handle = alice.rpc.startFlow( val handle = alice.rpc.startFlow(
::AFlowThatGetsMurderedByItsFriend, ::AFlowThatGetsMurderedByItsFriend,
listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity()) listOf(bob.nodeInfo.singleIdentity(), charlie.nodeInfo.singleIdentity())

View File

@ -7,6 +7,7 @@ import net.corda.core.contracts.Command
import net.corda.core.contracts.StateAndContract import net.corda.core.contracts.StateAndContract
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.packageName import net.corda.core.internal.packageName
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.transactions.SignedTransaction import net.corda.core.transactions.SignedTransaction
@ -57,8 +58,10 @@ class FlowsDrainingModeContentionTest {
portAllocation = portAllocation, portAllocation = portAllocation,
extraCordappPackagesToScan = listOf(MessageState::class.packageName) extraCordappPackagesToScan = listOf(MessageState::class.packageName)
)) { )) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow() val (nodeA, nodeB) = listOf(ALICE_NAME, BOB_NAME)
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow() .map { startNode(providedName = it, rpcUsers = users) }
.transpose()
.getOrThrow()
val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password) val nodeARpcInfo = RpcInfo(nodeA.rpcAddress, user.username, user.password)
val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity) val flow = nodeA.rpc.startFlow(::ProposeTransactionAndWaitForCommit, message, nodeARpcInfo, nodeB.nodeInfo.singleIdentity(), defaultNotaryIdentity)

View File

@ -4,6 +4,7 @@ import co.paralleluniverse.fibers.Suspendable
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.map import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.contextLogger import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
@ -53,8 +54,11 @@ class P2PFlowsDrainingModeTest {
@Test(timeout=300_000) @Test(timeout=300_000)
fun `flows draining mode suspends consumption of initial session messages`() { fun `flows draining mode suspends consumption of initial session messages`() {
driver(DriverParameters(startNodesInProcess = false, portAllocation = portAllocation, notarySpecs = emptyList())) { driver(DriverParameters(startNodesInProcess = false, portAllocation = portAllocation, notarySpecs = emptyList())) {
val initiatedNode = startNode(providedName = ALICE_NAME).getOrThrow() val (initiatedNode, bob) = listOf(ALICE_NAME, BOB_NAME)
val initiating = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow().rpc .map { startNode(providedName = it, rpcUsers = users) }
.transpose()
.getOrThrow()
val initiating = bob.rpc
val counterParty = initiatedNode.nodeInfo.singleIdentity() val counterParty = initiatedNode.nodeInfo.singleIdentity()
val initiated = initiatedNode.rpc val initiated = initiatedNode.rpc
@ -85,8 +89,10 @@ class P2PFlowsDrainingModeTest {
driver(DriverParameters(portAllocation = portAllocation, notarySpecs = emptyList())) { driver(DriverParameters(portAllocation = portAllocation, notarySpecs = emptyList())) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow() val (nodeA, nodeB) = listOf(ALICE_NAME, BOB_NAME)
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow() .map { startNode(providedName = it, rpcUsers = users) }
.transpose()
.getOrThrow()
var successful = false var successful = false
val latch = CountDownLatch(1) val latch = CountDownLatch(1)
@ -133,8 +139,10 @@ class P2PFlowsDrainingModeTest {
driver(DriverParameters(portAllocation = portAllocation, notarySpecs = emptyList())) { driver(DriverParameters(portAllocation = portAllocation, notarySpecs = emptyList())) {
val nodeA = startNode(providedName = ALICE_NAME, rpcUsers = users).getOrThrow() val (nodeA, nodeB) = listOf(ALICE_NAME, BOB_NAME)
val nodeB = startNode(providedName = BOB_NAME, rpcUsers = users).getOrThrow() .map { startNode(providedName = it, rpcUsers = users) }
.transpose()
.getOrThrow()
var successful = false var successful = false
val latch = CountDownLatch(1) val latch = CountDownLatch(1)

View File

@ -5,6 +5,7 @@ import net.corda.core.CordaRuntimeException
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.CordaX500Name import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.getOrThrow import net.corda.core.utilities.getOrThrow
import net.corda.core.utilities.unwrap import net.corda.core.utilities.unwrap
@ -58,8 +59,10 @@ class RpcExceptionHandlingTest {
} }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) {
val devModeNode = startNode(params, BOB_NAME).getOrThrow() val (devModeNode, node) = listOf(startNode(params, BOB_NAME),
val node = startNode(ALICE_NAME, devMode = false, parameters = params).getOrThrow() startNode(ALICE_NAME, devMode = false, parameters = params))
.transpose()
.getOrThrow()
assertThatThrownExceptionIsReceivedUnwrapped(devModeNode) assertThatThrownExceptionIsReceivedUnwrapped(devModeNode)
assertThatThrownExceptionIsReceivedUnwrapped(node) assertThatThrownExceptionIsReceivedUnwrapped(node)
@ -77,8 +80,10 @@ class RpcExceptionHandlingTest {
} }
driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) { driver(DriverParameters(startNodesInProcess = true, notarySpecs = emptyList(), cordappsForAllNodes = listOf(enclosedCordapp()))) {
val devModeNode = startNode(params, BOB_NAME).getOrThrow() val (devModeNode, node) = listOf(startNode(params, BOB_NAME),
val node = startNode(ALICE_NAME, devMode = false, parameters = params).getOrThrow() startNode(ALICE_NAME, devMode = false, parameters = params))
.transpose()
.getOrThrow()
assertThatThrownBy { devModeNode.throwExceptionFromFlow() }.isInstanceOfSatisfying(FlowException::class.java) { exception -> assertThatThrownBy { devModeNode.throwExceptionFromFlow() }.isInstanceOfSatisfying(FlowException::class.java) { exception ->
assertThat(exception).hasNoCause() assertThat(exception).hasNoCause()
@ -102,8 +107,10 @@ class RpcExceptionHandlingTest {
fun DriverDSL.scenario(nameA: CordaX500Name, nameB: CordaX500Name, devMode: Boolean) { fun DriverDSL.scenario(nameA: CordaX500Name, nameB: CordaX500Name, devMode: Boolean) {
val nodeA = startNode(nameA, devMode, params).getOrThrow() val (nodeA, nodeB) = listOf(nameA, nameB)
val nodeB = startNode(nameB, devMode, params).getOrThrow() .map { startNode(it, devMode, params) }
.transpose()
.getOrThrow()
nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow() nodeA.rpc.startFlow(::InitFlow, nodeB.nodeInfo.singleIdentity()).returnValue.getOrThrow()
} }

View File

@ -15,6 +15,7 @@ import net.corda.core.flows.NotaryException
import net.corda.core.flows.ReceiveFinalityFlow import net.corda.core.flows.ReceiveFinalityFlow
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.StateMachineUpdate import net.corda.core.messaging.StateMachineUpdate
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.utilities.OpaqueBytes import net.corda.core.utilities.OpaqueBytes
@ -49,11 +50,17 @@ class FlowHospitalTest {
@Test(timeout = 300_000) @Test(timeout = 300_000)
fun `when double spend occurs, the flow is successfully deleted on the counterparty`() { fun `when double spend occurs, the flow is successfully deleted on the counterparty`() {
driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts")))) { driver(DriverParameters(cordappsForAllNodes = listOf(enclosedCordapp(), findCordapp("net.corda.testing.contracts")))) {
val charlie = startNode(providedName = CHARLIE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow() val (charlieClient, aliceClient) = listOf(CHARLIE_NAME, ALICE_NAME)
val alice = startNode(providedName = ALICE_NAME, rpcUsers = listOf(rpcUser)).getOrThrow() .map {
startNode(providedName = it,
val charlieClient = CordaRPCClient(charlie.rpcAddress).start(rpcUser.username, rpcUser.password).proxy rpcUsers = listOf(rpcUser))
val aliceClient = CordaRPCClient(alice.rpcAddress).start(rpcUser.username, rpcUser.password).proxy }
.transpose()
.getOrThrow()
.map {
CordaRPCClient(it.rpcAddress)
.start(rpcUser.username, rpcUser.password).proxy
}
val aliceParty = aliceClient.nodeInfo().legalIdentities.first() val aliceParty = aliceClient.nodeInfo().legalIdentities.first()
@ -201,7 +208,6 @@ class FlowHospitalTest {
val notarised = subFlow(FinalityFlow(signedTransaction, emptySet<FlowSession>())) val notarised = subFlow(FinalityFlow(signedTransaction, emptySet<FlowSession>()))
return notarised.coreTransaction.outRef(0) return notarised.coreTransaction.outRef(0)
} }
} }
@StartableByRPC @StartableByRPC
@ -216,7 +222,6 @@ class FlowHospitalTest {
sessionWithCounterParty.sendAndReceive<String>("initial-message") sessionWithCounterParty.sendAndReceive<String>("initial-message")
subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty))) subFlow(FinalityFlow(signedTransaction, setOf(sessionWithCounterParty)))
} }
} }
@InitiatedBy(SpendFlow::class) @InitiatedBy(SpendFlow::class)
@ -229,7 +234,6 @@ class FlowHospitalTest {
subFlow(ReceiveFinalityFlow(otherSide)) subFlow(ReceiveFinalityFlow(otherSide))
} }
} }
@StartableByRPC @StartableByRPC
@ -249,7 +253,6 @@ class FlowHospitalTest {
throw DoubleSpendException("double spend!", e) throw DoubleSpendException("double spend!", e)
} }
} }
} }
@InitiatedBy(SpendFlowWithCustomException::class) @InitiatedBy(SpendFlowWithCustomException::class)
@ -262,7 +265,6 @@ class FlowHospitalTest {
subFlow(ReceiveFinalityFlow(otherSide)) subFlow(ReceiveFinalityFlow(otherSide))
} }
} }
class DoubleSpendException(message: String, cause: Throwable) : FlowException(message, cause) class DoubleSpendException(message: String, cause: Throwable) : FlowException(message, cause)
@ -294,5 +296,4 @@ class FlowHospitalTest {
setCause(SQLException("deadlock")) setCause(SQLException("deadlock"))
} }
} }
} }

View File

@ -16,6 +16,7 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StartableByRPC import net.corda.core.flows.StartableByRPC
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.openFuture import net.corda.core.internal.concurrent.openFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.services.Vault import net.corda.core.node.services.Vault
import net.corda.core.node.services.vault.QueryCriteria import net.corda.core.node.services.vault.QueryCriteria
@ -450,8 +451,11 @@ class VaultObserverExceptionTest {
findCordapp("com.r3.dbfailure.schemas") findCordapp("com.r3.dbfailure.schemas")
),inMemoryDB = false) ),inMemoryDB = false)
) { ) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it,
rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = { val startErrorInObservableWhenConsumingState = {
@ -540,8 +544,11 @@ class VaultObserverExceptionTest {
), ),
inMemoryDB = false) inMemoryDB = false)
) { ) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it,
rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = { val startErrorInObservableWhenConsumingState = {
@ -622,8 +629,11 @@ class VaultObserverExceptionTest {
), ),
inMemoryDB = false) inMemoryDB = false)
) { ) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it,
rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenCreatingSecondState = { val startErrorInObservableWhenCreatingSecondState = {
@ -699,8 +709,11 @@ class VaultObserverExceptionTest {
), ),
inMemoryDB = false) inMemoryDB = false)
) { ) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it,
rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first() val notary = defaultNotaryHandle.nodeHandles.getOrThrow().first()
val startErrorInObservableWhenConsumingState = { val startErrorInObservableWhenConsumingState = {

View File

@ -21,6 +21,7 @@ import net.corda.core.flows.StartableByService
import net.corda.core.flows.StateMachineRunId import net.corda.core.flows.StateMachineRunId
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.PLATFORM_VERSION import net.corda.core.internal.PLATFORM_VERSION
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.uncheckedCast import net.corda.core.internal.uncheckedCast
import net.corda.core.messaging.startFlow import net.corda.core.messaging.startFlow
import net.corda.core.node.AppServiceHub import net.corda.core.node.AppServiceHub
@ -74,8 +75,10 @@ class FlowMetadataRecordingTest {
fun `rpc started flows have metadata recorded`() { fun `rpc started flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
var flowId: StateMachineRunId? = null var flowId: StateMachineRunId? = null
var context: InvocationContext? = null var context: InvocationContext? = null
@ -162,8 +165,10 @@ class FlowMetadataRecordingTest {
fun `rpc started flows have their arguments removed from in-memory checkpoint after zero'th checkpoint`() { fun `rpc started flows have their arguments removed from in-memory checkpoint after zero'th checkpoint`() {
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
var context: InvocationContext? = null var context: InvocationContext? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null var metadata: DBCheckpointStorage.DBFlowMetadata? = null
@ -214,8 +219,10 @@ class FlowMetadataRecordingTest {
fun `initiated flows have metadata recorded`() { fun `initiated flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
var flowId: StateMachineRunId? = null var flowId: StateMachineRunId? = null
var context: InvocationContext? = null var context: InvocationContext? = null
@ -260,8 +267,10 @@ class FlowMetadataRecordingTest {
fun `service started flows have metadata recorded`() { fun `service started flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
var flowId: StateMachineRunId? = null var flowId: StateMachineRunId? = null
var context: InvocationContext? = null var context: InvocationContext? = null
@ -306,8 +315,10 @@ class FlowMetadataRecordingTest {
fun `scheduled flows have metadata recorded`() { fun `scheduled flows have metadata recorded`() {
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
val lock = Semaphore(0) val lock = Semaphore(0)
@ -361,8 +372,10 @@ class FlowMetadataRecordingTest {
fun `flows have their finish time recorded when completed`() { fun `flows have their finish time recorded when completed`() {
driver(DriverParameters(startNodesInProcess = true)) { driver(DriverParameters(startNodesInProcess = true)) {
val nodeAHandle = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (nodeAHandle, nodeBHandle) = listOf(ALICE_NAME, BOB_NAME)
val nodeBHandle = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
var flowId: StateMachineRunId? = null var flowId: StateMachineRunId? = null
var metadata: DBCheckpointStorage.DBFlowMetadata? = null var metadata: DBCheckpointStorage.DBFlowMetadata? = null

View File

@ -16,6 +16,7 @@ import net.corda.core.crypto.SecureHash
import net.corda.core.flows.* import net.corda.core.flows.*
import net.corda.core.identity.AbstractParty import net.corda.core.identity.AbstractParty
import net.corda.core.identity.Party import net.corda.core.identity.Party
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.createDirectories import net.corda.core.internal.createDirectories
import net.corda.core.internal.div import net.corda.core.internal.div
import net.corda.core.internal.inputStream import net.corda.core.internal.inputStream
@ -364,8 +365,10 @@ class InteractiveShellIntegrationTest {
fun `dumpCheckpoints creates zip with json file for suspended flow`() { fun `dumpCheckpoints creates zip with json file for suspended flow`() {
val user = User("u", "p", setOf(all())) val user = User("u", "p", setOf(all()))
driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) { driver(DriverParameters(startNodesInProcess = true, cordappsForAllNodes = listOf(enclosedCordapp()))) {
val aliceNode = startNode(providedName = ALICE_NAME, rpcUsers = listOf(user)).getOrThrow() val (aliceNode, bobNode) = listOf(ALICE_NAME, BOB_NAME)
val bobNode = startNode(providedName = BOB_NAME, rpcUsers = listOf(user)).getOrThrow() .map { startNode(providedName = it, rpcUsers = listOf(user)) }
.transpose()
.getOrThrow()
bobNode.stop() bobNode.stop()
// Create logs directory since the driver is not creating it // Create logs directory since the driver is not creating it