RPC client authentication using user/password from config file

This commit is contained in:
Shams Asari 2016-10-20 17:55:16 +01:00
parent 60c1dcdbde
commit e2d6ace449
12 changed files with 324 additions and 158 deletions

View File

@ -0,0 +1,62 @@
package com.r3corda.client
import com.r3corda.core.random63BitValue
import com.r3corda.node.driver.driver
import com.r3corda.node.services.config.configureTestSSL
import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.toHostAndPort
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
class CordaRPCClientTest {
private val validUsername = "user1"
private val validPassword = "test"
private val stopDriver = CountDownLatch(1)
private var driverThread: Thread? = null
private lateinit var client: CordaRPCClient
@Before
fun start() {
val driverStarted = CountDownLatch(1)
driverThread = thread {
driver {
val nodeInfo = startNode().get()
client = CordaRPCClient(toHostAndPort(nodeInfo.address), configureTestSSL())
driverStarted.countDown()
stopDriver.await()
}
}
driverStarted.await()
}
@After
fun stop() {
stopDriver.countDown()
driverThread?.join()
}
@Test
fun `log in with valid username and password`() {
client.start(validUsername, validPassword)
}
@Test
fun `log in with unknown user`() {
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
client.start(random63BitValue().toString(), validPassword)
}
}
@Test
fun `log in with incorrect password`() {
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
client.start(validUsername, random63BitValue().toString())
}
}
}

View File

