mirror of
https://github.com/corda/corda.git
synced 2025-06-17 14:48:16 +00:00
ENT-2023 Minimise code diff between OS and ENT for enterprise change since code diff during merge resulted in a bad merge relating to rpcThreadPoolSize. (#3306)
This commit is contained in:
@ -34,8 +34,17 @@ import net.corda.node.serialization.kryo.KryoServerSerializationScheme
|
|||||||
import net.corda.node.services.Permissions
|
import net.corda.node.services.Permissions
|
||||||
import net.corda.node.services.api.NodePropertiesStore
|
import net.corda.node.services.api.NodePropertiesStore
|
||||||
import net.corda.node.services.api.SchemaService
|
import net.corda.node.services.api.SchemaService
|
||||||
import net.corda.node.services.config.*
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.messaging.*
|
import net.corda.node.services.config.SecurityConfiguration
|
||||||
|
import net.corda.node.services.config.VerifierType
|
||||||
|
import net.corda.node.services.config.shouldInitCrashShell
|
||||||
|
import net.corda.node.services.config.shouldStartLocalShell
|
||||||
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
|
import net.corda.node.services.messaging.InternalRPCMessagingClient
|
||||||
|
import net.corda.node.services.messaging.MessagingService
|
||||||
|
import net.corda.node.services.messaging.P2PMessagingClient
|
||||||
|
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||||
|
import net.corda.node.services.messaging.VerifierMessagingClient
|
||||||
import net.corda.node.services.rpc.ArtemisRpcBroker
|
import net.corda.node.services.rpc.ArtemisRpcBroker
|
||||||
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
import net.corda.node.services.transactions.InMemoryTransactionVerifierService
|
||||||
import net.corda.node.utilities.AddressUtils
|
import net.corda.node.utilities.AddressUtils
|
||||||
@ -48,7 +57,11 @@ import net.corda.nodeapi.internal.bridging.BridgeControlListener
|
|||||||
import net.corda.nodeapi.internal.config.User
|
import net.corda.nodeapi.internal.config.User
|
||||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||||
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
import net.corda.nodeapi.internal.persistence.CordaPersistence
|
||||||
import net.corda.serialization.internal.*
|
import net.corda.serialization.internal.AMQP_P2P_CONTEXT
|
||||||
|
import net.corda.serialization.internal.AMQP_RPC_CLIENT_CONTEXT
|
||||||
|
import net.corda.serialization.internal.AMQP_RPC_SERVER_CONTEXT
|
||||||
|
import net.corda.serialization.internal.AMQP_STORAGE_CONTEXT
|
||||||
|
import net.corda.serialization.internal.SerializationFactoryImpl
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import rx.Scheduler
|
import rx.Scheduler
|
||||||
@ -193,8 +206,9 @@ open class Node(configuration: NodeConfiguration,
|
|||||||
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
|
bridgeControlListener = BridgeControlListener(configuration, serverAddress, networkParameters.maxMessageSize)
|
||||||
|
|
||||||
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
|
printBasicNodeInfo("Advertised P2P messaging addresses", info.addresses.joinToString())
|
||||||
|
val rpcServerConfiguration = RPCServerConfiguration.default
|
||||||
rpcServerAddresses?.let {
|
rpcServerAddresses?.let {
|
||||||
internalRpcMessagingClient = InternalRPCMessagingClient(configuration, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal))
|
internalRpcMessagingClient = InternalRPCMessagingClient(configuration, it.admin, MAX_RPC_MESSAGE_SIZE, CordaX500Name.build(configuration.loadSslKeyStore().getCertificate(X509Utilities.CORDA_CLIENT_TLS).subjectX500Principal), rpcServerConfiguration)
|
||||||
printBasicNodeInfo("RPC connection address", it.primary.toString())
|
printBasicNodeInfo("RPC connection address", it.primary.toString())
|
||||||
printBasicNodeInfo("RPC admin connection address", it.admin.toString())
|
printBasicNodeInfo("RPC admin connection address", it.admin.toString())
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
|||||||
/**
|
/**
|
||||||
* Used by the Node to communicate with the RPC broker.
|
* Used by the Node to communicate with the RPC broker.
|
||||||
*/
|
*/
|
||||||
class InternalRPCMessagingClient(val sslConfig: SSLConfiguration, val serverAddress: NetworkHostAndPort, val maxMessageSize: Int, val nodeName: CordaX500Name) : SingletonSerializeAsToken(), AutoCloseable {
|
class InternalRPCMessagingClient(val sslConfig: SSLConfiguration, val serverAddress: NetworkHostAndPort, val maxMessageSize: Int, val nodeName: CordaX500Name, val rpcServerConfiguration: RPCServerConfiguration) : SingletonSerializeAsToken(), AutoCloseable {
|
||||||
private var locator: ServerLocator? = null
|
private var locator: ServerLocator? = null
|
||||||
private var rpcServer: RPCServer? = null
|
private var rpcServer: RPCServer? = null
|
||||||
|
|
||||||
@ -32,7 +32,7 @@ class InternalRPCMessagingClient(val sslConfig: SSLConfiguration, val serverAddr
|
|||||||
isUseGlobalPools = nodeSerializationEnv != null
|
isUseGlobalPools = nodeSerializationEnv != null
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcServer = RPCServer(rpcOps, NODE_RPC_USER, NODE_RPC_USER, locator!!, securityManager, nodeName)
|
rpcServer = RPCServer(rpcOps, NODE_RPC_USER, NODE_RPC_USER, locator!!, securityManager, nodeName, rpcServerConfiguration)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun start(serverControl: ActiveMQServerControl) = synchronized(this) {
|
fun start(serverControl: ActiveMQServerControl) = synchronized(this) {
|
||||||
|
@ -18,11 +18,15 @@ import net.corda.core.serialization.SerializationContext
|
|||||||
import net.corda.core.serialization.SerializationDefaults
|
import net.corda.core.serialization.SerializationDefaults
|
||||||
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
import net.corda.core.serialization.SerializationDefaults.RPC_SERVER_CONTEXT
|
||||||
import net.corda.core.serialization.deserialize
|
import net.corda.core.serialization.deserialize
|
||||||
import net.corda.core.utilities.*
|
import net.corda.core.utilities.Try
|
||||||
|
import net.corda.core.utilities.contextLogger
|
||||||
|
import net.corda.core.utilities.days
|
||||||
|
import net.corda.core.utilities.debug
|
||||||
|
import net.corda.core.utilities.seconds
|
||||||
import net.corda.node.internal.security.AuthorizingSubject
|
import net.corda.node.internal.security.AuthorizingSubject
|
||||||
import net.corda.node.internal.security.RPCSecurityManager
|
import net.corda.node.internal.security.RPCSecurityManager
|
||||||
import net.corda.node.services.logging.pushToLoggingContext
|
|
||||||
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
|
import net.corda.node.serialization.amqp.RpcServerObservableSerializer
|
||||||
|
import net.corda.node.services.logging.pushToLoggingContext
|
||||||
import net.corda.nodeapi.RPCApi
|
import net.corda.nodeapi.RPCApi
|
||||||
import net.corda.nodeapi.externalTrace
|
import net.corda.nodeapi.externalTrace
|
||||||
import net.corda.nodeapi.impersonatedActor
|
import net.corda.nodeapi.impersonatedActor
|
||||||
@ -32,8 +36,13 @@ import net.corda.nodeapi.internal.persistence.contextDatabase
|
|||||||
import net.corda.nodeapi.internal.persistence.contextDatabaseOrNull
|
import net.corda.nodeapi.internal.persistence.contextDatabaseOrNull
|
||||||
import org.apache.activemq.artemis.api.core.Message
|
import org.apache.activemq.artemis.api.core.Message
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.*
|
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientMessage
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientProducer
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSession
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory
|
||||||
|
import org.apache.activemq.artemis.api.core.client.ServerLocator
|
||||||
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
|
||||||
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
import org.apache.activemq.artemis.api.core.management.CoreNotificationType
|
||||||
import org.apache.activemq.artemis.api.core.management.ManagementHelper
|
import org.apache.activemq.artemis.api.core.management.ManagementHelper
|
||||||
@ -43,7 +52,12 @@ import java.lang.reflect.InvocationTargetException
|
|||||||
import java.lang.reflect.Method
|
import java.lang.reflect.Method
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import java.util.concurrent.*
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue
|
||||||
|
import java.util.concurrent.ScheduledExecutorService
|
||||||
|
import java.util.concurrent.ScheduledFuture
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
import kotlin.concurrent.thread
|
import kotlin.concurrent.thread
|
||||||
|
|
||||||
private typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
private typealias ObservableSubscriptionMap = Cache<InvocationId, ObservableSubscription>
|
||||||
@ -80,7 +94,7 @@ class RPCServer(
|
|||||||
private val serverLocator: ServerLocator,
|
private val serverLocator: ServerLocator,
|
||||||
private val securityManager: RPCSecurityManager,
|
private val securityManager: RPCSecurityManager,
|
||||||
private val nodeLegalName: CordaX500Name,
|
private val nodeLegalName: CordaX500Name,
|
||||||
private val rpcConfiguration: RPCServerConfiguration = RPCServerConfiguration.default
|
private val rpcConfiguration: RPCServerConfiguration
|
||||||
) {
|
) {
|
||||||
private companion object {
|
private companion object {
|
||||||
private val log = contextLogger()
|
private val log = contextLogger()
|
||||||
|
@ -5,6 +5,7 @@ import net.corda.client.rpc.internal.RPCClient
|
|||||||
import net.corda.core.context.AuthServiceId
|
import net.corda.core.context.AuthServiceId
|
||||||
import net.corda.core.identity.CordaX500Name
|
import net.corda.core.identity.CordaX500Name
|
||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
|
import net.corda.core.messaging.ClientRpcSslOptions
|
||||||
import net.corda.core.messaging.RPCOps
|
import net.corda.core.messaging.RPCOps
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.internal.artemis.ArtemisBroker
|
import net.corda.node.internal.artemis.ArtemisBroker
|
||||||
@ -12,9 +13,9 @@ import net.corda.node.internal.security.RPCSecurityManager
|
|||||||
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
import net.corda.node.internal.security.RPCSecurityManagerImpl
|
||||||
import net.corda.node.services.Permissions.Companion.all
|
import net.corda.node.services.Permissions.Companion.all
|
||||||
import net.corda.node.services.messaging.InternalRPCMessagingClient
|
import net.corda.node.services.messaging.InternalRPCMessagingClient
|
||||||
|
import net.corda.node.services.messaging.RPCServerConfiguration
|
||||||
import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
|
import net.corda.nodeapi.ArtemisTcpTransport.Companion.rpcConnectorTcpTransport
|
||||||
import net.corda.nodeapi.BrokerRpcSslOptions
|
import net.corda.nodeapi.BrokerRpcSslOptions
|
||||||
import net.corda.core.messaging.ClientRpcSslOptions
|
|
||||||
import net.corda.nodeapi.internal.config.SSLConfiguration
|
import net.corda.nodeapi.internal.config.SSLConfiguration
|
||||||
import net.corda.nodeapi.internal.config.User
|
import net.corda.nodeapi.internal.config.User
|
||||||
import net.corda.testing.core.SerializationEnvironmentRule
|
import net.corda.testing.core.SerializationEnvironmentRule
|
||||||
@ -104,7 +105,7 @@ class ArtemisRpcTests {
|
|||||||
}
|
}
|
||||||
artemisBroker.use { broker ->
|
artemisBroker.use { broker ->
|
||||||
broker.start()
|
broker.start()
|
||||||
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB")).use { server ->
|
InternalRPCMessagingClient(nodeSSlconfig, adminAddress, maxMessageSize, CordaX500Name("MegaCorp", "London", "GB"), RPCServerConfiguration.default).use { server ->
|
||||||
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
|
server.start(TestRpcOpsImpl(), securityManager, broker.serverControl)
|
||||||
|
|
||||||
val client = RPCClient<TestRpcOps>(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions))
|
val client = RPCClient<TestRpcOps>(rpcConnectorTcpTransport(broker.addresses.primary, clientSslOptions))
|
||||||
|
Reference in New Issue
Block a user