CORDA-3726: Remove memory leaks when many InProcess nodes started (#6169)

* CORDA-3762: Integration test exposing the problem reported

* CORDA-3726: Additional logging

* CORDA-3726: Prevent thread leaks

* CORDA-3726: New `journalBufferTimeout` parameter

* CORDA-3726: Override `journalBufferTimeout` parameter

* CORDA-3726: Making Detekt happier

* CORDA-3276: Account for extra thread user in MockNetwork

For real node this does not matter as `shutdown` can safely be called multiple times, which is not true for server thread provided by MockNetwork

* CORDA-3276: Do not make SMM shutdown "executor" as it belongs to AbstractNode

* CORDA-3276: Address input from @rick-r3

* CORDA-3276: Fix test after rebase
This commit is contained in:
Viktor Kolomeyko 2020-04-23 08:53:51 +01:00 committed by GitHub
parent f3c78c02da
commit 257026f606
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 109 additions and 29 deletions

View File

@ -340,7 +340,6 @@
<ID>ForbiddenComment:FlowLogicRefFactoryImpl.kt$FlowLogicRefFactoryImpl$// TODO: This is used via RPC but it's probably better if we pass in argument names and values explicitly</ID>
<ID>ForbiddenComment:GenerateRpcSslCertsCli.kt$GenerateRpcSslCerts$// TODO: consider adding a password strength policy.</ID>
<ID>ForbiddenComment:GuiUtilities.kt$// TODO: This is a temporary fix for the UI to show the correct issuer identity, this will break when we start randomizing keys. More work is needed here when the identity work is done.</ID>
<ID>ForbiddenComment:HibernateConfiguration.kt$HibernateConfiguration$// TODO: require mechanism to set schemaOptions (databaseSchema, tablePrefix) which are not global to session</ID>
<ID>ForbiddenComment:IRS.kt$FloatingRatePaymentEvent$// TODO: Should an uncalculated amount return a zero ? null ? etc.</ID>
<ID>ForbiddenComment:IRS.kt$InterestRateSwap$// TODO: Confirm: would someone really enter a swap with a negative fixed rate?</ID>
<ID>ForbiddenComment:IRS.kt$InterestRateSwap$// TODO: further tests</ID>
@ -632,8 +631,8 @@
<ID>LongParameterList:AbstractNode.kt$(databaseConfig: DatabaseConfig, wellKnownPartyFromX500Name: (CordaX500Name) -&gt; Party?, wellKnownPartyFromAnonymous: (AbstractParty) -&gt; Party?, schemaService: SchemaService, hikariProperties: Properties, cacheFactory: NamedCacheFactory, customClassLoader: ClassLoader?)</ID>
<ID>LongParameterList:AbstractNode.kt$(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set&lt;MappedSchema&gt;, metricRegistry: MetricRegistry? = null, cordappLoader: CordappLoader? = null, currentDir: Path? = null, ourName: CordaX500Name)</ID>
<ID>LongParameterList:ArtemisMessagingServer.kt$ArtemisMessagingServer$(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false, deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false)</ID>
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, securityManager: RPCSecurityManager, maxMessageSize: Int, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcBroker.kt$ArtemisRpcBroker.Companion$(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean)</ID>
<ID>LongParameterList:ArtemisRpcTests.kt$ArtemisRpcTests$(nodeSSlconfig: MutualSslConfiguration, brokerSslOptions: BrokerRpcSslOptions?, useSslForBroker: Boolean, clientSslOptions: ClientRpcSslOptions?, address: NetworkHostAndPort = ports.nextHostAndPort(), adminAddress: NetworkHostAndPort = ports.nextHostAndPort(), baseDirectory: Path = tempFolder.root.toPath() )</ID>
<ID>LongParameterList:AttachmentsClassLoader.kt$AttachmentsClassLoaderBuilder$(attachments: List&lt;Attachment&gt;, params: NetworkParameters, txId: SecureHash, isAttachmentTrusted: (Attachment) -&gt; Boolean, parent: ClassLoader = ClassLoader.getSystemClassLoader(), block: (ClassLoader) -&gt; T)</ID>
<ID>LongParameterList:BFTSmart.kt$BFTSmart.Replica$( states: List&lt;StateRef&gt;, txId: SecureHash, callerName: CordaX500Name, requestSignature: NotarisationRequestSignature, timeWindow: TimeWindow?, references: List&lt;StateRef&gt; = emptyList() )</ID>
@ -1194,7 +1193,6 @@
<ID>MatchingDeclarationName:ConfigException.kt$net.corda.core.cordapp.ConfigException.kt</ID>
<ID>MatchingDeclarationName:ConfigUtilities.kt$net.corda.node.services.config.ConfigUtilities.kt</ID>
<ID>MatchingDeclarationName:ContractsDSL.kt$net.corda.core.contracts.ContractsDSL.kt</ID>
<ID>MatchingDeclarationName:CordaUtils.kt$net.corda.core.internal.CordaUtils.kt</ID>
<ID>MatchingDeclarationName:CurrencyParameterSensitivitySerialiser.kt$net.corda.vega.plugin.customserializers.CurrencyParameterSensitivitySerialiser.kt</ID>
<ID>MatchingDeclarationName:FinanceWorkflowsUtils.kt$net.corda.finance.workflows.utils.FinanceWorkflowsUtils.kt</ID>
<ID>MatchingDeclarationName:FlowStackSnapshot.kt$net.corda.testing.internal.FlowStackSnapshot.kt</ID>
@ -1465,6 +1463,7 @@
<ID>TooGenericExceptionCaught:AbstractNode.kt$ex: Exception</ID>
<ID>TooGenericExceptionCaught:AbstractNodeTests.kt$ColdJVM.Companion$t: Throwable</ID>
<ID>TooGenericExceptionCaught:Amount.kt$Amount.Companion$e: Exception</ID>
<ID>TooGenericExceptionCaught:ArtemisRpcBroker.kt$ArtemisRpcBroker$th: Throwable</ID>
<ID>TooGenericExceptionCaught:AttachmentDemo.kt$e: Exception</ID>
<ID>TooGenericExceptionCaught:AttachmentLoadingTests.kt$AttachmentLoadingTests.ConsumeAndBroadcastResponderFlow$e: Exception</ID>
<ID>TooGenericExceptionCaught:AttachmentVersionNumberMigration.kt$AttachmentVersionNumberMigration$e: Exception</ID>
@ -1500,6 +1499,7 @@
<ID>TooGenericExceptionCaught:DeserializeSimpleTypesTests.kt$DeserializeSimpleTypesTests$e: Exception</ID>
<ID>TooGenericExceptionCaught:DockerInstantiator.kt$DockerInstantiator$e: Exception</ID>
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$DriverDSLImpl$e: Exception</ID>
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$DriverDSLImpl.Companion$th: Throwable</ID>
<ID>TooGenericExceptionCaught:DriverDSLImpl.kt$exception: Throwable</ID>
<ID>TooGenericExceptionCaught:DriverTests.kt$DriverTests$e: Exception</ID>
<ID>TooGenericExceptionCaught:ErrorCodeLoggingTests.kt$e: Exception</ID>

View File

@ -289,7 +289,13 @@ class CordaPersistence(
override fun close() {
// DataSource doesn't implement AutoCloseable so we just have to hope that the implementation does so that we can close it
(_dataSource as? AutoCloseable)?.close()
val mayBeAutoClosableDataSource = _dataSource as? AutoCloseable
if(mayBeAutoClosableDataSource != null) {
log.info("Closing $mayBeAutoClosableDataSource")
mayBeAutoClosableDataSource.close()
} else {
log.warn("$_dataSource has not been properly closed")
}
}
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")

View File

@ -0,0 +1,30 @@
package net.corda.node.endurance
import net.corda.core.utilities.getOrThrow
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.BOB_NAME
import net.corda.testing.driver.DriverParameters
import net.corda.testing.driver.driver
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
@RunWith(Parameterized::class)
class NodesStartStopSingleVmTests(@Suppress("unused") private val iteration: Int) {
companion object {
@JvmStatic
@Parameterized.Parameters(name = "iteration = {0}")
fun iterations(): Iterable<Array<Int>> {
return (1..60).map { arrayOf(it) }
}
}
@Test(timeout = 300_000)
fun nodesStartStop() {
driver(DriverParameters(notarySpecs = emptyList(), startNodesInProcess = true)) {
startNode(providedName = ALICE_NAME).getOrThrow()
startNode(providedName = BOB_NAME).getOrThrow()
}
}
}

View File

@ -204,7 +204,7 @@ class AMQPBridgeTest {
doReturn(null).whenever(it).jmxMonitoringHttpPort
}
artemisConfig.configureWithDevSSLCertificate()
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE)
val artemisServer = ArtemisMessagingServer(artemisConfig, artemisAddress.copy(host = "0.0.0.0"), MAX_MESSAGE_SIZE, null)
val artemisClient = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisAddress, MAX_MESSAGE_SIZE)
artemisServer.start()
artemisClient.start()

