mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
Merge pull request #2477 from corda/aslemmer-corda/issues/2300
Add RPC deduplication to client and server
This commit is contained in:
@ -18,9 +18,7 @@ import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.context.Trace.InvocationId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.LazyStickyPool
|
||||
import net.corda.core.internal.LifeCycle
|
||||
import net.corda.core.internal.join
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
||||
@ -29,14 +27,17 @@ import net.corda.core.utilities.*
|
||||
import net.corda.node.internal.security.AuthorizingSubject
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.logging.pushToLoggingContext
|
||||
import net.corda.nodeapi.*
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.externalTrace
|
||||
import net.corda.nodeapi.impersonatedActor
|
||||
import net.corda.nodeapi.internal.DeduplicationChecker
|
||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||
import net.corda.nodeapi.internal.persistence.contextDatabase
|
||||
import net.corda.nodeapi.internal.persistence.contextDatabaseOrNull
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper
|
||||
@ -49,24 +50,23 @@ import rx.Subscription
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.*
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
data class RPCServerConfiguration(
|
||||
/** The number of threads to use for handling RPC requests */
|
||||
val rpcThreadPoolSize: Int,
|
||||
/** The number of consumers to handle incoming messages */
|
||||
val consumerPoolSize: Int,
|
||||
/** The maximum number of producers to create to handle outgoing messages */
|
||||
val producerPoolBound: Int,
|
||||
/** The interval of subscription reaping */
|
||||
val reapInterval: Duration
|
||||
val reapInterval: Duration,
|
||||
/** The cache expiry of a deduplication watermark per client. */
|
||||
val deduplicationCacheExpiry: Duration
|
||||
) {
|
||||
companion object {
|
||||
val default = RPCServerConfiguration(
|
||||
rpcThreadPoolSize = 4,
|
||||
consumerPoolSize = 2,
|
||||
producerPoolBound = 4,
|
||||
reapInterval = 1.seconds
|
||||
reapInterval = 1.seconds,
|
||||
deduplicationCacheExpiry = 1.days
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -115,22 +115,24 @@ class RPCServer(
|
||||
/** The scheduled reaper handle. */
|
||||
private var reaperScheduledFuture: ScheduledFuture<*>? = null
|
||||
|
||||
private var observationSendExecutor: ExecutorService? = null
|
||||
private var senderThread: Thread? = null
|
||||
private var rpcExecutor: ScheduledExecutorService? = null
|
||||
private var reaperExecutor: ScheduledExecutorService? = null
|
||||
|
||||
private val sessionAndConsumers = ArrayList<ArtemisConsumer>(rpcConfiguration.consumerPoolSize)
|
||||
private val sessionAndProducerPool = LazyStickyPool(rpcConfiguration.producerPoolBound) {
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
session.start()
|
||||
ArtemisProducer(sessionFactory, session, session.createProducer())
|
||||
}
|
||||
private var sessionFactory: ClientSessionFactory? = null
|
||||
private var producerSession: ClientSession? = null
|
||||
private var consumerSession: ClientSession? = null
|
||||
private var rpcProducer: ClientProducer? = null
|
||||
private var rpcConsumer: ClientConsumer? = null
|
||||
private var clientBindingRemovalConsumer: ClientConsumer? = null
|
||||
private var clientBindingAdditionConsumer: ClientConsumer? = null
|
||||
private var serverControl: ActiveMQServerControl? = null
|
||||
|
||||
private val responseMessageBuffer = ConcurrentHashMap<SimpleString, BufferOrNone>()
|
||||
private val sendJobQueue = LinkedBlockingQueue<RpcSendJob>()
|
||||
|
||||
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
|
||||
private var deduplicationIdentity: String? = null
|
||||
|
||||
init {
|
||||
val groupedMethods = ops.javaClass.declaredMethods.groupBy { it.name }
|
||||
@ -154,16 +156,12 @@ class RPCServer(
|
||||
try {
|
||||
lifeCycle.requireState(State.UNSTARTED)
|
||||
log.info("Starting RPC server with configuration $rpcConfiguration")
|
||||
observationSendExecutor = Executors.newFixedThreadPool(
|
||||
1,
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-observation-sender-%d").build()
|
||||
)
|
||||
senderThread = startSenderThread()
|
||||
rpcExecutor = Executors.newScheduledThreadPool(
|
||||
rpcConfiguration.rpcThreadPoolSize,
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-server-handler-pool-%d").build()
|
||||
)
|
||||
reaperExecutor = Executors.newScheduledThreadPool(
|
||||
1,
|
||||
reaperExecutor = Executors.newSingleThreadScheduledExecutor(
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-server-reaper-%d").build()
|
||||
)
|
||||
reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate(
|
||||
@ -172,55 +170,85 @@ class RPCServer(
|
||||
rpcConfiguration.reapInterval.toMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
)
|
||||
val sessions = createConsumerSessions()
|
||||
createNotificationConsumers()
|
||||
|
||||
sessionFactory = serverLocator.createSessionFactory()
|
||||
producerSession = sessionFactory!!.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
createRpcProducer(producerSession!!)
|
||||
consumerSession = sessionFactory!!.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
createRpcConsumer(consumerSession!!)
|
||||
createNotificationConsumers(consumerSession!!)
|
||||
serverControl = activeMqServerControl
|
||||
deduplicationIdentity = UUID.randomUUID().toString()
|
||||
lifeCycle.transition(State.UNSTARTED, State.STARTED)
|
||||
// We delay the consumer session start because Artemis starts delivering messages immediately, so we need to be
|
||||
// fully initialised.
|
||||
sessions.forEach {
|
||||
it.start()
|
||||
}
|
||||
producerSession!!.start()
|
||||
consumerSession!!.start()
|
||||
} catch (exception: Throwable) {
|
||||
close()
|
||||
throw exception
|
||||
}
|
||||
}
|
||||
|
||||
private fun createConsumerSessions(): ArrayList<ClientSession> {
|
||||
val sessions = ArrayList<ClientSession>()
|
||||
for (i in 1..rpcConfiguration.consumerPoolSize) {
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler)
|
||||
sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer))
|
||||
sessions.add(session)
|
||||
}
|
||||
return sessions
|
||||
private fun createRpcProducer(producerSession: ClientSession) {
|
||||
rpcProducer = producerSession.createProducer()
|
||||
}
|
||||
|
||||
private fun createNotificationConsumers() {
|
||||
clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
|
||||
private fun createRpcConsumer(consumerSession: ClientSession) {
|
||||
rpcConsumer = consumerSession.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
rpcConsumer!!.setMessageHandler(this::clientArtemisMessageHandler)
|
||||
}
|
||||
|
||||
private fun createNotificationConsumers(consumerSession: ClientSession) {
|
||||
clientBindingRemovalConsumer = consumerSession.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
|
||||
clientBindingRemovalConsumer!!.setMessageHandler(this::bindingRemovalArtemisMessageHandler)
|
||||
clientBindingAdditionConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_ADDITIONS)
|
||||
clientBindingAdditionConsumer = consumerSession.createConsumer(RPCApi.RPC_CLIENT_BINDING_ADDITIONS)
|
||||
clientBindingAdditionConsumer!!.setMessageHandler(this::bindingAdditionArtemisMessageHandler)
|
||||
}
|
||||
|
||||
private fun startSenderThread(): Thread {
|
||||
return thread(name = "rpc-server-sender", isDaemon = true) {
|
||||
var deduplicationSequenceNumber = 0L
|
||||
while (true) {
|
||||
val job = sendJobQueue.poll()
|
||||
when (job) {
|
||||
is RpcSendJob.Send -> handleSendJob(deduplicationSequenceNumber++, job)
|
||||
RpcSendJob.Stop -> return@thread
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleSendJob(sequenceNumber: Long, job: RpcSendJob.Send) {
|
||||
try {
|
||||
val artemisMessage = producerSession!!.createMessage(false)
|
||||
if (job.database != null) {
|
||||
contextDatabase = job.database
|
||||
}
|
||||
// We must do the serialisation here as any encountered Observables may already have events, which would
|
||||
// trigger more sends. We must make sure that the root of the Observables (e.g. the RPC reply) is sent
|
||||
// before any child observations.
|
||||
job.message.writeToClientMessage(job.serializationContext, artemisMessage)
|
||||
artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, sequenceNumber)
|
||||
rpcProducer!!.send(job.clientAddress, artemisMessage)
|
||||
log.debug { "<- RPC <- ${job.message}" }
|
||||
} catch (throwable: Throwable) {
|
||||
log.error("Failed to send message, kicking client. Message was ${job.message}", throwable)
|
||||
serverControl!!.closeConsumerConnectionsForAddress(job.clientAddress.toString())
|
||||
invalidateClient(job.clientAddress)
|
||||
}
|
||||
}
|
||||
|
||||
fun close() {
|
||||
observationSendExecutor?.join()
|
||||
sendJobQueue.put(RpcSendJob.Stop)
|
||||
senderThread?.join()
|
||||
reaperScheduledFuture?.cancel(false)
|
||||
rpcExecutor?.shutdownNow()
|
||||
reaperExecutor?.shutdownNow()
|
||||
securityManager.close()
|
||||
sessionAndConsumers.forEach {
|
||||
it.sessionFactory.close()
|
||||
}
|
||||
sessionFactory?.close()
|
||||
observableMap.invalidateAll()
|
||||
reapSubscriptions()
|
||||
sessionAndProducerPool.close().forEach {
|
||||
it.sessionFactory.close()
|
||||
}
|
||||
lifeCycle.justTransition(State.FINISHED)
|
||||
}
|
||||
|
||||
@ -271,32 +299,44 @@ class RPCServer(
|
||||
lifeCycle.requireState(State.STARTED)
|
||||
val clientToServer = RPCApi.ClientToServer.fromClientMessage(artemisMessage)
|
||||
log.debug { "-> RPC -> $clientToServer" }
|
||||
when (clientToServer) {
|
||||
is RPCApi.ClientToServer.RpcRequest -> {
|
||||
val arguments = Try.on {
|
||||
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
|
||||
}
|
||||
val context = artemisMessage.context(clientToServer.sessionId)
|
||||
context.invocation.pushToLoggingContext()
|
||||
when (arguments) {
|
||||
is Try.Success -> {
|
||||
rpcExecutor!!.submit {
|
||||
val result = invokeRpc(context, clientToServer.methodName, arguments.value)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, result)
|
||||
try {
|
||||
when (clientToServer) {
|
||||
is RPCApi.ClientToServer.RpcRequest -> {
|
||||
val deduplicationSequenceNumber = artemisMessage.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
|
||||
if (deduplicationChecker.checkDuplicateMessageId(
|
||||
identity = clientToServer.clientAddress,
|
||||
sequenceNumber = deduplicationSequenceNumber
|
||||
)) {
|
||||
log.info("Message duplication detected, discarding message")
|
||||
return
|
||||
}
|
||||
val arguments = Try.on {
|
||||
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
|
||||
}
|
||||
val context = artemisMessage.context(clientToServer.sessionId)
|
||||
context.invocation.pushToLoggingContext()
|
||||
when (arguments) {
|
||||
is Try.Success -> {
|
||||
log.info("SUBMITTING")
|
||||
rpcExecutor!!.submit {
|
||||
val result = invokeRpc(context, clientToServer.methodName, arguments.value)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, result)
|
||||
}
|
||||
}
|
||||
is Try.Failure -> {
|
||||
// We failed to deserialise the arguments, route back the error
|
||||
log.warn("Inbound RPC failed", arguments.exception)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments)
|
||||
}
|
||||
}
|
||||
is Try.Failure -> {
|
||||
// We failed to deserialise the arguments, route back the error
|
||||
log.warn("Inbound RPC failed", arguments.exception)
|
||||
sendReply(clientToServer.replyId, clientToServer.clientAddress, arguments)
|
||||
}
|
||||
}
|
||||
is RPCApi.ClientToServer.ObservablesClosed -> {
|
||||
observableMap.invalidateAll(clientToServer.ids)
|
||||
}
|
||||
}
|
||||
is RPCApi.ClientToServer.ObservablesClosed -> {
|
||||
observableMap.invalidateAll(clientToServer.ids)
|
||||
}
|
||||
} finally {
|
||||
artemisMessage.acknowledge()
|
||||
}
|
||||
artemisMessage.acknowledge()
|
||||
}
|
||||
|
||||
private fun invokeRpc(context: RpcAuthContext, methodName: String, arguments: List<Any?>): Try<Any> {
|
||||
@ -316,15 +356,16 @@ class RPCServer(
|
||||
}
|
||||
|
||||
private fun sendReply(replyId: InvocationId, clientAddress: SimpleString, result: Try<Any>) {
|
||||
val reply = RPCApi.ServerToClient.RpcReply(replyId, result)
|
||||
val reply = RPCApi.ServerToClient.RpcReply(
|
||||
id = replyId,
|
||||
result = result,
|
||||
deduplicationIdentity = deduplicationIdentity!!
|
||||
)
|
||||
val observableContext = ObservableContext(
|
||||
replyId,
|
||||
observableMap,
|
||||
clientAddressToObservables,
|
||||
clientAddress,
|
||||
serverControl!!,
|
||||
sessionAndProducerPool,
|
||||
observationSendExecutor!!
|
||||
deduplicationIdentity!!,
|
||||
clientAddress
|
||||
)
|
||||
|
||||
val buffered = bufferIfQueueNotBound(clientAddress, reply, observableContext)
|
||||
@ -370,6 +411,36 @@ class RPCServer(
|
||||
val targetLegalIdentity = message.getStringProperty(RPCApi.RPC_TARGET_LEGAL_IDENTITY)?.let(CordaX500Name.Companion::parse) ?: nodeLegalName
|
||||
return Pair(Actor(Id(validatedUser), securityManager.id, targetLegalIdentity), securityManager.buildSubject(validatedUser))
|
||||
}
|
||||
|
||||
/*
|
||||
* We construct an observable context on each RPC request. If subsequently a nested Observable is encountered this
|
||||
* same context is propagated by the instrumented KryoPool. This way all observations rooted in a single RPC will be
|
||||
* muxed correctly. Note that the context construction itself is quite cheap.
|
||||
*/
|
||||
inner class ObservableContext(
|
||||
val observableMap: ObservableSubscriptionMap,
|
||||
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>,
|
||||
val deduplicationIdentity: String,
|
||||
val clientAddress: SimpleString
|
||||
) {
|
||||
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
|
||||
|
||||
fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
||||
sendJobQueue.put(RpcSendJob.Send(contextDatabaseOrNull, clientAddress, serializationContextWithObservableContext, serverToClient))
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RpcSendJob {
|
||||
data class Send(
|
||||
// TODO HACK this is because during serialisation we subscribe to observables that may use
|
||||
// DatabaseTransactionWrappingSubscriber which tries to access the current database,
|
||||
val database: CordaPersistence?,
|
||||
val clientAddress: SimpleString,
|
||||
val serializationContext: SerializationContext,
|
||||
val message: RPCApi.ServerToClient
|
||||
) : RpcSendJob()
|
||||
object Stop : RpcSendJob()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO replace this by creating a new CordaRPCImpl for each request, passing the context, after we fix Shell and WebServer
|
||||
@ -417,45 +488,11 @@ class ObservableSubscription(
|
||||
|
||||
typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
||||
|
||||
// We construct an observable context on each RPC request. If subsequently a nested Observable is
|
||||
// encountered this same context is propagated by the instrumented KryoPool. This way all
|
||||
// observations rooted in a single RPC will be muxed correctly. Note that the context construction
|
||||
// itself is quite cheap.
|
||||
class ObservableContext(
|
||||
val invocationId: InvocationId,
|
||||
val observableMap: ObservableSubscriptionMap,
|
||||
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>,
|
||||
val clientAddress: SimpleString,
|
||||
val serverControl: ActiveMQServerControl,
|
||||
val sessionAndProducerPool: LazyStickyPool<ArtemisProducer>,
|
||||
val observationSendExecutor: ExecutorService
|
||||
) {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
|
||||
|
||||
fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
||||
try {
|
||||
sessionAndProducerPool.run(invocationId) {
|
||||
val artemisMessage = it.session.createMessage(false)
|
||||
serverToClient.writeToClientMessage(serializationContextWithObservableContext, artemisMessage)
|
||||
it.producer.send(clientAddress, artemisMessage)
|
||||
log.debug("<- RPC <- $serverToClient")
|
||||
}
|
||||
} catch (throwable: Throwable) {
|
||||
log.error("Failed to send message, kicking client. Message was $serverToClient", throwable)
|
||||
serverControl.closeConsumerConnectionsForAddress(clientAddress.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
private object RpcObservableContextKey
|
||||
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
fun createContext(observableContext: ObservableContext): SerializationContext {
|
||||
fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext {
|
||||
return RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
||||
}
|
||||
|
||||
@ -465,7 +502,7 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
|
||||
val observableId = InvocationId.newInstance()
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext
|
||||
output.writeInvocationId(observableId)
|
||||
val observableWithSubscription = ObservableSubscription(
|
||||
// We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing
|
||||
@ -474,9 +511,12 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
object : Subscriber<Notification<*>>() {
|
||||
override fun onNext(observation: Notification<*>) {
|
||||
if (!isUnsubscribed) {
|
||||
observableContext.observationSendExecutor.submit {
|
||||
observableContext.sendMessage(RPCApi.ServerToClient.Observation(observableId, observation))
|
||||
}
|
||||
val message = RPCApi.ServerToClient.Observation(
|
||||
id = observableId,
|
||||
content = observation,
|
||||
deduplicationIdentity = observableContext.deduplicationIdentity
|
||||
)
|
||||
observableContext.sendMessage(message)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user