Revert "Non-ssl artemis acceptor for RPC connection. (#271)"

This reverts commit f0d82e4918.
This commit is contained in:
Ross Nicoll
2017-03-03 17:43:13 +00:00
parent 44dbba90c9
commit b5ba070301
25 changed files with 180 additions and 170 deletions

View File

@ -6,7 +6,6 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC
import net.corda.testing.messaging.SimpleMQClient
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
@ -15,9 +14,6 @@ import org.junit.Test
* Runs the security tests with the attacker pretending to be a node on the network.
*/
class MQSecurityAsNodeTest : MQSecurityTest() {
override fun createAttacker(): SimpleMQClient {
return clientTo(alice.configuration.artemisAddress)
}
override fun startAttacker(attacker: SimpleMQClient) {
attacker.start(PEER_USER, PEER_USER) // Login as a peer
@ -51,20 +47,4 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
attacker.start()
}
}
@Test
fun `login to a non ssl port as a node user`() {
val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(NODE_USER, NODE_USER)
}
}
@Test
fun `login to a non ssl port as a peer user`() {
val attacker = clientTo(alice.configuration.rpcAddress!!, sslConfiguration = null)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.start(PEER_USER, PEER_USER) // Login as a peer
}
}
}

View File

@ -2,35 +2,14 @@ package net.corda.services.messaging
import net.corda.node.services.User
import net.corda.testing.messaging.SimpleMQClient
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Test
/**
* Runs the security tests with the attacker being a valid RPC user of Alice.
*/
class MQSecurityAsRPCTest : MQSecurityTest() {
override fun createAttacker(): SimpleMQClient {
return clientTo(alice.configuration.rpcAddress!!)
}
@Test
fun `send message on logged in user's RPC address`() {
val user1Queue = loginToRPCAndGetClientQueue()
assertSendAttackFails(user1Queue)
}
override val extraRPCUsers = listOf(User("evil", "pass", permissions = emptySet()))
override fun startAttacker(attacker: SimpleMQClient) {
attacker.loginToRPC(extraRPCUsers[0])
}
@Test
fun `login to a ssl port as a RPC user`() {
val attacker = clientTo(alice.configuration.artemisAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.loginToRPC(extraRPCUsers[0], enableSSL = true)
}
}
}
}

View File

