mirror of
https://github.com/corda/corda.git
synced 2025-06-01 15:10:54 +00:00
Merge up of cleanup of AMQP protocol code
This commit is contained in:
parent
15d868b113
commit
0d18bf8e33
@ -22,11 +22,11 @@ import net.corda.core.internal.div
|
|||||||
import net.corda.core.internal.readAll
|
import net.corda.core.internal.readAll
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEERS_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
|
||||||
import net.corda.nodeapi.internal.crypto.X509KeyStore
|
import net.corda.nodeapi.internal.crypto.X509KeyStore
|
||||||
import net.corda.nodeapi.internal.crypto.X509Utilities
|
import net.corda.nodeapi.internal.crypto.X509Utilities
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||||
import net.corda.testing.core.ALICE_NAME
|
import net.corda.testing.core.ALICE_NAME
|
||||||
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
import net.corda.testing.core.DUMMY_BANK_A_NAME
|
||||||
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
import net.corda.testing.core.DUMMY_BANK_B_NAME
|
||||||
@ -35,6 +35,7 @@ import org.junit.Assert.assertArrayEquals
|
|||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.junit.rules.TemporaryFolder
|
import org.junit.rules.TemporaryFolder
|
||||||
|
import java.security.KeyStore
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
class AMQPListenerTest {
|
class AMQPListenerTest {
|
||||||
@ -87,16 +88,17 @@ class AMQPListenerTest {
|
|||||||
clientConfig.createBridgeKeyStores(DUMMY_BANK_B_NAME)
|
clientConfig.createBridgeKeyStores(DUMMY_BANK_B_NAME)
|
||||||
val clientKeyStore = clientConfig.loadSslKeyStore().internal
|
val clientKeyStore = clientConfig.loadSslKeyStore().internal
|
||||||
val clientTrustStore = clientConfig.loadTrustStore().internal
|
val clientTrustStore = clientConfig.loadTrustStore().internal
|
||||||
|
val amqpConfig = object : AMQPConfiguration {
|
||||||
|
override val keyStore: KeyStore = clientKeyStore
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||||
|
override val trustStore: KeyStore = clientTrustStore
|
||||||
|
override val maxMessageSize: Int = maxMessageSize
|
||||||
|
override val trace: Boolean = true
|
||||||
|
}
|
||||||
// create and connect a real client
|
// create and connect a real client
|
||||||
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
|
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
|
||||||
setOf(DUMMY_BANK_A_NAME),
|
setOf(DUMMY_BANK_A_NAME),
|
||||||
PEER_USER,
|
amqpConfig)
|
||||||
PEER_USER,
|
|
||||||
clientKeyStore,
|
|
||||||
clientConfig.keyStorePassword,
|
|
||||||
clientTrustStore,
|
|
||||||
true,
|
|
||||||
maxMessageSize = maxMessageSize)
|
|
||||||
|
|
||||||
amqpClient.start()
|
amqpClient.start()
|
||||||
// Should see events to show we got a valid connection
|
// Should see events to show we got a valid connection
|
||||||
@ -158,16 +160,17 @@ class AMQPListenerTest {
|
|||||||
clientKeyStore.setPrivateKey("TLS_CERT", clientKeys.private, listOf(clientCert))
|
clientKeyStore.setPrivateKey("TLS_CERT", clientKeys.private, listOf(clientCert))
|
||||||
val clientTrustStore = X509KeyStore("password")
|
val clientTrustStore = X509KeyStore("password")
|
||||||
clientTrustStore.setCertificate("TLS_ROOT", clientCert)
|
clientTrustStore.setCertificate("TLS_ROOT", clientCert)
|
||||||
|
val amqpConfig = object : AMQPConfiguration {
|
||||||
|
override val keyStore: KeyStore = clientKeyStore.internal
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = "password".toCharArray()
|
||||||
|
override val trustStore: KeyStore = clientTrustStore.internal
|
||||||
|
override val maxMessageSize: Int = maxMessageSize
|
||||||
|
override val trace: Boolean = true
|
||||||
|
}
|
||||||
// create and connect a real client
|
// create and connect a real client
|
||||||
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
|
val amqpClient = AMQPClient(listOf(NetworkHostAndPort("localhost", 10005)),
|
||||||
setOf(DUMMY_BANK_A_NAME),
|
setOf(DUMMY_BANK_A_NAME),
|
||||||
PEER_USER,
|
amqpConfig)
|
||||||
PEER_USER,
|
|
||||||
clientKeyStore.internal,
|
|
||||||
"password",
|
|
||||||
clientTrustStore.internal,
|
|
||||||
true,
|
|
||||||
maxMessageSize = maxMessageSize)
|
|
||||||
amqpClient.start()
|
amqpClient.start()
|
||||||
val connectionEvent = connectionFollower.next()
|
val connectionEvent = connectionFollower.next()
|
||||||
assertEquals(false, connectionEvent.connected)
|
assertEquals(false, connectionEvent.connected)
|
||||||
|
@ -17,9 +17,9 @@ import net.corda.bridge.services.api.ServiceStateSupport
|
|||||||
import net.corda.bridge.services.util.ServiceStateCombiner
|
import net.corda.bridge.services.util.ServiceStateCombiner
|
||||||
import net.corda.bridge.services.util.ServiceStateHelper
|
import net.corda.bridge.services.util.ServiceStateHelper
|
||||||
import net.corda.core.utilities.contextLogger
|
import net.corda.core.utilities.contextLogger
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
|
||||||
import net.corda.nodeapi.internal.crypto.KEYSTORE_TYPE
|
import net.corda.nodeapi.internal.crypto.KEYSTORE_TYPE
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
@ -31,7 +31,7 @@ import java.security.KeyStore
|
|||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
|
class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
|
||||||
val maxMessageSize: Int,
|
val maximumMessageSize: Int,
|
||||||
val auditService: BridgeAuditService,
|
val auditService: BridgeAuditService,
|
||||||
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAMQPListenerService, ServiceStateSupport by stateHelper {
|
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : BridgeAMQPListenerService, ServiceStateSupport by stateHelper {
|
||||||
companion object {
|
companion object {
|
||||||
@ -61,16 +61,17 @@ class BridgeAMQPListenerServiceImpl(val conf: BridgeConfiguration,
|
|||||||
val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword)
|
val keyStore = loadKeyStoreAndWipeKeys(keyStoreBytes, keyStorePassword)
|
||||||
val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword)
|
val trustStore = loadKeyStoreAndWipeKeys(trustStoreBytes, trustStorePassword)
|
||||||
val bindAddress = conf.inboundConfig!!.listeningAddress
|
val bindAddress = conf.inboundConfig!!.listeningAddress
|
||||||
|
val amqpConfiguration = object : AMQPConfiguration {
|
||||||
|
override val keyStore: KeyStore = keyStore
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword
|
||||||
|
override val trustStore: KeyStore = trustStore
|
||||||
|
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||||
|
override val maxMessageSize: Int = maximumMessageSize
|
||||||
|
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||||
|
}
|
||||||
val server = AMQPServer(bindAddress.host,
|
val server = AMQPServer(bindAddress.host,
|
||||||
bindAddress.port,
|
bindAddress.port,
|
||||||
PEER_USER,
|
amqpConfiguration)
|
||||||
PEER_USER,
|
|
||||||
keyStore,
|
|
||||||
keyStorePrivateKeyPassword,
|
|
||||||
trustStore,
|
|
||||||
conf.crlCheckSoftFail,
|
|
||||||
maxMessageSize,
|
|
||||||
conf.enableAMQPPacketTrace)
|
|
||||||
onConnectSubscription = server.onConnection.subscribe(_onConnection)
|
onConnectSubscription = server.onConnection.subscribe(_onConnection)
|
||||||
onConnectAuditSubscription = server.onConnection.subscribe({
|
onConnectAuditSubscription = server.onConnection.subscribe({
|
||||||
if (it.connected) {
|
if (it.connected) {
|
||||||
|
@ -24,6 +24,7 @@ import net.corda.core.utilities.contextLogger
|
|||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
@ -33,7 +34,7 @@ import kotlin.concurrent.withLock
|
|||||||
|
|
||||||
|
|
||||||
class FloatControlListenerService(val conf: BridgeConfiguration,
|
class FloatControlListenerService(val conf: BridgeConfiguration,
|
||||||
val maxMessageSize: Int,
|
val maximumMessageSize: Int,
|
||||||
val auditService: BridgeAuditService,
|
val auditService: BridgeAuditService,
|
||||||
val amqpListener: BridgeAMQPListenerService,
|
val amqpListener: BridgeAMQPListenerService,
|
||||||
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper {
|
private val stateHelper: ServiceStateHelper = ServiceStateHelper(log)) : FloatControlService, ServiceStateSupport by stateHelper {
|
||||||
@ -49,9 +50,6 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
|
|||||||
private var receiveSubscriber: Subscription? = null
|
private var receiveSubscriber: Subscription? = null
|
||||||
private var amqpControlServer: AMQPServer? = null
|
private var amqpControlServer: AMQPServer? = null
|
||||||
private val sslConfiguration: BridgeSSLConfiguration
|
private val sslConfiguration: BridgeSSLConfiguration
|
||||||
private val keyStore: KeyStore
|
|
||||||
private val keyStorePrivateKeyPassword: String
|
|
||||||
private val trustStore: KeyStore
|
|
||||||
private val floatControlAddress = conf.floatOuterConfig!!.floatAddress
|
private val floatControlAddress = conf.floatOuterConfig!!.floatAddress
|
||||||
private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject
|
private val floatClientName = conf.floatOuterConfig!!.expectedCertificateSubject
|
||||||
private var activeConnectionInfo: ConnectionChange? = null
|
private var activeConnectionInfo: ConnectionChange? = null
|
||||||
@ -61,9 +59,6 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
|
|||||||
init {
|
init {
|
||||||
statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener))
|
statusFollower = ServiceStateCombiner(listOf(auditService, amqpListener))
|
||||||
sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: BridgeSSLConfigurationImpl(conf)
|
sslConfiguration = conf.floatOuterConfig?.customSSLConfiguration ?: BridgeSSLConfigurationImpl(conf)
|
||||||
keyStore = sslConfiguration.loadSslKeyStore().internal
|
|
||||||
keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword
|
|
||||||
trustStore = sslConfiguration.loadTrustStore().internal
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -83,16 +78,22 @@ class FloatControlListenerService(val conf: BridgeConfiguration,
|
|||||||
|
|
||||||
private fun startControlListener() {
|
private fun startControlListener() {
|
||||||
lock.withLock {
|
lock.withLock {
|
||||||
|
val keyStore = sslConfiguration.loadSslKeyStore().internal
|
||||||
|
val keyStorePrivateKeyPassword = sslConfiguration.keyStorePassword
|
||||||
|
val trustStore = sslConfiguration.loadTrustStore().internal
|
||||||
|
val amqpConfig = object : AMQPConfiguration {
|
||||||
|
override val userName: String? = null
|
||||||
|
override val password: String? = null
|
||||||
|
override val keyStore: KeyStore = keyStore
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = keyStorePrivateKeyPassword.toCharArray()
|
||||||
|
override val trustStore: KeyStore = trustStore
|
||||||
|
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||||
|
override val maxMessageSize: Int = maximumMessageSize
|
||||||
|
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||||
|
}
|
||||||
val controlServer = AMQPServer(floatControlAddress.host,
|
val controlServer = AMQPServer(floatControlAddress.host,
|
||||||
floatControlAddress.port,
|
floatControlAddress.port,
|
||||||
null,
|
amqpConfig)
|
||||||
null,
|
|
||||||
keyStore,
|
|
||||||
keyStorePrivateKeyPassword,
|
|
||||||
trustStore,
|
|
||||||
conf.crlCheckSoftFail,
|
|
||||||
maxMessageSize,
|
|
||||||
conf.enableAMQPPacketTrace)
|
|
||||||
connectSubscriber = controlServer.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
|
connectSubscriber = controlServer.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
|
||||||
receiveSubscriber = controlServer.onReceive.subscribe({ onControlMessage(it) }, { log.error("Receive event error", it) })
|
receiveSubscriber = controlServer.onReceive.subscribe({ onControlMessage(it) }, { log.error("Receive event error", it) })
|
||||||
amqpControlServer = controlServer
|
amqpControlServer = controlServer
|
||||||
|
@ -27,6 +27,7 @@ import net.corda.nodeapi.internal.config.SSLConfiguration
|
|||||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
import net.corda.nodeapi.internal.protonwrapper.messages.ReceivedMessage
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
import net.corda.nodeapi.internal.protonwrapper.netty.ConnectionChange
|
||||||
import rx.Subscription
|
import rx.Subscription
|
||||||
import java.io.ByteArrayOutputStream
|
import java.io.ByteArrayOutputStream
|
||||||
@ -36,7 +37,7 @@ import java.util.concurrent.TimeUnit
|
|||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.TimeoutException
|
||||||
|
|
||||||
class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
||||||
val maxMessageSize: Int,
|
val maximumMessageSize: Int,
|
||||||
val auditService: BridgeAuditService,
|
val auditService: BridgeAuditService,
|
||||||
haService: BridgeMasterService,
|
haService: BridgeMasterService,
|
||||||
val filterService: IncomingMessageFilterService,
|
val filterService: IncomingMessageFilterService,
|
||||||
@ -52,9 +53,6 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
|||||||
private var amqpControlClient: AMQPClient? = null
|
private var amqpControlClient: AMQPClient? = null
|
||||||
private val controlLinkSSLConfiguration: SSLConfiguration
|
private val controlLinkSSLConfiguration: SSLConfiguration
|
||||||
private val floatListenerSSLConfiguration: SSLConfiguration
|
private val floatListenerSSLConfiguration: SSLConfiguration
|
||||||
private val controlLinkKeyStore: KeyStore
|
|
||||||
private val controLinkKeyStorePrivateKeyPassword: String
|
|
||||||
private val controlLinkTrustStore: KeyStore
|
|
||||||
private val expectedCertificateSubject: CordaX500Name
|
private val expectedCertificateSubject: CordaX500Name
|
||||||
private val secureRandom: SecureRandom = newSecureRandom()
|
private val secureRandom: SecureRandom = newSecureRandom()
|
||||||
|
|
||||||
@ -62,9 +60,6 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
|||||||
statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService))
|
statusFollower = ServiceStateCombiner(listOf(auditService, haService, filterService))
|
||||||
controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf
|
controlLinkSSLConfiguration = conf.bridgeInnerConfig?.customSSLConfiguration ?: conf
|
||||||
floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf
|
floatListenerSSLConfiguration = conf.bridgeInnerConfig?.customFloatOuterSSLConfiguration ?: conf
|
||||||
controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal
|
|
||||||
controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword
|
|
||||||
controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal
|
|
||||||
expectedCertificateSubject = conf.bridgeInnerConfig!!.expectedCertificateSubject
|
expectedCertificateSubject = conf.bridgeInnerConfig!!.expectedCertificateSubject
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,16 +68,23 @@ class TunnelingBridgeReceiverService(val conf: BridgeConfiguration,
|
|||||||
statusSubscriber = statusFollower.activeChange.subscribe({
|
statusSubscriber = statusFollower.activeChange.subscribe({
|
||||||
if (it) {
|
if (it) {
|
||||||
val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses
|
val floatAddresses = conf.bridgeInnerConfig!!.floatAddresses
|
||||||
|
val controlLinkKeyStore = controlLinkSSLConfiguration.loadSslKeyStore().internal
|
||||||
|
val controLinkKeyStorePrivateKeyPassword = controlLinkSSLConfiguration.keyStorePassword
|
||||||
|
val controlLinkTrustStore = controlLinkSSLConfiguration.loadTrustStore().internal
|
||||||
|
val amqpConfig = object : AMQPConfiguration {
|
||||||
|
override val userName: String? = null
|
||||||
|
override val password: String? = null
|
||||||
|
override val keyStore: KeyStore = controlLinkKeyStore
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = controLinkKeyStorePrivateKeyPassword.toCharArray()
|
||||||
|
override val trustStore: KeyStore = controlLinkTrustStore
|
||||||
|
override val crlCheckSoftFail: Boolean = conf.crlCheckSoftFail
|
||||||
|
override val maxMessageSize: Int = maximumMessageSize
|
||||||
|
override val trace: Boolean = conf.enableAMQPPacketTrace
|
||||||
|
|
||||||
|
}
|
||||||
val controlClient = AMQPClient(floatAddresses,
|
val controlClient = AMQPClient(floatAddresses,
|
||||||
setOf(expectedCertificateSubject),
|
setOf(expectedCertificateSubject),
|
||||||
null,
|
amqpConfig)
|
||||||
null,
|
|
||||||
controlLinkKeyStore,
|
|
||||||
controLinkKeyStorePrivateKeyPassword,
|
|
||||||
controlLinkTrustStore,
|
|
||||||
conf.crlCheckSoftFail,
|
|
||||||
maxMessageSize,
|
|
||||||
conf.enableAMQPPacketTrace)
|
|
||||||
connectSubscriber = controlClient.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
|
connectSubscriber = controlClient.onConnection.subscribe({ onConnectToControl(it) }, { log.error("Connection event error", it) })
|
||||||
receiveSubscriber = controlClient.onReceive.subscribe({ onFloatMessage(it) }, { log.error("Receive event error", it) })
|
receiveSubscriber = controlClient.onReceive.subscribe({ onFloatMessage(it) }, { log.error("Receive event error", it) })
|
||||||
amqpControlClient = controlClient
|
amqpControlClient = controlClient
|
||||||
|
@ -27,8 +27,8 @@ import net.corda.nodeapi.internal.bridging.AMQPBridgeManager.AMQPBridge.Companio
|
|||||||
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
import net.corda.nodeapi.internal.config.NodeSSLConfiguration
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPConfiguration
|
||||||
|
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
||||||
import org.apache.activemq.artemis.api.core.SimpleString
|
import org.apache.activemq.artemis.api.core.SimpleString
|
||||||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
import org.apache.activemq.artemis.api.core.client.ActiveMQClient.DEFAULT_ACK_BATCH_SIZE
|
||||||
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
import org.apache.activemq.artemis.api.core.client.ClientConsumer
|
||||||
@ -48,7 +48,8 @@ import kotlin.concurrent.withLock
|
|||||||
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
* The Netty thread pool used by the AMQPBridges is also shared and managed by the AMQPBridgeManager.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
class AMQPBridgeManager(config: NodeSSLConfiguration, maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
class AMQPBridgeManager(config: NodeSSLConfiguration, socksProxyConfig: SocksProxyConfig? = null,
|
||||||
|
maxMessageSize: Int, private val artemisMessageClientFactory: () -> ArtemisSessionProvider) : BridgeManager {
|
||||||
|
|
||||||
private val lock = ReentrantLock()
|
private val lock = ReentrantLock()
|
||||||
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
private val bridgeNameToBridgeMap = mutableMapOf<String, AMQPBridge>()
|
||||||
@ -56,14 +57,16 @@ class AMQPBridgeManager(config: NodeSSLConfiguration, maxMessageSize: Int, priva
|
|||||||
private class AMQPConfigurationImpl private constructor(override val keyStore: KeyStore,
|
private class AMQPConfigurationImpl private constructor(override val keyStore: KeyStore,
|
||||||
override val keyStorePrivateKeyPassword: CharArray,
|
override val keyStorePrivateKeyPassword: CharArray,
|
||||||
override val trustStore: KeyStore,
|
override val trustStore: KeyStore,
|
||||||
|
override val socksProxyConfig: SocksProxyConfig?,
|
||||||
override val maxMessageSize: Int) : AMQPConfiguration {
|
override val maxMessageSize: Int) : AMQPConfiguration {
|
||||||
constructor(config: NodeSSLConfiguration, maxMessageSize: Int) : this(config.loadSslKeyStore().internal,
|
constructor(config: NodeSSLConfiguration, socksProxyConfig: SocksProxyConfig?, maxMessageSize: Int) : this(config.loadSslKeyStore().internal,
|
||||||
config.keyStorePassword.toCharArray(),
|
config.keyStorePassword.toCharArray(),
|
||||||
config.loadTrustStore().internal,
|
config.loadTrustStore().internal,
|
||||||
|
socksProxyConfig,
|
||||||
maxMessageSize)
|
maxMessageSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, maxMessageSize)
|
private val amqpConfig: AMQPConfiguration = AMQPConfigurationImpl(config, socksProxyConfig, maxMessageSize)
|
||||||
private var sharedEventLoopGroup: EventLoopGroup? = null
|
private var sharedEventLoopGroup: EventLoopGroup? = null
|
||||||
private var artemis: ArtemisSessionProvider? = null
|
private var artemis: ArtemisSessionProvider? = null
|
||||||
|
|
||||||
|
@ -30,8 +30,8 @@ import net.corda.nodeapi.internal.protonwrapper.messages.impl.SendableMessageImp
|
|||||||
import net.corda.nodeapi.internal.requireMessageSize
|
import net.corda.nodeapi.internal.requireMessageSize
|
||||||
import rx.Observable
|
import rx.Observable
|
||||||
import rx.subjects.PublishSubject
|
import rx.subjects.PublishSubject
|
||||||
import java.net.InetSocketAddress
|
|
||||||
import java.lang.Long.min
|
import java.lang.Long.min
|
||||||
|
import java.net.InetSocketAddress
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.locks.ReentrantLock
|
import java.util.concurrent.locks.ReentrantLock
|
||||||
import javax.net.ssl.KeyManagerFactory
|
import javax.net.ssl.KeyManagerFactory
|
||||||
@ -148,10 +148,10 @@ class AMQPClient(val targets: List<NetworkHostAndPort>,
|
|||||||
|
|
||||||
override fun initChannel(ch: SocketChannel) {
|
override fun initChannel(ch: SocketChannel) {
|
||||||
val pipeline = ch.pipeline()
|
val pipeline = ch.pipeline()
|
||||||
val socksConfig = parent.socksProxyConfig
|
val socksConfig = conf.socksProxyConfig
|
||||||
if (socksConfig != null) {
|
if (socksConfig != null) {
|
||||||
val proxyAddress = InetSocketAddress(socksConfig.proxyAddress.host, socksConfig.proxyAddress.port)
|
val proxyAddress = InetSocketAddress(socksConfig.proxyAddress.host, socksConfig.proxyAddress.port)
|
||||||
val proxy = when (parent.socksProxyConfig!!.version) {
|
val proxy = when (conf.socksProxyConfig!!.version) {
|
||||||
SocksProxyVersion.SOCKS4 -> {
|
SocksProxyVersion.SOCKS4 -> {
|
||||||
Socks4ProxyHandler(proxyAddress, socksConfig.userName)
|
Socks4ProxyHandler(proxyAddress, socksConfig.userName)
|
||||||
}
|
}
|
||||||
|
@ -55,5 +55,9 @@ interface AMQPConfiguration {
|
|||||||
* but currently that is deferred to Artemis and the bridge code.
|
* but currently that is deferred to Artemis and the bridge code.
|
||||||
*/
|
*/
|
||||||
val maxMessageSize: Int
|
val maxMessageSize: Int
|
||||||
|
|
||||||
|
@JvmDefault
|
||||||
|
val socksProxyConfig: SocksProxyConfig?
|
||||||
|
get() = null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,10 +16,13 @@ import net.corda.core.crypto.toStringShort
|
|||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.core.utilities.loggerFor
|
import net.corda.core.utilities.loggerFor
|
||||||
|
import net.corda.node.services.config.EnterpriseConfiguration
|
||||||
|
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2PMessagingHeaders
|
||||||
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
|
import net.corda.nodeapi.internal.bridging.AMQPBridgeManager
|
||||||
import net.corda.nodeapi.internal.bridging.BridgeManager
|
import net.corda.nodeapi.internal.bridging.BridgeManager
|
||||||
@ -282,7 +285,6 @@ class AMQPBridgeTest {
|
|||||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||||
doReturn(targetAdress).whenever(it).p2pAddress
|
doReturn(targetAdress).whenever(it).p2pAddress
|
||||||
doReturn("").whenever(it).jmxMonitoringHttpPort
|
doReturn("").whenever(it).jmxMonitoringHttpPort
|
||||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
|
||||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||||
}
|
}
|
||||||
artemisConfig.configureWithDevSSLCertificate()
|
artemisConfig.configureWithDevSSLCertificate()
|
||||||
|
@ -18,6 +18,8 @@ import net.corda.core.identity.CordaX500Name
|
|||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.toFuture
|
import net.corda.core.toFuture
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
|
import net.corda.node.services.config.EnterpriseConfiguration
|
||||||
|
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||||
import net.corda.node.services.config.NodeConfiguration
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
import net.corda.node.services.config.configureWithDevSSLCertificate
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
|
@ -24,16 +24,15 @@ import net.corda.core.identity.CordaX500Name
|
|||||||
import net.corda.core.internal.div
|
import net.corda.core.internal.div
|
||||||
import net.corda.core.toFuture
|
import net.corda.core.toFuture
|
||||||
import net.corda.core.utilities.NetworkHostAndPort
|
import net.corda.core.utilities.NetworkHostAndPort
|
||||||
import net.corda.node.services.config.*
|
import net.corda.node.services.config.EnterpriseConfiguration
|
||||||
|
import net.corda.node.services.config.MutualExclusionConfiguration
|
||||||
|
import net.corda.node.services.config.NodeConfiguration
|
||||||
|
import net.corda.node.services.config.configureWithDevSSLCertificate
|
||||||
import net.corda.node.services.messaging.ArtemisMessagingServer
|
import net.corda.node.services.messaging.ArtemisMessagingServer
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
import net.corda.nodeapi.internal.ArtemisMessagingClient
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.P2P_PREFIX
|
||||||
import net.corda.nodeapi.internal.ArtemisMessagingComponent.Companion.PEER_USER
|
|
||||||
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
import net.corda.nodeapi.internal.protonwrapper.messages.MessageStatus
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPClient
|
import net.corda.nodeapi.internal.protonwrapper.netty.*
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.AMQPServer
|
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyConfig
|
|
||||||
import net.corda.nodeapi.internal.protonwrapper.netty.SocksProxyVersion
|
|
||||||
import net.corda.testing.core.*
|
import net.corda.testing.core.*
|
||||||
import net.corda.testing.internal.rigorousMock
|
import net.corda.testing.internal.rigorousMock
|
||||||
import org.apache.activemq.artemis.api.core.RoutingType
|
import org.apache.activemq.artemis.api.core.RoutingType
|
||||||
@ -43,6 +42,7 @@ import org.junit.Before
|
|||||||
import org.junit.Rule
|
import org.junit.Rule
|
||||||
import org.junit.Test
|
import org.junit.Test
|
||||||
import org.junit.rules.TemporaryFolder
|
import org.junit.rules.TemporaryFolder
|
||||||
|
import java.security.KeyStore
|
||||||
import kotlin.test.assertEquals
|
import kotlin.test.assertEquals
|
||||||
|
|
||||||
class SocksTests {
|
class SocksTests {
|
||||||
@ -281,7 +281,6 @@ class SocksTests {
|
|||||||
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
doReturn("cordacadevpass").whenever(it).keyStorePassword
|
||||||
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
doReturn(NetworkHostAndPort("0.0.0.0", artemisPort)).whenever(it).p2pAddress
|
||||||
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
doReturn(null).whenever(it).jmxMonitoringHttpPort
|
||||||
doReturn(emptyList<CertChainPolicyConfig>()).whenever(it).certificateChainCheckPolicies
|
|
||||||
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
doReturn(EnterpriseConfiguration(MutualExclusionConfiguration(false, "", 20000, 40000))).whenever(it).enterpriseConfiguration
|
||||||
}
|
}
|
||||||
artemisConfig.configureWithDevSSLCertificate()
|
artemisConfig.configureWithDevSSLCertificate()
|
||||||
@ -304,20 +303,20 @@ class SocksTests {
|
|||||||
|
|
||||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||||
|
val amqpConfig = object : AMQPConfiguration {
|
||||||
|
override val keyStore: KeyStore = clientKeystore
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||||
|
override val trustStore: KeyStore = clientTruststore
|
||||||
|
override val trace: Boolean = true
|
||||||
|
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||||
|
override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||||
|
}
|
||||||
return AMQPClient(
|
return AMQPClient(
|
||||||
listOf(NetworkHostAndPort("localhost", serverPort),
|
listOf(NetworkHostAndPort("localhost", serverPort),
|
||||||
NetworkHostAndPort("localhost", serverPort2),
|
NetworkHostAndPort("localhost", serverPort2),
|
||||||
NetworkHostAndPort("localhost", artemisPort)),
|
NetworkHostAndPort("localhost", artemisPort)),
|
||||||
setOf(ALICE_NAME, CHARLIE_NAME),
|
setOf(ALICE_NAME, CHARLIE_NAME),
|
||||||
PEER_USER,
|
amqpConfig)
|
||||||
PEER_USER,
|
|
||||||
clientKeystore,
|
|
||||||
clientConfig.keyStorePassword,
|
|
||||||
clientTruststore, true,
|
|
||||||
MAX_MESSAGE_SIZE,
|
|
||||||
socksProxyConfig = SocksProxyConfig(SocksProxyVersion.SOCKS5,
|
|
||||||
NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int): AMQPClient {
|
private fun createSharedThreadsClient(sharedEventGroup: EventLoopGroup, id: Int): AMQPClient {
|
||||||
@ -331,21 +330,20 @@ class SocksTests {
|
|||||||
|
|
||||||
val clientTruststore = clientConfig.loadTrustStore().internal
|
val clientTruststore = clientConfig.loadTrustStore().internal
|
||||||
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
val clientKeystore = clientConfig.loadSslKeyStore().internal
|
||||||
|
val amqpConfig = object : AMQPConfiguration {
|
||||||
|
override val keyStore: KeyStore = clientKeystore
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = clientConfig.keyStorePassword.toCharArray()
|
||||||
|
override val trustStore: KeyStore = clientTruststore
|
||||||
|
override val trace: Boolean = true
|
||||||
|
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||||
|
override val socksProxyConfig: SocksProxyConfig? = SocksProxyConfig(SocksProxyVersion.SOCKS5, NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
||||||
|
}
|
||||||
|
|
||||||
return AMQPClient(
|
return AMQPClient(
|
||||||
listOf(NetworkHostAndPort("localhost", serverPort)),
|
listOf(NetworkHostAndPort("localhost", serverPort)),
|
||||||
setOf(ALICE_NAME),
|
setOf(ALICE_NAME),
|
||||||
PEER_USER,
|
amqpConfig,
|
||||||
PEER_USER,
|
sharedEventGroup)
|
||||||
clientKeystore,
|
|
||||||
clientConfig.keyStorePassword,
|
|
||||||
clientTruststore,
|
|
||||||
true,
|
|
||||||
MAX_MESSAGE_SIZE,
|
|
||||||
true,
|
|
||||||
sharedEventGroup,
|
|
||||||
socksProxyConfig = SocksProxyConfig(SocksProxyVersion.SOCKS5,
|
|
||||||
NetworkHostAndPort("127.0.0.1", socksPort), null, null)
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer {
|
private fun createServer(port: Int, name: CordaX500Name = ALICE_NAME): AMQPServer {
|
||||||
@ -359,15 +357,16 @@ class SocksTests {
|
|||||||
|
|
||||||
val serverTruststore = serverConfig.loadTrustStore().internal
|
val serverTruststore = serverConfig.loadTrustStore().internal
|
||||||
val serverKeystore = serverConfig.loadSslKeyStore().internal
|
val serverKeystore = serverConfig.loadSslKeyStore().internal
|
||||||
|
val amqpConfig = object : AMQPConfiguration {
|
||||||
|
override val keyStore: KeyStore = serverKeystore
|
||||||
|
override val keyStorePrivateKeyPassword: CharArray = serverConfig.keyStorePassword.toCharArray()
|
||||||
|
override val trustStore: KeyStore = serverTruststore
|
||||||
|
override val trace: Boolean = true
|
||||||
|
override val maxMessageSize: Int = MAX_MESSAGE_SIZE
|
||||||
|
}
|
||||||
return AMQPServer(
|
return AMQPServer(
|
||||||
"0.0.0.0",
|
"0.0.0.0",
|
||||||
port,
|
port,
|
||||||
PEER_USER,
|
amqpConfig)
|
||||||
PEER_USER,
|
|
||||||
serverKeystore,
|
|
||||||
serverConfig.keyStorePassword,
|
|
||||||
serverTruststore,
|
|
||||||
true,
|
|
||||||
MAX_MESSAGE_SIZE)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user