View File

@ -648,7 +648,7 @@ class CertificateRevocationListNodeTests {
doReturn(crlCheckArtemisServer).whenever(it).crlCheckArtemisServer
}
artemisConfig.configureWithDevSSLCertificate()
val server = ArtemisMessagingServer(artemisConfig, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE)
val server = ArtemisMessagingServer(artemisConfig, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE, null)
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, artemisConfig.p2pAddress, MAX_MESSAGE_SIZE)
server.start()
client.start()

View File

@ -428,7 +428,7 @@ class ProtonWrapperTests {
}
artemisConfig.configureWithDevSSLCertificate()
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize)
val server = ArtemisMessagingServer(artemisConfig, NetworkHostAndPort("0.0.0.0", artemisPort), maxMessageSize, null)
val client = ArtemisMessagingClient(artemisConfig.p2pSslOptions, NetworkHostAndPort("localhost", artemisPort), maxMessageSize)
server.start()
client.start()

View File

@ -239,7 +239,7 @@ class ArtemisMessagingTest {
}
private fun createMessagingServer(local: Int = serverPort, maxMessageSize: Int = MAX_MESSAGE_SIZE): ArtemisMessagingServer {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize).apply {
return ArtemisMessagingServer(config, NetworkHostAndPort("0.0.0.0", local), maxMessageSize, null).apply {
config.configureWithDevSSLCertificate()
messagingServer = this
}

View File

@ -50,7 +50,7 @@ class ArtemisRpcTests {
@JvmField
val tempFolder = TemporaryFolder()
val testName = X500Principal("CN=Test,O=R3 Ltd,L=London,C=GB")
private val testName = X500Principal("CN=Test,O=R3 Ltd,L=London,C=GB")
@Test(timeout=300_000)
fun rpc_with_ssl_enabled() {
@ -108,9 +108,9 @@ class ArtemisRpcTests {
val jmxEnabled = false
val artemisBroker: ArtemisBroker = if (useSslForBroker) {
ArtemisRpcBroker.withSsl(nodeSSlconfig, address, adminAddress, brokerSslOptions!!, securityManager, maxMessageSize, jmxEnabled, baseDirectory, false)
ArtemisRpcBroker.withSsl(nodeSSlconfig, address, adminAddress, brokerSslOptions!!, securityManager, maxMessageSize, null, jmxEnabled, baseDirectory, false)
} else {
ArtemisRpcBroker.withoutSsl(nodeSSlconfig, address, adminAddress, securityManager, maxMessageSize, jmxEnabled, baseDirectory, false)
ArtemisRpcBroker.withoutSsl(nodeSSlconfig, address, adminAddress, securityManager, maxMessageSize, null, jmxEnabled, baseDirectory, false)
}
artemisBroker.use { broker ->
broker.start()

View File

@ -279,6 +279,8 @@ open class Node(configuration: NodeConfiguration,
private var internalRpcMessagingClient: InternalRPCMessagingClient? = null
private var rpcBroker: ArtemisBroker? = null
protected open val journalBufferTimeout : Int? = null
private var shutdownHook: ShutdownHook? = null
// DISCUSSION
@ -360,7 +362,7 @@ open class Node(configuration: NodeConfiguration,
val messageBroker = if (!configuration.messagingServerExternal) {
val brokerBindAddress = configuration.messagingServerAddress
?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize)
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize, journalBufferTimeout)
} else {
null
}
@ -435,9 +437,11 @@ open class Node(configuration: NodeConfiguration,
val rpcBrokerDirectory: Path = baseDirectory / "brokers" / "rpc"
with(rpcOptions) {
rpcBroker = if (useSsl) {
ArtemisRpcBroker.withSsl(configuration.p2pSslOptions, this.address, adminAddress, sslConfig!!, securityManager, MAX_RPC_MESSAGE_SIZE, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
ArtemisRpcBroker.withSsl(configuration.p2pSslOptions, this.address, adminAddress, sslConfig!!, securityManager, MAX_RPC_MESSAGE_SIZE,
journalBufferTimeout, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
} else {
ArtemisRpcBroker.withoutSsl(configuration.p2pSslOptions, this.address, adminAddress, securityManager, MAX_RPC_MESSAGE_SIZE, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
ArtemisRpcBroker.withoutSsl(configuration.p2pSslOptions, this.address, adminAddress, securityManager, MAX_RPC_MESSAGE_SIZE,
journalBufferTimeout, jmxMonitoringHttpPort != null, rpcBrokerDirectory, shouldStartLocalShell())
}
}
rpcBroker!!.addresses
@ -528,6 +532,7 @@ open class Node(configuration: NodeConfiguration,
val url = try {
server.start().url
} catch (e: JdbcSQLNonTransientConnectionException) {
log.error("Unexpected database connectivity error", e)
if (e.cause is BindException) {
throw AddressBindingException(effectiveH2Settings.address)
} else {

View File

@ -20,6 +20,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
import net.corda.nodeapi.internal.requireOnDefaultFileSystem
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.core.config.Configuration
@ -52,7 +53,8 @@ import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.RE
@ThreadSafe
class ArtemisMessagingServer(private val config: NodeConfiguration,
private val messagingServerAddress: NetworkHostAndPort,
private val maxMessageSize: Int) : ArtemisBroker, SingletonSerializeAsToken() {
private val maxMessageSize: Int,
private val journalBufferTimeout : Int?) : ArtemisBroker, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
}
@ -103,7 +105,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
try {
activeMQServer.start()
} catch (e: java.io.IOException) {
} catch (e: IOException) {
log.error("Unable to start message broker", e)
if (e.isBindingError()) {
throw AddressBindingException(config.p2pAddress)
} else {
@ -135,6 +138,8 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
isPopulateValidatedUser = true
journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio()
journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio()
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
managementNotificationAddress = SimpleString(NOTIFICATIONS_ADDRESS)

View File

@ -3,6 +3,7 @@ package net.corda.node.services.rpc
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.debug
import net.corda.node.internal.artemis.*
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_SECURITY_CONFIG
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.RPC_SECURITY_CONFIG
@ -26,6 +27,7 @@ class ArtemisRpcBroker internal constructor(
private val useSsl: Boolean,
private val securityManager: RPCSecurityManager,
private val maxMessageSize: Int,
private val journalBufferTimeout: Int?,
private val jmxEnabled: Boolean = false,
private val baseDirectory: Path,
private val nodeConfiguration: MutualSslConfiguration,
@ -34,25 +36,35 @@ class ArtemisRpcBroker internal constructor(
companion object {
private val logger = loggerFor<ArtemisRpcBroker>()
fun withSsl(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean): ArtemisBroker {
return ArtemisRpcBroker(address, adminAddress, sslOptions, true, securityManager, maxMessageSize, jmxEnabled, baseDirectory, configuration, shouldStartLocalShell)
fun withSsl(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort,
sslOptions: BrokerRpcSslOptions, securityManager: RPCSecurityManager, maxMessageSize: Int,
journalBufferTimeout: Int?, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean): ArtemisBroker {
return ArtemisRpcBroker(address, adminAddress, sslOptions, true, securityManager, maxMessageSize, journalBufferTimeout,
jmxEnabled, baseDirectory, configuration, shouldStartLocalShell)
}
fun withoutSsl(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort, securityManager: RPCSecurityManager, maxMessageSize: Int, jmxEnabled: Boolean, baseDirectory: Path, shouldStartLocalShell: Boolean): ArtemisBroker {
return ArtemisRpcBroker(address, adminAddress, null, false, securityManager, maxMessageSize, jmxEnabled, baseDirectory, configuration, shouldStartLocalShell)
fun withoutSsl(configuration: MutualSslConfiguration, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort,
securityManager: RPCSecurityManager, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean,
baseDirectory: Path, shouldStartLocalShell: Boolean): ArtemisBroker {
return ArtemisRpcBroker(address, adminAddress, null, false, securityManager, maxMessageSize, journalBufferTimeout,
jmxEnabled, baseDirectory, configuration, shouldStartLocalShell)
}
}
override fun start() {
logger.debug("Artemis RPC broker is starting.")
logger.debug { "Artemis RPC broker is starting for: $addresses" }
try {
server.start()
} catch (e: java.io.IOException) {
} catch (e: IOException) {
logger.error("Unable to start message broker", e)
if (e.isBindingError()) {
throw AddressBindingException(adminAddressOptional?.let { setOf(it, addresses.primary) } ?: setOf(addresses.primary))
} else {
throw e
}
} catch (th: Throwable) {
logger.error("Unexpected error starting message broker", th)
throw th
}
logger.debug("Artemis RPC broker is started.")
}
@ -73,7 +85,8 @@ class ArtemisRpcBroker internal constructor(
private val server = initialiseServer()
private fun initialiseServer(): ActiveMQServer {
val serverConfiguration = RpcBrokerConfiguration(baseDirectory, maxMessageSize, jmxEnabled, addresses.primary, adminAddressOptional, sslOptions, useSsl, nodeConfiguration, shouldStartLocalShell)
val serverConfiguration = RpcBrokerConfiguration(baseDirectory, maxMessageSize, journalBufferTimeout, jmxEnabled,
addresses.primary, adminAddressOptional, sslOptions, useSsl, nodeConfiguration, shouldStartLocalShell)
val serverSecurityManager = createArtemisSecurityManager(serverConfiguration.loginListener)
return ActiveMQServerImpl(serverConfiguration, serverSecurityManager).apply {

View File

@ -11,6 +11,7 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcAcceptorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalAcceptorTcpTransport
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
import org.apache.activemq.artemis.core.security.Role
@ -18,7 +19,9 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import java.nio.file.Path
internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, jmxEnabled: Boolean, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort?, sslOptions: BrokerRpcSslOptions?, useSsl: Boolean, nodeConfiguration: MutualSslConfiguration, shouldStartLocalShell: Boolean) : SecureArtemisConfiguration() {
internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, journalBufferTimeout: Int?, jmxEnabled: Boolean,
address: NetworkHostAndPort, adminAddress: NetworkHostAndPort?, sslOptions: BrokerRpcSslOptions?,
useSsl: Boolean, nodeConfiguration: MutualSslConfiguration, shouldStartLocalShell: Boolean) : SecureArtemisConfiguration() {
val loginListener: (String) -> Unit
init {
@ -46,7 +49,7 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
)
globalMaxSize = Runtime.getRuntime().maxMemory() / 8
initialiseSettings(maxMessageSize)
initialiseSettings(maxMessageSize, journalBufferTimeout)
val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true)
@ -80,14 +83,16 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
isJMXUseBrokerName = true
}
private fun initialiseSettings(maxMessageSize: Int) {
private fun initialiseSettings(maxMessageSize: Int, journalBufferTimeout: Int?) {
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
journalBufferSize_NIO = maxMessageSize // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio()
journalBufferSize_AIO = maxMessageSize // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio()
journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10MiB.
}

View File

@ -196,6 +196,9 @@ class SingleThreadedStateMachineManager(
val foundUnrestorableFibers = it.stop()
check(!foundUnrestorableFibers) { "Unrestorable checkpoints were created, please check the logs for details." }
}
flowHospital.close()
timeoutScheduler.shutdown()
scheduler.shutdown()
}
/**

View File

@ -23,6 +23,7 @@ import net.corda.core.utilities.seconds
import net.corda.node.services.FinalityHandler
import org.hibernate.exception.ConstraintViolationException
import rx.subjects.PublishSubject
import java.io.Closeable
import java.sql.SQLException
import java.sql.SQLTransientConnectionException
import java.time.Clock
@ -39,7 +40,7 @@ import kotlin.math.pow
*/
class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
private val clock: Clock,
private val ourSenderUUID: String) {
private val ourSenderUUID: String) : Closeable {
companion object {
private val log = contextLogger()
private val staff = listOf(
@ -306,6 +307,10 @@ class StaffedFlowHospital(private val flowMessaging: FlowMessaging,
operator fun contains(flowId: StateMachineRunId) = mutex.locked { flowId in flowPatients }
override fun close() {
hospitalJobTimer.cancel()
}
class FlowMedicalHistory {
internal val records: MutableList<MedicalRecord.Flow> = mutableListOf()

View File

@ -868,7 +868,12 @@ class DriverDSLImpl(
val nodeInfo = node.start()
val nodeWithInfo = NodeWithInfo(node, nodeInfo)
val nodeThread = thread(name = config.corda.myLegalName.organisation) {
node.run()
try {
node.run()
} catch (th: Throwable) {
log.error("Node run terminated unexpectedly", th)
}
log.info("Node run completed")
}
nodeWithInfo to nodeThread
}.flatMap { nodeAndThread ->

View File

@ -159,4 +159,7 @@ class InProcessNode(configuration: NodeConfiguration, versionInfo: VersionInfo,
}
override val rxIoScheduler get() = CachedThreadScheduler(testThreadFactory()).also { runOnStop += it::shutdown }
// Switch journal buffering off or else for many nodes it is possible to receive OOM in un-managed heap space
override val journalBufferTimeout = 0
}