diff --git a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt index be2870ece1..c3f16f84e9 100644 --- a/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt +++ b/node/src/test/kotlin/net/corda/node/services/statemachine/FlowFrameworkPersistenceTests.kt @@ -2,7 +2,6 @@ package net.corda.node.services.statemachine import net.corda.core.crypto.random63BitValue import net.corda.core.flows.FlowLogic -import net.corda.core.identity.Party import net.corda.core.utilities.getOrThrow import net.corda.node.services.persistence.checkpoints import net.corda.testing.core.ALICE_NAME @@ -12,14 +11,18 @@ import net.corda.testing.core.singleIdentity import net.corda.testing.flows.registerCordappFlowFactory import net.corda.testing.internal.LogHelper import net.corda.testing.node.InMemoryMessagingNetwork -import net.corda.testing.node.internal.* +import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP +import net.corda.testing.node.internal.InternalMockNetwork +import net.corda.testing.node.internal.InternalMockNodeParameters +import net.corda.testing.node.internal.MockNodeFlowManager +import net.corda.testing.node.internal.TestStartedNode +import net.corda.testing.node.internal.startFlow import org.assertj.core.api.Assertions.assertThat import org.junit.After import org.junit.Before import org.junit.Ignore import org.junit.Test import rx.Observable -import java.util.* import kotlin.test.assertEquals import kotlin.test.assertTrue @@ -31,14 +34,8 @@ class FlowFrameworkPersistenceTests { } private lateinit var mockNet: InternalMockNetwork - private val receivedSessionMessages = ArrayList() private lateinit var aliceNode: TestStartedNode private lateinit var bobNode: TestStartedNode - private lateinit var notaryIdentity: Party - private lateinit var alice: Party - private lateinit var bob: Party - private lateinit var aliceFlowManager: MockNodeFlowManager - private lateinit var bobFlowManager: MockNodeFlowManager @Before fun start() { @@ -46,24 +43,20 @@ class FlowFrameworkPersistenceTests { cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP), servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin() ) - aliceFlowManager = MockNodeFlowManager() - bobFlowManager = MockNodeFlowManager() - aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager)) - bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager)) - - receivedSessionMessagesObservable().forEach { receivedSessionMessages += it } - - // Extract identities - alice = aliceNode.info.singleIdentity() - bob = bobNode.info.singleIdentity() - notaryIdentity = mockNet.defaultNotaryIdentity + aliceNode = MockNodeFlowManager().let { aliceFlowManager -> + mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager)) + } + bobNode = MockNodeFlowManager().let { bobFlowManager -> + mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager)) + } } @After fun cleanUp() { - mockNet.stopNodes() - receivedSessionMessages.clear() + aliceNode.internals.manuallyCloseDB() + bobNode.internals.manuallyCloseDB() + mockNet.close() } @Test(timeout=300_000) @@ -76,6 +69,7 @@ class FlowFrameworkPersistenceTests { @Test(timeout=300_000) fun `flow restarted just after receiving payload`() { + val bob = bobNode.info.singleIdentity() bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it) .nonTerminating() } aliceNode.services.startFlow(SendFlow("Hello", bob)) @@ -92,6 +86,7 @@ class FlowFrameworkPersistenceTests { @Test(timeout=300_000) fun `flow loaded from checkpoint will respond to messages from before start`() { + val alice = aliceNode.info.singleIdentity() aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) } bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow val restoredFlow = bobNode.restartAndGetRestoredFlow() @@ -101,6 +96,9 @@ class FlowFrameworkPersistenceTests { @Ignore("Some changes in startup order make this test's assumptions fail.") @Test(timeout=300_000) fun `flow with send will resend on interrupted restart`() { + val receivedSessionMessages: List = mutableListOf().also { messages -> + receivedSessionMessagesObservable().forEach { messages += it } + } val payload = random63BitValue() val payload2 = random63BitValue()