mirror of
https://github.com/corda/corda.git
synced 2025-06-21 08:40:03 +00:00
#592: Address more comments
This commit is contained in:
@ -14,7 +14,6 @@ import net.corda.nodeapi.RPCApi
|
|||||||
import net.corda.nodeapi.RPCKryo
|
import net.corda.nodeapi.RPCKryo
|
||||||
import net.corda.testing.*
|
import net.corda.testing.*
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.bouncycastle.crypto.tls.ConnectionEnd.server
|
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
@ -80,7 +79,7 @@ class RPCStabilityTests {
|
|||||||
}
|
}
|
||||||
val server = startRpcServer<TrackSubscriberOps>(
|
val server = startRpcServer<TrackSubscriberOps>(
|
||||||
configuration = RPCServerConfiguration.default.copy(
|
configuration = RPCServerConfiguration.default.copy(
|
||||||
reapIntervalMs = 100
|
reapInterval = 100.millis
|
||||||
),
|
),
|
||||||
ops = trackSubscriberOpsImpl
|
ops = trackSubscriberOpsImpl
|
||||||
).get()
|
).get()
|
||||||
|
@ -162,7 +162,6 @@ class RPCClientProxyHandler(
|
|||||||
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
|
* Start the client. This creates the per-client queue, starts the consumer session and the reaper.
|
||||||
*/
|
*/
|
||||||
fun start() {
|
fun start() {
|
||||||
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
|
|
||||||
reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
|
reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
|
||||||
this::reapObservables,
|
this::reapObservables,
|
||||||
rpcConfiguration.reapInterval.toMillis(),
|
rpcConfiguration.reapInterval.toMillis(),
|
||||||
@ -176,12 +175,12 @@ class RPCClientProxyHandler(
|
|||||||
val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
val session = sessionFactory.createSession(rpcUsername, rpcPassword, false, true, true, false, DEFAULT_ACK_BATCH_SIZE)
|
||||||
val consumer = session.createConsumer(clientAddress)
|
val consumer = session.createConsumer(clientAddress)
|
||||||
consumer.setMessageHandler(this@RPCClientProxyHandler::artemisMessageHandler)
|
consumer.setMessageHandler(this@RPCClientProxyHandler::artemisMessageHandler)
|
||||||
session.start()
|
|
||||||
sessionAndConsumer = ArtemisConsumer(sessionFactory, session, consumer)
|
sessionAndConsumer = ArtemisConsumer(sessionFactory, session, consumer)
|
||||||
|
lifeCycle.transition(State.UNSTARTED, State.SERVER_VERSION_NOT_SET)
|
||||||
|
session.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is the general function that transforms a client side RPC to internal Artemis messages.
|
// This is the general function that transforms a client side RPC to internal Artemis messages.
|
||||||
@CallerSensitive
|
|
||||||
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
|
override fun invoke(proxy: Any, method: Method, arguments: Array<out Any?>?): Any? {
|
||||||
lifeCycle.requireState { it == State.STARTED || it == State.SERVER_VERSION_NOT_SET }
|
lifeCycle.requireState { it == State.STARTED || it == State.SERVER_VERSION_NOT_SET }
|
||||||
checkProtocolVersion(method)
|
checkProtocolVersion(method)
|
||||||
@ -269,7 +268,6 @@ class RPCClientProxyHandler(
|
|||||||
* Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors.
|
* Closes the RPC proxy. Reaps all observables, shuts down the reaper, closes all sessions and executors.
|
||||||
*/
|
*/
|
||||||
fun close() {
|
fun close() {
|
||||||
lifeCycle.transition(State.STARTED, State.FINISHED)
|
|
||||||
sessionAndConsumer.consumer.close()
|
sessionAndConsumer.consumer.close()
|
||||||
sessionAndConsumer.session.close()
|
sessionAndConsumer.session.close()
|
||||||
sessionAndConsumer.sessionFactory.close()
|
sessionAndConsumer.sessionFactory.close()
|
||||||
@ -287,6 +285,7 @@ class RPCClientProxyHandler(
|
|||||||
val observationExecutors = observationExecutorPool.close()
|
val observationExecutors = observationExecutorPool.close()
|
||||||
observationExecutors.forEach { it.shutdownNow() }
|
observationExecutors.forEach { it.shutdownNow() }
|
||||||
observationExecutors.forEach { it.awaitTermination(100, TimeUnit.MILLISECONDS) }
|
observationExecutors.forEach { it.awaitTermination(100, TimeUnit.MILLISECONDS) }
|
||||||
|
lifeCycle.transition(State.STARTED, State.FINISHED)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -310,12 +309,12 @@ class RPCClientProxyHandler(
|
|||||||
* RPCs already may be called with it.
|
* RPCs already may be called with it.
|
||||||
*/
|
*/
|
||||||
internal fun setServerProtocolVersion(version: Int) {
|
internal fun setServerProtocolVersion(version: Int) {
|
||||||
lifeCycle.transition(State.SERVER_VERSION_NOT_SET, State.STARTED)
|
|
||||||
if (serverProtocolVersion == null) {
|
if (serverProtocolVersion == null) {
|
||||||
serverProtocolVersion = version
|
serverProtocolVersion = version
|
||||||
} else {
|
} else {
|
||||||
throw IllegalStateException("setServerProtocolVersion called, but the protocol version was already set!")
|
throw IllegalStateException("setServerProtocolVersion called, but the protocol version was already set!")
|
||||||
}
|
}
|
||||||
|
lifeCycle.transition(State.SERVER_VERSION_NOT_SET, State.STARTED)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun reapObservables() {
|
private fun reapObservables() {
|
||||||
|
@ -8,15 +8,11 @@ import java.util.concurrent.LinkedBlockingQueue
|
|||||||
* Any subsequent borrows using the same object will return the same pooled instance.
|
* Any subsequent borrows using the same object will return the same pooled instance.
|
||||||
*
|
*
|
||||||
* @param size The size of the pool.
|
* @param size The size of the pool.
|
||||||
* @param shouldReturnToPool If specified this function will be run on each release to determine whether the instance
|
|
||||||
* should be returned to the pool for reuse. This may be useful for pooled resources that dynamically grow during
|
|
||||||
* usage, and we may not want to retain them forever.
|
|
||||||
* @param newInstance The function to call to create a pooled resource.
|
* @param newInstance The function to call to create a pooled resource.
|
||||||
*/
|
*/
|
||||||
// TODO This could be implemented more efficiently. Currently the "non-sticky" use case is not optimised, it just chooses a random instance to wait on.
|
// TODO This could be implemented more efficiently. Currently the "non-sticky" use case is not optimised, it just chooses a random instance to wait on.
|
||||||
class LazyStickyPool<A : Any>(
|
class LazyStickyPool<A : Any>(
|
||||||
size: Int,
|
size: Int,
|
||||||
private val shouldReturnToPool: ((A) -> Boolean)? = null,
|
|
||||||
private val newInstance: () -> A
|
private val newInstance: () -> A
|
||||||
) {
|
) {
|
||||||
private class InstanceBox<A> {
|
private class InstanceBox<A> {
|
||||||
@ -52,12 +48,7 @@ class LazyStickyPool<A : Any>(
|
|||||||
|
|
||||||
fun release(stickTo: Any, instance: A) {
|
fun release(stickTo: Any, instance: A) {
|
||||||
val box = boxes[toIndex(stickTo)]
|
val box = boxes[toIndex(stickTo)]
|
||||||
if (shouldReturnToPool == null || shouldReturnToPool.invoke(instance)) {
|
|
||||||
box.instance!!.add(instance)
|
box.instance!!.add(instance)
|
||||||
} else {
|
|
||||||
// We need to create a new instance instead of setting the queue to null to unblock potentially waiting threads.
|
|
||||||
box.instance!!.add(newInstance())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inline fun <R> run(stickToOrNull: Any? = null, withInstance: (A) -> R): R {
|
inline fun <R> run(stickToOrNull: Any? = null, withInstance: (A) -> R): R {
|
||||||
|
@ -54,7 +54,7 @@ import java.util.*
|
|||||||
* Note that multiple sessions like the above may interleave in an arbitrary fashion.
|
* Note that multiple sessions like the above may interleave in an arbitrary fashion.
|
||||||
*
|
*
|
||||||
* Additionally the server may listen on client binding removals for cleanup using [RPC_CLIENT_BINDING_REMOVALS]. This
|
* Additionally the server may listen on client binding removals for cleanup using [RPC_CLIENT_BINDING_REMOVALS]. This
|
||||||
* requires the server to create a filter on the artemis notification address using
|
* requires the server to create a filter on the artemis notification address using [RPC_CLIENT_BINDING_REMOVAL_FILTER_EXPRESSION]
|
||||||
*/
|
*/
|
||||||
object RPCApi {
|
object RPCApi {
|
||||||
private val TAG_FIELD_NAME = "tag"
|
private val TAG_FIELD_NAME = "tag"
|
||||||
|
@ -16,6 +16,7 @@ import net.corda.core.ErrorOr
|
|||||||
import net.corda.core.crypto.commonName
|
import net.corda.core.crypto.commonName
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.random63BitValue
|
import net.corda.core.random63BitValue
|
||||||
|
import net.corda.core.seconds
|
||||||
import net.corda.core.serialization.KryoPoolWithContext
|
import net.corda.core.serialization.KryoPoolWithContext
|
||||||
import net.corda.core.utilities.LazyStickyPool
|
import net.corda.core.utilities.LazyStickyPool
|
||||||
import net.corda.core.utilities.LifeCycle
|
import net.corda.core.utilities.LifeCycle
|
||||||
@ -40,6 +41,7 @@ import rx.Observable
|
|||||||
import rx.Subscriber
|
import rx.Subscriber
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
import java.lang.reflect.InvocationTargetException
|
import java.lang.reflect.InvocationTargetException
|
||||||
|
import java.time.Duration
|
||||||
import java.util.concurrent.ExecutorService
|
import java.util.concurrent.ExecutorService
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
import java.util.concurrent.ScheduledFuture
|
import java.util.concurrent.ScheduledFuture
|
||||||
@ -52,15 +54,15 @@ data class RPCServerConfiguration(
|
|||||||
val consumerPoolSize: Int,
|
val consumerPoolSize: Int,
|
||||||
/** The maximum number of producers to create to handle outgoing messages */
|
/** The maximum number of producers to create to handle outgoing messages */
|
||||||
val producerPoolBound: Int,
|
val producerPoolBound: Int,
|
||||||
/** The interval of subscription reaping in milliseconds */
|
/** The interval of subscription reaping */
|
||||||
val reapIntervalMs: Long
|
val reapInterval: Duration
|
||||||
) {
|
) {
|
||||||
companion object {
|
companion object {
|
||||||
val default = RPCServerConfiguration(
|
val default = RPCServerConfiguration(
|
||||||
rpcThreadPoolSize = 4,
|
rpcThreadPoolSize = 4,
|
||||||
consumerPoolSize = 2,
|
consumerPoolSize = 2,
|
||||||
producerPoolBound = 4,
|
producerPoolBound = 4,
|
||||||
reapIntervalMs = 1000
|
reapInterval = 1.seconds
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -138,8 +140,8 @@ class RPCServer(
|
|||||||
log.info("Starting RPC server with configuration $rpcConfiguration")
|
log.info("Starting RPC server with configuration $rpcConfiguration")
|
||||||
reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
|
reaperScheduledFuture = reaperExecutor.scheduleAtFixedRate(
|
||||||
this::reapSubscriptions,
|
this::reapSubscriptions,
|
||||||
rpcConfiguration.reapIntervalMs,
|
rpcConfiguration.reapInterval.toMillis(),
|
||||||
rpcConfiguration.reapIntervalMs,
|
rpcConfiguration.reapInterval.toMillis(),
|
||||||
TimeUnit.MILLISECONDS
|
TimeUnit.MILLISECONDS
|
||||||
)
|
)
|
||||||
val sessions = ArrayList<ClientSession>()
|
val sessions = ArrayList<ClientSession>()
|
||||||
@ -274,6 +276,11 @@ class RPCServer(
|
|||||||
|
|
||||||
@JvmField
|
@JvmField
|
||||||
internal val CURRENT_RPC_CONTEXT: ThreadLocal<RpcContext> = ThreadLocal()
|
internal val CURRENT_RPC_CONTEXT: ThreadLocal<RpcContext> = ThreadLocal()
|
||||||
|
/**
|
||||||
|
* Returns a context specific to the current RPC call. Note that trying to call this function outside of an RPC will
|
||||||
|
* throw. If you'd like to use the context outside of the call (e.g. in another thread) then pass the returned reference
|
||||||
|
* around explicitly.
|
||||||
|
*/
|
||||||
fun getRpcContext(): RpcContext = CURRENT_RPC_CONTEXT.get()
|
fun getRpcContext(): RpcContext = CURRENT_RPC_CONTEXT.get()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Reference in New Issue
Block a user