@ -14,6 +14,7 @@ import net.corda.core.utilities.unwrap
import net.corda.node.internal.Node
import net.corda.node.services.User
import net.corda.node.services.config.SSLConfiguration
import net.corda.node.services.config.configureTestSSL
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
@ -23,7 +24,6 @@ import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.PEE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.node.services.messaging.CordaRPCClientImpl
import net.corda.testing.configureTestSSL
import net.corda.testing.messaging.SimpleMQClient
import net.corda.testing.node.NodeBasedTest
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException
@ -49,14 +49,12 @@ abstract class MQSecurityTest : NodeBasedTest() {
@Before
fun start() {
alice = startNode("Alice", rpcUsers = extraRPCUsers + rpcUser).getOrThrow()
attacker = createAttacker()
attacker = clientTo(alice.configuration.artemisAddress)
startAttacker(attacker)
}
open val extraRPCUsers: List<User> get() = emptyList()
abstract fun createAttacker(): SimpleMQClient
abstract fun startAttacker(attacker: SimpleMQClient)
@After
@ -114,6 +112,12 @@ abstract class MQSecurityTest : NodeBasedTest() {
assertConsumeAttackFails(user1Queue)
}
@Test
fun `send message on logged in user's RPC address`() {
val user1Queue = loginToRPCAndGetClientQueue()
assertSendAttackFails(user1Queue)
}
@Test
fun `create queue for valid RPC user`() {
val user1Queue = "$CLIENTS_PREFIX${rpcUser.username}.rpc.${random63BitValue()}"
@ -148,26 +152,26 @@ abstract class MQSecurityTest : NodeBasedTest() {
assertAllQueueCreationAttacksFail(randomQueue)
}
fun clientTo(target: HostAndPort, sslConfiguration: SSLConfiguration? = configureTestSSL()): SimpleMQClient {
val client = SimpleMQClient(target, sslConfiguration)
fun clientTo(target: HostAndPort, config: SSLConfiguration = configureTestSSL()): SimpleMQClient {
val client = SimpleMQClient(target, config)
clients += client
return client
}
fun loginToRPC(target: HostAndPort, rpcUser: User): SimpleMQClient {
val client = clientTo(target, null)
val client = clientTo(target)
client.loginToRPC(rpcUser)
return client
}
fun SimpleMQClient.loginToRPC(rpcUser: User, enableSSL: Boolean = false): CordaRPCOps {
start(rpcUser.username, rpcUser.password, enableSSL)
fun SimpleMQClient.loginToRPC(rpcUser: User): CordaRPCOps {
start(rpcUser.username, rpcUser.password)
val clientImpl = CordaRPCClientImpl(session, ReentrantLock(), rpcUser.username)
return clientImpl.proxyFor(CordaRPCOps::class.java, timeout = 1.seconds)
}
fun loginToRPCAndGetClientQueue(): String {
val rpcClient = loginToRPC(alice.configuration.rpcAddress!!, rpcUser)
val rpcClient = loginToRPC(alice.configuration.artemisAddress, rpcUser)
val clientQueueQuery = SimpleString("$CLIENTS_PREFIX${rpcUser.username}.rpc.*")
return rpcClient.session.addressQuery(clientQueueQuery).queueNames.single().toString()
}

View File

@ -105,7 +105,7 @@ data class NodeHandle(
val configuration: FullNodeConfiguration,
val process: Process
) {
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.rpcAddress!!)
fun rpcClientToNode(): CordaRPCClient = CordaRPCClient(configuration.artemisAddress, configuration)
}
sealed class PortAllocation {
@ -343,8 +343,7 @@ open class DriverDSL(
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>,
rpcUsers: List<User>, customOverrides: Map<String, Any?>): ListenableFuture<NodeHandle> {
val messagingAddress = portAllocation.nextHostAndPort()
val rpcAddress = portAllocation.nextHostAndPort()
val webAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val name = providedName ?: "${pickA(name)}-${messagingAddress.port}"
@ -352,8 +351,7 @@ open class DriverDSL(
val configOverrides = mapOf(
"myLegalName" to name,
"artemisAddress" to messagingAddress.toString(),
"rpcAddress" to rpcAddress.toString(),
"webAddress" to webAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to advertisedServices.map { it.toString() },
"networkMapService" to mapOf(
"address" to networkMapAddress.toString(),

View File

@ -22,7 +22,11 @@ import net.corda.node.services.config.FullNodeConfiguration
import net.corda.node.services.messaging.ArtemisMessagingComponent.NetworkMapAddress
import net.corda.node.services.messaging.ArtemisMessagingServer
import net.corda.node.services.messaging.NodeMessagingClient
import net.corda.node.services.transactions.*
import net.corda.node.services.transactions.PersistentUniquenessProvider
import net.corda.node.services.transactions.RaftUniquenessProvider
import net.corda.node.services.transactions.RaftValidatingNotaryService
import net.corda.node.services.transactions.BFTSmartUniquenessProvider
import net.corda.node.services.transactions.BFTValidatingNotaryService
import net.corda.node.utilities.AffinityExecutor
import net.corda.node.utilities.databaseTransaction
import org.jetbrains.exposed.sql.Database
@ -125,7 +129,7 @@ class Node(override val configuration: FullNodeConfiguration,
val serverAddress = with(configuration) {
messagingServerAddress ?: {
messageBroker = ArtemisMessagingServer(this, artemisAddress, rpcAddress, services.networkMapCache, userService)
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService)
artemisAddress
}()
}

View File

@ -15,6 +15,7 @@ import net.corda.core.div
import net.corda.core.exists
import net.corda.core.utilities.loggerFor
import java.net.URL
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Instant
@ -48,9 +49,6 @@ object ConfigHelper {
@Suppress("UNCHECKED_CAST")
operator fun <T> Config.getValue(receiver: Any, metadata: KProperty<*>): T {
if (metadata.returnType.isMarkedNullable && !hasPath(metadata.name)) {
return null as T
}
return when (metadata.returnType.javaType) {
String::class.java -> getString(metadata.name) as T
Int::class.java -> getInt(metadata.name) as T
@ -102,7 +100,7 @@ inline fun <reified T : Any> Config.getListOrElse(path: String, default: Config.
*/
fun NodeConfiguration.configureWithDevSSLCertificate() = configureDevKeyAndTrustStores(myLegalName)
fun SSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String) {
private fun SSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String) {
certificatesDirectory.createDirectories()
if (!trustStoreFile.exists()) {
javaClass.classLoader.getResourceAsStream("net/corda/node/internal/certificates/cordatruststore.jks").copyTo(trustStoreFile)
@ -114,3 +112,15 @@ fun SSLConfiguration.configureDevKeyAndTrustStores(myLegalName: String) {
X509Utilities.createKeystoreForSSL(keyStoreFile, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass", myLegalName)
}
}
// TODO Move this to CoreTestUtils.kt once we can pry this from the explorer
@JvmOverloads
fun configureTestSSL(legalName: String = "Mega Corp."): SSLConfiguration = object : SSLConfiguration {
override val certificatesDirectory = Files.createTempDirectory("certs")
override val keyStorePassword: String get() = "cordacadevpass"
override val trustStorePassword: String get() = "trustpass"
init {
configureDevKeyAndTrustStores(legalName)
}
}

View File

@ -65,14 +65,13 @@ class FullNodeConfiguration(override val baseDirectory: Path, val config: Config
}
val useHTTPS: Boolean by config
val artemisAddress: HostAndPort by config
val rpcAddress: HostAndPort? by config
val webAddress: HostAndPort by config
// TODO This field is slightly redundant as artemisAddress is sufficient to hold the address of the node's MQ broker.
// Instead this should be a Boolean indicating whether that broker is an internal one started by the node or an external one
val messagingServerAddress: HostAndPort? by config
val messagingServerAddress: HostAndPort? by config.getOrElse { null }
val extraAdvertisedServiceIds: List<String> = config.getListOrElse<String>("extraAdvertisedServiceIds") { emptyList() }
val useTestClock: Boolean by config.getOrElse { false }
val notaryNodeAddress: HostAndPort? by config
val notaryNodeAddress: HostAndPort? by config.getOrElse { null }
val notaryClusterAddresses: List<HostAndPort> = config
.getListOrElse<String>("notaryClusterAddresses") { emptyList() }
.map { HostAndPort.fromString(it) }

View File

@ -22,7 +22,7 @@ import java.security.KeyStore
/**
* The base class for Artemis services that defines shared data structures and SSL transport configuration.
*/
abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
companion object {
init {
System.setProperty("org.jboss.logging.provider", "slf4j")
@ -88,7 +88,6 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
fun asPeer(peerIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
return NodeAddress("$PEERS_PREFIX${peerIdentity.toBase58String()}", hostAndPort)
}
fun asService(serviceIdentity: CompositeKey, hostAndPort: HostAndPort): NodeAddress {
return NodeAddress("$SERVICES_PREFIX${serviceIdentity.toBase58String()}", hostAndPort)
}
@ -138,7 +137,7 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
}
}
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int, enableSSL: Boolean = true): TransportConfiguration {
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int): TransportConfiguration {
val config = config
val options = mutableMapOf<String, Any?>(
// Basic TCP target details
@ -152,7 +151,7 @@ abstract class ArtemisMessagingComponent : SingletonSerializeAsToken() {
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP"
)
if (config != null && enableSSL) {
if (config != null) {
config.keyStoreFile.expectedOnDefaultFileSystem()
config.trustStoreFile.expectedOnDefaultFileSystem()
val tlsOptions = mapOf<String, Any?>(

View File

@ -81,8 +81,7 @@ import javax.security.cert.X509Certificate
*/
@ThreadSafe
class ArtemisMessagingServer(override val config: NodeConfiguration,
val artemisHostPort: HostAndPort,
val rpcHostPort: HostAndPort?,
val myHostPort: HostAndPort,
val networkMapCache: NetworkMapCache,
val userService: RPCUserService) : ArtemisMessagingComponent() {
companion object {
@ -140,10 +139,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
registerPostQueueDeletionCallback { address, qName -> log.debug { "Queue deleted: $qName for $address" } }
}
activeMQServer.start()
printBasicNodeInfo("Node listening on address", artemisHostPort.toString())
if (rpcHostPort != null) {
printBasicNodeInfo("Node RPC service listening on address", rpcHostPort.toString())
}
printBasicNodeInfo("Node listening on address", myHostPort.toString())
}
private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply {
@ -151,11 +147,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "large-messages").toString()
val acceptors = mutableSetOf(tcpTransport(Inbound, "0.0.0.0", artemisHostPort.port))
if (rpcHostPort != null) {
acceptors.add(tcpTransport(Inbound, "0.0.0.0", rpcHostPort.port, enableSSL = false))
}
acceptorConfigurations = acceptors
acceptorConfigurations = setOf(tcpTransport(Inbound, "0.0.0.0", myHostPort.port))
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
@ -168,15 +160,15 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// by having its password be an unknown securely random 128-bit value.
clusterPassword = BigInteger(128, newSecureRandom()).toString(16)
queueConfigurations = listOf(
queueConfig(NETWORK_MAP_QUEUE, durable = true),
queueConfig(P2P_QUEUE, durable = true),
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
// but these queues are not worth persisting.
queueConfig(RPC_REQUESTS_QUEUE, durable = false),
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
queueConfig(RPC_QUEUE_REMOVALS_QUEUE, address = NOTIFICATIONS_ADDRESS, filter = "_AMQ_NotifType = 1", durable = false)
queueConfig(NETWORK_MAP_QUEUE, durable = true),
queueConfig(P2P_QUEUE, durable = true),
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
// but these queues are not worth persisting.
queueConfig(RPC_REQUESTS_QUEUE, durable = false),
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
queueConfig(RPC_QUEUE_REMOVALS_QUEUE, address = NOTIFICATIONS_ADDRESS, filter = "_AMQ_NotifType = 1", durable = false)
)
configureAddressSecurity()
}
@ -298,8 +290,8 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
fun deployBridges(node: NodeInfo) {
gatherAddresses(node)
.filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) }
.forEach { deployBridge(it, node.legalIdentity.name) }
.filter { queueExists(it.queueName) && !bridgeExists(it.bridgeName) }
.forEach { deployBridge(it, node.legalIdentity.name) }
}
fun destroyBridges(node: NodeInfo) {
@ -405,7 +397,8 @@ private class VerifyingNettyConnector(configuration: MutableMap<String, Any>?,
threadPool: Executor?,
scheduledThreadPool: ScheduledExecutorService?,
protocolManager: ClientProtocolManager?) :
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager) {
NettyConnector(configuration, handler, listener, closeExecutor, threadPool, scheduledThreadPool, protocolManager)
{
private val server = configuration?.get(ArtemisMessagingServer::class.java.name) as? ArtemisMessagingServer
private val expectedCommonName = configuration?.get(ArtemisMessagingComponent.VERIFY_PEER_COMMON_NAME) as? String
@ -487,15 +480,15 @@ class NodeLoginModule : LoginModule {
val username = nameCallback.name ?: throw FailedLoginException("Username not provided")
val password = String(passwordCallback.password ?: throw FailedLoginException("Password not provided"))
val certificates = certificateCallback.certificates
log.info("Processing login for $username")
val validatedUser = when (determineUserRole(certificates, username)) {
PEER_ROLE -> authenticatePeer(certificates)
NODE_ROLE -> authenticateNode(certificates)
RPC_ROLE -> authenticateRpcUser(password, username)
else -> throw FailedLoginException("Peer does not belong on our network")
val validatedUser = if (username == PEER_USER || username == NODE_USER) {
val certificates = certificateCallback.certificates ?: throw FailedLoginException("No TLS?")
authenticateNode(certificates, username)
} else {
// Otherwise assume they're an RPC user
authenticateRpcUser(password, username)
}
principals += UserPrincipal(validatedUser)
@ -503,24 +496,24 @@ class NodeLoginModule : LoginModule {
return loginSucceeded
}
private fun authenticateNode(certificates: Array<X509Certificate>): String {
private fun authenticateNode(certificates: Array<X509Certificate>, username: String): String {
val peerCertificate = certificates.first()
if (peerCertificate.publicKey != ourPublicKey) {
throw FailedLoginException("Only the node can login as $NODE_USER")
val role = if (username == NODE_USER) {
if (peerCertificate.publicKey != ourPublicKey) {
throw FailedLoginException("Only the node can login as $NODE_USER")
}
NODE_ROLE
} else {
val theirRootCAPublicKey = certificates.last().publicKey
if (theirRootCAPublicKey != ourRootCAPublicKey) {
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
}
PEER_ROLE // This enables the peer to send to our P2P address
}
principals += RolePrincipal(NODE_ROLE)
principals += RolePrincipal(role)
return peerCertificate.subjectDN.name
}
private fun authenticatePeer(certificates: Array<X509Certificate>): String {
val theirRootCAPublicKey = certificates.last().publicKey
if (theirRootCAPublicKey != ourRootCAPublicKey) {
throw FailedLoginException("Peer does not belong on our network. Their root CA: $theirRootCAPublicKey")
}
principals += RolePrincipal(PEER_ROLE)
return certificates.first().subjectDN.name
}
private fun authenticateRpcUser(password: String, username: String): String {
val rpcUser = userService.getUser(username) ?: throw FailedLoginException("User does not exist")
if (password != rpcUser.password) {
@ -533,18 +526,6 @@ class NodeLoginModule : LoginModule {
return username
}
private fun determineUserRole(certificates: Array<X509Certificate>?, username: String): String? {
return if (username == PEER_USER || username == NODE_USER) {
certificates ?: throw FailedLoginException("No TLS?")
if (username == PEER_USER) PEER_ROLE else NODE_ROLE
} else if (certificates == null) {
// Assume they're an RPC user if its from a non-ssl connection
RPC_ROLE
} else {
null
}
}
override fun commit(): Boolean {
val result = loginSucceeded
if (result) {

View File

@ -24,10 +24,10 @@ import javax.annotation.concurrent.ThreadSafe
* useful tasks. See the documentation for [proxy] or review the docsite to learn more about how this API works.
*
* @param host The hostname and messaging port of the node.
* @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will only be authenticated on non-SSL RPC port, the RPC traffic with not be encrypted when SSL is disabled.
* @param config If specified, the SSL configuration to use. If not specified, SSL will be disabled and the node will not be authenticated, nor will RPC traffic be encrypted.
*/
@ThreadSafe
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration? = null, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
class CordaRPCClient(val host: HostAndPort, override val config: SSLConfiguration?, val serviceConfigurationOverride: (ServerLocator.() -> Unit)? = null) : Closeable, ArtemisMessagingComponent() {
private companion object {
val log = loggerFor<CordaRPCClient>()
}

View File

@ -47,7 +47,6 @@ class ArtemisMessagingTests {
@Rule @JvmField val temporaryFolder = TemporaryFolder()
val hostAndPort = freeLocalHostAndPort()
val rpcHostAndPort = freeLocalHostAndPort()
val topic = "platform.self"
val identity = generateKeyPair()
@ -231,8 +230,8 @@ class ArtemisMessagingTests {
}
}
private fun createMessagingServer(local: HostAndPort = hostAndPort, rpc: HostAndPort = rpcHostAndPort): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, rpc, networkMapCache, userService).apply {
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, networkMapCache, userService).apply {
config.configureWithDevSSLCertificate()
messagingServer = this
}