RPC muxing, multithreading, RPC driver, performance tests

This commit is contained in:
Andras Slemmer
2017-03-29 17:28:02 +01:00
parent 25dbac0f07
commit de88ad4f40
63 changed files with 3223 additions and 1417 deletions

View File

@ -17,9 +17,8 @@ class BootTests {
fun `java deserialization is disabled`() {
driver {
val user = User("u", "p", setOf(startFlowPermission<ObjectInputStreamFlow>()))
val future = startNode(rpcUsers = listOf(user)).getOrThrow().rpcClientToNode().apply {
start(user.username, user.password)
}.proxy().startFlow(::ObjectInputStreamFlow).returnValue
val future = startNode(rpcUsers = listOf(user)).getOrThrow().rpcClientToNode().
start(user.username, user.password).proxy.startFlow(::ObjectInputStreamFlow).returnValue
assertThatThrownBy { future.getOrThrow() }.isInstanceOf(InvalidClassException::class.java).hasMessage("filter status: REJECTED")
}
}

View File

@ -60,8 +60,7 @@ class DistributedServiceTests : DriverBasedTest() {
// Connect to Alice and the notaries
fun connectRpc(node: NodeHandle): CordaRPCOps {
val client = node.rpcClientToNode()
client.start("test", "test")
return client.proxy()
return client.start("test", "test").proxy
}
aliceProxy = connectRpc(alice)
val rpcClientsToNotaries = notaries.map(::connectRpc)

View File

@ -2,7 +2,7 @@ package net.corda.services.messaging
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.nodeapi.RPCApi
import net.corda.testing.messaging.SimpleMQClient
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.ActiveMQClusterSecurityException
@ -24,7 +24,7 @@ class MQSecurityAsNodeTest : MQSecurityTest() {
@Test
fun `send message to RPC requests address`() {
assertSendAttackFails(RPC_REQUESTS_QUEUE)
assertSendAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
}
@Test

View File

@ -1,6 +1,7 @@
package net.corda.services.messaging
import net.corda.nodeapi.User
import net.corda.testing.configureTestSSL
import net.corda.testing.messaging.SimpleMQClient
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException
import org.assertj.core.api.Assertions.assertThatExceptionOfType
@ -23,14 +24,13 @@ class MQSecurityAsRPCTest : MQSecurityTest() {
override val extraRPCUsers = listOf(User("evil", "pass", permissions = emptySet()))
override fun startAttacker(attacker: SimpleMQClient) {
attacker.loginToRPC(extraRPCUsers[0])
attacker.start(extraRPCUsers[0].username, extraRPCUsers[0].password, false)
}
@Test
fun `login to a ssl port as a RPC user`() {
val attacker = clientTo(alice.configuration.p2pAddress)
assertThatExceptionOfType(ActiveMQSecurityException::class.java).isThrownBy {
attacker.loginToRPC(extraRPCUsers[0], enableSSL = true)
loginToRPC(alice.configuration.p2pAddress, extraRPCUsers[0], configureTestSSL())
}
}
}

View File

@ -2,7 +2,7 @@ package net.corda.services.messaging
import co.paralleluniverse.fibers.Suspendable
import com.google.common.net.HostAndPort
import net.corda.client.rpc.CordaRPCClientImpl
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.crypto.Party
import net.corda.core.crypto.generateKeyPair
import net.corda.core.crypto.toBase58String
@ -10,19 +10,16 @@ import net.corda.core.flows.FlowLogic
import net.corda.core.getOrThrow
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.random63BitValue
import net.corda.core.seconds
import net.corda.core.utilities.ALICE
import net.corda.core.utilities.BOB
import net.corda.core.utilities.unwrap
import net.corda.node.internal.Node
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.INTERNAL_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NETWORK_MAP_QUEUE
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NOTIFICATIONS_ADDRESS
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.P2P_QUEUE
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEERS_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_QUEUE_REMOVALS_QUEUE
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.RPC_REQUESTS_QUEUE
import net.corda.nodeapi.RPCApi
import net.corda.nodeapi.User
import net.corda.nodeapi.config.SSLConfiguration
import net.corda.testing.configureTestSSL
@ -36,7 +33,6 @@ import org.junit.After
import org.junit.Before
import org.junit.Test
import java.util.*
import java.util.concurrent.locks.ReentrantLock
import kotlin.test.assertEquals
/**
@ -108,7 +104,7 @@ abstract class MQSecurityTest : NodeBasedTest() {
@Test
fun `consume message from RPC requests queue`() {
assertConsumeAttackFails(RPC_REQUESTS_QUEUE)
assertConsumeAttackFails(RPCApi.RPC_SERVER_QUEUE_NAME)
}
@Test
@ -119,21 +115,16 @@ abstract class MQSecurityTest : NodeBasedTest() {
@Test
fun `create queue for valid RPC user`() {
val user1Queue = "$CLIENTS_PREFIX${rpcUser.username}.rpc.${random63BitValue()}"
val user1Queue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.${random63BitValue()}"
assertTempQueueCreationAttackFails(user1Queue)
}
@Test
fun `create queue for invalid RPC user`() {
val invalidRPCQueue = "$CLIENTS_PREFIX${random63BitValue()}.rpc.${random63BitValue()}"
val invalidRPCQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${random63BitValue()}.${random63BitValue()}"
assertTempQueueCreationAttackFails(invalidRPCQueue)
}
@Test
fun `consume message from RPC queue removals queue`() {
assertConsumeAttackFails(RPC_QUEUE_REMOVALS_QUEUE)
}
@Test
fun `send message to notifications address`() {
assertSendAttackFails(NOTIFICATIONS_ADDRESS)
@ -157,22 +148,16 @@ abstract class MQSecurityTest : NodeBasedTest() {
return client
}
fun loginToRPC(target: HostAndPort, rpcUser: User, sslConfiguration: SSLConfiguration? = null): SimpleMQClient {
val client = clientTo(target, sslConfiguration)
client.loginToRPC(rpcUser)
return client
}
fun SimpleMQClient.loginToRPC(rpcUser: User, enableSSL: Boolean = false): CordaRPCOps {
start(rpcUser.username, rpcUser.password, enableSSL)
val clientImpl = CordaRPCClientImpl(session, ReentrantLock(), rpcUser.username)
return clientImpl.proxyFor(CordaRPCOps::class.java, timeout = 1.seconds)
fun loginToRPC(target: HostAndPort, rpcUser: User, sslConfiguration: SSLConfiguration? = null): CordaRPCOps {
return CordaRPCClient(target, sslConfiguration).start(rpcUser.username, rpcUser.password).proxy
}
fun loginToRPCAndGetClientQueue(): String {
val rpcClient = loginToRPC(alice.configuration.rpcAddress!!, rpcUser)
val clientQueueQuery = SimpleString("$CLIENTS_PREFIX${rpcUser.username}.rpc.*")
return rpcClient.session.addressQuery(clientQueueQuery).queueNames.single().toString()
loginToRPC(alice.configuration.rpcAddress!!, rpcUser)
val clientQueueQuery = SimpleString("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.${rpcUser.username}.*")
val client = clientTo(alice.configuration.rpcAddress!!)
client.start(rpcUser.username, rpcUser.password, false)
return client.session.addressQuery(clientQueueQuery).queueNames.single().toString()
}
fun assertAllQueueCreationAttacksFail(queue: String) {

View File

@ -7,13 +7,10 @@ import com.google.common.util.concurrent.*
import com.typesafe.config.Config
import com.typesafe.config.ConfigRenderOptions
import net.corda.client.rpc.CordaRPCClient
import net.corda.core.ThreadBox
import net.corda.core.*
import net.corda.core.crypto.Party
import net.corda.core.crypto.X509Utilities
import net.corda.core.crypto.commonName
import net.corda.core.div
import net.corda.core.flatMap
import net.corda.core.map
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.ServiceInfo
@ -40,6 +37,7 @@ import java.io.File
import java.net.*
import java.nio.file.Path
import java.nio.file.Paths
import java.time.Duration
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.format.DateTimeFormatter
@ -110,6 +108,26 @@ interface DriverDSLExposedInterface {
fun startNetworkMapService()
fun waitForAllNodesToFinish()
/**
* Polls a function until it returns a non-null value. Note that there is no timeout on the polling.
*
* @param pollName A description of what is being polled.
* @param pollInterval The interval of polling.
* @param warnCount The number of polls after the Driver gives a warning.
* @param check The function being polled.
* @return A future that completes with the non-null value [check] has returned.
*/
fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration = 500.millis, warnCount: Int = 120, check: () -> A?): ListenableFuture<A>
/**
* Polls the given function until it returns true.
* @see pollUntilNonNull
*/
fun pollUntilTrue(pollName: String, pollInterval: Duration = 500.millis, warnCount: Int = 120, check: () -> Boolean): ListenableFuture<Unit> {
return pollUntilNonNull(pollName, pollInterval, warnCount) { if (check()) Unit else null }
}
val shutdownManager: ShutdownManager
}
interface DriverDSLInternalInterface : DriverDSLExposedInterface {
@ -216,15 +234,13 @@ fun <DI : DriverDSLExposedInterface, D : DriverDSLInternalInterface, A> genericD
var shutdownHook: Thread? = null
try {
driverDsl.start()
val returnValue = dsl(coerce(driverDsl))
shutdownHook = Thread({
driverDsl.shutdown()
})
Runtime.getRuntime().addShutdownHook(shutdownHook)
return returnValue
return dsl(coerce(driverDsl))
} catch (exception: Throwable) {
println("Driver shutting down because of exception $exception")
exception.printStackTrace()
log.error("Driver shutting down because of exception", exception)
throw exception
} finally {
driverDsl.shutdown()
@ -271,7 +287,7 @@ fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort
fun <A> poll(
executorService: ScheduledExecutorService,
pollName: String,
pollIntervalMs: Long = 500,
pollInterval: Duration = 500.millis,
warnCount: Int = 120,
check: () -> A?
): ListenableFuture<A> {
@ -286,7 +302,7 @@ fun <A> poll(
executorService.schedule(task@ {
counter++
if (counter == warnCount) {
log.warn("Been polling $pollName for ${pollIntervalMs * warnCount / 1000.0} seconds...")
log.warn("Been polling $pollName for ${pollInterval.seconds * warnCount} seconds...")
}
val result = try {
check()
@ -299,7 +315,7 @@ fun <A> poll(
} else {
resultFuture.set(result)
}
}, pollIntervalMs, MILLISECONDS)
}, pollInterval.toMillis(), MILLISECONDS)
}
schedulePoll()
return resultFuture
@ -326,7 +342,13 @@ class ShutdownManager(private val executorService: ExecutorService) {
/** Could not get all of them, collect what we have */
shutdownFutures.filter { it.isDone }.map { it.get() }
}
shutdowns.reversed().forEach { it() }
shutdowns.reversed().forEach { shutdown ->
try {
shutdown()
} catch (throwable: Throwable) {
log.error("Exception while shutting down", throwable)
}
}
}
fun registerShutdown(shutdown: ListenableFuture<() -> Unit>) {
@ -335,6 +357,7 @@ class ShutdownManager(private val executorService: ExecutorService) {
registeredShutdowns.add(shutdown)
}
}
fun registerShutdown(shutdown: () -> Unit) = registerShutdown(Futures.immediateFuture(shutdown))
fun registerProcessShutdown(processFuture: ListenableFuture<Process>) {
val processShutdown = processFuture.map { process ->
@ -368,8 +391,10 @@ class DriverDSL(
) : DriverDSLInternalInterface {
private val networkMapLegalName = DUMMY_MAP.name
private val networkMapAddress = portAllocation.nextHostAndPort()
val executorService: ListeningScheduledExecutorService = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(2))
val shutdownManager = ShutdownManager(executorService)
val executorService: ListeningScheduledExecutorService = MoreExecutors.listeningDecorator(
Executors.newScheduledThreadPool(2, ThreadFactoryBuilder().setNameFormat("driver-pool-thread-%d").build())
)
override val shutdownManager = ShutdownManager(executorService)
class State {
val processes = ArrayList<ListenableFuture<Process>>()
@ -401,9 +426,6 @@ class DriverDSL(
override fun shutdown() {
shutdownManager.shutdown()
// Check that we shut down properly
addressMustNotBeBound(executorService, networkMapAddress).get()
executorService.shutdown()
}
@ -411,8 +433,9 @@ class DriverDSL(
val client = CordaRPCClient(nodeAddress, sslConfig)
return poll(executorService, "for RPC connection") {
try {
client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
return@poll client.proxy()
val connection = client.start(ArtemisMessagingComponent.NODE_USER, ArtemisMessagingComponent.NODE_USER)
shutdownManager.registerShutdown { connection.close() }
return@poll connection.proxy
} catch(e: Exception) {
log.error("Exception $e, Retrying RPC connection at $nodeAddress")
null
@ -566,6 +589,12 @@ class DriverDSL(
registerProcess(startNode)
}
override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): ListenableFuture<A> {
val pollFuture = poll(executorService, pollName, pollInterval, warnCount, check)
shutdownManager.registerShutdown { pollFuture.cancel(true) }
return pollFuture
}
companion object {
val name = arrayOf(
ALICE.name,

View File

@ -97,7 +97,7 @@ abstract class AbstractNode(open val configuration: NodeConfiguration,
CashExitFlow::class.java to setOf(Amount::class.java, PartyAndReference::class.java),
CashIssueFlow::class.java to setOf(Amount::class.java, OpaqueBytes::class.java, Party::class.java),
CashPaymentFlow::class.java to setOf(Amount::class.java, Party::class.java),
FinalityFlow::class.java to emptySet(),
FinalityFlow::class.java to setOf(LinkedHashSet::class.java),
ContractUpgradeFlow::class.java to emptySet()
)
}

View File

@ -21,11 +21,11 @@ import net.corda.core.node.services.vault.Sort
import net.corda.core.serialization.CordaSerializable
import net.corda.core.transactions.SignedTransaction
import net.corda.node.services.api.ServiceHubInternal
import net.corda.node.services.messaging.getRpcContext
import net.corda.node.services.messaging.requirePermission
import net.corda.node.services.startFlowPermission
import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.utilities.transaction
import net.corda.nodeapi.CURRENT_RPC_USER
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import rx.Observable
@ -121,8 +121,9 @@ class CordaRPCOpsImpl(
// TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startTrackedFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowProgressHandle<T> {
requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(CURRENT_RPC_USER.get().username)
val rpcContext = getRpcContext()
rpcContext.requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(rpcContext.currentUser.username)
val stateMachine = services.invokeFlowAsync(logicType, currentUser, *args)
return FlowProgressHandleImpl(
id = stateMachine.id,
@ -133,8 +134,9 @@ class CordaRPCOpsImpl(
// TODO: Check that this flow is annotated as being intended for RPC invocation
override fun <T : Any> startFlowDynamic(logicType: Class<out FlowLogic<T>>, vararg args: Any?): FlowHandle<T> {
requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(CURRENT_RPC_USER.get().username)
val rpcContext = getRpcContext()
rpcContext.requirePermission(startFlowPermission(logicType))
val currentUser = FlowInitiator.RPC(rpcContext.currentUser.username)
val stateMachine = services.invokeFlowAsync(logicType, currentUser, *args)
return FlowHandleImpl(id = stateMachine.id, returnValue = stateMachine.resultFuture)
}

View File

@ -256,7 +256,7 @@ class Node(override val configuration: FullNodeConfiguration,
/** Starts a blocking event loop for message dispatch. */
fun run() {
(net as NodeMessagingClient).run()
(net as NodeMessagingClient).run(messageBroker!!.serverControl)
}
// TODO: Do we really need setup?

View File

@ -21,10 +21,10 @@ import net.corda.node.services.messaging.NodeLoginModule.Companion.PEER_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.RPC_ROLE
import net.corda.node.services.messaging.NodeLoginModule.Companion.VERIFIER_ROLE
import net.corda.nodeapi.*
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.CLIENTS_PREFIX
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.PEER_USER
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.core.config.BridgeConfiguration
import org.apache.activemq.artemis.core.config.Configuration
import org.apache.activemq.artemis.core.config.CoreQueueConfiguration
@ -37,6 +37,8 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactor
import org.apache.activemq.artemis.core.security.Role
import org.apache.activemq.artemis.core.server.ActiveMQServer
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy
import org.apache.activemq.artemis.core.settings.impl.AddressSettings
import org.apache.activemq.artemis.spi.core.remoting.*
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager
import org.apache.activemq.artemis.spi.core.security.jaas.CertificateCallback
@ -97,6 +99,7 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
private val mutex = ThreadBox(InnerState())
private lateinit var activeMQServer: ActiveMQServer
val serverControl: ActiveMQServerControl get() = activeMQServer.activeMQServerControl
private val _networkMapConnectionFuture = config.networkMapService?.let { SettableFuture.create<Unit>() }
/**
* A [ListenableFuture] which completes when the server successfully connects to the network map node. If a
@ -185,10 +188,19 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
// Create an RPC queue: this will service locally connected clients only (not via a bridge) and those
// clients must have authenticated. We could use a single consumer for everything and perhaps we should,
// but these queues are not worth persisting.
queueConfig(RPC_REQUESTS_QUEUE, durable = false),
// The custom name for the queue is intentional - we may wish other things to subscribe to the
// NOTIFICATIONS_ADDRESS with different filters in future
queueConfig(RPC_QUEUE_REMOVALS_QUEUE, address = NOTIFICATIONS_ADDRESS, filter = "_AMQ_NotifType = 1", durable = false)
queueConfig(RPCApi.RPC_SERVER_QUEUE_NAME, durable = false),
queueConfig(
name = RPCApi.RPC_CLIENT_BINDING_REMOVALS,
address = NOTIFICATIONS_ADDRESS,
filter = RPCApi.RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION,
durable = false
)
)
addressesSettings = mapOf(
"${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.#" to AddressSettings().apply {
maxSizeBytes = 10L * MAX_FILE_SIZE
addressFullMessagePolicy = AddressFullMessagePolicy.FAIL
}
)
configureAddressSecurity()
}
@ -213,16 +225,16 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
val nodeInternalRole = Role(NODE_ROLE, true, true, true, true, true, true, true, true)
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles[RPC_REQUESTS_QUEUE] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
securityRoles[RPCApi.RPC_SERVER_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
// TODO remove the NODE_USER role once the webserver doesn't need it
securityRoles["$CLIENTS_PREFIX$NODE_USER.rpc.*"] = setOf(nodeInternalRole)
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$NODE_USER.#"] = setOf(nodeInternalRole)
for ((username) in userService.users) {
securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(
securityRoles["${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username.#"] = setOf(
nodeInternalRole,
restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))
restrictedRole("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))
}
securityRoles[VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, consume = true))
securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.*"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true))
securityRoles["${VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX}.#"] = setOf(nodeInternalRole, restrictedRole(VERIFIER_ROLE, send = true))
}
private fun restrictedRole(name: String, send: Boolean = false, consume: Boolean = false, createDurableQueue: Boolean = false,
@ -629,7 +641,7 @@ class NodeLoginModule : LoginModule {
throw FailedLoginException("Password for user $username does not match")
}
principals += RolePrincipal(RPC_ROLE) // This enables the RPC client to send requests
principals += RolePrincipal("$CLIENTS_PREFIX$username") // This enables the RPC client to receive responses
principals += RolePrincipal("${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.$username") // This enables the RPC client to receive responses
return username
}

View File

@ -11,7 +11,6 @@ import net.corda.core.node.VersionInfo
import net.corda.core.node.services.PartyInfo
import net.corda.core.node.services.TransactionVerifierService
import net.corda.core.random63BitValue
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.opaque
import net.corda.core.success
import net.corda.core.transactions.LedgerTransaction
@ -25,10 +24,7 @@ import net.corda.node.services.statemachine.StateMachineManager
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
import net.corda.node.services.transactions.OutOfProcessTransactionVerifierService
import net.corda.node.utilities.*
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.ArtemisTcpTransport
import net.corda.nodeapi.ConnectionDirection
import net.corda.nodeapi.VerifierApi
import net.corda.nodeapi.*
import net.corda.nodeapi.VerifierApi.VERIFICATION_REQUESTS_QUEUE_NAME
import net.corda.nodeapi.VerifierApi.VERIFICATION_RESPONSES_QUEUE_NAME_PREFIX
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException
@ -36,6 +32,7 @@ import org.apache.activemq.artemis.api.core.Message.*
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.*
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.bouncycastle.asn1.x500.X500Name
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.ResultRow
@ -71,7 +68,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val versionInfo: VersionInfo,
val serverHostPort: HostAndPort,
val myIdentity: PublicKey?,
val nodeExecutor: AffinityExecutor,
val nodeExecutor: AffinityExecutor.ServiceAffinityExecutor,
val database: Database,
val networkMapRegistrationFuture: ListenableFuture<Unit>,
val monitoringService: MonitoringService
@ -100,11 +97,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
var producer: ClientProducer? = null
var p2pConsumer: ClientConsumer? = null
var session: ClientSession? = null
var clientFactory: ClientSessionFactory? = null
var rpcDispatcher: RPCDispatcher? = null
var sessionFactory: ClientSessionFactory? = null
var rpcServer: RPCServer? = null
// Consumer for inbound client RPC messages.
var rpcConsumer: ClientConsumer? = null
var rpcNotificationConsumer: ClientConsumer? = null
var verificationResponseConsumer: ClientConsumer? = null
}
@ -163,18 +158,19 @@ class NodeMessagingClient(override val config: NodeConfiguration,
val tcpTransport = ArtemisTcpTransport.tcpTransport(ConnectionDirection.Outbound(), serverHostPort, config)
val locator = ActiveMQClient.createServerLocatorWithoutHA(tcpTransport)
locator.minLargeMessageSize = ArtemisMessagingServer.MAX_FILE_SIZE
clientFactory = locator.createSessionFactory()
sessionFactory = locator.createSessionFactory()
// Login using the node username. The broker will authentiate us as its node (as opposed to another peer)
// using our TLS certificate.
// Note that the acknowledgement of messages is not flushed to the Artermis journal until the default buffer
// size of 1MB is acknowledged.
val session = clientFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
val session = sessionFactory!!.createSession(NODE_USER, NODE_USER, false, true, true, locator.isPreAcknowledge, DEFAULT_ACK_BATCH_SIZE)
this.session = session
session.start()
// Create a general purpose producer.
producer = session.createProducer()
val producer = session.createProducer()
this.producer = producer
// Create a queue, consumer and producer for handling P2P network messages.
p2pConsumer = makeP2PConsumer(session, true)
@ -190,9 +186,7 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
rpcConsumer = session.createConsumer(RPC_REQUESTS_QUEUE)
rpcNotificationConsumer = session.createConsumer(RPC_QUEUE_REMOVALS_QUEUE)
rpcDispatcher = createRPCDispatcher(rpcOps, userService, config.myLegalName)
rpcServer = RPCServer(rpcOps, NODE_USER, NODE_USER, locator, userService, config.myLegalName)
fun checkVerifierCount() {
if (session.queueQuery(SimpleString(VERIFICATION_REQUESTS_QUEUE_NAME)).consumerCount == 0) {
@ -269,12 +263,12 @@ class NodeMessagingClient(override val config: NodeConfiguration,
return true
}
private fun runPreNetworkMap() {
private fun runPreNetworkMap(serverControl: ActiveMQServerControl) {
val consumer = state.locked {
check(started) { "start must be called first" }
check(!running) { "run can't be called twice" }
running = true
rpcDispatcher!!.start(rpcConsumer!!, rpcNotificationConsumer!!, nodeExecutor)
rpcServer!!.start(serverControl)
(verifierService as? OutOfProcessTransactionVerifierService)?.start(verificationResponseConsumer!!)
p2pConsumer!!
}
@ -300,9 +294,9 @@ class NodeMessagingClient(override val config: NodeConfiguration,
* we get our network map fetch response. At that point the filtering consumer is closed and we proceed to the second loop and
* consume all messages via a new consumer without a filter applied.
*/
fun run() {
fun run(serverControl: ActiveMQServerControl) {
// Build the network map.
runPreNetworkMap()
runPreNetworkMap(serverControl)
// Process everything else once we have the network map.
runPostNetworkMap()
shutdownLatch.countDown()
@ -404,17 +398,13 @@ class NodeMessagingClient(override val config: NodeConfiguration,
// Only first caller to gets running true to protect against double stop, which seems to happen in some integration tests.
if (running) {
state.locked {
rpcConsumer?.close()
rpcConsumer = null
rpcNotificationConsumer?.close()
rpcNotificationConsumer = null
producer?.close()
producer = null
// Ensure any trailing messages are committed to the journal
session!!.commit()
// Closing the factory closes all the sessions it produced as well.
clientFactory!!.close()
clientFactory = null
sessionFactory!!.close()
sessionFactory = null
}
}
}
@ -547,22 +537,6 @@ class NodeMessagingClient(override val config: NodeConfiguration,
}
}
private fun createRPCDispatcher(ops: RPCOps, userService: RPCUserService, nodeLegalName: X500Name): RPCDispatcher =
object : RPCDispatcher(ops, userService, nodeLegalName) {
override fun send(data: SerializedBytes<*>, toAddress: String) {
messagingExecutor.fetchFrom {
state.locked {
val msg = session!!.createMessage(false).apply {
writeBodyBufferBytes(data.bytes)
// Use the magic deduplication property built into Artemis as our message identity too
putStringProperty(HDR_DUPLICATE_DETECTION_ID, SimpleString(UUID.randomUUID().toString()))
}
producer!!.send(toAddress, msg)
}
}
}
}
private fun createOutOfProcessVerifierService(): TransactionVerifierService {
return object : OutOfProcessTransactionVerifierService(monitoringService) {
override fun sendRequest(nonce: Long, transaction: LedgerTransaction) {

View File

@ -1,219 +0,0 @@
package net.corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.HashMultimap
import net.corda.core.ErrorOr
import net.corda.core.crypto.commonName
import net.corda.core.messaging.RPCOps
import net.corda.core.messaging.RPCReturnsObservables
import net.corda.core.serialization.SerializedBytes
import net.corda.core.serialization.deserialize
import net.corda.core.serialization.serialize
import net.corda.core.utilities.debug
import net.corda.node.services.RPCUserService
import net.corda.node.utilities.AffinityExecutor
import net.corda.nodeapi.*
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.bouncycastle.asn1.x500.X500Name
import rx.Notification
import rx.Observable
import rx.Subscription
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.AtomicInteger
/**
* Intended to service transient clients only (not p2p nodes) for short-lived, transient request/response pairs.
* If you need robustness, this is the wrong system. If you don't want a response, this is probably the
* wrong system (you could just send a message). If you want complex customisation of how requests/responses
* are handled, this is probably the wrong system.
*/
// TODO remove the nodeLegalName parameter once the webserver doesn't need special privileges
abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, val nodeLegalName: X500Name) {
// Throw an exception if there are overloaded methods
private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() }
private val queueToSubscription = HashMultimap.create<String, Subscription>()
private val handleCounter = AtomicInteger()
// Created afresh for every RPC that is annotated as returning observables. Every time an observable is
// encountered either in the RPC response or in an object graph that is being emitted by one of those
// observables, the handle counter is incremented and the server-side observable is subscribed to. The
// materialized observations are then sent to the queue the client created where they can be picked up.
//
// When the observables are deserialised on the client side, the handle is read from the byte stream and
// the queue is filtered to extract just those observations.
class ObservableSerializer : Serializer<Observable<Any>>() {
private fun toQName(kryo: Kryo): String = kryo.context[RPCKryoQNameKey] as String
private fun toDispatcher(kryo: Kryo): RPCDispatcher = kryo.context[RPCKryoDispatcherKey] as RPCDispatcher
override fun read(kryo: Kryo, input: Input, type: Class<Observable<Any>>): Observable<Any> {
throw UnsupportedOperationException("not implemented")
}
override fun write(kryo: Kryo, output: Output, obj: Observable<Any>) {
val qName = toQName(kryo)
val dispatcher = toDispatcher(kryo)
val handle = dispatcher.handleCounter.andIncrement
output.writeInt(handle, true)
// Observables can do three kinds of callback: "next" with a content object, "completed" and "error".
// Materializing the observable converts these three kinds of callback into a single stream of objects
// representing what happened, which is useful for us to send over the wire.
val subscription = obj.materialize().subscribe { materialised: Notification<out Any> ->
val newKryo = createRPCKryoForSerialization(qName, dispatcher)
val bits = try {
MarshalledObservation(handle, materialised).serialize(newKryo)
} finally {
releaseRPCKryoForSerialization(newKryo)
}
rpcLog.debug("RPC sending observation: $materialised")
dispatcher.send(bits, qName)
}
synchronized(dispatcher.queueToSubscription) {
dispatcher.queueToSubscription.put(qName, subscription)
}
}
}
fun dispatch(msg: ClientRPCRequestMessage) {
val (argsBytes, replyTo, observationsTo, methodName) = msg
val response: ErrorOr<Any> = ErrorOr.catch {
val method = methodTable[methodName] ?: throw RPCException("Received RPC for unknown method $methodName - possible client/server version skew?")
if (method.isAnnotationPresent(RPCReturnsObservables::class.java) && observationsTo == null)
throw RPCException("Received RPC without any destination for observations, but the RPC returns observables")
val kryo = createRPCKryoForSerialization(observationsTo, this)
val args = try {
argsBytes.deserialize(kryo)
} finally {
releaseRPCKryoForSerialization(kryo)
}
rpcLog.debug { "-> RPC -> $methodName(${args.joinToString()}) [reply to $replyTo]" }
try {
method.invoke(ops, *args)
} catch (e: InvocationTargetException) {
throw e.cause!!
}
}
rpcLog.debug { "<- RPC <- $methodName = $response " }
// Serialise, or send back a simple serialised ErrorOr structure if we couldn't do it.
val kryo = createRPCKryoForSerialization(observationsTo, this)
val responseBits = try {
response.serialize(kryo)
} catch (e: KryoException) {
rpcLog.error("Failed to respond to inbound RPC $methodName", e)
ErrorOr.of(e).serialize(kryo)
} finally {
releaseRPCKryoForSerialization(kryo)
}
send(responseBits, replyTo)
}
abstract fun send(data: SerializedBytes<*>, toAddress: String)
fun start(rpcConsumer: ClientConsumer, rpcNotificationConsumer: ClientConsumer?, onExecutor: AffinityExecutor) {
rpcNotificationConsumer?.setMessageHandler { msg ->
val qName = msg.getStringProperty("_AMQ_RoutingName")
val subscriptions = synchronized(queueToSubscription) {
queueToSubscription.removeAll(qName)
}
if (subscriptions.isNotEmpty()) {
rpcLog.debug("Observable queue was deleted, unsubscribing: $qName")
subscriptions.forEach { it.unsubscribe() }
}
}
rpcConsumer.setMessageHandler { msg ->
msg.acknowledge()
// All RPCs run on the main server thread, in order to avoid running concurrently with
// potentially state changing requests from other nodes and each other. If we need to
// give better latency to client RPCs in future we could use an executor that supports
// job priorities.
onExecutor.execute {
try {
val rpcMessage = msg.toRPCRequestMessage()
CURRENT_RPC_USER.set(rpcMessage.user)
dispatch(rpcMessage)
} catch(e: RPCException) {
rpcLog.warn("Received malformed client RPC message: ${e.message}")
rpcLog.trace("RPC exception", e)
} catch(e: Throwable) {
rpcLog.error("Uncaught exception when dispatching client RPC", e)
} finally {
CURRENT_RPC_USER.remove()
}
}
}
}
private fun ClientMessage.requiredString(name: String): String {
return getStringProperty(name) ?: throw RPCException("missing $name property")
}
/** Convert an Artemis [ClientMessage] to a MQ-neutral [ClientRPCRequestMessage]. */
private fun ClientMessage.toRPCRequestMessage(): ClientRPCRequestMessage {
val user = getUser(this)
val replyTo = getReturnAddress(user, ClientRPCRequestMessage.REPLY_TO, true)!!
val observationsTo = getReturnAddress(user, ClientRPCRequestMessage.OBSERVATIONS_TO, false)
val argBytes = ByteArray(bodySize).apply { bodyBuffer.readBytes(this) }
if (argBytes.isEmpty()) {
throw RPCException("empty serialized args")
}
val methodName = requiredString(ClientRPCRequestMessage.METHOD_NAME)
return ClientRPCRequestMessage(SerializedBytes(argBytes), replyTo, observationsTo, methodName, user)
}
// TODO remove this User once webserver doesn't need it
private val nodeUser = User(NODE_USER, NODE_USER, setOf())
@VisibleForTesting
protected open fun getUser(message: ClientMessage): User {
val validatedUser = message.requiredString(Message.HDR_VALIDATED_USER.toString())
val rpcUser = userService.getUser(validatedUser)
if (rpcUser != null) {
return rpcUser
} else {
try {
if (X500Name(validatedUser) == nodeLegalName) {
return nodeUser
}
} catch (ex: IllegalArgumentException) {
// Just means the two can't be compared, treat as no match
}
throw IllegalArgumentException("Validated user '$validatedUser' is not an RPC user nor the NODE user")
}
}
private fun ClientMessage.getReturnAddress(user: User, property: String, required: Boolean): String? {
return if (containsProperty(property)) {
"${ArtemisMessagingComponent.CLIENTS_PREFIX}${user.username}.rpc.${getLongProperty(property)}"
} else {
if (required) throw RPCException("missing $property property") else null
}
}
}
private val rpcSerKryoPool = KryoPool.Builder { RPCKryo(RPCDispatcher.ObservableSerializer()) }.build()
fun createRPCKryoForSerialization(qName: String? = null, dispatcher: RPCDispatcher? = null): Kryo {
val kryo = rpcSerKryoPool.borrow()
kryo.context.put(RPCKryoQNameKey, qName)
kryo.context.put(RPCKryoDispatcherKey, dispatcher)
return kryo
}
fun releaseRPCKryoForSerialization(kryo: Kryo) {
rpcSerKryoPool.release(kryo)
}

View File

@ -0,0 +1,346 @@
package net.corda.node.services.messaging
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import com.esotericsoftware.kryo.pool.KryoPool
import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.cache.RemovalListener
import com.google.common.collect.HashMultimap
import com.google.common.collect.Multimaps
import com.google.common.collect.SetMultimap
import com.google.common.util.concurrent.ThreadFactoryBuilder
import net.corda.core.ErrorOr
import net.corda.core.crypto.commonName
import net.corda.core.messaging.RPCOps
import net.corda.core.random63BitValue
import net.corda.core.serialization.KryoPoolWithContext
import net.corda.core.utilities.LazyStickyPool
import net.corda.core.utilities.debug
import net.corda.core.utilities.loggerFor
import net.corda.node.services.RPCUserService
import net.corda.nodeapi.*
import net.corda.nodeapi.ArtemisMessagingComponent.Companion.NODE_USER
import org.apache.activemq.artemis.api.core.Message
import org.apache.activemq.artemis.api.core.SimpleString
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
import org.apache.activemq.artemis.api.core.client.ClientConsumer
import org.apache.activemq.artemis.api.core.client.ClientMessage
import org.apache.activemq.artemis.api.core.client.ServerLocator
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
import org.apache.activemq.artemis.api.core.management.ManagementHelper
import org.bouncycastle.asn1.x500.X500Name
import rx.Notification
import rx.Observable
import rx.Subscriber
import rx.Subscription
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledFuture
import java.util.concurrent.TimeUnit
data class RPCServerConfiguration(
/** The number of threads to use for handling RPC requests */
val rpcThreadPoolSize: Int,
/** The number of consumers to handle incoming messages */
val consumerPoolSize: Int,
/** The maximum number of producers to create to handle outgoing messages */
val producerPoolBound: Int,
/** The interval of subscription reaping in milliseconds */
val reapIntervalMs: Long
) {
companion object {
val default = RPCServerConfiguration(
rpcThreadPoolSize = 4,
consumerPoolSize = 2,
producerPoolBound = 4,
reapIntervalMs = 1000
)
}
}
/**
* The [RPCServer] implements the complement of [RPCClient]. When an RPC request arrives it dispatches to the
* corresponding function in [ops]. During serialisation of the reply (and later observations) the server subscribes to
* each Observable it encounters and captures the client address to associate with these Observables. Later it uses this
* address to forward observations arriving on the Observables.
*
* The way this is done is similar to that in [RPCClient], we use Kryo and add a context to stores the subscription map.
*/
class RPCServer(
private val ops: RPCOps,
private val rpcServerUsername: String,
private val rpcServerPassword: String,
private val serverLocator: ServerLocator,
private val userService: RPCUserService,
private val nodeLegalName: X500Name,
private val rpcConfiguration: RPCServerConfiguration = RPCServerConfiguration.default
) {
private companion object {
val log = loggerFor<RPCServer>()
val kryoPool = KryoPool.Builder { RPCKryo(RpcServerObservableSerializer) }.build()
}
// The methodname->Method map to use for dispatching.
private val methodTable = ops.javaClass.declaredMethods.groupBy { it.name }.mapValues { it.value.single() }
// The observable subscription mapping.
private val observableMap = createObservableSubscriptionMap()
// A mapping from client addresses to IDs of associated Observables
private val clientAddressToObservables = Multimaps.synchronizedSetMultimap(HashMultimap.create<SimpleString, RPCApi.ObservableId>())
// The scheduled reaper handle.
private lateinit var reaperScheduledFuture: ScheduledFuture<*>
private val observationSendExecutor = Executors.newFixedThreadPool(
1,
ThreadFactoryBuilder().setNameFormat("rpc-observation-sender-%d").build()
)
private val rpcExecutor = Executors.newScheduledThreadPool(
rpcConfiguration.rpcThreadPoolSize,
ThreadFactoryBuilder().setNameFormat("rpc-server-handler-pool-%d").build()
)
private val reaperExecutor = Executors.newScheduledThreadPool(
1,
ThreadFactoryBuilder().setNameFormat("rpc-server-reaper-%d").build()
)
private val sessionAndConsumers = ArrayList<ArtemisConsumer>(rpcConfiguration.consumerPoolSize)
private val sessionAndProducerPool = LazyStickyPool(rpcConfiguration.producerPoolBound) {
val sessionFactory = serverLocator.createSessionFactory()
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
session.start()
ArtemisProducer(sessionFactory, session, session.createProducer())
}
private lateinit var clientBindingRemovalConsumer: ClientConsumer
private lateinit var serverControl: ActiveMQServerControl
private fun createObservableSubscriptionMap(): ObservableSubscriptionMap {
val onObservableRemove = RemovalListener<RPCApi.ObservableId, ObservableSubscription> {
log.debug { "Unsubscribing from Observable with id ${it.key} because of ${it.cause}" }
it.value.subscription.unsubscribe()
}
return CacheBuilder.newBuilder().removalListener(onObservableRemove).build()
}
fun start(activeMqServerControl: ActiveMQServerControl) {
log.info("Starting RPC server with configuration $rpcConfiguration")
reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
this::reapSubscriptions,
rpcConfiguration.reapIntervalMs,
rpcConfiguration.reapIntervalMs,
TimeUnit.MILLISECONDS
)
for (i in 1 .. rpcConfiguration.consumerPoolSize) {
val sessionFactory = serverLocator.createSessionFactory()
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler)
session.start()
sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer))
}
clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
clientBindingRemovalConsumer.setMessageHandler(this::bindingRemovalArtemisMessageHandler)
serverControl = activeMqServerControl
}
fun close() {
reaperScheduledFuture.cancel(false)
rpcExecutor.shutdownNow()
reaperExecutor.shutdownNow()
rpcExecutor.awaitTermination(500, TimeUnit.MILLISECONDS)
reaperExecutor.awaitTermination(500, TimeUnit.MILLISECONDS)
sessionAndConsumers.forEach {
it.consumer.close()
it.session.close()
it.sessionFactory.close()
}
observableMap.invalidateAll()
reapSubscriptions()
sessionAndProducerPool.close().forEach {
it.producer.close()
it.session.close()
it.sessionFactory.close()
}
}
private fun bindingRemovalArtemisMessageHandler(artemisMessage: ClientMessage) {
val notificationType = artemisMessage.getStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE)
require(notificationType == CoreNotificationType.BINDING_REMOVED.name)
val clientAddress = artemisMessage.getStringProperty(ManagementHelper.HDR_ROUTING_NAME)
log.warn("Detected RPC client disconnect on address $clientAddress, scheduling for reaping")
invalidateClient(SimpleString(clientAddress))
}
// Note that this function operates on the *current* view of client observables. During invalidation further
// Observables may be serialised and thus registered.
private fun invalidateClient(clientAddress: SimpleString) {
val observableIds = clientAddressToObservables.removeAll(clientAddress)
observableMap.invalidateAll(observableIds)
}
private fun clientArtemisMessageHandler(artemisMessage: ClientMessage) {
val clientToServer = RPCApi.ClientToServer.fromClientMessage(kryoPool, artemisMessage)
log.debug { "-> RPC -> $clientToServer" }
when (clientToServer) {
is RPCApi.ClientToServer.RpcRequest -> {
val rpcContext = RpcContext(
currentUser = getUser(artemisMessage)
)
rpcExecutor.submit {
val result = ErrorOr.catch {
try {
CURRENT_RPC_CONTEXT.set(rpcContext)
log.debug { "Calling ${clientToServer.methodName}" }
val method = methodTable[clientToServer.methodName] ?:
throw RPCException("Received RPC for unknown method ${clientToServer.methodName} - possible client/server version skew?")
method.invoke(ops, *clientToServer.arguments.toTypedArray())
} finally {
CURRENT_RPC_CONTEXT.remove()
}
}
val resultWithExceptionUnwrapped = result.mapError {
if (it is InvocationTargetException) {
it.cause ?: RPCException("Caught InvocationTargetException without cause")
} else {
it
}
}
val reply = RPCApi.ServerToClient.RpcReply(
id = clientToServer.id,
result = resultWithExceptionUnwrapped
)
val observableContext = ObservableContext(
clientToServer.id,
observableMap,
clientAddressToObservables,
clientToServer.clientAddress,
serverControl,
sessionAndProducerPool,
observationSendExecutor,
kryoPool
)
observableContext.sendMessage(reply)
}
}
is RPCApi.ClientToServer.ObservablesClosed -> {
observableMap.invalidateAll(clientToServer.ids)
}
}
artemisMessage.acknowledge()
}
private fun reapSubscriptions() {
observableMap.cleanUp()
}
// TODO remove this User once webserver doesn't need it
private val nodeUser = User(NODE_USER, NODE_USER, setOf())
private fun getUser(message: ClientMessage): User {
val validatedUser = message.getStringProperty(Message.HDR_VALIDATED_USER) ?: throw IllegalArgumentException("Missing validated user from the Artemis message")
val rpcUser = userService.getUser(validatedUser)
if (rpcUser != null) {
return rpcUser
} else if (X500Name(validatedUser) == nodeLegalName) {
return nodeUser
} else {
throw IllegalArgumentException("Validated user '$validatedUser' is not an RPC user nor the NODE user")
}
}
}
@JvmField
internal val CURRENT_RPC_CONTEXT: ThreadLocal<RpcContext> = ThreadLocal()
fun getRpcContext(): RpcContext = CURRENT_RPC_CONTEXT.get()
/**
* @param currentUser This is available to RPC implementations to query the validated [User] that is calling it. Each
* user has a set of permissions they're entitled to which can be used to control access.
*/
data class RpcContext(
val currentUser: User
)
class ObservableSubscription(
val subscription: Subscription
)
typealias ObservableSubscriptionMap = Cache<RPCApi.ObservableId, ObservableSubscription>
// We construct an observable context on each RPC request. If subsequently a nested Observable is
// encountered this same context is propagated by the instrumented KryoPool. This way all
// observations rooted in a single RPC will be muxed correctly. Note that the context construction
// itself is quite cheap.
class ObservableContext(
val rpcRequestId: RPCApi.RpcRequestId,
val observableMap: ObservableSubscriptionMap,
val clientAddressToObservables: SetMultimap<SimpleString, RPCApi.ObservableId>,
val clientAddress: SimpleString,
val serverControl: ActiveMQServerControl,
val sessionAndProducerPool: LazyStickyPool<ArtemisProducer>,
val observationSendExecutor: ExecutorService,
kryoPool: KryoPool
) {
private companion object {
val log = loggerFor<ObservableContext>()
}
private val kryoPoolWithObservableContext = RpcServerObservableSerializer.createPoolWithContext(kryoPool, this)
fun sendMessage(serverToClient: RPCApi.ServerToClient) {
try {
sessionAndProducerPool.run(rpcRequestId) {
val artemisMessage = it.session.createMessage(false)
serverToClient.writeToClientMessage(kryoPoolWithObservableContext, artemisMessage)
it.producer.send(clientAddress, artemisMessage)
log.debug("<- RPC <- $serverToClient")
}
} catch (throwable: Throwable) {
log.error("Failed to send message, kicking client. Message was $serverToClient", throwable)
serverControl.closeConsumerConnectionsForAddress(clientAddress.toString())
}
}
}
private object RpcServerObservableSerializer : Serializer<Observable<Any>>() {
private object RpcObservableContextKey
private val log = loggerFor<RpcServerObservableSerializer>()
fun createPoolWithContext(kryoPool: KryoPool, observableContext: ObservableContext): KryoPool {
return KryoPoolWithContext(kryoPool, RpcObservableContextKey, observableContext)
}
override fun read(kryo: Kryo?, input: Input?, type: Class<Observable<Any>>?): Observable<Any> {
throw UnsupportedOperationException()
}
override fun write(kryo: Kryo, output: Output, observable: Observable<Any>) {
val observableId = RPCApi.ObservableId(random63BitValue())
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
output.writeLong(observableId.toLong, true)
val observableWithSubscription = ObservableSubscription(
// We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing
// must be done again within the subscriber
subscription = observable.materialize().subscribe(
object : Subscriber<Notification<Any>>() {
override fun onNext(observation: Notification<Any>) {
if (!isUnsubscribed) {
observableContext.observationSendExecutor.submit {
observableContext.sendMessage(RPCApi.ServerToClient.Observation(observableId, observation))
}
}
}
override fun onError(exception: Throwable) {
log.error("onError called in materialize()d RPC Observable", exception)
}
override fun onCompleted() {
}
}
)
)
observableContext.clientAddressToObservables.put(observableContext.clientAddress, observableId)
observableContext.observableMap.put(observableId, observableWithSubscription)
}
}

View File

@ -3,13 +3,11 @@
package net.corda.node.services.messaging
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.CURRENT_RPC_USER
import net.corda.nodeapi.PermissionException
/** Helper method which checks that the current RPC user is entitled for the given permission. Throws a [PermissionException] otherwise. */
fun requirePermission(permission: String) {
fun RpcContext.requirePermission(permission: String) {
// TODO remove the NODE_USER condition once webserver doesn't need it
val currentUser = CURRENT_RPC_USER.get()
val currentUserPermissions = currentUser.permissions
if (currentUser.username != ArtemisMessagingComponent.NODE_USER && currentUserPermissions.intersect(listOf(permission, "ALL")).isEmpty()) {
throw PermissionException("User not permissioned for $permission, permissions are $currentUserPermissions")

View File

@ -19,10 +19,11 @@ import net.corda.jackson.JacksonSupport
import net.corda.jackson.StringToMethodCallParser
import net.corda.node.internal.Node
import net.corda.node.printBasicNodeInfo
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcContext
import net.corda.node.services.statemachine.FlowStateMachineImpl
import net.corda.node.utilities.ANSIProgressRenderer
import net.corda.nodeapi.ArtemisMessagingComponent
import net.corda.nodeapi.CURRENT_RPC_USER
import net.corda.nodeapi.User
import org.crsh.command.InvocationContext
import org.crsh.console.jline.JLineProcessor
@ -120,7 +121,7 @@ object InteractiveShell {
InterruptHandler { jlineProcessor.interrupt() }.install()
thread(name = "Command line shell processor", isDaemon = true) {
// Give whoever has local shell access administrator access to the node.
CURRENT_RPC_USER.set(User(ArtemisMessagingComponent.NODE_USER, "", setOf()))
CURRENT_RPC_CONTEXT.set(RpcContext(User(ArtemisMessagingComponent.NODE_USER, "", setOf())))
Emoji.renderIfSupported {
jlineProcessor.run()
}

View File

@ -15,11 +15,12 @@ import net.corda.core.transactions.SignedTransaction
import net.corda.flows.CashIssueFlow
import net.corda.flows.CashPaymentFlow
import net.corda.node.internal.CordaRPCOpsImpl
import net.corda.node.services.messaging.CURRENT_RPC_CONTEXT
import net.corda.node.services.messaging.RpcContext
import net.corda.node.services.network.NetworkMapService
import net.corda.node.services.startFlowPermission
import net.corda.node.services.transactions.SimpleNotaryService
import net.corda.node.utilities.transaction
import net.corda.nodeapi.CURRENT_RPC_USER
import net.corda.nodeapi.PermissionException
import net.corda.nodeapi.User
import net.corda.testing.expect
@ -57,10 +58,10 @@ class CordaRPCOpsImplTest {
aliceNode = network.createNode(networkMapAddress = networkMap.info.address)
notaryNode = network.createNode(advertisedServices = ServiceInfo(SimpleNotaryService.type), networkMapAddress = networkMap.info.address)
rpc = CordaRPCOpsImpl(aliceNode.services, aliceNode.smm, aliceNode.database)
CURRENT_RPC_USER.set(User("user", "pwd", permissions = setOf(
CURRENT_RPC_CONTEXT.set(RpcContext(User("user", "pwd", permissions = setOf(
startFlowPermission<CashIssueFlow>(),
startFlowPermission<CashPaymentFlow>()
)))
))))
aliceNode.database.transaction {
stateMachineUpdates = rpc.stateMachinesAndUpdates().second
@ -194,7 +195,7 @@ class CordaRPCOpsImplTest {
@Test
fun `cash command by user not permissioned for cash`() {
CURRENT_RPC_USER.set(User("user", "pwd", permissions = emptySet()))
CURRENT_RPC_CONTEXT.set(RpcContext(User("user", "pwd", permissions = emptySet())))
assertThatExceptionOfType(PermissionException::class.java).isThrownBy {
rpc.startFlow(::CashIssueFlow,
Amount(100, USD),

View File

@ -214,7 +214,7 @@ class ArtemisMessagingTests {
receivedMessages.add(message)
}
// Run after the handlers are added, otherwise (some of) the messages get delivered and discarded / dead-lettered.
thread { messagingClient.run() }
thread { messagingClient.run(messagingServer!!.serverControl) }
return messagingClient
}