Merge pull request #7906 from corda/ENT-12852-port-single-broker

ENT-12852-port-single-broker
This commit is contained in:
Adel El-Beik
2025-03-27 12:44:25 +00:00
committed by GitHub
16 changed files with 345 additions and 123 deletions

View File

@ -106,6 +106,7 @@ buildscript {
ext.proguard_version = constants.getProperty('proguardVersion')
ext.jsch_version = constants.getProperty("jschVersion")
ext.protonj_version = constants.getProperty("protonjVersion")
ext.qpid_version = constants.getProperty("qpidVersion")
ext.snappy_version = constants.getProperty("snappyVersion")
ext.class_graph_version = constants.getProperty('classgraphVersion')
ext.jcabi_manifests_version = constants.getProperty("jcabiManifestsVersion")

View File

@ -42,7 +42,7 @@ tcnativeVersion=2.0.48.Final
# We must configure it manually to use the latest capsule version.
capsuleVersion=1.0.4_r3
asmVersion=9.5
artemisVersion=2.36.0
artemisVersion=2.38.0
# TODO Upgrade Jackson only when corda is using kotlin 1.3.10
jacksonVersion=2.17.2
jacksonKotlinVersion=2.17.0
@ -85,8 +85,9 @@ dockerComposeRuleVersion=1.5.0
seleniumVersion=3.141.59
ghostdriverVersion=2.1.0
jschVersion=0.1.55
# Override Artemis version
protonjVersion=0.33.0
# Override Artemis version - no longer think we should be doing this as Artemis has now moved on. This now matches Artemis.
protonjVersion=0.34.1
qpidVersion=0.34.0
snappyVersion=0.5
jcabiManifestsVersion=1.1
picocliVersion=3.9.6

View File

@ -34,8 +34,12 @@ class ArtemisMessagingComponent {
// TODO: we might want to make this value configurable.
const val JOURNAL_HEADER_SIZE = 1024
// Time interval after which every connected client is re-authenticated using BrokerJaasLoginModule.
// Setting it to 1 hour (instead of default value of 10 seconds) to avoid frequent expensive checks, e.g. CRL check.
const val SECURITY_INVALIDATION_INTERVAL = 3600 * 1000L
// Setting it to the default value of 10 seconds to balance responsiveness of RPC and P2P requirements
// Previously, this was 1 hour, to avoid frequent expensive checks, e.g. CRL check, but the CRL check has already been removed
// in earlier refactoring
// Made configurable while we are here in case a need to diagnose support issue
val SECURITY_INVALIDATION_INTERVAL: Long
get() = System.getProperty("net.corda.node.services.messaging.securityInvalidationInterval")?.toLong() ?: 10 * 1000L
object P2PMessagingHeaders {
// This is a "property" attached to an Artemis MQ message object, which contains our own notion of "topic".

View File

@ -9,7 +9,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<Packet>(maxMessageSize), Interceptor {
class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int, port: Int? = null) : MessageSizeChecksInterceptor<Packet>(maxMessageSize, port), Interceptor {
override fun getMessageSize(packet: Packet?): Long? {
return when (packet) {
// This is an estimate of how much memory a Message body takes up.
@ -21,19 +21,25 @@ class ArtemisMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChec
}
}
class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize), AmqpInterceptor {
class AmqpMessageSizeChecksInterceptor(maxMessageSize: Int, port: Int) : MessageSizeChecksInterceptor<AMQPMessage>(maxMessageSize, port), AmqpInterceptor {
override fun getMessageSize(packet: AMQPMessage?): Long? = packet?.wholeMessageSize
}
/**
* Artemis message interceptor to enforce maxMessageSize on incoming messages.
*/
sealed class MessageSizeChecksInterceptor<T : Any>(private val maxMessageSize: Int) : BaseInterceptor<T> {
sealed class MessageSizeChecksInterceptor<T : Any>(private val maxMessageSize: Int, intPort: Int?) : BaseInterceptor<T> {
companion object {
private val logger = contextLogger()
}
private val port = intPort?.toString()
override fun intercept(packet: T, connection: RemotingConnection?): Boolean {
if (port != null) {
val localPort = connection?.transportLocalAddress?.substringAfterLast(':') ?: return true
if (localPort != port) return true
}
val messageSize = getMessageSize(packet) ?: return true
return if (messageSize > maxMessageSize) {
logger.warn("Message size exceeds maxMessageSize network parameter, maxMessageSize: [$maxMessageSize], message size: [$messageSize], " +

View File

@ -224,7 +224,12 @@ dependencies {
integrationTestImplementation "de.javakaffee:kryo-serializers:$kryo_serializer_version"
integrationTestImplementation "junit:junit:$junit_version"
integrationTestImplementation "org.assertj:assertj-core:${assertj_version}"
integrationTestImplementation "org.apache.qpid:qpid-jms-client:${protonj_version}"
integrationTestImplementation "org.apache.qpid:qpid-jms-client:${qpid_version}"
// Allow access to simple SOCKS Server for integration testing
integrationTestImplementation("io.netty:netty-example:$netty_version") {
exclude group: "io.netty", module: "netty-tcnative"
exclude group: "ch.qos.logback", module: "logback-classic"
}
// used by FinalityFlowErrorHandlingTest
slowIntegrationTestImplementation project(':testing:cordapps:cashobservers')

View File

@ -15,6 +15,7 @@ import net.corda.node.internal.DataSourceFactory
import net.corda.node.internal.NodeWithInfo
import net.corda.node.services.Permissions
import net.corda.node.services.config.PasswordEncryption
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.SECURITY_INVALIDATION_INTERVAL
import net.corda.testing.core.ALICE_NAME
import net.corda.testing.internal.IS_S390X
import net.corda.testing.node.internal.NodeBasedTest
@ -208,6 +209,26 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) {
proxy.stateMachinesFeed()
db.deleteUser("user4")
Thread.sleep(1500)
assertFailsWith(
PermissionException::class,
"This user should not be authorized to call 'stateMachinesFeed'") {
proxy.stateMachinesFeed()
}
}
}
@Test(timeout = 300_000)
fun `Revoke user permissions during RPC session - cache expiry`() {
db.insert(UserAndRoles(
username = "user5",
password = encodePassword("test"),
roles = listOf("default")))
client.start("user5", "test").use {
val proxy = it.proxy
proxy.stateMachinesFeed()
db.deleteUser("user5")
Thread.sleep(SECURITY_INVALIDATION_INTERVAL)
assertFailsWith(
RPCException::class,
"This user should not be authorized to call 'stateMachinesFeed'") {
@ -216,6 +237,51 @@ class AuthDBTests : NodeBasedTest(cordappPackages = CORDAPPS) {
}
}
@Test(timeout = 300_000)
fun `login after password change`() {
db.insert(UserAndRoles(
username = "user6",
password = encodePassword("bar"),
roles = emptyList()))
client.start("user6", "bar").close()
assertFailsWith(
ActiveMQSecurityException::class,
"Login with incorrect password should fail") {
client.start("user6", "foo").close()
}
db.deleteUser("user6")
db.insert(UserAndRoles(
username = "user6",
password = encodePassword("foo"),
roles = emptyList()))
client.start("user6", "foo").close()
}
@Test(timeout = 300_000)
fun `login with old password after password change`() {
db.insert(UserAndRoles(
username = "user7",
password = encodePassword("bar"),
roles = emptyList()))
client.start("user7", "bar").close()
assertFailsWith(
ActiveMQSecurityException::class,
"Login with incorrect password should fail") {
client.start("user7", "foo").close()
}
db.deleteUser("user7")
db.insert(UserAndRoles(
username = "user7",
password = encodePassword("foo"),
roles = emptyList()))
Thread.sleep(SECURITY_INVALIDATION_INTERVAL)
assertFailsWith(
ActiveMQSecurityException::class,
"Login with incorrect password should fail") {
client.start("user7", "bar").close()
}
}
@StartableByRPC
@InitiatingFlow
class DummyFlow : FlowLogic<Unit>() {

View File

@ -71,12 +71,12 @@ abstract class P2PMQSecurityTest : MQSecurityTest() {
@Test(timeout=300_000)
fun `consume message from RPC requests queue`() {
assertConsumeAttackFailsNonexistent(RPCApi.RPC_SERVER_QUEUE_NAME)
assertConsumeAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
}
@Test(timeout=300_000)
fun `consume message from logged in user's RPC queue`() {
val user1Queue = loginToRPCAndGetClientQueue()
assertConsumeAttackFailsNonexistent(user1Queue)
assertConsumeAttackFails(user1Queue)
}
}

View File

@ -2,7 +2,6 @@ package net.corda.services.messaging
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.toStringShort
import net.corda.core.utilities.toBase58String
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
@ -17,30 +16,30 @@ import org.junit.Test
abstract class RPCMQSecurityTest : MQSecurityTest() {
@Test(timeout=300_000)
fun `consume message from P2P queue`() {
assertConsumeAttackFailsNonexistent("$P2P_PREFIX${alice.info.singleIdentity().owningKey.toStringShort()}")
assertConsumeAttackFails("$P2P_PREFIX${alice.info.singleIdentity().owningKey.toStringShort()}")
}
@Test(timeout=300_000)
fun `consume message from peer queue`() {
val bobParty = startBobAndCommunicateWithAlice()
assertConsumeAttackFailsNonexistent("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
assertConsumeAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}
@Test(timeout=300_000)
fun `send message to address of peer which has been communicated with`() {
val bobParty = startBobAndCommunicateWithAlice()
assertConsumeAttackFailsNonexistent("$PEERS_PREFIX${bobParty.owningKey.toBase58String()}")
assertSendAttackFails("$PEERS_PREFIX${bobParty.owningKey.toStringShort()}")
}
@Test(timeout=300_000)
fun `create queue for peer which has not been communicated with`() {
val bob = startNode(BOB_NAME)
assertConsumeAttackFailsNonexistent("$PEERS_PREFIX${bob.info.singleIdentity().owningKey.toBase58String()}")
assertConsumeAttackFailsNonexistent("$PEERS_PREFIX${bob.info.singleIdentity().owningKey.toStringShort()}")
}
@Test(timeout=300_000)
fun `create queue for unknown peer`() {
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toBase58String()}"
val invalidPeerQueue = "$PEERS_PREFIX${generateKeyPair().public.toStringShort()}"
assertConsumeAttackFailsNonexistent(invalidPeerQueue)
}

View File

@ -244,8 +244,11 @@ open class Node(configuration: NodeConfiguration,
)
}
override fun startMessagingService(rpcOps: List<RPCOps>, nodeInfo: NodeInfo, myNotaryIdentity: PartyAndCertificate?, networkParameters: NetworkParameters) {
require(nodeInfo.legalIdentities.size in 1..2) { "Currently nodes must have a primary address and optionally one serviced address" }
@Suppress("LongMethod", "ComplexMethod", "NestedBlockDepth")
override fun startMessagingService(rpcOps: List<RPCOps>, nodeInfo: NodeInfo, myNotaryIdentity: PartyAndCertificate?,
networkParameters: NetworkParameters) {
require(nodeInfo.legalIdentities.size in 1..2)
{ "Currently nodes must have a primary address and optionally one serviced address" }
network as P2PMessagingClient
@ -269,12 +272,20 @@ open class Node(configuration: NodeConfiguration,
val messageBroker = if (!configuration.messagingServerExternal) {
val brokerBindAddress = configuration.messagingServerAddress
?: NetworkHostAndPort("0.0.0.0", configuration.p2pAddress.port)
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize, journalBufferTimeout)
if (configuration.rpcOptions.standAloneBroker) {
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize, journalBufferTimeout)
} else {
ArtemisMessagingServer(configuration, brokerBindAddress, networkParameters.maxMessageSize, journalBufferTimeout,
threadPoolName = "P2PAndRPCServer",
rpcAddresses = BrokerAddresses(configuration.rpcOptions.address, configuration.rpcOptions.adminAddress),
rpcSecurityManager = securityManager,
rpcSslOptions = configuration.p2pSslOptions)
}
} else {
null
}
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker) {
val rpcServerAddresses = if (configuration.rpcOptions.standAloneBroker || messageBroker != null) {
BrokerAddresses(configuration.rpcOptions.address, configuration.rpcOptions.adminAddress)
} else {
startLocalRpcBroker(securityManager)
@ -316,6 +327,8 @@ open class Node(configuration: NodeConfiguration,
advertisedAddress = nodeInfo.addresses[0],
maxMessageSize = networkParameters.maxMessageSize
)
if (rpcBroker == null) rpcBroker = messageBroker
}
private fun makeBridgeControlListener(serverAddress: NetworkHostAndPort, networkParameters: NetworkParameters): BridgeControlListener {

View File

@ -2,6 +2,7 @@ package net.corda.node.internal.security
import net.corda.core.context.AuthServiceId
import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.LoginException
/**
* Manage security of RPC users, providing logic for user authentication and authorization.
@ -33,7 +34,7 @@ fun RPCSecurityManager.tryAuthenticate(principal: String, password: Password): A
password.use {
return try {
authenticate(principal, password)
} catch (e: FailedLoginException) {
} catch (e: LoginException) {
null
}
}

View File

@ -10,6 +10,7 @@ import net.corda.node.services.config.AuthDataSourceType
import net.corda.node.services.config.PasswordEncryption
import net.corda.node.services.config.SecurityConfiguration
import net.corda.nodeapi.internal.config.User
import org.apache.activemq.artemis.spi.core.security.jaas.NoCacheLoginException
import org.apache.shiro.authc.AuthenticationException
import org.apache.shiro.authc.AuthenticationInfo
import org.apache.shiro.authc.AuthenticationToken
@ -27,7 +28,6 @@ import org.apache.shiro.realm.jdbc.JdbcRealm
import org.apache.shiro.subject.PrincipalCollection
import org.apache.shiro.subject.SimplePrincipalCollection
import java.util.concurrent.ConcurrentHashMap
import javax.security.auth.login.FailedLoginException
private typealias AuthServiceConfig = SecurityConfiguration.AuthService
@ -46,7 +46,7 @@ class RPCSecurityManagerImpl(config: AuthServiceConfig, cacheFactory: NamedCache
try {
manager.authenticate(authToken)
} catch (authcException: AuthenticationException) {
throw FailedLoginException(authcException.toString())
throw NoCacheLoginException(authcException.toString())
}
return ShiroAuthorizingSubject(
subjectId = SimplePrincipalCollection(principal, id.value),

View File

@ -10,14 +10,24 @@ import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.artemis.BrokerJaasLoginModule
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_P2P_ROLE
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_SECURITY_CONFIG
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.P2P_SECURITY_CONFIG
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.PEER_ROLE
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.RPC_SECURITY_CONFIG
import net.corda.node.internal.artemis.NodeJaasConfig
import net.corda.node.internal.artemis.P2PJaasConfig
import net.corda.node.internal.artemis.RPCJaasConfig
import net.corda.node.internal.artemis.SecureArtemisConfiguration
import net.corda.node.internal.artemis.UserValidationPlugin
import net.corda.node.internal.artemis.isBindingError
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.shell.INTERNAL_SHELL_USER
import net.corda.node.services.config.shouldStartLocalShell
import net.corda.node.services.rpc.RolesAdderOnLogin
import net.corda.node.services.rpc.RpcBrokerConfiguration.Companion.queueConfigurations
import net.corda.node.utilities.artemis.startSynchronously
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.internal.AmqpMessageSizeChecksInterceptor
import net.corda.nodeapi.internal.ArtemisMessageSizeChecksInterceptor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
@ -26,6 +36,9 @@ import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NOTIFICATI
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.SECURITY_INVALIDATION_INTERVAL
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.p2pAcceptorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcAcceptorTcpTransport
import net.corda.nodeapi.internal.ArtemisTcpTransport.Companion.rpcInternalAcceptorTcpTransport
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfig
import net.corda.nodeapi.internal.protonwrapper.netty.RevocationConfigImpl
import net.corda.nodeapi.internal.protonwrapper.netty.trustManagerFactoryWithRevocation
@ -34,8 +47,6 @@ import net.corda.nodeapi.internal.revocation.CertDistPointCrlSource
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
@ -61,14 +72,19 @@ import kotlin.io.path.div
* a fully connected network, trusted network or on localhost.
*/
@ThreadSafe
class ArtemisMessagingServer(private val config: NodeConfiguration,
private val messagingServerAddress: NetworkHostAndPort,
private val maxMessageSize: Int,
private val journalBufferTimeout : Int? = null,
private val threadPoolName: String = "P2PServer",
private val trace: Boolean = false,
private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON,
private val remotingThreads: Int? = null) : ArtemisBroker, SingletonSerializeAsToken() {
class ArtemisMessagingServer(
private val config: NodeConfiguration,
private val messagingServerAddress: NetworkHostAndPort,
private val maxMessageSize: Int,
private val journalBufferTimeout: Int? = null,
private val threadPoolName: String = "P2PServer",
private val trace: Boolean = false,
private val distPointCrlSource: CertDistPointCrlSource = CertDistPointCrlSource.SINGLETON,
private val remotingThreads: Int? = null,
private val rpcAddresses: BrokerAddresses? = null,
val rpcSecurityManager: RPCSecurityManager? = null,
val rpcSslOptions: MutualSslConfiguration? = null,
) : ArtemisBroker, SingletonSerializeAsToken() {
companion object {
private val log = contextLogger()
}
@ -94,6 +110,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
override fun stop() = mutex.locked {
activeMQServer.stop()
rpcSecurityManager?.close()
running = false
}
@ -102,10 +119,10 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
override val started: Boolean
get() = activeMQServer.isStarted
@Suppress("ThrowsCount")
@Suppress("ThrowsCount", "NestedBlockDepth")
private fun configureAndStartServer() {
val artemisConfig = createArtemisConfig()
val securityManager = createArtemisSecurityManager()
val securityManager = createArtemisSecurityManager(artemisConfig.loginListener)
activeMQServer = ActiveMQServerImpl(artemisConfig, securityManager).apply {
// Some types of queue might need special preparation on our side, like dialling back or preparing
// a lazily initialised subsystem.
@ -118,67 +135,104 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
} catch (e: Throwable) {
log.error("Unable to start message broker", e)
if (e.isBindingError()) {
if (e is IllegalStateException && e.message?.startsWith("AMQ229230:") ?: false) {
// java.lang.IllegalStateException: AMQ229230: Failed to bind acceptor 07340871-69cc-11ef-b363-a2d564e2b212 to 0.0.0.0:62165
val actualHostAndPort = NetworkHostAndPort.parse(e.message!!.substringAfterLast(' '))
if (actualHostAndPort.port != config.p2pAddress.port) throw AddressBindingException(actualHostAndPort)
}
throw AddressBindingException(config.p2pAddress)
} else {
log.error("Unexpected error starting message broker", e)
throw e
}
}
activeMQServer.remotingService.addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize))
activeMQServer.remotingService.addIncomingInterceptor(AmqpMessageSizeChecksInterceptor(maxMessageSize))
activeMQServer.remotingService.addIncomingInterceptor(ArtemisMessageSizeChecksInterceptor(maxMessageSize, config.p2pAddress.port))
activeMQServer.remotingService.addIncomingInterceptor(AmqpMessageSizeChecksInterceptor(maxMessageSize, config.p2pAddress.port))
// Config driven switch between legacy CORE bridges and the newer AMQP protocol bridges.
log.info("P2P messaging server listening on $messagingServerAddress")
}
private fun createArtemisConfig() = SecureArtemisConfiguration().apply {
name = "P2P"
internal abstract class P2PBrokerConfig : SecureArtemisConfiguration() {
abstract val loginListener: (String) -> Unit
}
val artemisDir = config.baseDirectory / "artemis"
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "large-messages").toString()
pagingDirectory = (artemisDir / "paging").toString()
// The transaction cache is configurable, and drives other cache sizes.
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
@Suppress("ComplexMethod", "MagicNumber")
private fun createArtemisConfig() = object : P2PBrokerConfig() {
override val loginListener: (String) -> Unit
val revocationMode = if (config.crlCheckArtemisServer) {
if (config.crlCheckSoftFail) RevocationConfig.Mode.SOFT_FAIL else RevocationConfig.Mode.HARD_FAIL
} else {
RevocationConfig.Mode.OFF
init {
name = if (rpcAddresses != null) "P2PAndRPC" else "P2P"
val artemisDir = config.baseDirectory / "artemis"
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "large-messages").toString()
pagingDirectory = (artemisDir / "paging").toString()
// The transaction cache is configurable, and drives other cache sizes.
globalMaxSize = max(config.transactionCacheSizeBytes, 10L * maxMessageSize)
val revocationMode = if (config.crlCheckArtemisServer) {
if (config.crlCheckSoftFail) RevocationConfig.Mode.SOFT_FAIL else RevocationConfig.Mode.HARD_FAIL
} else {
RevocationConfig.Mode.OFF
}
val trustManagerFactory = trustManagerFactoryWithRevocation(
config.p2pSslOptions.trustStore.get(),
RevocationConfigImpl(revocationMode),
distPointCrlSource
)
addAcceptorConfiguration(p2pAcceptorTcpTransport(
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
config.p2pSslOptions,
trustManagerFactory,
threadPoolName = threadPoolName,
trace = trace,
remotingThreads = remotingThreads
))
if (rpcAddresses != null) {
addAcceptorConfiguration(rpcAcceptorTcpTransport(rpcAddresses.primary, config.rpcOptions.sslConfig, enableSSL = config.rpcOptions.useSsl, threadPoolName = "RPCServer"))
if (rpcAddresses.admin != rpcAddresses.primary) {
addAcceptorConfiguration(rpcInternalAcceptorTcpTransport(rpcAddresses.admin, rpcSslOptions!!, threadPoolName = "RPCServerAdmin"))
}
queueConfigs = queueConfigurations()
}
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio()
journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio()
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
managementNotificationAddress = SimpleString.of(NOTIFICATIONS_ADDRESS)
// JMX enablement
if (config.jmxMonitoringHttpPort != null) {
isJMXManagementEnabled = true
isJMXUseBrokerName = true
}
// Validate user in AMQP message header against authenticated session
registerBrokerPlugin(UserValidationPlugin())
if (rpcSecurityManager == null) {
loginListener = { }
} else {
val nodeInternalRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true, false, false)
val addRPCRoleToUsers = if (config.shouldStartLocalShell()) listOf(INTERNAL_SHELL_USER) else emptyList()
val rolesAdderOnLogin = RolesAdderOnLogin(addRPCRoleToUsers) { username ->
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#" to setOf(nodeInternalRole, restrictedRole(
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username",
consume = true,
createNonDurableQueue = true,
deleteNonDurableQueue = true)
)
}
securitySettingPlugins.add(rolesAdderOnLogin)
loginListener = { username: String -> rolesAdderOnLogin.onLogin(username) }
}
}
val trustManagerFactory = trustManagerFactoryWithRevocation(
config.p2pSslOptions.trustStore.get(),
RevocationConfigImpl(revocationMode),
distPointCrlSource
)
addAcceptorConfiguration(p2pAcceptorTcpTransport(
NetworkHostAndPort(messagingServerAddress.host, messagingServerAddress.port),
config.p2pSslOptions,
trustManagerFactory,
threadPoolName = threadPoolName,
trace = trace,
remotingThreads = remotingThreads
))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
journalBufferSize_NIO = maxMessageSize + JOURNAL_HEADER_SIZE // Artemis default is 490KiB - required to address IllegalArgumentException (when Artemis uses Java NIO): Record is too large to store.
journalBufferSize_AIO = maxMessageSize + JOURNAL_HEADER_SIZE // Required to address IllegalArgumentException (when Artemis uses Linux Async IO): Record is too large to store.
journalBufferTimeout_NIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutNio()
journalBufferTimeout_AIO = journalBufferTimeout ?: ActiveMQDefaultConfiguration.getDefaultJournalBufferTimeoutAio()
journalFileSize = maxMessageSize + JOURNAL_HEADER_SIZE// The size of each journal file in bytes. Artemis default is 10MiB.
managementNotificationAddress = SimpleString.of(NOTIFICATIONS_ADDRESS)
// JMX enablement
if (config.jmxMonitoringHttpPort != null) {
isJMXManagementEnabled = true
isJMXUseBrokerName = true
}
// Validate user in AMQP message header against authenticated session
registerBrokerPlugin(UserValidationPlugin())
}.configureAddressSecurity()
/**
@ -188,10 +242,16 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
* 3. RPC users. These are only given sufficient access to perform RPC with us.
* 4. Verifiers. These are given read access to the verification request queue and write access to the response queue.
*/
private fun ConfigurationImpl.configureAddressSecurity(): Configuration {
val nodeInternalRole = Role(NODE_P2P_ROLE, true, true, true, true, true, true, true, true, true, true, false, false)
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
private fun P2PBrokerConfig.configureAddressSecurity(): P2PBrokerConfig {
val nodeInternalP2PRole = Role(NODE_P2P_ROLE, true, true, true, true, true, true, true, true, true, true, false, false)
securityRoles["$P2P_PREFIX#"] = setOf(nodeInternalP2PRole, restrictedRole(PEER_ROLE, send = true))
if (rpcAddresses != null) {
val nodeInternalRPCRole = Role(BrokerJaasLoginModule.NODE_RPC_ROLE, true, true, true, true, true, true, true, true, true, true, false, false)
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalP2PRole, nodeInternalRPCRole) // Do not add any other roles here as it's only for the node
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRPCRole, restrictedRole(BrokerJaasLoginModule.RPC_ROLE, send = true))
} else {
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalP2PRole) // Do not add any other roles here as it's only for the node
}
securityInvalidationInterval = SECURITY_INVALIDATION_INTERVAL
return this
}
@ -203,7 +263,7 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
deleteNonDurableQueue, manage, browse, createDurableQueue || createNonDurableQueue, deleteDurableQueue || deleteNonDurableQueue, false, false)
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
private fun createArtemisSecurityManager(loginListener: (String) -> Unit): ActiveMQJAASSecurityManager {
val keyStore = config.p2pSslOptions.keyStore.get().value.internal
val trustStore = config.p2pSslOptions.trustStore.get().value.internal
val revocationMode = when {
@ -215,13 +275,17 @@ class ArtemisMessagingServer(private val config: NodeConfiguration,
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(
BrokerJaasLoginModule.P2P_SECURITY_CONFIG to P2PJaasConfig(keyStore, trustStore, revocationMode),
BrokerJaasLoginModule.NODE_SECURITY_CONFIG to NodeJaasConfig(keyStore, trustStore)
val options = mutableMapOf(
P2P_SECURITY_CONFIG to P2PJaasConfig(keyStore, trustStore, revocationMode),
NODE_SECURITY_CONFIG to NodeJaasConfig(keyStore, trustStore)
)
if (rpcSecurityManager != null) {
options[RPC_SECURITY_CONFIG] = RPCJaasConfig(rpcSecurityManager, loginListener, config.rpcOptions.useSsl)
}
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
return ActiveMQJAASSecurityManager(BrokerJaasLoginModule::class.java.name, securityConfig)
return InterceptingActiveMQJAASSecurityManager(BrokerJaasLoginModule::class.java.name, securityConfig, messagingServerAddress.port, rpcAddresses?.primary?.port
?: -1, rpcAddresses?.admin?.port ?: -1)
}
}

View File

@ -0,0 +1,52 @@
package net.corda.node.services.messaging
import net.corda.core.utilities.loggerFor
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_P2P_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.NODE_RPC_USER
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import javax.security.auth.Subject
class InterceptingActiveMQJAASSecurityManager(configurationName: String, configuration: SecurityConfiguration, private val p2pPort: Int, private val rpcPort: Int, private val rpcAdminPort: Int) : ActiveMQJAASSecurityManager(configurationName, configuration) {
companion object {
private val log = loggerFor<InterceptingActiveMQJAASSecurityManager>()
enum class BrokerType {
RPC, P2P
}
fun RemotingConnection?.categorise(p2pPort: String, rpcPort: String, rpcAdminPort: String): BrokerType {
return if (this?.transportLocalAddress?.endsWith(":$p2pPort") ?: false) {
BrokerType.P2P
} else if (this?.transportLocalAddress?.endsWith(":$rpcPort") ?: false) {
BrokerType.RPC
} else if (this?.transportLocalAddress?.endsWith(":$rpcAdminPort") ?: false) {
BrokerType.RPC
} else throw IllegalStateException("Neither RPC port ($rpcPort), RPC admin port ($rpcAdminPort), nor P2P port ($p2pPort) for local connection ${this?.transportLocalAddress}")
}
fun categoriseUser(user: String?): BrokerType {
return if (user == null) {
BrokerType.RPC
} else if (user == PEER_USER || user == NODE_P2P_USER) {
BrokerType.P2P
} else if (user == NODE_RPC_USER) {
BrokerType.RPC
} else {
BrokerType.RPC
}
}
}
override fun authenticate(user: String?, password: String?, remotingConnection: RemotingConnection?, securityDomain: String?): Subject? {
val userCategory = categoriseUser(user)
val connectionCategory = remotingConnection.categorise(p2pPort.toString(), rpcPort.toString(), rpcAdminPort.toString())
if (userCategory != connectionCategory) {
log.warn("Authenticate attempt user=$user, remotingConnection=$remotingConnection, securityDomain=$securityDomain connectionCategory=$connectionCategory userCategory=$userCategory")
return null
}
return super.authenticate(user, password, remotingConnection, securityDomain)
}
}

View File

@ -2,11 +2,17 @@ package net.corda.node.services.rpc
import net.corda.core.internal.errors.AddressBindingException
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.loggerFor
import net.corda.core.utilities.debug
import net.corda.node.internal.artemis.*
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.artemis.ArtemisBroker
import net.corda.node.internal.artemis.BrokerAddresses
import net.corda.node.internal.artemis.BrokerJaasLoginModule
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.NODE_SECURITY_CONFIG
import net.corda.node.internal.artemis.BrokerJaasLoginModule.Companion.RPC_SECURITY_CONFIG
import net.corda.node.internal.artemis.NodeJaasConfig
import net.corda.node.internal.artemis.RPCJaasConfig
import net.corda.node.internal.artemis.isBindingError
import net.corda.node.services.messaging.InterceptingActiveMQJAASSecurityManager
import net.corda.node.internal.security.RPCSecurityManager
import net.corda.node.utilities.artemis.startSynchronously
import net.corda.nodeapi.BrokerRpcSslOptions
@ -22,7 +28,7 @@ import java.security.KeyStoreException
import javax.security.auth.login.AppConfigurationEntry
class ArtemisRpcBroker internal constructor(
address: NetworkHostAndPort,
private val address: NetworkHostAndPort,
private val adminAddressOptional: NetworkHostAndPort?,
private val sslOptions: BrokerRpcSslOptions?,
private val useSsl: Boolean,
@ -32,7 +38,8 @@ class ArtemisRpcBroker internal constructor(
private val jmxEnabled: Boolean = false,
private val baseDirectory: Path,
private val nodeConfiguration: MutualSslConfiguration,
private val shouldStartLocalShell: Boolean) : ArtemisBroker {
private val shouldStartLocalShell: Boolean,
) : ArtemisBroker {
companion object {
private val logger = loggerFor<ArtemisRpcBroker>()
@ -107,7 +114,8 @@ class ArtemisRpcBroker internal constructor(
return arrayOf(AppConfigurationEntry(name, AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options))
}
}
return ActiveMQJAASSecurityManager(BrokerJaasLoginModule::class.java.name, securityConfig)
return InterceptingActiveMQJAASSecurityManager(BrokerJaasLoginModule::class.java.name, securityConfig, -1, address.port, adminAddressOptional?.port
?: -1)
}
}

View File

@ -24,6 +24,30 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
useSsl: Boolean, nodeConfiguration: MutualSslConfiguration, shouldStartLocalShell: Boolean) : SecureArtemisConfiguration() {
val loginListener: (String) -> Unit
companion object {
fun queueConfigurations(): List<QueueConfiguration> {
return listOf(
queueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false),
queueConfiguration(
name = RPCApi.RPC_CLIENT_BINDING_REMOVALS,
address = ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS,
filter = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION,
durable = false
),
queueConfiguration(
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS,
address = ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS,
filter = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION,
durable = false
)
)
}
private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): QueueConfiguration {
return QueueConfiguration.of(name).setAddress(address).setFilterString(filter).setDurable(durable)
}
}
init {
name = "RPC"
@ -101,24 +125,6 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
journalFileSize = maxMessageSize // The size of each journal file in bytes. Artemis default is 10 MB.
}
private fun queueConfigurations(): List<QueueConfiguration> {
return listOf(
queueConfiguration(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false),
queueConfiguration(
name = RPCApi.RPC_CLIENT_BINDING_REMOVALS,
address = ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS,
filter = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION,
durable = false
),
queueConfiguration(
name = RPCApi.RPC_CLIENT_BINDING_ADDITIONS,
address = ArtemisMessagingComponent.NOTIFICATIONS_ADDRESS,
filter = RPCApi.RPC_CLIENT_BINDING_ADDITION_FILTER_EXPRESSION,
durable = false
)
)
}
private fun setDirectories(baseDirectory: Path) {
bindingsDirectory = (baseDirectory / "bindings").toString()
journalDirectory = (baseDirectory / "journal").toString()
@ -126,10 +132,6 @@ internal class RpcBrokerConfiguration(baseDirectory: Path, maxMessageSize: Int,
pagingDirectory = (baseDirectory / "paging").toString()
}
private fun queueConfiguration(name: String, address: String = name, filter: String? = null, durable: Boolean): QueueConfiguration {
return QueueConfiguration.of(name).setAddress(address).setFilterString(filter).setDurable(durable)
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false,
deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role {

View File

@ -13,9 +13,9 @@ import net.corda.node.services.config.SecurityConfiguration
import net.corda.nodeapi.internal.config.User
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.fromUserList
import org.apache.activemq.artemis.spi.core.security.jaas.NoCacheLoginException
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Test
import javax.security.auth.login.FailedLoginException
import kotlin.reflect.KFunction
import kotlin.test.assertFails
import kotlin.test.assertFailsWith
@ -106,7 +106,7 @@ class RPCSecurityManagerTest {
users = listOf(User("user", "xxxx", emptySet())),
id = AuthServiceId("TEST"))
userRealm.authenticate("user", Password("xxxx"))
assertFailsWith(FailedLoginException::class, "Login with wrong password should fail") {
assertFailsWith(NoCacheLoginException::class, "Login with wrong password should fail") {
userRealm.authenticate("foo", Password("xxxx"))
}
assertNull(userRealm.tryAuthenticate("foo", Password("wrong")),
@ -119,7 +119,7 @@ class RPCSecurityManagerTest {
users = listOf(User("user", "password", emptySet())),
id = AuthServiceId("TEST"))
userRealm.authenticate("user", Password("password"))
assertFailsWith(FailedLoginException::class, "Login with wrong password should fail") {
assertFailsWith(NoCacheLoginException::class, "Login with wrong password should fail") {
userRealm.authenticate("user", Password("wrong"))
}
assertNull(userRealm.tryAuthenticate("user", Password("wrong")),