CORDA-3973 Fix memory leak due to DB not shutting down (#6605)

Fix memory leak due to DB not shutting down in FlowFrameworkPersistenceTests.flow restarted just after receiving payload.

Also reduces number of class-wide variables to reduce scope for references being accidentally held between runs.
This commit is contained in:
Ross Nicoll 2020-08-10 20:04:51 +01:00 committed by GitHub
parent c191960cb8
commit 29e87a586a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,7 +2,6 @@ package net.corda.node.services.statemachine
import net.corda.core.crypto.random63BitValue
import net.corda.core.flows.FlowLogic
import net.corda.core.identity.Party
import net.corda.core.utilities.getOrThrow
import net.corda.node.services.persistence.checkpoints
import net.corda.testing.core.ALICE_NAME
@ -12,14 +11,18 @@ import net.corda.testing.core.singleIdentity
import net.corda.testing.flows.registerCordappFlowFactory
import net.corda.testing.internal.LogHelper
import net.corda.testing.node.InMemoryMessagingNetwork
import net.corda.testing.node.internal.*
import net.corda.testing.node.internal.DUMMY_CONTRACTS_CORDAPP
import net.corda.testing.node.internal.InternalMockNetwork
import net.corda.testing.node.internal.InternalMockNodeParameters
import net.corda.testing.node.internal.MockNodeFlowManager
import net.corda.testing.node.internal.TestStartedNode
import net.corda.testing.node.internal.startFlow
import org.assertj.core.api.Assertions.assertThat
import org.junit.After
import org.junit.Before
import org.junit.Ignore
import org.junit.Test
import rx.Observable
import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@ -31,14 +34,8 @@ class FlowFrameworkPersistenceTests {
}
private lateinit var mockNet: InternalMockNetwork
private val receivedSessionMessages = ArrayList<SessionTransfer>()
private lateinit var aliceNode: TestStartedNode
private lateinit var bobNode: TestStartedNode
private lateinit var notaryIdentity: Party
private lateinit var alice: Party
private lateinit var bob: Party
private lateinit var aliceFlowManager: MockNodeFlowManager
private lateinit var bobFlowManager: MockNodeFlowManager
@Before
fun start() {
@ -46,24 +43,20 @@ class FlowFrameworkPersistenceTests {
cordappsForAllNodes = listOf(DUMMY_CONTRACTS_CORDAPP),
servicePeerAllocationStrategy = InMemoryMessagingNetwork.ServicePeerAllocationStrategy.RoundRobin()
)
aliceFlowManager = MockNodeFlowManager()
bobFlowManager = MockNodeFlowManager()
aliceNode = mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager))
bobNode = mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager))
receivedSessionMessagesObservable().forEach { receivedSessionMessages += it }
// Extract identities
alice = aliceNode.info.singleIdentity()
bob = bobNode.info.singleIdentity()
notaryIdentity = mockNet.defaultNotaryIdentity
aliceNode = MockNodeFlowManager().let { aliceFlowManager ->
mockNet.createNode(InternalMockNodeParameters(legalName = ALICE_NAME, flowManager = aliceFlowManager))
}
bobNode = MockNodeFlowManager().let { bobFlowManager ->
mockNet.createNode(InternalMockNodeParameters(legalName = BOB_NAME, flowManager = bobFlowManager))
}
}
@After
fun cleanUp() {
mockNet.stopNodes()
receivedSessionMessages.clear()
aliceNode.internals.manuallyCloseDB()
bobNode.internals.manuallyCloseDB()
mockNet.close()
}
@Test(timeout=300_000)
@ -76,6 +69,7 @@ class FlowFrameworkPersistenceTests {
@Test(timeout=300_000)
fun `flow restarted just after receiving payload`() {
val bob = bobNode.info.singleIdentity()
bobNode.registerCordappFlowFactory(SendFlow::class) { InitiatedReceiveFlow(it)
.nonTerminating() }
aliceNode.services.startFlow(SendFlow("Hello", bob))
@ -92,6 +86,7 @@ class FlowFrameworkPersistenceTests {
@Test(timeout=300_000)
fun `flow loaded from checkpoint will respond to messages from before start`() {
val alice = aliceNode.info.singleIdentity()
aliceNode.registerCordappFlowFactory(ReceiveFlow::class) { InitiatedSendFlow("Hello", it) }
bobNode.services.startFlow(ReceiveFlow(alice).nonTerminating()) // Prepare checkpointed receive flow
val restoredFlow = bobNode.restartAndGetRestoredFlow<ReceiveFlow>()
@ -101,6 +96,9 @@ class FlowFrameworkPersistenceTests {
@Ignore("Some changes in startup order make this test's assumptions fail.")
@Test(timeout=300_000)
fun `flow with send will resend on interrupted restart`() {
val receivedSessionMessages: List<SessionTransfer> = mutableListOf<SessionTransfer>().also { messages ->
receivedSessionMessagesObservable().forEach { messages += it }
}
val payload = random63BitValue()
val payload2 = random63BitValue()