ENT-2569: Integration test to simulate FlowWorker restart. (#1463)

* ENT-2569: Integration test to simulate Rpc/FlowWorker restart.

Initial analysis shows that 15-25 MB cannot be accounted for after RpcWorker restart.
Please see below for stats sample.
I will need to run memory profiler to understand where the memory is going.

Sample 1:
```
Memory stats @Very beginning - Used memory: 66,167 KB, Total memory: 1,158,144 KB, Max memory: 7,414,784 KB
Memory stats @Between restarts - Used memory: 214,952 KB, Total memory: 3,228,160 KB, Max memory: 7,414,784 KB
Memory stats @Very end - Used memory: 231,285 KB, Total memory: 3,226,112 KB, Max memory: 7,414,784 KB
```

Sample 2:
```
Memory stats @Very beginning - Used memory: 65,649 KB, Total memory: 1,125,376 KB, Max memory: 7,414,784 KB
Memory stats @Between restarts - Used memory: 202,542 KB, Total memory: 3,390,976 KB, Max memory: 7,414,784 KB
Memory stats @Very end - Used memory: 217,969 KB, Total memory: 3,387,392 KB, Max memory: 7,414,784 KB
```

* ENT-2569: Repair FlowWorkerTest and make it use signed network parameters.

* ENT-2569: Take flow worker up and down multiple times.

* ENT-2569: Add memory consumption stats.

Sample output for 10 iterations (1 warm-up and 9 subsequent):
```
Memory stats @Very beginning - Used memory: 126,495 KB, Total memory: 1,599,488 KB, Max memory: 7,416,832 KB
Memory stats @After warm-up round - Used memory: 172,411 KB, Total memory: 2,096,128 KB, Max memory: 7,416,832 KB
Memory stats @Testing done - Used memory: 196,912 KB, Total memory: 2,213,376 KB, Max memory: 7,416,832 KB
```
So during 9 iterations/restart cycles we have lost ~25MB.

* ENT-2569: Re-structure the test in preparation for 2nd legal name.

* ENT-2569: Mote test re-structure.

* ENT-2569: Introduce BankB into the test scenario.
This commit is contained in:
Viktor Kolomeyko 2018-10-12 13:51:11 +01:00 committed by GitHub
parent ba271f7adc
commit ad981c7f13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 334 additions and 17 deletions

View File

@ -0,0 +1,229 @@
package net.corda.flowworker
import net.corda.core.context.InvocationContext
import net.corda.core.context.Trace
import net.corda.core.crypto.Crypto.generateKeyPair
import net.corda.core.crypto.toStringShort
import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.internal.createDirectories
import net.corda.core.internal.div
import net.corda.core.node.NetworkParameters
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.SerializationDefaults
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.OpaqueBytes
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.seconds
import net.corda.finance.DOLLARS
import net.corda.finance.USD
import net.corda.finance.contracts.getCashBalances
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.internal.NetworkParametersReader.NetworkParametersAndSigned
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.testing.core.*
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.PortAllocation
import net.corda.testing.node.MockServices
import net.corda.testing.node.internal.TestCordappDirectories
import net.corda.testing.node.internal.cordappsForPackages
import org.apache.activemq.artemis.api.core.RoutingType
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.junit.Rule
import org.junit.Test
import java.nio.file.Paths
import java.security.KeyPair
import java.security.cert.X509Certificate
import java.time.Instant
import java.util.*
import kotlin.test.assertEquals
class FlowWorkerStartStopTest {
@Rule
@JvmField
val serializationEnvironment = SerializationEnvironmentRule(true)
private val portAllocation = PortAllocation.Incremental(10000)
companion object {
private val logger = contextLogger()
private val networkParameters = NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(),
modifiedTime = Instant.now(),
maxMessageSize = MAX_MESSAGE_SIZE,
maxTransactionSize = 4000000,
epoch = 1,
whitelistedContractImplementations = emptyMap()
)
private fun signNetworkParams(certKeyPair: CertificateAndKeyPair, trustRoot: X509Certificate): NetworkParametersAndSigned {
val signedParams = certKeyPair.sign(networkParameters)
return NetworkParametersAndSigned(signedParams, trustRoot)
}
}
private val notaryKeyPair = generateKeyPair()
private val notary = Party(DUMMY_NOTARY_NAME, notaryKeyPair.public)
private val notaryPartyAndCertificate = getTestPartyAndCertificate(notary)
private val cordappDirectories = TestCordappDirectories.cached(cordappsForPackages(listOf("net.corda.finance"))).toList()
@Test
fun startStop() {
val infraA = setupFlowWorkerInfra(DUMMY_BANK_A_NAME)
val infraB = setupFlowWorkerInfra(DUMMY_BANK_B_NAME)
val signedNetworkParameters = signNetworkParams(createDevNetworkMapCa(), infraA.trustRoot)
try {
logger.logMemoryStats("Very beginning")
createFlowWorkerAndPerformTest(infraA, signedNetworkParameters, 1).first.stop()
val (nodeBFlowWorker, nodeBFlowWorkerHub) = createFlowWorkerAndPerformTest(infraB, signedNetworkParameters, 1)
logger.logMemoryStats("After warm-up round")
(2..10).forEach {
logger.info("Iteration #$it")
createFlowWorkerAndPerformTest(infraA, signedNetworkParameters, it).first.stop()
issueCash(infraB, nodeBFlowWorkerHub, it)
}
nodeBFlowWorker.stop()
logger.logMemoryStats("Testing done")
} finally {
infraA.bridgeControlListener.stop()
infraA.broker.stop()
}
}
private data class FlowWorkerInfra(val trustRoot: X509Certificate, val nodeCa: X509Certificate, val bankInfo: NodeInfo, val bankKeyPair: KeyPair,
val session: ClientSession, val consumer: ClientConsumer, val producer: ClientProducer,
val flowWorkerRequestQueueAddress: String, val flowWorkerReplyQueueAddress: String,
val broker: ArtemisBroker, val bridgeControlListener: BridgeControlListener,
val config: NodeConfiguration)
private fun setupFlowWorkerInfra(legalName: CordaX500Name): FlowWorkerInfra {
val bankKeyPair = generateKeyPair()
val bank = Party(legalName, bankKeyPair.public)
val bankPartyAndCertificate = getTestPartyAndCertificate(bank)
val bankInfo = NodeInfo(listOf(NetworkHostAndPort("localhost", 1111)), listOf(bankPartyAndCertificate), 1, 1)
val baseDirectory = DriverParameters().driverDirectory
val nodeDirectory = baseDirectory / legalName.organisation / "flowWorker"
nodeDirectory.createDirectories()
val brokerAddress = NetworkHostAndPort("localhost", portAllocation.nextPort())
val config = genericConfig().copy(myLegalName = legalName, baseDirectory = nodeDirectory,
messagingServerAddress = brokerAddress, dataSourceProperties = MockServices.makeTestDataSourceProperties(),
cordappDirectories = cordappDirectories)
// create test certificates
config.configureWithDevSSLCertificate()
val trustRoot = config.p2pSslOptions.trustStore.get().query { getCertificate(X509Utilities.CORDA_ROOT_CA) }
val nodeCa = config.signingCertificateStore.get().query { getCertificate(X509Utilities.CORDA_CLIENT_CA) }
val broker = createFlowWorkerBroker(config, networkParameters.maxMessageSize)
val bridgeControlListener = createBridgeControlListener(config, networkParameters.maxMessageSize)
val flowWorkerRequestQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${bankKeyPair.public.toStringShort()}"
val flowWorkerReplyQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}reply"
val (session, consumer, producer) = createArtemisClient(config, flowWorkerReplyQueueAddress)
return FlowWorkerInfra(trustRoot, nodeCa, bankInfo, bankKeyPair, session, consumer, producer, flowWorkerRequestQueueAddress, flowWorkerReplyQueueAddress,
broker, bridgeControlListener, config)
}
private fun createFlowWorkerAndPerformTest(infra: FlowWorkerInfra, signedNetworkParameters: NetworkParametersAndSigned, iterNumber: Int): Pair<FlowWorker, FlowWorkerServiceHub> {
val (flowWorker, flowWorkerServiceHub) = createFlowWorker(infra.config, infra.bankInfo, signedNetworkParameters,
infra.bankKeyPair, infra.trustRoot, infra.nodeCa)
issueCash(infra, flowWorkerServiceHub, iterNumber)
return Pair(flowWorker, flowWorkerServiceHub)
}
private fun issueCash(infra: FlowWorkerInfra, flowWorkerServiceHub: FlowWorkerServiceHub, iterNumber: Int) {
val traceId = Trace.InvocationId.newInstance()
val legalName = infra.bankInfo.legalIdentities.single().name
val startFlowMessage = StartFlow(legalName, CashIssueFlow::class.java, arrayOf(10.DOLLARS, OpaqueBytes.of(0x01), notary),
InvocationContext.service("bla", legalName), infra.flowWorkerReplyQueueAddress, traceId)
val message = infra.session.createMessage(true)
message.writeBodyBufferBytes(startFlowMessage.serialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT).bytes)
infra.producer.send(infra.flowWorkerRequestQueueAddress, message)
val flowReplyStateMachineRunId = receiveFlowWorkerMessage<FlowReplyStateMachineRunId>(infra.consumer)
println(flowReplyStateMachineRunId)
val flowReplyResult = receiveFlowWorkerMessage<FlowReplyResult>(infra.consumer)
assertEquals(traceId, flowReplyResult.replyId)
println(flowReplyResult)
val cashBalance = flowWorkerServiceHub.getCashBalances()
assertEquals((10 * iterNumber).DOLLARS, cashBalance[USD])
println("Cash: $cashBalance")
}
private fun genericConfig(): NodeConfigurationImpl {
return NodeConfigurationImpl(baseDirectory = Paths.get("."), myLegalName = CHARLIE_NAME, emailAddress = "",
keyStorePassword = "pass", trustStorePassword = "pass", crlCheckSoftFail = true, dataSourceProperties = Properties(),
rpcUsers = listOf(), verifierType = VerifierType.InMemory, flowTimeout = FlowTimeoutConfiguration(5.seconds, 3, 1.0),
p2pAddress = NetworkHostAndPort("localhost", 1), rpcSettings = NodeRpcSettings(NetworkHostAndPort("localhost", 1), null, ssl = null),
relay = null, messagingServerAddress = null, enterpriseConfiguration = EnterpriseConfiguration(mutualExclusionConfiguration = MutualExclusionConfiguration(updateInterval = 0, waitInterval = 0)),
notary = null)
}
private fun createFlowWorkerBroker(config: NodeConfiguration, maxMessageSize: Int): ArtemisBroker {
val broker = ArtemisMessagingServer(config, config.messagingServerAddress!!, maxMessageSize)
broker.start()
return broker
}
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
flowWorker.start()
flowWorkerServiceHub.database.transaction {
flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate)
}
return Pair(flowWorker, flowWorkerServiceHub)
}
private fun createBridgeControlListener(config: NodeConfiguration, maxMessageSize: Int): BridgeControlListener {
val bridgeControlListener = BridgeControlListener(config.p2pSslOptions, config.messagingServerAddress!!, maxMessageSize)
bridgeControlListener.start()
return bridgeControlListener
}
private fun createArtemisClient(config: NodeConfiguration, queueAddress: String): Triple<ClientSession, ClientConsumer, ClientProducer> {
val artemisClient = ArtemisMessagingClient(config.p2pSslOptions, config.messagingServerAddress!!, MAX_MESSAGE_SIZE)
val started = artemisClient.start()
started.session.createQueue(queueAddress, RoutingType.ANYCAST, queueAddress, true)
return Triple(started.session, started.session.createConsumer(queueAddress), started.session.createProducer())
}
private inline fun <reified T : FlowWorkerMessage> receiveFlowWorkerMessage(consumer: ClientConsumer): T {
val message = consumer.receive()
val data = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
return data.deserialize(context = SerializationDefaults.RPC_CLIENT_CONTEXT)
}
}

