mirror of
https://github.com/corda/corda.git
synced 2025-06-19 15:43:52 +00:00
CORDA-265: Implement "ALL" permission for RPC users. (#306)
* CORDA-265: Implement "ALL" permission for RPC users. Users with this permission in node.conf can use any flow. * CORDA-265: Ensure that we always close the RPC proxy object after each test. * CORDA-265: Refactor construction of dummy RPC client into an abstract base class. * CORDA-265: Document RPC "ALL" permission.
This commit is contained in:
@ -58,7 +58,7 @@ fun requirePermission(permission: String) {
|
||||
// TODO remove the NODE_USER condition once webserver doesn't need it
|
||||
val currentUser = CURRENT_RPC_USER.get()
|
||||
val currentUserPermissions = currentUser.permissions
|
||||
if (currentUser.username != NODE_USER && permission !in currentUserPermissions) {
|
||||
if (currentUser.username != NODE_USER && currentUserPermissions.intersect(listOf(permission, "ALL")).isEmpty()) {
|
||||
throw PermissionException("User not permissioned for $permission, permissions are $currentUserPermissions")
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,98 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent
|
||||
import net.corda.node.services.messaging.CordaRPCClientImpl
|
||||
import net.corda.node.services.messaging.RPCDispatcher
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
|
||||
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import java.util.*
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
abstract class AbstractClientRPC {
|
||||
|
||||
lateinit var artemis: EmbeddedActiveMQ
|
||||
lateinit var serverSession: ClientSession
|
||||
lateinit var clientSession: ClientSession
|
||||
lateinit var producer: ClientProducer
|
||||
lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
|
||||
|
||||
@Before
|
||||
fun rpcSetup() {
|
||||
// Set up an in-memory Artemis with an RPC requests queue.
|
||||
artemis = EmbeddedActiveMQ()
|
||||
artemis.setConfiguration(ConfigurationImpl().apply {
|
||||
acceptorConfigurations = setOf(TransportConfiguration(InVMAcceptorFactory::class.java.name))
|
||||
isSecurityEnabled = false
|
||||
isPersistenceEnabled = false
|
||||
})
|
||||
artemis.start()
|
||||
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(TransportConfiguration(InVMConnectorFactory::class.java.name))
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
serverSession = sessionFactory.createSession()
|
||||
serverSession.start()
|
||||
|
||||
serverSession.createTemporaryQueue(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE, ArtemisMessagingComponent.RPC_REQUESTS_QUEUE)
|
||||
producer = serverSession.createProducer()
|
||||
serverThread = AffinityExecutor.ServiceAffinityExecutor("unit-tests-rpc-dispatch-thread", 1)
|
||||
serverSession.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 'BINDING_REMOVED'")
|
||||
|
||||
clientSession = sessionFactory.createSession()
|
||||
clientSession.start()
|
||||
|
||||
LogHelper.setLevel("+net.corda.rpc")
|
||||
}
|
||||
|
||||
@After
|
||||
fun rpcShutdown() {
|
||||
safeClose(producer)
|
||||
clientSession.stop()
|
||||
serverSession.stop()
|
||||
artemis.stop()
|
||||
serverThread.shutdownNow()
|
||||
}
|
||||
|
||||
fun <T: RPCOps> rpcProxyFor(rpcUser: User, rpcImpl: T, type: Class<T>): T {
|
||||
val userService = object : RPCUserService {
|
||||
override fun getUser(username: String): User? = if (username == rpcUser.username) rpcUser else null
|
||||
override val users: List<User> get() = listOf(rpcUser)
|
||||
}
|
||||
|
||||
val dispatcher = object : RPCDispatcher(rpcImpl, userService, "SomeName") {
|
||||
override fun send(data: SerializedBytes<*>, toAddress: String) {
|
||||
val msg = serverSession.createMessage(false).apply {
|
||||
writeBodyBufferBytes(data.bytes)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(Message.HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
producer.send(toAddress, msg)
|
||||
}
|
||||
|
||||
override fun getUser(message: ClientMessage): User = rpcUser
|
||||
}
|
||||
|
||||
val serverNotifConsumer = serverSession.createConsumer("rpc.qremovals")
|
||||
val serverConsumer = serverSession.createConsumer(ArtemisMessagingComponent.RPC_REQUESTS_QUEUE)
|
||||
dispatcher.start(serverConsumer, serverNotifConsumer, serverThread)
|
||||
return CordaRPCClientImpl(clientSession, ReentrantLock(), rpcUser.username).proxyFor(type)
|
||||
}
|
||||
|
||||
fun safeClose(obj: Any) = try { (obj as AutoCloseable).close() } catch (e: Exception) {}
|
||||
}
|
@ -6,109 +6,38 @@ import com.google.common.util.concurrent.SettableFuture
|
||||
import net.corda.core.getOrThrow
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.messaging.RPCReturnsObservables
|
||||
import net.corda.core.serialization.SerializedBytes
|
||||
import net.corda.core.success
|
||||
import net.corda.core.utilities.LogHelper
|
||||
import net.corda.node.services.RPCUserService
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
|
||||
import net.corda.node.services.messaging.CURRENT_RPC_USER
|
||||
import net.corda.node.services.messaging.CordaRPCClientImpl
|
||||
import net.corda.node.services.messaging.RPCDispatcher
|
||||
import net.corda.node.services.messaging.RPCSinceVersion
|
||||
import net.corda.node.utilities.AffinityExecutor
|
||||
import org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.TransportConfiguration
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory
|
||||
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory
|
||||
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
import rx.subjects.PublishSubject
|
||||
import java.io.Closeable
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import kotlin.test.assertEquals
|
||||
import kotlin.test.assertFailsWith
|
||||
import kotlin.test.assertTrue
|
||||
|
||||
class ClientRPCInfrastructureTests {
|
||||
class ClientRPCInfrastructureTests : AbstractClientRPC() {
|
||||
// TODO: Test that timeouts work
|
||||
|
||||
lateinit var artemis: EmbeddedActiveMQ
|
||||
lateinit var serverSession: ClientSession
|
||||
lateinit var clientSession: ClientSession
|
||||
lateinit var producer: ClientProducer
|
||||
lateinit var serverThread: AffinityExecutor.ServiceAffinityExecutor
|
||||
lateinit var proxy: TestOps
|
||||
|
||||
private val authenticatedUser = User("test", "password", permissions = setOf())
|
||||
|
||||
@Before
|
||||
fun setup() {
|
||||
// Set up an in-memory Artemis with an RPC requests queue.
|
||||
artemis = EmbeddedActiveMQ()
|
||||
artemis.setConfiguration(ConfigurationImpl().apply {
|
||||
acceptorConfigurations = setOf(TransportConfiguration(InVMAcceptorFactory::class.java.name))
|
||||
isSecurityEnabled = false
|
||||
isPersistenceEnabled = false
|
||||
})
|
||||
artemis.start()
|
||||
val serverLocator = ActiveMQClient.createServerLocatorWithoutHA(TransportConfiguration(InVMConnectorFactory::class.java.name))
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
|
||||
serverSession = sessionFactory.createSession()
|
||||
serverSession.start()
|
||||
serverSession.createTemporaryQueue(RPC_REQUESTS_QUEUE, RPC_REQUESTS_QUEUE)
|
||||
producer = serverSession.createProducer()
|
||||
val userService = object : RPCUserService {
|
||||
override fun getUser(username: String): User? = throw UnsupportedOperationException()
|
||||
override val users: List<User> get() = throw UnsupportedOperationException()
|
||||
}
|
||||
val dispatcher = object : RPCDispatcher(TestOpsImpl(), userService, "SomeName") {
|
||||
override fun send(data: SerializedBytes<*>, toAddress: String) {
|
||||
val msg = serverSession.createMessage(false).apply {
|
||||
writeBodyBufferBytes(data.bytes)
|
||||
// Use the magic deduplication property built into Artemis as our message identity too
|
||||
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
|
||||
}
|
||||
producer.send(toAddress, msg)
|
||||
}
|
||||
|
||||
override fun getUser(message: ClientMessage): User = authenticatedUser
|
||||
}
|
||||
serverThread = AffinityExecutor.ServiceAffinityExecutor("unit-tests-rpc-dispatch-thread", 1)
|
||||
val serverConsumer = serverSession.createConsumer(RPC_REQUESTS_QUEUE)
|
||||
serverSession.createTemporaryQueue("activemq.notifications", "rpc.qremovals", "_AMQ_NotifType = 'BINDING_REMOVED'")
|
||||
val serverNotifConsumer = serverSession.createConsumer("rpc.qremovals")
|
||||
dispatcher.start(serverConsumer, serverNotifConsumer, serverThread)
|
||||
|
||||
clientSession = sessionFactory.createSession()
|
||||
clientSession.start()
|
||||
|
||||
LogHelper.setLevel("+net.corda.rpc")
|
||||
|
||||
proxy = CordaRPCClientImpl(clientSession, ReentrantLock(), authenticatedUser.username).proxyFor(TestOps::class.java)
|
||||
proxy = rpcProxyFor(authenticatedUser, TestOpsImpl(), TestOps::class.java)
|
||||
}
|
||||
|
||||
@After
|
||||
fun shutdown() {
|
||||
(proxy as Closeable?)?.close()
|
||||
clientSession.stop()
|
||||
serverSession.stop()
|
||||
artemis.stop()
|
||||
serverThread.shutdownNow()
|
||||
safeClose(proxy)
|
||||
}
|
||||
|
||||
interface TestOps : RPCOps {
|
||||
|
@ -0,0 +1,84 @@
|
||||
package net.corda.node.messaging
|
||||
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.node.services.User
|
||||
import net.corda.node.services.messaging.*
|
||||
import org.junit.After
|
||||
import org.junit.Test
|
||||
import kotlin.test.*
|
||||
|
||||
class RPCPermissionsTest : AbstractClientRPC() {
|
||||
companion object {
|
||||
const val DUMMY_FLOW = "StartFlow.net.corda.flows.DummyFlow"
|
||||
const val OTHER_FLOW = "StartFlow.net.corda.flows.OtherFlow"
|
||||
const val ALL_ALLOWED = "ALL"
|
||||
}
|
||||
|
||||
lateinit var proxy: TestOps
|
||||
|
||||
@After
|
||||
fun shutdown() {
|
||||
safeClose(proxy)
|
||||
}
|
||||
|
||||
/*
|
||||
* RPC operation.
|
||||
*/
|
||||
interface TestOps : RPCOps {
|
||||
fun validatePermission(str: String)
|
||||
}
|
||||
|
||||
class TestOpsImpl : TestOps {
|
||||
override val protocolVersion = 1
|
||||
override fun validatePermission(str: String) = requirePermission(str)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an RPC proxy for the given user.
|
||||
*/
|
||||
private fun proxyFor(rpcUser: User): TestOps = rpcProxyFor(rpcUser, TestOpsImpl(), TestOps::class.java)
|
||||
|
||||
private fun userOf(name: String, permissions: Set<String>) = User(name, "password", permissions)
|
||||
|
||||
@Test
|
||||
fun `empty user cannot use any flows`() {
|
||||
val emptyUser = userOf("empty", emptySet())
|
||||
proxy = proxyFor(emptyUser)
|
||||
assertFailsWith(PermissionException::class,
|
||||
"User ${emptyUser.username} should not be allowed to use $DUMMY_FLOW.",
|
||||
{ proxy.validatePermission(DUMMY_FLOW) })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `admin user can use any flow`() {
|
||||
val adminUser = userOf("admin", setOf(ALL_ALLOWED))
|
||||
proxy = proxyFor(adminUser)
|
||||
proxy.validatePermission(DUMMY_FLOW)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `joe user is allowed to use DummyFlow`() {
|
||||
val joeUser = userOf("joe", setOf(DUMMY_FLOW))
|
||||
proxy = proxyFor(joeUser)
|
||||
proxy.validatePermission(DUMMY_FLOW)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `joe user is not allowed to use OtherFlow`() {
|
||||
val joeUser = userOf("joe", setOf(DUMMY_FLOW))
|
||||
proxy = proxyFor(joeUser)
|
||||
assertFailsWith(PermissionException::class,
|
||||
"User ${joeUser.username} should not be allowed to use $OTHER_FLOW",
|
||||
{ proxy.validatePermission(OTHER_FLOW) })
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `check ALL is implemented the correct way round` () {
|
||||
val joeUser = userOf("joe", setOf(DUMMY_FLOW))
|
||||
proxy = proxyFor(joeUser)
|
||||
assertFailsWith(PermissionException::class,
|
||||
"Permission $ALL_ALLOWED should not do anything for User ${joeUser.username}",
|
||||
{ proxy.validatePermission(ALL_ALLOWED) })
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user