mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
CORDA-2838 Set Artemis memory config. (#5011)
* CORDA-2838 Set Artemis memory config. * CORDA-2838 Cannot have page size larger than max size. * CORDA-2838 Use real slow consumers. Need to see if the old config settings can work with a global limit to avoid this.
This commit is contained in:
parent
d611a41ec8
commit
d6ac4e2393
@ -508,7 +508,7 @@ class RPCStabilityTests {
|
|||||||
session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue)
|
session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue)
|
||||||
val consumer = session.createConsumer(myQueue, null, -1, -1, false)
|
val consumer = session.createConsumer(myQueue, null, -1, -1, false)
|
||||||
consumer.setMessageHandler {
|
consumer.setMessageHandler {
|
||||||
Thread.sleep(50) // 5x slower than the server producer
|
Thread.sleep(5000) // Needs to be slower than one per second to get kicked.
|
||||||
it.acknowledge()
|
it.acknowledge()
|
||||||
}
|
}
|
||||||
val producer = session.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
val producer = session.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||||
@ -520,7 +520,7 @@ class RPCStabilityTests {
|
|||||||
val request = RPCApi.ClientToServer.RpcRequest(
|
val request = RPCApi.ClientToServer.RpcRequest(
|
||||||
clientAddress = SimpleString(myQueue),
|
clientAddress = SimpleString(myQueue),
|
||||||
methodName = SlowConsumerRPCOps::streamAtInterval.name,
|
methodName = SlowConsumerRPCOps::streamAtInterval.name,
|
||||||
serialisedArguments = listOf(10.millis, 123456).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT),
|
serialisedArguments = listOf(100.millis, 1234).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT),
|
||||||
replyId = Trace.InvocationId.newInstance(),
|
replyId = Trace.InvocationId.newInstance(),
|
||||||
sessionId = Trace.SessionId.newInstance()
|
sessionId = Trace.SessionId.newInstance()
|
||||||
)
|
)
|
||||||
|
@ -6,6 +6,7 @@ import com.codahale.metrics.MetricRegistry
|
|||||||
import com.github.benmanes.caffeine.cache.Caffeine
|
import com.github.benmanes.caffeine.cache.Caffeine
|
||||||
import com.palominolabs.metrics.newrelic.AllEnabledMetricAttributeFilter
|
import com.palominolabs.metrics.newrelic.AllEnabledMetricAttributeFilter
|
||||||
import com.palominolabs.metrics.newrelic.NewRelicReporter
|
import com.palominolabs.metrics.newrelic.NewRelicReporter
|
||||||
|
import io.netty.util.NettyRuntime
|
||||||
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
import net.corda.client.rpc.internal.serialization.amqp.AMQPClientSerializationScheme
|
||||||
import net.corda.cliutils.ShellConstants
|
import net.corda.cliutils.ShellConstants
|
||||||
import net.corda.core.concurrent.CordaFuture
|
import net.corda.core.concurrent.CordaFuture
|
||||||
@ -68,6 +69,8 @@ import org.slf4j.Logger
|
|||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import rx.Scheduler
|
import rx.Scheduler
|
||||||
import rx.schedulers.Schedulers
|
import rx.schedulers.Schedulers
|
||||||
|
import java.lang.Long.max
|
||||||
|
import java.lang.Long.min
|
||||||
import java.net.BindException
|
import java.net.BindException
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
@ -232,6 +235,13 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
|
|
||||||
network as P2PMessagingClient
|
network as P2PMessagingClient
|
||||||
|
|
||||||
|
if (System.getProperty("io.netty.allocator.numHeapArenas").isNullOrBlank()) {
|
||||||
|
// Netty arenas are approx 16MB each when max'd out. Set arenas based on memory, not core count, unless memory is abundant.
|
||||||
|
val memBasedArenas = max(Runtime.getRuntime().maxMemory() / 256.MB, 1L)
|
||||||
|
// We set the min of the above and the default.
|
||||||
|
System.setProperty("io.netty.allocator.numHeapArenas", min(memBasedArenas, NettyRuntime.availableProcessors() * 2L).toString())
|
||||||
|
}
|
||||||
|
|
||||||
// Construct security manager reading users data either from the 'security' config section
|
// Construct security manager reading users data either from the 'security' config section
|
||||||
// if present or from rpcUsers list if the former is missing from config.
|
// if present or from rpcUsers list if the former is missing from config.
|
||||||
val securityManagerConfig = configuration.security?.authService
|
val securityManagerConfig = configuration.security?.authService
|
||||||
|
@ -2,6 +2,7 @@ package net.corda.node.services.messaging
|
|||||||
|
|
||||||
import net.corda.core.internal.ThreadBox
|
import net.corda.core.internal.ThreadBox
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
|
import net.corda.core.internal.errors.AddressBindingException
|
||||||
import net.corda.core.serialization.SingletonSerializeAsToken
|
import net.corda.core.serialization.SingletonSerializeAsToken
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
@ -9,7 +10,6 @@ import net.corda.core.utilities.debug
|
|||||||
import net.corda.node.internal.artemis.*
|
import net.corda.node.internal.artemis.*
|
||||||
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_P2P_ROLE
|
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_P2P_ROLE
|
||||||
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.PEER_ROLE
|
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.PEER_ROLE
|
||||||
import net.corda.core.internal.errors.AddressBindingException
|
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor
|
import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor
|
||||||
import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor
|
import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor
|
||||||
@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer
|
|||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
|
||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
|
import java.lang.Long.max
|
||||||
import java.security.KeyStoreException
|
import java.security.KeyStoreException
|
||||||
import javax.annotation.concurrent.ThreadSafe
|
import javax.annotation.concurrent.ThreadSafe
|
||||||
import javax.security.auth.login.AppConfigurationEntry
|
import javax.security.auth.login.AppConfigurationEntry
|
||||||
@ -115,10 +116,16 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private fun createArtemisConfig() = SecureArtemisConfiguration().apply {
|
private fun createArtemisConfig() = SecureArtemisConfiguration().apply {
|
||||||
|
name = "P2P"
|
||||||
|
|
||||||
val artemisDir = config.baseDirectory / "artemis"
|
val artemisDir = config.baseDirectory / "artemis"
|
||||||
bindingsDirectory = (artemisDir / "bindings").toString()
|
bindingsDirectory = (artemisDir / "bindings").toString()
|
||||||
journalDirectory = (artemisDir / "journal").toString()
|
journalDirectory = (artemisDir / "journal").toString()
|
||||||
largeMessagesDirectory = (artemisDir / "large-messages").toString()
|
largeMessagesDirectory = (artemisDir / "large-messages").toString()
|
||||||
|
pagingDirectory = (artemisDir / "paging").toString()
|
||||||
|
// The transaction cache is configurable, and drives other cache sizes.
|
||||||
|
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
|
||||||
|
|
||||||
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config.p2pSslOptions))
|
acceptorConfigurations = mutableSetOf(p2pAcceptorTcpTransport(NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port), config.p2pSslOptions))
|
||||||
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
|
||||||
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
|
||||||
|
@ -15,12 +15,15 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
|
|||||||
import org.apache.activemq.artemis.core.security.Role
|
import org.apache.activemq.artemis.core.security.Role
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy
|
||||||
import java.nio.file.Path
|
import java.nio.file.Path
|
||||||
|
|
||||||
internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, jmxEnabled: Boolean, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort?, sslOptions: BrokerRpcSslOptions?, useSsl: Boolean, nodeConfiguration: MutualSslConfiguration, shouldStartLocalShell: Boolean) : SecureArtemisConfiguration() {
|
internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int, jmxEnabled: Boolean, address: NetworkHostAndPort, adminAddress: NetworkHostAndPort?, sslOptions: BrokerRpcSslOptions?, useSsl: Boolean, nodeConfiguration: MutualSslConfiguration, shouldStartLocalShell: Boolean) : SecureArtemisConfiguration() {
|
||||||
val loginListener: (String) -> Unit
|
val loginListener: (String) -> Unit
|
||||||
|
|
||||||
init {
|
init {
|
||||||
|
name = "RPC"
|
||||||
|
|
||||||
setDirectories(baseDirectory)
|
setDirectories(baseDirectory)
|
||||||
|
|
||||||
val acceptorConfigurationsSet = mutableSetOf(
|
val acceptorConfigurationsSet = mutableSetOf(
|
||||||
@ -36,11 +39,16 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
|
|||||||
managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
|
managementNotificationAddress = SimpleString(ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS)
|
||||||
addressesSettings = mapOf(
|
addressesSettings = mapOf(
|
||||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
||||||
maxSizeBytes = 10L * maxMessageSize
|
maxSizeBytes = 5L * maxMessageSize
|
||||||
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
|
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
|
||||||
|
pageSizeBytes = 1L * maxMessageSize
|
||||||
|
slowConsumerPolicy = SlowConsumerPolicy.KILL
|
||||||
|
slowConsumerThreshold = 1
|
||||||
|
slowConsumerCheckPeriod = 30
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
globalMaxSize = Runtime.getRuntime().maxMemory() / 8
|
||||||
initialiseSettings(maxMessageSize)
|
initialiseSettings(maxMessageSize)
|
||||||
|
|
||||||
val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true)
|
val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true)
|
||||||
@ -108,6 +116,7 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
|
|||||||
bindingsDirectory = (baseDirectory / "bindings").toString()
|
bindingsDirectory = (baseDirectory / "bindings").toString()
|
||||||
journalDirectory = (baseDirectory / "journal").toString()
|
journalDirectory = (baseDirectory / "journal").toString()
|
||||||
largeMessagesDirectory = (baseDirectory / "large-messages").toString()
|
largeMessagesDirectory = (baseDirectory / "large-messages").toString()
|
||||||
|
pagingDirectory = (baseDirectory / "paging").toString()
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration {
|
private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): CoreQueueConfiguration {
|
||||||
|
@ -50,6 +50,7 @@ import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
|||||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
|
||||||
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
|
||||||
|
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy
|
||||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
|
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
|
||||||
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
|
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager3
|
||||||
import java.lang.reflect.Method
|
import java.lang.reflect.Method
|
||||||
@ -176,6 +177,7 @@ data class RPCDriverDSL(
|
|||||||
const val notificationAddress = "notifications"
|
const val notificationAddress = "notifications"
|
||||||
|
|
||||||
private fun ConfigurationImpl.configureCommonSettings(maxFileSize: Int, maxBufferedBytesPerClient: Long) {
|
private fun ConfigurationImpl.configureCommonSettings(maxFileSize: Int, maxBufferedBytesPerClient: Long) {
|
||||||
|
name = "RPCDriver"
|
||||||
managementNotificationAddress = SimpleString(notificationAddress)
|
managementNotificationAddress = SimpleString(notificationAddress)
|
||||||
isPopulateValidatedUser = true
|
isPopulateValidatedUser = true
|
||||||
journalBufferSize_NIO = maxFileSize
|
journalBufferSize_NIO = maxFileSize
|
||||||
@ -203,7 +205,11 @@ data class RPCDriverDSL(
|
|||||||
addressesSettings = mapOf(
|
addressesSettings = mapOf(
|
||||||
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
|
||||||
maxSizeBytes = maxBufferedBytesPerClient
|
maxSizeBytes = maxBufferedBytesPerClient
|
||||||
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
|
addressFullMessagePolicy = AddressFullMessagePolicy.PAGE
|
||||||
|
pageSizeBytes = maxSizeBytes / 10
|
||||||
|
slowConsumerPolicy = SlowConsumerPolicy.KILL
|
||||||
|
slowConsumerThreshold = 1
|
||||||
|
slowConsumerCheckPeriod = 30
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@ -222,6 +228,7 @@ data class RPCDriverDSL(
|
|||||||
bindingsDirectory = "$artemisDir/bindings"
|
bindingsDirectory = "$artemisDir/bindings"
|
||||||
journalDirectory = "$artemisDir/journal"
|
journalDirectory = "$artemisDir/journal"
|
||||||
largeMessagesDirectory = "$artemisDir/large-messages"
|
largeMessagesDirectory = "$artemisDir/large-messages"
|
||||||
|
pagingDirectory = "$artemisDir/paging"
|
||||||
acceptorConfigurations = setOf(ArtemisTcpTransport.rpcAcceptorTcpTransport(hostAndPort, null))
|
acceptorConfigurations = setOf(ArtemisTcpTransport.rpcAcceptorTcpTransport(hostAndPort, null))
|
||||||
configureCommonSettings(maxFileSize, maxBufferedBytesPerClient)
|
configureCommonSettings(maxFileSize, maxBufferedBytesPerClient)
|
||||||
}
|
}
|
||||||
@ -314,7 +321,7 @@ data class RPCDriverDSL(
|
|||||||
rpcUser: User = rpcTestUser,
|
rpcUser: User = rpcTestUser,
|
||||||
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
nodeLegalName: CordaX500Name = fakeNodeLegalName,
|
||||||
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
maxFileSize: Int = MAX_MESSAGE_SIZE,
|
||||||
maxBufferedBytesPerClient: Long = 10L * MAX_MESSAGE_SIZE,
|
maxBufferedBytesPerClient: Long = 5L * MAX_MESSAGE_SIZE,
|
||||||
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
configuration: RPCServerConfiguration = RPCServerConfiguration.DEFAULT,
|
||||||
customPort: NetworkHostAndPort? = null,
|
customPort: NetworkHostAndPort? = null,
|
||||||
ops: I
|
ops: I
|
||||||
|
Loading…
Reference in New Issue
Block a user