diff --git a/client/src/integration-test/kotlin/com/r3corda/client/CordaRPCClientTest.kt b/client/src/integration-test/kotlin/com/r3corda/client/CordaRPCClientTest.kt index b60a4245fe..a6d376635a 100644 --- a/client/src/integration-test/kotlin/com/r3corda/client/CordaRPCClientTest.kt +++ b/client/src/integration-test/kotlin/com/r3corda/client/CordaRPCClientTest.kt @@ -2,6 +2,7 @@ package com.r3corda.client import com.r3corda.core.random63BitValue import com.r3corda.node.driver.driver +import com.r3corda.node.services.User import com.r3corda.node.services.config.configureTestSSL import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.toHostAndPort import org.apache.activemq.artemis.api.core.ActiveMQSecurityException @@ -14,8 +15,7 @@ import kotlin.concurrent.thread class CordaRPCClientTest { - private val validUsername = "user1" - private val validPassword = "test" + private val rpcUser = User("user1", "test", permissions = emptySet()) private val stopDriver = CountDownLatch(1) private var driverThread: Thread? = null private lateinit var client: CordaRPCClient @@ -25,7 +25,7 @@ class CordaRPCClientTest { val driverStarted = CountDownLatch(1) driverThread = thread { driver { - val driverInfo = startNode().get() + val driverInfo = startNode(rpcUsers = listOf(rpcUser)).get() client = CordaRPCClient(toHostAndPort(driverInfo.nodeInfo.address), configureTestSSL()) driverStarted.countDown() stopDriver.await() @@ -42,20 +42,20 @@ class CordaRPCClientTest { @Test fun `log in with valid username and password`() { - client.start(validUsername, validPassword) + client.start(rpcUser.username, rpcUser.password) } @Test fun `log in with unknown user`() { assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { - client.start(random63BitValue().toString(), validPassword) + client.start(random63BitValue().toString(), rpcUser.password) } } @Test fun `log in with incorrect password`() { assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy { - client.start(validUsername, random63BitValue().toString()) + client.start(rpcUser.username, random63BitValue().toString()) } } diff --git a/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt b/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt index 3c59bed2db..9172678d78 100644 --- a/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt +++ b/client/src/integration-test/kotlin/com/r3corda/client/NodeMonitorModelTest.kt @@ -13,6 +13,8 @@ import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.serialization.OpaqueBytes import com.r3corda.core.transactions.SignedTransaction import com.r3corda.node.driver.driver +import com.r3corda.node.internal.CordaRPCOpsImpl +import com.r3corda.node.services.User import com.r3corda.node.services.config.configureTestSSL import com.r3corda.node.services.messaging.StateMachineUpdate import com.r3corda.node.services.transactions.SimpleNotaryService @@ -48,7 +50,8 @@ class NodeMonitorModelTest { val driverStarted = CountDownLatch(1) driverThread = thread { driver { - val aliceNodeFuture = startNode("Alice") + val cashUser = User("user1", "test", permissions = setOf(CordaRPCOpsImpl.CASH_PERMISSION)) + val aliceNodeFuture = startNode("Alice", rpcUsers = listOf(cashUser)) val notaryNodeFuture = startNode("Notary", advertisedServices = setOf(ServiceInfo(SimpleNotaryService.type))) aliceNode = aliceNodeFuture.get().nodeInfo @@ -64,7 +67,7 @@ class NodeMonitorModelTest { networkMapUpdates = monitor.networkMap.bufferUntilSubscribed() clientToService = monitor.clientToService - monitor.register(aliceNode, configureTestSSL(), "user1", "test") + monitor.register(aliceNode, configureTestSSL(), cashUser.username, cashUser.password) driverStarted.countDown() stopDriver.await() } @@ -79,7 +82,7 @@ class NodeMonitorModelTest { } @Test - fun testNetworkMapUpdate() { + fun `network map update`() { newNode("Bob") newNode("Charlie") networkMapUpdates.expectEvents(isStrict = false) { @@ -99,7 +102,7 @@ class NodeMonitorModelTest { } @Test - fun cashIssueWorksEndToEnd() { + fun `cash issue works end to end`() { clientToService.onNext(ClientToServiceCommand.IssueCash( amount = Amount(100, USD), issueRef = OpaqueBytes(ByteArray(1, { 1 })), @@ -124,8 +127,7 @@ class NodeMonitorModelTest { } @Test - fun issueAndMoveWorks() { - + fun `cash issue and move`() { clientToService.onNext(ClientToServiceCommand.IssueCash( amount = Amount(100, USD), issueRef = OpaqueBytes(ByteArray(1, { 1 })), diff --git a/client/src/main/kotlin/com/r3corda/client/CordaRPCClient.kt b/client/src/main/kotlin/com/r3corda/client/CordaRPCClient.kt index 18cf403159..7a441fc35a 100644 --- a/client/src/main/kotlin/com/r3corda/client/CordaRPCClient.kt +++ b/client/src/main/kotlin/com/r3corda/client/CordaRPCClient.kt @@ -5,8 +5,10 @@ import com.r3corda.client.impl.CordaRPCClientImpl import com.r3corda.core.ThreadBox import com.r3corda.node.services.config.NodeSSLConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingComponent +import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX import com.r3corda.node.services.messaging.CordaRPCOps import com.r3corda.node.services.messaging.RPCException +import com.r3corda.node.services.messaging.rpcLog import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException import org.apache.activemq.artemis.api.core.client.ActiveMQClient import org.apache.activemq.artemis.api.core.client.ClientSession @@ -25,10 +27,6 @@ import kotlin.concurrent.thread */ @ThreadSafe class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfiguration) : Closeable, ArtemisMessagingComponent() { - companion object { - private val rpcLog = LoggerFactory.getLogger("com.r3corda.rpc") - } - // TODO: Certificate handling for clients needs more work. private inner class State { var running = false @@ -38,16 +36,6 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur } private val state = ThreadBox(State()) - /** - * An ID that we used to identify this connection on the server side: kind of like a local port number but - * it persists for the lifetime of this process and survives short TCP connection interruptions. Is -1 - * until [start] is called. - */ - var myID: Int = -1 - private set - - private val myAddressPrefix: String get() = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$myID" - /** Opens the connection to the server and registers a JVM shutdown hook to cleanly disconnect. */ @Throws(ActiveMQNotConnectedException::class) fun start(username: String, password: String) { @@ -57,16 +45,14 @@ class CordaRPCClient(val host: HostAndPort, override val config: NodeSSLConfigur val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport(ConnectionDirection.OUTBOUND, host.hostText, host.port)) serverLocator.threadPoolMaxSize = 1 sessionFactory = serverLocator.createSessionFactory() - // We use our initial connection ID as the queue namespace. - myID = sessionFactory.connection.id as Int and 0x000000FFFFFF session = sessionFactory.createSession(username, password, false, true, true, serverLocator.isPreAcknowledge, serverLocator.ackBatchSize) session.start() - clientImpl = CordaRPCClientImpl(session, state.lock, myAddressPrefix) + clientImpl = CordaRPCClientImpl(session, state.lock, username) running = true // We will use the ID in strings so strip the sign bit. } - Runtime.getRuntime().addShutdownHook(thread(start = false) { + Runtime.getRuntime().addShutdownHook(Thread { close() }) } diff --git a/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt b/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt index 2850eede20..2a82cdd595 100644 --- a/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt +++ b/client/src/main/kotlin/com/r3corda/client/impl/CordaRPCClientImpl.kt @@ -16,6 +16,7 @@ import com.r3corda.core.utilities.debug import com.r3corda.core.utilities.trace import com.r3corda.node.services.messaging.* import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException +import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.client.ClientConsumer import org.apache.activemq.artemis.api.core.client.ClientMessage @@ -42,7 +43,7 @@ import kotlin.reflect.jvm.javaMethod */ class CordaRPCClientImpl(private val session: ClientSession, private val sessionLock: ReentrantLock, - private val myAddressPrefix: String) { + private val username: String) { companion object { private val closeableCloseMethod = Closeable::close.javaMethod private val autocloseableCloseMethod = AutoCloseable::close.javaMethod @@ -113,18 +114,20 @@ class CordaRPCClientImpl(private val session: ClientSession, */ @ThreadSafe private inner class RPCProxyHandler(private val timeout: Duration?) : InvocationHandler, Closeable { - private val proxyAddress = "$myAddressPrefix.rpc.responses.${random63BitValue()}" + private val proxyAddress = constructAddress() private val consumer: ClientConsumer var serverProtocolVersion = 0 init { - consumer = sessionLock.withLock{ + consumer = sessionLock.withLock { session.createTemporaryQueue(proxyAddress, proxyAddress) session.createConsumer(proxyAddress) } } + private fun constructAddress() = "${ArtemisMessagingComponent.CLIENTS_PREFIX}$username.rpc.${random63BitValue()}" + @Synchronized override fun invoke(proxy: Any, method: Method, args: Array?): Any? { if (isCloseInvocation(method)) { @@ -187,13 +190,12 @@ class CordaRPCClientImpl(private val session: ClientSession, sessionLock.withLock { val msg = createMessage(method) val kryo = if (returnsObservables) maybePrepareForObservables(location, method, msg) else null - val argsArray = args ?: Array(0) { null } - val serializedBytes = try { - argsArray.serialize() + val serializedArgs = try { + (args ?: emptyArray()).serialize() } catch (e: KryoException) { throw RPCException("Could not serialize RPC arguments", e) } - msg.writeBodyBufferBytes(serializedBytes.bits) + msg.writeBodyBufferBytes(serializedArgs.bits) producer!!.send(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, msg) return kryo } @@ -201,12 +203,12 @@ class CordaRPCClientImpl(private val session: ClientSession, private fun maybePrepareForObservables(location: Throwable, method: Method, msg: ClientMessage): Kryo { // Create a temporary queue just for the emissions on any observables that are returned. - val qName = "$myAddressPrefix.rpc.observations.${random63BitValue()}" - session.createTemporaryQueue(qName, qName) - msg.putStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, qName) + val observationsQueueName = constructAddress() + session.createTemporaryQueue(observationsQueueName, observationsQueueName) + msg.putStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO, observationsQueueName) // And make sure that we deserialise observable handles so that they're linked to the right // queue. Also record a bit of metadata for debugging purposes. - return createRPCKryo(observableSerializer = ObservableDeserializer(qName, method.name, location)) + return createRPCKryo(observableSerializer = ObservableDeserializer(observationsQueueName, method.name, location)) } private fun createMessage(method: Method): ClientMessage { @@ -214,7 +216,7 @@ class CordaRPCClientImpl(private val session: ClientSession, putStringProperty(ClientRPCRequestMessage.METHOD_NAME, method.name) putStringProperty(ClientRPCRequestMessage.REPLY_TO, proxyAddress) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) } } diff --git a/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt b/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt index c45f3bb61c..daf4fc9091 100644 --- a/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt +++ b/client/src/test/kotlin/com/r3corda/client/ClientRPCInfrastructureTests.kt @@ -1,13 +1,20 @@ package com.r3corda.client import com.r3corda.client.impl.CordaRPCClientImpl +import com.r3corda.core.millis +import com.r3corda.core.random63BitValue import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.utilities.LogHelper +import com.r3corda.node.services.RPCUserService +import com.r3corda.node.services.User import com.r3corda.node.services.messaging.* +import com.r3corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE import com.r3corda.node.utilities.AffinityExecutor +import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.api.core.TransportConfiguration import org.apache.activemq.artemis.api.core.client.ActiveMQClient +import org.apache.activemq.artemis.api.core.client.ClientMessage import org.apache.activemq.artemis.api.core.client.ClientProducer import org.apache.activemq.artemis.api.core.client.ClientSession import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl @@ -15,12 +22,14 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.After import org.junit.Before import org.junit.Test import rx.Observable import rx.subjects.PublishSubject import java.io.Closeable +import java.time.Duration import java.util.* import java.util.concurrent.CountDownLatch import java.util.concurrent.LinkedBlockingQueue @@ -28,6 +37,7 @@ import java.util.concurrent.locks.ReentrantLock import kotlin.test.assertEquals import kotlin.test.assertFailsWith import kotlin.test.assertTrue +import kotlin.test.fail class ClientRPCInfrastructureTests { // TODO: Test that timeouts work @@ -37,7 +47,9 @@ class ClientRPCInfrastructureTests { lateinit var clientSession: ClientSession lateinit var producer: ClientProducer lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor - lateinit var proxy: ITestOps + var proxy: TestOps? = null + + private val authenticatedUser = User("test", "password", permissions = setOf()) @Before fun setup() { @@ -54,20 +66,25 @@ class ClientRPCInfrastructureTests { serverSession = sessionFactory.createSession() serverSession.start() - serverSession.createTemporaryQueue(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, ArtemisMessagingComponent.RPC_REQUESTS_QUEUE) + serverSession.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE) producer = serverSession.createProducer() - val dispatcher = object : RPCDispatcher(TestOps()) { + val userService = object : RPCUserService { + override fun getUser(usename: String): User? = throw UnsupportedOperationException() + override val users: List get() = throw UnsupportedOperationException() + } + val dispatcher = object : RPCDispatcher(TestOpsImpl(), userService) { override fun send(bits: SerializedBytes<*>, toAddress: String) { val msg = serverSession.createMessage(false).apply { writeBodyBufferBytes(bits.bits) // Use the magic deduplication property built into Artemis as our message identity too - putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) + putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString())) } producer.send(toAddress, msg) } + override fun getUser(message: ClientMessage): User = authenticatedUser } serverThread = AffinityExecutor.ServiceAffinityExecutor("unit-tests-rpc-dispatch-thread", 1) - val serverConsumer = serverSession.createConsumer(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE) + val serverConsumer = serverSession.createConsumer(RPC_REQUESTS_QUEUE) serverSession.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 'BINDING_REMOVED'") val serverNotifConsumer = serverSession.createConsumer("rpc.qremovals") dispatcher.start(serverConsumer, serverNotifConsumer, serverThread) @@ -75,21 +92,27 @@ class ClientRPCInfrastructureTests { clientSession = sessionFactory.createSession() clientSession.start() - LogHelper.setLevel("+com.r3corda.rpc"/*, "+org.apache.activemq"*/) - - proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), "tests").proxyFor(ITestOps::class.java) + LogHelper.setLevel("+com.r3corda.rpc") } + private fun createProxyUsingReplyTo(username: String, timeout: Duration? = null): TestOps { + val proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), username).proxyFor(TestOps::class.java, timeout = timeout) + this.proxy = proxy + return proxy + } + + private fun createProxyUsingAuthenticatedReplyTo() = createProxyUsingReplyTo(authenticatedUser.username) + @After fun shutdown() { - (proxy as Closeable).close() + (proxy as Closeable?)?.close() clientSession.stop() serverSession.stop() artemis.stop() serverThread.shutdownNow() } - interface ITestOps : RPCOps { + interface TestOps : RPCOps { @Throws(IllegalArgumentException::class) fun barf() @@ -105,34 +128,33 @@ class ClientRPCInfrastructureTests { @RPCSinceVersion(2) fun addedLater() + + fun captureUser(): String } lateinit var complicatedObservable: Observable>> - inner class TestOps : ITestOps { + inner class TestOpsImpl : TestOps { override val protocolVersion = 1 - override fun barf() { - throw IllegalArgumentException("Barf!") - } + override fun barf(): Unit = throw IllegalArgumentException("Barf!") override fun void() { } override fun someCalculation(str: String, num: Int) = "$str $num" - override fun makeObservable(): Observable { - return Observable.just(1, 2, 3, 4) - } + override fun makeObservable(): Observable = Observable.just(1, 2, 3, 4) override fun makeComplicatedObservable() = complicatedObservable - override fun addedLater() { - throw UnsupportedOperationException("not implemented") - } + override fun addedLater(): Unit = throw UnsupportedOperationException("not implemented") + + override fun captureUser(): String = CURRENT_RPC_USER.get().username } @Test - fun simpleRPCs() { + fun `simple RPCs`() { + val proxy = createProxyUsingAuthenticatedReplyTo() // Does nothing, doesn't throw. proxy.void() @@ -144,14 +166,16 @@ class ClientRPCInfrastructureTests { } @Test - fun simpleObservable() { + fun `simple observable`() { + val proxy = createProxyUsingAuthenticatedReplyTo() // This tests that the observations are transmitted correctly, also completion is transmitted. val observations = proxy.makeObservable().toBlocking().toIterable().toList() assertEquals(listOf(1, 2, 3, 4), observations) } @Test - fun complexObservables() { + fun `complex observables`() { + val proxy = createProxyUsingAuthenticatedReplyTo() // This checks that we can return an object graph with complex usage of observables, like an observable // that emits objects that contain more observables. val serverQuotes = PublishSubject.create>>() @@ -177,7 +201,8 @@ class ClientRPCInfrastructureTests { } } - assertEquals(1, clientSession.addressQuery(SimpleString("tests.rpc.observations.#")).queueNames.size) + val rpcQueuesQuery = SimpleString("clients.${authenticatedUser.username}.rpc.*") + assertEquals(2, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) assertThat(clientQuotes).isEmpty() @@ -192,11 +217,36 @@ class ClientRPCInfrastructureTests { assertTrue(serverQuotes.hasObservers()) subscription.unsubscribe() unsubscribeLatch.await() - assertEquals(0, clientSession.addressQuery(SimpleString("tests.rpc.observations.#")).queueNames.size) + assertEquals(1, clientSession.addressQuery(rpcQueuesQuery).queueNames.size) } @Test fun versioning() { + val proxy = createProxyUsingAuthenticatedReplyTo() assertFailsWith { proxy.addedLater() } } + + @Test + fun `authenticated user is available to RPC`() { + val proxy = createProxyUsingAuthenticatedReplyTo() + assertThat(proxy.captureUser()).isEqualTo(authenticatedUser.username) + } + + @Test + fun `using another username for the reply-to`() { + assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy { + val proxy = createProxyUsingReplyTo(random63BitValue().toString(), timeout = 300.millis) + proxy.void() + fail("RPC successfully returned using someone else's username for the reply-to") + } + } + + @Test + fun `using another username for the reply-to, which contains our username as a prefix`() { + assertThatExceptionOfType(RPCException.DeadlineExceeded::class.java).isThrownBy { + val proxy = createProxyUsingReplyTo("${authenticatedUser.username}extra", timeout = 300.millis) + proxy.void() + fail("RPC successfully returned using someone else's username for the reply-to") + } + } } \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/Utils.kt b/core/src/main/kotlin/com/r3corda/core/Utils.kt index 80248cf812..cdd21bd822 100644 --- a/core/src/main/kotlin/com/r3corda/core/Utils.kt +++ b/core/src/main/kotlin/com/r3corda/core/Utils.kt @@ -13,10 +13,13 @@ import rx.Observable import rx.subjects.UnicastSubject import java.io.BufferedInputStream import java.io.InputStream +import java.io.OutputStream import java.math.BigDecimal import java.nio.file.Files import java.nio.file.LinkOption +import java.nio.file.OpenOption import java.nio.file.Path +import java.nio.file.attribute.FileAttribute import java.time.Duration import java.time.temporal.Temporal import java.util.concurrent.ExecutionException @@ -31,6 +34,7 @@ val Int.days: Duration get() = Duration.ofDays(this.toLong()) val Int.hours: Duration get() = Duration.ofHours(this.toLong()) val Int.minutes: Duration get() = Duration.ofMinutes(this.toLong()) val Int.seconds: Duration get() = Duration.ofSeconds(this.toLong()) +val Int.millis: Duration get() = Duration.ofMillis(this.toLong()) // TODO: Review by EOY2016 if we ever found these utilities helpful. @@ -89,8 +93,17 @@ inline fun SettableFuture.catch(block: () -> T) { } } -fun Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block) +/** Allows you to write code like: Paths.get("someDir") / "subdir" / "filename" but using the Paths API to avoid platform separator problems. */ +operator fun Path.div(other: String): Path = resolve(other) +fun Path.createDirectories(vararg attrs: FileAttribute<*>): Path = Files.createDirectories(this, *attrs) fun Path.exists(vararg options: LinkOption): Boolean = Files.exists(this, *options) +inline fun Path.use(block: (InputStream) -> R): R = Files.newInputStream(this).use(block) +inline fun Path.write(createDirs: Boolean = false, vararg options: OpenOption, block: (OutputStream) -> Unit) { + if (createDirs) { + normalize().parent?.createDirectories() + } + Files.newOutputStream(this, *options).use(block) +} // Simple infix function to add back null safety that the JDK lacks: timeA until timeB infix fun Temporal.until(endExclusive: Temporal) = Duration.between(this, endExclusive) @@ -228,9 +241,6 @@ fun extractZipFile(zipPath: Path, toPath: Path) { val Throwable.rootCause: Throwable get() = Throwables.getRootCause(this) -/** Allows you to write code like: Paths.get("someDir") / "subdir" / "filename" but using the Paths API to avoid platform separator problems. */ -operator fun Path.div(other: String): Path = resolve(other) - /** Representation of an operation that may have thrown an error. */ data class ErrorOr private constructor(val value: A?, val error: Throwable?) { constructor(value: A) : this(value, null) @@ -278,21 +288,4 @@ fun Observable.bufferUntilSubscribed(): Observable { val subject = UnicastSubject.create() val subscription = subscribe(subject) return subject.doOnUnsubscribe { subscription.unsubscribe() } -} - -/** - * Determine if an iterable data type's contents are ordered and unique, based on their [Comparable].compareTo - * function. - */ -fun > Iterable.isOrderedAndUnique(extractId: T.() -> I): Boolean { - var last: I? = null - return all { it -> - val lastLast = last - last = extractId(it) - if (lastLast == null) { - true - } else { - lastLast < extractId(it) - } - } } \ No newline at end of file diff --git a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt index 08e9f019b1..8b5c0fc277 100644 --- a/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt +++ b/core/src/main/kotlin/com/r3corda/core/messaging/Messaging.kt @@ -179,7 +179,6 @@ interface Message { val data: ByteArray val debugTimestamp: Instant val uniqueMessageId: UUID - fun serialise(): ByteArray } /** A singleton that's useful for validating topic strings */ diff --git a/core/src/test/kotlin/com/r3corda/core/UtilsTest.kt b/core/src/test/kotlin/com/r3corda/core/UtilsTest.kt deleted file mode 100644 index ea3191fdab..0000000000 --- a/core/src/test/kotlin/com/r3corda/core/UtilsTest.kt +++ /dev/null @@ -1,24 +0,0 @@ -package com.r3corda.core - -import kotlin.test.assertFalse -import kotlin.test.assertTrue - -class UtilsTest { - fun `ordered and unique basic`() { - val basic = listOf(1, 2, 3, 5, 8) - assertTrue(basic.isOrderedAndUnique { this }) - - val negative = listOf(-1, 2, 5) - assertTrue(negative.isOrderedAndUnique { this }) - } - - fun `ordered and unique duplicate`() { - val duplicated = listOf(1, 2, 2, 3, 5, 8) - assertFalse(duplicated.isOrderedAndUnique { this }) - } - - fun `ordered and unique out of sequence`() { - val mixed = listOf(3, 1, 2, 8, 5) - assertFalse(mixed.isOrderedAndUnique { this }) - } -} \ No newline at end of file diff --git a/docs/source/clientrpc.rst b/docs/source/clientrpc.rst index 63f83845f2..77e2bad004 100644 --- a/docs/source/clientrpc.rst +++ b/docs/source/clientrpc.rst @@ -17,6 +17,14 @@ detail on how to use this is provided in the docs for the proxy method. For a brief tutorial on how one can use the RPC API see :doc:`tutorial-clientrpc-api`. +Security +-------- + +Users wanting to use the RPC library are first required to authenticate themselves with the node using a valid username +and password. These are kept in ``rpc-users.properties`` in the node base directory. This file also specifies +permissions for each user, which the RPC implementation can use to control access. The file format is described in +:doc:`corda-configuration-files`. + Observables ----------- diff --git a/docs/source/corda-configuration-files.rst b/docs/source/corda-configuration-files.rst index f4111f1073..f24b155a55 100644 --- a/docs/source/corda-configuration-files.rst +++ b/docs/source/corda-configuration-files.rst @@ -4,7 +4,7 @@ The Corda Configuration File Configuration File Location --------------------------- -The Corda all in one ``corda.jar`` file is generated by the ``gradle buildCordaJAR`` task and defaults to reading configuration from a ``node.conf`` file in the present working directory. +The Corda all-in-one ``corda.jar`` file is generated by the ``gradle buildCordaJAR`` task and defaults to reading configuration from a ``node.conf`` file in the present working directory. This behaviour can be overidden using the ``--config-file`` command line option to target configuration files with different names, or different file location (relative paths are relative to the current working directory). Also, the ``--base-directory`` command line option alters the Corda node workspace location and if specified a ``node.conf`` configuration file is then expected in the root of the workspace. @@ -97,6 +97,20 @@ Configuration File Fields :useHTTPS: If false the node's web server will be plain HTTP. If true the node will use the same certificate and private key from the ``/certificates/sslkeystore.jks`` file as the ArtemisMQ port for HTTPS. If HTTPS is enabled then unencrypted HTTP traffic to the node's **webAddress** port is not supported. +RPC Users File +============== +Corda also uses the ``rpc-users.properties`` file, found in the base directory, to control access to the RPC subsystem. +This is a Java properties file (details can be found in the `Javadocs `_) +which specifies a list of users with their password and list of permissions they're enabled for. +.. code-block:: text + :caption: Sample + admin=notsecure,ADMIN + user1=letmein,CASH,PAPER + +In this example ``user1`` has password ``letmein`` and has permissions for ``CASH`` and ``PAPER``. The permissions are +free-form strings which can be used by the RPC methods to control access. + +If ``rpc-users.properties`` is empty or doesn't exist then the RPC subsystem is effectively locked down. \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt index 04b5d3e2df..70871a44fc 100644 --- a/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt +++ b/node/src/main/kotlin/com/r3corda/node/driver/Driver.kt @@ -5,33 +5,30 @@ import com.fasterxml.jackson.databind.module.SimpleModule import com.google.common.net.HostAndPort import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.Party -import com.r3corda.core.crypto.generateKeyPair +import com.r3corda.core.div import com.r3corda.core.node.NodeInfo -import com.r3corda.core.node.services.NetworkMapCache import com.r3corda.core.node.services.ServiceInfo -import com.r3corda.node.serialization.NodeClock +import com.r3corda.core.utilities.loggerFor +import com.r3corda.core.write +import com.r3corda.node.services.User import com.r3corda.node.services.config.ConfigHelper import com.r3corda.node.services.config.FullNodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingComponent import com.r3corda.node.services.messaging.ArtemisMessagingServer import com.r3corda.node.services.messaging.NodeMessagingClient -import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.utilities.JsonSupport import com.typesafe.config.Config import com.typesafe.config.ConfigRenderOptions import org.slf4j.Logger -import org.slf4j.LoggerFactory import java.io.File -import java.io.InputStreamReader import java.net.* import java.nio.file.Path import java.nio.file.Paths -import java.text.SimpleDateFormat -import java.time.Clock +import java.time.Instant +import java.time.ZoneOffset.UTC +import java.time.format.DateTimeFormatter import java.util.* import java.util.concurrent.* -import kotlin.concurrent.thread /** * This file defines a small "Driver" DSL for starting up nodes that is only intended for development, demos and tests. @@ -45,7 +42,7 @@ import kotlin.concurrent.thread * TODO The network map service bootstrap is hacky (needs to fake the service's public key in order to retrieve the true one), needs some thought. */ -private val log: Logger = LoggerFactory.getLogger(DriverDSL::class.java) +private val log: Logger = loggerFor() /** * This is the interface that's exposed to DSL users. @@ -57,9 +54,12 @@ interface DriverDSLExposedInterface { * @param providedName Optional name of the node, which will be its legal name in [Party]. Defaults to something * random. Note that this must be unique as the driver uses it as a primary key! * @param advertisedServices The set of services to be advertised by the node. Defaults to empty set. + * @param rpcUsers List of users who are authorised to use the RPC system. Defaults to empty list. * @return The [NodeInfo] of the started up node retrieved from the network map service. */ - fun startNode(providedName: String? = null, advertisedServices: Set = setOf()): Future + fun startNode(providedName: String? = null, + advertisedServices: Set = emptySet(), + rpcUsers: List = emptyList()): Future fun waitForAllNodesToFinish() } @@ -102,7 +102,7 @@ sealed class PortAllocation { * * The driver implicitly bootstraps a [NetworkMapService] that may be accessed through a local cache [DriverDSL.networkMapCache]. * - * @param baseDirectory The base directory node directories go into, defaults to "build//". The node + * @param driverDirectory The base directory node directories go into, defaults to "build//". The node * directories themselves are "//", where legalName defaults to "-" * and may be specified in [DriverDSL.startNode]. * @param portAllocation The port allocation strategy to use for the messaging and the web server addresses. Defaults to incremental. @@ -113,7 +113,7 @@ sealed class PortAllocation { * @return The value returned in the [dsl] closure. */ fun driver( - baseDirectory: String = "build/${getTimestampAsDirectoryName()}", + driverDirectory: Path = Paths.get("build", getTimestampAsDirectoryName()), portAllocation: PortAllocation = PortAllocation.Incremental(10000), debugPortAllocation: PortAllocation = PortAllocation.Incremental(5005), useTestClock: Boolean = false, @@ -123,7 +123,7 @@ fun driver( driverDsl = DriverDSL( portAllocation = portAllocation, debugPortAllocation = debugPortAllocation, - baseDirectory = baseDirectory, + driverDirectory = driverDirectory, useTestClock = useTestClock, isDebug = isDebug ), @@ -162,10 +162,7 @@ fun genericD } private fun getTimestampAsDirectoryName(): String { - val tz = TimeZone.getTimeZone("UTC") - val df = SimpleDateFormat("yyyyMMddHHmmss") - df.timeZone = tz - return df.format(Date()) + return DateTimeFormatter.ofPattern("yyyyMMddHHmmss").withZone(UTC).format(Instant.now()) } fun addressMustBeBound(hostAndPort: HostAndPort) { @@ -207,7 +204,7 @@ fun poll(pollName: String, pollIntervalMs: Long = 500, warnCount: Int = 120, open class DriverDSL( val portAllocation: PortAllocation, val debugPortAllocation: PortAllocation, - val baseDirectory: String, + val driverDirectory: Path, val useTestClock: Boolean, val isDebug: Boolean ) : DriverDSLInternalInterface { @@ -271,7 +268,7 @@ open class DriverDSL( } private fun queryNodeInfo(webAddress: HostAndPort): NodeInfo? { - val url = URL("http://${webAddress.toString()}/api/info") + val url = URL("http://$webAddress/api/info") try { val conn = url.openConnection() as HttpURLConnection conn.requestMethod = "GET" @@ -291,20 +288,20 @@ open class DriverDSL( } } - override fun startNode(providedName: String?, advertisedServices: Set): Future { + override fun startNode(providedName: String?, advertisedServices: Set, rpcUsers: List): Future { val messagingAddress = portAllocation.nextHostAndPort() val apiAddress = portAllocation.nextHostAndPort() val debugPort = if (isDebug) debugPortAllocation.nextPort() else null val name = providedName ?: "${pickA(name)}-${messagingAddress.port}" - val nodeDirectory = "$baseDirectory/$name" + val baseDirectory = driverDirectory / name val config = ConfigHelper.loadConfig( - baseDirectoryPath = Paths.get(nodeDirectory), + baseDirectoryPath = baseDirectory, allowMissingConfig = true, configOverrides = mapOf( "myLegalName" to name, - "basedir" to Paths.get(nodeDirectory).normalize().toString(), + "basedir" to baseDirectory.normalize().toString(), "artemisAddress" to messagingAddress.toString(), "webAddress" to apiAddress.toString(), "extraAdvertisedServiceIds" to advertisedServices.joinToString(","), @@ -313,8 +310,16 @@ open class DriverDSL( ) ) + val nodeConfig = FullNodeConfiguration(config) + + nodeConfig.rpcUsersFile.write(createDirs = true) { + rpcUsers.map { it.username to "${it.password},${it.permissions.joinToString(",")}" } + .toMap(Properties()) + .store(it, null) + } + return Executors.newSingleThreadExecutor().submit(Callable { - registerProcess(DriverDSL.startNode(config, quasarJarPath, debugPort)) + registerProcess(DriverDSL.startNode(nodeConfig, quasarJarPath, debugPort)) NodeInfoAndConfig(queryNodeInfo(apiAddress)!!, config) }) } @@ -327,13 +332,13 @@ open class DriverDSL( val apiAddress = portAllocation.nextHostAndPort() val debugPort = if (isDebug) debugPortAllocation.nextPort() else null - val nodeDirectory = "$baseDirectory/$networkMapName" + val baseDirectory = driverDirectory / networkMapName val config = ConfigHelper.loadConfig( - baseDirectoryPath = Paths.get(nodeDirectory), + baseDirectoryPath = baseDirectory, allowMissingConfig = true, configOverrides = mapOf( "myLegalName" to networkMapName, - "basedir" to Paths.get(nodeDirectory).normalize().toString(), + "basedir" to baseDirectory.normalize().toString(), "artemisAddress" to networkMapAddress.toString(), "webAddress" to apiAddress.toString(), "extraAdvertisedServiceIds" to "", @@ -342,7 +347,7 @@ open class DriverDSL( ) log.info("Starting network-map-service") - registerProcess(startNode(config, quasarJarPath, debugPort)) + registerProcess(startNode(FullNodeConfiguration(config), quasarJarPath, debugPort)) } companion object { @@ -356,13 +361,12 @@ open class DriverDSL( fun pickA(array: Array): A = array[Math.abs(Random().nextInt()) % array.size] private fun startNode( - config: Config, + nodeConf: FullNodeConfiguration, quasarJarPath: String, debugPort: Int? ): Process { - val nodeConf = FullNodeConfiguration(config) // Write node.conf - writeConfig(nodeConf.basedir, "node.conf", config) + writeConfig(nodeConf.basedir, "node.conf", nodeConf.config) val className = "com.r3corda.node.MainKt" // cannot directly get class for this, so just use string val separator = System.getProperty("file.separator") diff --git a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt index 1e19492c0c..4f3de4ed89 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/AbstractNode.kt @@ -26,7 +26,7 @@ import com.r3corda.node.services.events.NodeSchedulerService import com.r3corda.node.services.events.ScheduledActivityObserver import com.r3corda.node.services.identity.InMemoryIdentityService import com.r3corda.node.services.keys.PersistentKeyManagementService -import com.r3corda.node.services.messaging.CordaRPCOps +import com.r3corda.node.services.messaging.RPCOps import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.network.NetworkMapService.Companion.REGISTER_PROTOCOL_TOPIC @@ -252,7 +252,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo isPreviousCheckpointsPresent = true false } - startMessagingService(ServerRPCOps(services, smm, database)) + startMessagingService(CordaRPCOpsImpl(services, smm, database)) runOnStop += Runnable { net.stop() } _networkMapRegistrationFuture.setFuture(registerWithNetworkMap()) smm.start() @@ -434,7 +434,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration, val netwo protected abstract fun makeMessagingService(): MessagingServiceInternal - protected abstract fun startMessagingService(cordaRPCOps: CordaRPCOps) + protected abstract fun startMessagingService(rpcOps: RPCOps) protected open fun initialiseStorageService(dir: Path): Pair { val attachments = makeAttachmentStorage(dir) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt b/node/src/main/kotlin/com/r3corda/node/internal/CordaRPCOpsImpl.kt similarity index 95% rename from node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt rename to node/src/main/kotlin/com/r3corda/node/internal/CordaRPCOpsImpl.kt index 8ff41aa804..2860655bcc 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/ServerRPCOps.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/CordaRPCOpsImpl.kt @@ -1,7 +1,6 @@ package com.r3corda.node.internal import com.r3corda.contracts.asset.Cash -import com.r3corda.core.contracts.InsufficientBalanceException import com.r3corda.core.contracts.* import com.r3corda.core.crypto.Party import com.r3corda.core.crypto.SecureHash @@ -13,10 +12,7 @@ import com.r3corda.core.node.services.StateMachineTransactionMapping import com.r3corda.core.node.services.Vault import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.TransactionBuilder -import com.r3corda.node.services.messaging.CordaRPCOps -import com.r3corda.node.services.messaging.StateMachineInfo -import com.r3corda.node.services.messaging.StateMachineUpdate -import com.r3corda.node.services.messaging.TransactionBuildResult +import com.r3corda.node.services.messaging.* import com.r3corda.node.services.statemachine.StateMachineManager import com.r3corda.node.utilities.databaseTransaction import com.r3corda.protocols.BroadcastTransactionProtocol @@ -29,12 +25,16 @@ import java.security.KeyPair * Server side implementations of RPCs available to MQ based client tools. Execution takes place on the server * thread (i.e. serially). Arguments are serialised and deserialised automatically. */ -class ServerRPCOps( +class CordaRPCOpsImpl( val services: ServiceHub, val smm: StateMachineManager, val database: Database ) : CordaRPCOps { - override val protocolVersion: Int = 0 + companion object { + const val CASH_PERMISSION = "CASH" + } + + override val protocolVersion: Int get() = 0 override fun networkMapUpdates(): Pair, Observable> { return services.networkMapCache.track() @@ -59,6 +59,7 @@ class ServerRPCOps( changes.map { StateMachineUpdate.fromStateMachineChange(it) } ) } + override fun stateMachineRecordedTransactionMapping(): Pair, Observable> { return databaseTransaction(database) { services.storageService.stateMachineRecordedTransactionMapping.track() @@ -66,6 +67,7 @@ class ServerRPCOps( } override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult { + requirePermission(CASH_PERMISSION) return databaseTransaction(database) { when (command) { is ClientToServiceCommand.IssueCash -> issueCash(command) diff --git a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt index 359a0fcb34..814f9fd3ab 100644 --- a/node/src/main/kotlin/com/r3corda/node/internal/Node.kt +++ b/node/src/main/kotlin/com/r3corda/node/internal/Node.kt @@ -7,11 +7,13 @@ import com.r3corda.core.node.services.ServiceInfo import com.r3corda.core.then import com.r3corda.core.utilities.loggerFor import com.r3corda.node.serialization.NodeClock +import com.r3corda.node.services.PropertiesFileRPCUserService +import com.r3corda.node.services.RPCUserService import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.FullNodeConfiguration import com.r3corda.node.services.messaging.ArtemisMessagingServer -import com.r3corda.node.services.messaging.CordaRPCOps import com.r3corda.node.services.messaging.NodeMessagingClient +import com.r3corda.node.services.messaging.RPCOps import com.r3corda.node.services.transactions.PersistentUniquenessProvider import com.r3corda.node.servlets.AttachmentDownloadServlet import com.r3corda.node.servlets.Config @@ -112,10 +114,13 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: private var shutdownThread: Thread? = null + private lateinit var userService: RPCUserService + override fun makeMessagingService(): MessagingServiceInternal { + userService = PropertiesFileRPCUserService(configuration.rpcUsersFile) val serverAddr = with(configuration) { messagingServerAddress ?: { - messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache) + messageBroker = ArtemisMessagingServer(this, artemisAddress, services.networkMapCache, userService) artemisAddress }() } @@ -124,7 +129,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: return NodeMessagingClient(configuration, serverAddr, myIdentityOrNullIfNetworkMapService, serverThread, database) } - override fun startMessagingService(cordaRPCOps: CordaRPCOps) { + override fun startMessagingService(rpcOps: RPCOps) { // Start up the embedded MQ server messageBroker?.apply { runOnStop += Runnable { messageBroker?.stop() } @@ -135,7 +140,7 @@ class Node(override val configuration: FullNodeConfiguration, networkMapAddress: // Start up the MQ client. val net = net as NodeMessagingClient net.configureWithDevSSLCertificate() // TODO: Client might need a separate certificate - net.start(cordaRPCOps) + net.start(rpcOps, userService) } private fun initWebServer(): Server { diff --git a/node/src/main/kotlin/com/r3corda/node/services/RPCUserService.kt b/node/src/main/kotlin/com/r3corda/node/services/RPCUserService.kt new file mode 100644 index 0000000000..aaa5d62c77 --- /dev/null +++ b/node/src/main/kotlin/com/r3corda/node/services/RPCUserService.kt @@ -0,0 +1,51 @@ +package com.r3corda.node.services + +import com.r3corda.core.exists +import com.r3corda.core.use +import java.nio.file.Path +import java.util.* + +/** + * Service for retrieving [User] objects representing RPC users who are authorised to use the RPC system. A [User] + * contains their login username and password along with a set of permissions for RPC services they are allowed access + * to. These permissions are represented as [String]s to allow RPC implementations to add their own permissioning. + */ +interface RPCUserService { + fun getUser(usename: String): User? + val users: List +} + +// TODO If this sticks around then change it to use HOCON ... +// TODO ... and also store passwords as salted hashes. +// TODO Otherwise consider something like Apache Shiro +class PropertiesFileRPCUserService(file: Path) : RPCUserService { + + private val _users: Map + + init { + _users = if (file.exists()) { + val properties = Properties() + file.use { + properties.load(it) + } + properties.map { + val parts = it.value.toString().split(delimiters = ",") + val username = it.key.toString() + require(!username.contains("""\.|\*|#""".toRegex())) { """Usernames cannot have the following characters: * . # """ } + val password = parts[0] + val permissions = parts.drop(1).map(String::toUpperCase).toSet() + User(username, password, permissions) + }.associateBy(User::username) + } else { + emptyMap() + } + } + + override fun getUser(usename: String): User? = _users[usename] + + override val users: List get() = _users.values.toList() +} + +data class User(val username: String, val password: String, val permissions: Set) { + override fun toString(): String = "${javaClass.simpleName}($username, permissions=$permissions)" +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt index 5886f6a92c..06fdea1ded 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/config/NodeConfiguration.kt @@ -11,7 +11,6 @@ import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.utilities.TestClock import com.typesafe.config.Config import java.nio.file.Path -import java.time.Clock import java.util.* interface NodeSSLConfiguration { @@ -25,6 +24,7 @@ interface NodeSSLConfiguration { interface NodeConfiguration : NodeSSLConfiguration { val basedir: Path override val certificatesPath: Path get() = basedir / "certificates" + val rpcUsersFile: Path get() = basedir / "rpc-users.properties" val myLegalName: String val nearestCity: String val emailAddress: String @@ -33,7 +33,7 @@ interface NodeConfiguration : NodeSSLConfiguration { val devMode: Boolean } -class FullNodeConfiguration(config: Config) : NodeConfiguration { +class FullNodeConfiguration(val config: Config) : NodeConfiguration { override val basedir: Path by config override val myLegalName: String by config override val nearestCity: String by config diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt index 635e784f7a..88a954fe9e 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/ArtemisMessagingServer.kt @@ -4,14 +4,12 @@ import com.google.common.net.HostAndPort import com.r3corda.core.ThreadBox import com.r3corda.core.crypto.AddressFormatException import com.r3corda.core.div -import com.r3corda.core.exists import com.r3corda.core.messaging.SingleMessageRecipient import com.r3corda.core.node.services.NetworkMapCache -import com.r3corda.core.use import com.r3corda.core.utilities.loggerFor +import com.r3corda.node.services.RPCUserService import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_ROLE_NAME -import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.RPC_ROLE_NAME +import com.r3corda.node.services.messaging.ArtemisMessagingServer.NodeLoginModule.Companion.NODE_USER import org.apache.activemq.artemis.api.core.SimpleString import org.apache.activemq.artemis.core.config.BridgeConfiguration import org.apache.activemq.artemis.core.config.Configuration @@ -25,8 +23,6 @@ import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal import org.apache.activemq.artemis.spi.core.security.jaas.UserPrincipal import rx.Subscription import java.io.IOException -import java.nio.file.Files -import java.nio.file.Path import java.security.Principal import java.util.* import javax.annotation.concurrent.ThreadSafe @@ -57,9 +53,11 @@ import javax.security.auth.spi.LoginModule @ThreadSafe class ArtemisMessagingServer(override val config: NodeConfiguration, val myHostPort: HostAndPort, - val networkMapCache: NetworkMapCache) : ArtemisMessagingComponent() { + val networkMapCache: NetworkMapCache, + val userService: RPCUserService) : ArtemisMessagingComponent() { + companion object { - val log = loggerFor() + private val log = loggerFor() } private class InnerState { @@ -187,20 +185,23 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, idCacheSize = 2000 // Artemis Default duplicate cache size i.e. a guess isPersistIDCache = true isPopulateValidatedUser = true - setupUserRoles() + configureQueueSecurity() } - // This gives nodes full access and RPC clients only enough to do RPC - private fun ConfigurationImpl.setupUserRoles() { - // TODO COR-307 - val nodeRole = Role(NODE_ROLE_NAME, true, true, true, true, true, true, true, true) - val clientRpcRole = restrictedRole(RPC_ROLE_NAME, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true) - securityRoles = mapOf( - "#" to setOf(nodeRole), - "clients.*.rpc.responses.*" to setOf(nodeRole, clientRpcRole), - "clients.*.rpc.observations.*" to setOf(nodeRole, clientRpcRole), - RPC_REQUESTS_QUEUE to setOf(nodeRole, restrictedRole(RPC_ROLE_NAME, send = true)) - ) + private fun ConfigurationImpl.configureQueueSecurity() { + val nodeRPCSendRole = restrictedRole(NODE_USER, send = true) // The node needs to be able to send responses on the client queues + + for ((username) in userService.users) { + // Clients need to be able to consume the responses on their queues, and they're also responsible for creating and destroying them + val clientRole = restrictedRole(username, consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true) + securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(nodeRPCSendRole, clientRole) + } + + // TODO Restrict this down to just what the node needs + securityRoles["#"] = setOf(Role(NODE_USER, true, true, true, true, true, true, true, true)) + securityRoles[RPC_REQUESTS_QUEUE] = setOf( + restrictedRole(NODE_USER, createNonDurableQueue = true, deleteNonDurableQueue = true), + restrictedRole(RPC_REQUESTS_QUEUE, send = true)) // Clients need to be able to send their requests } private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false, @@ -211,19 +212,10 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, } private fun createArtemisSecurityManager(): ActiveMQJAASSecurityManager { - val rpcUsersFile = config.basedir / "rpc-users.properties" - if (!rpcUsersFile.exists()) { - val users = Properties() - users["user1"] = "test" - Files.newOutputStream(rpcUsersFile).use { - users.store(it, null) - } - } - val securityConfig = object : SecurityConfiguration() { // Override to make it work with our login module override fun getAppConfigurationEntry(name: String): Array { - val options = mapOf(NodeLoginModule.FILE_KEY to rpcUsersFile) + val options = mapOf(RPCUserService::class.java.name to userService) return arrayOf(AppConfigurationEntry(name, REQUIRED, options)) } } @@ -277,29 +269,25 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, } + // We could have used the built-in PropertiesLoginModule but that exposes a roles properties file. Roles are used + // for queue access control and our RPC users must only have access to the queues they need and this cannot be allowed + // to be modified. class NodeLoginModule : LoginModule { companion object { - const val FILE_KEY = "rpc-users-file" - const val NODE_ROLE_NAME = "NodeRole" - const val RPC_ROLE_NAME = "RpcRole" + const val NODE_USER = "Node" } - private val users = Properties() private var loginSucceeded: Boolean = false private lateinit var subject: Subject private lateinit var callbackHandler: CallbackHandler - private lateinit var principals: List + private lateinit var userService: RPCUserService + private val principals = ArrayList() override fun initialize(subject: Subject, callbackHandler: CallbackHandler, sharedState: Map, options: Map) { this.subject = subject this.callbackHandler = callbackHandler - val rpcUsersFile = options[FILE_KEY] as Path - if (rpcUsersFile.exists()) { - rpcUsersFile.use { - users.load(it) - } - } + userService = options[RPCUserService::class.java.name] as RPCUserService } override fun login(): Boolean { @@ -316,14 +304,16 @@ class ArtemisMessagingServer(override val config: NodeConfiguration, val username = nameCallback.name ?: throw FailedLoginException("User name is null") val receivedPassword = passwordCallback.password ?: throw FailedLoginException("Password is null") - val password = if (username == "Node") "Node" else users[username] ?: throw FailedLoginException("User does not exist") + val password = if (username == NODE_USER) "Node" else userService.getUser(username)?.password ?: throw FailedLoginException("User does not exist") if (password != String(receivedPassword)) { throw FailedLoginException("Password does not match") } - principals = listOf( - UserPrincipal(username), - RolePrincipal(if (username == "Node") NODE_ROLE_NAME else RPC_ROLE_NAME)) + principals += UserPrincipal(username) + principals += RolePrincipal(username) // The roles are configured using the usernames + if (username != NODE_USER) { + principals += RolePrincipal(RPC_REQUESTS_QUEUE) // This enables the RPC client to send requests + } loginSucceeded = true return loginSucceeded diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt index 0f7b616603..6afa401e71 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/CordaRPCOps.kt @@ -113,7 +113,7 @@ interface CordaRPCOps : RPCOps { fun networkMapUpdates(): Pair, Observable> /** - * Executes the given command, possibly triggering cash creation etc. + * Executes the given command if the user is permissioned to do so, possibly triggering cash creation etc. * TODO: The signature of this is weird because it's the remains of an old service call, we should have a call for each command instead. */ fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt index b0f398c0d6..f69c528f1c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/NodeMessagingClient.kt @@ -7,6 +7,7 @@ import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.opaque import com.r3corda.core.utilities.loggerFor import com.r3corda.core.utilities.trace +import com.r3corda.node.services.RPCUserService import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.utilities.* @@ -44,11 +45,6 @@ import javax.annotation.concurrent.ThreadSafe * @param myIdentity Either the public key to be used as the ArtemisMQ address and queue name for the node globally, or null to indicate * that this is a NetworkMapService node which will be bound globally to the name "networkmap" * @param executor An executor to run received message tasks upon. - * @param persistentInbox If true the inbox will be created persistent if not already created. - * If false the inbox queue will be transient, which is appropriate for UI clients for example. - * @param persistenceTx A lambda to wrap message processing in any transaction required by the persistence layer (e.g. - * a database transaction) without introducing a dependency on the actual solution and any parameters it requires - * in this class. */ @ThreadSafe class NodeMessagingClient(override val config: NodeConfiguration, @@ -115,7 +111,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, } }) - fun start(rpcOps: CordaRPCOps) { + fun start(rpcOps: RPCOps, userService: RPCUserService) { state.locked { check(!started) { "start can't be called twice" } started = true @@ -151,7 +147,7 @@ class NodeMessagingClient(override val config: NodeConfiguration, session.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 1") rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE) rpcNotificationConsumer = session.createConsumer("rpc.qremovals") - dispatcher = createRPCDispatcher(state, rpcOps) + dispatcher = createRPCDispatcher(rpcOps, userService) } } @@ -229,7 +225,6 @@ class NodeMessagingClient(override val config: NodeConfiguration, override val data: ByteArray = body override val debugTimestamp: Instant = Instant.ofEpochMilli(message.timestamp) override val uniqueMessageId: UUID = uuid - override fun serialise(): ByteArray = body override fun toString() = topic + "#" + data.opaque() } @@ -389,15 +384,14 @@ class NodeMessagingClient(override val config: NodeConfiguration, override val topicSession: TopicSession = topicSession override val data: ByteArray = data override val debugTimestamp: Instant = Instant.now() - override fun serialise(): ByteArray = this.serialise() override val uniqueMessageId: UUID = uuid - override fun toString() = topicSession.toString() + "#" + String(data) + override fun toString() = "$topicSession#${String(data)}" } } var dispatcher: RPCDispatcher? = null - private fun createRPCDispatcher(state: ThreadBox, ops: CordaRPCOps) = object : RPCDispatcher(ops) { + private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService) = object : RPCDispatcher(ops, userService) { override fun send(bits: SerializedBytes<*>, toAddress: String) { state.locked { val msg = session!!.createMessage(false).apply { @@ -409,4 +403,4 @@ class NodeMessagingClient(override val config: NodeConfiguration, } } } -} +} \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCDispatcher.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCDispatcher.kt index 5152a9561a..e6ed9f8d5c 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCDispatcher.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCDispatcher.kt @@ -5,30 +5,35 @@ import com.esotericsoftware.kryo.KryoException import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output +import com.google.common.annotations.VisibleForTesting import com.google.common.collect.HashMultimap import com.r3corda.core.ErrorOr import com.r3corda.core.serialization.SerializedBytes import com.r3corda.core.serialization.deserialize import com.r3corda.core.serialization.serialize import com.r3corda.core.utilities.debug +import com.r3corda.node.services.RPCUserService +import com.r3corda.node.services.User import com.r3corda.node.utilities.AffinityExecutor +import org.apache.activemq.artemis.api.core.Message import org.apache.activemq.artemis.api.core.client.ClientConsumer +import org.apache.activemq.artemis.api.core.client.ClientMessage import rx.Notification import rx.Observable import rx.Subscription import java.lang.reflect.InvocationTargetException import java.util.concurrent.atomic.AtomicInteger -// TODO: Exposing the authenticated message sender. - /** * Intended to service transient clients only (not p2p nodes) for short-lived, transient request/response pairs. * If you need robustness, this is the wrong system. If you don't want a response, this is probably the * wrong system (you could just send a message). If you want complex customisation of how requests/responses * are handled, this is probably the wrong system. */ -abstract class RPCDispatcher(val target: Any) { - private val methodTable = target.javaClass.declaredMethods.associateBy { it.name } +abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService) { + // Throw an exception if there are overloaded methods + private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() } + private val queueToSubscription = HashMultimap.create() // Created afresh for every RPC that is annotated as returning observables. Every time an observable is @@ -64,23 +69,24 @@ abstract class RPCDispatcher(val target: Any) { } fun dispatch(msg: ClientRPCRequestMessage) { - val (argBytes, replyTo, observationsTo, name) = msg - val maybeArgs = argBytes.deserialize>() + val (argsBytes, replyTo, observationsTo, methodName) = msg - rpcLog.debug { "-> RPC -> $name(${maybeArgs.joinToString()}) [reply to $replyTo]" } val response: ErrorOr = ErrorOr.catch { - val method = methodTable[name] ?: throw RPCException("Received RPC for unknown method $name - possible client/server version skew?") - + val method = methodTable[methodName] ?: throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?") if (method.isAnnotationPresent(RPCReturnsObservables::class.java) && observationsTo == null) throw RPCException("Received RPC without any destination for observations, but the RPC returns observables") + val args = argsBytes.deserialize() + + rpcLog.debug { "-> RPC -> $methodName(${args.joinToString()}) [reply to $replyTo]" } + try { - method.invoke(target, *maybeArgs) + method.invoke(ops, *args) } catch (e: InvocationTargetException) { throw e.cause!! } } - rpcLog.debug { "<- RPC <- $name = $response " } + rpcLog.debug { "<- RPC <- $methodName = $response " } val kryo = createRPCKryo(observableSerializer = if (observationsTo != null) ObservableSerializer(observationsTo) else null) @@ -88,7 +94,7 @@ abstract class RPCDispatcher(val target: Any) { val responseBits = try { response.serialize(kryo) } catch (e: KryoException) { - rpcLog.error("Failed to respond to inbound RPC $name", e) + rpcLog.error("Failed to respond to inbound RPC $methodName", e) ErrorOr.of(e).serialize(kryo) } send(responseBits, replyTo) @@ -116,14 +122,48 @@ abstract class RPCDispatcher(val target: Any) { onExecutor.execute { try { val rpcMessage = msg.toRPCRequestMessage() + CURRENT_RPC_USER.set(rpcMessage.user) dispatch(rpcMessage) } catch(e: RPCException) { rpcLog.warn("Received malformed client RPC message: ${e.message}") rpcLog.trace("RPC exception", e) } catch(e: Throwable) { rpcLog.error("Uncaught exception when dispatching client RPC", e) + } finally { + CURRENT_RPC_USER.remove() } } } } + + private fun ClientMessage.requiredString(name: String): String { + return getStringProperty(name) ?: throw RPCException("missing $name property") + } + + /** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */ + private fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage { + val user = getUser(this) + val replyTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.REPLY_TO, true)!! + val observationsTo = getAuthenticatedAddress(user, ClientRPCRequestMessage.OBSERVATIONS_TO, false) + val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) } + if (argBytes.isEmpty()) { + throw RPCException("empty serialized args") + } + val methodName = requiredString(ClientRPCRequestMessage.METHOD_NAME) + return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName, user) + } + + @VisibleForTesting + protected open fun getUser(message: ClientMessage): User { + return userService.getUser(message.requiredString(Message.HDR_VALIDATED_USER.toString()))!! + } + + private fun ClientMessage.getAuthenticatedAddress(user: User, property: String, required: Boolean): String? { + val address: String? = if (required) requiredString(property) else getStringProperty(property) + val expectedAddressPrefix = "${ArtemisMessagingComponent.CLIENTS_PREFIX}${user.username}." + if (address != null && !address.startsWith(expectedAddressPrefix)) { + throw RPCException("$property address does not match up with the user") + } + return address + } } \ No newline at end of file diff --git a/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt b/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt index 22bdc4581b..5a1eb09b83 100644 --- a/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt +++ b/node/src/main/kotlin/com/r3corda/node/services/messaging/RPCStructures.kt @@ -1,3 +1,5 @@ +@file:JvmName("RPCStructures") + package com.r3corda.node.services.messaging import com.esotericsoftware.kryo.Kryo @@ -22,12 +24,13 @@ import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.serialization.* import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.transactions.WireTransaction +import com.r3corda.node.services.User import de.javakaffee.kryoserializers.ArraysAsListSerializer import de.javakaffee.kryoserializers.guava.* import net.i2p.crypto.eddsa.EdDSAPrivateKey import net.i2p.crypto.eddsa.EdDSAPublicKey -import org.apache.activemq.artemis.api.core.client.ClientMessage import org.objenesis.strategy.StdInstantiatorStrategy +import org.slf4j.Logger import org.slf4j.LoggerFactory import rx.Notification import rx.Observable @@ -35,7 +38,7 @@ import java.time.Instant import java.util.* /** Global RPC logger */ -val rpcLog by lazy { LoggerFactory.getLogger("com.r3corda.rpc") } +val rpcLog: Logger by lazy { LoggerFactory.getLogger("com.r3corda.rpc") } /** Used in the RPC wire protocol to wrap an observation with the handle of the observable it's intended for. */ data class MarshalledObservation(val forHandle: Int, val what: Notification<*>) @@ -59,7 +62,8 @@ data class ClientRPCRequestMessage( val args: SerializedBytes>, val replyToAddress: String, val observationsToAddress: String?, - val methodName: String + val methodName: String, + val user: User ) { companion object { const val REPLY_TO = "reply-to" @@ -77,6 +81,20 @@ interface RPCOps { val protocolVersion: Int } +/** + * This is available to RPC implementations to query the validated [User] that is calling it. Each user has a set of + * permissions they're entitled to which can be used to control access. + */ +@JvmField +val CURRENT_RPC_USER: ThreadLocal = ThreadLocal() + +/** Helper method which checks that the current RPC user is entitled for the given permission. Throws a [PermissionException] otherwise. */ +fun requirePermission(permission: String) { + if (permission !in CURRENT_RPC_USER.get().permissions) { + throw PermissionException("User not permissioned for $permission") + } +} + /** * Thrown to indicate a fatal error in the RPC system itself, as opposed to an error generated by the invoked * method. @@ -86,26 +104,12 @@ open class RPCException(msg: String, cause: Throwable?) : RuntimeException(msg, class DeadlineExceeded(rpcName: String) : RPCException("Deadline exceeded on call to $rpcName") } -/** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */ -fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage { - fun ClientMessage.requiredString(name: String): String = getStringProperty(name) ?: throw RPCException("Malformed request message: missing $name property") - - val methodName = requiredString(ClientRPCRequestMessage.METHOD_NAME) - // TODO: Look up the authenticated sender identity once we upgrade to Artemis 1.4 and use that instead. - // This current approach is insecure: one client could send an RPC with a reply-to address owned by - // another, although they'd have to be able to figure out the other client ID first. - // We also need that to figure out what RPCs are allowed. - val replyTo = requiredString(ClientRPCRequestMessage.REPLY_TO) - val observationsTo = getStringProperty(ClientRPCRequestMessage.OBSERVATIONS_TO) - val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) } - check(argBytes.isNotEmpty()) - return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName) -} +class PermissionException(msg: String) : RuntimeException(msg) // The Kryo used for the RPC wire protocol. Every type in the wire protocol is listed here explicitly. // This is annoying to write out, but will make it easier to formalise the wire protocol when the time comes, // because we can see everything we're using in one place. -private class RPCKryo(private val observableSerializer: Serializer>? = null) : Kryo() { +private class RPCKryo(observableSerializer: Serializer>? = null) : Kryo() { init { isRegistrationRequired = true // Allow construction of objects using a JVM backdoor that skips invoking the constructors, if there is no @@ -173,7 +177,11 @@ private class RPCKryo(private val observableSerializer: Serializer ArtemisMessagingComponent.NodeAddress(parsePublicKeyBase58(kryo.readObject(input, String::class.java)), kryo.readObject(input, HostAndPort::class.java)) }, + read = { kryo, input -> + ArtemisMessagingComponent.NodeAddress( + parsePublicKeyBase58(kryo.readObject(input, String::class.java)), + kryo.readObject(input, HostAndPort::class.java)) + }, write = { kryo, output, nodeAddress -> kryo.writeObject(output, nodeAddress.identity.toBase58String()) kryo.writeObject(output, nodeAddress.hostAndPort) @@ -193,13 +201,8 @@ private class RPCKryo(private val observableSerializer: Serializer register(type: Class, read: (Kryo, Input) -> T, write: (Kryo, Output, T) -> Unit) { register(type, object : Serializer() { - override fun read(kryo: Kryo, input: Input, type: Class?): T { - return read(kryo, input) - } - - override fun write(kryo: Kryo, output: Output, o: T) { - write(kryo, output, o) - } + override fun read(kryo: Kryo, input: Input, type: Class): T = read(kryo, input) + override fun write(kryo: Kryo, output: Output, o: T) = write(kryo, output, o) }) } diff --git a/node/src/test/kotlin/com/r3corda/node/ServerRPCTest.kt b/node/src/test/kotlin/com/r3corda/node/CordaRPCOpsImplTest.kt similarity index 85% rename from node/src/test/kotlin/com/r3corda/node/ServerRPCTest.kt rename to node/src/test/kotlin/com/r3corda/node/CordaRPCOpsImplTest.kt index d0a40228f6..04825afe1b 100644 --- a/node/src/test/kotlin/com/r3corda/node/ServerRPCTest.kt +++ b/node/src/test/kotlin/com/r3corda/node/CordaRPCOpsImplTest.kt @@ -7,7 +7,10 @@ import com.r3corda.core.node.services.Vault import com.r3corda.core.protocols.StateMachineRunId import com.r3corda.core.serialization.OpaqueBytes import com.r3corda.core.transactions.SignedTransaction -import com.r3corda.node.internal.ServerRPCOps +import com.r3corda.node.internal.CordaRPCOpsImpl +import com.r3corda.node.services.User +import com.r3corda.node.services.messaging.CURRENT_RPC_USER +import com.r3corda.node.services.messaging.PermissionException import com.r3corda.node.services.messaging.StateMachineUpdate import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.SimpleNotaryService @@ -17,20 +20,19 @@ import com.r3corda.testing.expectEvents import com.r3corda.testing.node.MockNetwork import com.r3corda.testing.node.MockNetwork.MockNode import com.r3corda.testing.sequence +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Before import org.junit.Test import rx.Observable import kotlin.test.assertEquals import kotlin.test.assertFalse -/** - * Unit tests for the node monitoring service. - */ -class ServerRPCTest { +class CordaRPCOpsImplTest { + lateinit var network: MockNetwork lateinit var aliceNode: MockNode lateinit var notaryNode: MockNode - lateinit var rpc: ServerRPCOps + lateinit var rpc: CordaRPCOpsImpl lateinit var stateMachineUpdates: Observable lateinit var transactions: Observable lateinit var vaultUpdates: Observable @@ -41,7 +43,8 @@ class ServerRPCTest { val networkMap = network.createNode(advertisedServices = ServiceInfo(NetworkMapService.type)) aliceNode = network.createNode(networkMapAddress = networkMap.info.address) notaryNode = network.createNode(advertisedServices = ServiceInfo(SimpleNotaryService.type), networkMapAddress = networkMap.info.address) - rpc = ServerRPCOps(aliceNode.services, aliceNode.smm, aliceNode.database) + rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database) + CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(CordaRPCOpsImpl.CASH_PERMISSION))) stateMachineUpdates = rpc.stateMachinesAndUpdates().second transactions = rpc.verifiedTransactions().second @@ -96,7 +99,7 @@ class ServerRPCTest { } @Test - fun issueAndMoveWorks() { + fun `issue and move`() { rpc.executeCommand(ClientToServiceCommand.IssueCash( amount = Amount(100, USD), @@ -174,4 +177,17 @@ class ServerRPCTest { ) } } + + @Test + fun `cash command by user not permissioned for cash`() { + CURRENT_RPC_USER.set(User("user", "pwd", permissions = emptySet())) + assertThatExceptionOfType(PermissionException::class.java).isThrownBy { + rpc.executeCommand(ClientToServiceCommand.IssueCash( + amount = Amount(100, USD), + issueRef = OpaqueBytes(ByteArray(1, { 1 })), + recipient = aliceNode.info.legalIdentity, + notary = notaryNode.info.notaryIdentity + )) + } + } } diff --git a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt index 5795c940b6..d7188af848 100644 --- a/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt +++ b/node/src/test/kotlin/com/r3corda/node/services/ArtemisMessagingTests.kt @@ -1,22 +1,15 @@ package com.r3corda.node.services import com.google.common.net.HostAndPort -import com.r3corda.core.contracts.ClientToServiceCommand -import com.r3corda.core.contracts.ContractState -import com.r3corda.core.contracts.StateAndRef -import com.r3corda.core.crypto.SecureHash import com.r3corda.core.crypto.generateKeyPair import com.r3corda.core.messaging.Message import com.r3corda.core.messaging.createMessage -import com.r3corda.core.node.NodeInfo import com.r3corda.core.node.services.DEFAULT_SESSION_ID -import com.r3corda.core.node.services.NetworkMapCache -import com.r3corda.core.node.services.StateMachineTransactionMapping -import com.r3corda.core.node.services.Vault -import com.r3corda.core.transactions.SignedTransaction import com.r3corda.core.utilities.LogHelper import com.r3corda.node.services.config.NodeConfiguration -import com.r3corda.node.services.messaging.* +import com.r3corda.node.services.messaging.ArtemisMessagingServer +import com.r3corda.node.services.messaging.NodeMessagingClient +import com.r3corda.node.services.messaging.RPCOps import com.r3corda.node.services.network.InMemoryNetworkMapCache import com.r3corda.node.services.transactions.PersistentUniquenessProvider import com.r3corda.node.utilities.AffinityExecutor @@ -31,7 +24,6 @@ import org.junit.Before import org.junit.Rule import org.junit.Test import org.junit.rules.TemporaryFolder -import rx.Observable import java.io.Closeable import java.net.ServerSocket import java.nio.file.Path @@ -51,51 +43,22 @@ class ArtemisMessagingTests { lateinit var config: NodeConfiguration lateinit var dataSource: Closeable lateinit var database: Database + lateinit var userService: RPCUserService + var messagingClient: NodeMessagingClient? = null var messagingServer: ArtemisMessagingServer? = null + val networkMapCache = InMemoryNetworkMapCache() - val rpcOps = object : CordaRPCOps { - override val protocolVersion: Int - get() = throw UnsupportedOperationException() - - override fun stateMachinesAndUpdates(): Pair, Observable> { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - override fun vaultAndUpdates(): Pair>, Observable> { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - override fun verifiedTransactions(): Pair, Observable> { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - override fun stateMachineRecordedTransactionMapping(): Pair, Observable> { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - override fun networkMapUpdates(): Pair, Observable> { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - override fun executeCommand(command: ClientToServiceCommand): TransactionBuildResult { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - override fun addVaultTransactionNote(txnId: SecureHash, txnNote: String) { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } - - override fun getVaultTransactionNotes(txnId: SecureHash): Iterable { - throw UnsupportedOperationException("not implemented") //To change body of created functions use File | Settings | File Templates. - } + val rpcOps = object : RPCOps { + override val protocolVersion: Int get() = throw UnsupportedOperationException() } @Before fun setUp() { + userService = PropertiesFileRPCUserService(temporaryFolder.newFile().toPath()) // TODO: create a base class that provides a default implementation config = object : NodeConfiguration { override val basedir: Path = temporaryFolder.newFolder().toPath() @@ -134,7 +97,7 @@ class ArtemisMessagingTests { val remoteServerAddress = freeLocalHostAndPort() createMessagingServer(remoteServerAddress).start() - createMessagingClient(server = remoteServerAddress).start(rpcOps) + createMessagingClient(server = remoteServerAddress).start(rpcOps, userService) } @Test @@ -145,14 +108,14 @@ class ArtemisMessagingTests { createMessagingServer(serverAddress).start() messagingClient = createMessagingClient(server = invalidServerAddress) - assertThatThrownBy { messagingClient!!.start(rpcOps) } + assertThatThrownBy { messagingClient!!.start(rpcOps, userService) } messagingClient = null } @Test fun `client should connect to local server`() { createMessagingServer().start() - createMessagingClient().start(rpcOps) + createMessagingClient().start(rpcOps, userService) } @Test @@ -162,7 +125,7 @@ class ArtemisMessagingTests { createMessagingServer().start() val messagingClient = createMessagingClient() - messagingClient.start(rpcOps) + messagingClient.start(rpcOps, userService) thread { messagingClient.run() } messagingClient.addMessageHandler(topic) { message, r -> @@ -187,7 +150,7 @@ class ArtemisMessagingTests { } private fun createMessagingServer(local: HostAndPort = hostAndPort): ArtemisMessagingServer { - return ArtemisMessagingServer(config, local, networkMapCache).apply { + return ArtemisMessagingServer(config, local, networkMapCache, userService).apply { configureWithDevSSLCertificate() messagingServer = this } diff --git a/node/src/test/kotlin/com/r3corda/node/services/PropertiesFileRPCUserServiceTest.kt b/node/src/test/kotlin/com/r3corda/node/services/PropertiesFileRPCUserServiceTest.kt new file mode 100644 index 0000000000..dc55e60182 --- /dev/null +++ b/node/src/test/kotlin/com/r3corda/node/services/PropertiesFileRPCUserServiceTest.kt @@ -0,0 +1,81 @@ +package com.r3corda.node.services + +import com.google.common.jimfs.Configuration.unix +import com.google.common.jimfs.Jimfs +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.After +import org.junit.Test +import java.nio.file.Files + +class PropertiesFileRPCUserServiceTest { + + private val fileSystem = Jimfs.newFileSystem(unix()) + private val file = fileSystem.getPath("users.properties") + + @After + fun cleanUp() { + fileSystem.close() + } + + @Test + fun `file doesn't exist`() { + val service = PropertiesFileRPCUserService(file) + assertThat(service.getUser("user")).isNull() + assertThat(service.users).isEmpty() + } + + @Test + fun `empty file`() { + val service = loadWithContents() + assertThat(service.getUser("user")).isNull() + assertThat(service.users).isEmpty() + } + + @Test + fun `no permissions`() { + val service = loadWithContents("user=password") + assertThat(service.getUser("user")).isEqualTo(User("user", "password", permissions = emptySet())) + assertThat(service.users).containsOnly(User("user", "password", permissions = emptySet())) + } + + @Test + fun `single permission, which is in lower case`() { + val service = loadWithContents("user=password,cash") + assertThat(service.getUser("user")?.permissions).containsOnly("CASH") + } + + @Test + fun `two permissions, which are upper case`() { + val service = loadWithContents("user=password,CASH,ADMIN") + assertThat(service.getUser("user")?.permissions).containsOnly("CASH", "ADMIN") + } + + @Test + fun `two users`() { + val service = loadWithContents("user=password,ADMIN", "user2=password2") + assertThat(service.getUser("user")).isNotNull() + assertThat(service.getUser("user2")).isNotNull() + assertThat(service.users).containsOnly( + User("user", "password", permissions = setOf("ADMIN")), + User("user2", "password2", permissions = emptySet())) + } + + @Test + fun `unknown user`() { + val service = loadWithContents("user=password") + assertThat(service.getUser("test")).isNull() + } + + @Test + fun `Artemis special characters not permitted in usernames`() { + assertThatThrownBy { loadWithContents("user.name=password") } + assertThatThrownBy { loadWithContents("user*name=password") } + assertThatThrownBy { loadWithContents("user#name=password") } + } + + private fun loadWithContents(vararg lines: String): PropertiesFileRPCUserService { + Files.write(file, lines.asList()) + return PropertiesFileRPCUserService(file) + } +} \ No newline at end of file diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt index d32c52560a..2708bd19c5 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/InMemoryMessagingNetwork.kt @@ -288,10 +288,8 @@ class InMemoryMessagingNetwork(val sendManuallyPumped: Boolean) : SingletonSeria override val topicSession: TopicSession get() = topicSession override val data: ByteArray get() = data override val debugTimestamp: Instant = Instant.now() - override fun serialise(): ByteArray = this.serialise() override val uniqueMessageId: UUID = uuid - - override fun toString() = topicSession.toString() + "#" + String(data) + override fun toString() = "$topicSession#${String(data)}" } } diff --git a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt index 9f5dd04315..e7cecabf43 100644 --- a/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt +++ b/test-utils/src/main/kotlin/com/r3corda/testing/node/MockNode.kt @@ -13,10 +13,12 @@ import com.r3corda.core.random63BitValue import com.r3corda.core.utilities.DUMMY_NOTARY_KEY import com.r3corda.core.utilities.loggerFor import com.r3corda.node.internal.AbstractNode +import com.r3corda.node.internal.CordaRPCOpsImpl import com.r3corda.node.services.api.MessagingServiceInternal import com.r3corda.node.services.config.NodeConfiguration import com.r3corda.node.services.keys.E2ETestKeyManagementService import com.r3corda.node.services.messaging.CordaRPCOps +import com.r3corda.node.services.messaging.RPCOps import com.r3corda.node.services.network.InMemoryNetworkMapService import com.r3corda.node.services.network.NetworkMapService import com.r3corda.node.services.transactions.InMemoryUniquenessProvider @@ -129,7 +131,7 @@ class MockNetwork(private val networkSendManuallyPumped: Boolean = false, override fun makeKeyManagementService(): KeyManagementService = E2ETestKeyManagementService(partyKeys) - override fun startMessagingService(cordaRPCOps: CordaRPCOps) { + override fun startMessagingService(rpcOps: RPCOps) { // Nothing to do }