@ -1,6 +1,5 @@
package com.r3corda.client
import com.google.common.util.concurrent.SettableFuture
import com.r3corda.client.model.NodeMonitorModel
import com.r3corda.client.model.ProgressTrackingEvent
import com.r3corda.core.bufferUntilSubscribed
@ -14,9 +13,7 @@ import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.driver.driver
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.config.configureWithDevSSLCertificate
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.config.configureTestSSL
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.transactions.SimpleNotaryService
import com.r3corda.testing.expect
@ -27,18 +24,15 @@ import org.junit.Before
import org.junit.Test
import rx.Observable
import rx.Observer
import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
class NodeMonitorModelTest {
lateinit var aliceNode: NodeInfo
lateinit var notaryNode: NodeInfo
lateinit var aliceClient: NodeMessagingClient
val driverStarted = SettableFuture.create<Unit>()
val stopDriver = SettableFuture.create<Unit>()
val driverStopped = SettableFuture.create<Unit>()
val stopDriver = CountDownLatch(1)
var driverThread: Thread? = null
lateinit var stateMachineTransactionMapping: Observable<StateMachineTransactionMapping>
lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
@ -51,7 +45,8 @@ class NodeMonitorModelTest {
@Before
fun start() {
thread {
val driverStarted = CountDownLatch(1)
driverThread = thread {
driver {
val aliceNodeFuture = startNode("Alice")
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
@ -61,16 +56,6 @@ class NodeMonitorModelTest {
newNode = { nodeName -> startNode(nodeName).get() }
val monitor = NodeMonitorModel()
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath: Path = Files.createTempDirectory("certs")
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
init {
configureWithDevSSLCertificate()
}
}
stateMachineTransactionMapping = monitor.stateMachineTransactionMapping.bufferUntilSubscribed()
stateMachineUpdates = monitor.stateMachineUpdates.bufferUntilSubscribed()
progressTracking = monitor.progressTracking.bufferUntilSubscribed()
@ -79,20 +64,18 @@ class NodeMonitorModelTest {
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
clientToService = monitor.clientToService
monitor.register(aliceNode, sslConfig.certificatesPath)
driverStarted.set(Unit)
stopDriver.get()
monitor.register(aliceNode, configureTestSSL(), "user1", "test")
driverStarted.countDown()
stopDriver.await()
}
driverStopped.set(Unit)
}
driverStarted.get()
driverStarted.await()
}
@After
fun stop() {
stopDriver.set(Unit)
driverStopped.get()
stopDriver.countDown()
driverThread?.join()
}
@Test

View File

@ -24,19 +24,12 @@ import kotlin.concurrent.thread
* useful tasks. See the documentation for [proxy] or review the docsite to learn more about how this API works.
*/
@ThreadSafe
class CordaRPCClient(val host: HostAndPort, certificatesPath: Path) : Closeable, ArtemisMessagingComponent(sslConfig(certificatesPath)) {
class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfiguration) : Closeable, ArtemisMessagingComponent() {
companion object {
private val rpcLog = LoggerFactory.getLogger("com.r3corda.rpc")
private fun sslConfig(certificatesPath: Path): NodeSSLConfiguration = object : NodeSSLConfiguration {
override val certificatesPath: Path = certificatesPath
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
}
}
// TODO: Certificate handling for clients needs more work.
private inner class State {
var running = false
lateinit var sessionFactory: ClientSessionFactory
@ -57,7 +50,7 @@ class CordaRPCClient(val host: HostAndPort, certificatesPath: Path) : Closeable,
/** Opens the connection to the server and registers a JVM shutdown hook to cleanly disconnect. */
@Throws(ActiveMQNotConnectedException::class)
fun start() {
fun start(username: String, password: String) {
state.locked {
check(!running)
checkStorePasswords() // Check the password.
@ -66,7 +59,7 @@ class CordaRPCClient(val host: HostAndPort, certificatesPath: Path) : Closeable,
sessionFactory = serverLocator.createSessionFactory()
// We use our initial connection ID as the queue namespace.
myID = sessionFactory.connection.id as Int and 0x000000FFFFFF
session = sessionFactory.createSession()
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()
clientImpl = CordaRPCClientImpl(session, state.lock, myAddressPrefix)
running = true

View File

@ -8,14 +8,14 @@ import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.toHostAndPort
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.messaging.StateMachineInfo
import com.r3corda.node.services.messaging.StateMachineUpdate
import javafx.beans.property.SimpleObjectProperty
import rx.Observable
import rx.subjects.PublishSubject
import java.nio.file.Path
data class ProgressTrackingEvent(val stateMachineId: StateMachineRunId, val message: String) {
companion object {
@ -54,14 +54,11 @@ class NodeMonitorModel {
/**
* Register for updates to/from a given vault.
* @param messagingService The messaging to use for communication.
* @param monitorNodeInfo the [Node] to connect to.
* TODO provide an unsubscribe mechanism
*/
fun register(vaultMonitorNodeInfo: NodeInfo, certificatesPath: Path) {
val client = CordaRPCClient(ArtemisMessagingComponent.toHostAndPort(vaultMonitorNodeInfo.address), certificatesPath)
client.start()
fun register(vaultMonitorNodeInfo: NodeInfo, sslConfig: NodeSSLConfiguration, username: String, password: String) {
val client = CordaRPCClient(toHostAndPort(vaultMonitorNodeInfo.address), sslConfig)
client.start(username, password)
val proxy = client.proxy()
val (stateMachines, stateMachineUpdates) = proxy.stateMachinesAndUpdates()

View File

@ -13,8 +13,10 @@ import rx.Observable
import rx.subjects.UnicastSubject
import java.io.BufferedInputStream
import java.io.InputStream
import java.lang.Comparable
import java.math.BigDecimal
import java.nio.file.Files
import java.nio.file.LinkOption
import java.nio.file.Path
import java.time.Duration
import java.time.temporal.Temporal
@ -89,6 +91,7 @@ inline fun <T> SettableFuture<T>.catch(block: () -> T) {
}
fun <R> Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block)
fun Path.exists(vararg options: LinkOption): Boolean = Files.exists(this, *options)
// Simple infix function to add back null safety that the JDK lacks: timeA until timeB
infix fun Temporal.until(endExclusive: Temporal) = Duration.between(this, endExclusive)

View File

@ -3,6 +3,7 @@ package com.r3corda.docs
import com.google.common.net.HostAndPort
import com.r3corda.client.CordaRPCClient
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.services.config.NodeSSLConfiguration
import org.graphstream.graph.Edge
import org.graphstream.graph.Node
import org.graphstream.graph.implementations.SingleGraph
@ -26,12 +27,18 @@ fun main(args: Array<String>) {
}
val nodeAddress = HostAndPort.fromString(args[0])
val printOrVisualise = PrintOrVisualise.valueOf(args[1])
val certificatesPath = Paths.get("build/trader-demo/buyer/certificates")
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath = Paths.get("build/trader-demo/buyer/certificates")
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
}
// END 1
// START 2
val client = CordaRPCClient(nodeAddress, certificatesPath)
client.start()
val username = System.console().readLine("Enter username: ")
val password = String(System.console().readPassword("Enter password: "))
val client = CordaRPCClient(nodeAddress, sslConfig)
client.start(username, password)
val proxy = client.proxy()
// END 2
@ -65,7 +72,7 @@ fun main(args: Array<String>) {
futureTransactions.subscribe { transaction ->
graph.addNode<Node>("${transaction.id}")
transaction.tx.inputs.forEach { ref ->
graph.addEdge<Edge>("${ref}", "${ref.txhash}", "${transaction.id}")
graph.addEdge<Edge>("$ref", "${ref.txhash}", "${transaction.id}")
}
}
graph.display()

View File

@ -16,7 +16,7 @@ we also need to access the certificates of the node, we will access the node's `
:start-after: START 1
:end-before: END 1
Now we can connect to the node itself:
Now we can connect to the node itself using a valid RPC login. By default the user `user1` is available with password `test`.
.. literalinclude:: example-code/src/main/kotlin/com/r3corda/docs/ClientRpcTutorial.kt
:language: kotlin

View File

@ -6,13 +6,10 @@ import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.explorer.model.IdentityModel
import com.r3corda.node.driver.PortAllocation
import com.r3corda.node.driver.driver
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.config.configureWithDevSSLCertificate
import com.r3corda.node.services.config.configureTestSSL
import com.r3corda.node.services.transactions.SimpleNotaryService
import javafx.stage.Stage
import tornadofx.App
import java.nio.file.Files
import java.nio.file.Path
class Main : App() {
override val primaryView = MainWindow::class
@ -38,19 +35,9 @@ class Main : App() {
val aliceNode = aliceNodeFuture.get()
val notaryNode = notaryNodeFuture.get()
val sslConfig = object : NodeSSLConfiguration {
override val certificatesPath: Path = Files.createTempDirectory("certs")
override val keyStorePassword = "cordacadevpass"
override val trustStorePassword = "trustpass"
init {
configureWithDevSSLCertificate()
}
}
Models.get<IdentityModel>(Main::class).notary.set(notaryNode.notaryIdentity)
Models.get<IdentityModel>(Main::class).myIdentity.set(aliceNode.legalIdentity)
Models.get<NodeMonitorModel>(Main::class).register(aliceNode, sslConfig.certificatesPath)
Models.get<NodeMonitorModel>(Main::class).register(aliceNode, configureTestSSL(), "user1", "test")
startNode("Bob").get()

View File

@ -2,6 +2,7 @@ package com.r3corda.node.services.config
import com.google.common.net.HostAndPort
import com.r3corda.core.crypto.X509Utilities
import com.r3corda.core.exists
import com.r3corda.core.utilities.loggerFor
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -89,14 +90,24 @@ fun Config.getProperties(path: String): Properties {
*/
fun NodeSSLConfiguration.configureWithDevSSLCertificate() {
Files.createDirectories(certificatesPath)
if (!Files.exists(trustStorePath)) {
if (!trustStorePath.exists()) {
Files.copy(javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordatruststore.jks"),
trustStorePath)
}
if (!Files.exists(keyStorePath)) {
if (!keyStorePath.exists()) {
val caKeyStore = X509Utilities.loadKeyStore(
javaClass.classLoader.getResourceAsStream("com/r3corda/node/internal/certificates/cordadevcakeys.jks"),
"cordacadevpass")
X509Utilities.createKeystoreForSSL(keyStorePath, keyStorePassword, keyStorePassword, caKeyStore, "cordacadevkeypass")
}
}
// TODO Move this to CoreTestUtils.kt once we can pry this from the explorer
fun configureTestSSL(): NodeSSLConfiguration = object : NodeSSLConfiguration {
override val certificatesPath = Files.createTempDirectory("certs")
override val keyStorePassword: String get() = "cordacadevpass"
override val trustStorePassword: String get() = "trustpass"
init {
configureWithDevSSLCertificate()
}
}

View File

@ -15,16 +15,15 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants
import java.nio.file.FileSystems
import java.nio.file.Path
import java.security.KeyStore
import java.security.PublicKey
/**
* The base class for Artemis services that defines shared data structures and transport configuration
*
* @param certificatePath A place where Artemis can stash its message journal and other files.
* @param config The config object is used to pass in the passwords for the certificate KeyStore and TrustStore
*/
abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : SingletonSerializeAsToken() {
abstract class ArtemisMessagingComponent() : SingletonSerializeAsToken() {
companion object {
init {
@ -36,7 +35,7 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
const val RPC_REQUESTS_QUEUE = "rpc.requests"
@JvmStatic
protected val NETWORK_MAP_ADDRESS = SimpleString(PEERS_PREFIX +"networkmap")
protected val NETWORK_MAP_ADDRESS = SimpleString("${PEERS_PREFIX}networkmap")
/**
* Assuming the passed in target address is actually an ArtemisAddress will extract the host and port of the node. This should
@ -70,7 +69,7 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
}
protected data class NetworkMapAddress(override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString = NETWORK_MAP_ADDRESS
override val queueName: SimpleString get() = NETWORK_MAP_ADDRESS
}
/**
@ -80,12 +79,12 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
*/
data class NodeAddress(val identity: PublicKey, override val hostAndPort: HostAndPort) : SingleMessageRecipient, ArtemisAddress {
override val queueName: SimpleString by lazy { SimpleString(PEERS_PREFIX+identity.toBase58String()) }
override fun toString(): String {
return "NodeAddress(identity = $queueName, $hostAndPort"
}
override fun toString(): String = "${javaClass.simpleName}(identity = $queueName, $hostAndPort)"
}
/** The config object is used to pass in the passwords for the certificate KeyStore and TrustStore */
abstract val config: NodeSSLConfiguration
protected fun parseKeyFromQueueName(name: String): PublicKey {
require(name.startsWith(PEERS_PREFIX))
return parsePublicKeyBase58(name.substring(PEERS_PREFIX.length))
@ -119,39 +118,46 @@ abstract class ArtemisMessagingComponent(val config: NodeSSLConfiguration) : Sin
}
}
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int) =
TransportConfiguration(
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
},
mapOf(
// Basic TCP target details
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt(),
protected fun tcpTransport(direction: ConnectionDirection, host: String, port: Int): TransportConfiguration {
config.keyStorePath.expectedOnDefaultFileSystem()
config.trustStorePath.expectedOnDefaultFileSystem()
return TransportConfiguration(
when (direction) {
ConnectionDirection.INBOUND -> NettyAcceptorFactory::class.java.name
ConnectionDirection.OUTBOUND -> NettyConnectorFactory::class.java.name
},
mapOf(
// Basic TCP target details
TransportConstants.HOST_PROP_NAME to host,
TransportConstants.PORT_PROP_NAME to port.toInt(),
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
// Turn on AMQP support, which needs the protocol jar on the classpath.
// Unfortunately we cannot disable core protocol as artemis only uses AMQP for interop
// It does not use AMQP messages for its own messages e.g. topology and heartbeats
// TODO further investigate how to ensure we use a well defined wire level protocol for Node to Node communications
TransportConstants.PROTOCOLS_PROP_NAME to "CORE,AMQP",
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
// and AES encryption
TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.KEYSTORE_PATH_PROP_NAME to config.keyStorePath,
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStorePath,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true
)
)
// Enable TLS transport layer with client certs and restrict to at least SHA256 in handshake
// and AES encryption
TransportConstants.SSL_ENABLED_PROP_NAME to true,
TransportConstants.KEYSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.KEYSTORE_PATH_PROP_NAME to config.keyStorePath,
TransportConstants.KEYSTORE_PASSWORD_PROP_NAME to config.keyStorePassword, // TODO proper management of keystores and password
TransportConstants.TRUSTSTORE_PROVIDER_PROP_NAME to "JKS",
TransportConstants.TRUSTSTORE_PATH_PROP_NAME to config.trustStorePath,
TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME to config.trustStorePassword,
TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME to CIPHER_SUITES.joinToString(","),
TransportConstants.ENABLED_PROTOCOLS_PROP_NAME to "TLSv1.2",
TransportConstants.NEED_CLIENT_AUTH_PROP_NAME to true
)
)
}
fun configureWithDevSSLCertificate() {
config.configureWithDevSSLCertificate()
}
protected fun Path.expectedOnDefaultFileSystem() {
require(fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
}

View File

@ -3,11 +3,15 @@ package com.r3corda.node.services.messaging
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.AddressFormatException
import com.r3corda.core.crypto.newSecureRandom
import com.r3corda.core.div
import com.r3corda.core.exists
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.use
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE_NAME
import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE_NAME
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
@ -17,11 +21,25 @@ import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import rx.Subscription
import java.math.BigInteger
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.security.Principal
import java.util.*
import javax.annotation.concurrent.ThreadSafe
import javax.security.auth.Subject
import javax.security.auth.callback.CallbackHandler
import javax.security.auth.callback.NameCallback
import javax.security.auth.callback.PasswordCallback
import javax.security.auth.callback.UnsupportedCallbackException
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag.REQUIRED
import javax.security.auth.login.FailedLoginException
import javax.security.auth.login.LoginException
import javax.security.auth.spi.LoginModule
// TODO: Verify that nobody can connect to us and fiddle with our config over the socket due to the secman.
// TODO: Implement a discovery engine that can trigger builds of new connections when another node registers? (later)
@ -37,9 +55,9 @@ import javax.annotation.concurrent.ThreadSafe
* a fully connected network, trusted network or on localhost.
*/
@ThreadSafe
class ArtemisMessagingServer(config: NodeConfiguration,
class ArtemisMessagingServer(override val config: NodeConfiguration,
val myHostPort: HostAndPort,
val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent(config) {
val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent() {
companion object {
val log = loggerFor<ArtemisMessagingServer>()
}
@ -52,6 +70,10 @@ class ArtemisMessagingServer(config: NodeConfiguration,
private lateinit var activeMQServer: ActiveMQServer
private var networkChangeHandle: Subscription? = null
init {
config.basedir.expectedOnDefaultFileSystem()
}
fun start() = mutex.locked {
if (!running) {
configureAndStartServer()
@ -116,12 +138,7 @@ class ArtemisMessagingServer(config: NodeConfiguration,
}
private fun configureAndStartServer() {
val config = createArtemisConfig(config.certificatesPath, myHostPort).apply {
securityRoles = mapOf(
"#" to setOf(Role("internal", true, true, true, true, true, true, true))
)
}
val config = createArtemisConfig()
val securityManager = createArtemisSecurityManager()
activeMQServer = ActiveMQServerImpl(config, securityManager).apply {
@ -157,28 +174,61 @@ class ArtemisMessagingServer(config: NodeConfiguration,
activeMQServer.start()
}
private fun createArtemisConfig(directory: Path, hp: HostAndPort): Configuration {
val config = ConfigurationImpl()
setConfigDirectories(config, directory)
config.acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", hp.port)
private fun createArtemisConfig(): Configuration = ConfigurationImpl().apply {
val artemisDir = config.basedir / "artemis"
bindingsDirectory = (artemisDir / "bindings").toString()
journalDirectory = (artemisDir / "journal").toString()
largeMessagesDirectory = (artemisDir / "largemessages").toString()
acceptorConfigurations = setOf(
tcpTransport(ConnectionDirection.INBOUND, "0.0.0.0", myHostPort.port)
)
// Enable built in message deduplication. Note we still have to do our own as the delayed commits
// and our own definition of commit mean that the built in deduplication cannot remove all duplicates.
config.idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
config.isPersistIDCache = true
return config
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
setupUserRoles()
}
// This gives nodes full access and RPC clients only enough to do RPC
private fun ConfigurationImpl.setupUserRoles() {
// TODO COR-307
val nodeRole = Role(NODE_ROLE_NAME, true, true, true, true, true, true, true, true)
val clientRpcRole = restrictedRole(RPC_ROLE_NAME, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
securityRoles = mapOf(
"#" to setOf(nodeRole),
"clients.*.rpc.responses.*" to setOf(nodeRole, clientRpcRole),
"clients.*.rpc.observations.*" to setOf(nodeRole, clientRpcRole),
RPC_REQUESTS_QUEUE to setOf(nodeRole, restrictedRole(RPC_ROLE_NAME, send = true))
)
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
deleteDurableQueue: Boolean = false, createNonDurableQueue: Boolean = false,
deleteNonDurableQueue: Boolean = false, manage: Boolean = false, browse: Boolean = false): Role {
return Role(name, send, consume, createDurableQueue, deleteDurableQueue, createNonDurableQueue,
deleteNonDurableQueue, manage, browse)
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
// TODO: set up proper security configuration https://r3-cev.atlassian.net/browse/COR-307
val securityConfig = SecurityConfiguration().apply {
addUser("internal", BigInteger(128, newSecureRandom()).toString(16))
addRole("internal", "internal")
defaultUser = "internal"
val rpcUsersFile = config.basedir / "rpc-users.properties"
if (!rpcUsersFile.exists()) {
val users = Properties()
users["user1"] = "test"
Files.newOutputStream(rpcUsersFile).use {
users.store(it, null)
}
}
return ActiveMQJAASSecurityManager(InVMLoginModule::class.java.name, securityConfig)
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(NodeLoginModule.FILE_KEY to rpcUsersFile)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
return ActiveMQJAASSecurityManager(NodeLoginModule::class.java.name, securityConfig)
}
private fun connectorExists(hostAndPort: HostAndPort) = hostAndPort.toString() in activeMQServer.configuration.connectorConfigurations
@ -194,12 +244,11 @@ class ArtemisMessagingServer(config: NodeConfiguration,
private fun bridgeExists(name: SimpleString) = activeMQServer.clusterManager.bridges.containsKey(name.toString())
private fun deployBridge(hostAndPort: HostAndPort, name: SimpleString) {
private fun deployBridge(hostAndPort: HostAndPort, name: String) {
activeMQServer.deployBridge(BridgeConfiguration().apply {
val nameStr = name.toString()
setName(nameStr)
queueName = nameStr
forwardingAddress = nameStr
setName(name)
queueName = name
forwardingAddress = name
staticConnectors = listOf(hostAndPort.toString())
confirmationWindowSize = 100000 // a guess
isUseDuplicateDetection = true // Enable the bridges automatic deduplication logic
@ -218,7 +267,7 @@ class ArtemisMessagingServer(config: NodeConfiguration,
if (!connectorExists(hostAndPort))
addConnector(hostAndPort)
if (!bridgeExists(name))
deployBridge(hostAndPort, name)
deployBridge(hostAndPort, name.toString())
}
private fun maybeDestroyBridge(name: SimpleString) {
@ -227,11 +276,81 @@ class ArtemisMessagingServer(config: NodeConfiguration,
}
}
private fun setConfigDirectories(config: Configuration, dir: Path) {
config.apply {
bindingsDirectory = dir.resolve("bindings").toString()
journalDirectory = dir.resolve("journal").toString()
largeMessagesDirectory = dir.resolve("largemessages").toString()
class NodeLoginModule : LoginModule {
companion object {
const val FILE_KEY = "rpc-users-file"
const val NODE_ROLE_NAME = "NodeRole"
const val RPC_ROLE_NAME = "RpcRole"
}
private val users = Properties()
private var loginSucceeded: Boolean = false
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var principals: List<Principal>
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
val rpcUsersFile = options[FILE_KEY] as Path
if (rpcUsersFile.exists()) {
rpcUsersFile.use {
users.load(it)
}
}
}
override fun login(): Boolean {
val nameCallback = NameCallback("Username: ")
val passwordCallback = PasswordCallback("Password: ", false)
try {
callbackHandler.handle(arrayOf(nameCallback, passwordCallback))
} catch (e: IOException) {
throw LoginException(e.message)
} catch (e: UnsupportedCallbackException) {
throw LoginException("${e.message} not available to obtain information from user")
}
val username = nameCallback.name ?: throw FailedLoginException("User name is null")
val receivedPassword = passwordCallback.password ?: throw FailedLoginException("Password is null")
val password = if (username == "Node") "Node" else users[username] ?: throw FailedLoginException("User does not exist")
if (password != String(receivedPassword)) {
throw FailedLoginException("Password does not match")
}
principals = listOf(
UserPrincipal(username),
RolePrincipal(if (username == "Node") NODE_ROLE_NAME else RPC_ROLE_NAME))
loginSucceeded = true
return loginSucceeded
}
override fun commit(): Boolean {
val result = loginSucceeded
if (result) {
subject.principals.addAll(principals)
}
clear()
return result
}
override fun abort(): Boolean {
clear()
return true
}
override fun logout(): Boolean {
subject.principals.removeAll(principals)
return true
}
private fun clear() {
loginSucceeded = false
}
}
}

View File

@ -11,12 +11,13 @@ import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.utilities.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.Message.HDR_VALIDATED_USER
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
import org.jetbrains.exposed.sql.statements.InsertStatement
import java.nio.file.FileSystems
import java.security.PublicKey
import java.time.Instant
import java.util.*
@ -50,11 +51,11 @@ import javax.annotation.concurrent.ThreadSafe
* in this class.
*/
@ThreadSafe
class NodeMessagingClient(config: NodeConfiguration,
class NodeMessagingClient(override val config: NodeConfiguration,
val serverHostPort: HostAndPort,
val myIdentity: PublicKey?,
val executor: AffinityExecutor,
val database: Database) : ArtemisMessagingComponent(config), MessagingServiceInternal {
val database: Database) : ArtemisMessagingComponent(), MessagingServiceInternal {
companion object {
val log = loggerFor<NodeMessagingClient>()
@ -114,10 +115,6 @@ class NodeMessagingClient(config: NodeConfiguration,
}
})
init {
require(config.basedir.fileSystem == FileSystems.getDefault()) { "Artemis only uses the default file system" }
}
fun start(rpcOps: CordaRPCOps) {
state.locked {
check(!started) { "start can't be called twice" }
@ -131,7 +128,7 @@ class NodeMessagingClient(config: NodeConfiguration,
// Create a session. Note that the acknowledgement of messages is not flushed to
// the Artermis journal until the default buffer size of 1MB is acknowledged.
val session = clientFactory!!.createSession(true, true, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
val session = clientFactory!!.createSession("Node", "Node", false, true, true, locator.isPreAcknowledge, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)
this.session = session
session.start()
@ -221,8 +218,9 @@ class NodeMessagingClient(config: NodeConfiguration,
val topic = message.getStringProperty(TOPIC_PROPERTY)
val sessionID = message.getLongProperty(SESSION_ID_PROPERTY)
// Use the magic deduplication property built into Artemis as our message identity too
val uuid = UUID.fromString(message.getStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID))
log.info("received message from: ${message.address} topic: $topic sessionID: $sessionID uuid: $uuid")
val uuid = UUID.fromString(message.getStringProperty(HDR_DUPLICATE_DETECTION_ID))
val user = message.getStringProperty(HDR_VALIDATED_USER)
log.info("Received message from: ${message.address} user: $user topic: $topic sessionID: $sessionID uuid: $uuid")
val body = ByteArray(message.bodySize).apply { message.bodyBuffer.readBytes(this) }
@ -340,7 +338,7 @@ class NodeMessagingClient(config: NodeConfiguration,
putLongProperty(SESSION_ID_PROPERTY, sessionID)
writeBodyBufferBytes(message.data)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
if (knownQueues.add(queueName)) {
@ -388,8 +386,8 @@ class NodeMessagingClient(config: NodeConfiguration,
override fun createMessage(topicSession: TopicSession, data: ByteArray, uuid: UUID): Message {
// TODO: We could write an object that proxies directly to an underlying MQ message here and avoid copying.
return object : Message {
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val topicSession: TopicSession = topicSession
override val data: ByteArray = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val uniqueMessageId: UUID = uuid
@ -405,7 +403,7 @@ class NodeMessagingClient(config: NodeConfiguration,
val msg = session!!.createMessage(false).apply {
writeBodyBufferBytes(bits.bits)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer!!.send(toAddress, msg)
}