Simple RPC access control, with a demo control on the cash RPCs

This commit is contained in:
Shams Asari 2016-10-25 11:31:20 +01:00
parent a7dfb43c63
commit 6d39b71bf9
26 changed files with 502 additions and 323 deletions

View File

@ -2,6 +2,7 @@ package com.r3corda.client
import com.r3corda.core.random63BitValue
import com.r3corda.node.driver.driver
import com.r3corda.node.services.User
import com.r3corda.node.services.config.configureTestSSL
import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.toHostAndPort
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
@ -14,8 +15,7 @@ import kotlin.concurrent.thread
class CordaRPCClientTest {
private val validUsername = "user1"
private val validPassword = "test"
private val rpcUser = User("user1", "test", permissions = emptySet())
private val stopDriver = CountDownLatch(1)
private var driverThread: Thread? = null
private lateinit var client: CordaRPCClient
@ -25,7 +25,7 @@ class CordaRPCClientTest {
val driverStarted = CountDownLatch(1)
driverThread = thread {
driver {
val driverInfo = startNode().get()
val driverInfo = startNode(rpcUsers = listOf(rpcUser)).get()
client = CordaRPCClient(toHostAndPort(driverInfo.nodeInfo.address), configureTestSSL())
driverStarted.countDown()
stopDriver.await()
@ -42,20 +42,20 @@ class CordaRPCClientTest {
@Test
fun `log in with valid username and password`() {
client.start(validUsername, validPassword)
client.start(rpcUser.username, rpcUser.password)
}
@Test
fun `log in with unknown user`() {
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
client.start(random63BitValue().toString(), validPassword)
client.start(random63BitValue().toString(), rpcUser.password)
}
}
@Test
fun `log in with incorrect password`() {
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
client.start(validUsername, random63BitValue().toString())
client.start(rpcUser.username, random63BitValue().toString())
}
}

View File

@ -13,6 +13,8 @@ import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.driver.driver
import com.r3corda.node.internal.CordaRPCOpsImpl
import com.r3corda.node.services.User
import com.r3corda.node.services.config.configureTestSSL
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.transactions.SimpleNotaryService
@ -48,7 +50,8 @@ class NodeMonitorModelTest {
val driverStarted = CountDownLatch(1)
driverThread = thread {
driver {
val aliceNodeFuture = startNode("Alice")
val cashUser = User("user1", "test", permissions = setOf(CordaRPCOpsImpl.CASH_PERMISSION))
val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser))
val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type)))
aliceNode = aliceNodeFuture.get().nodeInfo
@ -64,7 +67,7 @@ class NodeMonitorModelTest {
networkMapUpdates = monitor.networkMap.bufferUntilSubscribed()
clientToService = monitor.clientToService
monitor.register(aliceNode, configureTestSSL(), "user1", "test")
monitor.register(aliceNode, configureTestSSL(), cashUser.username, cashUser.password)
driverStarted.countDown()
stopDriver.await()
}
@ -79,7 +82,7 @@ class NodeMonitorModelTest {
}
@Test
fun testNetworkMapUpdate() {
fun `network map update`() {
newNode("Bob")
newNode("Charlie")
networkMapUpdates.expectEvents(isStrict = false) {
@ -99,7 +102,7 @@ class NodeMonitorModelTest {
}
@Test
fun cashIssueWorksEndToEnd() {
fun `cash issue works end to end`() {
clientToService.onNext(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
@ -124,8 +127,7 @@ class NodeMonitorModelTest {
}
@Test
fun issueAndMoveWorks() {
fun `cash issue and move`() {
clientToService.onNext(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),

View File

@ -5,8 +5,10 @@ import com.r3corda.client.impl.CordaRPCClientImpl
import com.r3corda.core.ThreadBox
import com.r3corda.node.services.config.NodeSSLConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.messaging.RPCException
import com.r3corda.node.services.messaging.rpcLog
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientSession
@ -25,10 +27,6 @@ import kotlin.concurrent.thread
*/
@ThreadSafe
class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfiguration) : Closeable, ArtemisMessagingComponent() {
companion object {
private val rpcLog = LoggerFactory.getLogger("com.r3corda.rpc")
}
// TODO: Certificate handling for clients needs more work.
private inner class State {
var running = false
@ -38,16 +36,6 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
}
private val state = ThreadBox(State())
/**
* An ID that we used to identify this connection on the server side: kind of like a local port number but
* it persists for the lifetime of this process and survives short TCP connection interruptions. Is -1
* until [start] is called.
*/
var myID: Int = -1
private set
private val myAddressPrefix: String get() = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$myID"
/** Opens the connection to the server and registers a JVM shutdown hook to cleanly disconnect. */
@Throws(ActiveMQNotConnectedException::class)
fun start(username: String, password: String) {
@ -57,16 +45,14 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port))
serverLocator.threadPoolMaxSize = 1
sessionFactory = serverLocator.createSessionFactory()
// We use our initial connection ID as the queue namespace.
myID = sessionFactory.connection.id as Int and 0x000000FFFFFF
session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize)
session.start()
clientImpl = CordaRPCClientImpl(session, state.lock, myAddressPrefix)
clientImpl = CordaRPCClientImpl(session, state.lock, username)
running = true
// We will use the ID in strings so strip the sign bit.
}
Runtime.getRuntime().addShutdownHook(thread(start = false) {
Runtime.getRuntime().addShutdownHook(Thread {
close()
})
}

View File

@ -16,6 +16,7 @@ import com.r3corda.core.utilities.debug
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.messaging.*
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
@ -42,7 +43,7 @@ import kotlin.reflect.jvm.javaMethod
*/
class CordaRPCClientImpl(private val session: ClientSession,
private val sessionLock: ReentrantLock,
private val myAddressPrefix: String) {
private val username: String) {
companion object {
private val closeableCloseMethod = Closeable::close.javaMethod
private val autocloseableCloseMethod = AutoCloseable::close.javaMethod
@ -113,18 +114,20 @@ class CordaRPCClientImpl(private val session: ClientSession,
*/
@ThreadSafe
private inner class RPCProxyHandler(private val timeout: Duration?) : InvocationHandler, Closeable {
private val proxyAddress = "$myAddressPrefix.rpc.responses.${random63BitValue()}"
private val proxyAddress = constructAddress()
private val consumer: ClientConsumer
var serverProtocolVersion = 0
init {
consumer = sessionLock.withLock{
consumer = sessionLock.withLock {
session.createTemporaryQueue(proxyAddress, proxyAddress)
session.createConsumer(proxyAddress)
}
}
private fun constructAddress() = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.${random63BitValue()}"
@Synchronized
override fun invoke(proxy: Any, method: Method, args: Array<out Any>?): Any? {
if (isCloseInvocation(method)) {
@ -187,13 +190,12 @@ class CordaRPCClientImpl(private val session: ClientSession,
sessionLock.withLock {
val msg = createMessage(method)
val kryo = if (returnsObservables) maybePrepareForObservables(location, method, msg) else null
val argsArray = args ?: Array<Any?>(0) { null }
val serializedBytes = try {
argsArray.serialize()
val serializedArgs = try {
(args ?: emptyArray<Any?>()).serialize()
} catch (e: KryoException) {
throw RPCException("Could not serialize RPC arguments", e)
}
msg.writeBodyBufferBytes(serializedBytes.bits)
msg.writeBodyBufferBytes(serializedArgs.bits)
producer!!.send(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, msg)
return kryo
}
@ -201,12 +203,12 @@ class CordaRPCClientImpl(private val session: ClientSession,
private fun maybePrepareForObservables(location: Throwable, method: Method, msg: ClientMessage): Kryo {
// Create a temporary queue just for the emissions on any observables that are returned.
val qName = "$myAddressPrefix.rpc.observations.${random63BitValue()}"
session.createTemporaryQueue(qName, qName)
msg.putStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, qName)
val observationsQueueName = constructAddress()
session.createTemporaryQueue(observationsQueueName, observationsQueueName)
msg.putStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsQueueName)
// And make sure that we deserialise observable handles so that they're linked to the right
// queue. Also record a bit of metadata for debugging purposes.
return createRPCKryo(observableSerializer = ObservableDeserializer(qName, method.name, location))
return createRPCKryo(observableSerializer = ObservableDeserializer(observationsQueueName, method.name, location))
}
private fun createMessage(method: Method): ClientMessage {
@ -214,7 +216,7 @@ class CordaRPCClientImpl(private val session: ClientSession,
putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name)
putStringProperty(ClientRPCRequestMessage.REPLY_TO, proxyAddress)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
}

View File

@ -1,13 +1,20 @@
package com.r3corda.client
import com.r3corda.client.impl.CordaRPCClientImpl
import com.r3corda.core.millis
import com.r3corda.core.random63BitValue
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.utilities.LogHelper
import com.r3corda.node.services.RPCUserService
import com.r3corda.node.services.User
import com.r3corda.node.services.messaging.*
import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import com.r3corda.node.utilities.AffinityExecutor
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.TransportConfiguration
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ClientProducer
import org.apache.activemq.artemis.api.core.client.ClientSession
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
@ -15,12 +22,14 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.After
import org.junit.Before
import org.junit.Test
import rx.Observable
import rx.subjects.PublishSubject
import java.io.Closeable
import java.time.Duration
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.LinkedBlockingQueue
@ -28,6 +37,7 @@ import java.util.concurrent.locks.ReentrantLock
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
import kotlin.test.fail
class ClientRPCInfrastructureTests {
// TODO: Test that timeouts work
@ -37,7 +47,9 @@ class ClientRPCInfrastructureTests {
lateinit var clientSession: ClientSession
lateinit var producer: ClientProducer
lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
lateinit var proxy: ITestOps
var proxy: TestOps? = null
private val authenticatedUser = User("test", "password", permissions = setOf())
@Before
fun setup() {
@ -54,20 +66,25 @@ class ClientRPCInfrastructureTests {
serverSession = sessionFactory.createSession()
serverSession.start()
serverSession.createTemporaryQueue(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, ArtemisMessagingComponent.RPC_REQUESTS_QUEUE)
serverSession.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
producer = serverSession.createProducer()
val dispatcher = object : RPCDispatcher(TestOps()) {
val userService = object : RPCUserService {
override fun getUser(usename: String): User? = throw UnsupportedOperationException()
override val users: List<User> get() = throw UnsupportedOperationException()
}
val dispatcher = object : RPCDispatcher(TestOpsImpl(), userService) {
override fun send(bits: SerializedBytes<*>, toAddress: String) {
val msg = serverSession.createMessage(false).apply {
writeBodyBufferBytes(bits.bits)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer.send(toAddress, msg)
}
override fun getUser(message: ClientMessage): User = authenticatedUser
}
serverThread = AffinityExecutor.ServiceAffinityExecutor("unit-tests-rpc-dispatch-thread", 1)
val serverConsumer = serverSession.createConsumer(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE)
val serverConsumer = serverSession.createConsumer(RPC_REQUESTS_QUEUE)
serverSession.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 'BINDING_REMOVED'")
val serverNotifConsumer = serverSession.createConsumer("rpc.qremovals")
dispatcher.start(serverConsumer, serverNotifConsumer, serverThread)
@ -75,21 +92,27 @@ class ClientRPCInfrastructureTests {
clientSession = sessionFactory.createSession()
clientSession.start()
LogHelper.setLevel("+com.r3corda.rpc"/*, "+org.apache.activemq"*/)
proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), "tests").proxyFor(ITestOps::class.java)
LogHelper.setLevel("+com.r3corda.rpc")
}
private fun createProxyUsingReplyTo(username: String, timeout: Duration? = null): TestOps {
val proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), username).proxyFor(TestOps::class.java, timeout = timeout)
this.proxy = proxy
return proxy
}
private fun createProxyUsingAuthenticatedReplyTo() = createProxyUsingReplyTo(authenticatedUser.username)
@After
fun shutdown() {
(proxy as Closeable).close()
(proxy as Closeable?)?.close()
clientSession.stop()
serverSession.stop()
artemis.stop()
serverThread.shutdownNow()
}
interface ITestOps : RPCOps {
interface TestOps : RPCOps {
@Throws(IllegalArgumentException::class)
fun barf()
@ -105,34 +128,33 @@ class ClientRPCInfrastructureTests {
@RPCSinceVersion(2)
fun addedLater()
fun captureUser(): String
}
lateinit var complicatedObservable: Observable<Pair<String, Observable<String>>>
inner class TestOps : ITestOps {
inner class TestOpsImpl : TestOps {
override val protocolVersion = 1
override fun barf() {
throw IllegalArgumentException("Barf!")
}
override fun barf(): Unit = throw IllegalArgumentException("Barf!")
override fun void() { }
override fun someCalculation(str: String, num: Int) = "$str $num"
override fun makeObservable(): Observable<Int> {
return Observable.just(1, 2, 3, 4)
}
override fun makeObservable(): Observable<Int> = Observable.just(1, 2, 3, 4)
override fun makeComplicatedObservable() = complicatedObservable
override fun addedLater() {
throw UnsupportedOperationException("not implemented")
}
override fun addedLater(): Unit = throw UnsupportedOperationException("not implemented")
override fun captureUser(): String = CURRENT_RPC_USER.get().username
}
@Test
fun simpleRPCs() {
fun `simple RPCs`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
// Does nothing, doesn't throw.
proxy.void()
@ -144,14 +166,16 @@ class ClientRPCInfrastructureTests {
}
@Test
fun simpleObservable() {
fun `simple observable`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
// This tests that the observations are transmitted correctly, also completion is transmitted.
val observations = proxy.makeObservable().toBlocking().toIterable().toList()
assertEquals(listOf(1, 2, 3, 4), observations)
}
@Test
fun complexObservables() {
fun `complex observables`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
// This checks that we can return an object graph with complex usage of observables, like an observable
// that emits objects that contain more observables.
val serverQuotes = PublishSubject.create<Pair<String, Observable<String>>>()
@ -177,7 +201,8 @@ class ClientRPCInfrastructureTests {
}
}
assertEquals(1, clientSession.addressQuery(SimpleString("tests.rpc.observations.#")).queueNames.size)
val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*")
assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
assertThat(clientQuotes).isEmpty()
@ -192,11 +217,36 @@ class ClientRPCInfrastructureTests {
assertTrue(serverQuotes.hasObservers())
subscription.unsubscribe()
unsubscribeLatch.await()
assertEquals(0, clientSession.addressQuery(SimpleString("tests.rpc.observations.#")).queueNames.size)
assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size)
}
@Test
fun versioning() {
val proxy = createProxyUsingAuthenticatedReplyTo()
assertFailsWith<UnsupportedOperationException> { proxy.addedLater() }
}
@Test
fun `authenticated user is available to RPC`() {
val proxy = createProxyUsingAuthenticatedReplyTo()
assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username)
}
@Test
fun `using another username for the reply-to`() {
assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy {
val proxy = createProxyUsingReplyTo(random63BitValue().toString(), timeout = 300.millis)
proxy.void()
fail("RPC successfully returned using someone else's username for the reply-to")
}
}
@Test
fun `using another username for the reply-to, which contains our username as a prefix`() {
assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy {
val proxy = createProxyUsingReplyTo("${authenticatedUser.username}extra", timeout = 300.millis)
proxy.void()
fail("RPC successfully returned using someone else's username for the reply-to")
}
}
}

View File

@ -13,10 +13,13 @@ import rx.Observable
import rx.subjects.UnicastSubject
import java.io.BufferedInputStream
import java.io.InputStream
import java.io.OutputStream
import java.math.BigDecimal
import java.nio.file.Files
import java.nio.file.LinkOption
import java.nio.file.OpenOption
import java.nio.file.Path
import java.nio.file.attribute.FileAttribute
import java.time.Duration
import java.time.temporal.Temporal
import java.util.concurrent.ExecutionException
@ -31,6 +34,7 @@ val Int.days: Duration get() = Duration.ofDays(this.toLong())
val Int.hours: Duration get() = Duration.ofHours(this.toLong())
val Int.minutes: Duration get() = Duration.ofMinutes(this.toLong())
val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong())
val Int.millis: Duration get() = Duration.ofMillis(this.toLong())
// TODO: Review by EOY2016 if we ever found these utilities helpful.
@ -89,8 +93,17 @@ inline fun <T> SettableFuture<T>.catch(block: () -> T) {
}
}
fun <R> Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block)
/** Allows you to write code like: Paths.get("someDir") / "subdir" / "filename" but using the Paths API to avoid platform separator problems. */
operator fun Path.div(other: String): Path = resolve(other)
fun Path.createDirectories(vararg attrs: FileAttribute<*>): Path = Files.createDirectories(this, *attrs)
fun Path.exists(vararg options: LinkOption): Boolean = Files.exists(this, *options)
inline fun <R> Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block)
inline fun Path.write(createDirs: Boolean = false, vararg options: OpenOption, block: (OutputStream) -> Unit) {
if (createDirs) {
normalize().parent?.createDirectories()
}
Files.newOutputStream(this, *options).use(block)
}
// Simple infix function to add back null safety that the JDK lacks: timeA until timeB
infix fun Temporal.until(endExclusive: Temporal) = Duration.between(this, endExclusive)
@ -228,9 +241,6 @@ fun extractZipFile(zipPath: Path, toPath: Path) {
val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this)
/** Allows you to write code like: Paths.get("someDir") / "subdir" / "filename" but using the Paths API to avoid platform separator problems. */
operator fun Path.div(other: String): Path = resolve(other)
/** Representation of an operation that may have thrown an error. */
data class ErrorOr<out A> private constructor(val value: A?, val error: Throwable?) {
constructor(value: A) : this(value, null)
@ -278,21 +288,4 @@ fun <T> Observable<T>.bufferUntilSubscribed(): Observable<T> {
val subject = UnicastSubject.create<T>()
val subscription = subscribe(subject)
return subject.doOnUnsubscribe { subscription.unsubscribe() }
}
/**
* Determine if an iterable data type's contents are ordered and unique, based on their [Comparable].compareTo
* function.
*/
fun <T, I: Comparable<I>> Iterable<T>.isOrderedAndUnique(extractId: T.() -> I): Boolean {
var last: I? = null
return all { it ->
val lastLast = last
last = extractId(it)
if (lastLast == null) {
true
} else {
lastLast < extractId(it)
}
}
}

View File

@ -179,7 +179,6 @@ interface Message {
val data: ByteArray
val debugTimestamp: Instant
val uniqueMessageId: UUID
fun serialise(): ByteArray
}
/** A singleton that's useful for validating topic strings */

View File

@ -1,24 +0,0 @@
package com.r3corda.core
import kotlin.test.assertFalse
import kotlin.test.assertTrue
class UtilsTest {
fun `ordered and unique basic`() {
val basic = listOf(1, 2, 3, 5, 8)
assertTrue(basic.isOrderedAndUnique { this })
val negative = listOf(-1, 2, 5)
assertTrue(negative.isOrderedAndUnique { this })
}
fun `ordered and unique duplicate`() {
val duplicated = listOf(1, 2, 2, 3, 5, 8)
assertFalse(duplicated.isOrderedAndUnique { this })
}
fun `ordered and unique out of sequence`() {
val mixed = listOf(3, 1, 2, 8, 5)
assertFalse(mixed.isOrderedAndUnique { this })
}
}

View File

@ -17,6 +17,14 @@ detail on how to use this is provided in the docs for the proxy method.
For a brief tutorial on how one can use the RPC API see :doc:`tutorial-clientrpc-api`.
Security
--------
Users wanting to use the RPC library are first required to authenticate themselves with the node using a valid username
and password. These are kept in ``rpc-users.properties`` in the node base directory. This file also specifies
permissions for each user, which the RPC implementation can use to control access. The file format is described in
:doc:`corda-configuration-files`.
Observables
-----------

View File

@ -4,7 +4,7 @@ The Corda Configuration File
Configuration File Location
---------------------------
The Corda all in one ``corda.jar`` file is generated by the ``gradle buildCordaJAR`` task and defaults to reading configuration from a ``node.conf`` file in the present working directory.
The Corda all-in-one ``corda.jar`` file is generated by the ``gradle buildCordaJAR`` task and defaults to reading configuration from a ``node.conf`` file in the present working directory.
This behaviour can be overidden using the ``--config-file`` command line option to target configuration files with different names, or different file location (relative paths are relative to the current working directory).
Also, the ``--base-directory`` command line option alters the Corda node workspace location and if specified a ``node.conf`` configuration file is then expected in the root of the workspace.
@ -97,6 +97,20 @@ Configuration File Fields
:useHTTPS: If false the node's web server will be plain HTTP. If true the node will use the same certificate and private key from the ``<workspace>/certificates/sslkeystore.jks`` file as the ArtemisMQ port for HTTPS. If HTTPS is enabled then unencrypted HTTP traffic to the node's **webAddress** port is not supported.
RPC Users File
==============
Corda also uses the ``rpc-users.properties`` file, found in the base directory, to control access to the RPC subsystem.
This is a Java properties file (details can be found in the `Javadocs <https://docs.oracle.com/javase/8/docs/api/java/util/Properties.html#load-java.io.Reader->`_)
which specifies a list of users with their password and list of permissions they're enabled for.
.. code-block:: text
:caption: Sample
admin=notsecure,ADMIN
user1=letmein,CASH,PAPER
In this example ``user1`` has password ``letmein`` and has permissions for ``CASH`` and ``PAPER``. The permissions are
free-form strings which can be used by the RPC methods to control access.
If ``rpc-users.properties`` is empty or doesn't exist then the RPC subsystem is effectively locked down.

View File

@ -5,33 +5,30 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.div
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.node.serialization.NodeClock
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.write
import com.r3corda.node.services.User
import com.r3corda.node.services.config.ConfigHelper
import com.r3corda.node.services.config.FullNodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingComponent
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.JsonSupport
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.File
import java.io.InputStreamReader
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.text.SimpleDateFormat
import java.time.Clock
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.thread
/**
* This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests.
@ -45,7 +42,7 @@ import kotlin.concurrent.thread
* TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought.
*/
private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java)
private val log: Logger = loggerFor<DriverDSL>()
/**
* This is the interface that's exposed to DSL users.
@ -57,9 +54,12 @@ interface DriverDSLExposedInterface {
* @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something
* random. Note that this must be unique as the driver uses it as a primary key!
* @param advertisedServices The set of services to be advertised by the node. Defaults to empty set.
* @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list.
* @return The [NodeInfo] of the started up node retrieved from the network map service.
*/
fun startNode(providedName: String? = null, advertisedServices: Set<ServiceInfo> = setOf()): Future<NodeInfoAndConfig>
fun startNode(providedName: String? = null,
advertisedServices: Set<ServiceInfo> = emptySet(),
rpcUsers: List<User> = emptyList()): Future<NodeInfoAndConfig>
fun waitForAllNodesToFinish()
}
@ -102,7 +102,7 @@ sealed class PortAllocation {
*
* The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache].
*
* @param baseDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* @param driverDirectory The base directory node directories go into, defaults to "build/<timestamp>/". The node
* directories themselves are "<baseDirectory>/<legalName>/", where legalName defaults to "<randomName>-<messagingPort>"
* and may be specified in [DriverDSL.startNode].
* @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental.
@ -113,7 +113,7 @@ sealed class PortAllocation {
* @return The value returned in the [dsl] closure.
*/
fun <A> driver(
baseDirectory: String = "build/${getTimestampAsDirectoryName()}",
driverDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()),
portAllocation: PortAllocation = PortAllocation.Incremental(10000),
debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005),
useTestClock: Boolean = false,
@ -123,7 +123,7 @@ fun <A> driver(
driverDsl = DriverDSL(
portAllocation = portAllocation,
debugPortAllocation = debugPortAllocation,
baseDirectory = baseDirectory,
driverDirectory = driverDirectory,
useTestClock = useTestClock,
isDebug = isDebug
),
@ -162,10 +162,7 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
}
private fun getTimestampAsDirectoryName(): String {
val tz = TimeZone.getTimeZone("UTC")
val df = SimpleDateFormat("yyyyMMddHHmmss")
df.timeZone = tz
return df.format(Date())
return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now())
}
fun addressMustBeBound(hostAndPort: HostAndPort) {
@ -207,7 +204,7 @@ fun <A> poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120,
open class DriverDSL(
val portAllocation: PortAllocation,
val debugPortAllocation: PortAllocation,
val baseDirectory: String,
val driverDirectory: Path,
val useTestClock: Boolean,
val isDebug: Boolean
) : DriverDSLInternalInterface {
@ -271,7 +268,7 @@ open class DriverDSL(
}
private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? {
val url = URL("http://${webAddress.toString()}/api/info")
val url = URL("http://$webAddress/api/info")
try {
val conn = url.openConnection() as HttpURLConnection
conn.requestMethod = "GET"
@ -291,20 +288,20 @@ open class DriverDSL(
}
}
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>): Future<NodeInfoAndConfig> {
override fun startNode(providedName: String?, advertisedServices: Set<ServiceInfo>, rpcUsers: List<User>): Future<NodeInfoAndConfig> {
val messagingAddress = portAllocation.nextHostAndPort()
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val name = providedName ?: "${pickA(name)}-${messagingAddress.port}"
val nodeDirectory = "$baseDirectory/$name"
val baseDirectory = driverDirectory / name
val config = ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(nodeDirectory),
baseDirectoryPath = baseDirectory,
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to name,
"basedir" to Paths.get(nodeDirectory).normalize().toString(),
"basedir" to baseDirectory.normalize().toString(),
"artemisAddress" to messagingAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to advertisedServices.joinToString(","),
@ -313,8 +310,16 @@ open class DriverDSL(
)
)
val nodeConfig = FullNodeConfiguration(config)
nodeConfig.rpcUsersFile.write(createDirs = true) {
rpcUsers.map { it.username to "${it.password},${it.permissions.joinToString(",")}" }
.toMap(Properties())
.store(it, null)
}
return Executors.newSingleThreadExecutor().submit(Callable<NodeInfoAndConfig> {
registerProcess(DriverDSL.startNode(config, quasarJarPath, debugPort))
registerProcess(DriverDSL.startNode(nodeConfig, quasarJarPath, debugPort))
NodeInfoAndConfig(queryNodeInfo(apiAddress)!!, config)
})
}
@ -327,13 +332,13 @@ open class DriverDSL(
val apiAddress = portAllocation.nextHostAndPort()
val debugPort = if (isDebug) debugPortAllocation.nextPort() else null
val nodeDirectory = "$baseDirectory/$networkMapName"
val baseDirectory = driverDirectory / networkMapName
val config = ConfigHelper.loadConfig(
baseDirectoryPath = Paths.get(nodeDirectory),
baseDirectoryPath = baseDirectory,
allowMissingConfig = true,
configOverrides = mapOf(
"myLegalName" to networkMapName,
"basedir" to Paths.get(nodeDirectory).normalize().toString(),
"basedir" to baseDirectory.normalize().toString(),
"artemisAddress" to networkMapAddress.toString(),
"webAddress" to apiAddress.toString(),
"extraAdvertisedServiceIds" to "",
@ -342,7 +347,7 @@ open class DriverDSL(
)
log.info("Starting network-map-service")
registerProcess(startNode(config, quasarJarPath, debugPort))
registerProcess(startNode(FullNodeConfiguration(config), quasarJarPath, debugPort))
}
companion object {
@ -356,13 +361,12 @@ open class DriverDSL(
fun <A> pickA(array: Array<A>): A = array[Math.abs(Random().nextInt()) % array.size]
private fun startNode(
config: Config,
nodeConf: FullNodeConfiguration,
quasarJarPath: String,
debugPort: Int?
): Process {
val nodeConf = FullNodeConfiguration(config)
// Write node.conf
writeConfig(nodeConf.basedir, "node.conf", config)
writeConfig(nodeConf.basedir, "node.conf", nodeConf.config)
val className = "com.r3corda.node.MainKt" // cannot directly get class for this, so just use string
val separator = System.getProperty("file.separator")

View File

@ -26,7 +26,7 @@ import com.r3corda.node.services.events.NodeSchedulerService
import com.r3corda.node.services.events.ScheduledActivityObserver
import com.r3corda.node.services.identity.InMemoryIdentityService
import com.r3corda.node.services.keys.PersistentKeyManagementService
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.messaging.RPCOps
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC
@ -252,7 +252,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
isPreviousCheckpointsPresent = true
false
}
startMessagingService(ServerRPCOps(services, smm, database))
startMessagingService(CordaRPCOpsImpl(services, smm, database))
runOnStop += Runnable { net.stop() }
_networkMapRegistrationFuture.setFuture(registerWithNetworkMap())
smm.start()
@ -434,7 +434,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo
protected abstract fun makeMessagingService(): MessagingServiceInternal
protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps)
protected abstract fun startMessagingService(rpcOps: RPCOps)
protected open fun initialiseStorageService(dir: Path): Pair<TxWritableStorageService, CheckpointStorage> {
val attachments = makeAttachmentStorage(dir)

View File

@ -1,7 +1,6 @@
package com.r3corda.node.internal
import com.r3corda.contracts.asset.Cash
import com.r3corda.core.contracts.InsufficientBalanceException
import com.r3corda.core.contracts.*
import com.r3corda.core.crypto.Party
import com.r3corda.core.crypto.SecureHash
@ -13,10 +12,7 @@ import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.TransactionBuilder
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.messaging.StateMachineInfo
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.messaging.TransactionBuildResult
import com.r3corda.node.services.messaging.*
import com.r3corda.node.services.statemachine.StateMachineManager
import com.r3corda.node.utilities.databaseTransaction
import com.r3corda.protocols.BroadcastTransactionProtocol
@ -29,12 +25,16 @@ import java.security.KeyPair
* Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server
* thread (i.e. serially). Arguments are serialised and deserialised automatically.
*/
class ServerRPCOps(
class CordaRPCOpsImpl(
val services: ServiceHub,
val smm: StateMachineManager,
val database: Database
) : CordaRPCOps {
override val protocolVersion: Int = 0
companion object {
const val CASH_PERMISSION = "CASH"
}
override val protocolVersion: Int get() = 0
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
return services.networkMapCache.track()
@ -59,6 +59,7 @@ class ServerRPCOps(
changes.map { StateMachineUpdate.fromStateMachineChange(it) }
)
}
override fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {
return databaseTransaction(database) {
services.storageService.stateMachineRecordedTransactionMapping.track()
@ -66,6 +67,7 @@ class ServerRPCOps(
}
override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult {
requirePermission(CASH_PERMISSION)
return databaseTransaction(database) {
when (command) {
is ClientToServiceCommand.IssueCash -> issueCash(command)

View File

@ -7,11 +7,13 @@ import com.r3corda.core.node.services.ServiceInfo
import com.r3corda.core.then
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.serialization.NodeClock
import com.r3corda.node.services.PropertiesFileRPCUserService
import com.r3corda.node.services.RPCUserService
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.FullNodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.RPCOps
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
import com.r3corda.node.servlets.AttachmentDownloadServlet
import com.r3corda.node.servlets.Config
@ -112,10 +114,13 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
private var shutdownThread: Thread? = null
private lateinit var userService: RPCUserService
override fun makeMessagingService(): MessagingServiceInternal {
userService = PropertiesFileRPCUserService(configuration.rpcUsersFile)
val serverAddr = with(configuration) {
messagingServerAddress ?: {
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache)
messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService)
artemisAddress
}()
}
@ -124,7 +129,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database)
}
override fun startMessagingService(cordaRPCOps: CordaRPCOps) {
override fun startMessagingService(rpcOps: RPCOps) {
// Start up the embedded MQ server
messageBroker?.apply {
runOnStop += Runnable { messageBroker?.stop() }
@ -135,7 +140,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress:
// Start up the MQ client.
val net = net as NodeMessagingClient
net.configureWithDevSSLCertificate() // TODO: Client might need a separate certificate
net.start(cordaRPCOps)
net.start(rpcOps, userService)
}
private fun initWebServer(): Server {

View File

@ -0,0 +1,51 @@
package com.r3corda.node.services
import com.r3corda.core.exists
import com.r3corda.core.use
import java.nio.file.Path
import java.util.*
/**
* Service for retrieving [User] objects representing RPC users who are authorised to use the RPC system. A [User]
* contains their login username and password along with a set of permissions for RPC services they are allowed access
* to. These permissions are represented as [String]s to allow RPC implementations to add their own permissioning.
*/
interface RPCUserService {
fun getUser(usename: String): User?
val users: List<User>
}
// TODO If this sticks around then change it to use HOCON ...
// TODO ... and also store passwords as salted hashes.
// TODO Otherwise consider something like Apache Shiro
class PropertiesFileRPCUserService(file: Path) : RPCUserService {
private val _users: Map<String, User>
init {
_users = if (file.exists()) {
val properties = Properties()
file.use {
properties.load(it)
}
properties.map {
val parts = it.value.toString().split(delimiters = ",")
val username = it.key.toString()
require(!username.contains("""\.|\*|#""".toRegex())) { """Usernames cannot have the following characters: * . # """ }
val password = parts[0]
val permissions = parts.drop(1).map(String::toUpperCase).toSet()
User(username, password, permissions)
}.associateBy(User::username)
} else {
emptyMap()
}
}
override fun getUser(usename: String): User? = _users[usename]
override val users: List<User> get() = _users.values.toList()
}
data class User(val username: String, val password: String, val permissions: Set<String>) {
override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)"
}

View File

@ -11,7 +11,6 @@ import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.utilities.TestClock
import com.typesafe.config.Config
import java.nio.file.Path
import java.time.Clock
import java.util.*
interface NodeSSLConfiguration {
@ -25,6 +24,7 @@ interface NodeSSLConfiguration {
interface NodeConfiguration : NodeSSLConfiguration {
val basedir: Path
override val certificatesPath: Path get() = basedir / "certificates"
val rpcUsersFile: Path get() = basedir / "rpc-users.properties"
val myLegalName: String
val nearestCity: String
val emailAddress: String
@ -33,7 +33,7 @@ interface NodeConfiguration : NodeSSLConfiguration {
val devMode: Boolean
}
class FullNodeConfiguration(config: Config) : NodeConfiguration {
class FullNodeConfiguration(val config: Config) : NodeConfiguration {
override val basedir: Path by config
override val myLegalName: String by config
override val nearestCity: String by config

View File

@ -4,14 +4,12 @@ import com.google.common.net.HostAndPort
import com.r3corda.core.ThreadBox
import com.r3corda.core.crypto.AddressFormatException
import com.r3corda.core.div
import com.r3corda.core.exists
import com.r3corda.core.messaging.SingleMessageRecipient
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.use
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.services.RPCUserService
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE_NAME
import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE_NAME
import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_USER
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
@ -25,8 +23,6 @@ import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal
import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal
import rx.Subscription
import java.io.IOException
import java.nio.file.Files
import java.nio.file.Path
import java.security.Principal
import java.util.*
import javax.annotation.concurrent.ThreadSafe
@ -57,9 +53,11 @@ import javax.security.auth.spi.LoginModule
@ThreadSafe
class ArtemisMessagingServer(override val config: NodeConfiguration,
val myHostPort: HostAndPort,
val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent() {
val networkMapCache: NetworkMapCache,
val userService: RPCUserService) : ArtemisMessagingComponent() {
companion object {
val log = loggerFor<ArtemisMessagingServer>()
private val log = loggerFor<ArtemisMessagingServer>()
}
private class InnerState {
@ -187,20 +185,23 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess
isPersistIDCache = true
isPopulateValidatedUser = true
setupUserRoles()
configureQueueSecurity()
}
// This gives nodes full access and RPC clients only enough to do RPC
private fun ConfigurationImpl.setupUserRoles() {
// TODO COR-307
val nodeRole = Role(NODE_ROLE_NAME, true, true, true, true, true, true, true, true)
val clientRpcRole = restrictedRole(RPC_ROLE_NAME, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
securityRoles = mapOf(
"#" to setOf(nodeRole),
"clients.*.rpc.responses.*" to setOf(nodeRole, clientRpcRole),
"clients.*.rpc.observations.*" to setOf(nodeRole, clientRpcRole),
RPC_REQUESTS_QUEUE to setOf(nodeRole, restrictedRole(RPC_ROLE_NAME, send = true))
)
private fun ConfigurationImpl.configureQueueSecurity() {
val nodeRPCSendRole = restrictedRole(NODE_USER, send = true) // The node needs to be able to send responses on the client queues
for ((username) in userService.users) {
// Clients need to be able to consume the responses on their queues, and they're also responsible for creating and destroying them
val clientRole = restrictedRole(username, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true)
securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(nodeRPCSendRole, clientRole)
}
// TODO Restrict this down to just what the node needs
securityRoles["#"] = setOf(Role(NODE_USER, true, true, true, true, true, true, true, true))
securityRoles[RPC_REQUESTS_QUEUE] = setOf(
restrictedRole(NODE_USER, createNonDurableQueue = true, deleteNonDurableQueue = true),
restrictedRole(RPC_REQUESTS_QUEUE, send = true)) // Clients need to be able to send their requests
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
@ -211,19 +212,10 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager {
val rpcUsersFile = config.basedir / "rpc-users.properties"
if (!rpcUsersFile.exists()) {
val users = Properties()
users["user1"] = "test"
Files.newOutputStream(rpcUsersFile).use {
users.store(it, null)
}
}
val securityConfig = object : SecurityConfiguration() {
// Override to make it work with our login module
override fun getAppConfigurationEntry(name: String): Array<AppConfigurationEntry> {
val options = mapOf(NodeLoginModule.FILE_KEY to rpcUsersFile)
val options = mapOf(RPCUserService::class.java.name to userService)
return arrayOf(AppConfigurationEntry(name, REQUIRED, options))
}
}
@ -277,29 +269,25 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
}
// We could have used the built-in PropertiesLoginModule but that exposes a roles properties file. Roles are used
// for queue access control and our RPC users must only have access to the queues they need and this cannot be allowed
// to be modified.
class NodeLoginModule : LoginModule {
companion object {
const val FILE_KEY = "rpc-users-file"
const val NODE_ROLE_NAME = "NodeRole"
const val RPC_ROLE_NAME = "RpcRole"
const val NODE_USER = "Node"
}
private val users = Properties()
private var loginSucceeded: Boolean = false
private lateinit var subject: Subject
private lateinit var callbackHandler: CallbackHandler
private lateinit var principals: List<Principal>
private lateinit var userService: RPCUserService
private val principals = ArrayList<Principal>()
override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map<String, *>, options: Map<String, *>) {
this.subject = subject
this.callbackHandler = callbackHandler
val rpcUsersFile = options[FILE_KEY] as Path
if (rpcUsersFile.exists()) {
rpcUsersFile.use {
users.load(it)
}
}
userService = options[RPCUserService::class.java.name] as RPCUserService
}
override fun login(): Boolean {
@ -316,14 +304,16 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
val username = nameCallback.name ?: throw FailedLoginException("User name is null")
val receivedPassword = passwordCallback.password ?: throw FailedLoginException("Password is null")
val password = if (username == "Node") "Node" else users[username] ?: throw FailedLoginException("User does not exist")
val password = if (username == NODE_USER) "Node" else userService.getUser(username)?.password ?: throw FailedLoginException("User does not exist")
if (password != String(receivedPassword)) {
throw FailedLoginException("Password does not match")
}
principals = listOf(
UserPrincipal(username),
RolePrincipal(if (username == "Node") NODE_ROLE_NAME else RPC_ROLE_NAME))
principals += UserPrincipal(username)
principals += RolePrincipal(username) // The roles are configured using the usernames
if (username != NODE_USER) {
principals += RolePrincipal(RPC_REQUESTS_QUEUE) // This enables the RPC client to send requests
}
loginSucceeded = true
return loginSucceeded

View File

@ -113,7 +113,7 @@ interface CordaRPCOps : RPCOps {
fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>>
/**
* Executes the given command, possibly triggering cash creation etc.
* Executes the given command if the user is permissioned to do so, possibly triggering cash creation etc.
* TODO: The signature of this is weird because it's the remains of an old service call, we should have a call for each command instead.
*/
fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult

View File

@ -7,6 +7,7 @@ import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.opaque
import com.r3corda.core.utilities.loggerFor
import com.r3corda.core.utilities.trace
import com.r3corda.node.services.RPCUserService
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.utilities.*
@ -44,11 +45,6 @@ import javax.annotation.concurrent.ThreadSafe
* @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate
* that this is a NetworkMapService node which will be bound globally to the name "networkmap"
* @param executor An executor to run received message tasks upon.
* @param persistentInbox If true the inbox will be created persistent if not already created.
* If false the inbox queue will be transient, which is appropriate for UI clients for example.
* @param persistenceTx A lambda to wrap message processing in any transaction required by the persistence layer (e.g.
* a database transaction) without introducing a dependency on the actual solution and any parameters it requires
* in this class.
*/
@ThreadSafe
class NodeMessagingClient(override val config: NodeConfiguration,
@ -115,7 +111,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
})
fun start(rpcOps: CordaRPCOps) {
fun start(rpcOps: RPCOps, userService: RPCUserService) {
state.locked {
check(!started) { "start can't be called twice" }
started = true
@ -151,7 +147,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1")
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer("rpc.qremovals")
dispatcher = createRPCDispatcher(state, rpcOps)
dispatcher = createRPCDispatcher(rpcOps, userService)
}
}
@ -229,7 +225,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
override val data: ByteArray = body
override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp)
override val uniqueMessageId: UUID = uuid
override fun serialise(): ByteArray = body
override fun toString() = topic + "#" + data.opaque()
}
@ -389,15 +384,14 @@ class NodeMessagingClient(override val config: NodeConfiguration,
override val topicSession: TopicSession = topicSession
override val data: ByteArray = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val uniqueMessageId: UUID = uuid
override fun toString() = topicSession.toString() + "#" + String(data)
override fun toString() = "$topicSession#${String(data)}"
}
}
var dispatcher: RPCDispatcher? = null
private fun createRPCDispatcher(state: ThreadBox<InnerState>, ops: CordaRPCOps) = object : RPCDispatcher(ops) {
private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService) = object : RPCDispatcher(ops, userService) {
override fun send(bits: SerializedBytes<*>, toAddress: String) {
state.locked {
val msg = session!!.createMessage(false).apply {
@ -409,4 +403,4 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
}
}
}

View File

@ -5,30 +5,35 @@ import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.HashMultimap
import com.r3corda.core.ErrorOr
import com.r3corda.core.serialization.SerializedBytes
import com.r3corda.core.serialization.deserialize
import com.r3corda.core.serialization.serialize
import com.r3corda.core.utilities.debug
import com.r3corda.node.services.RPCUserService
import com.r3corda.node.services.User
import com.r3corda.node.utilities.AffinityExecutor
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import rx.Notification
import rx.Observable
import rx.Subscription
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.AtomicInteger
// TODO: Exposing the authenticated message sender.
/**
* Intended to service transient clients only (not p2p nodes) for short-lived, transient request/response pairs.
* If you need robustness, this is the wrong system. If you don't want a response, this is probably the
* wrong system (you could just send a message). If you want complex customisation of how requests/responses
* are handled, this is probably the wrong system.
*/
abstract class RPCDispatcher(val target: Any) {
private val methodTable = target.javaClass.declaredMethods.associateBy { it.name }
abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) {
// Throw an exception if there are overloaded methods
private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() }
private val queueToSubscription = HashMultimap.create<String, Subscription>()
// Created afresh for every RPC that is annotated as returning observables. Every time an observable is
@ -64,23 +69,24 @@ abstract class RPCDispatcher(val target: Any) {
}
fun dispatch(msg: ClientRPCRequestMessage) {
val (argBytes, replyTo, observationsTo, name) = msg
val maybeArgs = argBytes.deserialize<Array<Any>>()
val (argsBytes, replyTo, observationsTo, methodName) = msg
rpcLog.debug { "-> RPC -> $name(${maybeArgs.joinToString()}) [reply to $replyTo]" }
val response: ErrorOr<Any?> = ErrorOr.catch {
val method = methodTable[name] ?: throw RPCException("Received RPC for unknown method $name - possible client/server version skew?")
val method = methodTable[methodName] ?: throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
if (method.isAnnotationPresent(RPCReturnsObservables::class.java) && observationsTo == null)
throw RPCException("Received RPC without any destination for observations, but the RPC returns observables")
val args = argsBytes.deserialize()
rpcLog.debug { "-> RPC -> $methodName(${args.joinToString()}) [reply to $replyTo]" }
try {
method.invoke(target, *maybeArgs)
method.invoke(ops, *args)
} catch (e: InvocationTargetException) {
throw e.cause!!
}
}
rpcLog.debug { "<- RPC <- $name = $response " }
rpcLog.debug { "<- RPC <- $methodName = $response " }
val kryo = createRPCKryo(observableSerializer = if (observationsTo != null) ObservableSerializer(observationsTo) else null)
@ -88,7 +94,7 @@ abstract class RPCDispatcher(val target: Any) {
val responseBits = try {
response.serialize(kryo)
} catch (e: KryoException) {
rpcLog.error("Failed to respond to inbound RPC $name", e)
rpcLog.error("Failed to respond to inbound RPC $methodName", e)
ErrorOr.of(e).serialize(kryo)
}
send(responseBits, replyTo)
@ -116,14 +122,48 @@ abstract class RPCDispatcher(val target: Any) {
onExecutor.execute {
try {
val rpcMessage = msg.toRPCRequestMessage()
CURRENT_RPC_USER.set(rpcMessage.user)
dispatch(rpcMessage)
} catch(e: RPCException) {
rpcLog.warn("Received malformed client RPC message: ${e.message}")
rpcLog.trace("RPC exception", e)
} catch(e: Throwable) {
rpcLog.error("Uncaught exception when dispatching client RPC", e)
} finally {
CURRENT_RPC_USER.remove()
}
}
}
}
private fun ClientMessage.requiredString(name: String): String {
return getStringProperty(name) ?: throw RPCException("missing $name property")
}
/** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */
private fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage {
val user = getUser(this)
val replyTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.REPLY_TO, true)!!
val observationsTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.OBSERVATIONS_TO, false)
val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
if (argBytes.isEmpty()) {
throw RPCException("empty serialized args")
}
val methodName = requiredString(ClientRPCRequestMessage.METHOD_NAME)
return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName, user)
}
@VisibleForTesting
protected open fun getUser(message: ClientMessage): User {
return userService.getUser(message.requiredString(Message.HDR_VALIDATED_USER.toString()))!!
}
private fun ClientMessage.getAuthenticatedAddress(user: User, property: String, required: Boolean): String? {
val address: String? = if (required) requiredString(property) else getStringProperty(property)
val expectedAddressPrefix = "${ArtemisMessagingComponent.CLIENTS_PREFIX}${user.username}."
if (address != null && !address.startsWith(expectedAddressPrefix)) {
throw RPCException("$property address does not match up with the user")
}
return address
}
}

View File

@ -1,3 +1,5 @@
@file:JvmName("RPCStructures")
package com.r3corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
@ -22,12 +24,13 @@ import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.*
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.transactions.WireTransaction
import com.r3corda.node.services.User
import de.javakaffee.kryoserializers.ArraysAsListSerializer
import de.javakaffee.kryoserializers.guava.*
import net.i2p.crypto.eddsa.EdDSAPrivateKey
import net.i2p.crypto.eddsa.EdDSAPublicKey
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.objenesis.strategy.StdInstantiatorStrategy
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import rx.Notification
import rx.Observable
@ -35,7 +38,7 @@ import java.time.Instant
import java.util.*
/** Global RPC logger */
val rpcLog by lazy { LoggerFactory.getLogger("com.r3corda.rpc") }
val rpcLog: Logger by lazy { LoggerFactory.getLogger("com.r3corda.rpc") }
/** Used in the RPC wire protocol to wrap an observation with the handle of the observable it's intended for. */
data class MarshalledObservation(val forHandle: Int, val what: Notification<*>)
@ -59,7 +62,8 @@ data class ClientRPCRequestMessage(
val args: SerializedBytes<Array<Any>>,
val replyToAddress: String,
val observationsToAddress: String?,
val methodName: String
val methodName: String,
val user: User
) {
companion object {
const val REPLY_TO = "reply-to"
@ -77,6 +81,20 @@ interface RPCOps {
val protocolVersion: Int
}
/**
* This is available to RPC implementations to query the validated [User] that is calling it. Each user has a set of
* permissions they're entitled to which can be used to control access.
*/
@JvmField
val CURRENT_RPC_USER: ThreadLocal<User> = ThreadLocal()
/** Helper method which checks that the current RPC user is entitled for the given permission. Throws a [PermissionException] otherwise. */
fun requirePermission(permission: String) {
if (permission !in CURRENT_RPC_USER.get().permissions) {
throw PermissionException("User not permissioned for $permission")
}
}
/**
* Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked
* method.
@ -86,26 +104,12 @@ open class RPCException(msg: String, cause: Throwable?) : RuntimeException(msg,
class DeadlineExceeded(rpcName: String) : RPCException("Deadline exceeded on call to $rpcName")
}
/** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */
fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage {
fun ClientMessage.requiredString(name: String): String = getStringProperty(name) ?: throw RPCException("Malformed request message: missing $name property")
val methodName = requiredString(ClientRPCRequestMessage.METHOD_NAME)
// TODO: Look up the authenticated sender identity once we upgrade to Artemis 1.4 and use that instead.
// This current approach is insecure: one client could send an RPC with a reply-to address owned by
// another, although they'd have to be able to figure out the other client ID first.
// We also need that to figure out what RPCs are allowed.
val replyTo = requiredString(ClientRPCRequestMessage.REPLY_TO)
val observationsTo = getStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO)
val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
check(argBytes.isNotEmpty())
return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName)
}
class PermissionException(msg: String) : RuntimeException(msg)
// The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly.
// This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes,
// because we can see everything we're using in one place.
private class RPCKryo(private val observableSerializer: Serializer<Observable<Any>>? = null) : Kryo() {
private class RPCKryo(observableSerializer: Serializer<Observable<Any>>? = null) : Kryo() {
init {
isRegistrationRequired = true
// Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no
@ -173,7 +177,11 @@ private class RPCKryo(private val observableSerializer: Serializer<Observable<An
register(NetworkMapCache.MapChange::class.java)
register(NetworkMapCache.MapChangeType::class.java)
register(ArtemisMessagingComponent.NodeAddress::class.java,
read = { kryo, input -> ArtemisMessagingComponent.NodeAddress(parsePublicKeyBase58(kryo.readObject(input, String::class.java)), kryo.readObject(input, HostAndPort::class.java)) },
read = { kryo, input ->
ArtemisMessagingComponent.NodeAddress(
parsePublicKeyBase58(kryo.readObject(input, String::class.java)),
kryo.readObject(input, HostAndPort::class.java))
},
write = { kryo, output, nodeAddress ->
kryo.writeObject(output, nodeAddress.identity.toBase58String())
kryo.writeObject(output, nodeAddress.hostAndPort)
@ -193,13 +201,8 @@ private class RPCKryo(private val observableSerializer: Serializer<Observable<An
// Helper method, attempt to reduce boiler plate code
private fun <T> register(type: Class<T>, read: (Kryo, Input) -> T, write: (Kryo, Output, T) -> Unit) {
register(type, object : Serializer<T>() {
override fun read(kryo: Kryo, input: Input, type: Class<T>?): T {
return read(kryo, input)
}
override fun write(kryo: Kryo, output: Output, o: T) {
write(kryo, output, o)
}
override fun read(kryo: Kryo, input: Input, type: Class<T>): T = read(kryo, input)
override fun write(kryo: Kryo, output: Output, o: T) = write(kryo, output, o)
})
}

View File

@ -7,7 +7,10 @@ import com.r3corda.core.node.services.Vault
import com.r3corda.core.protocols.StateMachineRunId
import com.r3corda.core.serialization.OpaqueBytes
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.node.internal.ServerRPCOps
import com.r3corda.node.internal.CordaRPCOpsImpl
import com.r3corda.node.services.User
import com.r3corda.node.services.messaging.CURRENT_RPC_USER
import com.r3corda.node.services.messaging.PermissionException
import com.r3corda.node.services.messaging.StateMachineUpdate
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.SimpleNotaryService
@ -17,20 +20,19 @@ import com.r3corda.testing.expectEvents
import com.r3corda.testing.node.MockNetwork
import com.r3corda.testing.node.MockNetwork.MockNode
import com.r3corda.testing.sequence
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.Before
import org.junit.Test
import rx.Observable
import kotlin.test.assertEquals
import kotlin.test.assertFalse
/**
* Unit tests for the node monitoring service.
*/
class ServerRPCTest {
class CordaRPCOpsImplTest {
lateinit var network: MockNetwork
lateinit var aliceNode: MockNode
lateinit var notaryNode: MockNode
lateinit var rpc: ServerRPCOps
lateinit var rpc: CordaRPCOpsImpl
lateinit var stateMachineUpdates: Observable<StateMachineUpdate>
lateinit var transactions: Observable<SignedTransaction>
lateinit var vaultUpdates: Observable<Vault.Update>
@ -41,7 +43,8 @@ class ServerRPCTest {
val networkMap = network.createNode(advertisedServices = ServiceInfo(NetworkMapService.type))
aliceNode = network.createNode(networkMapAddress = networkMap.info.address)
notaryNode = network.createNode(advertisedServices = ServiceInfo(SimpleNotaryService.type), networkMapAddress = networkMap.info.address)
rpc = ServerRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database)
rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database)
CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(CordaRPCOpsImpl.CASH_PERMISSION)))
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
transactions = rpc.verifiedTransactions().second
@ -96,7 +99,7 @@ class ServerRPCTest {
}
@Test
fun issueAndMoveWorks() {
fun `issue and move`() {
rpc.executeCommand(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
@ -174,4 +177,17 @@ class ServerRPCTest {
)
}
}
@Test
fun `cash command by user not permissioned for cash`() {
CURRENT_RPC_USER.set(User("user", "pwd", permissions = emptySet()))
assertThatExceptionOfType(PermissionException::class.java).isThrownBy {
rpc.executeCommand(ClientToServiceCommand.IssueCash(
amount = Amount(100, USD),
issueRef = OpaqueBytes(ByteArray(1, { 1 })),
recipient = aliceNode.info.legalIdentity,
notary = notaryNode.info.notaryIdentity
))
}
}
}

View File

@ -1,22 +1,15 @@
package com.r3corda.node.services
import com.google.common.net.HostAndPort
import com.r3corda.core.contracts.ClientToServiceCommand
import com.r3corda.core.contracts.ContractState
import com.r3corda.core.contracts.StateAndRef
import com.r3corda.core.crypto.SecureHash
import com.r3corda.core.crypto.generateKeyPair
import com.r3corda.core.messaging.Message
import com.r3corda.core.messaging.createMessage
import com.r3corda.core.node.NodeInfo
import com.r3corda.core.node.services.DEFAULT_SESSION_ID
import com.r3corda.core.node.services.NetworkMapCache
import com.r3corda.core.node.services.StateMachineTransactionMapping
import com.r3corda.core.node.services.Vault
import com.r3corda.core.transactions.SignedTransaction
import com.r3corda.core.utilities.LogHelper
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.messaging.*
import com.r3corda.node.services.messaging.ArtemisMessagingServer
import com.r3corda.node.services.messaging.NodeMessagingClient
import com.r3corda.node.services.messaging.RPCOps
import com.r3corda.node.services.network.InMemoryNetworkMapCache
import com.r3corda.node.services.transactions.PersistentUniquenessProvider
import com.r3corda.node.utilities.AffinityExecutor
@ -31,7 +24,6 @@ import org.junit.Before
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import rx.Observable
import java.io.Closeable
import java.net.ServerSocket
import java.nio.file.Path
@ -51,51 +43,22 @@ class ArtemisMessagingTests {
lateinit var config: NodeConfiguration
lateinit var dataSource: Closeable
lateinit var database: Database
lateinit var userService: RPCUserService
var messagingClient: NodeMessagingClient? = null
var messagingServer: ArtemisMessagingServer? = null
val networkMapCache = InMemoryNetworkMapCache()
val rpcOps = object : CordaRPCOps {
override val protocolVersion: Int
get() = throw UnsupportedOperationException()
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun vaultAndUpdates(): Pair<List<StateAndRef<ContractState>>, Observable<Vault.Update>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun verifiedTransactions(): Pair<List<SignedTransaction>, Observable<SignedTransaction>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun stateMachineRecordedTransactionMapping(): Pair<List<StateMachineTransactionMapping>, Observable<StateMachineTransactionMapping>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun networkMapUpdates(): Pair<List<NodeInfo>, Observable<NetworkMapCache.MapChange>> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
override fun getVaultTransactionNotes(txnId: SecureHash): Iterable<String> {
throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates.
}
val rpcOps = object : RPCOps {
override val protocolVersion: Int get() = throw UnsupportedOperationException()
}
@Before
fun setUp() {
userService = PropertiesFileRPCUserService(temporaryFolder.newFile().toPath())
// TODO: create a base class that provides a default implementation
config = object : NodeConfiguration {
override val basedir: Path = temporaryFolder.newFolder().toPath()
@ -134,7 +97,7 @@ class ArtemisMessagingTests {
val remoteServerAddress = freeLocalHostAndPort()
createMessagingServer(remoteServerAddress).start()
createMessagingClient(server = remoteServerAddress).start(rpcOps)
createMessagingClient(server = remoteServerAddress).start(rpcOps, userService)
}
@Test
@ -145,14 +108,14 @@ class ArtemisMessagingTests {
createMessagingServer(serverAddress).start()
messagingClient = createMessagingClient(server = invalidServerAddress)
assertThatThrownBy { messagingClient!!.start(rpcOps) }
assertThatThrownBy { messagingClient!!.start(rpcOps, userService) }
messagingClient = null
}
@Test
fun `client should connect to local server`() {
createMessagingServer().start()
createMessagingClient().start(rpcOps)
createMessagingClient().start(rpcOps, userService)
}
@Test
@ -162,7 +125,7 @@ class ArtemisMessagingTests {
createMessagingServer().start()
val messagingClient = createMessagingClient()
messagingClient.start(rpcOps)
messagingClient.start(rpcOps, userService)
thread { messagingClient.run() }
messagingClient.addMessageHandler(topic) { message, r ->
@ -187,7 +150,7 @@ class ArtemisMessagingTests {
}
private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer {
return ArtemisMessagingServer(config, local, networkMapCache).apply {
return ArtemisMessagingServer(config, local, networkMapCache, userService).apply {
configureWithDevSSLCertificate()
messagingServer = this
}

View File

@ -0,0 +1,81 @@
package com.r3corda.node.services
import com.google.common.jimfs.Configuration.unix
import com.google.common.jimfs.Jimfs
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.After
import org.junit.Test
import java.nio.file.Files
class PropertiesFileRPCUserServiceTest {
private val fileSystem = Jimfs.newFileSystem(unix())
private val file = fileSystem.getPath("users.properties")
@After
fun cleanUp() {
fileSystem.close()
}
@Test
fun `file doesn't exist`() {
val service = PropertiesFileRPCUserService(file)
assertThat(service.getUser("user")).isNull()
assertThat(service.users).isEmpty()
}
@Test
fun `empty file`() {
val service = loadWithContents()
assertThat(service.getUser("user")).isNull()
assertThat(service.users).isEmpty()
}
@Test
fun `no permissions`() {
val service = loadWithContents("user=password")
assertThat(service.getUser("user")).isEqualTo(User("user", "password", permissions = emptySet()))
assertThat(service.users).containsOnly(User("user", "password", permissions = emptySet()))
}
@Test
fun `single permission, which is in lower case`() {
val service = loadWithContents("user=password,cash")
assertThat(service.getUser("user")?.permissions).containsOnly("CASH")
}
@Test
fun `two permissions, which are upper case`() {
val service = loadWithContents("user=password,CASH,ADMIN")
assertThat(service.getUser("user")?.permissions).containsOnly("CASH", "ADMIN")
}
@Test
fun `two users`() {
val service = loadWithContents("user=password,ADMIN", "user2=password2")
assertThat(service.getUser("user")).isNotNull()
assertThat(service.getUser("user2")).isNotNull()
assertThat(service.users).containsOnly(
User("user", "password", permissions = setOf("ADMIN")),
User("user2", "password2", permissions = emptySet()))
}
@Test
fun `unknown user`() {
val service = loadWithContents("user=password")
assertThat(service.getUser("test")).isNull()
}
@Test
fun `Artemis special characters not permitted in usernames`() {
assertThatThrownBy { loadWithContents("user.name=password") }
assertThatThrownBy { loadWithContents("user*name=password") }
assertThatThrownBy { loadWithContents("user#name=password") }
}
private fun loadWithContents(vararg lines: String): PropertiesFileRPCUserService {
Files.write(file, lines.asList())
return PropertiesFileRPCUserService(file)
}
}

View File

@ -288,10 +288,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria
override val topicSession: TopicSession get() = topicSession
override val data: ByteArray get() = data
override val debugTimestamp: Instant = Instant.now()
override fun serialise(): ByteArray = this.serialise()
override val uniqueMessageId: UUID = uuid
override fun toString() = topicSession.toString() + "#" + String(data)
override fun toString() = "$topicSession#${String(data)}"
}
}

View File

@ -13,10 +13,12 @@ import com.r3corda.core.random63BitValue
import com.r3corda.core.utilities.DUMMY_NOTARY_KEY
import com.r3corda.core.utilities.loggerFor
import com.r3corda.node.internal.AbstractNode
import com.r3corda.node.internal.CordaRPCOpsImpl
import com.r3corda.node.services.api.MessagingServiceInternal
import com.r3corda.node.services.config.NodeConfiguration
import com.r3corda.node.services.keys.E2ETestKeyManagementService
import com.r3corda.node.services.messaging.CordaRPCOps
import com.r3corda.node.services.messaging.RPCOps
import com.r3corda.node.services.network.InMemoryNetworkMapService
import com.r3corda.node.services.network.NetworkMapService
import com.r3corda.node.services.transactions.InMemoryUniquenessProvider
@ -129,7 +131,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false,
override fun makeKeyManagementService(): KeyManagementService = E2ETestKeyManagementService(partyKeys)
override fun startMessagingService(cordaRPCOps: CordaRPCOps) {
override fun startMessagingService(rpcOps: RPCOps) {
// Nothing to do
}