mirror of
https://github.com/corda/corda.git
synced 2025-01-18 02:39:51 +00:00
Add RPC deduplication to client and server
This commit is contained in:
parent
8081ee42ad
commit
7924a5a834
3
.idea/compiler.xml
generated
3
.idea/compiler.xml
generated
@ -72,6 +72,7 @@
|
||||
<module name="irs-demo-web_test" target="1.8" />
|
||||
<module name="irs-demo_integrationTest" target="1.8" />
|
||||
<module name="irs-demo_main" target="1.8" />
|
||||
<module name="irs-demo_systemTest" target="1.8" />
|
||||
<module name="irs-demo_test" target="1.8" />
|
||||
<module name="isolated_main" target="1.8" />
|
||||
<module name="isolated_test" target="1.8" />
|
||||
@ -159,4 +160,4 @@
|
||||
<component name="JavacSettings">
|
||||
<option name="ADDITIONAL_OPTIONS_STRING" value="-parameters" />
|
||||
</component>
|
||||
</project>
|
||||
</project>
|
@ -19,6 +19,7 @@ import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.junit.After
|
||||
import org.junit.Assert.assertEquals
|
||||
import org.junit.Assert.assertTrue
|
||||
import org.junit.Ignore
|
||||
import org.junit.Rule
|
||||
import org.junit.Test
|
||||
import rx.Observable
|
||||
@ -127,7 +128,7 @@ class RPCStabilityTests {
|
||||
rpcDriver {
|
||||
fun startAndCloseServer(broker: RpcBrokerHandle) {
|
||||
startRpcServerWithBrokerRunning(
|
||||
configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1),
|
||||
configuration = RPCServerConfiguration.default,
|
||||
ops = DummyOps,
|
||||
brokerHandle = broker
|
||||
).rpcServer.close()
|
||||
@ -148,7 +149,7 @@ class RPCStabilityTests {
|
||||
@Test
|
||||
fun `rpc client close doesnt leak broker resources`() {
|
||||
rpcDriver {
|
||||
val server = startRpcServer(configuration = RPCServerConfiguration.default.copy(consumerPoolSize = 1, producerPoolBound = 1), ops = DummyOps).get()
|
||||
val server = startRpcServer(configuration = RPCServerConfiguration.default, ops = DummyOps).get()
|
||||
RPCClient<RPCOps>(server.broker.hostAndPort!!).start(RPCOps::class.java, rpcTestUser.username, rpcTestUser.password).close()
|
||||
val initial = server.broker.getStats()
|
||||
repeat(100) {
|
||||
@ -337,11 +338,12 @@ class RPCStabilityTests {
|
||||
val request = RPCApi.ClientToServer.RpcRequest(
|
||||
clientAddress = SimpleString(myQueue),
|
||||
methodName = SlowConsumerRPCOps::streamAtInterval.name,
|
||||
serialisedArguments = listOf(10.millis, 123456).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT).bytes,
|
||||
serialisedArguments = listOf(10.millis, 123456).serialize(context = SerializationDefaults.RPC_SERVER_CONTEXT),
|
||||
replyId = Trace.InvocationId.newInstance(),
|
||||
sessionId = Trace.SessionId.newInstance()
|
||||
)
|
||||
request.writeToClientMessage(message)
|
||||
message.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, 0)
|
||||
producer.send(message)
|
||||
session.commit()
|
||||
|
||||
@ -350,6 +352,46 @@ class RPCStabilityTests {
|
||||
}
|
||||
}
|
||||
|
||||
interface StreamOps : RPCOps {
|
||||
fun stream(streamInterval: Duration): Observable<Long>
|
||||
}
|
||||
class StreamOpsImpl : StreamOps {
|
||||
override val protocolVersion = 0
|
||||
override fun stream(streamInterval: Duration): Observable<Long> {
|
||||
return Observable.interval(streamInterval.toNanos(), TimeUnit.NANOSECONDS)
|
||||
}
|
||||
}
|
||||
@Ignore("This is flaky as sometimes artemis delivers out of order messages after the kick")
|
||||
@Test
|
||||
fun `deduplication on the client side`() {
|
||||
rpcDriver {
|
||||
val server = startRpcServer(ops = StreamOpsImpl()).getOrThrow()
|
||||
val proxy = startRpcClient<StreamOps>(
|
||||
server.broker.hostAndPort!!,
|
||||
configuration = RPCClientConfiguration.default.copy(
|
||||
connectionRetryInterval = 1.days // switch off failover
|
||||
)
|
||||
).getOrThrow()
|
||||
// Find the internal address of the client
|
||||
val clientAddress = server.broker.serverControl.addressNames.find { it.startsWith(RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX) }
|
||||
val events = ArrayList<Long>()
|
||||
// Start streaming an incrementing value 2000 times per second from the server.
|
||||
val subscription = proxy.stream(streamInterval = Duration.ofNanos(500_000)).subscribe {
|
||||
events.add(it)
|
||||
}
|
||||
// These sleeps are *fine*, the invariant should hold regardless of any delays
|
||||
Thread.sleep(50)
|
||||
// Kick the client. This seems to trigger redelivery of (presumably non-acked) messages.
|
||||
server.broker.serverControl.closeConsumerConnectionsForAddress(clientAddress)
|
||||
Thread.sleep(50)
|
||||
subscription.unsubscribe()
|
||||
for (i in 0 until events.size) {
|
||||
require(events[i] == i.toLong()) {
|
||||
"Events not incremental, possible duplicate, ${events[i]} != ${i.toLong()}\nExpected: ${(0..i).toList()}\nGot : $events\n"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun RPCDriverDSL.pollUntilClientNumber(server: RpcServerHandle, expected: Int) {
|
||||
|
@ -42,8 +42,6 @@ data class RPCClientConfiguration(
|
||||
val reapInterval: Duration,
|
||||
/** The number of threads to use for observations (for executing [Observable.onNext]) */
|
||||
val observationExecutorPoolSize: Int,
|
||||
/** The maximum number of producers to create to handle outgoing messages */
|
||||
val producerPoolBound: Int,
|
||||
/**
|
||||
* Determines the concurrency level of the Observable Cache. This is exposed because it implicitly determines
|
||||
* the limit on the number of leaked observables reaped because of garbage collection per reaping.
|
||||
@ -56,9 +54,12 @@ data class RPCClientConfiguration(
|
||||
val connectionRetryIntervalMultiplier: Double,
|
||||
/** Maximum retry interval */
|
||||
val connectionMaxRetryInterval: Duration,
|
||||
/** Maximum reconnect attempts on failover */
|
||||
val maxReconnectAttempts: Int,
|
||||
/** Maximum file size */
|
||||
val maxFileSize: Int
|
||||
val maxFileSize: Int,
|
||||
/** The cache expiry of a deduplication watermark per client. */
|
||||
val deduplicationCacheExpiry: Duration
|
||||
) {
|
||||
companion object {
|
||||
val unlimitedReconnectAttempts = -1
|
||||
@ -68,14 +69,14 @@ data class RPCClientConfiguration(
|
||||
trackRpcCallSites = false,
|
||||
reapInterval = 1.seconds,
|
||||
observationExecutorPoolSize = 4,
|
||||
producerPoolBound = 1,
|
||||
cacheConcurrencyLevel = 8,
|
||||
connectionRetryInterval = 5.seconds,
|
||||
connectionRetryIntervalMultiplier = 1.5,
|
||||
connectionMaxRetryInterval = 3.minutes,
|
||||
maxReconnectAttempts = unlimitedReconnectAttempts,
|
||||
/** 10 MiB maximum allowed file size for attachments, including message headers. TODO: acquire this value from Network Map when supported. */
|
||||
maxFileSize = 10485760
|
||||
maxFileSize = 10485760,
|
||||
deduplicationCacheExpiry = 1.days
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -15,7 +15,6 @@ import net.corda.client.rpc.RPCSinceVersion
|
||||
import net.corda.core.context.Actor
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.context.Trace.InvocationId
|
||||
import net.corda.core.internal.LazyPool
|
||||
import net.corda.core.internal.LazyStickyPool
|
||||
import net.corda.core.internal.LifeCycle
|
||||
import net.corda.core.internal.ThreadBox
|
||||
@ -26,14 +25,12 @@ import net.corda.core.utilities.Try
|
||||
import net.corda.core.utilities.contextLogger
|
||||
import net.corda.core.utilities.debug
|
||||
import net.corda.core.utilities.getOrThrow
|
||||
import net.corda.nodeapi.ArtemisConsumer
|
||||
import net.corda.nodeapi.ArtemisProducer
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.internal.DeduplicationChecker
|
||||
import org.apache.activemq.artemis.api.core.RoutingType
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
import rx.Notification
|
||||
import rx.Observable
|
||||
import rx.subjects.UnicastSubject
|
||||
@ -43,6 +40,7 @@ import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.*
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import kotlin.reflect.jvm.javaMethod
|
||||
|
||||
/**
|
||||
@ -111,6 +109,8 @@ class RPCClientProxyHandler(
|
||||
|
||||
// Used for reaping
|
||||
private var reaperExecutor: ScheduledExecutorService? = null
|
||||
// Used for sending
|
||||
private var sendExecutor: ExecutorService? = null
|
||||
|
||||
// A sticky pool for running Observable.onNext()s. We need the stickiness to preserve the observation ordering.
|
||||
private val observationExecutorThreadFactory = ThreadFactoryBuilder().setNameFormat("rpc-client-observation-pool-%d").setDaemon(true).build()
|
||||
@ -161,22 +161,14 @@ class RPCClientProxyHandler(
|
||||
build()
|
||||
}
|
||||
|
||||
// We cannot pool consumers as we need to preserve the original muxed message order.
|
||||
// TODO We may need to pool these somehow anyway, otherwise if the server sends many big messages in parallel a
|
||||
// single consumer may be starved for flow control credits. Recheck this once Artemis's large message streaming is
|
||||
// integrated properly.
|
||||
private var sessionAndConsumer: ArtemisConsumer? = null
|
||||
// Pool producers to reduce contention on the client side.
|
||||
private val sessionAndProducerPool = LazyPool(bound = rpcConfiguration.producerPoolBound) {
|
||||
// Note how we create new sessions *and* session factories per producer.
|
||||
// We cannot simply pool producers on one session because sessions are single threaded.
|
||||
// We cannot simply pool sessions on one session factory because flow control credits are tied to factories, so
|
||||
// sessions tend to starve each other when used concurrently.
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
session.start()
|
||||
ArtemisProducer(sessionFactory, session, session.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME))
|
||||
}
|
||||
private var sessionFactory: ClientSessionFactory? = null
|
||||
private var producerSession: ClientSession? = null
|
||||
private var consumerSession: ClientSession? = null
|
||||
private var rpcProducer: ClientProducer? = null
|
||||
private var rpcConsumer: ClientConsumer? = null
|
||||
|
||||
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
|
||||
private val deduplicationSequenceNumber = AtomicLong(0)
|
||||
|
||||
/**
|
||||
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
|
||||
@ -187,22 +179,25 @@ class RPCClientProxyHandler(
|
||||
1,
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-client-reaper-%d").setDaemon(true).build()
|
||||
)
|
||||
sendExecutor = Executors.newSingleThreadExecutor(
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-client-sender-%d").build()
|
||||
)
|
||||
reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate(
|
||||
this::reapObservablesAndNotify,
|
||||
rpcConfiguration.reapInterval.toMillis(),
|
||||
rpcConfiguration.reapInterval.toMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
)
|
||||
sessionAndProducerPool.run {
|
||||
it.session.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
|
||||
}
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
val consumer = session.createConsumer(clientAddress)
|
||||
consumer.setMessageHandler(this@RPCClientProxyHandler::artemisMessageHandler)
|
||||
sessionAndConsumer = ArtemisConsumer(sessionFactory, session, consumer)
|
||||
sessionFactory = serverLocator.createSessionFactory()
|
||||
producerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
rpcProducer = producerSession!!.createProducer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
consumerSession = sessionFactory!!.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
consumerSession!!.createTemporaryQueue(clientAddress, RoutingType.ANYCAST, clientAddress)
|
||||
rpcConsumer = consumerSession!!.createConsumer(clientAddress)
|
||||
rpcConsumer!!.setMessageHandler(this::artemisMessageHandler)
|
||||
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
|
||||
session.start()
|
||||
consumerSession!!.start()
|
||||
producerSession!!.start()
|
||||
}
|
||||
|
||||
// This is the general function that transforms a client side RPC to internal Artemis messages.
|
||||
@ -212,7 +207,7 @@ class RPCClientProxyHandler(
|
||||
if (method == toStringMethod) {
|
||||
return "Client RPC proxy for $rpcOpsClass"
|
||||
}
|
||||
if (sessionAndConsumer!!.session.isClosed) {
|
||||
if (consumerSession!!.isClosed) {
|
||||
throw RPCException("RPC Proxy is closed")
|
||||
}
|
||||
|
||||
@ -220,23 +215,20 @@ class RPCClientProxyHandler(
|
||||
callSiteMap?.set(replyId, Throwable("<Call site of root RPC '${method.name}'>"))
|
||||
try {
|
||||
val serialisedArguments = (arguments?.toList() ?: emptyList()).serialize(context = serializationContextWithObservableContext)
|
||||
val request = RPCApi.ClientToServer.RpcRequest(clientAddress, method.name, serialisedArguments.bytes, replyId, sessionId, externalTrace, impersonatedActor)
|
||||
val request = RPCApi.ClientToServer.RpcRequest(
|
||||
clientAddress,
|
||||
method.name,
|
||||
serialisedArguments,
|
||||
replyId,
|
||||
sessionId,
|
||||
externalTrace,
|
||||
impersonatedActor
|
||||
)
|
||||
val replyFuture = SettableFuture.create<Any>()
|
||||
sessionAndProducerPool.run {
|
||||
val message = it.session.createMessage(false)
|
||||
request.writeToClientMessage(message)
|
||||
|
||||
log.debug {
|
||||
val argumentsString = arguments?.joinToString() ?: ""
|
||||
"-> RPC(${replyId.value}) -> ${method.name}($argumentsString): ${method.returnType}"
|
||||
}
|
||||
|
||||
require(rpcReplyMap.put(replyId, replyFuture) == null) {
|
||||
"Generated several RPC requests with same ID $replyId"
|
||||
}
|
||||
it.producer.send(message)
|
||||
it.session.commit()
|
||||
require(rpcReplyMap.put(replyId, replyFuture) == null) {
|
||||
"Generated several RPC requests with same ID $replyId"
|
||||
}
|
||||
sendMessage(request)
|
||||
return replyFuture.getOrThrow()
|
||||
} catch (e: RuntimeException) {
|
||||
// Already an unchecked exception, so just rethrow it
|
||||
@ -249,9 +241,24 @@ class RPCClientProxyHandler(
|
||||
}
|
||||
}
|
||||
|
||||
private fun sendMessage(message: RPCApi.ClientToServer) {
|
||||
val artemisMessage = producerSession!!.createMessage(false)
|
||||
message.writeToClientMessage(artemisMessage)
|
||||
sendExecutor!!.submit {
|
||||
artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, deduplicationSequenceNumber.getAndIncrement())
|
||||
log.debug { "-> RPC -> $message" }
|
||||
rpcProducer!!.send(artemisMessage)
|
||||
}
|
||||
}
|
||||
|
||||
// The handler for Artemis messages.
|
||||
private fun artemisMessageHandler(message: ClientMessage) {
|
||||
val serverToClient = RPCApi.ServerToClient.fromClientMessage(serializationContextWithObservableContext, message)
|
||||
val deduplicationSequenceNumber = message.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
|
||||
if (deduplicationChecker.checkDuplicateMessageId(serverToClient.deduplicationIdentity, deduplicationSequenceNumber)) {
|
||||
log.info("Message duplication detected, discarding message")
|
||||
return
|
||||
}
|
||||
log.debug { "Got message from RPC server $serverToClient" }
|
||||
when (serverToClient) {
|
||||
is RPCApi.ServerToClient.RpcReply -> {
|
||||
@ -325,14 +332,12 @@ class RPCClientProxyHandler(
|
||||
* @param notify whether to notify observables or not.
|
||||
*/
|
||||
private fun close(notify: Boolean = true) {
|
||||
sessionAndConsumer?.sessionFactory?.close()
|
||||
sessionFactory?.close()
|
||||
reaperScheduledFuture?.cancel(false)
|
||||
observableContext.observableMap.invalidateAll()
|
||||
reapObservables(notify)
|
||||
reaperExecutor?.shutdownNow()
|
||||
sessionAndProducerPool.close().forEach {
|
||||
it.sessionFactory.close()
|
||||
}
|
||||
sendExecutor?.shutdownNow()
|
||||
// Note the ordering is important, we shut down the consumer *before* the observation executor, otherwise we may
|
||||
// leak borrowed executors.
|
||||
val observationExecutors = observationExecutorPool.close()
|
||||
@ -385,11 +390,7 @@ class RPCClientProxyHandler(
|
||||
}
|
||||
if (observableIds != null) {
|
||||
log.debug { "Reaping ${observableIds.size} observables" }
|
||||
sessionAndProducerPool.run {
|
||||
val message = it.session.createMessage(false)
|
||||
RPCApi.ClientToServer.ObservablesClosed(observableIds).writeToClientMessage(message)
|
||||
it.producer.send(message)
|
||||
}
|
||||
sendMessage(RPCApi.ClientToServer.ObservablesClosed(observableIds))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package net.corda.client.rpc
|
||||
import com.google.common.base.Stopwatch
|
||||
import net.corda.client.rpc.internal.RPCClientConfiguration
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.utilities.days
|
||||
import net.corda.core.utilities.minutes
|
||||
import net.corda.core.utilities.seconds
|
||||
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||
@ -87,13 +88,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
val proxy = testProxy(
|
||||
RPCClientConfiguration.default.copy(
|
||||
cacheConcurrencyLevel = 16,
|
||||
observationExecutorPoolSize = 2,
|
||||
producerPoolBound = 2
|
||||
observationExecutorPoolSize = 2
|
||||
),
|
||||
RPCServerConfiguration.default.copy(
|
||||
rpcThreadPoolSize = 8,
|
||||
consumerPoolSize = 2,
|
||||
producerPoolBound = 8
|
||||
rpcThreadPoolSize = 8
|
||||
)
|
||||
)
|
||||
|
||||
@ -130,13 +128,10 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
val proxy = testProxy(
|
||||
RPCClientConfiguration.default.copy(
|
||||
reapInterval = 1.seconds,
|
||||
cacheConcurrencyLevel = 16,
|
||||
producerPoolBound = 8
|
||||
cacheConcurrencyLevel = 16
|
||||
),
|
||||
RPCServerConfiguration.default.copy(
|
||||
rpcThreadPoolSize = 8,
|
||||
consumerPoolSize = 1,
|
||||
producerPoolBound = 8
|
||||
rpcThreadPoolSize = 8
|
||||
)
|
||||
)
|
||||
startPublishingFixedRateInjector(
|
||||
@ -165,9 +160,7 @@ class RPCPerformanceTests : AbstractRPCTest() {
|
||||
rpcDriver {
|
||||
val proxy = testProxy(
|
||||
RPCClientConfiguration.default,
|
||||
RPCServerConfiguration.default.copy(
|
||||
consumerPoolSize = 1
|
||||
)
|
||||
RPCServerConfiguration.default
|
||||
)
|
||||
val numberOfMessages = 1000
|
||||
val bigSize = 10_000_000
|
||||
|
@ -10,6 +10,7 @@ import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.deserialize
|
||||
import net.corda.core.serialization.serialize
|
||||
import net.corda.core.utilities.Id
|
||||
import net.corda.core.utilities.OpaqueBytes
|
||||
import net.corda.core.utilities.Try
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
@ -72,6 +73,9 @@ object RPCApi {
|
||||
const val RPC_CLIENT_BINDING_ADDITIONS = "rpc.clientqueueadditions"
|
||||
const val RPC_TARGET_LEGAL_IDENTITY = "rpc-target-legal-identity"
|
||||
|
||||
const val DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME = "deduplication-sequence-number"
|
||||
|
||||
|
||||
val RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION =
|
||||
"${ManagementHelper.HDR_NOTIFICATION_TYPE} = '${CoreNotificationType.BINDING_REMOVED.name}' AND " +
|
||||
"${ManagementHelper.HDR_ROUTING_NAME} LIKE '$RPC_CLIENT_QUEUE_NAME_PREFIX.%'"
|
||||
@ -94,6 +98,8 @@ object RPCApi {
|
||||
OBSERVABLES_CLOSED
|
||||
}
|
||||
|
||||
abstract fun writeToClientMessage(message: ClientMessage)
|
||||
|
||||
/**
|
||||
* Request to a server to trigger the specified method with the provided arguments.
|
||||
*
|
||||
@ -105,13 +111,13 @@ object RPCApi {
|
||||
data class RpcRequest(
|
||||
val clientAddress: SimpleString,
|
||||
val methodName: String,
|
||||
val serialisedArguments: ByteArray,
|
||||
val serialisedArguments: OpaqueBytes,
|
||||
val replyId: InvocationId,
|
||||
val sessionId: SessionId,
|
||||
val externalTrace: Trace? = null,
|
||||
val impersonatedActor: Actor? = null
|
||||
) : ClientToServer() {
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
override fun writeToClientMessage(message: ClientMessage) {
|
||||
MessageUtil.setJMSReplyTo(message, clientAddress)
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REQUEST.ordinal)
|
||||
|
||||
@ -122,12 +128,12 @@ object RPCApi {
|
||||
impersonatedActor?.mapToImpersonated(message)
|
||||
|
||||
message.putStringProperty(METHOD_NAME_FIELD_NAME, methodName)
|
||||
message.bodyBuffer.writeBytes(serialisedArguments)
|
||||
message.bodyBuffer.writeBytes(serialisedArguments.bytes)
|
||||
}
|
||||
}
|
||||
|
||||
data class ObservablesClosed(val ids: List<InvocationId>) : ClientToServer() {
|
||||
fun writeToClientMessage(message: ClientMessage) {
|
||||
override fun writeToClientMessage(message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.OBSERVABLES_CLOSED.ordinal)
|
||||
val buffer = message.bodyBuffer
|
||||
buffer.writeInt(ids.size)
|
||||
@ -144,7 +150,7 @@ object RPCApi {
|
||||
RPCApi.ClientToServer.Tag.RPC_REQUEST -> RpcRequest(
|
||||
clientAddress = MessageUtil.getJMSReplyTo(message),
|
||||
methodName = message.getStringProperty(METHOD_NAME_FIELD_NAME),
|
||||
serialisedArguments = message.getBodyAsByteArray(),
|
||||
serialisedArguments = OpaqueBytes(message.getBodyAsByteArray()),
|
||||
replyId = message.replyId(),
|
||||
sessionId = message.sessionId(),
|
||||
externalTrace = message.externalTrace(),
|
||||
@ -175,13 +181,21 @@ object RPCApi {
|
||||
|
||||
abstract fun writeToClientMessage(context: SerializationContext, message: ClientMessage)
|
||||
|
||||
/** Reply in response to an [ClientToServer.RpcRequest]. */
|
||||
abstract val deduplicationIdentity: String
|
||||
|
||||
/**
|
||||
* Reply in response to an [ClientToServer.RpcRequest].
|
||||
* @property deduplicationSequenceNumber a sequence number strictly incrementing with each message. Use this for
|
||||
* duplicate detection on the client.
|
||||
*/
|
||||
data class RpcReply(
|
||||
val id: InvocationId,
|
||||
val result: Try<Any?>
|
||||
val result: Try<Any?>,
|
||||
override val deduplicationIdentity: String
|
||||
) : ServerToClient() {
|
||||
override fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.RPC_REPLY.ordinal)
|
||||
message.putStringProperty(DEDUPLICATION_IDENTITY_FIELD_NAME, deduplicationIdentity)
|
||||
id.mapTo(message, RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME)
|
||||
message.bodyBuffer.writeBytes(result.safeSerialize(context) { Try.Failure<Any>(it) }.bytes)
|
||||
}
|
||||
@ -189,10 +203,12 @@ object RPCApi {
|
||||
|
||||
data class Observation(
|
||||
val id: InvocationId,
|
||||
val content: Notification<*>
|
||||
val content: Notification<*>,
|
||||
override val deduplicationIdentity: String
|
||||
) : ServerToClient() {
|
||||
override fun writeToClientMessage(context: SerializationContext, message: ClientMessage) {
|
||||
message.putIntProperty(TAG_FIELD_NAME, Tag.OBSERVATION.ordinal)
|
||||
message.putStringProperty(DEDUPLICATION_IDENTITY_FIELD_NAME, deduplicationIdentity)
|
||||
id.mapTo(message, OBSERVABLE_ID_FIELD_NAME, OBSERVABLE_ID_TIMESTAMP_FIELD_NAME)
|
||||
message.bodyBuffer.writeBytes(content.safeSerialize(context) { Notification.createOnError<Void?>(it) }.bytes)
|
||||
}
|
||||
@ -207,17 +223,26 @@ object RPCApi {
|
||||
|
||||
fun fromClientMessage(context: SerializationContext, message: ClientMessage): ServerToClient {
|
||||
val tag = Tag.values()[message.getIntProperty(TAG_FIELD_NAME)]
|
||||
val deduplicationIdentity = message.getStringProperty(DEDUPLICATION_IDENTITY_FIELD_NAME)
|
||||
return when (tag) {
|
||||
RPCApi.ServerToClient.Tag.RPC_REPLY -> {
|
||||
val id = message.invocationId(RPC_ID_FIELD_NAME, RPC_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, id)
|
||||
RpcReply(id, message.getBodyAsByteArray().deserialize(context = poolWithIdContext))
|
||||
RpcReply(
|
||||
id = id,
|
||||
deduplicationIdentity = deduplicationIdentity,
|
||||
result = message.getBodyAsByteArray().deserialize(context = poolWithIdContext)
|
||||
)
|
||||
}
|
||||
RPCApi.ServerToClient.Tag.OBSERVATION -> {
|
||||
val observableId = message.invocationId(OBSERVABLE_ID_FIELD_NAME, OBSERVABLE_ID_TIMESTAMP_FIELD_NAME) ?: throw IllegalStateException("Cannot parse invocation id from client message.")
|
||||
val poolWithIdContext = context.withProperty(RpcRequestOrObservableIdKey, observableId)
|
||||
val payload = message.getBodyAsByteArray().deserialize<Notification<*>>(context = poolWithIdContext)
|
||||
Observation(observableId, payload)
|
||||
Observation(
|
||||
id = observableId,
|
||||
deduplicationIdentity = deduplicationIdentity,
|
||||
content = payload
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -225,18 +250,6 @@ object RPCApi {
|
||||
}
|
||||
}
|
||||
|
||||
data class ArtemisProducer(
|
||||
val sessionFactory: ClientSessionFactory,
|
||||
val session: ClientSession,
|
||||
val producer: ClientProducer
|
||||
)
|
||||
|
||||
data class ArtemisConsumer(
|
||||
val sessionFactory: ClientSessionFactory,
|
||||
val session: ClientSession,
|
||||
val consumer: ClientConsumer
|
||||
)
|
||||
|
||||
private val TAG_FIELD_NAME = "tag"
|
||||
private val RPC_ID_FIELD_NAME = "rpc-id"
|
||||
private val RPC_ID_TIMESTAMP_FIELD_NAME = "rpc-id-timestamp"
|
||||
@ -249,6 +262,7 @@ private val RPC_EXTERNAL_SESSION_ID_TIMESTAMP_FIELD_NAME = "rpc-external-session
|
||||
private val RPC_IMPERSONATED_ACTOR_ID = "rpc-impersonated-actor-id"
|
||||
private val RPC_IMPERSONATED_ACTOR_STORE_ID = "rpc-impersonated-actor-store-id"
|
||||
private val RPC_IMPERSONATED_ACTOR_OWNING_LEGAL_IDENTITY = "rpc-impersonated-actor-owningLegalIdentity"
|
||||
private val DEDUPLICATION_IDENTITY_FIELD_NAME = "deduplication-identity"
|
||||
private val OBSERVABLE_ID_FIELD_NAME = "observable-id"
|
||||
private val OBSERVABLE_ID_TIMESTAMP_FIELD_NAME = "observable-id-timestamp"
|
||||
private val METHOD_NAME_FIELD_NAME = "method-name"
|
||||
|
@ -0,0 +1,29 @@
|
||||
package net.corda.nodeapi.internal
|
||||
|
||||
import com.google.common.cache.CacheBuilder
|
||||
import com.google.common.cache.CacheLoader
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
|
||||
/**
|
||||
* A class allowing the deduplication of a strictly incrementing sequence number.
|
||||
*/
|
||||
class DeduplicationChecker(cacheExpiry: Duration) {
|
||||
// dedupe identity -> watermark cache
|
||||
private val watermarkCache = CacheBuilder.newBuilder()
|
||||
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS)
|
||||
.build(WatermarkCacheLoader)
|
||||
|
||||
private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong>() {
|
||||
override fun load(key: Any) = AtomicLong(-1)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param identity the identity that generates the sequence numbers.
|
||||
* @param sequenceNumber the sequence number to check.
|
||||
*/
|
||||
fun checkDuplicateMessageId(identity: Any, sequenceNumber: Long): Boolean {
|
||||
return watermarkCache[identity].getAndUpdate { maxOf(sequenceNumber, it) } >= sequenceNumber
|
||||
}
|
||||
}
|
@ -18,9 +18,7 @@ import net.corda.core.context.InvocationContext
|
||||
import net.corda.core.context.Trace
|
||||
import net.corda.core.context.Trace.InvocationId
|
||||
import net.corda.core.identity.CordaX500Name
|
||||
import net.corda.core.internal.LazyStickyPool
|
||||
import net.corda.core.internal.LifeCycle
|
||||
import net.corda.core.internal.join
|
||||
import net.corda.core.messaging.RPCOps
|
||||
import net.corda.core.serialization.SerializationContext
|
||||
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
||||
@ -29,14 +27,14 @@ import net.corda.core.utilities.*
|
||||
import net.corda.node.internal.security.AuthorizingSubject
|
||||
import net.corda.node.internal.security.RPCSecurityManager
|
||||
import net.corda.node.services.logging.pushToLoggingContext
|
||||
import net.corda.nodeapi.*
|
||||
import net.corda.nodeapi.RPCApi
|
||||
import net.corda.nodeapi.externalTrace
|
||||
import net.corda.nodeapi.impersonatedActor
|
||||
import net.corda.nodeapi.internal.DeduplicationChecker
|
||||
import org.apache.activemq.artemis.api.core.Message
|
||||
import org.apache.activemq.artemis.api.core.SimpleString
|
||||
import org.apache.activemq.artemis.api.core.client.*
|
||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper
|
||||
@ -49,24 +47,26 @@ import rx.Subscription
|
||||
import java.lang.reflect.InvocationTargetException
|
||||
import java.lang.reflect.Method
|
||||
import java.time.Duration
|
||||
import java.util.*
|
||||
import java.util.concurrent.*
|
||||
import kotlin.concurrent.thread
|
||||
|
||||
data class RPCServerConfiguration(
|
||||
/** The number of threads to use for handling RPC requests */
|
||||
val rpcThreadPoolSize: Int,
|
||||
/** The number of consumers to handle incoming messages */
|
||||
val consumerPoolSize: Int,
|
||||
/** The maximum number of producers to create to handle outgoing messages */
|
||||
val producerPoolBound: Int,
|
||||
/** The interval of subscription reaping */
|
||||
val reapInterval: Duration
|
||||
val reapInterval: Duration,
|
||||
/** The cache expiry of a deduplication watermark per client. */
|
||||
val deduplicationCacheExpiry: Duration,
|
||||
/** The size of the send queue */
|
||||
val sendJobQueueSize: Int
|
||||
) {
|
||||
companion object {
|
||||
val default = RPCServerConfiguration(
|
||||
rpcThreadPoolSize = 4,
|
||||
consumerPoolSize = 2,
|
||||
producerPoolBound = 4,
|
||||
reapInterval = 1.seconds
|
||||
reapInterval = 1.seconds,
|
||||
deduplicationCacheExpiry = 1.days,
|
||||
sendJobQueueSize = 256
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -115,22 +115,24 @@ class RPCServer(
|
||||
/** The scheduled reaper handle. */
|
||||
private var reaperScheduledFuture: ScheduledFuture<*>? = null
|
||||
|
||||
private var observationSendExecutor: ExecutorService? = null
|
||||
private var senderThread: Thread? = null
|
||||
private var rpcExecutor: ScheduledExecutorService? = null
|
||||
private var reaperExecutor: ScheduledExecutorService? = null
|
||||
|
||||
private val sessionAndConsumers = ArrayList<ArtemisConsumer>(rpcConfiguration.consumerPoolSize)
|
||||
private val sessionAndProducerPool = LazyStickyPool(rpcConfiguration.producerPoolBound) {
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
session.start()
|
||||
ArtemisProducer(sessionFactory, session, session.createProducer())
|
||||
}
|
||||
private var sessionFactory: ClientSessionFactory? = null
|
||||
private var producerSession: ClientSession? = null
|
||||
private var consumerSession: ClientSession? = null
|
||||
private var rpcProducer: ClientProducer? = null
|
||||
private var rpcConsumer: ClientConsumer? = null
|
||||
private var clientBindingRemovalConsumer: ClientConsumer? = null
|
||||
private var clientBindingAdditionConsumer: ClientConsumer? = null
|
||||
private var serverControl: ActiveMQServerControl? = null
|
||||
|
||||
private val responseMessageBuffer = ConcurrentHashMap<SimpleString, BufferOrNone>()
|
||||
private val sendJobQueue = ArrayBlockingQueue<RpcSendJob>(rpcConfiguration.sendJobQueueSize)
|
||||
|
||||
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
|
||||
private var deduplicationIdentity: String? = null
|
||||
|
||||
init {
|
||||
val groupedMethods = ops.javaClass.declaredMethods.groupBy { it.name }
|
||||
@ -154,16 +156,12 @@ class RPCServer(
|
||||
try {
|
||||
lifeCycle.requireState(State.UNSTARTED)
|
||||
log.info("Starting RPC server with configuration $rpcConfiguration")
|
||||
observationSendExecutor = Executors.newFixedThreadPool(
|
||||
1,
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-observation-sender-%d").build()
|
||||
)
|
||||
senderThread = startSenderThread()
|
||||
rpcExecutor = Executors.newScheduledThreadPool(
|
||||
rpcConfiguration.rpcThreadPoolSize,
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-server-handler-pool-%d").build()
|
||||
)
|
||||
reaperExecutor = Executors.newScheduledThreadPool(
|
||||
1,
|
||||
reaperExecutor = Executors.newSingleThreadScheduledExecutor(
|
||||
ThreadFactoryBuilder().setNameFormat("rpc-server-reaper-%d").build()
|
||||
)
|
||||
reaperScheduledFuture = reaperExecutor!!.scheduleAtFixedRate(
|
||||
@ -172,55 +170,77 @@ class RPCServer(
|
||||
rpcConfiguration.reapInterval.toMillis(),
|
||||
TimeUnit.MILLISECONDS
|
||||
)
|
||||
val sessions = createConsumerSessions()
|
||||
createNotificationConsumers()
|
||||
|
||||
sessionFactory = serverLocator.createSessionFactory()
|
||||
producerSession = sessionFactory!!.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
createRpcProducer(producerSession!!)
|
||||
consumerSession = sessionFactory!!.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
createRpcConsumer(consumerSession!!)
|
||||
createNotificationConsumers(consumerSession!!)
|
||||
serverControl = activeMqServerControl
|
||||
deduplicationIdentity = UUID.randomUUID().toString()
|
||||
lifeCycle.transition(State.UNSTARTED, State.STARTED)
|
||||
// We delay the consumer session start because Artemis starts delivering messages immediately, so we need to be
|
||||
// fully initialised.
|
||||
sessions.forEach {
|
||||
it.start()
|
||||
}
|
||||
producerSession!!.start()
|
||||
consumerSession!!.start()
|
||||
} catch (exception: Throwable) {
|
||||
close()
|
||||
throw exception
|
||||
}
|
||||
}
|
||||
|
||||
private fun createConsumerSessions(): ArrayList<ClientSession> {
|
||||
val sessions = ArrayList<ClientSession>()
|
||||
for (i in 1..rpcConfiguration.consumerPoolSize) {
|
||||
val sessionFactory = serverLocator.createSessionFactory()
|
||||
val session = sessionFactory.createSession(rpcServerUsername, rpcServerPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||
val consumer = session.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
consumer.setMessageHandler(this@RPCServer::clientArtemisMessageHandler)
|
||||
sessionAndConsumers.add(ArtemisConsumer(sessionFactory, session, consumer))
|
||||
sessions.add(session)
|
||||
}
|
||||
return sessions
|
||||
private fun createRpcProducer(producerSession: ClientSession) {
|
||||
rpcProducer = producerSession.createProducer()
|
||||
}
|
||||
|
||||
private fun createNotificationConsumers() {
|
||||
clientBindingRemovalConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
|
||||
private fun createRpcConsumer(consumerSession: ClientSession) {
|
||||
rpcConsumer = consumerSession.createConsumer(RPCApi.RPC_SERVER_QUEUE_NAME)
|
||||
rpcConsumer!!.setMessageHandler(this::clientArtemisMessageHandler)
|
||||
}
|
||||
|
||||
private fun createNotificationConsumers(consumerSession: ClientSession) {
|
||||
clientBindingRemovalConsumer = consumerSession.createConsumer(RPCApi.RPC_CLIENT_BINDING_REMOVALS)
|
||||
clientBindingRemovalConsumer!!.setMessageHandler(this::bindingRemovalArtemisMessageHandler)
|
||||
clientBindingAdditionConsumer = sessionAndConsumers[0].session.createConsumer(RPCApi.RPC_CLIENT_BINDING_ADDITIONS)
|
||||
clientBindingAdditionConsumer = consumerSession.createConsumer(RPCApi.RPC_CLIENT_BINDING_ADDITIONS)
|
||||
clientBindingAdditionConsumer!!.setMessageHandler(this::bindingAdditionArtemisMessageHandler)
|
||||
}
|
||||
|
||||
private fun startSenderThread(): Thread {
|
||||
return thread(name = "rpc-server-sender", isDaemon = true) {
|
||||
var deduplicationSequenceNumber = 0L
|
||||
while (true) {
|
||||
val job = sendJobQueue.poll()
|
||||
when (job) {
|
||||
is RpcSendJob.Send -> handleSendJob(deduplicationSequenceNumber++, job)
|
||||
RpcSendJob.Stop -> return@thread
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleSendJob(sequenceNumber: Long, job: RpcSendJob.Send) {
|
||||
try {
|
||||
job.artemisMessage.putLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME, sequenceNumber)
|
||||
rpcProducer!!.send(job.clientAddress, job.artemisMessage)
|
||||
log.debug { "<- RPC <- ${job.originalMessage}" }
|
||||
} catch (throwable: Throwable) {
|
||||
log.error("Failed to send message, kicking client. Message was ${job.originalMessage}", throwable)
|
||||
serverControl!!.closeConsumerConnectionsForAddress(job.clientAddress.toString())
|
||||
invalidateClient(job.clientAddress)
|
||||
}
|
||||
}
|
||||
|
||||
fun close() {
|
||||
observationSendExecutor?.join()
|
||||
sendJobQueue.put(RpcSendJob.Stop)
|
||||
senderThread?.join()
|
||||
reaperScheduledFuture?.cancel(false)
|
||||
rpcExecutor?.shutdownNow()
|
||||
reaperExecutor?.shutdownNow()
|
||||
securityManager.close()
|
||||
sessionAndConsumers.forEach {
|
||||
it.sessionFactory.close()
|
||||
}
|
||||
sessionFactory?.close()
|
||||
observableMap.invalidateAll()
|
||||
reapSubscriptions()
|
||||
sessionAndProducerPool.close().forEach {
|
||||
it.sessionFactory.close()
|
||||
}
|
||||
lifeCycle.justTransition(State.FINISHED)
|
||||
}
|
||||
|
||||
@ -273,6 +293,14 @@ class RPCServer(
|
||||
log.debug { "-> RPC -> $clientToServer" }
|
||||
when (clientToServer) {
|
||||
is RPCApi.ClientToServer.RpcRequest -> {
|
||||
val deduplicationSequenceNumber = artemisMessage.getLongProperty(RPCApi.DEDUPLICATION_SEQUENCE_NUMBER_FIELD_NAME)
|
||||
if (deduplicationChecker.checkDuplicateMessageId(
|
||||
identity = clientToServer.clientAddress,
|
||||
sequenceNumber = deduplicationSequenceNumber
|
||||
)) {
|
||||
log.info("Message duplication detected, discarding message")
|
||||
return
|
||||
}
|
||||
val arguments = Try.on {
|
||||
clientToServer.serialisedArguments.deserialize<List<Any?>>(context = RPC_SERVER_CONTEXT)
|
||||
}
|
||||
@ -316,15 +344,16 @@ class RPCServer(
|
||||
}
|
||||
|
||||
private fun sendReply(replyId: InvocationId, clientAddress: SimpleString, result: Try<Any>) {
|
||||
val reply = RPCApi.ServerToClient.RpcReply(replyId, result)
|
||||
val reply = RPCApi.ServerToClient.RpcReply(
|
||||
id = replyId,
|
||||
result = result,
|
||||
deduplicationIdentity = deduplicationIdentity!!
|
||||
)
|
||||
val observableContext = ObservableContext(
|
||||
replyId,
|
||||
observableMap,
|
||||
clientAddressToObservables,
|
||||
clientAddress,
|
||||
serverControl!!,
|
||||
sessionAndProducerPool,
|
||||
observationSendExecutor!!
|
||||
deduplicationIdentity!!,
|
||||
clientAddress
|
||||
)
|
||||
|
||||
val buffered = bufferIfQueueNotBound(clientAddress, reply, observableContext)
|
||||
@ -370,6 +399,34 @@ class RPCServer(
|
||||
val targetLegalIdentity = message.getStringProperty(RPCApi.RPC_TARGET_LEGAL_IDENTITY)?.let(CordaX500Name.Companion::parse) ?: nodeLegalName
|
||||
return Pair(Actor(Id(validatedUser), securityManager.id, targetLegalIdentity), securityManager.buildSubject(validatedUser))
|
||||
}
|
||||
|
||||
// We construct an observable context on each RPC request. If subsequently a nested Observable is
|
||||
// encountered this same context is propagated by the instrumented KryoPool. This way all
|
||||
// observations rooted in a single RPC will be muxed correctly. Note that the context construction
|
||||
// itself is quite cheap.
|
||||
inner class ObservableContext(
|
||||
val observableMap: ObservableSubscriptionMap,
|
||||
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>,
|
||||
val deduplicationIdentity: String,
|
||||
val clientAddress: SimpleString
|
||||
) {
|
||||
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
|
||||
|
||||
fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
||||
val artemisMessage = producerSession!!.createMessage(false)
|
||||
serverToClient.writeToClientMessage(serializationContextWithObservableContext, artemisMessage)
|
||||
sendJobQueue.put(RpcSendJob.Send(clientAddress, artemisMessage, serverToClient))
|
||||
}
|
||||
}
|
||||
|
||||
private sealed class RpcSendJob {
|
||||
data class Send(
|
||||
val clientAddress: SimpleString,
|
||||
val artemisMessage: ClientMessage,
|
||||
val originalMessage: RPCApi.ServerToClient
|
||||
) : RpcSendJob()
|
||||
object Stop : RpcSendJob()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO replace this by creating a new CordaRPCImpl for each request, passing the context, after we fix Shell and WebServer
|
||||
@ -417,45 +474,11 @@ class ObservableSubscription(
|
||||
|
||||
typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
||||
|
||||
// We construct an observable context on each RPC request. If subsequently a nested Observable is
|
||||
// encountered this same context is propagated by the instrumented KryoPool. This way all
|
||||
// observations rooted in a single RPC will be muxed correctly. Note that the context construction
|
||||
// itself is quite cheap.
|
||||
class ObservableContext(
|
||||
val invocationId: InvocationId,
|
||||
val observableMap: ObservableSubscriptionMap,
|
||||
val clientAddressToObservables: SetMultimap<SimpleString, InvocationId>,
|
||||
val clientAddress: SimpleString,
|
||||
val serverControl: ActiveMQServerControl,
|
||||
val sessionAndProducerPool: LazyStickyPool<ArtemisProducer>,
|
||||
val observationSendExecutor: ExecutorService
|
||||
) {
|
||||
private companion object {
|
||||
private val log = contextLogger()
|
||||
}
|
||||
|
||||
private val serializationContextWithObservableContext = RpcServerObservableSerializer.createContext(this)
|
||||
|
||||
fun sendMessage(serverToClient: RPCApi.ServerToClient) {
|
||||
try {
|
||||
sessionAndProducerPool.run(invocationId) {
|
||||
val artemisMessage = it.session.createMessage(false)
|
||||
serverToClient.writeToClientMessage(serializationContextWithObservableContext, artemisMessage)
|
||||
it.producer.send(clientAddress, artemisMessage)
|
||||
log.debug("<- RPC <- $serverToClient")
|
||||
}
|
||||
} catch (throwable: Throwable) {
|
||||
log.error("Failed to send message, kicking client. Message was $serverToClient", throwable)
|
||||
serverControl.closeConsumerConnectionsForAddress(clientAddress.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
private object RpcObservableContextKey
|
||||
|
||||
private val log = LoggerFactory.getLogger(javaClass)
|
||||
fun createContext(observableContext: ObservableContext): SerializationContext {
|
||||
fun createContext(observableContext: RPCServer.ObservableContext): SerializationContext {
|
||||
return RPC_SERVER_CONTEXT.withProperty(RpcServerObservableSerializer.RpcObservableContextKey, observableContext)
|
||||
}
|
||||
|
||||
@ -465,7 +488,7 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
|
||||
override fun write(kryo: Kryo, output: Output, observable: Observable<*>) {
|
||||
val observableId = InvocationId.newInstance()
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as ObservableContext
|
||||
val observableContext = kryo.context[RpcObservableContextKey] as RPCServer.ObservableContext
|
||||
output.writeInvocationId(observableId)
|
||||
val observableWithSubscription = ObservableSubscription(
|
||||
// We capture [observableContext] in the subscriber. Note that all synchronisation/kryo borrowing
|
||||
@ -474,9 +497,12 @@ object RpcServerObservableSerializer : Serializer<Observable<*>>() {
|
||||
object : Subscriber<Notification<*>>() {
|
||||
override fun onNext(observation: Notification<*>) {
|
||||
if (!isUnsubscribed) {
|
||||
observableContext.observationSendExecutor.submit {
|
||||
observableContext.sendMessage(RPCApi.ServerToClient.Observation(observableId, observation))
|
||||
}
|
||||
val message = RPCApi.ServerToClient.Observation(
|
||||
id = observableId,
|
||||
content = observation,
|
||||
deduplicationIdentity = observableContext.deduplicationIdentity
|
||||
)
|
||||
observableContext.sendMessage(message)
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user