View File

@ -20,11 +20,14 @@ import net.corda.finance.DOLLARS
import net.corda.finance.USD
import net.corda.finance.contracts.getCashBalances
import net.corda.finance.flows.CashIssueFlow
import net.corda.node.internal.NetworkParametersReader.NetworkParametersAndSigned
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.services.config.*
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.nodeapi.internal.ArtemisMessagingClient
import net.corda.nodeapi.internal.bridging.BridgeControlListener
import net.corda.nodeapi.internal.createDevNetworkMapCa
import net.corda.nodeapi.internal.crypto.CertificateAndKeyPair
import net.corda.nodeapi.internal.crypto.X509Utilities
import net.corda.testing.core.*
import net.corda.testing.driver.DriverParameters
@ -53,16 +56,22 @@ class FlowWorkerTest {
private val portAllocation = PortAllocation.Incremental(10000)
// TODO: Convert to Signed Network Parameters and pass into FlowWorkerServiceHub constructor.
private val networkParameters = NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(),
modifiedTime = Instant.now(),
maxMessageSize = MAX_MESSAGE_SIZE,
maxTransactionSize = 4000000,
epoch = 1,
whitelistedContractImplementations = emptyMap()
)
companion object {
private val networkParameters = NetworkParameters(
minimumPlatformVersion = 1,
notaries = listOf(),
modifiedTime = Instant.now(),
maxMessageSize = MAX_MESSAGE_SIZE,
maxTransactionSize = 4000000,
epoch = 1,
whitelistedContractImplementations = emptyMap()
)
private fun signNetworkParams(certKeyPair: CertificateAndKeyPair, trustRoot: X509Certificate): NetworkParametersAndSigned {
val signedParams = certKeyPair.sign(networkParameters)
return NetworkParametersAndSigned(signedParams, trustRoot)
}
}
private val bankAKeyPair = generateKeyPair()
private val bankBKeyPair = generateKeyPair()
@ -103,7 +112,9 @@ class FlowWorkerTest {
val (session, consumer, producer) = createArtemisClient(config, flowWorkerReplyQueueAddress)
val (flowWorker, flowWorkerServiceHub) = createFlowWorker(config, bankAInfo, networkParameters, bankAKeyPair, trustRoot, nodeCa)
val signedNetworkParameters = signNetworkParams(createDevNetworkMapCa(), trustRoot)
val (flowWorker, flowWorkerServiceHub) = createFlowWorker(config, bankAInfo, signedNetworkParameters, bankAKeyPair, trustRoot, nodeCa)
try {
flowWorkerServiceHub.database.transaction {
flowWorkerServiceHub.identityService.registerIdentity(notaryPartyAndCertificate)
@ -152,7 +163,9 @@ class FlowWorkerTest {
val bankABroker = createFlowWorkerBroker(bankAConfig, networkParameters.maxMessageSize)
val bankABridgeControlListener = createBridgeControlListener(bankAConfig, networkParameters.maxMessageSize)
val (bankAFlowWorker, bankAFlowWorkerServiceHub) = createFlowWorker(bankAConfig, bankAInfo, networkParameters, bankAKeyPair, bankATrustRoot, bankANodeCa)
val signedNetworkParameters = signNetworkParams(createDevNetworkMapCa(), bankATrustRoot)
val (bankAFlowWorker, bankAFlowWorkerServiceHub) = createFlowWorker(bankAConfig, bankAInfo, signedNetworkParameters, bankAKeyPair, bankATrustRoot, bankANodeCa)
val bankARequestQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}${bankAKeyPair.public.toStringShort()}"
val bankAReplyQueueAddress = "${FlowWorker.FLOW_WORKER_QUEUE_ADDRESS_PREFIX}reply"
@ -173,7 +186,7 @@ class FlowWorkerTest {
val bankBBroker = createFlowWorkerBroker(bankBConfig, networkParameters.maxMessageSize)
val bankBBridgeControlListener = createBridgeControlListener(bankBConfig, networkParameters.maxMessageSize)
val (bankBFlowWorker, bankBFlowWorkerServiceHub) = createFlowWorker(bankBConfig, bankBInfo, networkParameters, bankBKeyPair, bankBTrustRoot, bankBNodeCa)
val (bankBFlowWorker, bankBFlowWorkerServiceHub) = createFlowWorker(bankBConfig, bankBInfo, signedNetworkParameters, bankBKeyPair, bankBTrustRoot, bankBNodeCa)
try {
bankAFlowWorkerServiceHub.database.transaction {
@ -227,8 +240,8 @@ class FlowWorkerTest {
return broker
}
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, networkParameters: NetworkParameters, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, ourKeyPair, trustRoot, nodeCa, TODO())
private fun createFlowWorker(config: NodeConfiguration, myInfo: NodeInfo, signedNetworkParameters: NetworkParametersAndSigned, ourKeyPair: KeyPair, trustRoot: X509Certificate, nodeCa: X509Certificate): Pair<FlowWorker, FlowWorkerServiceHub> {
val flowWorkerServiceHub = FlowWorkerServiceHub(config, myInfo, ourKeyPair, trustRoot, nodeCa, signedNetworkParameters)
val flowWorker = FlowWorker(UUID.randomUUID().toString(), flowWorkerServiceHub)
flowWorker.start()
return Pair(flowWorker, flowWorkerServiceHub)

View File

@ -0,0 +1,21 @@
package net.corda.flowworker
import org.slf4j.Logger
import java.text.NumberFormat
fun Logger.logMemoryStats(stage: String) {
fun Long.toKB(): Long = this / 1024
System.gc()
System.gc()
val nf = NumberFormat.getNumberInstance()
val runtime = Runtime.getRuntime()
val freeMemory = runtime.freeMemory().toKB()
val totalMemory = runtime.totalMemory().toKB()
val maxMemory = runtime.maxMemory().toKB()
info("Memory stats @$stage - Used memory: ${nf.format(totalMemory - freeMemory)} KB, Total memory: ${nf.format(totalMemory)} KB, Max memory: ${nf.format(maxMemory)} KB")
}

View File

@ -18,7 +18,6 @@ import net.corda.core.internal.concurrent.flatMap
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.transpose
import net.corda.core.internal.createDirectories
import net.corda.core.internal.createDirectory
import net.corda.core.internal.div
import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
@ -166,7 +165,7 @@ data class RpcFlowWorkerDriverDSL(private val driverDSL: DriverDSLImpl) : Intern
val flowWorkerBrokerAddress = NetworkHostAndPort("localhost", driverDSL.portAllocation.nextPort())
val baseDirectory = driverDSL.driverDirectory / myLegalName.organisation
baseDirectory.createDirectory()
baseDirectory.createDirectories()
val dataSourceProperties = MockServices.makeTestDataSourceProperties()
dataSourceProperties.setProperty("maximumPoolSize", "10")

View File

@ -0,0 +1,55 @@
package net.corda.rpcWorker
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.messaging.startFlow
import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.OpaqueBytes
import net.corda.finance.DOLLARS
import net.corda.finance.contracts.getCashBalances
import net.corda.finance.flows.CashIssueFlow
import net.corda.flowworker.logMemoryStats
import net.corda.testing.core.DUMMY_BANK_A_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.node.User
import org.junit.Test
class RpcWorkerStartStopTest {
companion object {
private val log = contextLogger()
}
@Test
fun startStop() {
log.logMemoryStats("Very beginning")
rpcFlowWorkerDriver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
val bankAUser = User("username", "password", permissions = setOf("ALL"))
val bankA = startRpcFlowWorker(DUMMY_BANK_A_NAME, listOf(bankAUser), 1).get()
val bankAProxy = CordaRPCClient(bankA.rpcAddress).start("username", "password").proxy
val cashIssueResult = bankAProxy.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0x01), defaultNotaryIdentity).returnValue.get()
println(cashIssueResult)
println(bankAProxy.getCashBalances())
}
log.logMemoryStats("Between restarts")
// Starting brand new instance
rpcFlowWorkerDriver(DriverParameters(startNodesInProcess = true, extraCordappPackagesToScan = listOf("net.corda.finance"))) {
val bankAUser = User("username", "password", permissions = setOf("ALL"))
val bankA = startRpcFlowWorker(DUMMY_BANK_A_NAME, listOf(bankAUser), 1).get()
val bankAProxy = CordaRPCClient(bankA.rpcAddress).start("username", "password").proxy
val cashIssueResult = bankAProxy.startFlow(::CashIssueFlow, 10.DOLLARS, OpaqueBytes.of(0x01), defaultNotaryIdentity).returnValue.get()
println(cashIssueResult)
println(bankAProxy.getCashBalances())
}
log.logMemoryStats("Very end")
}
}